58 - Building Complex Airflow Workflows
Complexity: Moderate (M)
58.0 Introduction: Why This Matters for Data Engineering
In data engineering, orchestrating complex workflows is essential for managing multi-step data pipelines that process financial transaction data for Hijra Group’s Sharia-compliant fintech analytics. Apache Airflow enables robust, scalable pipeline orchestration by defining Directed Acyclic Graphs (DAGs) that handle dependencies, retries, and scheduling. Building on Chapter 56 (Airflow Fundamentals) and Chapter 57 (Airflow in Docker), this chapter focuses on creating complex, type-annotated Airflow workflows with retries, branching, and dynamic task generation, ensuring reliable execution of ETL (Extract, Transform, Load) processes. These workflows are critical for Hijra Group, where pipelines may involve extracting sales data, transforming it with dbt, and loading it into BigQuery, with daily runs processing thousands of records.
This chapter uses Python 3.10+, type annotations verified by Pyright (introduced in Chapter 7), and pytest for testing (introduced in Chapter 9), aligning with the curriculum’s emphasis on type-safe, testable code. It avoids advanced Kubernetes concepts (covered in Chapter 64) and focuses on Airflow’s PythonOperator, BranchPythonOperator, and task dependencies. All code uses PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError, ensuring compatibility with Hijra Group’s pipeline scripts.
Data Engineering Workflow Context
This diagram illustrates how complex Airflow workflows fit into a data engineering pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Airflow DAG"]
B --> C{"Complex Workflow"}
C -->|Extract| D["PythonOperator: Load CSV"]
C -->|Transform| E["PythonOperator: dbt Run"]
C -->|Load| F["PythonOperator: BigQuery Load"]
C -->|Branch| G["BranchPythonOperator: Validate"]
D --> E
E --> F
G -->|Valid| E
G -->|Invalid| H["PythonOperator: Notify"]
F --> I["Data Mart"]
H --> I
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef decision fill:#ffddaa,stroke:#663,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,I data
class B,C,D,E,F,H process
class G decision
Building On and Preparing For
- Building On:
- Chapter 7: Uses type annotations for type-safe code.
- Chapter 9: Applies pytest for testing pipeline components.
- Chapter 13: Leverages YAML configuration parsing for dynamic settings.
- Chapter 26: Uses BigQuery integration for loading data.
- Chapter 54: Incorporates dbt for transformations.
- Chapter 55: Extends scheduling concepts from APScheduler.
- Chapter 56: Builds on Airflow DAG basics and PythonOperator.
- Chapter 57: Uses Dockerized Airflow for consistent environments.
- Preparing For:
- Chapter 59: Prepares for consolidating orchestration skills in Checkpoint 8.
- Chapter 64: Sets the stage for deploying Airflow in Kubernetes with Helm.
- Chapter 67–70: Enables capstone projects with end-to-end pipelines.
What You’ll Learn
This chapter covers:
- Complex DAG Structures: Defining multi-step DAGs with dependencies.
- Branching: Using BranchPythonOperator for conditional workflows.
- Retries and Error Handling: Configuring retries for task resilience.
- Dynamic Task Generation: Creating tasks dynamically based on config.
- Testing: Validating workflows with pytest.
- Logging: Adding type-annotated logging for observability.
By the end, you’ll build a type-annotated Airflow pipeline that orchestrates a sales ETL process, using data/sales.csv and config.yaml (Appendix 1), with branching for validation, retries for reliability, and tests to ensure robustness. All code adheres to PEP 8’s 4-space indentation.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with sales.csv and config.yaml per Appendix 1.
- Install libraries: pip install apache-airflow pandas pyyaml pytest.
- Ensure Docker Desktop is installed and running for Airflow (Chapter 57).
- Configure Airflow in de-onboarding/airflow/ with airflow.cfg and a dags/ folder.
- Use print statements (e.g., print(dag.dag_id)) to debug DAGs.
- Verify file paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding for all files to avoid UnicodeDecodeError.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
58.1 Complex DAG Structures
Airflow DAGs are Python scripts defining tasks and their dependencies. Complex DAGs involve multiple tasks with intricate dependencies, such as sequential and parallel execution. Tasks are defined using operators (e.g., PythonOperator), and dependencies are set with >> or set_upstream/set_downstream.
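To make the dependency syntax concrete, here is a minimal sketch (hypothetical DAG ID dependency_syntax_demo with no-op tasks) showing that the bitshift operators, set_downstream, and Python lists for fan-out/fan-in all declare the same kind of graph edges:
# File: de-onboarding/airflow/dags/dependency_syntax_demo.py (illustrative sketch)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def noop() -> None:
    """Placeholder task body."""
    pass

with DAG(
    dag_id="dependency_syntax_demo",  # hypothetical DAG ID
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    a = PythonOperator(task_id="a", python_callable=noop)
    b = PythonOperator(task_id="b", python_callable=noop)
    c = PythonOperator(task_id="c", python_callable=noop)
    d = PythonOperator(task_id="d", python_callable=noop)

    a >> [b, c]  # Fan-out: b and c both depend on a
    [b, c] >> d  # Fan-in: d waits for both b and c
    # a.set_downstream(b) would declare the same edge as a >> b
The bitshift form is the most common in modern DAGs and is used throughout this chapter.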
58.1.1 Defining Multi-Step DAGs
Create a DAG with tasks for extracting, transforming, and loading data.
# File: de-onboarding/airflow/dags/sample_dag.py
from airflow import DAG # Import DAG class
from airflow.operators.python import PythonOperator # Import PythonOperator
from datetime import datetime # For schedule
import logging # For logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define task functions
def extract_task(): # Extract data
"""Extract data task."""
logger.info("Extracting data") # Log step
return {"data": "sales.csv"} # Simulate data extraction
def transform_task(ti): # Transform data
"""Transform data task."""
data = ti.xcom_pull(task_ids="extract") # Pull data from extract task
logger.info(f"Transforming data: {data}") # Log step
return {"transformed": "cleaned_data"} # Simulate transformation
def load_task(ti): # Load data
"""Load data task."""
data = ti.xcom_pull(task_ids="transform") # Pull data from transform task
logger.info(f"Loading data: {data}") # Log step
# Define DAG
with DAG(
dag_id="sample_dag", # Unique DAG ID
start_date=datetime(2023, 10, 1), # Start date
schedule_interval="@daily", # Run daily
catchup=False, # No backfill
) as dag:
# Define tasks
extract = PythonOperator(
task_id="extract", # Unique task ID
python_callable=extract_task, # Function to call
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_task,
)
load = PythonOperator(
task_id="load",
python_callable=load_task,
)
# Set dependencies
extract >> transform >> load # Sequential execution
The following diagram visualizes the task dependencies:
flowchart LR
A["extract"] --> B["transform"]
B --> C["load"]
classDef task fill:#d0e0ff,stroke:#336,stroke-width:1px
class A,B,C task
Follow-Along Instructions:
- Ensure de-onboarding/airflow/ is set up from Chapter 57.
- Save as de-onboarding/airflow/dags/sample_dag.py.
- Configure editor for 4-space indentation per PEP 8.
- Start Airflow: docker-compose up in de-onboarding/airflow/.
- Access Airflow UI: http://localhost:8080, login (default: airflow/airflow).
- Enable sample_dag in UI and trigger a run.
- Check logs in UI for task execution.
- Common Errors:
  - FileNotFoundError: Ensure DAG is in dags/ folder. Print os.listdir("dags/").
  - IndentationError: Use 4 spaces (not tabs). Run python -tt sample_dag.py.
  - Airflow Not Running: Verify Docker is running with docker ps.
Key Points:
- DAG: Defines workflow with tasks and dependencies.
- PythonOperator: Executes Python functions as tasks.
- XCom: Shares data between tasks (O(1) push/pull).
- Time Complexity: O(n) for task execution, where n is data size.
- Space Complexity: O(1) for DAG metadata, O(n) for XCom data.
- Implication: Enables sequential ETL for Hijra Group’s pipelines.
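The examples above share data implicitly through task return values; XComs can also be pushed and pulled explicitly with keys. A minimal sketch (hypothetical key row_count and task ID push_metrics), assuming the functions run as PythonOperator tasks that receive the task instance as ti:
# Explicit XCom push/pull (sketch)
def push_metrics(ti) -> None:
    """Push a small metrics value under an explicit key."""
    ti.xcom_push(key="row_count", value=6)  # hypothetical metric

def pull_metrics(ti) -> None:
    """Pull the value pushed by the upstream task."""
    row_count = ti.xcom_pull(key="row_count", task_ids="push_metrics")
    print(f"Upstream reported {row_count} rows")  # Debug output
Keep XCom payloads small (counts, metadata, file paths); the micro-project’s pitfalls cover what to do with larger DataFrames.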
58.2 Branching with BranchPythonOperator
Branching allows conditional execution based on data or conditions, using BranchPythonOperator to select task branches.
58.2.1 Implementing Branching
Add branching to validate data before transformation. The BranchPythonOperator evaluates conditions in O(1), as it executes a single Python function to return a task ID. However, Airflow’s scheduler resolves task dependencies across the DAG in O(n) for n tasks, as it traverses the graph to determine execution order.
# File: de-onboarding/airflow/dags/branch_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
import logging
from typing import Dict, Any # For type annotations
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def extract_task() -> Dict[str, Any]:
"""Extract data task."""
logger.info("Extracting data")
return {"data": "sales.csv", "valid": True} # Simulate valid data
def validate_task(ti) -> str:
"""Validate data and decide branch."""
data = ti.xcom_pull(task_ids="extract")
logger.info(f"Validating data: {data}")
if data.get("valid", False):
return "transform" # Proceed to transform
return "notify" # Notify on invalid data
def transform_task(ti) -> Dict[str, Any]:
"""Transform data task."""
data = ti.xcom_pull(task_ids="extract")
logger.info(f"Transforming data: {data}")
return {"transformed": "cleaned_data"}
def notify_task() -> None:
"""Notify on invalid data."""
logger.info("Notifying: Invalid data detected")
def load_task(ti) -> None:
"""Load data task."""
data = ti.xcom_pull(task_ids="transform")
logger.info(f"Loading data: {data}")
with DAG(
dag_id="branch_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
extract = PythonOperator(task_id="extract", python_callable=extract_task)
validate = BranchPythonOperator(task_id="validate", python_callable=validate_task)
transform = PythonOperator(task_id="transform", python_callable=transform_task)
notify = PythonOperator(task_id="notify", python_callable=notify_task)
load = PythonOperator(task_id="load", python_callable=load_task)
extract >> validate
validate >> [transform, notify] # Branch to transform or notify
transform >> load # Load after transform
Follow-Along Instructions:
- Save as de-onboarding/airflow/dags/branch_dag.py.
- Configure editor for 4-space indentation per PEP 8.
- Restart Airflow: docker-compose down && docker-compose up.
- Enable branch_dag in UI and trigger.
- Verify logs show validate branches to transform.
- Common Errors:
  - KeyError: Ensure XCom keys exist. Print data in validate_task.
  - IndentationError: Use 4 spaces (not tabs). Run python -tt branch_dag.py.
Key Points:
- BranchPythonOperator: Returns task ID(s) to execute.
- Time Complexity: O(1) for branching decision, O(n) for scheduler’s dependency resolution across n tasks.
- Space Complexity: O(1) for branch metadata.
- Implication: Enables conditional workflows, e.g., skipping transformation on invalid data.
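If both branches later feed a shared downstream task, that join task is skipped by default whenever one of its upstream branches is skipped. A hedged sketch of one common remedy, extending branch_dag conceptually with a join task and Airflow’s trigger_rule parameter (the none_failed rule runs when upstream tasks succeeded or were skipped):
# Sketch: joining branches with a trigger rule (assumes transform, notify, and logger from branch_dag)
from airflow.operators.python import PythonOperator

def summarize_task() -> None:
    """Run after whichever branch executed."""
    logger.info("Summarizing run")  # uses branch_dag's module-level logger

summarize = PythonOperator(
    task_id="summarize",
    python_callable=summarize_task,
    trigger_rule="none_failed",  # run even if one upstream branch was skipped
)
[transform, notify] >> summarize  # join point for both branches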
58.3 Retries and Error Handling
Retries make tasks resilient to transient failures, such as network issues when loading to BigQuery. The retry_delay parameter controls the wait between retry attempts; pass a timedelta object (preferred), although Airflow also accepts an integer number of seconds.
58.3.1 Configuring Retries
Add retries to a task.
# File: de-onboarding/airflow/dags/retry_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta # For schedule and retry delay
import logging
from typing import Dict, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_task() -> None:
"""Load data with potential failure."""
logger.info("Attempting to load data")
raise ValueError("Simulated network failure") # Simulate failure
with DAG(
dag_id="retry_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
load = PythonOperator(
task_id="load",
python_callable=load_task,
retries=3, # Retry 3 times
retry_delay=timedelta(seconds=5), # Wait 5 seconds between retries
)
Follow-Along Instructions:
- Save as de-onboarding/airflow/dags/retry_dag.py.
- Configure editor for 4-space indentation per PEP 8.
- Restart Airflow and enable retry_dag.
- Trigger and check logs for 3 retry attempts.
- Common Errors:
  - Retry Not Triggering: Ensure retries is set. Print load.retries.
  - IndentationError: Use 4 spaces (not tabs). Run python -tt retry_dag.py.
Key Points:
- Retries: Re-run tasks on failure.
- Retry Delay: Wait time between retries, given as a timedelta (or integer seconds).
- Time Complexity: O(1) for retry logic.
- Space Complexity: O(1) for retry metadata.
- Implication: Enhances pipeline reliability for Hijra Group’s cloud operations.
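Retry settings can also be declared once at the DAG level via default_args, and Airflow’s BaseOperator supports exponential backoff between attempts. The sketch below assumes a hypothetical DAG ID retry_backoff_demo and the documented retry_exponential_backoff and max_retry_delay parameters:
# Sketch: DAG-level retry defaults with exponential backoff
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def flaky_load() -> None:
    """Simulate a transient failure."""
    raise ValueError("Simulated transient failure")

with DAG(
    dag_id="retry_backoff_demo",  # hypothetical DAG ID
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        "retries": 3,                             # applies to every task in the DAG
        "retry_delay": timedelta(seconds=5),      # initial wait between attempts
        "retry_exponential_backoff": True,        # grow the wait on each retry
        "max_retry_delay": timedelta(minutes=2),  # cap the backoff
    },
) as dag:
    load = PythonOperator(task_id="load", python_callable=flaky_load)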
58.4 Dynamic Task Generation
Dynamic tasks allow creating tasks based on configuration, e.g., processing multiple datasets.
58.4.1 Generating Tasks Dynamically
Create tasks for each dataset in config.yaml. The example below shows sequential execution, where each dataset task depends on the previous one.
# File: de-onboarding/airflow/dags/dynamic_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import yaml
import logging
from typing import Dict, Any, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def read_config(config_path: str) -> Dict[str, Any]:
"""Read YAML config."""
with open(config_path, "r") as file:
config = yaml.safe_load(file)
logger.info(f"Loaded config: {config}")
return config
def process_dataset(dataset: str) -> None:
"""Process a dataset."""
logger.info(f"Processing dataset: {dataset}")
with DAG(
dag_id="dynamic_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
config = read_config("data/config.yaml")
datasets = config.get("datasets", ["sales.csv"]) # Default to sales.csv
tasks: List[PythonOperator] = []
for dataset in datasets:
task = PythonOperator(
task_id=f"process_{dataset.replace('.', '_')}",
python_callable=process_dataset,
op_kwargs={"dataset": dataset},
)
tasks.append(task)
# Set sequential dependencies
for i in range(len(tasks) - 1):
tasks[i] >> tasks[i + 1]
In production, tasks may run in parallel for efficiency, especially when processing independent datasets. The following example shows parallel execution, where tasks have no dependencies between them, allowing simultaneous execution. This is explored further in Chapter 64 for Kubernetes-based scaling.
# Parallel execution example (no dependencies between tasks)
with DAG(
dag_id="parallel_dynamic_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
config = read_config("data/config.yaml")
datasets = config.get("datasets", ["sales.csv"])
tasks: List[PythonOperator] = []
for dataset in datasets:
task = PythonOperator(
task_id=f"process_{dataset.replace('.', '_')}",
python_callable=process_dataset,
op_kwargs={"dataset": dataset},
)
tasks.append(task)
# No dependencies set, tasks run in parallel
The following diagram visualizes parallel task execution:
flowchart LR
A["process_sales_csv"]
B["process_transactions_csv"]
classDef task fill:#d0e0ff,stroke:#336,stroke-width:1px
class A,B task
Follow-Along Instructions:
- Update data/config.yaml:
  min_price: 10.0
  max_quantity: 100
  required_fields:
    - product
    - price
    - quantity
  product_prefix: 'Halal'
  max_decimals: 2
  datasets:
    - sales.csv
    - transactions.csv
- Save as de-onboarding/airflow/dags/dynamic_dag.py.
- Configure editor for 4-space indentation per PEP 8.
- Restart Airflow and enable dynamic_dag.
- Verify logs show tasks for sales.csv and transactions.csv.
- Common Errors:
  - KeyError: Ensure datasets in config.yaml. Print config.
  - IndentationError: Use 4 spaces (not tabs). Run python -tt dynamic_dag.py.
Key Points:
- Dynamic Tasks: Generate tasks based on config.
- Time Complexity: O(n) for creating n tasks.
- Space Complexity: O(n) for n task objects.
- Implication: Scales pipelines for multiple datasets at Hijra Group.
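For readability in the Airflow UI, dynamically generated tasks can be grouped with a TaskGroup (available in Airflow 2.x). This sketch assumes the read_config and process_dataset helpers defined in dynamic_dag.py above and a hypothetical DAG ID grouped_dynamic_dag:
# Sketch: grouping dynamic tasks with a TaskGroup
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

with DAG(
    dag_id="grouped_dynamic_dag",  # hypothetical DAG ID
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    config = read_config("data/config.yaml")  # helper from dynamic_dag.py above
    with TaskGroup(group_id="process_datasets") as process_group:
        for dataset in config.get("datasets", ["sales.csv"]):
            PythonOperator(
                task_id=f"process_{dataset.replace('.', '_')}",
                python_callable=process_dataset,  # helper from dynamic_dag.py above
                op_kwargs={"dataset": dataset},
            )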
58.5 Micro-Project: Sales ETL Pipeline with Complex Workflow
Docker Setup Recap
From Chapter 57, ensure Airflow is set up in de-onboarding/airflow/:
- A docker-compose.yml file exists in de-onboarding/airflow/, defining Airflow services (webserver, scheduler, etc.).
- Run docker-compose up to start Airflow.
- Access the Airflow UI at http://localhost:8080 (default login: airflow/airflow).
- Place DAGs in de-onboarding/airflow/dags/ and ensure data/ is mounted for file access.
- The airflow.cfg file in de-onboarding/airflow/ configures settings like XCom backend and logging. Check Chapter 57 for details on modifying airflow.cfg if needed.
If issues arise, verify Docker is running (docker ps) and check docker-compose.yml for correct volume mappings.
Project Requirements
Build a type-annotated Airflow pipeline orchestrating a sales ETL process for Hijra Group’s analytics, using data/sales.csv and config.yaml. The pipeline extracts data, validates it, transforms it with simulated dbt, and loads it to BigQuery, with branching for invalid data and retries for reliability:
- Extract: Load sales.csv with Pandas.
- Validate: Check data using utils.validate_sale from Chapter 3.
- Transform: Simulate dbt transformation.
- Load: Simulate BigQuery load.
- Notify: Log invalid data.
- Use BranchPythonOperator for validation branching.
- Configure retries (3 attempts, 5-second delay) for transform and load.
- Read config.yaml for validation rules.
- Log steps with logging.
- Test with pytest for task logic.
- Export metrics to data/etl_metrics.json.
- Use 4-space indentation per PEP 8, preferring spaces over tabs.
Sample Input Files
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
data/config.yaml (Appendix 1, updated):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
datasets:
- sales.csv
Data Processing Flow
flowchart TD
A["sales.csv"] --> B["Extract Task"]
B --> C["Validate Task"]
C -->|Valid| D["Transform Task"]
C -->|Invalid| E["Notify Task"]
D --> F["Load Task"]
E --> G["End"]
F --> G
H["config.yaml"] --> C
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef decision fill:#ffddaa,stroke:#663,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,H data
class B,D,F,E process
class C decision
class G endpoint
Acceptance Criteria
- Go Criteria:
  - Loads sales.csv and config.yaml correctly.
  - Validates data, branching to transform or notify.
  - Executes transform and load with retries (3 attempts, 5-second delay).
  - Logs steps and invalid records.
  - Exports metrics (e.g., valid/invalid counts) to data/etl_metrics.json.
  - Passes pytest tests for task logic.
  - Uses type annotations verified by Pyright.
  - Uses 4-space indentation per PEP 8.
- No-Go Criteria:
  - Fails to load files or validate data.
  - Incorrect branching or retries.
  - Missing JSON export or logs.
  - Lacks type annotations or tests.
  - Inconsistent indentation or tab/space mixing.
Common Pitfalls to Avoid
- FileNotFoundError:
  - Problem: sales.csv or config.yaml not found.
  - Solution: Print paths (print(csv_path)). Ensure files are in data/.
- Validation Errors:
  - Problem: Incorrect validation logic.
  - Solution: Print df.head() and config to debug.
- Branching Issues:
  - Problem: Validate task returns wrong task ID.
  - Solution: Print return value in validate_task.
- Retry Failures:
  - Problem: Retries not triggering.
  - Solution: Print task.retries for transform/load tasks.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces per PEP 8. Run python -tt sales_etl_dag.py.
- Test Failures:
  - Problem: pytest fails due to logic errors.
  - Solution: Print intermediate results in test functions.
- XCom Size Limit:
  - Problem: Large DataFrames in XCom cause serialization errors.
  - Solution: Push smaller payloads (e.g., metadata) or use file-based storage (Chapter 64). Print len(records) to check payload size. See the sketch after this list.
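As a sketch of the file-based alternative mentioned in the XCom pitfall (hypothetical output path data/valid_records.json; Chapter 64 covers production storage patterns), a task can write the heavy payload to disk and push only its path and a count:
# Sketch: push a file path through XCom instead of the full DataFrame
import json
import pandas as pd

def extract_to_file(ti, csv_path: str, output_path: str) -> None:
    """Write records to a JSON file and push only its path via XCom."""
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    records = df.to_dict("records")
    with open(output_path, "w") as file:
        json.dump(records, file)
    ti.xcom_push(key="records_path", value=output_path)  # small payload: a string
    ti.xcom_push(key="record_count", value=len(records))
Downstream tasks pull records_path and read the file themselves, keeping XCom payloads small.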
How This Differs from Production
In production, this solution would include:
- Real dbt/BigQuery: Actual dbt runs and BigQuery loads (Chapter 64).
- Observability: Prometheus metrics and alerts (Chapter 66).
- Security: Encrypted connections and PII masking (Chapter 65).
- Scalability: Kubernetes for distributed execution (Chapter 64).
- CI/CD: Automated DAG deployment (Chapter 66).
- Custom Operators: Use of Airflow operators like BigQueryOperator or SLAs for stricter scheduling (Chapter 64).
- Best Practices: Following Airflow’s official guidelines for DAG design and optimization (see https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html).
- Optimized Logging: Minimizing logging verbosity to avoid performance issues, as detailed in Airflow’s logging documentation (see https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging.html).
Implementation
# File: de-onboarding/utils.py (from Chapter 3, partial)
from typing import Any, Dict # For type annotations used below
def clean_string(s: str) -> str:
"""Strip whitespace from string."""
return s.strip()
def is_numeric(s: str, max_decimals: int = 2) -> bool:
"""Check if string is a decimal number with up to max_decimals."""
parts = s.split(".")
if len(parts) != 2 or not parts[0].isdigit() or not parts[1].isdigit():
return False
return len(parts[1]) <= max_decimals
def is_numeric_value(x: Any) -> bool:
"""Check if value is numeric."""
return isinstance(x, (int, float))
def apply_valid_decimals(x: Any, max_decimals: int) -> bool:
"""Apply decimal validation."""
return is_numeric(str(x), max_decimals)
def is_integer(x: Any) -> bool:
"""Check if value is integer."""
return str(x).isdigit()
def validate_sale(sale: Dict[str, str], config: Dict[str, Any]) -> bool:
"""Validate sale based on config rules."""
required_fields = config["required_fields"]
min_price = config["min_price"]
max_quantity = config["max_quantity"]
prefix = config["product_prefix"]
max_decimals = config["max_decimals"]
print(f"Validating sale: {sale}")
for field in required_fields:
if field not in sale or not sale[field] or sale[field].strip() == "":
print(f"Invalid sale: missing {field}: {sale}")
return False
product = clean_string(sale["product"])
if not product.startswith(prefix):
print(f"Invalid sale: product lacks '{prefix}' prefix: {sale}")
return False
price = clean_string(sale["price"])
if not is_numeric(price, max_decimals) or float(price) < min_price or float(price) <= 0:
print(f"Invalid sale: invalid price: {sale}")
return False
quantity = clean_string(sale["quantity"])
if not quantity.isdigit() or int(quantity) > max_quantity:
print(f"Invalid sale: invalid quantity: {sale}")
return False
return True
# File: de-onboarding/airflow/dags/sales_etl_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime, timedelta
import pandas as pd
import yaml
import json
import logging
from typing import Dict, Any, List
import utils # Import utils module (ensure utils.py is importable, e.g., copied into the dags/ folder)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def read_config(config_path: str) -> Dict[str, Any]:
"""Read YAML config."""
logger.info(f"Reading config: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
logger.info(f"Loaded config: {config}")
return config
def extract_task(ti: Any, csv_path: str, config_path: str) -> Dict[str, Any]:
"""Extract sales data."""
logger.info(f"Extracting data from {csv_path}")
df = pd.read_csv(csv_path, dtype=str, keep_default_na=False) # Read fields as strings so validate_sale's string checks work
config = read_config(config_path)
records = df.to_dict("records")
ti.xcom_push(key="records", value=records)
ti.xcom_push(key="config", value=config)
return {"records": records, "config": config}
def validate_task(ti: Any) -> str:
"""Validate data and decide branch."""
records = ti.xcom_pull(key="records", task_ids="extract")
config = ti.xcom_pull(key="config", task_ids="extract")
valid_records = [r for r in records if utils.validate_sale(r, config)]
invalid_count = len(records) - len(valid_records)
logger.info(f"Valid records: {len(valid_records)}, Invalid: {invalid_count}")
ti.xcom_push(key="valid_records", value=valid_records)
ti.xcom_push(key="metrics", value={"valid_count": len(valid_records), "invalid_count": invalid_count})
return "transform" if valid_records else "notify"
def transform_task(ti: Any) -> Dict[str, Any]:
"""Transform data (simulated dbt)."""
valid_records = ti.xcom_pull(key="valid_records", task_ids="validate")
logger.info(f"Transforming {len(valid_records)} records")
transformed = [{"product": r["product"], "amount": float(r["price"]) * int(r["quantity"])} for r in valid_records]
ti.xcom_push(key="transformed", value=transformed)
return {"transformed": transformed}
def load_task(ti: Any, json_path: str) -> None:
"""Load data to BigQuery (simulated)."""
transformed = ti.xcom_pull(key="transformed", task_ids="transform")
metrics = ti.xcom_pull(key="metrics", task_ids="validate")
logger.info(f"Loading {len(transformed)} records")
with open(json_path, "w") as file:
json.dump(metrics, file, indent=2)
logger.info(f"Metrics saved to {json_path}")
def notify_task(ti: Any) -> None:
"""Notify on invalid data."""
metrics = ti.xcom_pull(key="metrics", task_ids="validate")
logger.info(f"Invalid data detected: {metrics['invalid_count']} records")
with DAG(
dag_id="sales_etl",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
default_args={
"retries": 3,
"retry_delay": timedelta(seconds=5),
},
) as dag:
extract = PythonOperator(
task_id="extract",
python_callable=extract_task,
op_kwargs={"csv_path": "data/sales.csv", "config_path": "data/config.yaml"},
)
validate = BranchPythonOperator(
task_id="validate",
python_callable=validate_task,
)
transform = PythonOperator(
task_id="transform",
python_callable=transform_task,
)
load = PythonOperator(
task_id="load",
python_callable=load_task,
op_kwargs={"json_path": "data/etl_metrics.json"},
)
notify = PythonOperator(
task_id="notify",
python_callable=notify_task,
)
extract >> validate >> [transform, notify]
transform >> load
# File: de-onboarding/tests/test_sales_etl.py
import os
import sys
import json
import pytest
import pandas as pd
import yaml # Needed to write the test config
from typing import Dict, Any
import utils
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "airflow", "dags")) # Assumes the chapter's folder layout
from sales_etl_dag import extract_task, validate_task # Task functions under test
def test_validate_sale():
config = {
"min_price": 10.0,
"max_quantity": 100,
"required_fields": ["product", "price", "quantity"],
"product_prefix": "Halal",
"max_decimals": 2,
}
valid_sale = {"product": "Halal Laptop", "price": "999.99", "quantity": "2"}
invalid_sale = {"product": "Monitor", "price": "invalid", "quantity": "2"}
assert utils.validate_sale(valid_sale, config) is True
assert utils.validate_sale(invalid_sale, config) is False
def test_extract_task(tmp_path):
csv_path = tmp_path / "sales.csv"
df = pd.DataFrame([
{"product": "Halal Laptop", "price": 999.99, "quantity": 2},
{"product": "Monitor", "price": "invalid", "quantity": 2},
])
df.to_csv(csv_path, index=False)
config_path = tmp_path / "config.yaml"
config = {
"min_price": 10.0,
"max_quantity": 100,
"required_fields": ["product", "price", "quantity"],
"product_prefix": "Halal",
"max_decimals": 2,
}
with open(config_path, "w") as f:
yaml.dump(config, f)
class MockTaskInstance:
def xcom_push(self, key: str, value: Any) -> None:
self.__dict__[key] = value
ti = MockTaskInstance()
result = extract_task(ti, str(csv_path), str(config_path))
assert len(ti.records) == 2
assert ti.config == config
def test_validate_task():
class MockTaskInstance:
def __init__(self):
self.xcom_data = {}
def xcom_pull(self, key: str = None, task_ids: str = None) -> Any:
return self.xcom_data.get(task_ids, {}).get(key)
def xcom_push(self, key: str, value: Any) -> None:
self.xcom_data.setdefault("validate", {})[key] = value
ti = MockTaskInstance()
ti.xcom_data["extract"] = {
"records": [
{"product": "Halal Laptop", "price": "999.99", "quantity": "2"},
{"product": "Monitor", "price": "invalid", "quantity": "2"},
],
"config": {
"min_price": 10.0,
"max_quantity": 100,
"required_fields": ["product", "price", "quantity"],
"product_prefix": "Halal",
"max_decimals": 2,
},
}
branch = validate_task(ti)
assert branch == "transform"
assert len(ti.xcom_data["validate"]["valid_records"]) == 1
assert ti.xcom_data["validate"]["metrics"]["valid_count"] == 1
Expected Outputs
data/etl_metrics.json:
{
"valid_count": 3,
"invalid_count": 3
}
Airflow Logs (abridged):
[2023-10-01 00:00:00] INFO - Extracting data from data/sales.csv
[2023-10-01 00:00:01] INFO - Reading config: data/config.yaml
[2023-10-01 00:00:01] INFO - Validating sale: {'product': 'Halal Laptop', 'price': '999.99', 'quantity': '2'}
[2023-10-01 00:00:01] INFO - Valid records: 3, Invalid: 3
[2023-10-01 00:00:02] INFO - Transforming 3 records
[2023-10-01 00:00:03] INFO - Loading 3 records
[2023-10-01 00:00:03] INFO - Metrics saved to data/etl_metrics.json
How to Run and Test
Setup:
- Setup Checklist:
  - Create de-onboarding/data/ with sales.csv and config.yaml.
  - Install: pip install apache-airflow pandas pyyaml pytest.
  - Ensure Docker Desktop is running.
  - Set up Airflow in de-onboarding/airflow/ from Chapter 57.
  - Save utils.py, sales_etl_dag.py, test_sales_etl.py.
  - Configure editor for 4-space indentation per PEP 8.
- Troubleshooting:
  - If FileNotFoundError, print paths (print(csv_path)).
  - If ModuleNotFoundError, install libraries or check utils.py.
  - If IndentationError, use 4 spaces. Run python -tt sales_etl_dag.py.
  - If yaml.YAMLError, print open(config_path).read().
Run:
- Start Airflow: docker-compose up in de-onboarding/airflow/.
- Access the Airflow UI at http://localhost:8080 (default login: airflow/airflow).
- In the UI, navigate to the “DAGs” tab, enable sales_etl, and trigger a run. View task logs under “Graph View” or “Task Instance Details” to inspect execution states.
- Verify logs and data/etl_metrics.json.
Test:
Unit Tests:
- Run: pytest de-onboarding/tests/test_sales_etl.py -v.
- Verify all tests pass.
- Test invalid data by modifying sales.csv to have no valid records.
Integration Test Example (for reference, explored in Chapter 59):
import pytest
from airflow.models import DagBag

def test_dag_execution():
    dagbag = DagBag()
    dag = dagbag.get_dag("sales_etl")
    assert dag is not None
    # Mock task execution (simplified)
    assert len(dag.tasks) == 5  # extract, validate, transform, load, notify
- This tests the DAG structure. Chapter 59 covers full integration testing with task execution.
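A slightly stronger structural test, sketched below, also checks that the DagBag parsed without import errors and that the branch edges exist; it assumes the sales_etl DAG above and Airflow’s DagBag.import_errors and downstream_task_ids attributes:
# Sketch: structural DAG tests (no task execution)
from airflow.models import DagBag

def test_dag_loads_cleanly():
    dagbag = DagBag()
    assert dagbag.import_errors == {}  # no DAG files failed to parse
    dag = dagbag.get_dag("sales_etl")
    assert dag is not None

def test_branch_edges():
    dag = DagBag().get_dag("sales_etl")
    validate = dag.get_task("validate")
    assert {"transform", "notify"} <= set(validate.downstream_task_ids)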
58.6 Practice Exercises
Exercise 1: Add Retry to Extract Task
Modify sales_etl_dag.py to add retries (2 attempts, 3-second delay) to the extract task, with 4-space indentation per PEP 8.
Expected Output (Logs):
[2023-10-01 00:00:00] INFO - Attempting to extract data
[2023-10-01 00:00:00] ERROR - Failed, retrying (1/2)
Follow-Along Instructions:
- Save as de-onboarding/airflow/dags/sales_etl_retry.py.
- Configure editor for 4-space indentation.
- Run and verify retries in logs.
- How to Test:
  - Add raise ValueError("Test retry") in extract_task.
  - Check logs for retry attempts.
Exercise 2: Dynamic Task for Multiple Datasets
Modify sales_etl_dag.py to process datasets from config.yaml dynamically, with 4-space indentation per PEP 8.
Expected Output (Logs):
[2023-10-01 00:00:00] INFO - Extracting data from sales.csv
[2023-10-01 00:00:01] INFO - Extracting data from transactions.csv
Follow-Along Instructions:
- Update config.yaml with datasets.
- Save as de-onboarding/airflow/dags/sales_etl_dynamic.py.
- Configure editor for 4-space indentation.
- Run and verify logs for multiple datasets.
- How to Test:
  - Add transactions.csv to config.yaml.
  - Check logs for both datasets.
Exercise 3: Test Transform Task
Write a pytest test for transform_task, ensuring it computes amounts correctly, with 4-space indentation per PEP 8.
Expected Output:
test_transform_task ... ok
Follow-Along Instructions:
- Save as de-onboarding/tests/test_transform.py.
- Configure editor for 4-space indentation.
- Run: pytest de-onboarding/tests/test_transform.py -v.
- How to Test:
  - Verify transformed records have correct amounts.
Exercise 4: Debug Branching Bug
Fix this buggy validate_task that always branches to notify, with 4-space indentation per PEP 8.
def validate_task(ti):
records = ti.xcom_pull(key="records", task_ids="extract")
config = ti.xcom_pull(key="config", task_ids="extract")
valid_records = records # Bug: No validation
ti.xcom_push(key="valid_records", value=valid_records)
return "notify" # Bug: Always notifyExpected Output:
[2023-10-01 00:00:00] INFO - Valid records: 3, Invalid: 3Follow-Along Instructions:
- Save as de-onboarding/airflow/dags/sales_etl_debug.py.
- Configure editor for 4-space indentation.
- Fix and run.
- How to Test:
  - Verify branching to transform for valid data.
Exercise 5: Explain Retry Benefits
Write a function to save a conceptual explanation of why retries improve pipeline reliability to de-onboarding/ex5_concepts.txt, with 4-space indentation per PEP 8.
Expected Output (ex5_concepts.txt):
Retries improve pipeline reliability by automatically re-attempting failed tasks due to transient issues, such as network failures or temporary service unavailability, ensuring robust execution without manual intervention.
Follow-Along Instructions:
- Save as de-onboarding/ex5_retries.py.
- Configure editor for 4-space indentation.
- Run: python de-onboarding/ex5_retries.py.
- How to Test:
  - Verify ex5_concepts.txt contains the explanation.
  - Check file content with cat de-onboarding/ex5_concepts.txt (Unix/macOS) or type de-onboarding\ex5_concepts.txt (Windows).
Exercise 6: Debug XCom Serialization Error
Fix this buggy extract_task that pushes a large DataFrame to XCom, causing a serialization error, with 4-space indentation per PEP 8.
def extract_task(ti, csv_path, config_path):
df = pd.read_csv(csv_path) # Large DataFrame
config = read_config(config_path)
records = df.to_dict("records")
ti.xcom_push(key="records", value=records) # Bug: Large payload
return {"records": records, "config": config}Expected Output (Logs):
[2023-10-01 00:00:00] INFO - Extracting data from data/sales.csv
[2023-10-01 00:00:00] INFO - Pushed metadata: 6 records
Follow-Along Instructions:
- Save as de-onboarding/airflow/dags/sales_etl_xcom.py.
- Configure editor for 4-space indentation.
- Fix and run.
- How to Test:
- Verify logs show metadata (record count) instead of full DataFrame.
- Check XCom in Airflow UI for smaller payload.
Exercise 7: Compare Parallel vs. Sequential Tasks
Write a function to save a conceptual explanation comparing parallel and sequential task execution in Airflow to de-onboarding/ex7_concepts.txt, with 4-space indentation per PEP 8.
Expected Output (ex7_concepts.txt):
Parallel task execution in Airflow allows independent tasks to run simultaneously, improving performance for large datasets but requiring careful resource management. Sequential execution ensures tasks complete in order, suitable for dependent tasks but slower for independent ones.
Follow-Along Instructions:
- Save as de-onboarding/ex7_parallel.py.
- Configure editor for 4-space indentation.
- Run: python de-onboarding/ex7_parallel.py.
- How to Test:
  - Verify ex7_concepts.txt contains the explanation.
  - Check file content with cat de-onboarding/ex7_concepts.txt (Unix/macOS) or type de-onboarding\ex7_concepts.txt (Windows).
58.7 Exercise Solutions
Solution to Exercise 1: Add Retry to Extract Task
# File: de-onboarding/airflow/dags/sales_etl_retry.py (partial)
extract = PythonOperator(
task_id="extract",
python_callable=extract_task,
op_kwargs={"csv_path": "data/sales.csv", "config_path": "data/config.yaml"},
retries=2,
retry_delay=timedelta(seconds=3),
)
Solution to Exercise 2: Dynamic Task for Multiple Datasets
# File: de-onboarding/airflow/dags/sales_etl_dynamic.py (partial)
with DAG(
dag_id="sales_etl_dynamic",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
default_args={"retries": 3, "retry_delay": timedelta(seconds=5)},
) as dag:
config = read_config("data/config.yaml")
datasets = config.get("datasets", ["sales.csv"])
extract_tasks = []
for dataset in datasets:
extract = PythonOperator(
task_id=f"extract_{dataset.replace('.', '_')}",
python_callable=extract_task,
op_kwargs={"csv_path": f"data/{dataset}", "config_path": "data/config.yaml"},
)
extract_tasks.append(extract)
validate = BranchPythonOperator(task_id="validate", python_callable=validate_task) # Note: validate_task must pull XComs from the dynamic extract task IDs (e.g., extract_sales_csv)
transform = PythonOperator(task_id="transform", python_callable=transform_task)
load = PythonOperator(task_id="load", python_callable=load_task, op_kwargs={"json_path": "data/etl_metrics.json"})
notify = PythonOperator(task_id="notify", python_callable=notify_task)
for extract in extract_tasks:
extract >> validate
validate >> [transform, notify]
transform >> load
Solution to Exercise 3: Test Transform Task
# File: de-onboarding/tests/test_transform.py
import os
import sys
import pytest
from typing import Any
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "airflow", "dags")) # Assumes the chapter's folder layout
from sales_etl_dag import transform_task # Function under test
def test_transform_task():
class MockTaskInstance:
def __init__(self):
self.xcom_data = {"validate": {"valid_records": [
{"product": "Halal Laptop", "price": "999.99", "quantity": "2"},
{"product": "Halal Mouse", "price": "24.99", "quantity": "10"},
]}}
def xcom_pull(self, key: str, task_ids: str) -> Any:
return self.xcom_data.get(task_ids, {}).get(key)
def xcom_push(self, key: str, value: Any) -> None:
self.xcom_data.setdefault("transform", {})[key] = value
ti = MockTaskInstance()
result = transform_task(ti)
transformed = ti.xcom_data["transform"]["transformed"]
assert len(transformed) == 2
assert transformed[0]["amount"] == 999.99 * 2
assert transformed[1]["amount"] == 24.99 * 10
Solution to Exercise 4: Debug Branching Bug
# File: de-onboarding/airflow/dags/sales_etl_debug.py (partial)
def validate_task(ti):
records = ti.xcom_pull(key="records", task_ids="extract")
config = ti.xcom_pull(key="config", task_ids="extract")
valid_records = [r for r in records if utils.validate_sale(r, config)]
invalid_count = len(records) - len(valid_records)
logger.info(f"Valid records: {len(valid_records)}, Invalid: {invalid_count}")
ti.xcom_push(key="valid_records", value=valid_records)
ti.xcom_push(key="metrics", value={"valid_count": len(valid_records), "invalid_count": invalid_count})
return "transform" if valid_records else "notify"Solution to Exercise 5: Explain Retry Benefits
# File: de-onboarding/ex5_retries.py
def explain_retries(output_path: str) -> None:
"""Save explanation of retry benefits to a file."""
explanation = (
"Retries improve pipeline reliability by automatically re-attempting failed tasks due to "
"transient issues, such as network failures or temporary service unavailability, ensuring "
"robust execution without manual intervention."
)
with open(output_path, "w") as file:
file.write(explanation)
print(f"Explanation saved to {output_path}")
# Test
explain_retries("de-onboarding/ex5_concepts.txt")
# Output:
# Explanation saved to de-onboarding/ex5_concepts.txt
# (Creates de-onboarding/ex5_concepts.txt with explanation)
Solution to Exercise 6: Debug XCom Serialization Error
# File: de-onboarding/airflow/dags/sales_etl_xcom.py (partial)
def extract_task(ti, csv_path, config_path):
"""Extract sales data with optimized XCom payload."""
logger.info(f"Extracting data from {csv_path}")
df = pd.read_csv(csv_path)
config = read_config(config_path)
records = df.to_dict("records")
ti.xcom_push(key="record_count", value=len(records)) # Push metadata
logger.info(f"Pushed metadata: {len(records)} records")
return {"record_count": len(records), "config": config}Explanation:
- XCom Bug: Pushing a large DataFrame (records) to XCom causes serialization errors due to size limits.
- Fix: Push only metadata (len(records)) to XCom, reducing payload size. The full DataFrame can be saved to a file or processed downstream (Chapter 64).
Solution to Exercise 7: Compare Parallel vs. Sequential Tasks
# File: de-onboarding/ex7_parallel.py
def explain_parallel_sequential(output_path: str) -> None:
"""Save explanation of parallel vs. sequential task execution to a file."""
explanation = (
"Parallel task execution in Airflow allows independent tasks to run simultaneously, improving "
"performance for large datasets but requiring careful resource management. Sequential execution "
"ensures tasks complete in order, suitable for dependent tasks but slower for independent ones."
)
with open(output_path, "w") as file:
file.write(explanation)
print(f"Explanation saved to {output_path}")
# Test
explain_parallel_sequential("de-onboarding/ex7_concepts.txt")
# Output:
# Explanation saved to de-onboarding/ex7_concepts.txt
# (Creates de-onboarding/ex7_concepts.txt with explanation)
58.8 Chapter Summary and Connection to Chapter 59
In this chapter, you’ve mastered:
- Complex DAGs: Multi-step workflows with dependencies (O(n) execution).
- Branching: Conditional execution with BranchPythonOperator (O(1) decisions, O(n) dependency resolution).
- Retries: Resilient tasks with retries (O(1) logic).
- Dynamic Tasks: Config-driven task generation (O(n) creation).
- Testing: pytest for task logic.
- Logging: Type-annotated logging for observability.
The micro-project built a type-annotated Airflow pipeline for sales ETL, using branching, retries, and tests, producing etl_metrics.json. It prepares for Chapter 59’s Checkpoint 8, which consolidates orchestration skills with dbt, FastAPI, and Airflow, using sales.csv for a comprehensive pipeline. The modular tasks (e.g., extract_task) can be extended in Chapter 64 for Kubernetes deployment, enhancing scalability for Hijra Group’s analytics.
Connection to Chapter 59
Chapter 59 (Checkpoint 8: Pipeline Orchestration Review) builds on this chapter:
- Consolidation: Integrates Django, FastAPI, dbt, and Airflow for a robust pipeline.
- Testing: Expands pytest tests for end-to-end validation.
- Fintech Context: Aligns with Hijra Group’s need for orchestrated analytics, maintaining PEP 8’s 4-space indentation.