# 67 - Capstone Project Planning

**Complexity: Moderate (M)**

## 67.0 Introduction: Why This Matters for Data Engineering
In data engineering, planning a robust pipeline is critical for delivering scalable, secure, and testable solutions that meet Hijra Group’s needs for Sharia-compliant financial analytics. This capstone planning chapter synthesizes skills from Phases 1–9 (Chapters 1–66), guiding you to design an end-to-end pipeline integrating data lakes, data warehouses, data marts, FastAPI, Airflow, Kubernetes with Helm Charts, and security best practices. The pipeline processes financial transaction data (`data/transactions.csv` from Appendix 1), ensuring compliance with Islamic Financial Services Board (IFSB) standards through data validation, PII protection, and observability.

This chapter builds on production deployment (Chapters 60–66) and prepares for implementation in Chapters 68–70. It leverages Checkpoint 6 (Chapter 44), where you mastered advanced processing and testing, now applied to pipeline planning. You’ll define a pipeline that ingests, transforms, and serves transaction data, using type-annotated Python verified by Pyright and tested with `pytest`. The planning process emphasizes modular design, scalability, and stakeholder reporting, aligning with Hijra Group’s ecosystem of PostgreSQL, BigQuery, Airflow, dbt, FastAPI, and Kubernetes.
### Data Engineering Workflow Context

The following diagram outlines the planned pipeline:
```mermaid
flowchart TD
    A["Raw Data<br>transactions.csv"] --> B["Data Lake<br>GCS"]
    B --> C["Ingestion<br>FastAPI"]
    C --> D["Data Warehouse<br>BigQuery"]
    D --> E["Transformation<br>dbt/Airflow"]
    E --> F["Data Mart<br>BigQuery"]
    F --> G["API/UI<br>FastAPI"]
    G --> H["Stakeholder<br>Dashboard"]
    I["Kubernetes<br>Helm Chart"] --> C
    I --> E
    I --> G
    J["Security<br>PII Masking"] --> C
    J --> D
    J --> F
    K["Observability<br>Logging"] --> C
    K --> E
    K --> G

    classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
    classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
    classDef infra fill:#ddffdd,stroke:#363,stroke-width:1px
    classDef security fill:#ffdddd,stroke:#933,stroke-width:1px

    class A,B,D,F,H data
    class C,E,G process
    class I infra
    class J,K security
```

### Building On and Preparing For
- **Building On**:
  - Phase 1 (Chapters 1–6): Python basics for data handling.
  - Phase 2 (Chapters 7–11): Type safety and testing with Pyright and `pytest`.
  - Phase 3 (Chapters 12–24): SQL, SQLite, and PostgreSQL for database operations.
  - Phase 4 (Chapters 25–30): BigQuery for cloud analytics.
  - Phase 5 (Chapters 31–37): Data lakes, data marts, and ETL pipelines.
  - Phase 6 (Chapters 38–44): Advanced NumPy, Pandas, and testing.
  - Phase 7 (Chapters 45–51): Web integration with FastAPI and Django.
  - Phase 8 (Chapters 52–59): Pipeline orchestration with Airflow and dbt.
  - Phase 9 (Chapters 60–66): Kubernetes, Helm, security, and observability.
- **Preparing For**:
  - Chapter 68: Implements ingestion and storage.
  - Chapter 69: Builds data mart transformations with dbt and Airflow.
  - Chapter 70: Integrates the pipeline with FastAPI and Helm for deployment.
### What You’ll Learn
This chapter covers:
- **Pipeline Requirements**: Define functional and non-functional requirements.
- **Architecture Design**: Plan data flow, components, and technologies.
- **Security and Compliance**: Incorporate PII masking and encryption.
- **Test Planning**: Design `pytest` test cases for reliability.
- **Scalability**: Address partitioning and resource optimization.
- **Micro-Project**: Create a pipeline plan for transaction data processing.
By the end, you’ll produce a detailed plan for a type-annotated, testable pipeline, documented in a YAML configuration and a Markdown plan, ready for implementation in Chapters 68–70. All code uses 4-space indentation per PEP 8, preferring spaces over tabs to avoid IndentationError.
**Follow-Along Tips**:

- Create `de-onboarding/data/` and populate with `transactions.csv` and `config.yaml` from Appendix 1.
- Install libraries: `pip install pyyaml pytest fastapi apache-airflow dbt-core google-cloud-storage google-cloud-bigquery psycopg2-binary kubernetes`.
- Configure editor for 4-space indentation (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Use print statements (e.g., `print(config)`) to debug YAML parsing.
- Verify file paths with `ls data/` (Unix/macOS) or `dir data\` (Windows).
- Use UTF-8 encoding to avoid `UnicodeDecodeError`.
## 67.1 Core Concepts

### 67.1.1 Pipeline Requirements
Define functional and non-functional requirements to ensure the pipeline meets Hijra Group’s needs:
- **Functional Requirements**:
  - **Ingestion**: Load `transactions.csv` into a Google Cloud Storage (GCS) data lake via FastAPI.
  - **Storage**: Store raw data in a BigQuery data warehouse.
  - **Transformation**: Use dbt to create a data mart with aggregated metrics (e.g., sales by product).
  - **Orchestration**: Schedule tasks with Airflow in Kubernetes.
  - **Serving**: Provide a FastAPI endpoint for data mart queries and a UI dashboard.
- **Non-Functional Requirements**:
  - **Scalability**: Handle 1M transactions (~240MB in BigQuery, assuming 5 columns).
  - **Security**: Mask PII (e.g., transaction IDs) and encrypt database connections.
  - **Reliability**: Achieve 99.9% uptime with retries and logging.
  - **Testability**: Achieve 80% test coverage with `pytest`.
  - **Performance**: Process 1M transactions in under 5 minutes.
### 67.1.2 Architecture Design
Design a modular pipeline with clear data flow:
- **Components**:
  - **Data Lake**: GCS bucket for raw `transactions.csv`.
  - **Ingestion**: FastAPI endpoint to upload CSVs to GCS (sketched after this list).
  - **Data Warehouse**: BigQuery for raw and processed data.
  - **Transformation**: dbt models for data mart creation.
  - **Orchestration**: Airflow DAGs in Kubernetes.
  - **API/UI**: FastAPI for querying data marts and serving a Jinja2 UI.
  - **Infrastructure**: Kubernetes with Helm Charts for deployment.
- **Data Flow**:
  1. Upload `transactions.csv` to GCS via FastAPI.
  2. Load data into the BigQuery warehouse.
  3. Transform data into a data mart using dbt, orchestrated by Airflow.
  4. Serve the data mart via the FastAPI API and UI.
- **Technologies**:
  - Python 3.10+, FastAPI, Airflow, dbt, BigQuery, GCS, PostgreSQL, Kubernetes, Helm.
  - Libraries: `pandas`, `google-cloud-storage`, `google-cloud-bigquery`, `psycopg2`, `pyyaml`, `pytest`.
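To make the ingestion component concrete, here is a minimal sketch of a FastAPI route that streams an uploaded CSV to GCS. The bucket name and path are assumptions taken from the micro-project’s `pipeline_config.yaml`; OAuth2, validation, and error handling are deferred to Chapters 68–70:

```python
# A minimal ingestion sketch; bucket and path come from
# pipeline_config.yaml, and security is omitted for brevity.
from fastapi import FastAPI, UploadFile
from google.cloud import storage

app = FastAPI()


@app.post("/upload/transactions")
async def upload_transactions(file: UploadFile) -> dict:
    """Upload a CSV to the GCS data lake."""
    client = storage.Client()  # Uses application-default credentials
    bucket = client.bucket("hijra-transactions")  # data_lake.gcs_bucket
    blob = bucket.blob(f"raw/transactions/{file.filename}")  # data_lake.path
    blob.upload_from_file(file.file)  # Stream the upload to GCS
    return {"uploaded": file.filename}
```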
### 67.1.3 Security and Compliance

Incorporate security best practices (Chapter 65):

- **PII Masking**: Hash transaction IDs using SHA-256 (see the sketch below).
- **Encryption**: Use TLS for FastAPI and PostgreSQL connections.
- **Authentication**: OAuth2 for FastAPI endpoints.
- **Compliance**: Adhere to GDPR/PDPA for PII handling.
- **Secrets Management**: Store credentials in Helm secrets.
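PII masking can be applied before identifiers leave the ingestion layer. A minimal sketch using the standard library’s `hashlib`; production masking might add salting or tokenization, which Chapter 65 covers:

```python
# A minimal SHA-256 masking sketch for PII fields such as transaction_id.
import hashlib


def mask_pii(value: str) -> str:
    """Return an irreversible SHA-256 hex digest of a PII value."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


print(mask_pii("T001"))  # 64-character hex digest; the original ID is not recoverable
```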
### 67.1.4 Test Planning

Design `pytest` test cases for reliability:

- **Unit Tests**: Validate FastAPI endpoints, dbt models, and Airflow DAGs.
- **Integration Tests**: Test GCS-to-BigQuery ingestion and data mart queries.
- **Edge Cases**: Handle empty CSVs, invalid data, and network failures (see the test sketch below).
- **Performance Tests**: Ensure processing time is under 5 minutes for 1M rows.
- **Test Data**: Use `transactions.csv` plus synthetic data for edge cases.
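For example, the empty-CSV edge case can be pinned down early with a small `pytest` test. The `ingest_csv` helper below is hypothetical, standing in for the ingestion logic implemented in Chapter 68; `tmp_path` is pytest’s built-in temporary-directory fixture:

```python
# A minimal edge-case test sketch; ingest_csv is a hypothetical helper.
import pandas as pd


def ingest_csv(csv_path: str) -> int:
    """Hypothetical helper: count valid (non-null) rows in a CSV."""
    df = pd.read_csv(csv_path)
    return len(df.dropna())


def test_empty_csv(tmp_path) -> None:
    """An empty CSV (header only) should yield zero rows, not an error."""
    path = tmp_path / "empty.csv"
    path.write_text("transaction_id,product,price,quantity,date\n")
    assert ingest_csv(str(path)) == 0
```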
### 67.1.5 Scalability

Address scalability for large datasets, such as peak transaction periods like Eid, when Hijra Group may process 5M transactions daily. This requires efficient partitioning and batching, building on Chapter 36’s optimization techniques. BigQuery queries on a 1M-row partitioned table typically complete in under 10 seconds (Chapter 29). The following diagram shows BigQuery’s date-based partitioning for the `raw_transactions` table:
```mermaid
classDiagram
    class raw_transactions {
        transaction_id: STRING
        product: STRING
        price: FLOAT
        quantity: INTEGER
        date: DATE
    }
    class partition_2023_10_01 {
        Rows for date = 2023-10-01
    }
    class partition_2023_10_02 {
        Rows for date = 2023-10-02
    }
    raw_transactions -- partition_2023_10_01 : Partitioned by date
    raw_transactions -- partition_2023_10_02 : Partitioned by date
```

- **Partitioning**: Use date-based partitioning in BigQuery (e.g., on the `date` column).
- **Resource Optimization**: Set Kubernetes resource limits (e.g., 1 CPU, 2GB RAM per pod).
- **Batch Processing**: Process CSVs in chunks (e.g., 10,000 rows) using Pandas (see the sketch after this list).
- **Time Complexity**:
  - Ingestion: O(n) for n rows.
  - Transformation: O(n) for dbt models.
  - Querying: O(k) for k rows in the data mart.
- **Space Complexity**:
  - Data Lake: O(n) for n rows (~240MB for 1M rows).
  - Data Warehouse: O(n) with partitioning.
  - Data Mart: O(m) for m aggregated rows.
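The batching strategy can be sketched with Pandas’ `chunksize` parameter, which bounds memory at O(batch_size) rows regardless of file size. A minimal sketch against the sample `data/transactions.csv`; the `process_in_chunks` helper is illustrative, not part of the micro-project code:

```python
# A minimal chunked-processing sketch; batch_size mirrors the plan's
# 10,000-row chunks (the sample CSV is far smaller).
import pandas as pd


def process_in_chunks(csv_path: str, batch_size: int = 10000) -> float:
    """Sum price * quantity in O(n) time with O(batch_size) memory."""
    total = 0.0
    for chunk in pd.read_csv(csv_path, chunksize=batch_size):
        chunk = chunk.dropna(subset=["product", "price", "quantity"])  # Skip invalid rows
        total += float((chunk["price"] * chunk["quantity"]).sum())
    return total


print(process_in_chunks("data/transactions.csv"))  # 2899.81 for the sample (row T004 dropped)
```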
## 67.2 Micro-Project: Financial Transaction Pipeline Plan

### Project Requirements

Create a detailed plan for a type-annotated, testable financial transaction pipeline, processing `data/transactions.csv` for Hijra Group’s analytics. The plan includes:

- YAML configuration for pipeline settings, split into core and advanced sections for clarity.
- A Markdown document outlining requirements, architecture, security, test cases, and scalability.
- A Python script to validate the YAML configuration with type annotations and `pytest` tests.
- All code uses 4-space indentation per PEP 8, preferring spaces over tabs.
### Sample Input Files

`data/transactions.csv` (from Appendix 1):

```csv
transaction_id,product,price,quantity,date
T001,Halal Laptop,999.99,2,2023-10-01
T002,Halal Mouse,24.99,10,2023-10-02
T003,Halal Keyboard,49.99,5,2023-10-03
T004,,29.99,3,2023-10-04
T005,Monitor,199.99,2,2023-10-05
```

`data/config.yaml` (from Appendix 1):

```yaml
min_price: 10.0
max_quantity: 100
required_fields:
  - product
  - price
  - quantity
product_prefix: 'Halal'
max_decimals: 2
```

### Data Processing Flow
```mermaid
flowchart TD
    A["Input CSV<br>transactions.csv"] --> B["FastAPI Upload<br>GCS Data Lake"]
    B --> C["BigQuery<br>Data Warehouse"]
    C --> D["dbt Models<br>Data Mart"]
    D --> E["Airflow DAG<br>Kubernetes"]
    E --> F["FastAPI API/UI<br>Data Mart Queries"]
    F --> G["Stakeholder<br>Dashboard"]

    classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
    classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
    classDef infra fill:#ddffdd,stroke:#363,stroke-width:1px

    class A,C,D,G data
    class B,E,F process
    class E infra
```

### Acceptance Criteria
- **Go Criteria**:
  - YAML configuration defines pipeline settings (e.g., GCS bucket, BigQuery dataset).
  - Markdown plan details requirements, architecture, security, test cases, and scalability.
  - Python script validates YAML with type annotations, verified by Pyright.
  - `pytest` tests cover YAML parsing and validation.
  - Handles edge cases (e.g., missing fields, invalid data).
  - Uses 4-space indentation per PEP 8, preferring spaces over tabs.
- **No-Go Criteria**:
  - Missing YAML or Markdown plan.
  - Invalid YAML syntax or missing required fields.
  - Python script lacks type annotations or fails Pyright checks.
  - `pytest` tests fail or lack coverage.
  - Inconsistent indentation or tab/space mixing.
### Common Pitfalls to Avoid

- **YAML Syntax Errors**:
  - **Problem**: Incorrect indentation or missing colons.
  - **Solution**: Validate with `yaml.safe_load`. Print `open(config_path).read()` to inspect. Use an online YAML linter (e.g., yamllint) to check syntax before running `pipeline_plan.py`. See the debugging sketch after this list.
- **Missing Configuration Fields**:
  - **Problem**: YAML lacks required fields (e.g., `gcs_bucket`).
  - **Solution**: Check fields in Python. Print `config.keys()`.
- **Pyright Errors**:
  - **Problem**: Missing type annotations cause Pyright errors.
  - **Solution**: Run `pyright --verbose pipeline_plan.py` and check for missing imports (e.g., `typing.Dict`). Add annotations like `Dict[str, Any]`.
- **Test Failures**:
  - **Problem**: Edge cases (e.g., invalid YAML) not handled.
  - **Solution**: Test with invalid YAML, as in Exercise 5. Print test inputs to debug.
- **Helm Chart Misconfiguration**:
  - **Problem**: Helm chart fails due to invalid values or a missing namespace.
  - **Solution**: Run `helm lint` to validate the chart. Ensure the namespace exists with `kubectl create namespace data-pipeline`.
- **Kubernetes Namespace Conflicts**:
  - **Problem**: Deployment fails if the namespace is already in use.
  - **Solution**: Check existing namespaces with `kubectl get namespaces` and use a unique name (e.g., `data-pipeline`).
- **IndentationError**:
  - **Problem**: Mixed spaces/tabs in Python code.
  - **Solution**: Use 4 spaces per PEP 8. Run `python -tt pipeline_plan.py` to detect issues.
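As referenced in the first pitfall, wrapping `yaml.safe_load` in a `try`/`except` surfaces syntax errors with line and column information. A minimal debugging sketch, assuming `data/pipeline_config.yaml` exists:

```python
# A minimal YAML debugging sketch; safe_load raises yaml.YAMLError
# (with line/column details) on syntax errors.
import yaml

config_path = "data/pipeline_config.yaml"
try:
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)
    print(config.keys())  # Confirm required sections are present
except yaml.YAMLError as e:
    print(f"YAML syntax error: {e}")  # Shows the offending line/column
```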
### How This Differs from Production
In production, the plan would include:
- Stakeholder Review: Formal approval from business and compliance teams, unlike the micro-project’s learner-driven plan.
- CI/CD Integration: Automated deployment with GitHub Actions (Chapter 66), compared to manual setup here.
- Monitoring: Prometheus/Grafana for observability (Chapter 66) to track Airflow DAG failures, unlike console logging.
- High Availability: Multi-region Kubernetes clusters, versus single-cluster planning.
- Documentation: Detailed runbooks for maintenance, beyond the Markdown plan.
### Implementation

#### YAML Configuration

`data/pipeline_config.yaml`:

```yaml
pipeline:
  name: transaction_pipeline
  version: 1.0.0
data_lake:
  gcs_bucket: hijra-transactions
  path: raw/transactions
data_warehouse:
  project_id: hijra-project
  dataset: transactions_warehouse
  table: raw_transactions
data_mart:
  dataset: transactions_mart
  table: sales_by_product
ingestion:
  fastapi_endpoint: /upload/transactions
  batch_size: 10000
transformation:
  dbt_project: dbt_transactions
  models:
    - sales_by_product
    - sales_by_date
orchestration:
  airflow_dag: transaction_etl
  schedule: '0 0 * * *' # Daily at midnight
security:
  pii_fields:
    - transaction_id
  encryption: tls
  authentication: oauth2
kubernetes:
  namespace: data-pipeline
  helm_chart: transaction-pipeline
  resources:
    cpu: '1'
    memory: '2Gi'
```

**Note**: The YAML is divided into core sections (`pipeline`, `data_lake`, `data_warehouse`, `data_mart`, `ingestion`, `transformation`, `orchestration`, `security`) critical for pipeline functionality, and an advanced section (`kubernetes`) for deployment specifics. For planning, focus on the core sections; the advanced fields are optional and build on Chapters 61–64.
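For orientation, the `orchestration` section maps onto an Airflow DAG skeleton like the sketch below. This is a minimal sketch, not the micro-project deliverable: the `dag_id` and schedule mirror `pipeline_config.yaml`, while the task body is a placeholder for the Chapter 68–70 implementations.

```python
# A minimal Airflow DAG skeleton; the dag_id and schedule are taken
# from pipeline_config.yaml, and the task body is a placeholder.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def load_to_warehouse() -> None:
    """Placeholder: load GCS data into BigQuery (Chapter 68)."""
    print("Loading GCS data into BigQuery")


with DAG(
    dag_id="transaction_etl",  # orchestration.airflow_dag
    schedule_interval="0 0 * * *",  # orchestration.schedule: daily at midnight
    start_date=datetime(2023, 10, 1),
    catchup=False,  # Skip backfill runs for past dates
) as dag:
    load_task = PythonOperator(
        task_id="load_to_warehouse",
        python_callable=load_to_warehouse,
    )
```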
#### Markdown Plan

`pipeline_plan.md`:
# Financial Transaction Pipeline Plan
## Overview
This plan outlines a type-annotated, testable pipeline for processing financial transaction data at Hijra Group, using `data/transactions.csv`. The pipeline integrates a GCS data lake, BigQuery data warehouse, dbt data mart, Airflow orchestration, FastAPI API/UI, and Kubernetes with Helm Charts, ensuring scalability, security, and compliance.
## Requirements
### Functional
- **Ingestion**: Upload `transactions.csv` to GCS via FastAPI.
- **Storage**: Store raw data in BigQuery warehouse.
- **Transformation**: Create data mart with dbt (e.g., sales by product).
- **Orchestration**: Schedule ETL with Airflow in Kubernetes.
- **Serving**: Provide FastAPI API and Jinja2 UI for data mart queries.
### Non-Functional
- **Scalability**: Handle 1M transactions (~240MB).
- **Security**: Mask PII (transaction_id), use TLS, OAuth2.
- **Reliability**: 99.9% uptime with retries.
- **Testability**: 80% test coverage with `pytest`.
- **Performance**: Process 1M rows in <5 minutes.
## Architecture
### Components
- **Data Lake**: GCS bucket `hijra-transactions`.
- **Ingestion**: FastAPI endpoint `/upload/transactions`.
- **Data Warehouse**: BigQuery dataset `transactions_warehouse`.
- **Data Mart**: BigQuery dataset `transactions_mart`.
- **Transformation**: dbt models `sales_by_product`, `sales_by_date`.
- **Orchestration**: Airflow DAG `transaction_etl`.
- **API/UI**: FastAPI with Jinja2 UI.
- **Infrastructure**: Kubernetes namespace `data-pipeline`.
### Data Flow
1. Upload `transactions.csv` to GCS.
2. Load into BigQuery warehouse.
3. Transform into data mart with dbt.
4. Orchestrate with Airflow.
5. Serve via FastAPI API/UI.
### Technologies
- Python 3.10+, FastAPI, Airflow, dbt, BigQuery, GCS, PostgreSQL, Kubernetes, Helm.
- Libraries: `pandas`, `google-cloud-storage`, `google-cloud-bigquery`, `psycopg2`, `pyyaml`, `pytest`.
## Security and Compliance
- **PII Masking**: Hash `transaction_id` with SHA-256.
- **Encryption**: TLS for FastAPI/PostgreSQL.
- **Authentication**: OAuth2 for API.
- **Compliance**: GDPR/PDPA for PII.
- **Secrets**: Helm secrets for credentials.
## Test Plan
The following diagram illustrates the testing process:
```mermaid
flowchart TD
A["Unit Tests<br>FastAPI, dbt, Airflow"] --> B["Integration Tests<br>GCS-to-BigQuery, Queries"]
B --> C["Edge Case Tests<br>Empty CSV, Invalid Data"]
C --> D["Performance Tests<br>1M Rows < 5min"]
classDef test fill:#d0e0ff,stroke:#336,stroke-width:1px
class A,B,C,D test
```

### Unit Tests
- Validate FastAPI upload endpoint.
- Test dbt model outputs.
- Verify Airflow DAG scheduling.
### Integration Tests
- Test GCS-to-BigQuery ingestion.
- Validate data mart query results.
### Edge Cases
- Empty CSV: Return empty results.
- Invalid Data: Skip invalid rows.
- Network Failure: Retry uploads.
### Performance Tests
- Process 1M rows in <5 minutes.
## Scalability
The pipeline must handle peak loads, such as 5M transactions during Eid, using partitioning and batching (Chapter 36). BigQuery queries on a 1M-row partitioned table typically complete in <10 seconds, as per Chapter 29. Date-based partitioning in BigQuery and 10,000-row chunks in Pandas ensure efficiency. The following diagram shows BigQuery’s partitioning:
```mermaid
classDiagram
    class raw_transactions {
        transaction_id: STRING
        product: STRING
        price: FLOAT
        quantity: INTEGER
        date: DATE
    }
    class partition_2023_10_01 {
        Rows for date = 2023-10-01
    }
    class partition_2023_10_02 {
        Rows for date = 2023-10-02
    }
    raw_transactions -- partition_2023_10_01 : Partitioned by date
    raw_transactions -- partition_2023_10_02 : Partitioned by date
```

- **Partitioning**: Date-based in BigQuery (`date` column).
- **Resources**: 1 CPU, 2GB RAM per Kubernetes pod.
- **Batching**: Process 10,000 rows per chunk.
- **Complexity**:
  - Ingestion: O(n) for n rows.
  - Transformation: O(n) for dbt.
  - Querying: O(k) for k rows.
## Implementation Phases

- **Chapter 68**: Ingest to GCS and BigQuery.
- **Chapter 69**: Transform with dbt and Airflow.
- **Chapter 70**: Integrate with FastAPI and Helm.
## Risks and Mitigation

- **Risk**: YAML misconfiguration.
  - **Mitigation**: Validate with the Python script.
- **Risk**: PII exposure.
  - **Mitigation**: Enforce masking and encryption.
- **Risk**: Performance bottlenecks.
  - **Mitigation**: Partition data and optimize resources.
#### Python Script
`pipeline_plan.py`:
```python
# File: de-onboarding/pipeline_plan.py
from typing import Dict, Any  # For type annotations
import yaml  # For YAML parsing
import os  # For file checks


def read_pipeline_config(config_path: str) -> Dict[str, Any]:
    """Read and validate pipeline YAML configuration."""
    print(f"Opening config: {config_path}")  # Debug
    if not os.path.exists(config_path):  # Check file existence
        print(f"Config not found: {config_path}")  # Log error
        return {}
    with open(config_path, "r") as file:  # Open file
        config = yaml.safe_load(file)  # Parse YAML
    print(f"Loaded config: {config}")  # Debug
    if not config:  # Check for empty config
        print("Empty config")  # Log error
        return {}
    return config  # Return config


def validate_pipeline_config(config: Dict[str, Any]) -> bool:
    """Validate pipeline configuration."""
    required_sections = [
        "pipeline", "data_lake", "data_warehouse", "data_mart",
        "ingestion", "transformation", "orchestration", "security", "kubernetes"
    ]
    required_pipeline_fields = ["name", "version"]  # Pipeline metadata
    required_security_fields = ["pii_fields", "encryption", "authentication"]  # Security settings

    print("Validating config...")  # Debug
    for section in required_sections:  # Check required sections
        if section not in config:  # Missing section
            print(f"Missing section: {section}")  # Log error
            return False

    # Validate pipeline section
    pipeline = config["pipeline"]  # Get pipeline section
    for field in required_pipeline_fields:  # Check required fields
        if field not in pipeline:  # Missing field
            print(f"Missing pipeline field: {field}")  # Log error
            return False

    # Validate security section
    security = config["security"]  # Get security section
    for field in required_security_fields:  # Check required fields
        if field not in security:  # Missing field
            print(f"Missing security field: {field}")  # Log error
            return False
    if not isinstance(security["pii_fields"], list):  # Check pii_fields type
        print("Invalid pii_fields: must be a list")  # Log error
        return False

    # Validate kubernetes section
    kubernetes = config["kubernetes"]  # Get kubernetes section
    if "resources" in kubernetes and "cpu" in kubernetes["resources"]:
        cpu = kubernetes["resources"]["cpu"]  # Get CPU value
        if not isinstance(cpu, str) or not cpu.isdigit():  # Check CPU format
            print("Invalid kubernetes.resources.cpu: must be a numeric string (e.g., '1')")  # Log error
            return False

    print("Config validation passed")  # Confirm validation
    return True  # Return True if valid


def main() -> None:
    """Main function to validate pipeline plan."""
    config_path = "data/pipeline_config.yaml"  # Config path
    config = read_pipeline_config(config_path)  # Read config
    is_valid = validate_pipeline_config(config)  # Validate config
    print(f"Pipeline plan is {'valid' if is_valid else 'invalid'}")  # Output result


if __name__ == "__main__":
    main()  # Run main
```

#### Pytest Tests
`tests/test_pipeline_plan.py`:

```python
# File: de-onboarding/tests/test_pipeline_plan.py
from typing import Dict, Any  # For type annotations
import pytest  # For testing
import yaml  # For YAML parsing
from pipeline_plan import read_pipeline_config, validate_pipeline_config  # Import functions


@pytest.fixture
def valid_config() -> Dict[str, Any]:
    """Fixture for valid pipeline config."""
    return {
        "pipeline": {"name": "transaction_pipeline", "version": "1.0.0"},
        "data_lake": {"gcs_bucket": "hijra-transactions"},
        "data_warehouse": {"project_id": "hijra-project", "dataset": "transactions_warehouse"},
        "data_mart": {"dataset": "transactions_mart"},
        "ingestion": {"fastapi_endpoint": "/upload/transactions"},
        "transformation": {"dbt_project": "dbt_transactions"},
        "orchestration": {"airflow_dag": "transaction_etl"},
        "security": {"pii_fields": ["transaction_id"], "encryption": "tls", "authentication": "oauth2"},
        "kubernetes": {"namespace": "data-pipeline", "resources": {"cpu": "1", "memory": "2Gi"}},
    }


@pytest.fixture
def invalid_config() -> Dict[str, Any]:
    """Fixture for invalid pipeline config."""
    return {
        "pipeline": {"name": "transaction_pipeline"},  # Missing version
        "data_lake": {"gcs_bucket": "hijra-transactions"},
        "security": {"pii_fields": "transaction_id"},  # Invalid pii_fields type
        "kubernetes": {"namespace": "data-pipeline", "resources": {"cpu": "1GB"}},  # Invalid CPU
    }


def test_read_pipeline_config(tmp_path, valid_config: Dict[str, Any]) -> None:
    """Test reading pipeline config."""
    config_path = tmp_path / "pipeline_config.yaml"  # Temporary file
    with open(config_path, "w") as f:  # Write config
        yaml.dump(valid_config, f)  # Save YAML
    config = read_pipeline_config(str(config_path))  # Read config
    assert config == valid_config  # Check equality


def test_read_missing_config(tmp_path) -> None:
    """Test reading missing config."""
    config_path = tmp_path / "missing.yaml"  # Non-existent file
    config = read_pipeline_config(str(config_path))  # Read config
    assert config == {}  # Expect empty dict


def test_validate_valid_config(valid_config: Dict[str, Any]) -> None:
    """Test validating valid config."""
    assert validate_pipeline_config(valid_config)  # Expect True


def test_validate_invalid_config(invalid_config: Dict[str, Any]) -> None:
    """Test validating invalid config."""
    assert not validate_pipeline_config(invalid_config)  # Expect False


def test_validate_empty_config() -> None:
    """Test validating empty config."""
    assert not validate_pipeline_config({})  # Expect False
```

### Expected Outputs
- `data/pipeline_config.yaml`: YAML file with pipeline settings.
- `pipeline_plan.md`: Markdown document detailing the plan.

**Console Output** (from `pipeline_plan.py`):

```
Opening config: data/pipeline_config.yaml
Loaded config: {'pipeline': {'name': 'transaction_pipeline', 'version': '1.0.0'}, ...}
Validating config...
Config validation passed
Pipeline plan is valid
```

**Test Output** (from `pytest`):
```
===================== test session starts ======================
collected 5 items

tests/test_pipeline_plan.py ..... [100%]

====================== 5 passed in 0.02s =======================
```

### How to Run and Test
1. **Setup**:
   - **Checklist**:
     - Create the `de-onboarding/data/` directory.
     - Save `transactions.csv`, `config.yaml`, and `pipeline_config.yaml` per Appendix 1.
     - Save `pipeline_plan.md`, `pipeline_plan.py`, and `tests/test_pipeline_plan.py`.
     - Install libraries: `pip install pyyaml pytest`.
     - Create a virtual environment: `python -m venv venv`, then activate it (Windows: `venv\Scripts\activate`, Unix: `source venv/bin/activate`).
     - Verify Python 3.10+: `python --version`.
     - Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
   - **Troubleshooting**:
     - If `FileNotFoundError`, check paths with `ls data/` (Unix) or `dir data\` (Windows).
     - If `ModuleNotFoundError`, install libraries or verify the `pipeline_plan.py` path.
     - If `IndentationError`, use 4 spaces. Run `python -tt pipeline_plan.py`.
     - If `yaml.YAMLError`, print `open(config_path).read()` or use an online YAML linter.
2. **Run**:
   - Run: `python pipeline_plan.py`.
   - Outputs: Console logs confirming validation.
3. **Test**:
   - Run: `pytest tests/test_pipeline_plan.py -v`.
   - Verify all tests pass.
   - **Test Scenarios**:
     - **Valid config**: Passes validation.
     - **Missing file**: Returns an empty dict.
     - **Invalid config**: Fails validation (e.g., invalid `cpu`).
     - **Empty config**: Fails validation.
## 67.3 Practice Exercises

### Exercise 1: YAML Parser

Write a function to parse `pipeline_config.yaml` with type annotations, ensuring 4-space indentation per PEP 8.

**Expected Output**:

```
{'pipeline': {'name': 'transaction_pipeline', 'version': '1.0.0'}, ...}
```

**Follow-Along Instructions**:

- Save as `de-onboarding/ex1_yaml_parser.py`.
- Run: `python ex1_yaml_parser.py`.
- Test: `print(parse_yaml("data/pipeline_config.yaml"))`.
### Exercise 2: Config Validator

Write a function to validate `pipeline_config.yaml` for required fields, with type annotations and 4-space indentation per PEP 8.

**Expected Output**:

```
True
```

**Follow-Along Instructions**:

- Save as `de-onboarding/ex2_validator.py`.
- Run: `python ex2_validator.py`.
- Test: `print(validate_config(parse_yaml("data/pipeline_config.yaml")))`.
- Test invalid YAML: Should return `False`.
### Exercise 3: Test Case Design

Write `pytest` tests for a YAML parser, covering valid and invalid cases, with 4-space indentation per PEP 8.

**Expected Output**:

```
collected 3 items

tests/test_ex3_yaml_parser.py ... [100%]
```

**Follow-Along Instructions**:

- Save as `de-onboarding/tests/test_ex3_yaml_parser.py`.
- Run: `pytest tests/test_ex3_yaml_parser.py -v`.
- Verify tests pass.
### Exercise 4: Scalability Analysis

Write a function to estimate processing time for n rows, assuming O(n) ingestion and transformation, with 4-space indentation per PEP 8. Explain scalability in a text file.

**Expected Output**:

```
Estimated time for 1000000 rows: 300.0 seconds
```

**Follow-Along Instructions**:

- Save as `de-onboarding/ex4_scalability.py`.
- Save the explanation to `ex4_scalability.txt`.
- Run: `python ex4_scalability.py`.
- Test: `print(estimate_time(1000000))`.
### Exercise 5: Debug Kubernetes YAML Bug

Fix buggy code that fails to validate `kubernetes.resources.cpu` in `pipeline_config.yaml`, ensuring 4-space indentation per PEP 8.

**Hint**: Check if `cpu` is a numeric string like `'1'`, not `'1GB'`. See `validate_pipeline_config` in `pipeline_plan.py`.

**Buggy Code**:

```python
from typing import Dict, Any
import yaml

def validate_config(config: Dict[str, Any]) -> bool:
    if "kubernetes" not in config:
        return False
    if "resources" not in config["kubernetes"]:
        return False
    return True  # Bug: Doesn't check cpu format
```

**Expected Output**:

```
False
```

**Follow-Along Instructions**:

- Save as `de-onboarding/ex5_debug.py`.
- Run: `python ex5_debug.py` to see the bug.
- Fix and re-run.
- Test with an invalid `cpu` (e.g., `'1GB'`).
### Exercise 6: BigQuery vs. PostgreSQL Trade-offs

Write a 100–150 word text file comparing BigQuery and PostgreSQL for the data warehouse, referencing Chapters 16–24 (PostgreSQL) and 25–30 (BigQuery), with 4-space indentation for any code snippets.

**Expected Output** (`ex6_tradeoffs.txt`):

```
BigQuery is chosen for the data warehouse due to its scalability for 1M+ transactions, using date-based partitioning (Chapter 29) to reduce query time to O(k) for k rows. PostgreSQL (Chapter 16) excels in transactional workloads with frequent updates, but its indexing (Chapter 22) is less efficient for analytics on large datasets. BigQuery’s serverless model simplifies scaling, while PostgreSQL requires manual sharding for similar loads. For Hijra Group’s analytics, BigQuery’s performance outweighs PostgreSQL’s flexibility.
```

**Follow-Along Instructions**:

- Save as `de-onboarding/ex6_tradeoffs.txt`.
- Optionally, include a Python snippet to query both (e.g., `psycopg2` for PostgreSQL, `google-cloud-bigquery` for BigQuery).
- Verify references to Chapters 16–30.
- Test: Read and confirm the content is 100–150 words.
### Exercise 7: Stakeholder Report Endpoint

Write a 50–100 word text file proposing a FastAPI endpoint for a sales summary report, referencing Chapter 53, using `pipeline_config.yaml` for configuration. Ensure 4-space indentation for any code snippets.

**Expected Output** (`ex7_endpoint.txt`):

```
Propose a FastAPI endpoint `/sales/summary` to deliver a sales report from the `transactions_mart.sales_by_product` table, as configured in `pipeline_config.yaml` (Chapter 53). The endpoint queries BigQuery, aggregates sales by product, and returns JSON. It uses OAuth2 for security and supports pagination for scalability. This meets Hijra Group’s stakeholder reporting needs.
```

**Follow-Along Instructions**:

- Save as `de-onboarding/ex7_endpoint.txt`.
- Optionally, include a Python snippet for the endpoint (e.g., a FastAPI route).
- Verify the reference to Chapter 53.
- Test: Read and confirm the content is 50–100 words.
## 67.4 Exercise Solutions

### Solution to Exercise 1: YAML Parser

```python
from typing import Dict, Any
import os
import yaml


def parse_yaml(config_path: str) -> Dict[str, Any]:
    """Parse YAML configuration, returning {} for missing files."""
    if not os.path.exists(config_path):  # Needed for Exercise 3's missing-file test
        print(f"Config not found: {config_path}")
        return {}
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    print(f"Parsed config: {config}")
    return config


# Test
print(parse_yaml("data/pipeline_config.yaml"))
```

### Solution to Exercise 2: Config Validator
```python
from typing import Dict, Any
import yaml


def validate_config(config: Dict[str, Any]) -> bool:
    """Validate pipeline config."""
    required_sections = ["pipeline", "security"]
    for section in required_sections:
        if section not in config:
            print(f"Missing section: {section}")
            return False
    if not isinstance(config["security"].get("pii_fields"), list):  # .get avoids KeyError
        print("Invalid pii_fields: must be a list")
        return False
    return True


# Test
with open("data/pipeline_config.yaml", "r") as f:
    config = yaml.safe_load(f)
print(validate_config(config))
```

### Solution to Exercise 3: Test Case Design
```python
# File: de-onboarding/tests/test_ex3_yaml_parser.py
from typing import Dict, Any
import pytest
import yaml
from ex1_yaml_parser import parse_yaml


@pytest.fixture
def valid_yaml(tmp_path):
    config = {"pipeline": {"name": "test"}, "security": {"pii_fields": ["id"]}}
    path = tmp_path / "config.yaml"
    with open(path, "w") as f:
        yaml.dump(config, f)
    return str(path)


def test_parse_valid_yaml(valid_yaml: str) -> None:
    config = parse_yaml(valid_yaml)
    assert config["pipeline"]["name"] == "test"


def test_parse_invalid_yaml(tmp_path) -> None:
    path = tmp_path / "invalid.yaml"
    with open(path, "w") as f:
        f.write("invalid: : yaml")  # Invalid YAML
    with pytest.raises(yaml.YAMLError):
        parse_yaml(str(path))


def test_parse_missing_yaml(tmp_path) -> None:
    path = tmp_path / "missing.yaml"
    config = parse_yaml(str(path))
    assert config == {}  # Relies on parse_yaml's missing-file check
```

### Solution to Exercise 4: Scalability Analysis
```python
def estimate_time(num_rows: int) -> float:
    """Estimate processing time for n rows."""
    ingestion_time_per_row = 0.0001  # 0.1ms per row
    transformation_time_per_row = 0.0002  # 0.2ms per row
    total_time = num_rows * (ingestion_time_per_row + transformation_time_per_row)
    print(f"Estimated time for {num_rows} rows: {total_time} seconds")
    return total_time


# Test
print(estimate_time(1000000))
```

`ex4_scalability.txt`:

```
The pipeline uses O(n) ingestion and transformation, suitable for 1M rows. Partitioning by date in BigQuery reduces query time to O(k) for k rows in a partition. Kubernetes resource limits (1 CPU, 2GB RAM) ensure efficient pod scaling. Batching (10,000 rows) minimizes memory usage, critical for Hijra Group's large datasets during peak periods like Eid.
```

### Solution to Exercise 5: Debug Kubernetes YAML Bug
```python
from typing import Dict, Any


def validate_config(config: Dict[str, Any]) -> bool:
    """Validate pipeline config."""
    if "kubernetes" not in config:
        return False
    if "resources" not in config["kubernetes"]:
        return False
    if "cpu" not in config["kubernetes"]["resources"]:
        return False
    cpu = config["kubernetes"]["resources"]["cpu"]
    if not isinstance(cpu, str) or not cpu.isdigit():  # Fix: reject non-numeric strings like '1GB'
        print("Invalid kubernetes.resources.cpu: must be a numeric string (e.g., '1')")
        return False
    return True


# Test
invalid_config = {"kubernetes": {"resources": {"cpu": "1GB"}}}
print(validate_config(invalid_config))  # False
```

### Solution to Exercise 6: BigQuery vs. PostgreSQL Trade-offs
`ex6_tradeoffs.txt`:

```
BigQuery is chosen for the data warehouse due to its scalability for 1M+ transactions, using date-based partitioning (Chapter 29) to reduce query time to O(k) for k rows. PostgreSQL (Chapter 16) excels in transactional workloads with frequent updates, but its indexing (Chapter 22) is less efficient for analytics on large datasets. BigQuery’s serverless model simplifies scaling, while PostgreSQL requires manual sharding for similar loads. For Hijra Group’s analytics, BigQuery’s performance outweighs PostgreSQL’s flexibility.
```

```python
# Optional Python snippet
from typing import List
from google.cloud import bigquery
import psycopg2


def query_bigquery(project: str, dataset: str, table: str) -> List[dict]:
    client = bigquery.Client(project=project)
    query = f"SELECT product, SUM(price * quantity) AS total FROM `{dataset}.{table}` GROUP BY product"
    return [dict(row) for row in client.query(query).result()]


def query_postgresql(conn_params: dict, table: str) -> List[dict]:
    conn = psycopg2.connect(**conn_params)
    cursor = conn.cursor()
    cursor.execute(f"SELECT product, SUM(price * quantity) AS total FROM {table} GROUP BY product")
    rows = [{"product": row[0], "total": row[1]} for row in cursor.fetchall()]
    cursor.close()  # Release the cursor and connection
    conn.close()
    return rows
```

### Solution to Exercise 7: Stakeholder Report Endpoint
`ex7_endpoint.txt`:

```
Propose a FastAPI endpoint `/sales/summary` to deliver a sales report from the `transactions_mart.sales_by_product` table, as configured in `pipeline_config.yaml` (Chapter 53). The endpoint queries BigQuery, aggregates sales by product, and returns JSON. It uses OAuth2 for security and supports pagination for scalability. This meets Hijra Group’s stakeholder reporting needs.
```

```python
# Optional Python snippet
from typing import Any, Dict, List
from fastapi import FastAPI

app = FastAPI()


@app.get("/sales/summary")
async def sales_summary() -> List[Dict[str, Any]]:  # Fix: typing.Any, not the built-in any
    # Query BigQuery (pseudo-code)
    return [{"product": "Halal Laptop", "total": 1999.98}]
```

## 67.5 Chapter Summary and Connection to Chapter 68
In this chapter, you’ve mastered:

- **Pipeline Planning**: Defined requirements and architecture for a transaction pipeline.
- **Security**: Planned PII masking and encryption for compliance.
- **Testing**: Designed `pytest` test cases for reliability.
- **Scalability**: Addressed partitioning and resource optimization for peak loads like Eid.
- **White-Space Sensitivity and PEP 8**: Used 4-space indentation, preferring spaces over tabs.

The micro-project produced a YAML configuration, a Markdown plan, and a type-annotated Python script, validated with `pytest`, preparing for implementation. The modular design (e.g., `validate_pipeline_config`) supports extensibility, while the trade-off analysis (Exercise 6) and stakeholder endpoint proposal (Exercise 7) deepen architectural and reporting skills. Chapter 68 will challenge you to implement robust CSV ingestion with FastAPI, ensuring PII masking (Chapter 65) and handling large datasets (Chapter 40).