70 - Capstone Project Implementation Part 3
Complexity: Advanced (A)
70.0 Introduction: Why This Matters for Data Engineering
Building on Chapter 69’s transformation pipeline, which processed data/transactions.csv into data marts (fact_sales in BigQuery, pg_sales in PostgreSQL) with monthly summaries (summary_sales.json, summary_sales.csv) using dbt and Airflow, this chapter completes the end-to-end pipeline by integrating a FastAPI API and UI for stakeholder access. For Hijra Group’s Sharia-compliant fintech analytics, this enables real-time insights, such as querying top Halal products (/sales/trends?sale_month=2023-10-01) or visualizing sales trends via a dashboard, supporting decisions like inventory optimization or marketing campaigns. Deployed on Kubernetes with Helm Charts in the data-pipeline namespace, the pipeline ensures scalability, OAuth2 security, and observability, with PII masking (SHA-256) for GDPR/PDPA compliance.
This chapter builds on:
- Chapter 17: PostgreSQL integration with psycopg2.
- Chapters 52–53: Django/FastAPI for APIs/UI.
- Chapters 56–58: Airflow for orchestration.
- Chapters 60–64: Kubernetes and Helm deployments.
- Chapter 65: Security (OAuth2, PII masking).
- Chapter 68: Ingestion pipeline to GCS, BigQuery, PostgreSQL.
- Chapter 69: Data marts (fact_sales, pg_sales) and summaries (summary_sales.json/csv).
All Python code uses type annotations verified by Pyright (Chapter 7) and is tested with pytest (Chapter 9), adhering to PEP 8’s 4-space indentation to avoid IndentationError. The micro-project extends Chapter 69’s data marts by serving them through a FastAPI API/UI, querying both BigQuery and PostgreSQL, deployed via Helm, with enhanced observability and security, completing the capstone pipeline.
Data Engineering Workflow Context
This diagram illustrates the integrated pipeline, extending Chapter 69:
flowchart TD
A["Data Lake
GCS: transactions.csv"] --> B["Ingestion
Chapter 68"]
B --> C["Data Warehouse
BigQuery/PostgreSQL"]
C --> D["Transformation
dbt/Airflow (Ch. 69)"]
D --> E["Data Mart
fact_sales/pg_sales"]
E --> F["API/UI
FastAPI/Jinja2"]
F --> G["Stakeholders
Dashboards/Reports"]
H["Kubernetes
Helm Chart"] -->|Deploys| D
H -->|Deploys| F
I["Security
OAuth2, PII Masking"] --> F
J["Observability
Logging"] --> D
J --> F
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef deploy fill:#ffddcc,stroke:#933,stroke-width:1px
classDef security fill:#ccffcc,stroke:#363,stroke-width:1px
classDef observe fill:#ffccff,stroke:#636,stroke-width:1px
class A,C,E,G data
class B,D,F process
class H deploy
class I security
class J observe
Building On and Preparing For
- Building On:
  - Chapter 17: PostgreSQL integration with psycopg2.
  - Chapters 52–53: FastAPI for APIs/UI.
  - Chapters 56–58: Airflow for orchestration.
  - Chapters 60–64: Docker, Kubernetes, Helm deployments.
  - Chapter 65: Security (OAuth2, PII masking).
  - Chapter 68: Ingestion pipeline to GCS, BigQuery, PostgreSQL.
  - Chapter 69: Data marts (fact_sales, pg_sales) and summaries (summary_sales.json/csv).
- Preparing For:
  - Production deployment at Hijra Group, integrating real-time transaction APIs.
  - Future learning in advanced tools (e.g., Spark, CI/CD).
What You’ll Learn
This chapter covers:
- Pipeline Integration: Connecting BigQuery and PostgreSQL data marts to FastAPI API/UI.
- FastAPI Application: Type-safe API/UI with Jinja2 templates.
- Helm Deployment: Kubernetes deployment with resource limits and secrets.
- Security: OAuth2 authentication, PII masking (SHA-256).
- Observability: Logging with response times.
- Testing: Pytest for unit, integration, and performance tests.
The micro-project integrates Chapter 69’s data marts with a FastAPI API/UI, querying summary_sales (BigQuery) and pg_sales (PostgreSQL), deployed via Helm in the data-pipeline namespace, using data/transactions.csv and pipeline_config.yaml (Appendix 1, Chapter 68).
Follow-Along Tips:
- Create de-onboarding/data/ with transactions.csv and pipeline_config.yaml.
- Install libraries: pip install fastapi uvicorn google-cloud-bigquery apache-airflow pytest pyyaml pydantic passlib[bcrypt] python-jose[cryptography] jinja2 psycopg2-binary dbt-core dbt-postgres pandas-gbq==0.17.9 kubernetes.
- Ensure Docker Desktop and Kubernetes (e.g., Minikube) are installed.
- Use 4 spaces per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Run python -tt script.py to detect tab/space mixing.
- Debug with print(response.json()) for API calls, kubectl logs for Kubernetes, dbt debug for dbt; a quick API debugging sketch follows this list.
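For the API-debugging tip above, a quick way to exercise the endpoints from Python is the sketch below. It assumes the requests library is installed (pip install requests; it is not in requirements.txt) and that the FastAPI app from Section 70.3 is running locally with the mock admin user.
# File: de-onboarding/scripts/debug_api.py (illustrative sketch; assumes `pip install requests`)
import requests

BASE_URL = "http://localhost:8000"  # Local uvicorn instance from the "Run Locally" steps

# Obtain a JWT token from the /token endpoint (OAuth2 password flow, mock admin user)
token_response = requests.post(
    f"{BASE_URL}/token",
    data={"username": "admin", "password": "password123"},
)
token = token_response.json()["access_token"]

# Call the BigQuery-backed trends endpoint and inspect the JSON payload
response = requests.get(
    f"{BASE_URL}/sales/trends",
    params={"product_prefix": "Halal", "sale_month": "2023-10-01"},
    headers={"Authorization": f"Bearer {token}"},
)
print(response.status_code)
print(response.json())  # The print(response.json()) tip from the list above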
70.1 Setup Instructions
To ensure continuity with Chapter 69 and a smooth setup, follow these steps:
Create Data Directory:
- Run mkdir -p de-onboarding/data.
- Copy transactions.csv and pipeline_config.yaml from Appendix 1/Chapter 68 to de-onboarding/data/.
Install Dependencies:
python -m venv venv
source venv/bin/activate  # Unix/macOS
venv\Scripts\activate     # Windows
pip install -r requirements.txt
Setup GCP:
- Install Google Cloud SDK: https://cloud.google.com/sdk/docs/install.
- Authenticate: gcloud auth application-default login.
- Set environment variable: export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json.
- Verify BigQuery dataset (hijra-project.transactions_mart) from Chapter 69.
Setup PostgreSQL:
- Ensure PostgreSQL is running (mydb, host: localhost, port: 5432, user: myuser, password: mypassword).
- Verify schema: psql -h localhost -U myuser -d mydb -c 'SELECT * FROM pg_sales LIMIT 1;'.
Setup Kubernetes:
- Install Minikube and start it: minikube start.
- Install Helm: https://helm.sh/docs/intro/install/.
- Create namespace: kubectl create namespace data-pipeline.
- Create secrets (the app reads these via a helper sketched at the end of this section):
kubectl create secret generic bigquery-credentials --from-file=credentials.json=/path/to/credentials.json --namespace data-pipeline
kubectl create secret generic postgres-credentials --from-literal=user=myuser --from-literal=password=mypassword --namespace data-pipeline
- Verify: kubectl version and helm version.
Setup dbt:
- Use Chapter 69’s de-onboarding/dbt_project/ and ~/.dbt/profiles.yml:
dbt_transactions:
  target: bigquery
  outputs:
    bigquery:
      type: bigquery
      method: oauth
      project: hijra-project
      dataset: transactions_mart
      threads: 4
    postgres:
      type: postgres
      host: localhost
      user: myuser
      password: mypassword
      port: 5432
      dbname: mydb
      schema: public
      threads: 4
- Verify: dbt debug --project-dir de-onboarding/dbt_project.
Verify Airflow:
- Ensure Chapter 69’s Airflow is running: airflow webserver and airflow scheduler.
- Check DAG: airflow dags list | grep transform_sales_mart.
- Verify DAG state: airflow dags state transform_sales_mart 2023-10-01.
Verify Setup:
- Check files: ls de-onboarding/data/ (Unix/macOS) or dir de-onboarding\data\ (Windows).
- Test BigQuery: python -c "from google.cloud import bigquery; print(bigquery.Client())".
- Test PostgreSQL: psql -h localhost -U myuser -d mydb -c 'SELECT 1;'.
- Test dbt: dbt run --project-dir de-onboarding/dbt_project --target bigquery.
Troubleshooting:
- FileNotFoundError: Print paths with print(os.path.abspath('data/transactions.csv')).
- GCP Errors: Verify credentials with cat $GOOGLE_APPLICATION_CREDENTIALS.
- PostgreSQL Errors: Check connection with psql -h localhost -U myuser -d mydb.
- Kubernetes Issues: Check cluster status with minikube status.
- dbt Errors: Verify profiles.yml with dbt debug.
- Airflow Errors: Check logs with airflow logs.
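The FastAPI app and tests in this chapter call utils.read_k8s_secret (carried over from Chapter 69) to resolve the postgres-credentials secret created above. Chapter 69’s exact implementation is not repeated here; a minimal sketch using the kubernetes client from requirements.txt might look like this.
# File: de-onboarding/utils.py (partial, illustrative sketch; Chapter 69's actual helper may differ)
import base64
from typing import Dict

from kubernetes import client, config  # kubernetes package from requirements.txt

def read_k8s_secret(secret_name: str, namespace: str) -> Dict[str, str]:
    """Read a Kubernetes Secret and return its keys as decoded strings."""
    config.load_kube_config()  # Local kubeconfig (e.g., Minikube); in-cluster code would use load_incluster_config()
    v1 = client.CoreV1Api()
    secret = v1.read_namespaced_secret(secret_name, namespace)
    # Secret values are base64-encoded; decode each to a plain string
    return {key: base64.b64decode(value).decode("utf-8") for key, value in secret.data.items()}
For example, read_k8s_secret("postgres-credentials", "data-pipeline") should return {"user": "myuser", "password": "mypassword"} given the secret created in the Kubernetes setup step.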
70.2 Pipeline Integration
This chapter integrates Chapter 69’s data marts (fact_sales, pg_sales, summary_sales) with a FastAPI API/UI, continuing the pipeline from ingestion (Chapter 68) and transformation (Chapter 69). The API queries BigQuery’s summary_sales for monthly analytics and PostgreSQL’s pg_sales for operational insights (e.g., real-time inventory checks). The UI renders a dashboard using Jinja2. Airflow continues to orchestrate dbt transformations, and Helm deploys the FastAPI app in the data-pipeline namespace.
70.2.1 Integration Workflow
- Ingestion (Chapter 68): Loads transactions.csv to GCS and raw tables (raw_transactions, transactions).
- Transformation (Chapter 69): dbt creates fact_sales (BigQuery), pg_sales (PostgreSQL), and summary_sales (view).
- API/UI (Chapter 70): FastAPI serves summary_sales (BigQuery) and pg_sales (PostgreSQL) data, with a Jinja2 dashboard.
- Orchestration: Airflow schedules dbt tasks (daily, 3 retries); a minimal DAG sketch follows the complexity notes below.
- Deployment: Helm deploys FastAPI and Airflow in Kubernetes.
Time Complexity:
- Ingestion/Transformation: O(n) for n rows.
- API Queries: O(k) for k mart rows.
- Airflow Scheduling: O(t) for t tasks.
- Helm Rendering: O(1).
Space Complexity:
- Data Lake/Warehouse: O(n) for n rows.
- Mart: O(m) for m aggregated rows.
- API Response: O(k) for k rows.
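Chapter 69 defines the transform_sales_mart DAG, so its source is not repeated here; as a reference for the daily schedule with 3 retries described above, a minimal sketch might look like the following (assuming dbt is invoked via BashOperator; the operators, paths, and task layout in Chapter 69's actual DAG may differ).
# File: de-onboarding/dags/transform_sales_mart.py (illustrative sketch only)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data-engineering",  # Hypothetical owner
    "retries": 3,  # 3 retries, per the orchestration step above
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="transform_sales_mart",
    start_date=datetime(2023, 10, 1),
    schedule_interval="0 0 * * *",  # Daily, matching pipeline_config.yaml's orchestration.schedule
    default_args=default_args,
    catchup=False,
) as dag:
    dbt_run_bigquery = BashOperator(
        task_id="dbt_run_bigquery",
        bash_command="dbt run --project-dir /opt/de-onboarding/dbt_project --target bigquery",  # Path is an assumption
    )
    dbt_run_postgres = BashOperator(
        task_id="dbt_run_postgres",
        bash_command="dbt run --project-dir /opt/de-onboarding/dbt_project --target postgres",
    )
    dbt_run_bigquery >> dbt_run_postgres  # Ordering shown here is illustrative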
70.3 FastAPI Application
FastAPI provides a type-safe API and UI, querying BigQuery (summary_sales) and PostgreSQL (pg_sales), with OAuth2 for security and SHA-256 PII masking.
70.3.1 API Setup
# File: de-onboarding/app/main.py
from typing import List, Dict, Any
from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field
from google.cloud import bigquery
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
import yaml
import hashlib
import logging
import time
import re
import psycopg2
import utils # From Chapter 69
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Transaction Pipeline API")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
templates = Jinja2Templates(directory="templates")
# Jinja2 filter for PII masking, complementing Chapter 69's transaction_id hashing
def hash_product(value: str) -> str:
"""Hash product name for PII protection, aligning with Chapter 69's SHA-256 hashing."""
if not value:
return ""
return hashlib.sha256(value.encode()).hexdigest()[:8] # Shortened for display, consistent with Chapter 65
templates.env.filters["hash_product"] = hash_product
# Security setup
config = utils.read_config("data/pipeline_config.yaml")
SECRET_KEY = config.get("security", {}).get("jwt_secret", "your-secret-key") # From pipeline_config.yaml
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# PostgreSQL connection
pg_config = config["data_warehouse"]["postgres"]
pg_conn_params = {
"dbname": pg_config["dbname"],
"host": pg_config["host"],
"port": pg_config["port"],
"user": pg_config.get("user", utils.read_k8s_secret(pg_config["secretName"], "data-pipeline")["user"]),
"password": pg_config.get("password", utils.read_k8s_secret(pg_config["secretName"], "data-pipeline")["password"])
}
class User(BaseModel):
username: str
hashed_password: str
class Token(BaseModel):
access_token: str
token_type: str
class TransactionSummary(BaseModel):
transaction_id: str
product: str
sale_month: str
total_amount: float
avg_price: float
transaction_count: int
class QueryParams(BaseModel):
product_prefix: str = Field(default="Halal", pattern=r"^[a-zA-Z0-9]+$")
min_amount: float = Field(default=0.0, ge=0.0)
max_quantity: int = Field(default=1000, ge=0)
sale_month: str = Field(default="2023-10-01", pattern=r"^\d{4}-\d{2}-\d{2}$")
# Mock user database (in production, use PostgreSQL)
users_db = {
"admin": {
"username": "admin",
"hashed_password": pwd_context.hash("password123") # Hashed password
}
}
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hashed password."""
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: Dict[str, Any]) -> str:
"""Create JWT access token."""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)) -> Dict[str, Any]:
"""Get current user from JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
user = users_db.get(username)
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()) -> Token:
"""Authenticate user and return JWT token."""
user = users_db.get(form_data.username)
if not user or not verify_password(form_data.password, user["hashed_password"]):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect credentials")
access_token = create_access_token(data={"sub": user["username"]})
logger.info(f"User {user['username']} logged in")
return Token(access_token=access_token, token_type="bearer")
# BigQuery client
bq_client = bigquery.Client()
@app.get("/sales/trends", response_model=List[TransactionSummary])
async def get_sales_trends(
params: QueryParams = Depends(),
current_user: Dict[str, Any] = Depends(get_current_user)
) -> List[TransactionSummary]:
"""Get sales trends from BigQuery summary_sales view."""
start_time = time.time()
logger.info(f"User {current_user['username']} queried trends: product_prefix={params.product_prefix}, min_amount={params.min_amount}, max_quantity={params.max_quantity}, sale_month={params.sale_month}")
try:
query = """
SELECT
transaction_id,
product,
sale_month,
total_amount,
avg_price,
transaction_count
FROM `hijra-project.transactions_mart.summary_sales`
WHERE product LIKE @prefix || '%'
AND sale_month = @sale_month
"""
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                # ScalarQueryParameter objects are what QueryJobConfig expects (not raw API dicts)
                bigquery.ScalarQueryParameter("prefix", "STRING", params.product_prefix),
                bigquery.ScalarQueryParameter("sale_month", "STRING", params.sale_month),
            ]
        )
logger.info(f"Executing BigQuery summary query")
query_job = bq_client.query(query, job_config=job_config)
results = [
TransactionSummary(
transaction_id=row["transaction_id"], # Hashed from Chapter 69
product=row["product"],
sale_month=row["sale_month"],
total_amount=row["total_amount"],
avg_price=row["avg_price"],
transaction_count=row["transaction_count"]
) for row in query_job
]
logger.info(f"Retrieved {len(results)} sales summaries")
return results
except Exception as e:
logger.error(f"BigQuery query failed: {str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query error: {str(e)}")
finally:
duration = time.time() - start_time
logger.info(f"API response time: {duration:.2f} seconds")
@app.get("/sales/pg-trends", response_model=List[TransactionSummary])
async def get_pg_sales_trends(
params: QueryParams = Depends(),
current_user: Dict[str, Any] = Depends(get_current_user)
) -> List[TransactionSummary]:
"""Get sales trends from PostgreSQL pg_sales table."""
start_time = time.time()
logger.info(f"User {current_user['username']} queried PostgreSQL trends: product_prefix={params.product_prefix}, min_amount={params.min_amount}, max_quantity={params.max_quantity}, sale_month={params.sale_month}")
try:
conn = psycopg2.connect(**pg_conn_params)
cursor = conn.cursor()
query = """
SELECT
transaction_id,
product,
TO_CHAR(sale_date, 'YYYY-MM-DD') AS sale_month,
SUM(amount) AS total_amount,
AVG(price) AS avg_price,
COUNT(DISTINCT transaction_id) AS transaction_count
FROM pg_sales
WHERE product LIKE %s || '%%'
AND TO_CHAR(sale_date, 'YYYY-MM-DD') = %s
GROUP BY transaction_id, product, TO_CHAR(sale_date, 'YYYY-MM-DD')
HAVING SUM(amount) >= %s AND SUM(quantity) <= %s
"""
cursor.execute(query, (params.product_prefix, params.sale_month, params.min_amount, params.max_quantity))
results = [
TransactionSummary(
transaction_id=row[0],
product=row[1],
sale_month=row[2],
total_amount=row[3],
avg_price=row[4],
transaction_count=row[5]
) for row in cursor.fetchall()
]
cursor.close()
conn.close()
logger.info(f"Retrieved {len(results)} PostgreSQL sales summaries")
return results
except Exception as e:
logger.error(f"PostgreSQL query failed: {str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query error: {str(e)}")
finally:
duration = time.time() - start_time
logger.info(f"API response time: {duration:.2f} seconds")
@app.get("/", response_class=HTMLResponse)
async def dashboard(request: Request, current_user: Dict[str, Any] = Depends(get_current_user)) -> HTMLResponse:
"""Render sales trends dashboard."""
start_time = time.time()
summaries = await get_sales_trends(QueryParams(product_prefix="Halal", min_amount=0.0, max_quantity=1000, sale_month="2023-10-01"), current_user)
logger.info("Rendering dashboard")
response = templates.TemplateResponse(
"dashboard.html",
{"request": request, "summaries": summaries}
)
duration = time.time() - start_time
logger.info(f"Dashboard render time: {duration:.2f} seconds")
    return response
70.3.2 Jinja2 Template
<!-- File: de-onboarding/app/templates/dashboard.html -->
<!DOCTYPE html>
<html>
<head>
<title>Sales Trends Dashboard</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
table {
border-collapse: collapse;
width: 100%;
}
th,
td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<h1>Sales Trends Dashboard</h1>
<table>
<tr>
<th>Product (Hashed)</th>
<th>Sale Month</th>
<th>Total Amount ($)</th>
<th>Average Price ($)</th>
<th>Transaction Count</th>
</tr>
{% for summary in summaries %}
<tr>
<td>{{ summary.product | hash_product }}</td>
<td>{{ summary.sale_month }}</td>
<td>{{ summary.total_amount | round(2) }}</td>
<td>{{ summary.avg_price | round(2) }}</td>
<td>{{ summary.transaction_count }}</td>
</tr>
{% endfor %}
</table>
</body>
</html>
Key Points:
- Type Annotations: Pydantic models ensure type safety, verified by Pyright (Chapter 7).
- Security: OAuth2 with JWT tokens, PII masking via SHA-256, and validated query parameters (Chapter 65).
- Observability: Logging includes API response times and user information (Chapter 66).
- Time Complexity: O(k) for querying k mart rows.
- Space Complexity: O(k) for response data.
- Continuity: Queries summary_sales (BigQuery) and pg_sales (PostgreSQL) from Chapter 69, aligning with its schema.
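The hash_product filter above can be sanity-checked outside the template; the sketch below reuses its logic to show that masking is deterministic and truncated to 8 hex characters for display, while Chapter 69’s transaction_id hashing keeps the full 64-character digest.
# Quick check of the hash_product masking behavior (illustrative; run in a Python shell)
import hashlib

def hash_product(value: str) -> str:
    """Same logic as the Jinja2 filter: SHA-256, truncated to 8 hex characters for display."""
    if not value:
        return ""
    return hashlib.sha256(value.encode()).hexdigest()[:8]

print(hash_product("Halal Laptop"))  # 8-character hex digest shown in the dashboard
print(hash_product("Halal Laptop") == hash_product("Halal Laptop"))  # True: deterministic masking
print(len(hashlib.sha256("T001".encode()).hexdigest()))  # 64: full-length digest used for transaction_id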
70.4 Helm Deployment
The Helm Chart deploys the FastAPI app and Airflow in the data-pipeline namespace, reusing Chapter 69’s secrets and adding resource limits.
70.4.1 Helm Chart Structure
# File: de-onboarding/helm/transaction-pipeline/Chart.yaml
apiVersion: v2
name: transaction-pipeline
description: Helm chart for transaction pipeline
version: 0.1.0
# File: de-onboarding/helm/transaction-pipeline/values.yaml
replicaCount: 3 # Support scaling
image:
repository: transaction-pipeline
tag: latest
pullPolicy: IfNotPresent
service:
type: ClusterIP
port: 80
resources:
limits:
cpu: '500m'
memory: '1Gi'
requests:
cpu: '200m'
memory: '512Mi'
secrets:
bigqueryCredentials: '/path/to/credentials.json'
postgresCredentials:
user: 'myuser'
password: 'mypassword'
# File: de-onboarding/helm/transaction-pipeline/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Chart.Name }}
namespace: data-pipeline
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
app: {{ .Chart.Name }}
template:
metadata:
labels:
app: {{ .Chart.Name }}
spec:
containers:
- name: fastapi
image: {{ .Values.image.repository }}:{{ .Values.image.tag }}
ports:
- containerPort: 8000
resources:
limits:
cpu: {{ .Values.resources.limits.cpu }}
memory: {{ .Values.resources.limits.memory }}
requests:
cpu: {{ .Values.resources.requests.cpu }}
memory: {{ .Values.resources.requests.memory }}
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: {{ .Values.secrets.bigqueryCredentials }}
- name: POSTGRES_USER
value: {{ .Values.secrets.postgresCredentials.user }}
- name: POSTGRES_PASSWORD
value: {{ .Values.secrets.postgresCredentials.password }}
# File: de-onboarding/helm/transaction-pipeline/templates/service.yaml
apiVersion: v1
kind: Service
metadata:
name: {{ .Chart.Name }}
namespace: data-pipeline
spec:
selector:
app: {{ .Chart.Name }}
ports:
- protocol: TCP
port: {{ .Values.service.port }}
targetPort: 8000
type: {{ .Values.service.type }}
# File: de-onboarding/helm/transaction-pipeline/templates/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
name: pipeline-secrets
namespace: data-pipeline
type: Opaque
data:
bigqueryCredentials: {{ .Values.secrets.bigqueryCredentials | b64enc }}
postgresUser: {{ .Values.secrets.postgresCredentials.user | b64enc }}
postgresPassword: {{ .Values.secrets.postgresCredentials.password | b64enc }}
Key Points:
- Helm: Manages Kubernetes resources, with O(1) template rendering.
- Secrets: Reuses Chapter 69’s bigquery-credentials and postgres-credentials.
- Scalability: replicaCount: 3 and resource limits (cpu: "500m", memory: "1Gi") support load balancing (Chapter 62).
- Time Complexity: O(1) for Helm rendering.
- Space Complexity: O(1) for Helm metadata.
- Continuity: Deploys in the data-pipeline namespace, consistent with Chapter 69.
70.5 Security
Security includes:
- OAuth2: JWT-based authentication for API access.
- PII Masking: SHA-256 hashing for product names in UI and transaction IDs (from Chapter 69).
- Encryption: Helm secrets for BigQuery and PostgreSQL credentials.
- Query Validation: Pydantic ensures safe product_prefix, min_amount, max_quantity, and sale_month values; see the validation sketch after this list.
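The parameter validation mentioned above can be exercised without the API by instantiating QueryParams directly. A minimal sketch, assuming the pydantic v1 API pinned in requirements.txt:
# Illustrative sketch of QueryParams validation (pydantic v1, as pinned in requirements.txt)
from pydantic import BaseModel, Field, ValidationError

class QueryParams(BaseModel):  # Mirrors the model in app/main.py
    product_prefix: str = Field(default="Halal", regex=r"^[a-zA-Z0-9]+$")
    min_amount: float = Field(default=0.0, ge=0.0)
    max_quantity: int = Field(default=1000, ge=0)
    sale_month: str = Field(default="2023-10-01", regex=r"^\d{4}-\d{2}-\d{2}$")

print(QueryParams(product_prefix="Halal", sale_month="2023-10-01"))  # Valid: passes

try:
    QueryParams(product_prefix="Invalid@", sale_month="invalid")  # Violates both regex patterns
except ValidationError as e:
    print(e)  # FastAPI turns this into the 422 response tested in test_invalid_query_params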
70.6 Observability
Logging via Python’s logging module captures API calls, query executions, transaction counts, response times, and user information, extending Chapter 69’s Airflow logging.
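Each endpoint above times itself with time.time(); a design alternative is to centralize response-time logging in a FastAPI HTTP middleware, which logs every request in one place. A minimal sketch, not part of the micro-project code:
# Illustrative sketch: centralizing response-time logging in a FastAPI middleware
import logging
import time

from fastapi import FastAPI, Request

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

@app.middleware("http")
async def log_response_time(request: Request, call_next):
    """Log method, path, status code, and elapsed time for every request."""
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    logger.info(f"{request.method} {request.url.path} -> {response.status_code} in {duration:.2f} seconds")
    return response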
70.7 Micro-Project: Integrated Transaction Pipeline
Project Requirements
Extend Chapter 69’s pipeline by integrating data marts (fact_sales, pg_sales, summary_sales) with a FastAPI API/UI, serving BigQuery (summary_sales) for analytical insights and PostgreSQL (pg_sales) for operational insights. Deploy via Helm in the data-pipeline namespace, with OAuth2, PII masking, logging, and pytest tests. The pipeline handles ~10,000 daily transactions, using data/transactions.csv and pipeline_config.yaml (Appendix 1, Chapter 68).
- Ingest: Reuse Chapter 68’s ingestion to GCS/BigQuery/PostgreSQL.
- Transform: Reuse Chapter 69’s dbt models (fact_sales, pg_sales, summary_sales).
- Orchestrate: Airflow DAG from Chapter 69 (transform_sales_mart).
- Serve: FastAPI API/UI querying summary_sales and pg_sales.
- Deploy: Helm Chart in the data-pipeline namespace.
- Test: Pytest for unit, integration, performance, and edge cases.
Sample Input
data/transactions.csv (Appendix 1):
transaction_id,product,price,quantity,date
T001,Halal Laptop,999.99,2,2023-10-01
T002,Halal Mouse,24.99,10,2023-10-02
T003,Halal Keyboard,49.99,5,2023-10-03
T004,,29.99,3,2023-10-04
T005,Monitor,199.99,2,2023-10-05
data/pipeline_config.yaml (Chapter 68, extended):
pipeline:
name: transaction_pipeline
version: 1.0.0
data_lake:
gcs_bucket: hijra-transactions
path: raw/transactions
data_warehouse:
project_id: hijra-project
dataset: transactions_warehouse
table: raw_transactions
postgres:
secretName: postgres-credentials
dbname: mydb
host: localhost
port: 5432
data_mart:
dataset: transactions_mart
table: summary_sales
ingestion:
fastapi_endpoint: /upload/transactions
batch_size: 10000
trust_input: true
validation:
min_price: 10.0
max_quantity: 100
required_fields:
- transaction_id
- product
- price
- quantity
- date
product_prefix: 'Halal'
max_decimals: 2
transformation:
dbt_project: dbt_transactions
models:
- fact_sales
- pg_sales
- summary_sales
orchestration:
airflow_dag: transform_sales_mart
schedule: '0 0 * * *'
security:
pii_fields:
- transaction_id
jwt_secret: 'your-secret-key'
kubernetes:
namespace: data-pipeline
helm_chart: transaction-pipeline
resources:
cpu: '1'
memory: '2Gi'
Data Processing Flow
flowchart TD
A["transactions.csv"] --> B["GCS Data Lake (Ch. 68)"]
B --> C["BigQuery/PostgreSQL Warehouse"]
C --> D["dbt Transformation (Ch. 69)"]
D --> E["BigQuery Mart: fact_sales"]
D --> F["PostgreSQL Mart: pg_sales"]
E --> G["Summary View: summary_sales"]
F --> G
G --> H["FastAPI API/UI"]
H --> I["Stakeholder Dashboard"]
J["Airflow DAG"] -->|Orchestrates| D
K["Helm Chart"] -->|Deploys| H
K -->|Deploys| J
L["Security"] --> H
M["Logging"] --> H
M --> J
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef deploy fill:#ffddcc,stroke:#933,stroke-width:1px
classDef security fill:#ccffcc,stroke:#363,stroke-width:1px
classDef observe fill:#ffccff,stroke:#636,stroke-width:1px
class A,B,C,E,F,G,I data
class D,H,J process
class K deploy
class L security
class M observe
Acceptance Criteria
- Go Criteria:
  - Reuses Chapter 69’s data marts (fact_sales, pg_sales, summary_sales).
  - Serves summary_sales (BigQuery) and pg_sales (PostgreSQL) via FastAPI API/UI with OAuth2.
  - Masks PII (product names, transaction IDs from Chapter 69).
  - Deploys via Helm in the data-pipeline namespace with resource limits.
  - Logs operations with transaction counts, response times, and user information.
  - Passes pytest tests (unit, integration, performance, edge cases).
  - Uses type annotations and 4-space indentation per PEP 8.
- No-Go Criteria:
  - Fails to serve data or deploy.
  - Missing security or logging.
  - Fails tests or lacks type annotations.
  - Inconsistent indentation.
Common Pitfalls
- BigQuery Errors:
  - Problem: Invalid credentials or dataset.
  - Solution: Verify with cat $GOOGLE_APPLICATION_CREDENTIALS or print(os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')).
- PostgreSQL Errors:
  - Problem: Connection refused.
  - Solution: Check connection with psql -h localhost -U myuser -d mydb -c 'SELECT 1;'.
- FastAPI Auth:
  - Problem: JWT token errors.
  - Solution: Debug with print(jwt.decode(token, SECRET_KEY, algorithms=['HS256'])); see the JWT sketch after this list.
- Helm Issues:
  - Problem: Chart misconfiguration.
  - Solution: Run helm lint helm/transaction-pipeline and check pods with kubectl get pods --namespace data-pipeline.
- PII Exposure:
  - Problem: Unmasked data in UI.
  - Solution: Inspect HTML with browser developer tools and verify the hash_product filter.
- Airflow DAG Failures:
  - Problem: transform_sales_mart fails to execute.
  - Solution: Test with airflow dags test transform_sales_mart 2023-10-01 and check logs with airflow logs.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces. Run python -tt main.py.
- Query Parameter Errors:
  - Problem: Invalid product_prefix or sale_month.
  - Solution: Verify Pydantic validation errors (422 status).
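For the FastAPI Auth pitfall above, a token can be round-tripped outside the app to confirm the secret and algorithm line up. A minimal sketch, assuming the same SECRET_KEY and ALGORITHM as app/main.py:
# Illustrative sketch: encode and decode a JWT to debug authentication issues
from datetime import datetime, timedelta

from jose import jwt

SECRET_KEY = "your-secret-key"  # Must match pipeline_config.yaml's security.jwt_secret
ALGORITHM = "HS256"

token = jwt.encode(
    {"sub": "admin", "exp": datetime.utcnow() + timedelta(minutes=30)},
    SECRET_KEY,
    algorithm=ALGORITHM,
)
print(token)  # Paste into an Authorization: Bearer header for manual testing
print(jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]))  # {'sub': 'admin', 'exp': ...}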
How This Differs from Production
Production would include:
- Scalability: Autoscaling pods for millions of transactions.
- Monitoring: Prometheus/Grafana for metrics.
- CI/CD: GitHub Actions for deployment.
- Secrets Management: Cloud Secret Manager.
Implementation
# File: de-onboarding/app/main.py (as above)
# File: de-onboarding/tests/test_pipeline.py
from typing import List
import pytest
from fastapi.testclient import TestClient
from app.main import app, TransactionSummary, QueryParams
import yaml
from google.cloud import bigquery
from unittest.mock import patch
import os
import logging
import hashlib
import psycopg2
import utils  # From Chapter 69; provides read_k8s_secret used in the pg_conn fixture
client = TestClient(app)
@pytest.fixture
def config():
"""Load pipeline_config.yaml."""
with open("data/pipeline_config.yaml", "r") as f:
return yaml.safe_load(f)
@pytest.fixture
def auth_token():
"""Get JWT token for authenticated requests."""
response = client.post("/token", data={"username": "admin", "password": "password123"})
return response.json()["access_token"]
@pytest.fixture
def pg_conn():
"""Connect to PostgreSQL."""
config = yaml.safe_load(open("data/pipeline_config.yaml"))
pg_config = config["data_warehouse"]["postgres"]
secret_data = utils.read_k8s_secret(pg_config["secretName"], "data-pipeline")
conn = psycopg2.connect(
dbname=pg_config["dbname"],
host=pg_config["host"],
port=pg_config["port"],
user=secret_data["user"],
password=secret_data["password"]
)
yield conn
conn.close()
def test_login():
"""Test successful login."""
response = client.post("/token", data={"username": "admin", "password": "password123"})
assert response.status_code == 200
assert "access_token" in response.json()
def test_unauthorized_access():
"""Test unauthorized access to trends endpoint."""
response = client.get("/sales/trends")
assert response.status_code == 401
def test_sales_trends(auth_token: str, config: dict):
"""Test BigQuery sales trends endpoint."""
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/sales/trends?product_prefix=Halal&sale_month=2023-10-01", headers=headers)
assert response.status_code == 200
summaries: List[TransactionSummary] = [TransactionSummary(**item) for item in response.json()]
assert len(summaries) > 0
for summary in summaries:
assert summary.product.startswith(config["ingestion"]["validation"]["product_prefix"])
assert summary.total_amount > 0
assert summary.transaction_count > 0
def test_pg_sales_trends(auth_token: str, config: dict, pg_conn):
"""Test PostgreSQL sales trends endpoint."""
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/sales/pg-trends?product_prefix=Halal&sale_month=2023-10-01", headers=headers)
assert response.status_code == 200
summaries: List[TransactionSummary] = [TransactionSummary(**item) for item in response.json()]
assert len(summaries) > 0
for summary in summaries:
assert summary.product.startswith(config["ingestion"]["validation"]["product_prefix"])
assert summary.total_amount > 0
assert summary.transaction_count > 0
def test_invalid_query_params(auth_token: str):
"""Test invalid query parameters."""
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/sales/trends?product_prefix=Invalid@&sale_month=invalid", headers=headers)
assert response.status_code == 422 # Validation error
def test_dashboard(auth_token: str):
"""Test dashboard rendering with PII masking."""
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/", headers=headers)
assert response.status_code == 200
assert "Sales Trends Dashboard" in response.text
assert any(hashlib.sha256(s.product.encode()).hexdigest()[:8] in response.text for s in [TransactionSummary(transaction_id="T001", product="Halal Laptop", sale_month="2023-10-01", total_amount=1999.98, avg_price=999.99, transaction_count=1)])
def test_bigquery_error(auth_token: str):
"""Test handling of BigQuery schema errors."""
with patch("google.cloud.bigquery.Client.query", side_effect=Exception("Invalid schema")):
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/sales/trends", headers=headers)
assert response.status_code == 500
assert "Query error: Invalid schema" in response.text
def test_postgres_error(auth_token: str, pg_conn):
"""Test handling of PostgreSQL connection errors."""
with patch("psycopg2.connect", side_effect=psycopg2.Error("Connection refused")):
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/sales/pg-trends", headers=headers)
assert response.status_code == 500
assert "Query error: Connection refused" in response.text
def test_response_time_performance(auth_token: str, caplog):
"""Test API response time is logged and under 1 second."""
headers = {"Authorization": f"Bearer {auth_token}"}
with caplog.at_level(logging.INFO):
response = client.get("/sales/trends", headers=headers)
assert response.status_code == 200
for record in caplog.records:
if "API response time" in record.message:
duration = float(record.message.split(": ")[1].split(" ")[0])
assert duration < 1.0, f"Response time {duration}s exceeds 1s"
def test_query_performance(auth_token: str):
"""Test BigQuery query performance (bytes processed < 10MB)."""
headers = {"Authorization": f"Bearer {auth_token}"}
with patch("google.cloud.bigquery.Client.query") as mock_query:
mock_job = mock_query.return_value
mock_job.total_bytes_processed = 5 * 1024 * 1024 # 5MB
mock_job.result.return_value = [
{"transaction_id": "hash", "product": "Halal Laptop", "sale_month": "2023-10-01", "total_amount": 1999.98, "avg_price": 999.99, "transaction_count": 1}
]
response = client.get("/sales/trends", headers=headers)
assert response.status_code == 200
        assert mock_job.total_bytes_processed < 10 * 1024 * 1024, "Query exceeds 10MB"
# File: de-onboarding/Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY app/ .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]# File: de-onboarding/requirements.txt
fastapi==0.95.0
uvicorn==0.20.0
google-cloud-bigquery==3.4.0
apache-airflow==2.5.0
pytest==7.2.0
pyyaml==6.0
pydantic==1.10.0
passlib[bcrypt]==1.7.4
python-jose[cryptography]==3.3.0
jinja2==3.1.2
psycopg2-binary==2.9.5
dbt-core==1.4.0
dbt-postgres==1.4.0
pandas-gbq==0.17.9
kubernetes==25.3.0
Expected Outputs
- API Response (GET /sales/trends?product_prefix=Halal&sale_month=2023-10-01):
[
{
"transaction_id": "<64-char-hash>",
"product": "Halal Laptop",
"sale_month": "2023-10-01",
"total_amount": 1999.98,
"avg_price": 999.99,
"transaction_count": 1
},
{
"transaction_id": "<64-char-hash>",
"product": "Halal Mouse",
"sale_month": "2023-10-01",
"total_amount": 249.9,
"avg_price": 24.99,
"transaction_count": 1
}
]
- API Response (GET /sales/pg-trends?product_prefix=Halal&sale_month=2023-10-01):
[
{
"transaction_id": "<64-char-hash>",
"product": "Halal Laptop",
"sale_month": "2023-10-01",
"total_amount": 1999.98,
"avg_price": 999.99,
"transaction_count": 1
}
]
- Dashboard: HTML table with hashed product names (e.g., a1b2c3d4 for Halal Laptop).
- Airflow Logs (from Chapter 69, abridged):
[2023-10-01] INFO - Uploaded transactions.csv to GCS
[2023-10-01] INFO - Loaded 5 rows to BigQuery
[2023-10-01] INFO - dbt transformations completed: Success
- dbt Output (from Chapter 69, abridged):
Running 3 models...
Completed 3 models successfully
- Kubernetes Logs:
NAME READY STATUS
pipeline-xxx-1 1/1 Running
pipeline-xxx-2 1/1 Running
pipeline-xxx-3 1/1 Running
- Tests: All pytest tests pass.
How to Run and Test
Setup:
- Follow Section 70.1 Setup Instructions.
- Ensure Chapter 69’s transform_sales_mart DAG is running.
- Verify fact_sales, pg_sales, and summary_sales exist in BigQuery and PostgreSQL.
Run Locally:
- Start FastAPI: uvicorn app.main:app --host 0.0.0.0 --port 8000.
- Test API: curl -H "Authorization: Bearer <token>" "http://localhost:8000/sales/trends?product_prefix=Halal&sale_month=2023-10-01".
- Test PostgreSQL API: curl -H "Authorization: Bearer <token>" "http://localhost:8000/sales/pg-trends?product_prefix=Halal&sale_month=2023-10-01".
- Run Airflow: airflow webserver and airflow scheduler.
- Run dbt: dbt run --project-dir de-onboarding/dbt_project --target bigquery.
Deploy with Helm:
- Build Docker image: docker build -t transaction-pipeline:latest .
- Install Helm chart: helm install pipeline helm/transaction-pipeline --namespace data-pipeline.
- Verify: kubectl get pods --namespace data-pipeline.
Test Scenarios:
- Valid Data: Verify API returns summary_sales and pg_sales data.
- Unauthorized Access: Test /sales/trends without a token (expect 401).
- Invalid Query Parameters: Test with product_prefix=Invalid@ or sale_month=invalid (expect 422).
- BigQuery Error: Test with mocked schema error (expect 500).
- PostgreSQL Error: Test with mocked connection error (expect 500).
- PII Masking: Inspect dashboard HTML for hashed products.
- Response Time: Verify logs contain response times under 1 second.
- Query Performance: Verify BigQuery queries process <10MB.
70.8 Practice Exercises
Exercise 1: Enhance API Endpoint
Add a query parameter to filter summaries by avg_price range in /sales/trends. Test with pytest.
Expected Output:
[
{
"transaction_id": "<hash>",
"product": "Halal Laptop",
"sale_month": "2023-10-01",
"total_amount": 1999.98,
"avg_price": 999.99,
"transaction_count": 1
}
]
Exercise 2: Add Logging
Enhance FastAPI logging to include query result counts in /sales/pg-trends. Verify logs.
Expected Output:
[2023-10-01] INFO - User admin retrieved 2 PostgreSQL sales summaries
Exercise 3: Helm Scaling
Test the Helm chart with 3 replicas and verify resource limits (cpu: "500m", memory: "1Gi"). Verify with kubectl describe pods --namespace data-pipeline.
Expected Output:
NAME READY STATUS
pipeline-xxx-1 1/1 Running
Limits: cpu: 500m, memory: 1Gi
Exercise 4: Query Performance
Write a pytest test to verify BigQuery query performance in /sales/trends processes <10MB. Mock query_job.total_bytes_processed.
Expected Output:
Query processed: 5MB
Exercise 5: API Response Time Logging
Add a test to verify API response time for /sales/pg-trends is logged and under 1 second. Test with pytest.
Expected Output:
[2023-10-01] INFO - API response time: 0.15 seconds
70.9 Exercise Solutions
Solution to Exercise 1: Enhance API Endpoint
# File: de-onboarding/app/main.py (partial)
class QueryParams(BaseModel):
product_prefix: str = Field(default="Halal", pattern=r"^[a-zA-Z0-9]+$")
min_amount: float = Field(default=0.0, ge=0.0)
max_quantity: int = Field(default=1000, ge=0)
sale_month: str = Field(default="2023-10-01", pattern=r"^\d{4}-\d{2}-\d{2}$")
min_avg_price: float = Field(default=0.0, ge=0.0)
max_avg_price: float = Field(default=10000.0, ge=0.0)
@app.get("/sales/trends", response_model=List[TransactionSummary])
async def get_sales_trends(
params: QueryParams = Depends(),
current_user: Dict[str, Any] = Depends(get_current_user)
) -> List[TransactionSummary]:
"""Get sales trends from BigQuery summary_sales view with avg_price filter."""
start_time = time.time()
logger.info(f"User {current_user['username']} queried trends: product_prefix={params.product_prefix}, min_amount={params.min_amount}, max_quantity={params.max_quantity}, sale_month={params.sale_month}, min_avg_price={params.min_avg_price}, max_avg_price={params.max_avg_price}")
try:
query = """
SELECT
transaction_id,
product,
sale_month,
total_amount,
avg_price,
transaction_count
FROM `hijra-project.transactions_mart.summary_sales`
WHERE product LIKE @prefix || '%'
AND sale_month = @sale_month
AND avg_price BETWEEN @min_avg_price AND @max_avg_price
"""
        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("prefix", "STRING", params.product_prefix),
                bigquery.ScalarQueryParameter("sale_month", "STRING", params.sale_month),
                bigquery.ScalarQueryParameter("min_avg_price", "FLOAT64", params.min_avg_price),
                bigquery.ScalarQueryParameter("max_avg_price", "FLOAT64", params.max_avg_price),
            ]
        )
logger.info(f"Executing BigQuery summary query")
query_job = bq_client.query(query, job_config=job_config)
results = [
TransactionSummary(
transaction_id=row["transaction_id"],
product=row["product"],
sale_month=row["sale_month"],
total_amount=row["total_amount"],
avg_price=row["avg_price"],
transaction_count=row["transaction_count"]
) for row in query_job
]
logger.info(f"Retrieved {len(results)} sales summaries")
return results
except Exception as e:
logger.error(f"BigQuery query failed: {str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query error: {str(e)}")
finally:
duration = time.time() - start_time
logger.info(f"API response time: {duration:.2f} seconds")# File: de-onboarding/tests/test_pipeline.py (partial)
def test_sales_trends_avg_price(auth_token: str, config: dict):
"""Test sales trends with avg_price filter."""
headers = {"Authorization": f"Bearer {auth_token}"}
response = client.get("/sales/trends?product_prefix=Halal&sale_month=2023-10-01&min_avg_price=500&max_avg_price=1000", headers=headers)
assert response.status_code == 200
summaries: List[TransactionSummary] = [TransactionSummary(**item) for item in response.json()]
assert len(summaries) > 0
    assert all(500 <= summary.avg_price <= 1000 for summary in summaries)
Solution to Exercise 2: Add Logging
# File: de-onboarding/app/main.py (partial)
@app.get("/sales/pg-trends", response_model=List[TransactionSummary])
async def get_pg_sales_trends(
params: QueryParams = Depends(),
current_user: Dict[str, Any] = Depends(get_current_user)
) -> List[TransactionSummary]:
"""Get sales trends from PostgreSQL pg_sales table with result count logging."""
start_time = time.time()
logger.info(f"User {current_user['username']} queried PostgreSQL trends: product_prefix={params.product_prefix}, min_amount={params.min_amount}, max_quantity={params.max_quantity}, sale_month={params.sale_month}")
try:
conn = psycopg2.connect(**pg_conn_params)
cursor = conn.cursor()
query = """
SELECT
transaction_id,
product,
TO_CHAR(sale_date, 'YYYY-MM-DD') AS sale_month,
SUM(amount) AS total_amount,
AVG(price) AS avg_price,
COUNT(DISTINCT transaction_id) AS transaction_count
FROM pg_sales
WHERE product LIKE %s || '%%'
AND TO_CHAR(sale_date, 'YYYY-MM-DD') = %s
GROUP BY transaction_id, product, TO_CHAR(sale_date, 'YYYY-MM-DD')
HAVING SUM(amount) >= %s AND SUM(quantity) <= %s
"""
cursor.execute(query, (params.product_prefix, params.sale_month, params.min_amount, params.max_quantity))
results = [
TransactionSummary(
transaction_id=row[0],
product=row[1],
sale_month=row[2],
total_amount=row[3],
avg_price=row[4],
transaction_count=row[5]
) for row in cursor.fetchall()
]
cursor.close()
conn.close()
logger.info(f"User {current_user['username']} retrieved {len(results)} PostgreSQL sales summaries")
return results
except Exception as e:
logger.error(f"PostgreSQL query failed: {str(e)}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Query error: {str(e)}")
finally:
duration = time.time() - start_time
logger.info(f"API response time: {duration:.2f} seconds")# File: de-onboarding/tests/test_pipeline.py (partial)
def test_pg_result_count_logging(auth_token: str, caplog, pg_conn):
"""Test result count logging in PostgreSQL trends endpoint."""
headers = {"Authorization": f"Bearer {auth_token}"}
with caplog.at_level(logging.INFO):
response = client.get("/sales/pg-trends", headers=headers)
assert response.status_code == 200
assert any("retrieved" in record.message and "PostgreSQL sales summaries" in record.message for record in caplog.records)Solution to Exercise 3: Helm Scaling
# File: de-onboarding/helm/transaction-pipeline/values.yaml (partial)
replicaCount: 3
resources:
limits:
cpu: '500m'
memory: '1Gi'
requests:
cpu: '200m'
memory: '512Mi'
Test:
helm install pipeline helm/transaction-pipeline --namespace data-pipeline
kubectl describe pods --namespace data-pipeline | grep -E 'Limits|Requests'
Solution to Exercise 4: Query Performance
# File: de-onboarding/tests/test_pipeline.py (partial)
def test_query_performance(auth_token: str):
"""Test BigQuery query performance (bytes processed < 10MB)."""
headers = {"Authorization": f"Bearer {auth_token}"}
with patch("google.cloud.bigquery.Client.query") as mock_query:
mock_job = mock_query.return_value
mock_job.total_bytes_processed = 5 * 1024 * 1024 # 5MB
mock_job.result.return_value = [
{"transaction_id": "hash", "product": "Halal Laptop", "sale_month": "2023-10-01", "total_amount": 1999.98, "avg_price": 999.99, "transaction_count": 1}
]
response = client.get("/sales/trends", headers=headers)
assert response.status_code == 200
assert mock_job.total_bytes_processed < 10 * 1024 * 1024, "Query exceeds 10MB"
print(f"Query processed: {mock_job.total_bytes_processed / 1024 / 1024}MB")Solution to Exercise 5: API Response Time Logging
# File: de-onboarding/tests/test_pipeline.py (partial)
def test_pg_response_time_performance(auth_token: str, caplog, pg_conn):
"""Test PostgreSQL API response time is logged and under 1 second."""
headers = {"Authorization": f"Bearer {auth_token}"}
with caplog.at_level(logging.INFO):
response = client.get("/sales/pg-trends", headers=headers)
assert response.status_code == 200
for record in caplog.records:
if "API response time" in record.message:
duration = float(record.message.split(": ")[1].split(" ")[0])
            assert duration < 1.0, f"Response time {duration}s exceeds 1s"
70.10 Chapter Summary and Connection to Future Learning
This chapter completed the transaction pipeline by integrating Chapter 69’s data marts (fact_sales, pg_sales, summary_sales) with a FastAPI API/UI, querying BigQuery for analytical insights and PostgreSQL for operational analytics. Deployed via Helm in the data-pipeline namespace with resource limits, the pipeline serves monthly sales trends (e.g., /sales/trends?sale_month=2023-10-01) and real-time data (e.g., /sales/pg-trends), enabling fintech decisions like inventory optimization. OAuth2, SHA-256 PII masking, and validated query parameters ensured security, while logging with response times and user information enhanced observability. Pytest tests validated functionality, performance (<10MB processed), and edge cases (e.g., database errors). Complexity analysis (e.g., O(k) for API queries, O(1) for Helm rendering) and 4-space indentation per PEP 8 solidified production-ready skills.
Connection to Future Learning: This capstone prepares you for production deployment at Hijra Group, integrating real-time transaction APIs and scaling with autoscaling Kubernetes clusters. Explore advanced tools like Apache Spark for big data or implement CI/CD with GitHub Actions to automate deployments, enhancing Hijra Group’s analytics for growing transaction volumes while maintaining compliance.