56 - Airflow Fundamentals
Complexity: Moderate (M)
56.0 Introduction: Why This Matters for Data Engineering
In data engineering, orchestrating complex data pipelines is essential for ensuring reliable, scalable, and maintainable workflows, especially for Hijra Group’s Sharia-compliant fintech analytics. Apache Airflow is a powerful platform for scheduling and monitoring data pipelines, enabling automation of ETL (Extract, Transform, Load) processes that handle financial transaction data. Airflow’s directed acyclic graphs (DAGs) provide a clear, visual way to define dependencies, ensuring tasks like loading sales data, transforming it with dbt, and exporting to BigQuery execute in the correct order. Airflow’s Python-based configuration aligns with Hijra Group’s Python-centric stack, and its type-annotated code (introduced in Chapter 7) ensures robust pipelines verified by Pyright.
Building on Chapter 55 (Simple Scheduling with APScheduler), this chapter introduces Airflow’s core concepts, focusing on DAGs, operators, and task dependencies for orchestrating sales ETL pipelines. It uses type annotations (Chapter 7), unittest/pytest testing (Chapter 9), and YAML configurations (Chapter 2) to maintain code quality. The micro-project orchestrates a sales ETL process using data/sales.csv and config.yaml (Appendix 1), preparing for Chapter 57 (Airflow in Docker) and Chapter 64 (Airflow in Kubernetes). All code follows PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError.
Data Engineering Workflow Context
This diagram illustrates Airflow’s role in a data pipeline:
flowchart TD
A["Raw Data
sales.csv"] --> B["Airflow DAG"]
B --> C{"Task Execution"}
C -->|Extract| D["Load CSV
Pandas"]
C -->|Transform| E["Validate Data
Pandas/utils.py"]
C -->|Load| F["Export JSON
sales_results.json"]
D --> E --> F
F --> G["BigQuery/Data Mart"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,E,F data
class B,C process
class G storage
Building On and Preparing For
- Building On:
- Chapter 3: Uses Pandas for data processing, extended to Airflow tasks.
- Chapter 7: Applies type annotations for type-safe code, verified by Pyright.
- Chapter 9: Incorporates pytest for testing DAGs and tasks.
- Chapter 55: Extends APScheduler’s scheduling to Airflow’s DAG-based orchestration.
- Preparing For:
- Chapter 57: Prepares for containerized Airflow with Docker.
- Chapter 58: Enables complex workflows with retries and branching.
- Chapter 64: Supports Kubernetes-based Airflow deployments with Helm Charts.
- Chapter 67–70: Lays groundwork for capstone projects integrating Airflow with FastAPI and dbt.
What You’ll Learn
This chapter covers:
- Airflow Basics: DAGs, operators, and task dependencies.
- Type-Safe DAGs: Defining DAGs with type annotations.
- Operators: Using PythonOperator for ETL tasks.
- Testing: Validating DAGs and tasks with pytest.
- Logging: Capturing task execution details using Airflow’s logging system.
- Configuration: Using YAML for task parameters.
By the end, you’ll build a type-annotated Airflow DAG to orchestrate a sales ETL pipeline, processing data/sales.csv, validating with config.yaml, and exporting to data/sales_results.json. The pipeline will be tested with pytest to ensure reliability, using 4-space indentation per PEP 8.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with sales.csv and config.yaml per Appendix 1.
- Install Airflow: pip install apache-airflow==2.9.3.
- Initialize Airflow: Set AIRFLOW_HOME=~/airflow, run airflow db init, and create an admin user: airflow users create --username admin --firstname Admin --lastname User --role Admin --email [email protected] --password admin
- Ensure airflow.cfg uses SQLite (default for development): Check [core] sql_alchemy_conn = sqlite:////home/user/airflow/airflow.db.
- Install libraries: pip install pandas pyyaml pytest.
- Configure editor for 4-space indentation (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Use print statements (e.g., print(df.head())) to debug DataFrames.
- Verify file paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding to avoid UnicodeDecodeError.
56.1 Airflow Basics
Airflow orchestrates workflows using DAGs (Directed Acyclic Graphs), which define tasks and their dependencies. A DAG is a Python script that specifies:
- Tasks: Units of work (e.g., load CSV, validate data).
- Operators: Define task behavior (e.g., PythonOperator for Python functions).
- Dependencies: Order of task execution (e.g., task1 >> task2).
Airflow’s scheduler runs tasks based on the DAG’s schedule interval, storing metadata in a database (e.g., SQLite for development). For Hijra Group’s daily sales processing, Airflow ensures tasks execute reliably, with logging for auditing.
56.1.1 Creating a DAG
Define a type-annotated DAG using Airflow’s DAG class.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import Dict, Any
# Define DAG
with DAG(
dag_id="simple_sales_etl",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
description="A simple ETL pipeline for sales data",
) as dag:
# Define a sample task
def print_hello(**kwargs: Dict[str, Any]) -> None:
"""Print a hello message."""
ti = kwargs["ti"]
ti.log.info("Hello from Airflow!") # Use Airflow task logging
hello_task = PythonOperator(
task_id="print_hello",
python_callable=print_hello,
provide_context=True,
dag=dag,
)
# Expected Output (when run):
# [Airflow logs in UI or ~/airflow/logs]
# [2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Hello from Airflow!
Follow-Along Instructions:
- Ensure AIRFLOW_HOME=~/airflow and run airflow db init.
- Save as ~/airflow/dags/simple_dag.py.
- Configure editor for 4-space indentation per PEP 8.
- Start Airflow: airflow webserver -p 8080 and airflow scheduler (in separate terminals).
- Access UI: http://localhost:8080, login with admin credentials.
- Trigger DAG: Enable simple_sales_etl and run manually.
- Check logs in UI or ~/airflow/logs.
- Common Errors:
- ModuleNotFoundError: Install apache-airflow with pip install apache-airflow==2.9.3.
- DAG Not Visible: Ensure file is in ~/airflow/dags/. Run ls ~/airflow/dags/.
- IndentationError: Use 4 spaces (not tabs). Run python -tt simple_dag.py.
Key Points:
- DAG Parameters:
- dag_id: Unique identifier.
- start_date: When the DAG can start running.
- schedule_interval: Execution frequency (e.g., @daily).
- catchup: Prevents backfilling past runs.
- description: Describes the DAG’s purpose in the UI.
- PythonOperator: Executes Python functions.
- Type Annotations: **kwargs: Dict[str, Any] ensures type safety.
- Time Complexity: O(1) for task definition, O(n) for the scheduler checking n tasks.
- Space Complexity: O(1) for DAG metadata.
- Implication: DAGs enable scalable orchestration for Hijra Group’s pipelines.
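The example above pulls the TaskInstance from kwargs and logs via ti.log. A common alternative is a module-level logger; the sketch below assumes Airflow’s default logging configuration, under which records sent to the airflow.task logger appear in each task’s log. The dag_id is a placeholder for illustration.
import logging
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumption: with Airflow's default logging config, records sent to the
# "airflow.task" logger show up in the task's log in the UI.
logger = logging.getLogger("airflow.task")

def log_greeting() -> None:
    """Log a greeting without pulling the TaskInstance from kwargs."""
    logger.info("Hello from a module-level logger!")

with DAG(
    dag_id="logging_sketch",  # hypothetical dag_id for illustration
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    log_task = PythonOperator(
        task_id="log_greeting",
        python_callable=log_greeting,
    )
Either approach is acceptable in this chapter; the micro-project sticks with ti.log so validation messages stay attached to the running task.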
56.1.2 Task Dependencies
Set dependencies using >> to define task order.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import Dict, Any
# Define DAG
with DAG(
dag_id="dependent_sales_etl",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
description="An ETL pipeline with task dependencies for sales data",
) as dag:
# Define tasks
def extract_data(**kwargs: Dict[str, Any]) -> None:
"""Extract data."""
ti = kwargs["ti"]
ti.log.info("Extracting data...")
def transform_data(**kwargs: Dict[str, Any]) -> None:
"""Transform data."""
ti = kwargs["ti"]
ti.log.info("Transforming data...")
# Create tasks
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
provide_context=True,
dag=dag,
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
provide_context=True,
dag=dag,
)
# Set dependencies
extract_task >> transform_task # Extract before transform
# Expected Output (when run):
# [Airflow logs]
# [2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Extracting data...
# [2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Transforming data...
Follow-Along Instructions:
- Save as ~/airflow/dags/dependent_dag.py.
- Configure editor for 4-space indentation per PEP 8.
- Restart scheduler: airflow scheduler.
- Trigger DAG in UI.
- Verify task order in logs.
- Common Errors:
- CycleError: Ensure no cyclic dependencies. Check DAG graph in UI.
- IndentationError: Use 4 spaces (not tabs). Run python -tt dependent_dag.py.
Key Points:
- >>: Defines sequential execution (alternative syntaxes are sketched after this list).
- Time Complexity: O(1) for dependency setup.
- Space Complexity: O(1) for dependency metadata.
- Implication: Ensures correct task order for ETL processes.
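Beyond a single >> chain, dependencies can also be declared with lists (fan-in/fan-out) and with the chain helper. The sketch below is illustrative: the dag_id and task names are placeholders, and it assumes the chain import from airflow.models.baseoperator.
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator

def log_step(name: str) -> None:
    """Print which step ran (illustrative placeholder work)."""
    print(f"Running step: {name}")

with DAG(
    dag_id="dependency_syntax_sketch",  # hypothetical dag_id
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract_a = PythonOperator(task_id="extract_a", python_callable=log_step, op_args=["extract_a"])
    extract_b = PythonOperator(task_id="extract_b", python_callable=log_step, op_args=["extract_b"])
    transform = PythonOperator(task_id="transform", python_callable=log_step, op_args=["transform"])
    load = PythonOperator(task_id="load", python_callable=log_step, op_args=["load"])

    # A list fans in: both extract tasks must finish before transform runs.
    [extract_a, extract_b] >> transform
    # chain(...) is equivalent to transform >> load for the remaining steps.
    chain(transform, load)
The micro-project only needs a linear chain, so it uses plain >> throughout.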
56.2 Micro-Project: Sales ETL Pipeline with Airflow
Project Requirements
Build a type-annotated Airflow DAG to orchestrate a sales ETL pipeline for Hijra Group’s analytics, processing data/sales.csv, validating with config.yaml, and exporting to data/sales_results.json. The pipeline includes three tasks:
- Extract: Load sales.csv with Pandas.
- Transform: Validate data using utils.py and compute metrics.
- Load: Export results to JSON.
The DAG uses PythonOperator, type annotations, and pytest tests, with logging for auditing. It aligns with Hijra Group’s need for automated, reliable sales processing. Note that XComs are used to pass data between tasks, suitable for small datasets like sales.csv (6 rows). In production, large DataFrames should be stored in files or databases to avoid XCom size limitations.
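For reference, the XCom pattern the micro-project relies on is sketched below: a value returned by a PythonOperator callable is pushed to XCom automatically, and a downstream task pulls it by task_id. The dag_id and task names are illustrative, not part of the micro-project code.
from datetime import datetime
from typing import Any, Dict, List
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_rows(**kwargs: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return a small list of records; Airflow pushes the return value to XCom."""
    return [{"product": "Halal Laptop", "price": 999.99, "quantity": 2}]

def count_rows(**kwargs: Dict[str, Any]) -> None:
    """Pull the upstream return value from XCom and log its size."""
    ti = kwargs["ti"]
    rows = ti.xcom_pull(task_ids="extract_rows")
    ti.log.info(f"Received {len(rows)} record(s) via XCom")

with DAG(
    dag_id="xcom_sketch",  # hypothetical dag_id for illustration
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract_task = PythonOperator(task_id="extract_rows", python_callable=extract_rows)
    count_task = PythonOperator(task_id="count_rows", python_callable=count_rows)
    extract_task >> count_task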
Sample Input Files
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
data/config.yaml (Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
Data Processing Flow
flowchart TD
A["Input CSV
sales.csv"] --> B["Airflow DAG"]
B --> C["Extract Task
Load CSV"]
C --> D["Transform Task
Validate/Compute"]
D --> E["Load Task
Export JSON"]
E --> F["Output JSON
sales_results.json"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef output fill:#ddffdd,stroke:#363,stroke-width:1px
class A,F data
class B,C,D,E process
Acceptance Criteria
- Go Criteria:
- Defines a type-annotated DAG with three tasks (extract, transform, load).
- Loads sales.csv and config.yaml correctly, validating YAML structure.
- Validates records for Halal prefix, numeric price/quantity, positive prices, and config rules.
- Computes total sales and top 3 products.
- Exports results to data/sales_results.json.
- Logs task execution and validation using Airflow’s logging system.
- Uses 4-space indentation per PEP 8, preferring spaces over tabs.
- Passes pytest tests for DAG structure, task output, edge cases (e.g., empty CSV), and log output.
- No-Go Criteria:
- Fails to load input files.
- Incorrect validation or calculations.
- Missing JSON export.
- Lacks type annotations or tests.
- Inconsistent indentation or tab/space mixing.
Common Pitfalls to Avoid
- Airflow Setup Issues:
- Problem: DAG not visible in UI.
- Solution: Ensure the file is in ~/airflow/dags/. Run ls ~/airflow/dags/.
- FileNotFoundError:
- Problem: sales.csv or config.yaml missing.
- Solution: Use relative paths and verify with os.path.exists(csv_path). Ensure files are in de-onboarding/data/.
- Validation Errors:
- Problem: Missing values cause filtering issues.
- Solution: Use dropna() and log df.head().
- Type Mismatches:
- Problem: Non-numeric prices cause errors.
- Solution: Validate with utils.is_numeric_value. Log df.dtypes.
- Task Dependency Errors:
- Problem: Tasks run out of order.
- Solution: Verify >> dependencies. Check the DAG graph in the UI.
- XCom Size Limitations:
- Problem: Large DataFrames cause XCom storage issues.
- Solution: For small datasets like sales.csv, XComs are fine. In production, save large data to files (Chapter 58).
- Airflow Variables:
- Problem: Hardcoded paths reduce portability.
- Solution: Current relative paths are sufficient. In production, use Airflow Variables for paths (Chapter 58); a brief sketch follows this list.
- Airflow Connections:
- Problem: Hardcoded credentials limit scalability.
- Solution: Current file-based approach is sufficient. In production, use Airflow Connections for database credentials (Chapter 58).
- YAML Validation Errors:
- Problem: Missing or malformed config.yaml keys.
- Solution: Validate required keys in read_config and log errors.
- IndentationError:
- Problem: Mixed spaces/tabs.
- Solution: Use 4 spaces per PEP 8. Run python -tt sales_etl_dag.py.
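As noted under Airflow Variables above, production DAGs usually avoid hardcoded paths. A minimal sketch using Variable.get with a default fallback is shown below; the variable name sales_csv_path is hypothetical and would be created via the Airflow UI (Admin > Variables) or the CLI before it resolves to anything other than the default.
import os
from airflow.models import Variable

# Hypothetical variable name; Variable.get queries Airflow's metadata database
# and falls back to the course's relative path if the variable is unset.
DEFAULT_CSV_PATH = os.path.join("data", "sales.csv")
csv_path: str = Variable.get("sales_csv_path", default_var=DEFAULT_CSV_PATH)
print(f"Reading sales data from: {csv_path}")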
How This Differs from Production
In production, this solution would include:
- Error Handling: Retries and alerts (Chapter 58); see the retry sketch after this list.
- Scalability: Distributed executors (Chapter 64).
- Security: Encrypted connections and PII masking (Chapter 65).
- Observability: Monitoring with Prometheus (Chapter 66).
- Testing: Integration tests with Airflow’s TestMode (Chapter 58).
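For the error-handling bullet above, the sketch below shows what a retry policy looks like via default_args; the retry count and delay are illustrative values, and Chapter 58 covers retries and alerts properly.
from datetime import datetime, timedelta
from airflow import DAG

# Illustrative retry policy; tune per task in production (Chapter 58).
default_args = {
    "retries": 2,                         # re-run a failed task up to twice
    "retry_delay": timedelta(minutes=5),  # wait between attempts
}

with DAG(
    dag_id="retry_sketch",  # hypothetical dag_id
    start_date=datetime(2023, 10, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
) as dag:
    pass  # tasks defined here inherit the retry policy from default_args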
Implementation
# File: de-onboarding/utils.py (updated from Chapter 3)
from typing import Union, Any, Optional
from airflow.models import TaskInstance
def is_numeric(s: str, max_decimals: int = 2) -> bool:
"""Check if string is a decimal number with up to max_decimals."""
parts = s.split(".")
if len(parts) != 2 or not parts[0].isdigit() or not parts[1].isdigit():
return False
return len(parts[1]) <= max_decimals
def clean_string(s: Any) -> str:
"""Strip whitespace from string."""
return str(s).strip()
def is_numeric_value(x: Any) -> bool:
"""Check if value is numeric."""
return isinstance(x, (int, float))
def has_valid_decimals(x: Any, max_decimals: int) -> bool:
"""Check if value has valid decimal places."""
return is_numeric(str(x), max_decimals)
def apply_valid_decimals(x: Any, max_decimals: int) -> bool:
"""Apply has_valid_decimals to a value."""
return has_valid_decimals(x, max_decimals)
def is_integer(x: Any) -> bool:
"""Check if value is an integer."""
return str(x).isdigit()
def validate_sale(sale: dict, config: dict, ti: Optional[TaskInstance] = None) -> bool:
"""Validate sale based on config rules."""
required_fields = config["required_fields"]
min_price = config["min_price"]
max_quantity = config["max_quantity"]
prefix = config["product_prefix"]
max_decimals = config["max_decimals"]
# Fallback to print for non-Airflow contexts, e.g., standalone scripts
log = ti.log.info if ti else print
log(f"Validating sale: {sale}")
for field in required_fields:
if not sale.get(field) or clean_string(sale[field]) == "":
log(f"Invalid sale: missing {field}: {sale}")
return False
product = clean_string(sale["product"])
if not product.startswith(prefix):
log(f"Invalid sale: product lacks '{prefix}' prefix: {sale}")
return False
price = clean_string(str(sale["price"]))
if not is_numeric(price, max_decimals) or float(price) < min_price or float(price) <= 0:
log(f"Invalid sale: invalid price: {sale}")
return False
quantity = clean_string(str(sale["quantity"]))
if not quantity.isdigit() or int(quantity) > max_quantity:
log(f"Invalid sale: invalid quantity: {sale}")
return False
return True
# File: ~/airflow/dags/sales_etl_dag.py
"""
Orchestrates a sales ETL pipeline for Hijra Group’s analytics, processing sales.csv,
validating with config.yaml, and exporting to sales_results.json.
"""
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import yaml
import json
from typing import Dict, Any
import os
import time
# Define DAG
with DAG(
dag_id="sales_etl_pipeline",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
tags=["sales", "etl"],
description="Orchestrates a sales ETL pipeline for Hijra Group’s analytics",
) as dag:
def read_config(config_path: str, **kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Read and validate YAML configuration."""
ti = kwargs["ti"]
ti.log.info(f"Opening config: {config_path}")
if not os.path.exists(config_path):
ti.log.error(f"Config file not found: {config_path}")
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
required_keys = ["min_price", "max_quantity", "required_fields", "product_prefix", "max_decimals"]
missing_keys = [key for key in required_keys if key not in config]
if missing_keys:
ti.log.error(f"Missing config keys: {missing_keys}")
raise ValueError(f"Missing config keys: {missing_keys}")
ti.log.info(f"Loaded config: {config}")
return config
def extract_data(**kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Extract sales data from CSV."""
ti = kwargs["ti"]
base_dir = os.path.dirname(os.path.dirname(__file__))
csv_path = os.path.join(base_dir, "data", "sales.csv")
ti.log.info(f"Extracting CSV: {csv_path}")
if not os.path.exists(csv_path):
ti.log.error(f"CSV file not found: {csv_path}")
raise FileNotFoundError(f"CSV file not found: {csv_path}")
df = pd.read_csv(csv_path)
ti.log.info(f"Initial DataFrame:\n{df.head().to_string()}")
return {"df": df.to_dict(orient="records"), "total_records": len(df)}
def transform_data(**kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and transform sales data."""
ti = kwargs["ti"]
start_time = time.time()
base_dir = os.path.dirname(os.path.dirname(__file__))
config_path = os.path.join(base_dir, "data", "config.yaml")
extracted_data = ti.xcom_pull(task_ids="extract_data")
if extracted_data is None:
ti.log.error("No data received from extract_data")
return {"results": {"total_sales": 0.0, "unique_products": [], "top_products": {}}, "valid_sales": 0}
df = pd.DataFrame(extracted_data["df"])
total_records = extracted_data["total_records"]
config = read_config(config_path, **kwargs)
ti.log.info(f"Validating {total_records} records")
# Validate using utils
valid_records = []
for record in df.to_dict(orient="records"):
from utils import validate_sale
if validate_sale(record, config, ti=ti):
valid_records.append(record)
valid_df = pd.DataFrame(valid_records)
if valid_df.empty:
ti.log.warning("No valid sales data")
return {"results": {"total_sales": 0.0, "unique_products": [], "top_products": {}}, "valid_sales": 0}
# Compute metrics
valid_df["amount"] = valid_df["price"] * valid_df["quantity"]
total_sales = valid_df["amount"].sum()
unique_products = valid_df["product"].unique().tolist()
sales_by_product = valid_df.groupby("product")["amount"].sum()
top_products = sales_by_product.sort_values(ascending=False).head(3).to_dict()
end_time = time.time()
ti.log.info(f"Valid sales: {len(valid_df)} records")
ti.log.info(f"Total sales: ${round(total_sales, 2)}")
ti.log.info(f"Transform took {end_time - start_time:.2f}s")
return {
"results": {
"total_sales": float(total_sales),
"unique_products": unique_products,
"top_products": top_products
},
"valid_sales": len(valid_df)
}
def load_data(**kwargs: Dict[str, Any]) -> None:
"""Export results to JSON."""
ti = kwargs["ti"]
base_dir = os.path.dirname(os.path.dirname(__file__))
json_path = os.path.join(base_dir, "data", "sales_results.json")
results = ti.xcom_pull(task_ids="transform_data")["results"]
ti.log.info(f"Writing to: {json_path}")
ti.log.info(f"Results: {results}")
with open(json_path, "w") as file:
json.dump(results, file, indent=2)
ti.log.info(f"Exported results to {json_path}")
# Define tasks
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
provide_context=True,
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
provide_context=True,
)
load_task = PythonOperator(
task_id="load_data",
python_callable=load_data,
provide_context=True,
)
# Set dependencies
extract_task >> transform_task >> load_task
# File: de-onboarding/tests/test_sales_etl_dag.py
import pytest
from airflow.models import DagBag
import pandas as pd
from typing import Dict, Any
import os
import logging
import io
from contextlib import redirect_stderr
@pytest.fixture
def dagbag():
"""Load DAGs for testing."""
return DagBag(dag_folder="dags/", include_examples=False)
def test_dag_loads(dagbag):
"""Test DAG loads without errors."""
dag = dagbag.get_dag(dag_id="sales_etl_pipeline")
assert dag is not None, "DAG failed to load"
assert len(dag.tasks) == 3, f"Expected 3 tasks, got {len(dag.tasks)}"
assert dag.description == "Orchestrates a sales ETL pipeline for Hijra Group’s analytics"
def test_task_dependencies(dagbag):
"""Test task dependencies."""
dag = dagbag.get_dag(dag_id="sales_etl_pipeline")
extract_task = dag.get_task("extract_data")
transform_task = dag.get_task("transform_data")
load_task = dag.get_task("load_data")
assert extract_task.downstream_task_ids == {"transform_data"}
assert transform_task.downstream_task_ids == {"load_data"}
assert transform_task.upstream_task_ids == {"extract_data"}
assert load_task.upstream_task_ids == {"transform_data"}
def test_extract_data():
"""Test extract_data function."""
from dags.sales_etl_dag import extract_data
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
csv_path = os.path.join(base_dir, "data", "sales.csv")
result = extract_data(csv_path=csv_path)
assert "df" in result
assert "total_records" in result
assert result["total_records"] == 6
df = pd.DataFrame(result["df"])
assert len(df) == 6
def test_transform_data_empty_input():
"""Test transform_data with empty input."""
from dags.sales_etl_dag import transform_data
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(base_dir, "data", "config.yaml")
sample_data = {"df": [], "total_records": 0}
result = transform_data(ti=MockTaskInstance(sample_data), config_path=config_path)
assert result["results"]["total_sales"] == 0.0
assert result["results"]["unique_products"] == []
assert result["results"]["top_products"] == {}
assert result["valid_sales"] == 0
def test_transform_data_valid_input():
"""Test transform_data with valid input."""
from dags.sales_etl_dag import transform_data
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(base_dir, "data", "config.yaml")
sample_data = {
"df": [
{"product": "Halal Laptop", "price": 999.99, "quantity": 2},
{"product": "Halal Mouse", "price": 24.99, "quantity": 10},
{"product": "Monitor", "price": 199.99, "quantity": 2}
],
"total_records": 3
}
result = transform_data(ti=MockTaskInstance(sample_data), config_path=config_path)
assert "results" in result
assert "valid_sales" in result
assert result["valid_sales"] == 2
assert result["results"]["total_sales"] == 2249.88
def test_transform_data_logging(capfd):
"""Test transform_data logging output."""
from dags.sales_etl_dag import transform_data
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(base_dir, "data", "config.yaml")
sample_data = {
"df": [
{"product": "Halal Laptop", "price": 999.99, "quantity": 2}
],
"total_records": 1
}
with redirect_stderr(io.StringIO()) as stderr:
transform_data(ti=MockTaskInstance(sample_data), config_path=config_path)
log_output = stderr.getvalue()
assert "Validating sale: {'product': 'Halal Laptop'" in log_output
assert "Valid sales: 1 records" in log_output
class MockTaskInstance:
"""Mock TaskInstance for testing."""
def __init__(self, data: Dict[str, Any]):
self.data = data
self.log = logging.getLogger("mock_task_instance")
def xcom_pull(self, task_ids: str) -> Dict[str, Any]:
return self.data
Expected Outputs
data/sales_results.json:
{
"total_sales": 2499.83,
"unique_products": ["Halal Laptop", "Halal Mouse", "Halal Keyboard"],
"top_products": {
"Halal Laptop": 1999.98,
"Halal Mouse": 249.9,
"Halal Keyboard": 249.95
}
}
Console Output (Airflow logs, abridged):
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Opening config: .../data/config.yaml
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Loaded config: {'min_price': 10.0, ...}
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Extracting CSV: .../data/sales.csv
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Initial DataFrame:
product price quantity
0 Halal Laptop 999.99 2
...
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Validating sale: {'product': 'Halal Laptop', ...}
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Validating 6 records
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Valid sales: 3 records
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Total sales: $2499.83
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Transform took 0.01s
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Writing to: .../data/sales_results.json
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Exported results to .../data/sales_results.json
Test Output:
pytest tests/test_sales_etl_dag.py
# Expected: All tests pass
Exercise 4 Output (de-onboarding/ex4_concepts.txt):
Airflow’s DAGs provide visual dependency management, a web UI for monitoring, and robust scheduling compared to APScheduler. For Hijra Group’s sales pipelines, Airflow ensures tasks like data validation and export run in order, with logs and retries, while APScheduler lacks a UI and dependency graphs.
Exercise 6 Output (de-onboarding/ex6_performance.txt):
The transform_data task took 0.01s for 6 records. Using Pandas filtering (e.g., df[df["product"].str.startswith("Halal")]) instead of a loop could reduce execution time for larger datasets by leveraging vectorized operations.
How to Run and Test
Setup:
- Setup Checklist:
- Create de-onboarding/data/ with sales.csv and config.yaml per Appendix 1.
- Install libraries: pip install apache-airflow==2.9.3 pandas pyyaml pytest.
- Set AIRFLOW_HOME=~/airflow and run airflow db init.
- Create admin user: airflow users create --username admin --firstname Admin --lastname User --role Admin --email [email protected] --password admin.
- Create ~/airflow/dags/ and save sales_etl_dag.py.
- Create de-onboarding/tests/ and save test_sales_etl_dag.py.
- Save utils.py in de-onboarding/.
- Configure editor for 4-space indentation per PEP 8.
- Troubleshooting:
- If DAG not visible, check ~/airflow/dags/ and restart the scheduler.
- If FileNotFoundError, verify paths with os.path.abspath(csv_path).
- If yaml.YAMLError, log open(config_path).read() to check syntax.
- If IndentationError, run python -tt sales_etl_dag.py.
Run:
- Start Airflow: airflow webserver -p 8080 and airflow scheduler.
- Access UI: http://localhost:8080.
- Enable and trigger sales_etl_pipeline.
- Check logs and verify data/sales_results.json.
Test:
- Run: pytest de-onboarding/tests/test_sales_etl_dag.py.
- Verify all tests pass, checking DAG structure, dependencies, task outputs, edge cases, and log output.
56.3 Practice Exercises
Exercise 1: Simple DAG Creation
Write a type-annotated DAG with one PythonOperator task that logs a message, using 4-space indentation per PEP 8.
Expected Output (logs):
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Hello from Airflow DAG!
Follow-Along Instructions:
- Save as ~/airflow/dags/ex1_dag.py.
- Configure editor for 4-space indentation.
- Run Airflow and trigger DAG.
- How to Test:
- Check logs in UI.
- Verify task executes without errors.
Exercise 2: Task Dependencies
Write a type-annotated DAG with two PythonOperator tasks (task1, task2) where task2 depends on task1, using 4-space indentation.
Expected Output (logs):
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Task 1 completed
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Task 2 completed
Follow-Along Instructions:
- Save as ~/airflow/dags/ex2_dag.py.
- Configure editor for 4-space indentation.
- Run Airflow and trigger DAG.
- How to Test:
- Verify task order in logs.
- Check DAG graph in UI.
Exercise 3: ETL Task Implementation
Write a type-annotated PythonOperator task to load data/sales.csv and return a DataFrame, using 4-space indentation.
Expected Output:
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - DataFrame with 6 records loaded
Follow-Along Instructions:
- Save as ~/airflow/dags/ex3_dag.py.
- Configure editor for 4-space indentation.
- Run Airflow and trigger DAG.
- How to Test:
- Check logs for DataFrame size.
- Verify output with ti.log.info(df.head()).
Exercise 4: Benefits of Airflow over APScheduler
Explain the benefits of using Airflow’s DAGs over APScheduler (Chapter 55) for orchestrating Hijra Group’s sales pipelines in 2–3 sentences, focusing on dependency management, UI, and Hijra Group’s context. Save the explanation to de-onboarding/ex4_concepts.txt. Use 4-space indentation for any code examples.
Expected Output (ex4_concepts.txt):
Airflow’s DAGs provide visual dependency management, a web UI for monitoring, and robust scheduling compared to APScheduler. For Hijra Group’s sales pipelines, Airflow ensures tasks like data validation and export run in order, with logs and retries, while APScheduler lacks a UI and dependency graphs.
Follow-Along Instructions:
- Create de-onboarding/ex4_concepts.txt.
- Write a 2–3 sentence explanation, addressing dependency management, UI, and Hijra Group’s needs.
- How to Test:
- Verify the file exists: cat de-onboarding/ex4_concepts.txt (Unix/macOS) or type de-onboarding\ex4_concepts.txt (Windows).
- Ensure the explanation is 2–3 sentences and covers the specified points.
Exercise 5: Debug an XCom Bug
Fix this buggy DAG where transform_data fails due to missing XCom data, ensuring 4-space indentation. Hint: Check if extract_data returns data for XCom, as ti.xcom_pull may return None if no data is pushed.
Buggy Code:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import Dict, Any
with DAG(
dag_id="xcom_bug_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
) as dag:
def extract_data(**kwargs: Dict[str, Any]) -> None:
"""Extract data but forget to return."""
ti = kwargs["ti"]
ti.log.info("Extracting data...")
# Bug: No return statement
def transform_data(**kwargs: Dict[str, Any]) -> None:
"""Transform data."""
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="extract_data")
ti.log.info(f"Transforming data: {data}")
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
provide_context=True,
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
provide_context=True,
)
extract_task >> transform_task
Expected Output (logs):
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Extracting data...
[2023-10-01 00:00:00,000] {taskinstance.py:123} INFO - Transforming data: Sample data
Follow-Along Instructions:
- Save as ~/airflow/dags/ex5_dag.py.
- Run Airflow and observe the error (e.g., NoneType in transform_data).
- Fix and re-run.
- How to Test:
- Verify transform_data logs the correct data.
- Check task order in logs.
Exercise 6: Performance Analysis of transform_data
Analyze the execution time of the transform_data task in sales_etl_dag.py (logged via time.time()) for data/sales.csv and suggest one optimization to reduce it for larger datasets (e.g., thousands of records). Save the analysis to de-onboarding/ex6_performance.txt in 2–3 sentences. Use 4-space indentation for any code examples.
Expected Output (ex6_performance.txt):
The transform_data task took 0.01s for 6 records. Using Pandas filtering (e.g., df[df["product"].str.startswith("Halal")]) instead of a loop could reduce execution time for larger datasets by leveraging vectorized operations.
Follow-Along Instructions:
- Run the sales_etl_pipeline DAG and note the transform_data execution time from logs.
- Create de-onboarding/ex6_performance.txt.
- Write a 2–3 sentence analysis, including the logged time and one optimization.
- How to Test:
- Verify the file exists: cat de-onboarding/ex6_performance.txt (Unix/macOS) or type de-onboarding\ex6_performance.txt (Windows).
- Ensure the analysis is 2–3 sentences and suggests a valid optimization (a vectorized sketch follows this exercise).
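As a starting point for the optimization suggestion, the sketch below shows vectorized validation with Pandas. It covers only the prefix, price, and quantity thresholds from config.yaml and assumes the sales.csv column names, so it is a hint rather than a drop-in replacement for validate_sale.
import pandas as pd

def filter_valid_sales(df: pd.DataFrame, min_price: float, max_quantity: int, prefix: str) -> pd.DataFrame:
    """Vectorized subset of the validation rules (prefix, price, quantity)."""
    df = df.dropna(subset=["product", "price", "quantity"])  # drop missing fields
    price = pd.to_numeric(df["price"], errors="coerce")      # non-numeric -> NaN
    quantity = pd.to_numeric(df["quantity"], errors="coerce")
    mask = (
        df["product"].astype(str).str.startswith(prefix)
        & price.ge(min_price)       # NaN comparisons evaluate to False
        & quantity.le(max_quantity)
        & quantity.notna()
    )
    return df[mask]

# Example usage with the config.yaml thresholds:
# valid_df = filter_valid_sales(df, min_price=10.0, max_quantity=100, prefix="Halal")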
56.4 Exercise Solutions
Solution to Exercise 1: Simple DAG Creation
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import Dict, Any
with DAG(
dag_id="ex1_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
description="A simple DAG with one logging task",
) as dag:
def log_message(**kwargs: Dict[str, Any]) -> None:
"""Log a message."""
ti = kwargs["ti"]
ti.log.info("Hello from Airflow DAG!")
task = PythonOperator(
task_id="log_message",
python_callable=log_message,
provide_context=True,
dag=dag,
)
Solution to Exercise 2: Task Dependencies
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import Dict, Any
with DAG(
dag_id="ex2_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
description="A DAG with two dependent tasks",
) as dag:
def task1(**kwargs: Dict[str, Any]) -> None:
"""Run task 1."""
ti = kwargs["ti"]
ti.log.info("Task 1 completed")
def task2(**kwargs: Dict[str, Any]) -> None:
"""Run task 2."""
ti = kwargs["ti"]
ti.log.info("Task 2 completed")
task1_op = PythonOperator(
task_id="task1",
python_callable=task1,
provide_context=True,
dag=dag,
)
task2_op = PythonOperator(
task_id="task2",
python_callable=task2,
provide_context=True,
dag=dag,
)
task1_op >> task2_op
Solution to Exercise 3: ETL Task Implementation
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
from typing import Dict, Any
import os
with DAG(
dag_id="ex3_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
description="A DAG with an ETL task to load sales data",
) as dag:
def load_sales(**kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Load sales CSV."""
ti = kwargs["ti"]
base_dir = os.path.dirname(os.path.dirname(__file__))
csv_path = os.path.join(base_dir, "data", "sales.csv")
df = pd.read_csv(csv_path)
ti.log.info(f"DataFrame with {len(df)} records loaded")
return {"df": df.to_dict(orient="records")}
task = PythonOperator(
task_id="load_sales",
python_callable=load_sales,
provide_context=True,
dag=dag,
)
Solution to Exercise 4: Benefits of Airflow over APScheduler
File: de-onboarding/ex4_concepts.txt
Airflow’s DAGs provide visual dependency management, a web UI for monitoring, and robust scheduling compared to APScheduler. For Hijra Group’s sales pipelines, Airflow ensures tasks like data validation and export run in order, with logs and retries, while APScheduler lacks a UI and dependency graphs.
Solution to Exercise 5: Debug an XCom Bug
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from typing import Dict, Any
with DAG(
dag_id="ex5_dag",
start_date=datetime(2023, 10, 1),
schedule_interval="@daily",
catchup=False,
description="A DAG with fixed XCom data transfer",
) as dag:
def extract_data(**kwargs: Dict[str, Any]) -> str:
"""Extract data and return for XCom."""
ti = kwargs["ti"]
ti.log.info("Extracting data...")
return "Sample data" # Fix: Return data for XCom
def transform_data(**kwargs: Dict[str, Any]) -> None:
"""Transform data."""
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="extract_data")
ti.log.info(f"Transforming data: {data}")
extract_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data,
provide_context=True,
dag=dag,
)
transform_task = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
provide_context=True,
dag=dag,
)
extract_task >> transform_task
Explanation:
- Bug: extract_data lacked a return statement, causing ti.xcom_pull to return None.
- Fix: Added return "Sample data" to extract_data, ensuring XCom data is available.
Solution to Exercise 6: Performance Analysis of transform_data
File: de-onboarding/ex6_performance.txt
The transform_data task took 0.01s for 6 records. Using Pandas filtering (e.g., df[df["product"].str.startswith("Halal")]) instead of a loop could reduce execution time for larger datasets by leveraging vectorized operations.
56.5 Chapter Summary and Connection to Chapter 57
In this chapter, you’ve mastered:
- Airflow DAGs: Defining workflows with tasks and dependencies.
- Type-Safe Code: Using type annotations for robust tasks.
- Testing: Validating DAGs with pytest, including edge cases and log output.
- Logging: Capturing execution details using Airflow’s logging system.
- Performance: Measuring and analyzing task execution time for optimization.
- White-Space Sensitivity: Using 4-space indentation per PEP 8.
The micro-project built a type-annotated Airflow DAG to orchestrate a sales ETL pipeline, processing data/sales.csv, validating with config.yaml, and exporting to data/sales_results.json. It used pytest to ensure reliability, including edge-case handling (e.g., empty inputs) and log verification, aligning with Hijra Group’s need for automated pipelines.
Connection to Chapter 57
Chapter 57: Airflow in Docker builds on this chapter by:
- Containerization: Deploying Airflow in Docker for portability.
- ETL Pipeline: Extending the sales ETL DAG with Docker Compose.
- Testing: Using pytest to validate containerized tasks.
- Scalability: Preparing for Kubernetes deployments in Chapter 64.
- Fintech Context: Ensuring reliable orchestration for Hijra Group’s analytics, maintaining PEP 8’s 4-space indentation.