64 - Airflow in Kubernetes
Complexity: Moderate (M)
64.0 Introduction: Why This Matters for Data Engineering
In data engineering, orchestrating complex data pipelines at scale is critical for Hijra Group’s Sharia-compliant fintech analytics, where financial transaction data must be processed reliably across distributed systems. Apache Airflow, a powerful workflow orchestrator, excels in scheduling and monitoring data pipelines, while Kubernetes provides a scalable, containerized environment for deploying Airflow, ensuring high availability and resource efficiency. Deploying Airflow in Kubernetes with Helm Charts, as covered in this chapter, enables Hijra Group to manage thousands of daily transactions with fault tolerance and observability, critical for compliance with Islamic Financial Services Board (IFSB) standards.
Building on Phase 9 (Chapters 60–63), which introduced Docker, Kubernetes fundamentals, Helm Charts, and PostgreSQL in Kubernetes, this chapter focuses on deploying a type-annotated Airflow instance in Kubernetes using Helm, integrating with PostgreSQL and processing data/sales.csv. All Python code includes type annotations verified by Pyright (from Chapter 7) and tests using pytest (from Chapter 9), with 4-space indentation per PEP 8, preferring spaces over tabs to avoid IndentationError. The micro-project orchestrates a sales ETL pipeline, preparing for security (Chapter 65) and observability (Chapter 66).
Data Engineering Workflow Context
This diagram illustrates Airflow’s role in a Kubernetes-based pipeline:
flowchart TD
A["Raw Data
sales.csv"] --> B["Kubernetes Cluster"]
B --> C["Airflow Helm Chart"]
C --> D["Airflow Scheduler"]
C --> E["Airflow Webserver"]
D --> F["ETL DAG"]
F --> G["PostgreSQL Pod"]
F --> H["BigQuery"]
G --> I["Processed Data"]
H --> I
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
classDef cluster fill:#ffddaa,stroke:#663,stroke-width:1px
class A,I data
class B cluster
class C,D,E,F process
class G,H storageBuilding On and Preparing For
- Building On:
- Chapter 56: Airflow fundamentals, defining DAGs for ETL processes.
- Chapter 57: Dockerized Airflow, containerizing DAGs.
- Chapter 60: Docker for data applications, packaging pipelines.
- Chapter 61: Kubernetes fundamentals, deploying pods and Helm Charts.
- Chapter 63: PostgreSQL in Kubernetes, managing stateful databases.
- Preparing For:
- Chapter 65: Secures Airflow with OAuth2 and PII masking.
- Chapter 66: Adds observability with monitoring and logging.
- Chapters 67–70: Integrates Airflow in capstone projects with FastAPI and Helm.
What You’ll Learn
This chapter covers:
- Airflow in Kubernetes: Deploying Airflow using Helm Charts.
- Type-Annotated DAGs: Writing robust DAGs with Pyright verification.
- Integration with PostgreSQL: Connecting Airflow to Kubernetes-hosted databases.
- Testing: Validating DAGs with
pytest. - Logging: Tracking pipeline execution with YAML-configured logs.
The micro-project deploys a type-annotated Airflow instance in Kubernetes, orchestrating a sales ETL pipeline that loads data/sales.csv, validates it, and stores results in PostgreSQL, with pytest tests ensuring reliability.
Follow-Along Tips:
- Create
de-onboarding/data/withsales.csvandconfig.yamlper Appendix 1. - Install tools:
pip install apache-airflow psycopg2-binary pyyaml pytest kubernetes,kubectl,helm,minikube, and Docker Desktop. - Start
minikubewith adequate resources:minikube start --cpus=2 --memory=4g. If resource errors occur, checkminikube logsfor issues like “insufficient memory”. - Use 4-space indentation per PEP 8. Run
python -tt script.pyto detect tab/space mixing. - Debug with
kubectl logs,kubectl describe, andairflow dags list. - Verify Helm Chart installation with
helm list. - Use
minikubefor local Kubernetes testing.
64.1 Airflow in Kubernetes with Helm
Airflow in Kubernetes runs as a set of pods (webserver, scheduler, workers) managed by a Helm Chart, which simplifies configuration and deployment. Helm Charts are templates for Kubernetes resources, enabling scalable deployments with O(1) resource lookup and O(n) pod scheduling for n pods. For Hijra Group’s pipelines, this ensures reliable orchestration of thousands of daily transactions.
64.1.1 Helm Chart Setup
Install the official Airflow Helm Chart and configure it for PostgreSQL.
# Add Helm repository
helm repo add apache-airflow https://airflow.apache.org
helm repo update
# Create namespace
kubectl create namespace airflow
# Install Airflow Helm Chart
helm install airflow apache-airflow/airflow \
--namespace airflow \
--set executor=LocalExecutor \
--set postgresql.enabled=false \
--set dags.persistence.enabled=false \
--set webserver.defaultUser.enabled=true \
--set webserver.defaultUser.username=admin \
--set webserver.defaultUser.password=adminFollow-Along Instructions:
- Start
minikube:minikube start --cpus=2 --memory=4g. - Install Helm: Verify with
helm version. - Pin the Helm Chart version for stability (e.g.,
--version 1.10.0for Airflow Helm Chart; check latest withhelm search repo airflow). - Run commands above in
de-onboarding/. - Verify pods:
kubectl get pods -n airflow. - Access Airflow UI:
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow, then openhttp://localhost:8080(login: admin/admin). Check DAG status (e.g., “success” or “failed”) in the UI after triggering. A screenshot of the UI would be provided in a production-grade course but is omitted here for simplicity. - Common Errors:
- Helm Not Found: Install Helm (
brew install helmon macOS,choco install kubernetes-helmon Windows). - Pod CrashLoopBackOff: Check logs with
kubectl logs <pod-name> -n airflow. - Connection Refused: Ensure port-forwarding is active.
- Helm Not Found: Install Helm (
Key Points:
- Helm Chart: Defines Airflow components (webserver, scheduler, workers).
- LocalExecutor: Runs tasks sequentially, suitable for small pipelines.
- Time Complexity: O(1) for Helm resource lookup, O(n) for scheduling n pods.
- Space Complexity: O(n) for n pods’ configurations (~10MB per pod).
- Implication: Simplifies Airflow deployment for Hijra Group’s pipelines.
64.1.2 Connecting to PostgreSQL
Configure Airflow to use a PostgreSQL database in Kubernetes (from Chapter 63).
Note: This chapter assumes a PostgreSQL service (
postgresql-service) deployed in Chapter 63 using the Bitnami PostgreSQL Helm Chart (e.g.,helm install postgresql bitnami/postgresql --namespace airflow). Review Chapter 63 for setup details, including credentials and service configuration.
# File: de-onboarding/values.yaml
airflow:
# Note: Resource limits (e.g., CPU: 500m, memory: 1Gi) will be configured in Chapter 66 for production-grade deployments
executor: LocalExecutor
config:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: 'postgresql+psycopg2://user:[email protected]:5432/airflow'
dags:
persistence:
enabled: true
existingClaim: airflow-dags-pvcApply the configuration:
helm upgrade airflow apache-airflow/airflow \
--namespace airflow \
--values values.yamlFollow-Along Instructions:
- Save
values.yamlinde-onboarding/. - Replace
user:passwordandpostgresql-servicewith your PostgreSQL credentials and service name (from Chapter 63). - Run Helm upgrade command.
- Verify connection:
kubectl exec -it <airflow-scheduler-pod> -n airflow -- airflow connections list. - Common Errors:
- Connection Error: Check PostgreSQL service with
kubectl get svc -n airflow. - Credentials Issue: Verify credentials in
values.yaml.
- Connection Error: Check PostgreSQL service with
Key Points:
- SQL_ALCHEMY_CONN: Configures Airflow’s metadata database.
- Time Complexity: O(1) for connection setup.
- Space Complexity: O(1) for configuration storage.
- Implication: Ensures persistent metadata for Airflow tasks.
64.2 Type-Annotated Airflow DAGs
Write type-annotated DAGs for ETL pipelines, verified by Pyright, ensuring robust task definitions.
# File: de-onboarding/dags/sales_etl.py
from datetime import datetime
from typing import Dict, Any
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2
import yaml
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def load_config(config_path: str) -> Dict[str, Any]:
"""Load YAML configuration."""
logger.info(f"Loading config: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
return config
def extract_sales(csv_path: str, config: Dict[str, Any]) -> pd.DataFrame:
"""Extract and validate sales data."""
logger.info(f"Extracting CSV: {csv_path}")
df = pd.read_csv(csv_path)
df = df.dropna(subset=["product"])
df = df[df["product"].str.startswith(config["product_prefix"])]
df = df[df["quantity"].apply(lambda x: str(x).isdigit())]
df["quantity"] = df["quantity"].astype(int)
df = df[df["quantity"] <= config["max_quantity"]]
df = df[df["price"].apply(lambda x: isinstance(x, (int, float)))]
df = df[df["price"] >= config["min_price"]]
logger.info(f"Extracted {len(df)} valid records")
return df
def transform_sales(df: pd.DataFrame) -> pd.DataFrame:
"""Transform sales data."""
logger.info("Transforming data")
df["amount"] = df["price"] * df["quantity"]
logger.info(f"Transformed {len(df)} records")
return df
def load_sales(df: pd.DataFrame, conn_params: Dict[str, str]) -> None:
"""Load sales data into PostgreSQL."""
logger.info("Loading data to PostgreSQL")
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sales (
product TEXT,
price REAL,
quantity INTEGER,
amount REAL
)
""")
# Note: Row-by-row insertion is used for simplicity; efficient methods like pandas.to_sql or PostgresHook will be introduced in Chapter 69
for _, row in df.iterrows():
cursor.execute(
"INSERT INTO sales (product, price, quantity, amount) VALUES (%s, %s, %s, %s)",
(row["product"], row["price"], row["quantity"], row["amount"])
)
conn.commit()
cursor.close()
conn.close()
logger.info("Data loaded successfully")
# Define DAG
default_args: Dict[str, Any] = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
}
with DAG(
"sales_etl",
default_args=default_args,
description="Sales ETL Pipeline",
schedule_interval=None,
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
config_task = PythonOperator(
task_id="load_config",
python_callable=load_config,
op_kwargs={"config_path": "/opt/airflow/dags/data/config.yaml"}
)
extract_task = PythonOperator(
task_id="extract_sales",
python_callable=extract_sales,
op_kwargs={
"csv_path": "/opt/airflow/dags/data/sales.csv",
"config": "{{ ti.xcom_pull(task_ids='load_config') }}"
}
)
transform_task = PythonOperator(
task_id="transform_sales",
python_callable=transform_sales,
op_kwargs={"df": "{{ ti.xcom_pull(task_ids='extract_sales') }}"}
)
load_task = PythonOperator(
task_id="load_sales",
python_callable=load_sales,
op_kwargs={
"df": "{{ ti.xcom_pull(task_ids='transform_sales') }}",
"conn_params": {
"dbname": "airflow",
"user": "user",
"password": "password",
"host": "postgresql-service.airflow.svc.cluster.local",
"port": "5432"
}
}
)
config_task >> extract_task >> transform_task >> load_taskFollow-Along Instructions:
- Save as
de-onboarding/dags/sales_etl.py. - Copy
data/sales.csvanddata/config.yamlto Airflow’s DAGs folder:kubectl cp data/sales.csv airflow/<scheduler-pod>:/opt/airflow/dags/data/ -n airflow. - Configure editor for 4-space indentation per PEP 8.
- Verify Pyright:
pyright sales_etl.py. - Trigger DAG:
airflow dags trigger sales_etl. - Common Errors:
- FileNotFoundError: Ensure
sales.csvandconfig.yamlare in/opt/airflow/dags/data/. - TypeError: Verify type annotations with
pyright. - IndentationError: Use 4 spaces. Run
python -tt sales_etl.py.
- FileNotFoundError: Ensure
Key Points:
- Type Annotations: Ensure type safety with
Dict[str, Any],pd.DataFrame. - XCom: Passes data between tasks (e.g., DataFrame from
extract_salestotransform_sales). - Time Complexity: O(n) for processing n rows in ETL tasks.
- Space Complexity: O(n) for DataFrame storage.
- Implication: Robust DAGs ensure reliable ETL for Hijra Group’s analytics.
64.3 Testing Airflow DAGs
Test DAGs with pytest to ensure task reliability.
# File: de-onboarding/tests/test_sales_etl.py
from typing import Dict, Any
import pytest
import pandas as pd
from dags.sales_etl import load_config, extract_sales, transform_sales, load_sales
@pytest.fixture
def config() -> Dict[str, Any]:
"""Fixture for config."""
return {
"min_price": 10.0,
"max_quantity": 100,
"required_fields": ["product", "price", "quantity"],
"product_prefix": "Halal",
"max_decimals": 2
}
def test_load_config(tmp_path) -> None:
"""Test config loading."""
config_path = tmp_path / "config.yaml"
config_path.write_text("""
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: Halal
max_decimals: 2
""")
config = load_config(str(config_path))
assert config["min_price"] == 10.0
assert config["product_prefix"] == "Halal"
def test_extract_sales(config: Dict[str, Any], tmp_path) -> None:
"""Test data extraction."""
csv_path = tmp_path / "sales.csv"
csv_path.write_text("""
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Non-Halal,50.00,5
""")
df = extract_sales(str(csv_path), config)
assert len(df) == 2
assert all(df["product"].str.startswith("Halal"))
assert all(df["quantity"] <= 100)
def test_transform_sales() -> None:
"""Test data transformation."""
df = pd.DataFrame({
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2]
})
transformed = transform_sales(df)
assert "amount" in transformed.columns
assert transformed["amount"].iloc[0] == 999.99 * 2
def test_load_sales(tmp_path) -> None:
"""Test data loading (mocked).
Note: Mocking psycopg2.connect is deferred to Chapter 69 due to complexity.
For now, verify PostgreSQL contents manually with:
kubectl exec -it <postgresql-pod> -n airflow -- psql -U user -d airflow -c "SELECT * FROM sales;"
"""
df = pd.DataFrame({
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"amount": [1999.98]
})
conn_params = {
"dbname": "test",
"user": "test",
"password": "test",
"host": "localhost",
"port": "5432"
}
# Mocking database connection is complex; assume success if no errors
try:
load_sales(df, conn_params)
assert True
except Exception:
assert FalseFollow-Along Instructions:
- Save as
de-onboarding/tests/test_sales_etl.py. - Configure editor for 4-space indentation per PEP 8.
- Install pytest:
pip install pytest. - Run:
pytest tests/test_sales_etl.py -v. - Common Errors:
- ModuleNotFoundError: Ensure
dags/sales_etl.pyis accessible. - AssertionError: Print intermediate DataFrames with
print(df).
- ModuleNotFoundError: Ensure
Key Points:
- Pytest Fixtures: Provide reusable config.
- Mocking: Skips actual database connections for simplicity.
- Time Complexity: O(n) for testing n rows.
- Space Complexity: O(n) for test DataFrames.
- Implication: Ensures DAG reliability for production.
64.4 Micro-Project: Sales ETL Pipeline in Kubernetes
Project Requirements
Deploy a type-annotated Airflow instance in Kubernetes using Helm, orchestrating a sales ETL pipeline that processes data/sales.csv, validates it with data/config.yaml, and loads results into PostgreSQL. The pipeline supports Hijra Group’s transaction reporting, ensuring compliance with IFSB standards.
- Deploy Airflow with Helm, integrating with PostgreSQL.
- Write a type-annotated DAG (
sales_etl.py) for ETL. - Test DAG with
pytest, covering edge cases. - Log steps using
logging. - Use 4-space indentation per PEP 8.
- Validate with
data/sales.csvandconfig.yamlper Appendix 1.
Sample Input Files
data/sales.csv (from Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150data/config.yaml (from Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2Data Processing Flow
flowchart TD
A["Input CSV
sales.csv"] --> B["Load Config
config.yaml"]
B --> C["Extract Task"]
C --> D["Transform Task"]
D --> E["Load Task"]
E --> F["PostgreSQL"]
F --> G["Processed Data"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,G data
class B,C,D,E process
class F storagePVC Architecture
This diagram shows how the Persistent Volume Claim (PVC) persists DAGs and data files, with PostgreSQL integration:
flowchart TD
A["Airflow Scheduler Pod"] --> B["PVC: airflow-dags-pvc"]
A --> F["PostgreSQL Pod"]
B --> C["DAGs (sales_etl.py)"]
B --> D["Data Files (sales.csv, config.yaml)"]
A --> E["Kubernetes Cluster"]
F --> G["PostgreSQL Service
postgresql-service"]
classDef pod fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef cluster fill:#ffddaa,stroke:#663,stroke-width:1px
class A,F pod
class B,G storage
class C,D data
class E clusterAcceptance Criteria
- Go Criteria:
- Airflow deploys successfully in Kubernetes (
kubectl get pods -n airflow). - DAG loads, validates, transforms, and loads data into PostgreSQL.
- Tests pass with
pytest. - Logs steps and invalid records.
- Uses type annotations, verified by Pyright.
- Uses 4-space indentation per PEP 8.
- Airflow deploys successfully in Kubernetes (
- No-Go Criteria:
- Airflow fails to deploy.
- DAG fails to execute or load data.
- Tests fail.
- Missing type annotations or incorrect indentation.
Common Pitfalls to Avoid
- Helm Installation Failure:
- Problem: Chart not found or version mismatch.
- Solution: Run
helm repo update. Verify withhelm search repo airflow.
- Pod CrashLoopBackOff:
- Problem: Airflow pods fail.
- Solution: Check logs with
kubectl logs <pod-name> -n airflow.
- FileNotFoundError:
- Problem:
sales.csvnot found. - Solution: Copy files to
/opt/airflow/dags/data/. Verify withkubectl exec -it <scheduler-pod> -n airflow -- ls /opt/airflow/dags/data/.
- Problem:
- Type Errors:
- Problem: Pyright fails.
- Solution: Run
pyright sales_etl.py. Print variable types withprint(type(var)).
- IndentationError:
- Problem: Mixed spaces/tabs.
- Solution: Use 4 spaces. Run
python -tt sales_etl.py.
How This Differs from Production
In production, this solution would include:
- Security: OAuth2 and secrets management (Chapter 65).
- Observability: Prometheus monitoring (Chapter 66).
- Scalability: CeleryExecutor for distributed tasks.
- High Availability: Multiple scheduler replicas.
- CI/CD: Automated Helm deployments (Chapter 66).
Implementation
See sales_etl.py and test_sales_etl.py above. Helm configuration:
# File: de-onboarding/values.yaml
airflow:
# Note: Resource limits (e.g., CPU: 500m, memory: 1Gi) will be configured in Chapter 66 for production-grade deployments
executor: LocalExecutor
config:
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: 'postgresql+psycopg2://user:[email protected]:5432/airflow'
dags:
persistence:
enabled: true
existingClaim: airflow-dags-pvcPersistent Volume Claim (for DAGs):
# File: de-onboarding/dags-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags-pvc
namespace: airflow
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1GiApply PVC:
kubectl apply -f dags-pvc.yamlHow to Run and Test
Setup:
- Checklist:
- Start
minikube:minikube start --cpus=2 --memory=4g. - Install
kubectl,helm, Docker Desktop. - Create
de-onboarding/data/withsales.csv,config.yaml. - Create
de-onboarding/ex5_sample_logs.txtwith sample logs (see Exercise 5). - Install libraries:
pip install apache-airflow psycopg2-binary pyyaml pytest kubernetes. - Configure editor for 4-space indentation per PEP 8.
- Save
values.yaml,dags-pvc.yaml,sales_etl.py,test_sales_etl.py,ex5_sample_logs.txt.
- Start
- Troubleshooting:
- If
FileNotFoundError, verify files in/opt/airflow/dags/data/. - If
pyrighterrors, check annotations withprint(type(var)). - If pods fail, check logs with
kubectl logs <pod-name> -n airflow.
- If
- Checklist:
Run:
- Apply PVC:
kubectl apply -f dags-pvc.yaml. - Install Airflow:
helm install airflow apache-airflow/airflow --namespace airflow --values values.yaml. - Copy files:
kubectl cp dags/ airflow/<scheduler-pod>:/opt/airflow/dags/ -n airflow. - Trigger DAG:
airflow dags trigger sales_etl. - Verify PostgreSQL:
kubectl exec -it <postgresql-pod> -n airflow -- psql -U user -d airflow -c "SELECT * FROM sales;".
- Apply PVC:
Test Scenarios:
- Valid Data: Verify PostgreSQL table has 3 rows (Halal products).
- Empty CSV: Test with
empty.csv:- Replace
sales.csv:kubectl cp data/empty.csv airflow/<scheduler-pod>:/opt/airflow/dags/data/sales.csv -n airflow. - Trigger DAG:
airflow dags trigger sales_etl. - Expected: Logs indicate no valid records, no rows inserted.
- Replace
- Invalid Headers: Test with
invalid.csv:- Replace
sales.csv:kubectl cp data/invalid.csv airflow/<scheduler-pod>:/opt/airflow/dags/data/sales.csv -n airflow. - Trigger DAG:
airflow dags trigger sales_etl. - Expected: DAG skips invalid rows, no rows inserted.
- Replace
- Malformed Data: Test with
malformed.csv:- Replace
sales.csv:kubectl cp data/malformed.csv airflow/<scheduler-pod>:/opt/airflow/dags/data/sales.csv -n airflow. - Trigger DAG:
airflow dags trigger sales_etl. - Expected: Only valid rows (e.g., Halal Mouse) inserted, others skipped.
- Replace
- Run Tests:
pytest tests/test_sales_etl.py -v. All tests should pass.
64.5 Practice Exercises
Exercise 1: Helm Configuration
Write a Helm values.yaml to configure Airflow with a custom namespace and executor, using 4-space indentation.
Expected Output:
helm install my-airflow apache-airflow/airflow --namespace my-namespace --values values.yaml
# Pods running in my-namespaceFollow-Along Instructions:
- Save as
de-onboarding/ex1_values.yaml. - Configure editor for 4-space indentation.
- Test: Run Helm install command.
- Verify:
kubectl get pods -n my-namespace.
Exercise 2: Type-Annotated DAG Task
Write a type-annotated PythonOperator task to validate sales data, using 4-space indentation.
Sample Input (data/sales.csv):
product,price,quantity
Halal Laptop,999.99,2Expected Output:
Validated 1 recordFollow-Along Instructions:
- Save as
de-onboarding/ex2_task.py. - Configure editor for 4-space indentation.
- Test: Integrate into
sales_etl.pyand trigger DAG.
Exercise 3: DAG Testing
Write a pytest test for the extract_sales function, using 4-space indentation.
Sample Input:
product,price,quantity
Halal Laptop,999.99,2
Non-Halal,50.00,5Expected Output:
2 rows (only Halal products)Follow-Along Instructions:
- Save as
de-onboarding/tests/ex3_test.py. - Configure editor for 4-space indentation.
- Run:
pytest tests/ex3_test.py -v.
Exercise 4: Debug a Failing DAG
Fix a DAG with incorrect XCom usage, ensuring 4-space indentation.
Buggy Code:
extract_task = PythonOperator(
task_id="extract_sales",
python_callable=extract_sales,
op_kwargs={"csv_path": "data/sales.csv"} # Bug: Missing config
)Expected Output:
DAG runs successfullyFollow-Along Instructions:
- Save as
de-onboarding/ex4_debug.py. - Configure editor for 4-space indentation.
- Fix and test in Airflow.
Exercise 5: Log Analysis
Analyze Airflow logs to debug task failures, using kubectl logs, with 4-space indentation for any scripts. If setup fails, use the sample log file ex5_sample_logs.txt for practice.
Sample Log File (de-onboarding/ex5_sample_logs.txt):
[2025-04-25 10:00:00] ERROR - extract_sales - FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/dags/data/sales.csv'
[2025-04-25 10:01:00] ERROR - load_sales - OperationalError: FATAL: password authentication failed for user 'user'Task: Retrieve logs from the Airflow scheduler pod to identify:
- A
FileNotFoundErrorfor a missingsales.csvfile in theextract_salestask. - A PostgreSQL connection error in the
load_salestask due to incorrect credentials.
Expected Output:
Scenario 1: "FileNotFoundError: [Errno 2] No such file or directory: '/opt/airflow/dags/data/sales.csv'"
Scenario 2: "OperationalError: FATAL: password authentication failed for user 'user'"Follow-Along Instructions:
- Save any helper script as
de-onboarding/ex5_logs.sh. - Create
ex5_sample_logs.txtwith the sample log content above. - Scenario 1 (FileNotFoundError):
- Remove
sales.csv:kubectl exec -it <scheduler-pod> -n airflow -- rm /opt/airflow/dags/data/sales.csv. - Trigger DAG:
airflow dags trigger sales_etl. - Run:
kubectl logs <scheduler-pod> -n airflow | grep extract_sales > ex5_logs.txt. - Verify: Check
ex5_logs.txtorex5_sample_logs.txtforFileNotFoundError. - Fix:
kubectl cp data/sales.csv airflow/<scheduler-pod>:/opt/airflow/dags/data/ -n airflow.
- Remove
- Scenario 2 (PostgreSQL Error):
- Modify
values.yamlwith incorrect credentials (e.g.,password: wrong). - Run:
helm upgrade airflow apache-airflow/airflow --namespace airflow --values values.yaml. - Trigger DAG:
airflow dags trigger sales_etl. - Run:
kubectl logs <scheduler-pod> -n airflow | grep load_sales >> ex5_logs.txt. - Verify: Check
ex5_logs.txtorex5_sample_logs.txtforOperationalError. - Fix: Update
values.yamlwith correct credentials and re-runhelm upgrade.
- Modify
Exercise 6: Conceptual Analysis
Explain the benefits of deploying Airflow in Kubernetes (Chapter 64) versus Docker (Chapter 57), focusing on scalability and fault tolerance. Save the answer (100–150 words) to ex6_concepts.txt, using a text editor with 4-space indentation for any scripts.
- Evaluation Criteria: Address scalability, fault tolerance, and Hijra Group’s context; 100–150 words; clear and concise.
Expected Output (ex6_concepts.txt):
Deploying Airflow in Kubernetes offers scalability and fault tolerance over Docker. Kubernetes orchestrates multiple pods, enabling dynamic scaling of Airflow workers to handle large pipelines, unlike Docker’s single-container setup. It ensures fault tolerance through pod replication and automatic restarts, critical for Hijra Group’s analytics. Kubernetes’ Helm Charts simplify Airflow configuration, while Docker requires manual container management. However, Kubernetes is complex, requiring tools like minikube, whereas Docker is simpler for local testing. Kubernetes suits production-grade pipelines, aligning with Hijra Group’s need for reliable transaction processing.Follow-Along Instructions:
- Write the explanation in a text editor (e.g., VS Code).
- Save as
de-onboarding/ex6_concepts.txt. - Verify word count (~100–150 words) with
wc -w ex6_concepts.txt(Unix/macOS) or a text editor’s word count feature. - How to Test:
- Compare with the sample solution below.
- Ensure the explanation meets the evaluation criteria.
Exercise 7: Airflow UI Debugging
Use the Airflow UI to debug a task failure in the sales_etl DAG, saving a description of the UI’s task status to ex7_ui.txt, using a text editor with 4-space indentation for any scripts.
Task: Trigger the sales_etl DAG with a missing sales.csv file to cause a FileNotFoundError in the extract_sales task. Check the Airflow UI to identify the failure and describe the task status.
Expected Output (ex7_ui.txt):
The Airflow UI at http://localhost:8080 shows the sales_etl DAG with a failed extract_sales task. The task status is marked as 'failed' with a red icon, indicating a FileNotFoundError for sales.csv.Follow-Along Instructions:
- Remove
sales.csv:kubectl exec -it <scheduler-pod> -n airflow -- rm /opt/airflow/dags/data/sales.csv. - Trigger DAG:
airflow dags trigger sales_etl. - Access Airflow UI:
kubectl port-forward svc/airflow-webserver 8080:8080 -n airflow, openhttp://localhost:8080. - Navigate to the
sales_etlDAG, check theextract_salestask status, and note the failure. - Save the description in
de-onboarding/ex7_ui.txt. - How to Test:
- Verify
ex7_ui.txtdescribes the “failed” status. - Fix:
kubectl cp data/sales.csv airflow/<scheduler-pod>:/opt/airflow/dags/data/ -n airflow, re-trigger DAG, and confirm “success” in the UI.
- Verify
64.6 Exercise Solutions
Solution to Exercise 1: Helm Configuration
# File: de-onboarding/ex1_values.yaml
airflow:
executor: LocalExecutor
namespace: my-namespaceTest:
kubectl create namespace my-namespace
helm install my-airflow apache-airflow/airflow --namespace my-namespace --values ex1_values.yaml
kubectl get pods -n my-namespaceSolution to Exercise 2: Type-Annotated DAG Task
# File: de-onboarding/ex2_task.py
from typing import Dict, Any
import pandas as pd
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def validate_sales(df: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
"""Validate sales data."""
logger.info("Validating sales data")
df = df[df["product"].str.startswith(config["product_prefix"])]
logger.info(f"Validated {len(df)} records")
return dfSolution to Exercise 3: DAG Testing
# File: de-onboarding/tests/ex3_test.py
from typing import Dict, Any
import pytest
import pandas as pd
from dags.sales_etl import extract_sales
@pytest.fixture
def config() -> Dict[str, Any]:
return {
"product_prefix": "Halal",
"min_price": 10.0,
"max_quantity": 100
}
def test_extract_sales(config: Dict[str, Any], tmp_path) -> None:
csv_path = tmp_path / "sales.csv"
csv_path.write_text("""
product,price,quantity
Halal Laptop,999.99,2
Non-Halal,50.00,5
""")
df = extract_sales(str(csv_path), config)
assert len(df) == 1
assert df["product"].iloc[0] == "Halal Laptop"Solution to Exercise 4: Debug a Failing DAG
# File: de-onboarding/ex4_debug.py
extract_task = PythonOperator(
task_id="extract_sales",
python_callable=extract_sales,
op_kwargs={
"csv_path": "/opt/airflow/dags/data/sales.csv",
"config": "{{ ti.xcom_pull(task_ids='load_config') }}"
}
)Explanation:
- Bug: Missing
configparameter caused validation failure. - Fix: Added
configvia XCom fromload_configtask.
Solution to Exercise 5: Log Analysis
# File: de-onboarding/ex5_logs.sh
#!/bin/bash
# Retrieve Airflow scheduler logs to debug task failures
# Scenario 1: FileNotFoundError
kubectl logs <scheduler-pod> -n airflow | grep extract_sales > ex5_logs.txt
# Scenario 2: PostgreSQL Error
kubectl logs <scheduler-pod> -n airflow | grep load_sales >> ex5_logs.txt
cat ex5_logs.txtExplanation:
- Scenario 1: Identifies
FileNotFoundErrorforsales.csv. Fix by copying the file. - Scenario 2: Identifies
OperationalErrorfor PostgreSQL credentials. Fix by updatingvalues.yaml.
Solution to Exercise 6: Conceptual Analysis
# File: de-onboarding/ex6_concepts.txt
Deploying Airflow in Kubernetes offers significant advantages over Docker for scalability and fault tolerance. Kubernetes orchestrates multiple pods, enabling dynamic scaling of Airflow workers to process large datasets, unlike Docker’s single-container approach in Chapter 57. Kubernetes ensures fault tolerance through pod replication and automatic restarts, critical for Hijra Group’s reliable transaction processing. Helm Charts simplify Airflow configuration, whereas Docker requires manual container management. However, Kubernetes introduces complexity, requiring tools like minikube, while Docker is simpler for local testing. For Hijra Group’s production-grade pipelines, Kubernetes’ scalability and resilience make it ideal, aligning with the need for robust analytics.Explanation:
- Content: Addresses scalability, fault tolerance, and Hijra Group’s context in ~120 words, meeting the 100–150 word requirement and evaluation criteria.
Solution to Exercise 7: Airflow UI Debugging
# File: de-onboarding/ex7_ui.txt
The Airflow UI at http://localhost:8080 shows the sales_etl DAG with a failed extract_sales task. The task status is marked as 'failed' with a red icon, indicating a FileNotFoundError for sales.csv.Explanation:
- Task: Identifies the
extract_salestask failure in the Airflow UI due to a missingsales.csv. - Fix: Copy the file to
/opt/airflow/dags/data/and re-trigger the DAG.
64.7 Chapter Summary and Connection to Chapter 65
In this chapter, you’ve mastered:
- Airflow in Kubernetes: Deploying with Helm Charts (O(n) pod scheduling).
- Type-Annotated DAGs: Writing robust ETL pipelines with Pyright.
- Testing: Validating DAGs with
pytest. - Logging: Tracking execution with
logging, reinforced by log analysis and UI debugging.
The micro-project deployed a type-annotated Airflow instance, orchestrating a sales ETL pipeline with PostgreSQL integration, ensuring reliability for Hijra Group’s analytics. All code used 4-space indentation per PEP 8.
Connection to Chapter 65
Chapter 65 introduces Security Best Practices for Data Pipelines, building on this chapter:
- Security: Adds OAuth2 and PII masking to Airflow, protecting sensitive sales data.
- Configuration: Extends Helm Charts with secrets management.
- Testing: Enhances tests for security compliance.
- Fintech Context: Ensures IFSB-compliant data handling for Hijra Group.