55 - Simple Scheduling with Python
Complexity: Moderate (M)
55.0 Introduction: Why This Matters for Data Engineering
In data engineering, scheduling is critical for automating recurring ETL (Extract, Transform, Load) pipelines, a core component of scalable analytics for Hijra Group’s Sharia-compliant fintech platform. Scheduling ensures timely data processing, enabling daily sales reports to comply with Islamic Financial Services Board (IFSB) standards and deliver real-time transaction insights. Python’s APScheduler library provides robust, type-annotated scheduling for tasks like processing sales data, offering a lightweight alternative to orchestrators like Airflow. This chapter builds on Phase 8’s pipeline orchestration (Chapters 52–54), introducing scheduling to automate dbt transformations and data processing, preparing for Airflow in Chapter 56.
This chapter uses type annotations (Chapter 7) verified by Pyright and pytest testing (Chapter 9), with all code adhering to PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError. Error handling is limited to basic checks, deferring try/except to Chapter 56. The micro-project schedules a sales data ETL task using data/sales.csv and config.yaml, producing a JSON report, with tests for edge cases (empty.csv, invalid YAML, partial data).
Data Engineering Workflow Context
This diagram illustrates scheduling in a pipeline:
```mermaid
flowchart TD
A["Raw Data (CSV)"] --> B["Python ETL Script"]
B --> C["APScheduler Scheduling"]
C -->|Run at Interval| D["Pandas Validation"]
D --> E["Validate & Transform"]
E --> F["Output (JSON)"]
F --> G["Storage/Analytics"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,F data
class B,C,D,E process
class G storage
```

Building On and Preparing For
- Building On:
  - Chapter 2: File handling and YAML parsing with PyYAML.
  - Chapter 3: Pandas for data processing.
  - Chapter 7: Type annotations with Pyright.
  - Chapter 9: Pytest for testing.
  - Chapter 54: dbt for transformations, extended by scheduled tasks.
- Preparing For:
- Chapter 56: Airflow for advanced orchestration.
- Chapter 57: Dockerized Airflow, building on scheduled tasks.
- Chapters 67–70: Capstone projects integrating scheduling with Airflow and Kubernetes.
What You’ll Learn
This chapter covers:
- APScheduler Basics: Scheduling tasks with intervals and cron triggers (~1MB memory for small job lists, no job overlap by default).
- Type Annotations: Type-safe task definitions.
- Task Automation: Scheduling ETL processes with Pandas.
- Testing: Pytest for schedule and task validation.
- Logging: Print-based logging for task execution.
The micro-project schedules a type-annotated ETL task to process data/sales.csv, validate with config.yaml, and output to data/sales_report.json, with pytest tests for robustness.
Follow-Along Tips:
- Create `de-onboarding/data/` and populate with `sales.csv`, `config.yaml`, `empty.csv`, and `sample.csv` per Appendix 1.
- Install libraries: `pip install apscheduler pandas pyyaml pytest`.
- Use 4-space indentation per PEP 8. Run `python -tt script.py` to detect tab/space issues.
- Debug with print statements (e.g., `print(df.head())`).
- Verify file paths with `ls data/` (Unix/macOS) or `dir data\` (Windows).
- Use UTF-8 encoding to avoid `UnicodeDecodeError`.
55.1 APScheduler Basics
APScheduler (Advanced Python Scheduler) schedules tasks with triggers like intervals (e.g., every 5 seconds) or cron (e.g., daily at 8 AM). It runs in-memory, suitable for lightweight pipelines, with O(1) task scheduling and O(n) for managing n jobs. Memory usage is minimal (~1MB for small job lists), and jobs do not overlap by default (per APScheduler documentation), ensuring sequential execution. This is ideal for Hijra Group’s daily sales processing.
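The no-overlap behavior comes from `add_job`'s default of `max_instances=1`: if a run is still active when the next fire time arrives, APScheduler skips the new run and logs a warning. A minimal sketch illustrating this, where the 5-second `time.sleep` standing in for a slow task is an illustrative assumption:

```python
# File: de-onboarding/overlap_demo.py (illustrative sketch)
import time

from apscheduler.schedulers.blocking import BlockingScheduler

def slow_task() -> None:
    """Simulate a task that outlives its 2-second interval."""
    print("Task started")
    time.sleep(5)  # Runs longer than the interval below
    print("Task finished")

scheduler = BlockingScheduler()
scheduler.add_job(slow_task, "interval", seconds=2, id="slow_task")
scheduler.start()  # Overlapping runs are skipped; APScheduler logs a "maximum number of running instances" warning
```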
55.1.1 Interval-Based Scheduling
Schedule a task to run at fixed intervals. Note that BlockingScheduler runs in the foreground, making it unsuitable for web applications (e.g., FastAPI in Chapter 53), where BackgroundScheduler is used in production instead (Chapter 56; a sketch appears after the Key Points below). The “Scheduler stopped” log is unreachable because `start()` blocks; use Ctrl+C to stop for demonstration.
```python
# File: de-onboarding/interval_schedule.py
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import NoReturn
from datetime import datetime

def print_time() -> None:
    """Print current time."""
    print(f"Task executed at: {datetime.now()}")  # Log time

def run_scheduler() -> NoReturn:
    """Run interval-based scheduler."""
    scheduler = BlockingScheduler()  # Create scheduler
    scheduler.add_job(
        print_time,  # Task to run
        "interval",  # Trigger type
        seconds=5,  # Run every 5 seconds
        id="time_task"  # Name job for identification
        # Example with parameters: add_job(print_message, "interval", seconds=5, args=["Hello"], id="message_task")
    )
    print("Starting scheduler...")  # Log start
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")  # Log job count
    scheduler.start()  # Start scheduler (blocks execution)
    print("Scheduler stopped")  # Log shutdown (unreachable due to blocking)

if __name__ == "__main__":
    run_scheduler()

# Expected Output (runs indefinitely):
# Starting scheduler...
# Scheduled 1 job(s)
# Task executed at: 2025-04-25 10:00:00.123456
# Task executed at: 2025-04-25 10:00:05.123456
# (continues every 5 seconds)
```

Follow-Along Instructions:
- Install APScheduler: `pip install apscheduler`.
- Save as `de-onboarding/interval_schedule.py`.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Run: `python interval_schedule.py`.
- Stop with Ctrl+C.
- Common Errors:
  - ModuleNotFoundError: Install `apscheduler` with `pip install apscheduler`.
  - IndentationError: Use 4 spaces. Run `python -tt interval_schedule.py`.
Key Points:
- BlockingScheduler: Blocks execution, suitable for standalone scripts.
- `add_job()`: Schedules tasks with triggers, supporting `id` for naming and `args` for parameters.
- Time Complexity: O(1) for scheduling, O(n) for managing n jobs.
- Space Complexity: O(n) for n jobs, ~1MB for small lists.
- Implication: Ideal for simple, recurring tasks like sales processing.
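Since `BlockingScheduler` blocks the calling thread, here is a minimal sketch of the non-blocking `BackgroundScheduler` mentioned above, which Chapter 56 uses in production settings; the 30-second `time.sleep` stands in for other work and is an illustrative assumption:

```python
# File: de-onboarding/background_schedule.py (illustrative sketch)
import time
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler

def print_time() -> None:
    """Print current time."""
    print(f"Task executed at: {datetime.now()}")

def main() -> None:
    """Run a non-blocking scheduler alongside other work."""
    scheduler = BackgroundScheduler()  # Runs jobs in a background thread
    scheduler.add_job(print_time, "interval", seconds=5, id="bg_time_task")
    scheduler.start()  # Returns immediately, unlike BlockingScheduler
    print("Scheduler running in background...")
    time.sleep(30)  # Simulate other work (e.g., a web server) for 30 seconds
    scheduler.shutdown()  # Reachable here, unlike the BlockingScheduler log
    print("Scheduler stopped")

if __name__ == "__main__":
    main()
```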
55.1.2 Cron-Based Scheduling
Schedule tasks using cron-like expressions for precise timing. Interval triggers are better for short, frequent tasks (e.g., monitoring sales), while cron triggers are ideal for fixed schedules (e.g., Hijra Group’s daily reports). The “Scheduler stopped” log is unreachable; use Ctrl+C to stop.
| Field  | Description                       | Example          |
|--------|-----------------------------------|------------------|
| second | Seconds (0–59, */n for every n)   | */10 (every 10s) |
| minute | Minutes (0–59)                    | 0 (at minute 0)  |
| hour   | Hours (0–23)                      | 8 (at 8 AM)      |
```python
# File: de-onboarding/cron_schedule.py
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import NoReturn
from datetime import datetime

def print_time() -> None:
    """Print current time."""
    print(f"Task executed at: {datetime.now()}")  # Log time

def run_scheduler() -> NoReturn:
    """Run cron-based scheduler."""
    scheduler = BlockingScheduler()  # Create scheduler
    scheduler.add_job(
        print_time,  # Task to run
        "cron",  # Trigger type
        second="*/10",  # Run every 10 seconds
        id="cron_time_task"  # Name job
        # Example for daily at 8 AM: hour=8, minute=0 (per APScheduler docs, runs daily at 8:00 AM)
    )
    print("Starting scheduler...")  # Log start
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")  # Log job count
    scheduler.start()  # Start scheduler
    print("Scheduler stopped")  # Log shutdown (unreachable due to blocking)

if __name__ == "__main__":
    run_scheduler()

# Expected Output (runs indefinitely):
# Starting scheduler...
# Scheduled 1 job(s)
# Task executed at: 2025-04-25 10:00:00.123456
# Task executed at: 2025-04-25 10:00:10.123456
# (continues every 10 seconds)
```

Follow-Along Instructions:
- Save as `de-onboarding/cron_schedule.py`.
- Configure editor for 4-space indentation.
- Run: `python cron_schedule.py`.
- Stop with Ctrl+C.
- Common Errors:
  - ValueError: Invalid cron expression. Use `second="*/10"`. Print the expression to debug.
  - IndentationError: Use 4 spaces. Run `python -tt cron_schedule.py`.
Key Points:
- Cron Trigger: Uses cron syntax (e.g., `second="*/10"` for every 10 seconds).
- Flexibility: Supports complex schedules (e.g., daily at 8 AM with `hour=8, minute=0`), as sketched below.
- Implication: Useful for daily ETL tasks in Hijra Group’s pipelines.
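A minimal sketch of the daily schedule referenced above; the `generate_daily_report` task and the weekday restriction are illustrative assumptions:

```python
# File: de-onboarding/daily_cron.py (illustrative sketch)
from apscheduler.schedulers.blocking import BlockingScheduler

def generate_daily_report() -> None:
    """Placeholder for a daily sales report task."""
    print("Generating daily sales report...")

scheduler = BlockingScheduler()
scheduler.add_job(
    generate_daily_report,
    "cron",
    day_of_week="mon-fri",  # Cron syntax: weekdays only
    hour=8,
    minute=0,
    id="daily_report"
)
scheduler.start()  # Fires every weekday at 8:00 AM
```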
55.2 Micro-Project: Scheduled Sales ETL Processor
Project Requirements
Build a type-annotated, scheduled ETL processor to automate sales data processing, potentially including dbt-transformed outputs from Chapter 54, using data/sales.csv and config.yaml. The task runs every 5 seconds for demonstration, processing sales data, validating records for Sharia-compliant products (e.g., Halal prefix), and outputting a JSON report to data/sales_report.json. Tests use pytest to verify functionality and edge cases (empty.csv, invalid YAML, partial data).
- Load: Read `sales.csv` with `pandas.read_csv` (Chapter 3) and `config.yaml` with PyYAML (Chapter 2).
- Validate: Ensure Halal products, positive prices, integer quantities, and config rules.
- Process: Compute total sales and top products using Pandas (Chapter 3).
- Output: Save results to `data/sales_report.json`.
- Schedule: Run every 5 seconds using APScheduler.
- Test: Use pytest (Chapter 9) for task execution and edge cases.
- Log: Print task execution and validation errors.
- Indentation: Use 4-space indentation per PEP 8, preferring spaces over tabs.
Sample Input Files
data/sales.csv (Appendix 1):
```csv
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
```

data/config.yaml (Appendix 1):

```yaml
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
```

Data Processing Flow

```mermaid
flowchart TD
    A["Input CSV<br>sales.csv"] --> B["Load CSV<br>pandas.read_csv"]
    B --> C["Pandas DataFrame"]
    C --> D["Read YAML<br>config.yaml"]
    D --> E["Pandas Validation"]
    E -->|Invalid| F["Log Warning"]
    E -->|Valid| G["Compute Metrics<br>Pandas"]
    G --> H["Export JSON<br>sales_report.json"]
    G --> I["APScheduler Scheduling"]
    I -->|Every 5s| E
    F --> J["End Task"]
    H --> J
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C data
class B,D,E,G,H,I process
class F error
class J endpoint
```

Acceptance Criteria
- Go Criteria:
  - Loads `sales.csv` and `config.yaml` correctly.
  - Validates records per config rules, ensuring Sharia-compliant products.
  - Computes total sales and top 3 products.
  - Exports to `data/sales_report.json`.
  - Schedules task every 5 seconds.
  - Passes pytest tests, including `empty.csv`, invalid YAML, partial data, and job count.
  - Logs execution and errors.
  - Uses type annotations and 4-space indentation.
- No-Go Criteria:
  - Fails to load files or validate data.
  - Incorrect calculations or output.
  - Scheduling fails or uses try/except.
  - Inconsistent indentation.
Common Pitfalls to Avoid
- FileNotFoundError:
  - Problem: Missing `sales.csv` or `config.yaml`.
  - Solution: Verify paths with `print(csv_path)`. Ensure files exist per Appendix 1.
- Validation Errors:
  - Problem: Missing values break filtering.
  - Solution: Use `dropna()`. Print `df.head()`.
- Type Mismatches:
  - Problem: Non-numeric prices cause errors.
  - Solution: Validate string representations before converting. Print `df.dtypes`.
- Scheduler Blocking:
  - Problem: `BlockingScheduler` hangs.
  - Solution: Use short intervals (5s) for testing. Stop with Ctrl+C.
- Job Misfiring:
  - Problem: Tasks skipped due to system load.
  - Solution: Monitor logs for missed executions. In production, adjust `misfire_grace_time` (Chapter 56), as sketched below.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces. Run `python -tt etl_processor.py`.
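A minimal sketch of the `misfire_grace_time` option, a real `add_job` keyword per the APScheduler docs; the 30-second grace period is an illustrative assumption:

```python
# Illustrative sketch: allow late runs within a grace period
from apscheduler.schedulers.blocking import BlockingScheduler

def etl_task() -> None:
    """Placeholder ETL task."""
    print("Running ETL task")

scheduler = BlockingScheduler()
scheduler.add_job(
    etl_task,
    "interval",
    seconds=5,
    id="etl_task",
    misfire_grace_time=30  # Still run a missed job if within 30 seconds of its scheduled time
)
scheduler.start()
```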
How This Differs from Production
In production, this solution would include:
- Error Handling: Try/except for robust failures (Chapter 56).
- Logging: File-based logging (Chapter 52).
- Scalability: Chunked CSV processing (Chapter 40), sketched after this list.
- Monitoring: Observability with metrics (Chapter 66).
- Distributed Scheduling: Airflow for multi-node orchestration (Chapter 56), unlike APScheduler’s single-process model.
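As a taste of the chunked processing noted in the Scalability point (covered fully in Chapter 40), a minimal sketch; the 10,000-row chunk size and the simplified validation are illustrative assumptions:

```python
# Illustrative sketch: bound memory by reading the CSV in fixed-size chunks
import pandas as pd

def chunked_total_sales(csv_path: str, chunk_size: int = 10000) -> float:
    """Sum price * quantity chunk by chunk instead of loading the whole file."""
    total = 0.0
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        chunk = chunk.dropna(subset=["product"])  # Simplified validation for the sketch
        prices = pd.to_numeric(chunk["price"], errors="coerce")  # Non-numeric -> NaN
        quantities = pd.to_numeric(chunk["quantity"], errors="coerce")
        total += (prices * quantities).sum()  # NaN rows are skipped by default
    return float(total)

print(chunked_total_sales("data/sales.csv"))
```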
Implementation
```python
# File: de-onboarding/utils.py (updated from Chapter 54)
from typing import Any, Dict, Union

def is_numeric(s: str, max_decimals: int = 2) -> bool:
    """Check if string is a decimal number with up to max_decimals."""
    parts = s.split(".")  # Expects exactly one decimal point, e.g., "999.99"
    if len(parts) != 2 or not parts[0].replace("-", "").isdigit() or not parts[1].isdigit():
        return False
    return len(parts[1]) <= max_decimals

def clean_string(s: Union[str, Any]) -> str:
    """Strip whitespace from string."""
    return str(s).strip()

def is_numeric_value(x: Any) -> bool:
    """Check if value is numeric."""
    return isinstance(x, (int, float))

def has_valid_decimals(x: Any, max_decimals: int) -> bool:
    """Check if value has valid decimal places."""
    return is_numeric(str(x), max_decimals)

def apply_valid_decimals(x: Any, max_decimals: int) -> bool:
    """Apply decimal validation."""
    return has_valid_decimals(x, max_decimals)

def is_integer(x: Any) -> bool:
    """Check if value is an integer."""
    return str(x).isdigit()

def validate_sale(sale: Dict[str, str], config: Dict[str, Any]) -> bool:
    """Validate sale based on config rules."""
    required_fields = config["required_fields"]
    min_price = config["min_price"]
    max_quantity = config["max_quantity"]
    prefix = config["product_prefix"]
    max_decimals = config["max_decimals"]
    print(f"Validating sale: {sale}")
    for field in required_fields:
        if not sale[field] or sale[field].strip() == "":
            print(f"Invalid sale: missing {field}: {sale}")
            return False
    product = clean_string(sale["product"])
    if not product.startswith(prefix):
        print(f"Invalid sale: product lacks '{prefix}' prefix: {sale}")
        return False
    price = clean_string(sale["price"])
    if not is_numeric(price, max_decimals) or float(price) < min_price or float(price) <= 0:
        print(f"Invalid sale: invalid price: {sale}")
        return False
    quantity = clean_string(sale["quantity"])
    if not quantity.isdigit() or int(quantity) > max_quantity:
        print(f"Invalid sale: invalid quantity: {sale}")
        return False
    return True
```
```python
# File: de-onboarding/etl_processor.py
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import Any, Dict, Tuple, NoReturn
import pandas as pd
import yaml
import json
import os
import utils

def read_config(config_path: str) -> Dict[str, Any]:
    """Read YAML configuration (uses PyYAML from Chapter 2)."""
    print(f"Opening config: {config_path}")
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    print(f"Loaded config: {config}")
    return config

def load_and_validate_sales(csv_path: str, config: Dict[str, Any]) -> Tuple[pd.DataFrame, int, int]:
    """Load and validate sales CSV (uses Pandas from Chapter 3)."""
    print(f"Loading CSV: {csv_path}")
    df = pd.read_csv(csv_path)  # Load CSV into DataFrame
    print("Initial DataFrame:")
    print(df.head())
    required_fields = config["required_fields"]
    missing_fields = [f for f in required_fields if f not in df.columns]
    if missing_fields:
        print(f"Missing columns: {missing_fields}")
        return pd.DataFrame(), 0, len(df)
    df = df.dropna(subset=["product"])
    df = df[df["product"].str.startswith(config["product_prefix"])]
    df = df[df["quantity"].apply(utils.is_integer)]
    df["quantity"] = df["quantity"].astype(int)
    df = df[df["quantity"] <= config["max_quantity"]]
    # read_csv leaves mixed-type columns (e.g., "invalid" among prices) as strings,
    # so validate the string form first, then convert the survivors to float
    df = df[df["price"].apply(lambda p: utils.has_valid_decimals(p, config["max_decimals"]))]
    df["price"] = df["price"].astype(float)  # Safe after validation
    df = df[df["price"] > 0]
    df = df[df["price"] >= config["min_price"]]
    total_records = len(df)
    print("Validated DataFrame:")
    print(df)
    return df, len(df), total_records

def process_sales(df: pd.DataFrame, config: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Process sales data (uses Pandas from Chapter 3)."""
    if df.empty:
        print("No valid sales data")
        return {"total_sales": 0.0, "unique_products": [], "top_products": {}}, 0
    df["amount"] = df["price"] * df["quantity"]
    print("DataFrame with Amount:")
    print(df)
    total_sales = df["amount"].sum()
    unique_products = df["product"].unique().tolist()
    sales_by_product = df.groupby("product")["amount"].sum()
    top_products = sales_by_product.sort_values(ascending=False).head(3).to_dict()
    valid_sales = len(df)
    print(f"Valid sales: {valid_sales} records")
    return {
        "total_sales": float(total_sales),
        "unique_products": unique_products,
        "top_products": top_products
    }, valid_sales

def export_results(results: Dict[str, Any], json_path: str) -> None:
    """Export results to JSON (uses file handling from Chapter 2)."""
    print(f"Writing to: {json_path}")
    print(f"Results: {results}")
    with open(json_path, "w") as file:
        json.dump(results, file, indent=2)
    print(f"Exported results to {json_path}")

def etl_task(csv_path: str, config_path: str, json_path: str) -> None:
    """ETL task to process sales data."""
    print(f"Running ETL task at: {pd.Timestamp.now()}")
    config = read_config(config_path)
    df, valid_sales, total_records = load_and_validate_sales(csv_path, config)
    results, valid_sales = process_sales(df, config)
    export_results(results, json_path)
    print("\nSales Report:")
    print(f"Total Records Processed: {total_records}")
    print(f"Valid Sales: {valid_sales}")
    print(f"Invalid Sales: {total_records - valid_sales}")
    print(f"Total Sales: ${round(results['total_sales'], 2)}")
    print(f"Unique Products: {results['unique_products']}")
    print(f"Top Products: {results['top_products']}")

def run_scheduler(csv_path: str, config_path: str, json_path: str) -> NoReturn:
    """Run scheduler for ETL task."""
    scheduler = BlockingScheduler()
    scheduler.add_job(
        etl_task,  # Task to run
        "interval",  # Trigger type
        seconds=5,  # Run every 5 seconds
        args=[csv_path, config_path, json_path],  # Pass arguments
        id="etl_task"  # Name job
    )
    print("Starting scheduler...")
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")  # Log job count
    scheduler.start()
    print("Scheduler stopped")  # Log shutdown (unreachable due to blocking)

def main() -> None:
    """Main function."""
    csv_path = "data/sales.csv"
    config_path = "data/config.yaml"
    json_path = "data/sales_report.json"
    run_scheduler(csv_path, config_path, json_path)

if __name__ == "__main__":
    main()
```
```python
# File: de-onboarding/test_etl_processor.py
import pytest
import pandas as pd
import os
import json
import time
import yaml
from typing import Dict, Any
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from etl_processor import read_config, load_and_validate_sales, process_sales, export_results, etl_task, run_scheduler

@pytest.fixture
def config() -> Dict[str, Any]:
    """Fixture for config."""
    return read_config("data/config.yaml")

def test_load_and_validate_sales(config: Dict[str, Any]) -> None:
    """Test loading and validating sales data."""
    df, valid_sales, total_records = load_and_validate_sales("data/sales.csv", config)
    assert valid_sales == 3
    assert total_records == 3
    assert len(df) == 3
    assert set(df["product"]) == {"Halal Laptop", "Halal Mouse", "Halal Keyboard"}

def test_load_empty_csv(config: Dict[str, Any]) -> None:
    """Test loading empty CSV."""
    df, valid_sales, total_records = load_and_validate_sales("data/empty.csv", config)
    assert df.empty
    assert valid_sales == 0
    assert total_records == 0

def test_process_sales(config: Dict[str, Any]) -> None:
    """Test processing sales data."""
    df, _, _ = load_and_validate_sales("data/sales.csv", config)
    results, valid_sales = process_sales(df, config)
    assert valid_sales == 3
    assert results["total_sales"] == 2499.83
    assert len(results["unique_products"]) == 3
    assert results["top_products"]["Halal Laptop"] == 1999.98

def test_export_results(tmp_path: Any) -> None:
    """Test exporting results."""
    json_path = tmp_path / "test.json"
    results = {
        "total_sales": 2499.83,
        "unique_products": ["Halal Laptop", "Halal Mouse"],
        "top_products": {"Halal Laptop": 1999.98}
    }
    export_results(results, str(json_path))
    assert os.path.exists(json_path)
    with open(json_path, "r") as file:
        saved = json.load(file)
    assert saved == results

def test_etl_task(tmp_path: Any, config: Dict[str, Any]) -> None:
    """Test ETL task."""
    json_path = tmp_path / "sales_report.json"
    etl_task("data/sales.csv", "data/config.yaml", str(json_path))
    assert os.path.exists(json_path)
    with open(json_path, "r") as file:
        results = json.load(file)
    assert results["total_sales"] == 2499.83

def test_invalid_yaml(tmp_path: Any) -> None:
    """Test handling invalid YAML: without try/except (Chapter 56), the error propagates."""
    invalid_yaml = tmp_path / "invalid.yaml"
    with open(invalid_yaml, "w") as file:
        file.write("min_price: 10.0\n max_quantity: 100")  # Invalid indentation
    with pytest.raises(yaml.YAMLError):  # safe_load raises on malformed YAML
        read_config(str(invalid_yaml))

def test_partial_data(tmp_path: Any, config: Dict[str, Any]) -> None:
    """Test processing partial valid data (simulates CSV with mixed valid/invalid rows)."""
    partial_csv = tmp_path / "partial.csv"
    with open(partial_csv, "w") as file:
        file.write("product,price,quantity\nHalal Laptop,999.99,2\nMonitor,invalid,2\n")
    df, valid_sales, total_records = load_and_validate_sales(str(partial_csv), config)
    assert valid_sales == 1
    assert len(df) == 1
    assert df["product"].iloc[0] == "Halal Laptop"

def test_scheduler_job_count() -> None:
    """Test scheduler schedules exactly one job."""
    scheduler = BlockingScheduler()
    scheduler.add_job(
        lambda: None,  # Mock task
        "interval",
        seconds=5,
        id="test_task"
    )
    assert len(scheduler.get_jobs()) == 1  # Pending jobs are counted before start
    # No shutdown() here: the scheduler was never started, and shutting down a
    # stopped scheduler raises SchedulerNotRunningError

def test_scheduler_execution(tmp_path: Any) -> None:
    """Test scheduler executes at least once. Note: time.sleep(2) assumes a 1-second interval; increase to 3 seconds for slower systems."""
    timestamp_file = tmp_path / "timestamp.txt"

    def mock_task() -> None:
        with open(timestamp_file, "w") as f:
            f.write(str(pd.Timestamp.now()))

    scheduler = BackgroundScheduler()  # Non-blocking, so the test can wait and assert
    scheduler.add_job(mock_task, "interval", seconds=1, id="mock_task")
    scheduler.start()  # Returns immediately
    time.sleep(2)  # Wait for at least one execution
    scheduler.shutdown()
    assert os.path.exists(timestamp_file)
```

Expected Outputs
data/sales_report.json:
```json
{
  "total_sales": 2499.83,
  "unique_products": ["Halal Laptop", "Halal Mouse", "Halal Keyboard"],
  "top_products": {
    "Halal Laptop": 1999.98,
    "Halal Mouse": 249.9,
    "Halal Keyboard": 249.95
  }
}
```

Console Output (abridged, runs every 5 seconds):

```text
Starting scheduler...
Scheduled 1 job(s)
Running ETL task at: 2025-04-25 10:00:00.123456
Opening config: data/config.yaml
Loaded config: {'min_price': 10.0, 'max_quantity': 100, ...}
Loading CSV: data/sales.csv
Initial DataFrame:
product price quantity
0 Halal Laptop 999.99 2
...
Validated DataFrame:
product price quantity
0 Halal Laptop 999.99 2
1 Halal Mouse 24.99 10
2 Halal Keyboard 49.99 5
DataFrame with Amount:
product price quantity amount
0 Halal Laptop 999.99 2 1999.98
...
Valid sales: 3 records
Writing to: data/sales_report.json
Exported results to data/sales_report.json
Sales Report:
Total Records Processed: 3
Valid Sales: 3
Invalid Sales: 0
Total Sales: $2499.83
...
```

How to Run and Test
Setup:
- Setup Checklist:
  - Create `de-onboarding/data/` directory.
  - Save `sales.csv`, `config.yaml`, `empty.csv`, `sample.csv` per Appendix 1.
  - Verify `sales.csv` has 6 data rows plus header: `wc -l data/sales.csv` (Unix/macOS) or `findstr /r /n "^" data\sales.csv | find /c ":"` (Windows).
  - Validate `config.yaml` per Chapter 2: `python -c "import yaml; yaml.safe_load(open('data/config.yaml'))"`.
  - Check file encoding: `file data/sales.csv` (Unix/macOS, expect UTF-8) or `chcp` (Windows, expect 65001 for UTF-8).
  - Create virtual environment: `python -m venv venv`, activate (Windows: `venv\Scripts\activate`, Unix: `source venv/bin/activate`).
  - Install libraries: `pip install apscheduler pandas pyyaml pytest`.
  - Verify APScheduler: `python -c "import apscheduler; print(apscheduler.__version__)"`.
  - Verify Python 3.10+: `python --version`.
  - Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
  - Verify type annotations: `pyright etl_processor.py` (requires `pip install pyright`).
  - Save `utils.py`, `etl_processor.py`, `test_etl_processor.py` in `de-onboarding/`.
- Troubleshooting:
  - If `FileNotFoundError` or `PermissionError`, check permissions: `ls -l data/` (Unix/macOS) or `dir data\` (Windows).
  - If `ModuleNotFoundError`, install libraries or check `utils.py` path.
  - If `IndentationError`, run `python -tt etl_processor.py`.
  - If `yaml.YAMLError`, print `open(config_path).read()` to inspect `config.yaml`.
  - If `UnicodeDecodeError`, ensure UTF-8 encoding.
  - If scheduler doesn’t run, print `scheduler.get_jobs()` to list scheduled tasks.
Run:
- Open terminal in `de-onboarding/`.
- Run: `python etl_processor.py`.
- Outputs: `data/sales_report.json`, console logs every 5 seconds.
- Stop with Ctrl+C.
Test:
- Run: `pytest test_etl_processor.py -v`.
- Tests:
  - `test_load_and_validate_sales`: Verifies valid data.
  - `test_load_empty_csv`: Ensures empty CSV handling.
  - `test_process_sales`: Checks calculations.
  - `test_export_results`: Validates JSON output.
  - `test_invalid_yaml`: Tests malformed YAML.
  - `test_partial_data`: Verifies partial valid data.
  - `test_scheduler_job_count`: Confirms one job scheduled.
  - `test_scheduler_execution`: Confirms scheduler runs.
- Troubleshooting:
  - If tests fail, print `df.head()` or `os.path.exists(json_path)`.
  - If `test_partial_data` fails, print `df` to inspect filtered data.
  - If `test_scheduler_execution` fails, increase `time.sleep(2)` to 3 seconds.
55.3 Practice Exercises
Exercise 1: Interval Scheduler
Write a type-annotated function to schedule a task printing the current time every 3 seconds, using 4-space indentation.
Expected Output:

```text
Starting scheduler...
Scheduled 1 job(s)
Task executed at: 2025-04-25 10:00:00.123456
Task executed at: 2025-04-25 10:00:03.123456
```

Follow-Along Instructions:
- Save as `de-onboarding/ex1_interval.py`.
- Run: `python ex1_interval.py`.
- Stop with Ctrl+C.
- Test: Verify output every 3 seconds.
Exercise 2: Cron Scheduler
Write a type-annotated function to schedule a task printing the current time every 10 seconds using cron, with 4-space indentation.
Expected Output:

```text
Starting scheduler...
Scheduled 1 job(s)
Task executed at: 2025-04-25 10:00:00.123456
Task executed at: 2025-04-25 10:00:10.123456
```

Follow-Along Instructions:
- Save as `de-onboarding/ex2_cron.py`.
- Run: `python ex2_cron.py`.
- Stop with Ctrl+C.
- Test: Verify output every 10 seconds.
Exercise 3: Scheduled ETL
Write a type-annotated function to schedule an ETL task processing data/sample.csv every 5 seconds, outputting to data/sample_report.json, with 4-space indentation.
Expected Output (data/sample_report.json):

```json
{
  "total_sales": 2249.88,
  "unique_products": ["Halal Laptop", "Halal Mouse"],
  "top_products": {
    "Halal Laptop": 1999.98,
    "Halal Mouse": 249.9
  }
}
```

Follow-Along Instructions:
- Save as `de-onboarding/ex3_etl.py`.
- Ensure `data/sample.csv` and `data/config.yaml` exist per Appendix 1.
- Run: `python ex3_etl.py`.
- Stop with Ctrl+C.
- Test: Verify JSON output.
Exercise 4: Debug Scheduler Bug
Fix this buggy code where the scheduler runs too frequently (every second), ensuring 4-space indentation.
Buggy Code:

```python
from apscheduler.schedulers.blocking import BlockingScheduler

def print_time():
    print("Task executed")

scheduler = BlockingScheduler()
scheduler.add_job(print_time, "interval", seconds=1)  # Bug: Too frequent
scheduler.start()
```

Expected Output (every 5 seconds):

```text
Starting scheduler...
Scheduled 1 job(s)
Task executed
```

Follow-Along Instructions:
- Save as `de-onboarding/ex4_debug.py`.
- Run: `python ex4_debug.py`.
- Fix and re-run.
- Test: Verify output every 5 seconds.
Exercise 5: Pytest for Scheduler
Write a pytest test to verify the ETL task produces correct output for data/sample.csv, with 4-space indentation.
Expected Output:

```text
test_etl_task ... passed
```

Follow-Along Instructions:
- Save as `de-onboarding/test_ex5.py`.
- Run: `pytest test_ex5.py -v`.
- Test: Verify test passes.
Exercise 6: Conceptual Analysis of Triggers
Explain the difference between interval and cron triggers in APScheduler, when to use each for Hijra Group’s pipelines, and compare APScheduler to manual cron jobs (e.g., Linux cron) in terms of setup complexity. Save the explanation to de-onboarding/ex6_concepts.txt.
Expected Output (ex6_concepts.txt):

```text
Interval triggers run tasks at fixed time intervals (e.g., every 5 seconds), ideal for continuous tasks like monitoring sales data. Cron triggers use a schedule (e.g., daily at 8 AM), perfect for recurring reports like Hijra Group’s daily Sharia-compliant transaction summaries. Use interval for frequent, simple tasks and cron for precise, time-specific ETL jobs. APScheduler simplifies setup within Python scripts, avoiding Linux cron’s need for external configuration, but requires running a Python process, unlike cron’s system-level scheduling.
```

Follow-Along Instructions:
- Save explanation to `de-onboarding/ex6_concepts.txt`.
- Verify content with `cat ex6_concepts.txt` (Unix/macOS) or `type ex6_concepts.txt` (Windows).
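For comparison with Exercise 6’s discussion, here is the same daily 8 AM schedule expressed both ways; the crontab script path is a placeholder:

```python
# Linux crontab equivalent (system-level, edited via `crontab -e`):
#   0 8 * * * /usr/bin/python3 /path/to/etl_processor.py
# APScheduler equivalent inside a running Python process:
from apscheduler.schedulers.blocking import BlockingScheduler
from etl_processor import etl_task

scheduler = BlockingScheduler()
scheduler.add_job(
    etl_task,
    "cron",
    hour=8,
    minute=0,
    args=["data/sales.csv", "data/config.yaml", "data/sales_report.json"],
    id="daily_etl"
)
scheduler.start()
```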
Exercise 7: Debug Missing Function
Fix this buggy code where the scheduler fails due to a missing function, ensuring 4-space indentation.
Buggy Code:

```python
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()
scheduler.add_job(nonexistent_task, "interval", seconds=5)  # Bug: Missing function
scheduler.start()
```

Expected Output (every 5 seconds):

```text
Starting scheduler...
Scheduled 1 job(s)
Task executed
```

Follow-Along Instructions:
- Save as `de-onboarding/ex7_debug.py`.
- Run: `python ex7_debug.py` to see the error.
- Fix by defining `nonexistent_task` and re-run.
- Test: Verify output every 5 seconds. Debug with `print(scheduler.get_jobs())`.
55.4 Exercise Solutions
Solution to Exercise 1: Interval Scheduler
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import NoReturn
from datetime import datetime

def print_time() -> None:
    """Print current time."""
    print(f"Task executed at: {datetime.now()}")

def run_scheduler() -> NoReturn:
    """Run interval scheduler."""
    scheduler = BlockingScheduler()
    scheduler.add_job(print_time, "interval", seconds=3, id="interval_time_task")
    print("Starting scheduler...")
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")
    scheduler.start()
    print("Scheduler stopped")

if __name__ == "__main__":
    run_scheduler()
```

Solution to Exercise 2: Cron Scheduler
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import NoReturn
from datetime import datetime

def print_time() -> None:
    """Print current time."""
    print(f"Task executed at: {datetime.now()}")

def run_scheduler() -> NoReturn:
    """Run cron scheduler."""
    scheduler = BlockingScheduler()
    scheduler.add_job(print_time, "cron", second="*/10", id="cron_time_task")
    print("Starting scheduler...")
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")
    scheduler.start()
    print("Scheduler stopped")

if __name__ == "__main__":
    run_scheduler()
```

Solution to Exercise 3: Scheduled ETL
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import Any, Dict, Tuple, NoReturn
import pandas as pd
import yaml
import json
import utils

def read_config(config_path: str) -> Dict[str, Any]:
    """Read YAML configuration."""
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    return config

def load_and_validate_sales(csv_path: str, config: Dict[str, Any]) -> Tuple[pd.DataFrame, int, int]:
    """Load and validate sales CSV. sample.csv has all-numeric prices, so the column loads as float."""
    df = pd.read_csv(csv_path)
    df = df.dropna(subset=["product"])
    df = df[df["product"].str.startswith(config["product_prefix"])]
    df = df[df["quantity"].apply(utils.is_integer)]
    df["quantity"] = df["quantity"].astype(int)
    df = df[df["quantity"] <= config["max_quantity"]]
    df = df[df["price"].apply(utils.is_numeric_value)]
    df = df[df["price"] > 0]
    df = df[df["price"] >= config["min_price"]]
    return df, len(df), len(df)

def process_sales(df: pd.DataFrame, config: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
    """Process sales data."""
    if df.empty:
        return {"total_sales": 0.0, "unique_products": [], "top_products": {}}, 0
    df["amount"] = df["price"] * df["quantity"]
    total_sales = df["amount"].sum()
    unique_products = df["product"].unique().tolist()
    sales_by_product = df.groupby("product")["amount"].sum()
    top_products = sales_by_product.sort_values(ascending=False).head(3).to_dict()
    return {
        "total_sales": float(total_sales),
        "unique_products": unique_products,
        "top_products": top_products
    }, len(df)

def export_results(results: Dict[str, Any], json_path: str) -> None:
    """Export results to JSON."""
    with open(json_path, "w") as file:
        json.dump(results, file, indent=2)

def etl_task(csv_path: str, config_path: str, json_path: str) -> None:
    """ETL task."""
    config = read_config(config_path)
    df, valid_sales, total_records = load_and_validate_sales(csv_path, config)
    results, valid_sales = process_sales(df, config)
    export_results(results, json_path)

def run_scheduler(csv_path: str, config_path: str, json_path: str) -> NoReturn:
    """Run scheduler."""
    scheduler = BlockingScheduler()
    scheduler.add_job(
        etl_task,
        "interval",
        seconds=5,
        args=[csv_path, config_path, json_path],
        id="etl_task"
    )
    print("Starting scheduler...")
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")
    scheduler.start()
    print("Scheduler stopped")

def main() -> None:
    """Main function."""
    run_scheduler("data/sample.csv", "data/config.yaml", "data/sample_report.json")

if __name__ == "__main__":
    main()
```

Solution to Exercise 4: Debug Scheduler Bug
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import NoReturn

def print_time() -> None:
    """Print task execution."""
    print("Task executed")

def run_scheduler() -> NoReturn:
    """Run scheduler."""
    scheduler = BlockingScheduler()
    scheduler.add_job(print_time, "interval", seconds=5, id="print_task")  # Fix: 5 seconds
    print("Starting scheduler...")
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")
    scheduler.start()
    print("Scheduler stopped")

if __name__ == "__main__":
    run_scheduler()
```

Solution to Exercise 5: Pytest for Scheduler
```python
import pytest
import json
import os
from typing import Dict, Any
from ex3_etl import etl_task, read_config

@pytest.fixture
def config() -> Dict[str, Any]:
    """Fixture for config."""
    return read_config("data/config.yaml")

def test_etl_task(tmp_path: Any, config: Dict[str, Any]) -> None:
    """Test ETL task."""
    json_path = tmp_path / "sample_report.json"
    etl_task("data/sample.csv", "data/config.yaml", str(json_path))
    assert os.path.exists(json_path)
    with open(json_path, "r") as file:
        results = json.load(file)
    assert results["total_sales"] == 2249.88
```

Solution to Exercise 7: Debug Missing Function
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from typing import NoReturn

def nonexistent_task() -> None:
    """Print task execution."""
    print("Task executed")

def run_scheduler() -> NoReturn:
    """Run scheduler."""
    scheduler = BlockingScheduler()
    scheduler.add_job(nonexistent_task, "interval", seconds=5, id="fixed_task")  # Fix: Define function
    print("Starting scheduler...")
    print(f"Scheduled {len(scheduler.get_jobs())} job(s)")
    scheduler.start()
    print("Scheduler stopped")

if __name__ == "__main__":
    run_scheduler()
```

55.5 Chapter Summary and Connection to Chapter 56
You’ve mastered:
- APScheduler: Interval and cron scheduling (O(1) scheduling, O(n) job management, ~1MB memory).
- Type Annotations: Type-safe task definitions (Chapter 7).
- ETL Automation: Scheduled data processing with Pandas (Chapter 3).
- Testing: Pytest for robustness (Chapter 9).
- White-Space Sensitivity: 4-space indentation per PEP 8.
The micro-project automated a sales ETL task, scheduling it every 5 seconds, with type annotations and pytest tests, ensuring robust pipelines for Hijra Group’s Sharia-compliant analytics. This prepares for Chapter 56: Airflow Fundamentals, which extends scheduling to complex workflows with DAGs, integrating with PostgreSQL and BigQuery. Scheduling skills will be critical for capstone projects (Chapters 67–70), where Airflow pipelines orchestrate end-to-end data flows, for Chapter 64’s Kubernetes-orchestrated Airflow, and for Chapter 70’s FastAPI-based APIs (building on Chapter 53).