59 - Checkpoint 8: Pipeline Orchestration Review
Complexity: Easy (E)
59.0 Introduction: Why This Matters for Data Engineering
In data engineering, orchestrating pipelines ensures reliable, scalable, and testable data workflows, which is critical for Hijra Group’s Sharia-compliant fintech analytics. The pipeline in this checkpoint automates daily sales processing for Hijra Group’s Sharia-compliant products, supporting compliance with Islamic Financial Services Board (IFSB) standards. It consolidates skills from Chapters 52–58, covering Django, FastAPI, dbt, Airflow, and robust orchestration, enabling end-to-end pipeline development. By integrating PostgreSQL, BigQuery, and type-annotated Python, you’ll build a pipeline that processes sales data, transforms it into a data mart, and exposes metrics via a FastAPI endpoint, all orchestrated with Airflow and tested with pytest. This aligns with Hijra Group’s need for automated, production-grade pipelines that deliver actionable insights.
This chapter builds on Phase 8’s orchestration concepts, ensuring type safety (Chapter 7), testing (Chapter 9), and modular code organization (Chapter 5). All Python code uses type annotations verified by Pyright and is tested with pytest, adhering to PEP 8’s 4-space indentation (spaces, not tabs) to avoid IndentationError. The micro-project uses data/sales.csv from Appendix 1, focusing on a self-contained pipeline without introducing new concepts like Kubernetes (Chapter 61) or security (Chapter 65).
Data Engineering Workflow Context
The following diagram illustrates the pipeline orchestration flow:
flowchart TD
A["Raw Data
sales.csv"] --> B["Airflow DAG"]
B --> C["dbt Models
PostgreSQL/BigQuery"]
C --> D["Data Mart"]
D --> E["FastAPI Endpoint"]
E --> F["Metrics Output"]
B --> G["Logging"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,F data
class B,C,E process
class G endpoint
Building On and Preparing For
- Building On:
- Chapter 52: Django for UI and API development.
- Chapter 53: FastAPI for type-safe APIs.
- Chapter 54: dbt for data warehouse transformations.
- Chapter 56: Airflow for orchestration.
- Chapter 58: Complex Airflow workflows with retries.
- Chapter 17: PostgreSQL integration.
- Chapter 26: BigQuery integration.
- Preparing For:
- Chapter 60: Docker for containerized pipelines.
- Chapter 64: Airflow in Kubernetes.
- Chapters 67–70: Capstone projects integrating orchestration with Helm and security.
What You’ll Learn
This chapter reviews:
- Pipeline Orchestration: Using Airflow to schedule dbt and FastAPI tasks.
- Type-Safe Integration: Combining PostgreSQL, BigQuery, and FastAPI with Pyright-verified code.
- Testing: Writing pytest tests for pipeline components.
- Logging: Tracking workflow execution.
- Data Transformation: Building dbt models for a sales data mart.
The micro-project builds a tested Airflow pipeline that processes data/sales.csv, transforms it into a PostgreSQL/BigQuery data mart using dbt, and exposes metrics via a FastAPI endpoint, all with 4-space indentation per PEP 8.
Follow-Along Tips:
- Create `de-onboarding/data/` and populate it with `sales.csv` per Appendix 1.
- Install libraries: `pip install apache-airflow fastapi uvicorn dbt-core dbt-postgres dbt-bigquery psycopg2-binary google-cloud-bigquery pandas pyyaml pytest`.
- Set up PostgreSQL and BigQuery (see the Chapter 16 and Chapter 25 setup instructions).
- Configure Airflow: initialize with `airflow initdb` (`airflow db init` on Airflow 2.x) and start with `airflow webserver` and `airflow scheduler`.
- Use 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Debug with `print(df.head())` for DataFrames and Airflow task logs (under `~/airflow/logs/` or in the web UI) for DAGs; see the sanity-check sketch after this list.
- Verify file paths with `ls data/` (Unix/macOS) or `dir data\` (Windows).
- Use UTF-8 encoding to avoid `UnicodeDecodeError`.
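Before wiring the pipeline, a quick sanity check of the dataset ties together the UTF-8 and `print(df.head())` tips above. A minimal sketch, assuming it is run from `de-onboarding/` (the file name `check_data.py` is illustrative, not part of the micro-project):

```python
# File: de-onboarding/check_data.py (optional helper; name is illustrative)
import pandas as pd

# Explicit encoding avoids UnicodeDecodeError on mis-encoded files
df = pd.read_csv("data/sales.csv", encoding="utf-8")
print(df.head())  # inspect the first rows, including the intentionally invalid records
print(f"Rows: {len(df)}")  # expect 6 rows per Appendix 1
```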
59.1 Core Concepts
The following diagram illustrates the workflow of the Airflow-dbt-FastAPI pipeline:
flowchart TD
A["load_to_postgres
(PythonOperator)"] --> B["run_dbt
(BashOperator)"]
B --> C["get_sales
(FastAPI Endpoint)"]
C --> D["Metrics JSON"]
classDef task fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef output fill:#ddffdd,stroke:#363,stroke-width:1px
class A,B,C task
class D output
59.1.1 Airflow Pipeline Orchestration
Airflow orchestrates tasks via DAGs (Directed Acyclic Graphs), scheduling dbt transformations and API calls. DAGs are defined with type-annotated Python, using operators like PythonOperator and BashOperator. Execution is logged for observability, with O(1) task scheduling and O(n) for task execution (n tasks). Space complexity is O(n) for DAG definitions and task metadata.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from typing import Dict, Any
# Define DAG
dag = DAG(
"sales_pipeline",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
)
# Define task
def run_dbt_task() -> None:
"""Run dbt models."""
print("Running dbt run") # Debug
# Placeholder for dbt run (see micro-project)
task = PythonOperator(
task_id="run_dbt",
python_callable=run_dbt_task,
dag=dag,
)
Key Points:
- Time Complexity: O(1) for scheduling, O(n) for executing n tasks.
- Space Complexity: O(n) for DAG definition and task metadata.
- Implication: Airflow ensures reliable task execution for Hijra Group’s pipelines.
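The placeholder above calls dbt from a PythonOperator; in the micro-project the dbt step runs through a BashOperator chained after the load task. A minimal sketch of that pattern against the `dag` and `task` objects defined above (the task id is adjusted here so it does not clash with the PythonOperator’s):

```python
from airflow.operators.bash import BashOperator

# dbt step as a shell command; the path mirrors the micro-project layout
dbt_task = BashOperator(
    task_id="run_dbt_models",  # distinct id; the PythonOperator above already uses "run_dbt"
    bash_command="cd de-onboarding/dbt && dbt run --profiles-dir .",
    dag=dag,
)

task >> dbt_task  # the PythonOperator runs first, then the dbt command
```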
59.1.2 dbt for Data Transformation
dbt transforms data in PostgreSQL/BigQuery using type-annotated models. Models are SQL files with Jinja templating, compiled to optimized queries. Tests ensure data quality, with O(n) query execution (n rows) and O(k) space for output tables (k rows).
-- File: de-onboarding/dbt/models/sales_mart.sql
SELECT
product,
SUM(price * quantity) as total_sales
FROM {{ ref('sales') }}
GROUP BY product
Key Points:
- Time Complexity: O(n) for transforming n rows.
- Space Complexity: O(k) for k output rows in the data mart.
- Implication: dbt enables scalable, testable transformations.
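Data-quality tests are declared in YAML next to the models and run with `dbt test`; a minimal sketch using dbt’s built-in `not_null` and `unique` tests (the file name `schema.yml` is assumed):

```yaml
# File: de-onboarding/dbt/models/schema.yml (illustrative; file name assumed)
version: 2
models:
  - name: sales_mart
    columns:
      - name: product
        tests:
          - not_null  # built-in: fails if any product is NULL
          - unique    # built-in: GROUP BY product should yield one row per product
```

`dbt test` compiles each test to a SQL query against the warehouse; a test fails if its query returns any rows.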
59.1.3 FastAPI for Metrics Exposure
FastAPI provides type-safe endpoints for metrics, integrated with PostgreSQL/BigQuery. Pydantic ensures input/output validation, with O(1) endpoint access, O(n) for query execution, and O(k) space for output records (k records).
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict, List
app = FastAPI()
class SalesResponse(BaseModel):
total_sales: float
top_products: Dict[str, float]
@app.get("/sales", response_model=SalesResponse)
async def get_sales() -> SalesResponse:
print("Fetching sales metrics") # Debug
    return SalesResponse(total_sales=2499.83, top_products={"Halal Laptop": 1999.98})
Key Points:
- Time Complexity: O(n) for querying n rows.
- Space Complexity: O(k) for k output records.
- Implication: FastAPI delivers metrics to stakeholders efficiently.
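Because the endpoint returns a Pydantic model, it can be exercised in-process with FastAPI’s `TestClient`, the same approach the micro-project’s pytest suite uses. A minimal sketch against the example above (`app` refers to that snippet):

```python
from fastapi.testclient import TestClient

client = TestClient(app)  # app from the snippet above

def test_get_sales() -> None:
    """Smoke-test the /sales endpoint and its response schema."""
    response = client.get("/sales")
    assert response.status_code == 200
    data = response.json()
    assert data["total_sales"] == 2499.83  # matches the hard-coded example response
```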
59.2 Micro-Project: Sales Data Pipeline
Dataset Seeding
Create de-onboarding/data/sales.csv with the content from Appendix 1, ensuring UTF-8 encoding. Verify with cat data/sales.csv (Unix/macOS) or type data\sales.csv (Windows). The file should contain:
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
Project Requirements
Build a type-annotated Airflow pipeline that:
- Loads `data/sales.csv` into PostgreSQL.
- Transforms data into a BigQuery data mart using dbt.
- Exposes metrics via a FastAPI endpoint.
- Logs steps and validates with pytest.
- Uses 4-space indentation per PEP 8, preferring spaces over tabs.
Data Processing Flow
flowchart TD
A["sales.csv"] --> B["Airflow DAG"]
B --> C["Load to PostgreSQL"]
C --> D["dbt Transform to BigQuery"]
D --> E["Data Mart"]
E --> F["FastAPI Endpoint"]
B --> G["Logging"]
F --> H["Metrics JSON"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,E,H data
class B,C,D,F process
class G endpoint
Acceptance Criteria
- Go Criteria:
  - Loads `sales.csv` into PostgreSQL.
  - Transforms data into a BigQuery data mart using dbt.
  - Exposes metrics via the `/sales` FastAPI endpoint.
  - Logs steps (file-based or console).
  - Passes pytest tests for data loading, transformation, and API.
  - Uses type annotations verified by Pyright.
  - Uses 4-space indentation per PEP 8.
- No-Go Criteria:
  - Fails to load or transform data.
  - Missing API endpoint or tests.
  - Incorrect type annotations or indentation.
Common Pitfalls to Avoid
- Airflow Setup Issues:
  - Problem: DAG not visible.
  - Solution: Ensure the DAG file is in `~/airflow/dags/`. Run `airflow dags list`.
- dbt Connection Errors:
  - Problem: Fails to connect to BigQuery/PostgreSQL.
  - Solution: Verify `profiles.yml`. Print the connection config with `dbt debug`.
- FastAPI Errors:
  - Problem: Endpoint fails.
  - Solution: Check Pydantic models. Run `uvicorn app:app` and test with `curl http://localhost:8000/sales`.
- Type Errors:
  - Problem: Pyright errors.
  - Solution: Run `pyright .` and fix annotations.
- Pytest Database Errors:
  - Problem: Tests fail due to database connection issues.
  - Solution: Verify credentials in `test_pipeline.py`. Run `psql -d sales_db` or `bq show project.dataset`.
- Log File Permission Errors:
  - Problem: Logs aren’t written to `data/pipeline.log`.
  - Solution: Check write permissions with `ls -l data/`. Run `chmod u+w data/` (Unix/macOS) or adjust permissions in Windows Explorer.
- YAML Configuration Errors:
  - Problem: `dbt run` fails with parsing errors.
  - Solution: Check `profiles.yml` for incorrect indentation or missing keys. Validate with `python -c "import yaml; yaml.safe_load(open('de-onboarding/dbt/profiles.yml'))"`.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces. Run `python -tt script.py`.
How This Differs from Production
In production, this pipeline would include:
- Security: OAuth2 and encryption (Chapter 65).
- Observability: Metrics with Prometheus (Chapter 66).
- Scalability: Kubernetes deployment (Chapter 64).
- CI/CD: Automated testing/deployment (Chapter 66).
- Testing: Integration tests across environments and performance tests for scalability (Chapter 43).
- Logging: Production pipelines use log rotation to manage file size, unlike the simple file-based logging here (Chapter 66); a sketch follows this list.
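For reference, a hedged sketch of how the file-based logging could be swapped for size-based rotation using Python’s standard library (the size limit and backup count are illustrative, not part of the micro-project):

```python
import logging
from logging.handlers import RotatingFileHandler

# Rotate data/pipeline.log at roughly 1 MB, keeping 3 backups (illustrative limits)
handler = RotatingFileHandler("data/pipeline.log", maxBytes=1_000_000, backupCount=3)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger = logging.getLogger("sales_pipeline")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("Rotation-enabled logging configured")  # same log format as the micro-project
```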
Implementation
# File: de-onboarding/dags/sales_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from typing import Dict, Any
import pandas as pd
import psycopg2
from google.cloud import bigquery
import yaml
import logging
from pathlib import Path
# Configure logging with timestamp and level
logging.basicConfig(
level=logging.INFO,
filename="data/pipeline.log",
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def load_to_postgres() -> None:
"""Load sales.csv to PostgreSQL."""
logger.info("Loading sales.csv to PostgreSQL")
df = pd.read_csv("data/sales.csv")
df = df.dropna(subset=["product", "price"])
df = df[df["product"].str.startswith("Halal")]
df = df[df["quantity"] <= 100]
conn = psycopg2.connect(
dbname="sales_db",
user="user",
password="password",
host="localhost",
port="5432"
)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sales (
product TEXT,
price FLOAT,
quantity INTEGER
)
""")
for _, row in df.iterrows():
cursor.execute(
"INSERT INTO sales (product, price, quantity) VALUES (%s, %s, %s)",
(row["product"], row["price"], row["quantity"])
)
conn.commit()
conn.close()
logger.info("Loaded %d records to PostgreSQL", len(df))
def run_dbt() -> None:
"""Run dbt models."""
logger.info("Running dbt models")
# BashOperator executes this in micro-project
with DAG(
"sales_pipeline",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
load_task = PythonOperator(
task_id="load_to_postgres",
python_callable=load_to_postgres,
)
dbt_task = BashOperator(
task_id="run_dbt",
bash_command="cd de-onboarding/dbt && dbt run --profiles-dir .",
)
load_task >> dbt_task
# File: de-onboarding/app.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict
from google.cloud import bigquery
import logging
# Configure logging with timestamp and level
logging.basicConfig(
level=logging.INFO,
filename="data/api.log",
format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
app = FastAPI()
class SalesResponse(BaseModel):
total_sales: float
top_products: Dict[str, float]
@app.get("/sales", response_model=SalesResponse)
async def get_sales() -> SalesResponse:
"""Fetch sales metrics from BigQuery."""
logger.info("Fetching sales metrics")
client = bigquery.Client()
query = """
SELECT product, total_sales
FROM `project.dataset.sales_mart`
ORDER BY total_sales DESC
LIMIT 3
"""
result = client.query(query).result()
top_products = {row["product"]: row["total_sales"] for row in result}
total_sales = sum(top_products.values())
return SalesResponse(total_sales=total_sales, top_products=top_products)
-- File: de-onboarding/dbt/models/sales_mart.sql
SELECT
product,
SUM(price * quantity) as total_sales
FROM {{ ref('sales') }}
GROUP BY product
# File: de-onboarding/dbt/profiles.yml
sales_pipeline:
target: dev
outputs:
dev:
type: bigquery
method: service-account
project: your-project
dataset: dataset
threads: 4
keyfile: /path/to/keyfile.json
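`dbt run` also expects a `dbt_project.yml` at the project root (`de-onboarding/dbt/`), which this checkpoint does not list. A minimal sketch, assuming the project and profile names match `profiles.yml`:

```yaml
# File: de-onboarding/dbt/dbt_project.yml (assumed minimal project file)
name: "sales_pipeline"
version: "1.0.0"
config-version: 2
profile: "sales_pipeline"  # must match the profile name in profiles.yml
model-paths: ["models"]
```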
# File: de-onboarding/tests/test_pipeline.py
from typing import Dict
import pytest
import pandas as pd
from app import app
from fastapi.testclient import TestClient
import psycopg2
from google.cloud import bigquery
client = TestClient(app)
def test_load_to_postgres() -> None:
"""Test loading to PostgreSQL."""
df = pd.read_csv("data/sales.csv")
df = df.dropna(subset=["product", "price"])
df = df[df["product"].str.startswith("Halal")]
df = df[df["quantity"] <= 100]
conn = psycopg2.connect(
dbname="sales_db",
user="user",
password="password",
host="localhost",
port="5432"
)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM sales")
count = cursor.fetchone()[0]
conn.close()
assert count == len(df), f"Expected {len(df)} records, got {count}"
def test_dbt_transformation() -> None:
"""Test dbt transformation to BigQuery."""
bq_client = bigquery.Client()
query = "SELECT COUNT(*) FROM `project.dataset.sales_mart`"
result = bq_client.query(query).result()
count = next(result)[0]
assert count == 3, f"Expected 3 records, got {count}"
def test_api_endpoint() -> None:
"""Test FastAPI endpoint."""
response = client.get("/sales")
assert response.status_code == 200
data = response.json()
assert "total_sales" in data
assert "top_products" in data
assert data["total_sales"] > 0
    assert len(data["top_products"]) == 3
Expected Outputs
PostgreSQL Table (sales):
| product | price | quantity |
|---|---|---|
| Halal Laptop | 999.99 | 2 |
| Halal Mouse | 24.99 | 10 |
| Halal Keyboard | 49.99 | 5 |
BigQuery Data Mart (sales_mart):
| product | total_sales |
|---|---|
| Halal Laptop | 1999.98 |
| Halal Mouse | 249.90 |
| Halal Keyboard | 249.95 |
FastAPI Response (/sales):
{
"total_sales": 2499.83,
"top_products": {
"Halal Laptop": 1999.98,
"Halal Mouse": 249.9,
"Halal Keyboard": 249.95
}
}
Logs (data/pipeline.log, data/api.log):
2023-10-01 00:00:00,000 - INFO - Loading sales.csv to PostgreSQL
2023-10-01 00:00:01,000 - INFO - Loaded 3 records to PostgreSQL
2023-10-01 00:00:02,000 - INFO - Running dbt models
2023-10-01 00:00:03,000 - INFO - Fetching sales metrics
How to Run and Test
Setup:
- Create `de-onboarding/data/` and save `sales.csv` per Appendix 1.
- Install libraries: `pip install apache-airflow fastapi uvicorn dbt-core dbt-postgres dbt-bigquery psycopg2-binary google-cloud-bigquery pandas pyyaml pytest`.
- Set up PostgreSQL per Chapter 16: create `sales_db` with `user`/`password`.
- Set up BigQuery per Chapter 25: configure `project.dataset` and a service account key.
- Configure Airflow: run `airflow initdb` (`airflow db init` on Airflow 2.x) and set `dags_folder` to `de-onboarding/dags/`.
- Configure dbt: update `profiles.yml` with BigQuery credentials.
- Configure the editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
Troubleshooting Setup:
- Airflow Initialization Failure: If `airflow initdb` fails, check SQLite permissions with `ls -l ~/airflow/`. Ensure write access or reinitialize with `airflow db reset`.
- BigQuery Authentication: If `dbt run` fails, verify the keyfile path with `cat /path/to/keyfile.json`. Ensure the service account has BigQuery Editor permissions.
Run:
- Verify type annotations: `pyright .`.
- Start Airflow: `airflow webserver` and `airflow scheduler`.
- Run the DAG: `airflow dags trigger sales_pipeline`.
- Start FastAPI: `uvicorn app:app --host 0.0.0.0 --port 8000`.
- Test the API: `curl http://localhost:8000/sales`.
Test:
- Run pytest: `pytest tests/test_pipeline.py`.
- Verify PostgreSQL: `psql -d sales_db -c "SELECT * FROM sales;"`.
- Verify BigQuery: query `project.dataset.sales_mart`.
- Check logs: `cat data/pipeline.log` and `cat data/api.log`.
59.3 Practice Exercises
The following diagram illustrates the exercise workflow:
flowchart TD
A["Test Airflow Task"] --> B["Test dbt Model"]
B --> C["Debug FastAPI"]
C --> D["Enhance Logging"]
D --> E["Optimize dbt Query"]
E --> F["Compare Orchestrators"]
F --> G["Debug Airflow DAG"]
A --> H["Verify Outputs"]
B --> H
C --> H
D --> H
E --> H
F --> H
G --> H
classDef task fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef output fill:#ddffdd,stroke:#363,stroke-width:1px
class A,B,C,D,E,F,G task
class H output
Exercise 1: Airflow Task Test
Write a pytest test to verify the load_to_postgres task, ensuring 3 records are loaded.
Expected Output:
test_load_to_postgres PASSED
Expected Log (data/pipeline.log):
2023-10-01 00:00:01,000 - INFO - Loaded 3 records to PostgreSQL
Exercise 2: dbt Model Validation
Write a dbt test to ensure sales_mart has positive total_sales.
Expected Output:
dbt test: All tests passed
Sample Test Log:
Completed 2 tests for sales_mart: not_null passed, positive_values passed.
Exercise 3: FastAPI Endpoint Debug
Debug a FastAPI endpoint that returns incorrect total_sales.
Buggy Code:
@app.get("/sales", response_model=SalesResponse)
async def get_sales() -> SalesResponse:
client = bigquery.Client()
query = "SELECT product, total_sales FROM `project.dataset.sales_mart`"
result = client.query(query).result()
top_products = {row["product"]: row["total_sales"] for row in result}
total_sales = len(top_products) # Bug
    return SalesResponse(total_sales=total_sales, top_products=top_products)
Expected Output:
{
"total_sales": 2499.83,
"top_products": {
"Halal Laptop": 1999.98,
"Halal Mouse": 249.9,
"Halal Keyboard": 249.95
}
}
Exercise 4: Logging Enhancement
Enhance load_to_postgres to log invalid records.
Expected Output (data/pipeline.log):
2023-10-01 00:00:00,000 - INFO - Invalid record: product=None, price=29.99, quantity=3
Exercise 5: Optimize dbt Query
Optimize the sales_mart.sql model to reduce query time by adding a filter for valid quantities (e.g., quantity <= 100). Test with dbt run and measure execution time.
Sample Input (sales_mart.sql):
SELECT
product,
SUM(price * quantity) as total_sales
FROM {{ ref('sales') }}
GROUP BY product
Expected Output:
dbt run: Completed in 0.5s
Expected Log (data/pipeline.log):
2023-10-01 00:00:02,000 - INFO - Running dbt models
Exercise 6: Compare Orchestrators
Explain the trade-offs of using Airflow versus APScheduler for scheduling the sales pipeline. Consider scalability, observability, and complexity. Save your answer to de-onboarding/ex6_tradeoffs.txt.
Evaluation Criteria: Ensure your explanation addresses scalability (e.g., distributed execution), observability (e.g., logs/UI), and complexity (e.g., setup effort). Compare your response to the solution in solutions/ex6_tradeoffs.txt.
Expected Output (ex6_tradeoffs.txt):
Airflow offers scalability for complex pipelines with DAGs, robust observability via logs and UI, but requires setup complexity. APScheduler is simpler for lightweight scheduling, suitable for small tasks, but lacks Airflow’s observability and distributed execution for large-scale pipelines like Hijra Group’s analytics.
Exercise 7: Debug Airflow DAG
Debug a buggy Airflow DAG where the dbt_task runs before load_task. Fix the task dependency and verify with airflow dags test sales_pipeline 2023-10-01.
Buggy Code:
with DAG(
"sales_pipeline",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
load_task = PythonOperator(
task_id="load_to_postgres",
python_callable=load_to_postgres,
)
dbt_task = BashOperator(
task_id="run_dbt",
bash_command="cd de-onboarding/dbt && dbt run --profiles-dir .",
)
    dbt_task >> load_task # Bug: Incorrect dependency
Expected Output:
airflow dags test sales_pipeline 2023-10-01
# Log shows: load_to_postgres completed, then run_dbt completed
59.4 Exercise Solutions
Solution to Exercise 1
# File: de-onboarding/tests/test_load.py
import psycopg2

from dags.sales_pipeline import load_to_postgres  # assumes tests run from de-onboarding/ with it on PYTHONPATH
def test_load_to_postgres() -> None:
"""Test loading to PostgreSQL."""
load_to_postgres()
conn = psycopg2.connect(
dbname="sales_db",
user="user",
password="password",
host="localhost",
port="5432"
)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM sales")
count = cursor.fetchone()[0]
conn.close()
    assert count == 3, f"Expected 3 records, got {count}"
Solution to Exercise 2
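The schema file below references a `positive_values` test, which is not one of dbt’s built-in tests (`not_null`, `unique`, `accepted_values`, `relationships`). A hedged sketch of a custom generic test that could define it (the file path is assumed):

```sql
-- File: de-onboarding/dbt/tests/generic/positive_values.sql (assumed custom generic test)
{% test positive_values(model, column_name) %}
-- The test passes when this query returns zero rows
SELECT *
FROM {{ model }}
WHERE {{ column_name }} <= 0
{% endtest %}
```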
# File: de-onboarding/dbt/models/sales_mart.yml
version: 2
models:
  - name: sales_mart
    columns:
      - name: total_sales
        tests:
          - not_null
          - positive_values
Solution to Exercise 3
# File: de-onboarding/app.py
@app.get("/sales", response_model=SalesResponse)
async def get_sales() -> SalesResponse:
"""Fetch sales metrics from BigQuery."""
logger.info("Fetching sales metrics")
client = bigquery.Client()
query = "SELECT product, total_sales FROM `project.dataset.sales_mart`"
result = client.query(query).result()
top_products = {row["product"]: row["total_sales"] for row in result}
total_sales = sum(top_products.values()) # Fix: Sum values
    return SalesResponse(total_sales=total_sales, top_products=top_products)
Explanation: Fixed total_sales by summing top_products values instead of counting keys.
Solution to Exercise 4
# File: de-onboarding/dags/sales_pipeline.py
def load_to_postgres() -> None:
"""Load sales.csv to PostgreSQL."""
logger.info("Loading sales.csv to PostgreSQL")
df = pd.read_csv("data/sales.csv")
invalid = df[df["product"].isna() | ~df["product"].str.startswith("Halal") | (df["quantity"] > 100)]
for _, row in invalid.iterrows():
logger.info("Invalid record: product=%s, price=%s, quantity=%s", row["product"], row["price"], row["quantity"])
df = df.dropna(subset=["product", "price"])
df = df[df["product"].str.startswith("Halal")]
df = df[df["quantity"] <= 100]
    # Rest of the function unchanged
Solution to Exercise 5
-- File: de-onboarding/dbt/models/sales_mart.sql
SELECT
product,
SUM(price * quantity) as total_sales
FROM {{ ref('sales') }}
WHERE quantity <= 100
GROUP BY product
Explanation: Added WHERE quantity <= 100 to filter invalid quantities, reducing query processing time. Test with dbt run --models sales_mart and verify execution time.
Solution to Exercise 6
# File: de-onboarding/solutions/ex6_tradeoffs.txt
Airflow offers scalability for complex pipelines with DAGs, robust observability via logs and UI, but requires setup complexity. APScheduler is simpler for lightweight scheduling, suitable for small tasks, but lacks Airflow’s observability and distributed execution for large-scale pipelines like Hijra Group’s analytics.
Explanation: The text compares Airflow’s scalability and observability with APScheduler’s simplicity, addressing trade-offs relevant to Hijra Group’s needs.
Solution to Exercise 7
# File: de-onboarding/dags/sales_pipeline.py
with DAG(
"sales_pipeline",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
load_task = PythonOperator(
task_id="load_to_postgres",
python_callable=load_to_postgres,
)
dbt_task = BashOperator(
task_id="run_dbt",
bash_command="cd de-onboarding/dbt && dbt run --profiles-dir .",
)
    load_task >> dbt_task # Fix: Correct dependency
Explanation: Fixed the dependency by changing dbt_task >> load_task to load_task >> dbt_task, ensuring load_to_postgres runs before run_dbt.
59.5 Chapter Summary and Connection to Chapter 60
You’ve consolidated:
- Airflow: Orchestrating tasks with DAGs (O(1) scheduling).
- dbt: Transforming data into data marts (O(n) queries).
- FastAPI: Exposing metrics with type-safe endpoints (O(n) queries).
- Testing: Ensuring reliability with pytest.
- White-Space Sensitivity and PEP 8: Using 4-space indentation, preferring spaces over tabs.
The micro-project built a robust pipeline, integrating PostgreSQL, BigQuery, dbt, and FastAPI, tested with pytest and logged for observability, all with 4-space indentation per PEP 8. This prepares for production deployment in Phase 9.
Connection to Chapter 60
Chapter 60 introduces Docker for Data Applications, building on this chapter:
- Containerization: Packages the pipeline in Docker, enhancing portability.
- Type Safety: Maintains Pyright-verified code.
- Testing: Extends pytest to Dockerized environments.
- Fintech Context: Prepares for scalable deployments at Hijra Group, maintaining PEP 8’s 4-space indentation.