68 - Capstone Project Implementation Part 1
Complexity: Advanced (A)
68.0 Introduction: Why This Matters for Data Engineering
This chapter marks the first implementation phase of the capstone project, building on Chapter 67: Capstone Project Planning to create a robust, type-annotated ingestion pipeline for a financial transaction data lake and data warehouse, deployed on Kubernetes with Helm Charts. At Hijra Group, scalable ingestion pipelines handle millions of daily transactions, ensuring Sharia-compliant analytics with data integrity and security. This phase implements the ingestion and storage components planned in Chapter 67, processing data/transactions.csv (Appendix 1) using FastAPI, Pydantic, Google Cloud Storage (GCS), BigQuery, and PostgreSQL, with PII protection (SHA-256 hashing) and comprehensive pytest testing. The pipeline aligns with Chapter 67’s pipeline_config.yaml settings, ensuring modularity and scalability.
This chapter integrates skills from prior phases:
- Phase 1–2: Python fundamentals, type annotations, testing (Chapters 1–11).
- Phase 3–4: Database operations, BigQuery analytics (Chapters 12–30).
- Phase 5: Data lakes, ETL pipelines (Chapters 31–37).
- Phase 6–8: Advanced processing, web frameworks, orchestration (Chapters 38–59).
- Phase 9: Docker, Kubernetes, Helm, security (Chapters 60–66).
- Phase 10: Pipeline planning (Chapter 67).
The micro-project deploys a Kubernetes-based ingestion pipeline using Helm, meeting Chapter 67’s functional requirements (e.g., upload to GCS, store in BigQuery) and non-functional requirements (e.g., scalability, PII protection), preparing for transformation in Chapter 69. All code uses 4-space indentation per PEP 8, preferring spaces over tabs to avoid IndentationError.
Data Engineering Workflow Context
The ingestion pipeline implements the first stage of Chapter 67’s planned workflow:
flowchart TD
A["Input CSV<br>transactions.csv"] --> B["FastAPI Endpoint<br>Pydantic Validation"]
B -->|Invalid| J["Log Warning<br>Skip Record"]
B --> C["Hash PII<br>transaction_id"]
C --> D["Pandas DataFrame"]
D --> E["Write to GCS<br>Data Lake"]
D --> F["Write to BigQuery<br>Data Warehouse"]
D --> G["Write to PostgreSQL<br>Data Warehouse"]
E --> H["Log Success<br>Structured Logging"]
F --> H
G --> H
J --> H
H --> I["Kubernetes Deployment<br>Helm Chart"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
class A,D data
class B,C,H,I process
class E,F,G storage
class J error

Building On and Preparing For
- Building On:
  - Chapter 2: YAML parsing for `pipeline_config.yaml`.
  - Chapters 13, 17, and 26: Pydantic, PostgreSQL, and BigQuery integration.
  - Chapter 31: GCS data lake creation.
  - Chapter 36: Batch processing optimization.
  - Chapter 53: FastAPI for API development.
  - Chapter 65: PII protection (hashing).
  - Chapters 61–64: Kubernetes and Helm deployments.
  - Chapter 67: Pipeline planning with `pipeline_config.yaml`.
- Preparing For:
  - Chapter 69: Data mart transformation with dbt and Airflow.
  - Chapter 70: Full pipeline integration with FastAPI.
What You’ll Learn
This chapter covers:
- Type-Safe Ingestion: Loading `transactions.csv` with Pydantic validation.
- PII Protection: Hashing sensitive data (e.g., transaction IDs).
- Storage Integration: Writing to the GCS data lake and BigQuery/PostgreSQL data warehouse, configured via `pipeline_config.yaml`.
- Kubernetes Deployment: Deploying with Helm Charts in the `data-pipeline` namespace.
- Testing: Unit and integration tests with `pytest`, including storage output verification.
- Logging and Observability: Structured logging with processing time metrics.
The micro-project implements a Kubernetes-based ingestion pipeline, processing data/transactions.csv, with outputs to GCS, BigQuery, and PostgreSQL, tested with pytest, and deployed via Helm, directly implementing Chapter 67’s ingestion phase.
Follow-Along Tips:
- Create `de-onboarding/data/` and populate it with `transactions.csv` and `pipeline_config.yaml` per Appendix 1 and Chapter 67.
- Install libraries: `pip install pandas pyyaml pydantic psycopg2-binary google-cloud-storage google-cloud-bigquery fastapi uvicorn pytest kubernetes`.
- Install Docker Desktop, `kubectl`, and Helm (`helm version`).
- Configure your editor for 4-space indentation per PEP 8 (VS Code: "Editor: Tab Size" = 4, "Editor: Insert Spaces" = true, "Editor: Detect Indentation" = false).
- Use `print` for debugging (e.g., `print(df.head())`, `print(config)`).
- Verify file paths with `ls data/` (Unix/macOS) or `dir data\` (Windows).
- Ensure Google Cloud credentials are set up (e.g., `gcloud auth application-default login`).
68.1 Core Concepts
68.1.1 Type-Safe Ingestion with Pydantic
Pydantic ensures type-safe data validation, critical for Hijra Group’s data integrity. The Transaction model validates fields like transaction_id (string), price (float), and date (datetime), as planned in Chapter 67’s ingestion requirements.
from pydantic import BaseModel
from datetime import date
from typing import Optional
class Transaction(BaseModel):
transaction_id: str
product: Optional[str] # Allow null for missing products
price: float
quantity: int
date: date
# Example usage
data = {
"transaction_id": "T001",
"product": "Halal Laptop",
"price": 999.99,
"quantity": 2,
"date": "2023-10-01"
}
transaction = Transaction(**data)
print(transaction) # Debug: print validated transaction
# Output: transaction_id='T001' product='Halal Laptop' price=999.99 quantity=2 date=datetime.date(2023, 10, 1)

Key Points:
- Time Complexity: O(1) per record validation.
- Space Complexity: O(1) per record.
- Implication: Ensures robust data validation, aligning with Chapter 67’s functional requirements.
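Invalid rows should be logged and skipped rather than abort the pipeline (the "Log Warning, Skip Record" branch in the workflow diagram). A minimal sketch of that pattern, reusing the `Transaction` model above; the two-row `records` list is only illustrative:

from pydantic import ValidationError

records = [
    {"transaction_id": "T001", "product": "Halal Laptop", "price": 999.99, "quantity": 2, "date": "2023-10-01"},
    {"transaction_id": "T002", "product": "Halal Mouse", "price": "invalid", "quantity": 10, "date": "2023-10-02"},
]
valid = []  # Collect only records that pass validation
for record in records:
    try:
        valid.append(Transaction(**record))  # Raises ValidationError on bad types
    except ValidationError as error:
        print(f"Skipping invalid record {record['transaction_id']}: {error}")
print(f"Kept {len(valid)} of {len(records)} records")  # Kept 1 of 2 records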
68.1.2 PII Protection
Hashing transaction_id with SHA-256 protects sensitive data, implementing Chapter 67’s security.pii_fields requirement for GDPR/PDPA compliance.
import hashlib
from typing import Optional
def hash_pii(value: Optional[str]) -> Optional[str]:
"""Hash a PII value using SHA-256."""
if not value:
return None
return hashlib.sha256(value.encode()).hexdigest()
# Example
tid = "T001"
hashed = hash_pii(tid)
print(f"Original: {tid}, Hashed: {hashed}") # Debug: print original and hashed
# Output: Original: T001, Hashed: 4d967a...

Key Points:
- Time Complexity: O(1) for fixed-length strings.
- Space Complexity: O(1) for the 64-character hex digest (a 32-byte hash).
- Implication: Secures PII, critical for Sharia-compliant fintech.
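The micro-project hashes `transaction_id` per record; with Pandas the same function can be applied to an entire column in one pass. A small sketch using the `hash_pii` function above (the two-row DataFrame is only illustrative):

import pandas as pd

df = pd.DataFrame({"transaction_id": ["T001", "T002"], "price": [999.99, 24.99]})
df["transaction_id"] = df["transaction_id"].apply(hash_pii)  # O(n) over n rows
print(df["transaction_id"].str.len().unique())  # [64]: every hash is a 64-character hex digest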
68.1.3 Storage Integration
The pipeline writes to GCS, BigQuery, and PostgreSQL, configured via pipeline_config.yaml, implementing Chapter 67’s data lake and warehouse plans. It supports batch processing (extensible to 10,000-row chunks, per Chapter 67’s scalability plan) and BigQuery date-based partitioning (O(k) query time for k rows in a partition, per Chapter 29), ensuring scalability for 1M+ transactions.
from google.cloud import storage, bigquery
import psycopg2
from typing import Dict
import pandas as pd
def write_to_gcs(df: pd.DataFrame, bucket_name: str, destination: str) -> None:
"""Write DataFrame to GCS."""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(destination)
df.to_csv(blob.open("w"), index=False)
def write_to_bigquery(df: pd.DataFrame, dataset_id: str, table_id: str) -> None:
    """Write DataFrame to BigQuery."""
    client = bigquery.Client()
    table_ref = f"{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()  # Wait for the load job
def write_to_postgres(df: pd.DataFrame, conn_params: Dict[str, str]) -> None:
    """Write DataFrame to PostgreSQL."""
    conn = psycopg2.connect(**conn_params)
    cursor = conn.cursor()
    cursor.executemany(
        "INSERT INTO transactions (transaction_id, product, price, quantity, date) "
        "VALUES (%s, %s, %s, %s, %s)",
        [tuple(row) for row in df.itertuples(index=False)],
    )
    conn.commit()
    conn.close()

Key Points:
- Time Complexity: O(n) for writing n rows to storage; batch processing extensible to O(n/b) for b-sized batches.
- Space Complexity: O(n) for DataFrame in memory (~24MB for 1M rows with 5 columns).
- Implication: Enables scalable storage, supporting Chapter 67’s 1M-row goal with partitioning and batching.
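The writers above send the whole DataFrame at once. To honor the `batch_size: 10000` setting in `pipeline_config.yaml` (Chapter 67's scalability plan), the same writers can be called per chunk. A minimal sketch, assuming the `write_to_gcs` function above and a `config` dictionary loaded from the YAML; the per-batch object name is a hypothetical choice:

def ingest_in_batches(csv_path: str, config: dict, batch_size: int = 10000) -> None:
    """Read the CSV in chunks and write each chunk to the data lake."""
    for i, chunk in enumerate(pd.read_csv(csv_path, chunksize=batch_size)):  # O(n/b) batches of b rows
        destination = f"{config['data_lake']['path']}/batch_{i}.csv"  # e.g., raw/transactions/batch_0.csv
        write_to_gcs(chunk, config["data_lake"]["gcs_bucket"], destination)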
68.1.4 Kubernetes Deployment with Helm
The pipeline deploys in the data-pipeline namespace using a Helm Chart with 2 replicas, implementing Chapter 67’s kubernetes configuration for scalability.
# helm/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ingestion-pipeline
spec:
replicas: 2
selector:
matchLabels:
app: ingestion-pipeline
template:
metadata:
labels:
app: ingestion-pipeline
spec:
containers:
- name: ingestion
image: ingestion-pipeline:latest
env:
- name: BUCKET_NAME
value: '{{ .Values.bucketName }}'

Key Points:
- Time Complexity: O(1) for pod scheduling; O(n) for processing n transactions.
- Space Complexity: O(1) for Helm metadata; O(n) for transaction data.
- Scalability: Replicas ensure high availability, aligning with Chapter 67’s non-functional requirements.
- Implication: Supports Hijra Group’s high-throughput pipelines.
68.2 Micro-Project: Kubernetes-Based Ingestion Pipeline
Project Requirements
Implement a type-annotated ingestion pipeline for data/transactions.csv, configured via pipeline_config.yaml, and deployed on Kubernetes with Helm. The pipeline:
- Loads and validates transactions with Pydantic.
- Hashes `transaction_id` for PII protection.
- Writes to the GCS data lake and the BigQuery and PostgreSQL data warehouses.
- Includes structured logging with processing time metrics.
- Is tested with `pytest` (unit and integration tests, including storage outputs).
- Is deployed in the `data-pipeline` namespace, implementing Chapter 67's plan.
Sample Input Files
data/transactions.csv (Appendix 1):
transaction_id,product,price,quantity,date
T001,Halal Laptop,999.99,2,2023-10-01
T002,Halal Mouse,24.99,10,2023-10-02
T003,Halal Keyboard,49.99,5,2023-10-03
T004,,29.99,3,2023-10-04
T005,Monitor,199.99,2,2023-10-05

data/pipeline_config.yaml (Chapter 67):
pipeline:
name: transaction_pipeline
version: 1.0.0
data_lake:
gcs_bucket: hijra-transactions
path: raw/transactions
data_warehouse:
project_id: hijra-project
dataset: transactions_warehouse
table: raw_transactions
data_mart:
dataset: transactions_mart
table: sales_by_product
ingestion:
fastapi_endpoint: /upload/transactions
batch_size: 10000
transformation:
dbt_project: dbt_transactions
models:
- sales_by_product
- sales_by_date
orchestration:
airflow_dag: transaction_etl
schedule: '0 0 * * *' # Daily at midnight
security:
pii_fields:
- transaction_id
encryption: tls
authentication: oauth2
kubernetes:
namespace: data-pipeline
helm_chart: transaction-pipeline
resources:
cpu: '1'
memory: '2Gi'

Data Processing Flow
flowchart TD
A["Input CSV<br>transactions.csv"] --> B["FastAPI Endpoint<br>Pydantic Validation"]
B -->|Invalid| J["Log Warning<br>Skip Record"]
B --> C["Hash PII<br>transaction_id"]
C --> D["Pandas DataFrame"]
D --> E["Write to GCS<br>Data Lake"]
D --> F["Write to BigQuery<br>Data Warehouse"]
D --> G["Write to PostgreSQL<br>Data Warehouse"]
E --> H["Log Success<br>Structured Logging"]
F --> H
G --> H
J --> H
H --> I["Kubernetes Deployment<br>Helm Chart"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
class A,D data
class B,C,H,I process
class E,F,G storage
class J error

Acceptance Criteria
- Go Criteria:
  - Loads and validates `transactions.csv` with Pydantic, skipping invalid records.
  - Hashes `transaction_id` using SHA-256.
  - Writes validated data to GCS, BigQuery, and PostgreSQL, configured via `pipeline_config.yaml`.
  - Includes structured logging with processing time metrics.
  - Passes `pytest` unit and integration tests, including storage output verification.
  - Deploys in the `data-pipeline` namespace with Helm, 2 replicas.
  - Uses 4-space indentation per PEP 8, preferring spaces over tabs.
- No-Go Criteria:
- Fails to validate or process data.
- Missing PII protection, logging, or observability metrics.
- Fails tests or deployment.
- Uses inconsistent indentation or tabs.
Common Pitfalls to Avoid
- Pydantic Validation Errors:
  - Problem: Invalid data types (e.g., non-numeric `price`) cause `ValidationError`.
  - Solution: Log invalid records and skip them. Print `df.dtypes` or `row.to_dict()` to debug.
- GCS/BigQuery Authentication:
  - Problem: Missing credentials cause `DefaultCredentialsError`.
  - Solution: Run `gcloud auth application-default login`. Print `os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")`.
- PostgreSQL Connection:
  - Problem: Connection refused due to incorrect `conn_params`.
  - Solution: Verify `conn_params` in `pipeline_config.yaml`. Print the `psycopg2.connect(**conn_params)` result.
- Kubernetes Deployment:
  - Problem: Pod crashes due to missing environment variables.
  - Solution: Check logs with `kubectl logs <pod-name>`. Print Helm values with `helm get values ingestion-pipeline`.
- Helm Syntax Error:
  - Problem: Helm fails with `error parsing values.yaml`.
  - Solution: Validate YAML with `helm lint` or print it with `cat helm/values.yaml`.
- Image Pull Error:
  - Problem: Pod fails with `ImagePullBackOff` due to a missing Docker image.
  - Solution: Verify the image with `docker pull ingestion-pipeline:latest`. Check `kubectl describe pod <pod-name>`.
- IndentationError:
  - Problem: Mixed spaces/tabs in Python files.
  - Solution: Use 4 spaces per PEP 8. Run `python -tt ingestion/main.py`.
How This Differs from Production
In production, this pipeline would include:
- Scalability: Auto-scaling pods (Chapter 69).
- Monitoring: Prometheus/Grafana metrics (Chapter 66).
- CI/CD: Automated deployments (Chapter 66).
- Error Handling: Retry logic for network failures (Chapter 40).
- High Availability: Multi-region storage setups.
Implementation
# File: de-onboarding/ingestion/models.py
from pydantic import BaseModel
from datetime import date
from typing import Optional
class Transaction(BaseModel):
transaction_id: str
product: Optional[str] # Allow null for missing products
price: float
quantity: int
date: date
# File: de-onboarding/ingestion/utils.py
import hashlib
import logging
import yaml
from typing import Optional, Dict, Any
import pandas as pd
from google.cloud import storage, bigquery
import psycopg2
# Configure structured logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def load_config(config_path: str) -> Dict[str, Any]:
"""Load pipeline configuration from YAML."""
logger.info(f"Loading config: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
logger.info("Config loaded successfully")
return config
def hash_pii(value: Optional[str]) -> Optional[str]:
"""Hash a PII value using SHA-256."""
if not value:
return None
return hashlib.sha256(value.encode()).hexdigest()
def write_to_gcs(df: pd.DataFrame, bucket_name: str, destination: str) -> None:
"""Write DataFrame to GCS."""
logger.info(f"Writing to GCS: {bucket_name}/{destination}")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(destination)
df.to_csv(blob.open("w"), index=False)
logger.info("GCS write complete")
def write_to_bigquery(df: pd.DataFrame, dataset_id: str, table_id: str) -> None:
    """Write DataFrame to BigQuery."""
    logger.info(f"Writing to BigQuery: {dataset_id}.{table_id}")
    client = bigquery.Client()
    table_ref = f"{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    client.load_table_from_dataframe(df, table_ref, job_config=job_config).result()  # Wait for the load job
    logger.info("BigQuery write complete")
def write_to_postgres(df: pd.DataFrame, conn_params: Dict[str, Any]) -> None:
    """Write DataFrame to PostgreSQL."""
    logger.info("Writing to PostgreSQL")
    conn = psycopg2.connect(**conn_params)
    cursor = conn.cursor()
    cursor.executemany(
        "INSERT INTO transactions (transaction_id, product, price, quantity, date) "
        "VALUES (%s, %s, %s, %s, %s)",
        [tuple(row) for row in df.itertuples(index=False)],
    )
    conn.commit()
    conn.close()
    logger.info("PostgreSQL write complete")
# File: de-onboarding/ingestion/main.py
from fastapi import FastAPI, UploadFile, File
import pandas as pd
import logging
import time
from typing import List, Dict, Any
from .models import Transaction
from .utils import load_config, hash_pii, write_to_gcs, write_to_bigquery, write_to_postgres
app = FastAPI()
logger = logging.getLogger(__name__)
@app.post("/ingest")
async def ingest_csv(file: UploadFile = File(...)) -> Dict[str, Any]:
"""Ingest CSV file and process transactions."""
start_time = time.time() # Start timing for observability
logger.info(f"Received file: {file.filename}")
# Load pipeline configuration
config = load_config("data/pipeline_config.yaml")
bucket_name = config["data_lake"]["gcs_bucket"] # e.g., hijra-transactions
destination = f"{config['data_lake']['path']}/processed.csv" # e.g., raw/transactions/processed.csv
dataset_id = config["data_warehouse"]["dataset"] # e.g., transactions_warehouse
table_id = config["data_warehouse"]["table"] # e.g., raw_transactions
conn_params = {
"dbname": "mydb",
"user": "myuser",
"password": "mypassword",
"host": "localhost",
"port": "5432"
} # Placeholder; in production, read from config or secrets
# Read CSV
df = pd.read_csv(file.file)
logger.info(f"Loaded {len(df)} records")
# Validate and hash PII
transactions: List[Transaction] = []
for _, row in df.iterrows():
try:
transaction = Transaction(
transaction_id=row["transaction_id"],
product=row["product"],
price=row["price"],
quantity=row["quantity"],
date=row["date"]
)
transaction.transaction_id = hash_pii(transaction.transaction_id)
transactions.append(transaction)
except Exception as e:
logger.warning(f"Invalid record: {row.to_dict()}, Error: {str(e)}")
if not transactions:
logger.error("No valid transactions")
return {"status": "error", "message": "No valid transactions"}
# Convert to DataFrame
df_valid = pd.DataFrame([t.dict() for t in transactions])
logger.info(f"Validated {len(df_valid)} transactions")
# Write to storage
write_to_gcs(df_valid, bucket_name, destination)
write_to_bigquery(df_valid, dataset_id, table_id)
write_to_postgres(df_valid, conn_params)
# Log processing time
processing_time = time.time() - start_time
logger.info(f"Processed {len(df_valid)} records in {processing_time:.2f} seconds")
return {"status": "success", "processed": len(df_valid)}
# File: de-onboarding/tests/test_ingestion.py
import pytest
import pandas as pd
import logging
from unittest.mock import patch
from ingestion.main import app
from fastapi.testclient import TestClient
from ingestion.utils import hash_pii
client = TestClient(app)
def test_ingest_valid_csv():
"""Test ingestion with valid CSV."""
df = pd.DataFrame({
"transaction_id": ["T001"],
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"date": ["2023-10-01"]
})
csv_path = "data/test.csv"
df.to_csv(csv_path, index=False)
with open(csv_path, "rb") as f:
response = client.post("/ingest", files={"file": f})
assert response.status_code == 200
assert response.json()["status"] == "success"
assert response.json()["processed"] == 1
def test_ingest_invalid_csv():
"""Test ingestion with invalid CSV (missing product)."""
df = pd.DataFrame({
"transaction_id": ["T004"],
"product": [None],
"price": [29.99],
"quantity": [3],
"date": ["2023-10-04"]
})
csv_path = "data/test_invalid.csv"
df.to_csv(csv_path, index=False)
with open(csv_path, "rb") as f:
response = client.post("/ingest", files={"file": f})
assert response.status_code == 200
assert response.json()["status"] == "error"
assert response.json()["message"] == "No valid transactions"
def test_hash_pii():
"""Test PII hashing."""
tid = "T001"
hashed = hash_pii(tid)
assert hashed is not None
assert len(hashed) == 64 # SHA-256 length
def test_processing_time_logged(caplog):
"""Test processing time is logged."""
caplog.set_level(logging.INFO)
df = pd.DataFrame({
"transaction_id": ["T001"],
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"date": ["2023-10-01"]
})
csv_path = "data/test.csv"
df.to_csv(csv_path, index=False)
with open(csv_path, "rb") as f:
client.post("/ingest", files={"file": f})
assert any("Processed 1 records in" in record.message for record in caplog.records)
# Patch the names bound in ingestion.main (where they are used), not ingestion.utils
@patch("ingestion.main.write_to_gcs")
@patch("ingestion.main.write_to_bigquery")
@patch("ingestion.main.write_to_postgres")
def test_storage_output(mock_postgres, mock_bigquery, mock_gcs):
"""Test data is written to storage."""
df = pd.DataFrame({
"transaction_id": ["T001"],
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"date": ["2023-10-01"]
})
csv_path = "data/test.csv"
df.to_csv(csv_path, index=False)
with open(csv_path, "rb") as f:
response = client.post("/ingest", files={"file": f})
assert response.status_code == 200
assert response.json()["status"] == "success"
mock_gcs.assert_called_once()
mock_bigquery.assert_called_once()
mock_postgres.assert_called_once()
# File: de-onboarding/Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY ingestion/ ./ingestion/
COPY data/pipeline_config.yaml ./data/
CMD ["uvicorn", "ingestion.main:app", "--host", "0.0.0.0", "--port", "8000"]
# File: de-onboarding/requirements.txt
fastapi==0.95.0
uvicorn==0.20.0
pandas==1.5.3
pydantic==1.10.7
google-cloud-storage==2.7.0
google-cloud-bigquery==3.4.1
psycopg2-binary==2.9.5
pytest==7.2.2
pyyaml==6.0
python-multipart==0.0.6  # Required by FastAPI for file uploads
pyarrow==11.0.0  # Required for BigQuery DataFrame loads
httpx==0.23.3  # Required by FastAPI's TestClient
# File: de-onboarding/helm/Chart.yaml
apiVersion: v2
name: ingestion-pipeline
version: 0.1.0
# File: de-onboarding/helm/values.yaml
bucketName: hijra-transactions
replicaCount: 2
image:
repository: ingestion-pipeline
tag: latest
# File: de-onboarding/helm/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: ingestion-pipeline
namespace: data-pipeline
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
app: ingestion-pipeline
template:
metadata:
labels:
app: ingestion-pipeline
spec:
containers:
- name: ingestion
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
env:
- name: BUCKET_NAME
value: '{{ .Values.bucketName }}'
ports:
- containerPort: 8000
# File: de-onboarding/helm/templates/service.yaml
apiVersion: v1
kind: Service
metadata:
name: ingestion-pipeline
namespace: data-pipeline
spec:
selector:
app: ingestion-pipeline
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: ClusterIP

How to Run and Test
Setup:
- Create `de-onboarding/data/` and save `transactions.csv` and `pipeline_config.yaml` per Appendix 1 and Chapter 67.
- Install dependencies: `pip install -r requirements.txt`.
- Set up Google Cloud credentials: `gcloud auth application-default login`.
- Create BigQuery table `transactions_warehouse.raw_transactions` with schema:
  - Fields: `transaction_id: STRING, product: STRING, price: FLOAT, quantity: INTEGER, date: DATE`
  - Run in the Google Cloud Console or:
    bq mk --table transactions_warehouse.raw_transactions transaction_id:STRING,product:STRING,price:FLOAT,quantity:INTEGER,date:DATE
- Install Docker Desktop, `kubectl`, and Helm (`helm version`).
- Configure the PostgreSQL database (`mydb`, user: `myuser`, password: `mypassword`).
- Create the `transactions` table in PostgreSQL:

  # File: de-onboarding/setup_postgres.py
  import psycopg2

  conn_params = {
      "dbname": "mydb",
      "user": "myuser",
      "password": "mypassword",
      "host": "localhost",
      "port": "5432"
  }
  conn = psycopg2.connect(**conn_params)
  cursor = conn.cursor()
  cursor.execute("""
      CREATE TABLE IF NOT EXISTS transactions (
          transaction_id TEXT,
          product TEXT,
          price REAL,
          quantity INTEGER,
          date DATE
      )
  """)
  conn.commit()
  conn.close()
  print("Created transactions table in PostgreSQL")

  - Run: `python setup_postgres.py`
  - Verify: `psql -h localhost -U myuser -d mydb -c "SELECT * FROM transactions;"`
- Create the Kubernetes namespace:
  kubectl create namespace data-pipeline
Build and Run Locally:
- Build the Docker image from `de-onboarding/` to include `ingestion/` and `pipeline_config.yaml`:
  docker build -t ingestion-pipeline:latest .
- Run: `docker run -p 8000:8000 ingestion-pipeline:latest`
- Test the API: `curl -X POST -F "file=@data/transactions.csv" http://localhost:8000/ingest`
Run Tests:
- Run: `pytest tests/test_ingestion.py -v`
- Expected: All tests pass, confirming validation, PII hashing, edge-case handling, processing time logging, and storage outputs.
Deploy to Kubernetes:
- Build and push the Docker image: `docker push ingestion-pipeline:latest`
- Deploy the Helm Chart:
  helm install ingestion-pipeline helm/ --namespace data-pipeline
  - Expected output:
    NAME: ingestion-pipeline
    LAST DEPLOYED: 2025-04-25 10:00:00
    STATUS: deployed
- Verify pods:
  kubectl get pods --namespace data-pipeline
  - Expected output:
    NAME                     READY   STATUS    RESTARTS   AGE
    ingestion-pipeline-xxx   1/1     Running   0          1m
- Test the service:
  kubectl port-forward svc/ingestion-pipeline 8000:80 --namespace data-pipeline
Expected Outputs
- GCS: `gs://hijra-transactions/raw/transactions/processed.csv` with hashed transaction IDs.
- BigQuery: `transactions_warehouse.raw_transactions` table with validated records.
- PostgreSQL: `transactions` table with validated records.
- Logs (example):
  2025-04-25 10:00:00 - INFO - Loading config: data/pipeline_config.yaml
  2025-04-25 10:00:00 - INFO - Config loaded successfully
  2025-04-25 10:00:00 - INFO - Received file: transactions.csv
  2025-04-25 10:00:01 - INFO - Loaded 5 records
  2025-04-25 10:00:01 - WARNING - Invalid record: {'transaction_id': 'T004', 'product': nan, 'price': 29.99, 'quantity': 3, 'date': '2023-10-04'}, Error: ...
  2025-04-25 10:00:02 - INFO - Validated 4 transactions
  2025-04-25 10:00:03 - INFO - Writing to GCS: hijra-transactions/raw/transactions/processed.csv
  2025-04-25 10:00:04 - INFO - Processed 4 records in 2.35 seconds
- API Responses:
  - Success: `{"status": "success", "processed": 4}`
  - Error: `{"status": "error", "message": "No valid transactions"}`
68.3 Practice Exercises
Exercise 1: Pydantic Validation
Write a function to validate transactions using Pydantic, logging invalid records to invalid_records.log. Use 4-space indentation per PEP 8.
Sample Input:
data = [
    {"transaction_id": "T001", "product": "Halal Laptop", "price": 999.99, "quantity": 2, "date": "2023-10-01"},
    {"transaction_id": "T002", "product": None, "price": "invalid", "quantity": 3, "date": "2023-10-02"}
]

Expected Output (invalid_records.log):
2025-04-25 10:00:00,123 - WARNING - Invalid record: {'transaction_id': 'T002', 'product': None, 'price': 'invalid', 'quantity': 3, 'date': '2023-10-02'}, Error: Invalid price

Follow-Along Instructions:
- Save as `de-onboarding/ex1_pydantic.py`.
- Configure your editor for 4-space indentation per PEP 8.
- Run: `python ex1_pydantic.py`.
- How to Test:
  - Add: `validate_transactions(data)` and check `invalid_records.log`.
  - Verify: `cat invalid_records.log` (Unix/macOS) or `type invalid_records.log` (Windows).
  - Test with an empty list: Should log nothing.
- Debugging Tips:
  - Print `row` before validation to inspect data.
  - Check write permissions with `ls -l de-onboarding/` (Unix/macOS) or `dir de-onboarding\` (Windows) if `PermissionError` occurs.
  - Verify the log file with `os.path.exists('invalid_records.log')`.
- Common Errors:
  - ValidationError: Print `row` to check field types.
  - IndentationError: Run `python -tt ex1_pydantic.py`.
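One possible shape for the solution, a sketch rather than the reference answer; it reuses the micro-project's `Transaction` model (redefine it locally if `ingestion/` is not importable) and logs to `invalid_records.log` as required:

# File: de-onboarding/ex1_pydantic.py (sketch)
import logging
from typing import Any, Dict, List
from pydantic import ValidationError
from ingestion.models import Transaction  # Assumes de-onboarding/ is on PYTHONPATH

logging.basicConfig(
    filename="invalid_records.log",
    level=logging.WARNING,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

def validate_transactions(data: List[Dict[str, Any]]) -> List[Transaction]:
    """Validate records, logging and skipping invalid ones."""
    valid: List[Transaction] = []
    for row in data:
        try:
            valid.append(Transaction(**row))
        except ValidationError as error:
            logger.warning(f"Invalid record: {row}, Error: {error}")
    return valid

Note that Pydantic's actual error text will differ slightly from the sample log line (e.g., "value is not a valid float" rather than "Invalid price").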
Exercise 2: PII Hashing
Implement a function to hash multiple PII fields (transaction_id, product), testing with pytest. Use 4-space indentation per PEP 8.
Sample Input:
data = {"transaction_id": "T001", "product": "Halal Laptop"}

Expected Output:
{
    "transaction_id": "4d967a...",
    "product": "8ebfa..."
}

Follow-Along Instructions:
- Save as `de-onboarding/ex2_hashing.py`.
- Create `de-onboarding/tests/test_ex2_hashing.py` for tests.
- Run: `pytest tests/test_ex2_hashing.py -v`.
- How to Test:
  - Add: `print(hash_pii_fields(data))`.
  - Verify: Hashed values are 64 characters (SHA-256).
  - Test with `None` values: Should return `None`.
- Debugging Tip: Print `value` before hashing to check the input.
- Common Errors:
  - TypeError: Ensure inputs are strings or `None`. Print `type(value)`.
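A sketch of one way to structure this exercise; the field names follow the sample input, and everything beyond the required `hash_pii_fields` name is an implementation choice:

# File: de-onboarding/ex2_hashing.py (sketch)
import hashlib
from typing import Dict, Optional

def hash_pii_fields(record: Dict[str, Optional[str]]) -> Dict[str, Optional[str]]:
    """Return a copy of the record with each value SHA-256 hashed; None passes through."""
    return {
        key: hashlib.sha256(value.encode()).hexdigest() if value else None
        for key, value in record.items()
    }

# File: de-onboarding/tests/test_ex2_hashing.py (sketch; assumes de-onboarding/ is on PYTHONPATH)
from ex2_hashing import hash_pii_fields

def test_hash_pii_fields() -> None:
    hashed = hash_pii_fields({"transaction_id": "T001", "product": None})
    assert len(hashed["transaction_id"]) == 64  # SHA-256 hex digest length
    assert hashed["product"] is None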
Exercise 3: Storage Integration
Write a function to write a DataFrame to GCS and PostgreSQL, logging success/failure. Use 4-space indentation per PEP 8.
Sample Input:
df = pd.DataFrame({
"transaction_id": ["T001"],
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"date": ["2023-10-01"]
})

Expected Output (logs):
2025-04-25 10:00:00,123 - INFO - Writing to GCS: hijra-transactions/test.csv
2025-04-25 10:00:00,124 - INFO - GCS write complete
2025-04-25 10:00:00,125 - INFO - Writing to PostgreSQL
2025-04-25 10:00:00,126 - INFO - PostgreSQL write complete

Follow-Along Instructions:
- Save as `de-onboarding/ex3_storage.py`.
- Ensure Google Cloud and PostgreSQL credentials are set up.
- Run: `python ex3_storage.py`.
- How to Test:
  - Verify: `gs://hijra-transactions/test.csv` exists and the PostgreSQL `transactions` table has 1 row.
  - Test with an empty DataFrame: Should log but not write.
- Debugging Tips:
  - Print `df.head()` to check the DataFrame.
  - Check write permissions with `ls -l de-onboarding/` (Unix/macOS) or `dir de-onboarding\` (Windows) if `PermissionError` occurs.
  - Verify the log file with `os.path.exists('storage.log')`.
- Common Errors:
  - DefaultCredentialsError: Check `gcloud auth` setup.
  - IndentationError: Run `python -tt ex3_storage.py`.
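A sketch of one approach, reusing the micro-project's `write_to_gcs` and `write_to_postgres` helpers and logging to `storage.log`; the bucket name and `conn_params` mirror the chapter's placeholders and should come from configuration in practice:

# File: de-onboarding/ex3_storage.py (sketch)
import logging
import pandas as pd
from ingestion.utils import write_to_gcs, write_to_postgres

logger = logging.getLogger("ex3_storage")
handler = logging.FileHandler("storage.log")
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def store_dataframe(df: pd.DataFrame, bucket_name: str, conn_params: dict) -> None:
    """Write df to GCS and PostgreSQL, skipping empty input."""
    if df.empty:
        logger.info("Empty DataFrame; nothing written")
        return
    try:
        write_to_gcs(df, bucket_name, "test.csv")
        write_to_postgres(df, conn_params)
        logger.info(f"Stored {len(df)} rows")
    except Exception as error:
        logger.error(f"Storage write failed: {error}")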
Exercise 4: Helm Deployment
Modify the Helm Chart to add resource limits (CPU: 500m, memory: 512Mi). Use 4-space indentation in any Python scripts.
Sample Input (helm/templates/deployment.yaml):
# Original deployment.yaml (partial)
spec:
containers:
- name: ingestion
image: ingestion-pipeline:latest

Expected Output (modified deployment.yaml):
spec:
containers:
- name: ingestion
image: ingestion-pipeline:latest
resources:
limits:
cpu: '500m'
memory: '512Mi'

Follow-Along Instructions:
- Edit `de-onboarding/helm/templates/deployment.yaml`.
- Run: `helm upgrade ingestion-pipeline helm/ --namespace data-pipeline`.
- How to Test:
  - Verify: `kubectl describe pod <pod-name> --namespace data-pipeline` shows the resource limits.
  - Test with invalid limits (e.g., `cpu: "invalid"`): Should fail with a Helm error.
- Debugging Tip: Run `helm lint` to validate syntax.
- Common Errors:
  - SyntaxError: Validate YAML with `helm lint`.
Exercise 5: Debug Kubernetes Failure
Fix a failing pod due to a missing BUCKET_NAME environment variable, using kubectl logs. Use 4-space indentation in any Python scripts.
Sample Input (helm/templates/deployment.yaml with missing env):
spec:
containers:
- name: ingestion
image: ingestion-pipeline:latest
# Missing BUCKET_NAME

Expected Output (fixed deployment.yaml):
spec:
containers:
- name: ingestion
image: ingestion-pipeline:latest
env:
- name: BUCKET_NAME
value: '{{ .Values.bucketName }}'

Follow-Along Instructions:
- Edit `de-onboarding/helm/templates/deployment.yaml` to remove `BUCKET_NAME`.
- Run: `helm upgrade ingestion-pipeline helm/ --namespace data-pipeline`.
- Check: `kubectl logs <pod-name> --namespace data-pipeline` (shows `KeyError: BUCKET_NAME`).
- Fix by restoring the `env` section.
- Re-run: `helm upgrade ingestion-pipeline helm/ --namespace data-pipeline`.
- How to Test:
  - Verify: `kubectl get pods --namespace data-pipeline` shows `Running` status.
  - Test with an incorrect `bucketName`: Should log a GCS error.
- Debugging Tips:
  - Run `kubectl get pods -l app=ingestion-pipeline --namespace data-pipeline` to find pod names, then use `kubectl logs <pod-name> --namespace data-pipeline`.
  - Run `kubectl describe pod <pod-name> --namespace data-pipeline` to inspect events.
- Common Errors:
  - CrashLoopBackOff: Check logs with `kubectl logs`.
Exercise 6: Conceptual Analysis of Storage Systems
Explain the trade-offs of storing data in GCS, BigQuery, and PostgreSQL for Hijra Group’s analytics pipeline, focusing on time/space complexity and scalability. Save the answer to de-onboarding/ex6_concepts.txt. No coding is required.
Sample Input:
- Question: “Why is GCS used for raw data, BigQuery for analytics, and PostgreSQL for transactional data in this pipeline? Discuss time/space complexity and scalability.”
Expected Output (ex6_concepts.txt):
GCS is used for raw data due to its low-cost, scalable storage for unstructured data. Writes are O(n) for n rows, with O(n) space complexity, ideal for large transaction CSVs. Scalability is near-infinite with Google’s infrastructure.
BigQuery is used for analytics due to its columnar storage and query optimization. Queries are O(n) with parallel execution, and space complexity is O(n) with compression. It scales for complex analytics but is costlier than GCS.
PostgreSQL is used for transactional data due to its ACID compliance and fast row-based operations. Writes are O(n) with indexing, and space complexity is O(n). It scales with sharding but is less suited for analytics than BigQuery.

Follow-Along Instructions:
- Create `de-onboarding/ex6_concepts.txt`.
- Write the explanation based on Chapters 31 (GCS), 25–30 (BigQuery), and 17 (PostgreSQL).
- Verify: `cat ex6_concepts.txt` (Unix/macOS) or `type ex6_concepts.txt` (Windows).
- How to Test:
- Ensure the explanation covers time/space complexity and scalability for each system.
- Compare with sample output for completeness.
- Debugging Tip: Review Chapter 31, 25–30, and 17 notes to confirm details.
- Common Errors:
- Incomplete Answer: Ensure all three systems (GCS, BigQuery, PostgreSQL) are addressed.
Exercise 7: Performance Benchmarking
Benchmark the ingestion pipeline’s processing time for a synthetic 10,000-row CSV, estimating scalability for 1M rows. Save results to benchmark.log and analysis to ex7_scalability.txt. Use 4-space indentation per PEP 8.
Sample Input:
# Generate synthetic CSV
import pandas as pd
df = pd.DataFrame({
"transaction_id": [f"T{i:03d}" for i in range(10000)],
"product": ["Halal Laptop"] * 10000,
"price": [999.99] * 10000,
"quantity": [2] * 10000,
"date": ["2023-10-01"] * 10000
})
df.to_csv("data/synthetic.csv", index=False)

Expected Output:
- benchmark.log:
  2025-04-25 10:00:00,123 - INFO - Processed 10000 records in 5.23 seconds
- ex7_scalability.txt:
  Processing 10,000 rows took 5.23 seconds. Estimating for 1M rows (100x scale): 5.23 * 100 = 523 seconds (~8.7 minutes). This exceeds Chapter 67’s <5-minute goal, suggesting optimization (e.g., 10,000-row batching, Chapter 36) for production.
Follow-Along Instructions:
- Save as `de-onboarding/ex7_benchmark.py`.
- Create `data/synthetic.csv` using the sample input.
- Run: `python ex7_benchmark.py`.
- How to Test:
  - Verify: `cat benchmark.log` (Unix/macOS) or `type benchmark.log` (Windows).
  - Check `ex7_scalability.txt` for the estimated time and optimization suggestion.
  - Test with a smaller CSV (e.g., 100 rows) to confirm scaling.
- Debugging Tips:
  - Print `df.shape` to verify the CSV size.
  - Check write permissions with `ls -l de-onboarding/` (Unix/macOS) or `dir de-onboarding\` (Windows) if `PermissionError` occurs.
- Common Errors:
  - FileNotFoundError: Ensure `data/synthetic.csv` exists.
  - IndentationError: Run `python -tt ex7_benchmark.py`.
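A sketch of one way to approach the benchmark. It times CSV loading plus Pydantic validation only (no live GCS/BigQuery/PostgreSQL writes), so the measurement is a lower bound on full ingestion; the file names follow the exercise, and the rest is an assumption:

# File: de-onboarding/ex7_benchmark.py (sketch)
import logging
import time
import pandas as pd
from pydantic import ValidationError
from ingestion.models import Transaction  # Assumes de-onboarding/ is on PYTHONPATH

logger = logging.getLogger("ex7_benchmark")
handler = logging.FileHandler("benchmark.log")
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

start = time.time()
df = pd.read_csv("data/synthetic.csv")
valid = 0
for _, row in df.iterrows():  # O(n) validation loop
    try:
        Transaction(**row.to_dict())
        valid += 1
    except ValidationError:
        pass
elapsed = time.time() - start
logger.info(f"Processed {valid} records in {elapsed:.2f} seconds")

# Extrapolate linearly to 1M rows and compare with Chapter 67's <5-minute goal
estimate = elapsed * (1_000_000 / len(df))
with open("ex7_scalability.txt", "w") as f:
    f.write(
        f"Processing {len(df):,} rows took {elapsed:.2f} seconds. "
        f"Estimating for 1M rows: ~{estimate:.0f} seconds. "
        "If this exceeds Chapter 67's <5-minute goal, consider 10,000-row batching (Chapter 36).\n"
    )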
68.4 Chapter Summary and Connection to Chapter 69
This chapter implemented a type-annotated ingestion pipeline, processing data/transactions.csv with Pydantic validation, PII protection (SHA-256 hashing), and storage in GCS, BigQuery, and PostgreSQL, configured via pipeline_config.yaml. Deployed on Kubernetes with Helm in the data-pipeline namespace, it ensures scalability (2 replicas), reliability (tested with pytest), and observability (structured logs with processing time metrics). Key complexities include O(1) for PII hashing, O(n) for DataFrame operations, and O(1) for pod scheduling, aligning with Chapter 67’s scalability plan.
Chapter 69 extends this pipeline with data mart transformations using dbt and Airflow, building on the storage layer to produce optimized analytics tables (e.g., sales_by_product), as planned in Chapter 67, continuing the capstone’s end-to-end pipeline development.