39 - Advanced Pandas
Complexity: Moderate (M)
39.0 Introduction: Why This Matters for Data Engineering
In data engineering, Pandas is a cornerstone for processing structured data, enabling Hijra Group to transform financial transaction datasets into analytics-ready formats for Sharia-compliant fintech solutions. For example, chunking reduces memory usage by approximately 50% for 10 million rows, enabling Hijra Group to process large transaction datasets efficiently. Time-series analysis supports data warehousing (Chapter 28) by aggregating transactions into data marts for analytics, while data quality metrics ensure completeness and accuracy, and data lineage tracks transformations for governance. Building on Chapter 38’s advanced NumPy techniques, this chapter dives into advanced Pandas operations—merging, reshaping, time-series analysis, optimization, visualization, and data quality—critical for handling complex sales and transaction data. Pandas DataFrames, leveraging NumPy arrays, manage millions of rows efficiently (~24MB for 1M rows with numeric types), but advanced techniques like vectorized operations and chunking reduce processing time from O(n²) loops to O(n) for large datasets. This chapter uses type annotations (introduced in Chapter 7) verified by Pyright and pytest tests (from Chapter 9) to ensure robust, production-ready code, aligning with Hijra Group’s pipeline standards.
This chapter avoids concepts not yet introduced, such as concurrency (Chapter 40) or FastAPI integration (Chapter 53), focusing on Pandas-specific techniques for data pipelines. All code adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to prevent IndentationError, ensuring compatibility with Hijra Group’s scripts.
Data Engineering Workflow Context
The following diagram shows how advanced Pandas fits into a data pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Pandas DataFrame"]
B --> C{"Advanced Processing"}
C -->|Merge| D["Joined Data"]
C -->|Reshape| E["Pivoted/Aggregated Data"]
C -->|Time-Series| F["Temporal Insights"]
C -->|Optimize| G["Chunked/Indexed Data"]
D --> H["Output (CSV/JSON/Plot/Log)"]
E --> H
F --> H
G --> H
H --> I["Storage/Analysis"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,B,D,E,F,G,H data
class C process
class I storage
Building On and Preparing For
- Building On:
- Chapter 3: Extends basic Pandas operations (loading, filtering, grouping) to advanced merging and reshaping.
- Chapter 7: Applies type annotations for type-safe DataFrame operations.
- Chapter 8: Leverages logging concepts for validation tracking.
- Chapter 9: Incorporates pytest for testing DataFrame transformations.
- Chapter 28: Supports data warehousing through time-series aggregation.
- Chapter 32: Reinforces data freshness concepts for analytics.
- Chapter 38: Leverages NumPy’s vectorized operations within Pandas for performance.
- Preparing For:
- Chapter 40: Prepares for concurrent data processing with Pandas DataFrames.
- Chapter 41: Enables type-safe pipeline integration with Pydantic.
- Chapter 42: Supports comprehensive pipeline testing.
- Chapter 51: Lays groundwork for BI visualizations with Pandas outputs.
- Chapter 54: Prepares for dbt model transformations.
- Chapter 56: Supports pipeline orchestration with Airflow.
- Chapter 66: Supports observability through logging.
What You’ll Learn
This chapter covers:
- Merging and Joining: Combining DataFrames for integrated analytics.
- Reshaping Data: Pivoting and melting for flexible data structures.
- Time-Series Analysis: Handling temporal data for trends.
- Optimization Techniques: Chunking and indexing for large datasets.
- Testing: Ensuring robustness with pytest.
- Visualization: Plotting trends and quality metrics for stakeholder reporting.
- Data Quality: Detecting duplicates, handling missing data, and ensuring unique IDs.
By the end, you’ll build a type-annotated sales pipeline processing data/sales.csv and data/transactions.csv, merging datasets, analyzing time-series trends, plotting results, ensuring data quality (duplicates, missing values, unique IDs), benchmarking performance, visualizing quality metrics, and optimizing with chunking, all tested with pytest and logged for observability, using 4-space indentation per PEP 8.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with sales.csv, transactions.csv, config.yaml per Appendix 1.
- Install libraries: pip install pandas numpy pyyaml pytest matplotlib.
- Use 4-space indentation per PEP 8. Run python -tt script.py to detect tab/space mixing.
- Debug DataFrames with print(df.head()) and types with print(df.dtypes).
- Save outputs to data/ (e.g., sales_analysis.json, sales_trend.png, validation.log, benchmark.txt, validation_stats.csv, validation_stats.png).
- Verify paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding to avoid UnicodeDecodeError.
39.1 Merging and Joining DataFrames
Merging DataFrames combines datasets, e.g., sales and transactions, for unified analytics. Pandas supports merge types (inner, left, right, outer) with O(n log n) complexity for sorted keys, leveraging NumPy’s efficient array operations.
39.1.1 Performing Merges
Merge DataFrames on common columns, e.g., product.
from pandas import DataFrame
import pandas as pd
# Load sample DataFrames
sales: DataFrame = pd.read_csv("data/sales.csv")
transactions: DataFrame = pd.read_csv("data/transactions.csv")
# Perform inner merge on product
merged_df: DataFrame = pd.merge(
sales,
transactions,
on="product",
how="inner",
suffixes=("_sales", "_trans")
)
# Print results
print("Merged DataFrame:") # Debug
print(merged_df.head()) # Show first rows
# Expected Output (abridged):
# Merged DataFrame:
# product price_sales quantity_sales transaction_id price_trans quantity_trans date
# 0 Halal Laptop 999.99 2 T001 999.99 2 2023-10-01
# 1 Halal Mouse 24.99 10 T002 24.99 10 2023-10-02
# 2 Halal Keyboard 49.99 5 T003 49.99 5 2023-10-03
Follow-Along Instructions:
- Ensure data/sales.csv and data/transactions.csv exist per Appendix 1.
- Save as de-onboarding/merge_example.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python merge_example.py.
- Verify output shows merged rows.
- Common Errors:
  - KeyError: Print sales.columns and transactions.columns to check product.
  - TypeError: Ensure merge keys have compatible types. Print sales["product"].dtype.
  - IndentationError: Use 4 spaces. Run python -tt merge_example.py.
Key Points:
- pd.merge(): Combines DataFrames on keys.
- Type Annotations: DataFrame ensures type safety.
- Time Complexity: O(n log n) for merging n rows (sorting-based).
- Space Complexity: O(n) for merged DataFrame.
- Implication: Merging unifies sales and transaction data for Hijra Group’s analytics.
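The inner merge above silently drops rows without a match. When debugging joins, it helps to see which rows failed to join; the following optional sketch (not part of the micro-project, assuming the same Appendix 1 CSVs) uses a left merge with Pandas' indicator flag to surface unmatched sales rows.
# Optional sketch: inspect unmatched rows with a left merge and indicator=True.
from pandas import DataFrame
import pandas as pd

sales: DataFrame = pd.read_csv("data/sales.csv")
transactions: DataFrame = pd.read_csv("data/transactions.csv")

# how="left" keeps every sales row; indicator=True adds a "_merge" column with
# "both" for matched rows and "left_only" for sales rows without a transaction
left_df: DataFrame = pd.merge(
    sales,
    transactions,
    on="product",
    how="left",
    suffixes=("_sales", "_trans"),
    indicator=True
)
unmatched: DataFrame = left_df[left_df["_merge"] == "left_only"]
print("Unmatched sales rows:")  # Debug
print(unmatched[["product", "_merge"]])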
39.1.2 Testing Merges
Test merge correctness with pytest.
# File: de-onboarding/test_merge.py
from pandas import DataFrame
import pandas as pd
import pytest
def test_merge_sales_transactions() -> None:
"""Test merging sales and transactions."""
sales: DataFrame = pd.read_csv("data/sales.csv")
transactions: DataFrame = pd.read_csv("data/transactions.csv")
    # Filter sales to Halal products first; the raw CSVs also contain non-Halal rows
    sales = sales.dropna(subset=["product"])
    sales = sales[sales["product"].str.startswith("Halal")]
    merged_df: DataFrame = pd.merge(sales, transactions, on="product", how="inner")
    assert len(merged_df) >= 3, "Merge should retain at least 3 rows"
    assert "product" in merged_df.columns, "Product column missing"
    assert merged_df["product"].str.startswith("Halal").all(), "Non-Halal products found"
Follow-Along Instructions:
- Save as de-onboarding/test_merge.py.
- Run: pytest test_merge.py -v.
- Verify tests pass.
- Common Errors:
  - AssertionError: Print merged_df.head() to debug.
  - FileNotFoundError: Ensure CSV files exist.
39.2 Reshaping Data
Reshaping transforms DataFrames for analysis, e.g., pivoting for summaries or melting for normalization. These operations are O(n) for n rows, leveraging Pandas’ column-oriented storage.
39.2.1 Pivoting Data
Create pivot tables to summarize data, e.g., sales by product.
from pandas import DataFrame
import pandas as pd
# Load and clean DataFrame
df: DataFrame = pd.read_csv("data/sales.csv")
df = df.dropna(subset=["product"]).query("product.str.startswith('Halal')")
# Pivot to sum quantities by product
pivot_df: DataFrame = df.pivot_table(
values="quantity",
index="product",
aggfunc="sum"
)
# Print results
print("Pivot Table:") # Debug
print(pivot_df)
# Expected Output:
# Pivot Table:
# quantity
# product
# Halal Keyboard 5
# Halal Laptop 2
# Halal Mouse 10
Follow-Along Instructions:
- Save as de-onboarding/pivot_example.py.
- Run: python pivot_example.py.
- Verify output shows pivoted quantities.
- Common Errors:
  - KeyError: Print df.columns to check quantity.
  - IndentationError: Use 4 spaces. Run python -tt pivot_example.py.
Key Points:
- pivot_table(): Aggregates data by categories.
- Time Complexity: O(n) for n rows.
- Space Complexity: O(k) for k unique index values.
- Implication: Summarizes sales for reporting.
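pivot_table also accepts multiple value columns and aggregation functions for one-pass summaries. The sketch below is an illustrative extension of the example above; the amount column is derived here (it is not in sales.csv), and prices are coerced to numeric because the sample file contains an invalid price string.
# Illustrative extension: multiple values and aggregations in one pivot.
from pandas import DataFrame
import pandas as pd

df: DataFrame = pd.read_csv("data/sales.csv")
df = df.dropna(subset=["product"])
df = df[df["product"].str.startswith("Halal")]
df["amount"] = pd.to_numeric(df["price"], errors="coerce") * df["quantity"]  # derived column

# aggfunc accepts a list, producing one column per (aggregation, value) pair
summary: DataFrame = df.pivot_table(
    values=["quantity", "amount"],
    index="product",
    aggfunc=["sum", "mean"]
)
print(summary)  # Debug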
39.2.2 Melting Data
Melt DataFrames to normalize data, e.g., converting wide to long format.
from pandas import DataFrame
import pandas as pd
# Create sample DataFrame
df: DataFrame = pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse"],
"sales_2023": [2000, 250],
"sales_2024": [2100, 300]
})
# Melt to long format
melted_df: DataFrame = pd.melt(
df,
id_vars=["product"],
value_vars=["sales_2023", "sales_2024"],
var_name="year",
value_name="sales"
)
# Print results
print("Melted DataFrame:") # Debug
print(melted_df)
# Expected Output:
# Melted DataFrame:
# product year sales
# 0 Halal Laptop sales_2023 2000
# 1 Halal Mouse sales_2023 250
# 2 Halal Laptop sales_2024 2100
# 3 Halal Mouse sales_2024 300
Follow-Along Instructions:
- Save as de-onboarding/melt_example.py.
- Run: python melt_example.py.
- Verify output shows long format.
- Common Errors:
  - KeyError: Print df.columns to check id_vars.
39.3 Time-Series Analysis
Time-series analysis handles temporal data, e.g., transaction dates, using Pandas’ datetime capabilities. Operations like resampling are O(n) for n rows.
39.3.1 Handling Dates
Parse and analyze dates in transactions.
from pandas import DataFrame
import pandas as pd
# Load transactions
df: DataFrame = pd.read_csv("data/transactions.csv")
df["date"] = pd.to_datetime(df["date"]) # Convert to datetime
# Filter transactions after October 2, 2023
recent_df: DataFrame = df[df["date"] > "2023-10-02"]
# Print results
print("Recent Transactions:") # Debug
print(recent_df)
# Expected Output (abridged):
# Recent Transactions:
# transaction_id product price quantity date
# 2 T003 Halal Keyboard 49.99 5 2023-10-03
# 3 T004 NaN 29.99 3 2023-10-04
# 4 T005 Monitor 199.99 2 2023-10-05
Follow-Along Instructions:
- Save as de-onboarding/time_series.py.
- Run: python time_series.py.
- Verify output shows recent transactions.
- Common Errors:
  - ValueError: Print df["date"] to check format.
  - TypeError: Ensure date is datetime. Print df.dtypes.
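Real transaction feeds often contain malformed date strings. A defensive pattern, also used later in the micro-project, is pd.to_datetime with errors="coerce", which turns unparseable values into NaT so they can be logged and dropped explicitly. This is a minimal sketch assuming the Appendix 1 transactions file.
# Minimal sketch: coerce malformed dates to NaT instead of raising.
from pandas import DataFrame
import pandas as pd

df: DataFrame = pd.read_csv("data/transactions.csv")
df["date"] = pd.to_datetime(df["date"], errors="coerce")  # bad strings become NaT

bad_dates: DataFrame = df[df["date"].isna()]
print(f"Unparseable dates: {len(bad_dates)}")  # Debug

df = df[df["date"].notna()]
print(df["date"].dt.day_name())  # .dt accessor exposes datetime attributes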
39.3.2 Resampling Time-Series
Aggregate data by time periods, e.g., daily sales. Use df.fillna(0) after resampling to handle missing dates, common in transaction data. For weekly aggregates, use resample('W'), useful for summarizing longer-term trends.
from pandas import DataFrame
import pandas as pd
# Load and prepare transactions
df: DataFrame = pd.read_csv("data/transactions.csv")
df["date"] = pd.to_datetime(df["date"])
df["amount"] = df["price"] * df["quantity"]
# Resample by day
daily_sales: DataFrame = df.set_index("date").resample("D")["amount"].sum().fillna(0).reset_index()
# Resample by week
weekly_sales: DataFrame = df.set_index("date").resample("W")["amount"].sum().fillna(0).reset_index()
# Print results
print("Daily Sales:") # Debug
print(daily_sales)
print("Weekly Sales:") # Debug
print(weekly_sales)
# Expected Output (abridged):
# Daily Sales:
# date amount
# 0 2023-10-01 1999.98
# 1 2023-10-02 249.90
# 2 2023-10-03 249.95
# 3 2023-10-04 89.97
# 4 2023-10-05 399.98
# Weekly Sales:
# date amount
# 0 2023-10-08 2989.78
Follow-Along Instructions:
- Save as de-onboarding/resample_example.py.
- Run: python resample_example.py.
- Verify output shows daily and weekly aggregates.
39.4 Optimization Techniques
Optimize Pandas for large datasets using chunking and indexing, reducing memory usage and processing time. Pandas leverages NumPy’s vectorized operations (Chapter 38), e.g., df["amount"] = df["price"] * df["quantity"] is O(n) vs. O(n²) loops. Use df.memory_usage() to profile memory consumption, complementing chunking for optimization.
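Before reaching for chunking, it is worth measuring what a DataFrame actually costs in memory. The short sketch below profiles per-column usage with memory_usage(deep=True); the exact numbers depend on dtypes and data, so treat them as illustrative.
# Quick memory profile; deep=True counts the actual size of object (string) columns.
from pandas import DataFrame
import pandas as pd

df: DataFrame = pd.read_csv("data/transactions.csv")
print(df.memory_usage(deep=True))  # bytes per column
total_mb: float = df.memory_usage(deep=True).sum() / 1024 ** 2
print(f"Total memory: {total_mb:.3f} MB")  # Debug
print(df.dtypes)  # dtype choices drive memory usage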
39.4.1 Chunking Large CSVs
Process large CSVs in chunks to manage memory. Note: While sales.csv is small (6 rows), chunking matters for production datasets with millions of rows, a common scenario in Hijra Group’s analytics, where it typically yields 2–5x speedups; on small files like sales.csv the chunking overhead can outweigh the benefit.
from pandas import DataFrame
import pandas as pd
from typing import Iterator
# Process sales CSV in chunks
def process_chunks(csv_path: str, chunk_size: int = 1000) -> DataFrame:
"""Process CSV in chunks."""
    chunks: Iterator[DataFrame] = pd.read_csv(csv_path, chunksize=chunk_size)
results: list[DataFrame] = []
for chunk in chunks:
chunk = chunk.dropna(subset=["product"]).query("product.str.startswith('Halal')")
chunk["amount"] = chunk["price"] * chunk["quantity"]
results.append(chunk)
return pd.concat(results)
# Run and print
result_df: DataFrame = process_chunks("data/sales.csv")
print("Chunked DataFrame:") # Debug
print(result_df)
# Expected Output:
# Chunked DataFrame:
# product price quantity amount
# 0 Halal Laptop 999.99 2 1999.98
# 1 Halal Mouse 24.99 10 249.90
# 2 Halal Keyboard 49.99 5 249.95
Follow-Along Instructions:
- Save as de-onboarding/chunk_example.py.
- Run: python chunk_example.py.
- Verify output shows filtered DataFrame.
- Common Errors:
  - MemoryError: Reduce chunk_size. Print chunk.memory_usage().
Key Points:
- chunksize: Limits memory usage (~24MB per 1M rows).
- Time Complexity: O(n) for n rows across chunks.
- Space Complexity: O(c) for chunk size c.
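When even the filtered result is too large to concatenate, chunking can be combined with a running aggregation so only the totals stay in memory. The sketch below is an assumed pattern, not part of the micro-project; it coerces prices to numeric because sales.csv contains an invalid price string.
# Sketch: chunked aggregation that never materializes the full DataFrame.
from pandas import DataFrame, Series
import pandas as pd

def total_amount_by_product(csv_path: str, chunk_size: int = 1000) -> Series:
    """Accumulate amount per product across chunks, holding only running totals."""
    totals: Series = pd.Series(dtype="float64")
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        chunk = chunk.dropna(subset=["product"])
        chunk = chunk[chunk["product"].str.startswith("Halal")]
        chunk["amount"] = pd.to_numeric(chunk["price"], errors="coerce") * chunk["quantity"]
        totals = totals.add(chunk.groupby("product")["amount"].sum(), fill_value=0)
    return totals

print(total_amount_by_product("data/sales.csv"))  # Debug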
39.4.2 Indexing for Performance
Set indexes to speed up queries.
from pandas import DataFrame
import pandas as pd
# Load and index DataFrame
df: DataFrame = pd.read_csv("data/transactions.csv")
df.set_index("transaction_id", inplace=True)
# Query by index
result: DataFrame = df.loc[["T001", "T002"]]
# Print results
print("Indexed Query:") # Debug
print(result)
# Expected Output:
# Indexed Query:
# product price quantity date
# transaction_id
# T001 Halal Laptop 999.99 2 2023-10-01
# T002 Halal Mouse 24.99 10 2023-10-02
Follow-Along Instructions:
- Save as de-onboarding/index_example.py.
- Run: python index_example.py.
- Verify output shows queried rows.
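To see why indexing matters, compare an index lookup against a boolean-mask scan of the same column. On the 5-row transactions file both are effectively instant, so the sketch below is illustrative only; the gap grows with row count.
# Illustrative timing: indexed .loc lookup vs. full-column boolean scan.
from pandas import DataFrame
import pandas as pd
import time

df: DataFrame = pd.read_csv("data/transactions.csv")
indexed: DataFrame = df.set_index("transaction_id").sort_index()

start = time.time()
_ = indexed.loc[["T001", "T002"]]  # index lookup
index_time = time.time() - start

start = time.time()
_ = df[df["transaction_id"].isin(["T001", "T002"])]  # scans every row
scan_time = time.time() - start

print(f"Index lookup: {index_time:.6f}s, column scan: {scan_time:.6f}s")  # Debug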
39.5 Micro-Project: Advanced Sales Pipeline
Project Requirements
Build a type-annotated sales pipeline processing data/sales.csv and data/transactions.csv for Hijra Group’s analytics, merging datasets, analyzing time-series trends, plotting results, ensuring data quality (duplicates, missing values, unique IDs), benchmarking performance, visualizing quality metrics, and optimizing with chunking. This pipeline ensures Sharia compliance by validating Halal products, aligning with Islamic Financial Services Board (IFSB) standards for Hijra Group’s fintech analytics. Logging supports data governance by creating audit trails for Sharia compliance, critical for fintech analytics. The pipeline supports data freshness (Chapter 32) and scalability, logging validation failures and generating quality metrics for observability.
- Load CSVs with pandas.read_csv.
- Read data/config.yaml with PyYAML.
- Log input file metadata (lineage) to data/validation.log.
- Merge sales and transactions, validating Halal products, data freshness (within 30 days), decimals, duplicates, and unique transaction IDs.
- Analyze daily and weekly sales trends, handling missing dates.
- Plot daily trends to data/sales_trend.png and validation stats to data/validation_stats.png.
- Export daily results to data/sales_analysis.json and weekly to data/weekly_sales.json.
- Log validation failures to data/validation.log.
- Generate validation stats in data/validation_stats.csv with timestamps.
- Benchmark chunking performance, saving to data/benchmark.txt.
- Test with pytest.
- Use 4-space indentation per PEP 8.
Sample Input Files
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
data/transactions.csv (Appendix 1):
transaction_id,product,price,quantity,date
T001,Halal Laptop,999.99,2,2023-10-01
T002,Halal Mouse,24.99,10,2023-10-02
T003,Halal Keyboard,49.99,5,2023-10-03
T004,,29.99,3,2023-10-04
T005,Monitor,199.99,2,2023-10-05
data/config.yaml (Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
Data Processing Flow
flowchart TD
A["sales.csv"] --> B["Load CSV"]
C["transactions.csv"] --> D["Load CSV"]
E["config.yaml"] --> F["Load YAML"]
B --> G["Validate DataFrame"]
D --> H["Validate DataFrame"]
G --> I["Log Failures/Lineage"]
H --> I
G --> J["Merge DataFrames"]
H --> J
F --> J
J --> K["Time-Series Analysis"]
K --> L["Plot Trends"]
K --> M["Export JSON"]
K --> N["Log Results"]
K --> O["Benchmark Chunking"]
K --> P["Validation Stats/Plot"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
class A,C,E data
class B,D,F,G,H,I,J,K,L,M,N,O,P process
Acceptance Criteria
- Go Criteria:
  - Loads and validates CSVs and YAML, including max_decimals, freshness, duplicates, and unique transaction IDs.
  - Logs input file metadata to data/validation.log.
  - Merges DataFrames, ensuring Halal products.
  - Computes and plots daily sales trends (data/sales_trend.png) and validation stats (data/validation_stats.png), handling missing dates.
  - Exports daily results to data/sales_analysis.json and weekly to data/weekly_sales.json.
  - Logs validation failures to data/validation.log.
  - Generates timestamped validation stats in data/validation_stats.csv.
  - Benchmarks chunking performance to data/benchmark.txt.
  - Passes pytest tests.
  - Uses 4-space indentation per PEP 8.
- No-Go Criteria:
  - Fails to load files.
  - Incorrect merges, calculations, plots, logs, stats, or benchmarks.
  - Missing JSON exports, plots, log file, stats, or benchmark file.
  - Lacks type annotations or tests.
Common Pitfalls to Avoid
- Merge Key Errors:
  - Problem: Missing product column.
  - Solution: Print df.columns.
- Date Parsing Issues:
  - Problem: Invalid date formats.
  - Solution: Use pd.to_datetime. Print df["date"].
- Memory Overflows:
  - Problem: Large CSVs crash.
  - Solution: Use chunking. Print chunk.memory_usage().
- Pyright Type Errors:
  - Problem: Incorrect DataFrame annotations (e.g., using dict instead of pandas.DataFrame) cause Pyright failures.
  - Solution: Use from pandas import DataFrame and verify with pyright script.py.
- Missing Data in Time-Series:
  - Problem: Resampling fails if dates are missing.
  - Solution: Use fillna(0) after resampling. Print daily_sales.
- Log File Permission Errors:
  - Problem: Missing write permissions for data/ prevent creating validation.log.
  - Solution: Check with ls -l data/ (Unix) or dir data\ (Windows).
- Permission Errors for Stats/Benchmark:
  - Problem: Missing write permissions for data/ prevent creating validation_stats.csv, benchmark.txt, and validation_stats.png.
  - Solution: Check permissions as for validation.log with ls -l data/ (Unix) or dir data\ (Windows).
- Empty CSV Benchmarking:
  - Problem: Empty files skew results.
  - Solution: Check df.empty before benchmarking. Log to validation.log.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces. Run python -tt pipeline.py.
How This Differs from Production
In production, this pipeline would include:
- Concurrency: Parallel processing (Chapter 40).
- APIs: Integration with FastAPI (Chapter 53).
- Monitoring: Observability with Jaeger (Chapter 66).
- Scalability: Distributed processing with Kubernetes (Chapter 61).
Implementation
# File: de-onboarding/sales_pipeline.py
from pandas import DataFrame
import pandas as pd
import yaml
import json
import matplotlib.pyplot as plt
import logging
import time
from typing import Dict, Tuple, Iterator
import os
from datetime import timedelta
# Setup logging
logging.basicConfig(
filename="data/validation.log",
    level=logging.INFO,  # INFO so lineage (file metadata) entries are recorded alongside warnings
format="%(asctime)s - %(levelname)s - %(message)s"
)
def read_config(config_path: str) -> Dict[str, any]:
"""Read YAML configuration."""
print(f"Opening config: {config_path}") # Debug
with open(config_path, "r") as file:
config = yaml.safe_load(file)
print(f"Loaded config: {config}") # Debug
return config
def load_and_validate_csv(csv_path: str, config: Dict[str, any], chunk_size: int = 1000) -> Tuple[DataFrame, Dict[str, int]]:
"""Load and validate CSV in chunks, tracking validation stats."""
print(f"Loading CSV: {csv_path}") # Debug
logging.info(f"Processing file: {csv_path}, rows: {pd.read_csv(csv_path).shape[0]}")
chunks: Iterator[DataFrame] = pd.read_csv(csv_path, chunksize=chunk_size)
results: list[DataFrame] = []
stats = {
"invalid_products": 0,
"invalid_prices": 0,
"invalid_decimals": 0,
"invalid_quantities": 0,
"invalid_dates": 0,
"old_data": 0,
"duplicates": 0,
"duplicate_ids": 0
}
for chunk in chunks:
original_len = len(chunk)
# Remove duplicates
chunk = chunk.drop_duplicates()
stats["duplicates"] += original_len - len(chunk)
if stats["duplicates"]:
logging.warning(f"Removed {original_len - len(chunk)} duplicates in {csv_path}")
# Validate transaction_id uniqueness if present
if "transaction_id" in chunk.columns:
duplicate_ids = chunk[chunk["transaction_id"].duplicated()]
stats["duplicate_ids"] += len(duplicate_ids)
if not duplicate_ids.empty:
logging.warning(f"Duplicate transaction IDs in {csv_path}: {duplicate_ids[['transaction_id']].to_dict('records')}")
chunk = chunk.drop_duplicates(subset="transaction_id")
# Validate product
invalid_products = chunk[chunk["product"].isna() | ~chunk["product"].str.startswith(config["product_prefix"])]
stats["invalid_products"] += len(invalid_products)
if not invalid_products.empty:
logging.warning(f"Invalid products in {csv_path}: {invalid_products[['product']].to_dict('records')}")
chunk = chunk.dropna(subset=["product"]).query("product.str.startswith(@config['product_prefix'])")
        # Validate price: coerce to numeric so non-numeric strings (e.g., 'invalid') become NaN
        chunk["price"] = pd.to_numeric(chunk["price"], errors="coerce")
        valid_price = chunk["price"].notna() & (chunk["price"] >= config["min_price"]) & (chunk["price"] > 0)
        invalid_prices = chunk[~valid_price]
        stats["invalid_prices"] += len(invalid_prices)
        if not invalid_prices.empty:
            logging.warning(f"Invalid prices in {csv_path}: {invalid_prices[['price']].to_dict('records')}")
        chunk = chunk[valid_price]
# Validate decimals
invalid_decimals = chunk[~chunk["price"].apply(lambda x: len(str(x).split('.')[-1]) <= config['max_decimals'] if '.' in str(x) else True)]
stats["invalid_decimals"] += len(invalid_decimals)
if not invalid_decimals.empty:
logging.warning(f"Invalid decimals in {csv_path}: {invalid_decimals[['price']].to_dict('records')}")
chunk = chunk[chunk["price"].apply(lambda x: len(str(x).split('.')[-1]) <= config['max_decimals'] if '.' in str(x) else True)]
        # Validate quantity: coerce to numeric and require whole numbers within max_quantity
        chunk["quantity"] = pd.to_numeric(chunk["quantity"], errors="coerce")
        valid_qty = chunk["quantity"].notna() & (chunk["quantity"] % 1 == 0) & (chunk["quantity"] <= config["max_quantity"])
        invalid_quantities = chunk[~valid_qty]
        stats["invalid_quantities"] += len(invalid_quantities)
        if not invalid_quantities.empty:
            logging.warning(f"Invalid quantities in {csv_path}: {invalid_quantities[['quantity']].to_dict('records')}")
        chunk = chunk[valid_qty]
# Validate dates if present
if "date" in chunk.columns:
chunk["date"] = pd.to_datetime(chunk["date"], errors="coerce")
invalid_dates = chunk[chunk["date"].isna()]
stats["invalid_dates"] += len(invalid_dates)
if not invalid_dates.empty:
logging.warning(f"Invalid dates in {csv_path}: {invalid_dates[['date']].to_dict('records')}")
chunk = chunk[chunk["date"].notna()]
max_date = chunk["date"].max()
old_data = chunk[chunk["date"] < max_date - timedelta(days=30)]
stats["old_data"] += len(old_data)
if not old_data.empty:
logging.warning(f"Old data in {csv_path}: {old_data[['date']].to_dict('records')}")
chunk = chunk[chunk["date"] >= max_date - timedelta(days=30)]
results.append(chunk)
df = pd.concat(results) if results else pd.DataFrame()
print(f"Validated {csv_path}:") # Debug
print(df.head()) # Debug
return df, stats
def merge_dataframes(sales: DataFrame, transactions: DataFrame) -> DataFrame:
"""Merge sales and transactions."""
merged_df: DataFrame = pd.merge(
sales,
transactions,
on="product",
how="inner",
suffixes=("_sales", "_trans")
)
print("Merged DataFrame:") # Debug
print(merged_df.head()) # Debug
return merged_df
def analyze_time_series(df: DataFrame) -> Tuple[DataFrame, DataFrame]:
"""Analyze daily and weekly sales trends."""
df["date"] = pd.to_datetime(df["date"])
df["amount"] = df["price_trans"] * df["quantity_trans"]
daily_sales: DataFrame = df.set_index("date").resample("D")["amount"].sum().fillna(0).reset_index()
weekly_sales: DataFrame = df.set_index("date").resample("W")["amount"].sum().fillna(0).reset_index()
print("Daily Sales:") # Debug
print(daily_sales) # Debug
print("Weekly Sales:") # Debug
print(weekly_sales) # Debug
return daily_sales, weekly_sales
def plot_daily_sales(daily_sales: DataFrame, plot_path: str) -> None:
"""Plot daily sales trends."""
if daily_sales.empty:
print("No data to plot") # Debug
logging.warning("No data to plot")
return
plt.figure(figsize=(8, 6))
plt.plot(daily_sales["date"], daily_sales["amount"], marker="o")
plt.title("Daily Sales Trends")
plt.xlabel("Date")
plt.ylabel("Sales Amount ($)")
plt.grid(True)
plt.tight_layout()
plt.savefig(plot_path, dpi=100)
plt.close()
print(f"Plot saved to {plot_path}") # Debug
print(f"File exists: {os.path.exists(plot_path)}") # Debug
def plot_validation_stats(stats: Dict[str, int], plot_path: str) -> None:
"""Plot validation statistics as a bar plot."""
stats_df = pd.DataFrame([stats])
plt.figure(figsize=(10, 6))
stats_df.plot(kind="bar")
plt.title("Validation Statistics")
plt.xlabel("Validation Type")
plt.ylabel("Count")
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.savefig(plot_path, dpi=100)
plt.close()
print(f"Validation stats plot saved to {plot_path}") # Debug
print(f"File exists: {os.path.exists(plot_path)}") # Debug
def export_results(daily_sales: DataFrame, weekly_sales: DataFrame, daily_path: str, weekly_path: str) -> None:
"""Export daily and weekly results to JSON."""
    # Format dates as ISO strings so json.dump can serialize them (Timestamps are not JSON-serializable)
    daily_results = daily_sales.assign(date=daily_sales["date"].dt.strftime("%Y-%m-%dT%H:%M:%S")).to_dict(orient="records")
    weekly_results = weekly_sales.assign(date=weekly_sales["date"].dt.strftime("%Y-%m-%dT%H:%M:%S")).to_dict(orient="records")
print(f"Writing daily to: {daily_path}") # Debug
with open(daily_path, "w") as file:
json.dump(daily_results, file, indent=2)
print(f"Exported daily to {daily_path}") # Debug
print(f"Writing weekly to: {weekly_path}") # Debug
with open(weekly_path, "w") as file:
json.dump(weekly_results, file, indent=2)
print(f"Exported weekly to {weekly_path}") # Debug
def benchmark_chunking(csv_path: str, chunk_size: int = 1000) -> float:
"""Benchmark chunked vs. non-chunked CSV loading."""
df = pd.read_csv(csv_path)
if df.empty:
logging.warning(f"Empty CSV file: {csv_path}, skipping benchmark")
with open("data/benchmark.txt", "w") as f:
f.write("Benchmark skipped: Empty CSV file")
print("Benchmark skipped: Empty CSV file") # Debug
return 0.0
# Non-chunked
start_time = time.time()
df_non_chunked = pd.read_csv(csv_path)
non_chunked_time = time.time() - start_time
# Chunked
start_time = time.time()
chunks: Iterator[DataFrame] = pd.read_csv(csv_path, chunksize=chunk_size)
df_chunked = pd.concat(chunks)
chunked_time = time.time() - start_time
# Save results
with open("data/benchmark.txt", "w") as f:
f.write(f"Non-chunked time: {non_chunked_time:.4f} seconds\n")
f.write(f"Chunked time: {chunked_time:.4f} seconds\n")
f.write(f"Speedup: {non_chunked_time / chunked_time:.2f}x")
print(f"Benchmark results saved to data/benchmark.txt") # Debug
return chunked_time
def generate_validation_stats(stats: Dict[str, int], stats_path: str) -> None:
"""Generate validation statistics CSV with timestamp."""
stats_df = pd.DataFrame([stats])
stats_df["timestamp"] = pd.Timestamp.now()
print(f"Writing stats to: {stats_path}") # Debug
stats_df.to_csv(stats_path, index=False)
print(f"Exported stats to {stats_path}") # Debug
def main() -> None:
"""Main pipeline function."""
config_path = "data/config.yaml"
sales_path = "data/sales.csv"
trans_path = "data/transactions.csv"
daily_path = "data/sales_analysis.json"
weekly_path = "data/weekly_sales.json"
daily_plot_path = "data/sales_trend.png"
stats_path = "data/validation_stats.csv"
stats_plot_path = "data/validation_stats.png"
config = read_config(config_path)
sales_df, sales_stats = load_and_validate_csv(sales_path, config)
trans_df, trans_stats = load_and_validate_csv(trans_path, config)
merged_df = merge_dataframes(sales_df, trans_df)
daily_sales, weekly_sales = analyze_time_series(merged_df)
plot_daily_sales(daily_sales, daily_plot_path)
export_results(daily_sales, weekly_sales, daily_path, weekly_path)
benchmark_chunking(sales_path)
# Combine validation stats
combined_stats = {f"sales_{k}": v for k, v in sales_stats.items()}
combined_stats.update({f"trans_{k}": v for k, v in trans_stats.items()})
generate_validation_stats(combined_stats, stats_path)
plot_validation_stats(combined_stats, stats_plot_path)
print("\nPipeline Summary:")
print(f"Processed {len(merged_df)} merged records")
print(f"Exported daily sales to {daily_path}")
print(f"Exported weekly sales to {weekly_path}")
print(f"Plotted daily trends to {daily_plot_path}")
print(f"Validation logs saved to data/validation.log")
print(f"Validation stats saved to {stats_path}")
print(f"Validation stats plot saved to {stats_plot_path}")
print(f"Benchmark results saved to data/benchmark.txt")
if __name__ == "__main__":
    main()
# File: de-onboarding/test_sales_pipeline.py
from pandas import DataFrame
import pandas as pd
import pytest
import os
from sales_pipeline import load_and_validate_csv, merge_dataframes, analyze_time_series, plot_daily_sales, benchmark_chunking, generate_validation_stats, plot_validation_stats
@pytest.fixture
def config() -> dict:
"""Provide config fixture."""
return {
"min_price": 10.0,
"max_quantity": 100,
"product_prefix": "Halal",
"required_fields": ["product", "price", "quantity"],
"max_decimals": 2
}
def test_load_and_validate_csv(config: dict) -> None:
"""Test CSV loading and validation."""
df, stats = load_and_validate_csv("data/transactions.csv", config)
assert len(df) == 3, "Should have 3 valid rows"
assert df["product"].str.startswith("Halal").all(), "Non-Halal products found"
assert df["price"].apply(lambda x: len(str(x).split('.')[-1]) <= config["max_decimals"] if "." in str(x) else True).all(), "Invalid decimals"
assert (df["date"] >= df["date"].max() - pd.Timedelta(days=30)).all(), "Data not fresh"
assert df["date"].notna().all(), "Invalid dates found"
assert not df.duplicated().any(), "Duplicates found"
assert not df["transaction_id"].duplicated().any(), "Duplicate transaction IDs found"
assert os.path.exists("data/validation.log"), "Validation log not created"
assert stats["invalid_products"] > 0, "Should detect invalid products"
with open("data/validation.log", "r") as f:
assert f"Processing file: data/transactions.csv" in f.read(), "Lineage log missing"
def test_merge_dataframes() -> None:
"""Test DataFrame merging."""
sales = pd.read_csv("data/sales.csv").head(3) # Valid rows
trans = pd.read_csv("data/transactions.csv").head(3) # Valid rows
merged_df = merge_dataframes(sales, trans)
assert len(merged_df) == 3, "Merge should retain 3 rows"
assert "date" in merged_df.columns, "Date column missing"
def test_analyze_time_series() -> None:
"""Test time-series analysis."""
df = pd.read_csv("data/transactions.csv").head(3)
df["date"] = pd.to_datetime(df["date"])
daily_sales, weekly_sales = analyze_time_series(df)
assert len(daily_sales) == 3, "Should have 3 daily records"
assert len(weekly_sales) == 1, "Should have 1 weekly record"
assert "amount" in daily_sales.columns, "Amount column missing"
assert daily_sales["amount"].notna().all(), "Missing values in daily amounts"
assert weekly_sales["amount"].notna().all(), "Missing values in weekly amounts"
def test_plot_daily_sales(tmp_path) -> None:
"""`tmp_path` is a pytest fixture for temporary files; see Chapter 9."""
df = pd.DataFrame({
"date": pd.to_datetime(["2023-10-01", "2023-10-02", "2023-10-03"]),
"amount": [1999.98, 249.90, 249.95]
})
plot_path = tmp_path / "test_plot.png"
plot_daily_sales(df, str(plot_path))
assert os.path.exists(plot_path), "Plot file not created"
def test_benchmark_chunking(tmp_path) -> None:
"""Test benchmarking chunking."""
benchmark_path = tmp_path / "benchmark.txt"
benchmark_chunking("data/sales.csv")
assert os.path.exists("data/benchmark.txt"), "Benchmark file not created"
# Test empty CSV scenario
empty_csv = tmp_path / "empty.csv"
    # Header-only CSV: a truly empty file would raise EmptyDataError in pd.read_csv
    pd.DataFrame(columns=["product", "price", "quantity"]).to_csv(empty_csv, index=False)
benchmark_chunking(str(empty_csv))
assert os.path.exists("data/benchmark.txt"), "Benchmark file not created for empty CSV"
with open("data/benchmark.txt", "r") as f:
assert "Benchmark skipped: Empty CSV file" in f.read(), "Empty CSV not handled"
def test_generate_validation_stats(tmp_path) -> None:
"""Test validation stats generation."""
stats = {
"sales_invalid_products": 3,
"sales_invalid_prices": 1,
"sales_invalid_decimals": 0,
"sales_invalid_quantities": 1,
"sales_invalid_dates": 0,
"sales_old_data": 0,
"sales_duplicates": 0,
"sales_duplicate_ids": 0,
"trans_invalid_products": 2,
"trans_invalid_prices": 0,
"trans_invalid_decimals": 0,
"trans_invalid_quantities": 0,
"trans_invalid_dates": 0,
"trans_old_data": 0,
"trans_duplicates": 0,
"trans_duplicate_ids": 0
}
stats_path = tmp_path / "validation_stats.csv"
generate_validation_stats(stats, str(stats_path))
assert os.path.exists(stats_path), "Validation stats file not created"
stats_df = pd.read_csv(stats_path)
assert "timestamp" in stats_df.columns, "Timestamp column missing"
def test_plot_validation_stats(tmp_path) -> None:
"""Test validation stats plotting."""
stats = {
"sales_invalid_products": 3,
"sales_invalid_prices": 1,
"sales_invalid_decimals": 0,
"sales_invalid_quantities": 1,
"sales_invalid_dates": 0,
"sales_old_data": 0,
"sales_duplicates": 0,
"sales_duplicate_ids": 0,
"trans_invalid_products": 2,
"trans_invalid_prices": 0,
"trans_invalid_decimals": 0,
"trans_invalid_quantities": 0,
"trans_invalid_dates": 0,
"trans_old_data": 0,
"trans_duplicates": 0,
"trans_duplicate_ids": 0
}
plot_path = tmp_path / "validation_stats.png"
plot_validation_stats(stats, str(plot_path))
assert os.path.exists(plot_path), "Validation stats plot not created"Expected Outputs
data/sales_analysis.json:
[
{ "date": "2023-10-01T00:00:00", "amount": 1999.98 },
{ "date": "2023-10-02T00:00:00", "amount": 249.9 },
{ "date": "2023-10-03T00:00:00", "amount": 249.95 }
]
data/weekly_sales.json:
[{ "date": "2023-10-08T00:00:00", "amount": 2989.78 }]data/sales_trend.png: Line plot showing daily sales trends, saved with dpi=100.
data/validation_stats.png: Bar plot showing validation statistics, saved with dpi=100.
data/validation.log (example):
2025-04-25 12:00:00,000 - INFO - Processing file: data/sales.csv, rows: 6
2025-04-25 12:00:00,001 - WARNING - Invalid products in data/sales.csv: [{'product': nan}, {'product': 'Monitor'}, {'product': 'Headphones'}]
2025-04-25 12:00:00,002 - WARNING - Invalid prices in data/sales.csv: [{'price': 'invalid'}]
2025-04-25 12:00:00,003 - WARNING - Invalid quantities in data/sales.csv: [{'quantity': 150}]
2025-04-25 12:00:00,004 - INFO - Processing file: data/transactions.csv, rows: 5
2025-04-25 12:00:00,005 - WARNING - Invalid products in data/transactions.csv: [{'product': nan}, {'product': 'Monitor'}]
data/validation_stats.csv (example):
sales_invalid_products,sales_invalid_prices,sales_invalid_decimals,sales_invalid_quantities,sales_invalid_dates,sales_old_data,sales_duplicates,sales_duplicate_ids,trans_invalid_products,trans_invalid_prices,trans_invalid_decimals,trans_invalid_quantities,trans_invalid_dates,trans_old_data,trans_duplicates,trans_duplicate_ids,timestamp
3,1,0,1,0,0,0,0,2,0,0,0,0,0,0,0,2025-04-25 12:00:00
data/benchmark.txt (example):
Non-chunked time: 0.0023 seconds
Chunked time: 0.0031 seconds
Speedup: 0.74x
Console Output (abridged):
Opening config: data/config.yaml
Loaded config: {'min_price': 10.0, 'max_quantity': 100, ...}
Loading CSV: data/sales.csv
Validated data/sales.csv:
product price quantity
0 Halal Laptop 999.99 2
1 Halal Mouse 24.99 10
2 Halal Keyboard 49.99 5
Loading CSV: data/transactions.csv
Validated data/transactions.csv:
...
Merged DataFrame:
...
Daily Sales:
date amount
0 2023-10-01 1999.98
1 2023-10-02 249.90
2 2023-10-03 249.95
Weekly Sales:
date amount
0 2023-10-08 2989.78
Plot saved to data/sales_trend.png
File exists: True
Writing daily to: data/sales_analysis.json
Exported daily to data/sales_analysis.json
Writing weekly to: data/weekly_sales.json
Exported weekly to data/weekly_sales.json
Benchmark results saved to data/benchmark.txt
Writing stats to: data/validation_stats.csv
Exported stats to data/validation_stats.csv
Validation stats plot saved to data/validation_stats.png
File exists: True
Pipeline Summary:
Processed 3 merged records
Exported daily sales to data/sales_analysis.json
Exported weekly sales to data/weekly_sales.json
Plotted daily trends to data/sales_trend.png
Validation logs saved to data/validation.log
Validation stats saved to data/validation_stats.csv
Validation stats plot saved to data/validation_stats.png
Benchmark results saved to data/benchmark.txt
How to Run and Test
Setup:
- Create de-onboarding/data/ and populate with sales.csv, transactions.csv, config.yaml per Appendix 1.
- Install: pip install pandas numpy pyyaml pytest matplotlib.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Save sales_pipeline.py and test_sales_pipeline.py.
- Ensure Pyright is installed: pip install pyright.
Run:
- Run: pyright sales_pipeline.py to verify type annotations.
- Run: python sales_pipeline.py.
- Outputs: data/sales_analysis.json, data/weekly_sales.json, data/sales_trend.png, data/validation.log, data/validation_stats.csv, data/validation_stats.png, data/benchmark.txt, console logs.
Test:
- Run: pytest test_sales_pipeline.py -v.
- Verify all tests pass, including log, stats, plot, and benchmark file creation. The tmp_path fixture in pytest creates temporary files for testing; see Chapter 9 for pytest basics.
39.6 Practice Exercises
Exercise 1: Merge Sales and Transactions
Write a type-annotated function to merge sales.csv and transactions.csv, with 4-space indentation.
Expected Output:
Merged DataFrame with 3 rows
Solution:
from pandas import DataFrame
import pandas as pd
def merge_sales_transactions(sales_path: str, trans_path: str) -> DataFrame:
"""Merge sales and transactions."""
sales: DataFrame = pd.read_csv(sales_path)
trans: DataFrame = pd.read_csv(trans_path)
merged: DataFrame = pd.merge(sales, trans, on="product", how="inner")
print(f"Merged DataFrame with {len(merged)} rows") # Debug
return merged
# Test
print(merge_sales_transactions("data/sales.csv", "data/transactions.csv"))
Exercise 2: Pivot Sales Data
Write a function to pivot sales data by product, with 4-space indentation.
Expected Output:
quantity
product
Halal Keyboard 5
Halal Laptop 2
Halal Mouse 10
Solution:
from pandas import DataFrame
import pandas as pd
def pivot_sales(csv_path: str) -> DataFrame:
"""Pivot sales by product."""
df: DataFrame = pd.read_csv(csv_path)
df = df.dropna(subset=["product"]).query("product.str.startswith('Halal')")
pivot_df: DataFrame = df.pivot_table(values="quantity", index="product", aggfunc="sum")
print(pivot_df) # Debug
return pivot_df
# Test
print(pivot_sales("data/sales.csv"))
Exercise 3: Time-Series Resampling
Write a function to resample transactions by day, with 4-space indentation.
Expected Output:
date amount
0 2023-10-01 1999.98
1 2023-10-02 249.90
...
Solution:
from pandas import DataFrame
import pandas as pd
def resample_transactions(csv_path: str) -> DataFrame:
"""Resample transactions by day."""
df: DataFrame = pd.read_csv(csv_path)
df["date"] = pd.to_datetime(df["date"])
df["amount"] = df["price"] * df["quantity"]
daily_sales: DataFrame = df.set_index("date").resample("D")["amount"].sum().fillna(0).reset_index()
print(daily_sales) # Debug
return daily_sales
# Test
print(resample_transactions("data/transactions.csv"))
Exercise 4: Chunked Processing
Write a function to process sales.csv in chunks, with 4-space indentation.
Expected Output:
Chunked DataFrame with 3 rows
Solution:
from pandas import DataFrame
import pandas as pd
from typing import Iterator
def process_sales_chunks(csv_path: str, chunk_size: int = 1000) -> DataFrame:
"""Process sales CSV in chunks."""
chunks: Iterator[DataFrame] = pd.read_csv(csv_path, chunksize=chunk_size)
results: list[DataFrame] = []
for chunk in chunks:
chunk = chunk.dropna(subset=["product"]).query("product.str.startswith('Halal')")
results.append(chunk)
df = pd.concat(results) if results else pd.DataFrame()
print(f"Chunked DataFrame with {len(df)} rows") # Debug
return df
# Test
print(process_sales_chunks("data/sales.csv"))
Exercise 5: Debug a Merge Bug
Fix this buggy code that merges on the wrong column, with 4-space indentation.
Buggy Code:
from pandas import DataFrame
import pandas as pd
def merge_sales_transactions(sales_path: str, trans_path: str) -> DataFrame:
sales: DataFrame = pd.read_csv(sales_path)
trans: DataFrame = pd.read_csv(trans_path)
merged: DataFrame = pd.merge(sales, trans, on="price", how="inner") # Bug
    return merged
Solution:
from pandas import DataFrame
import pandas as pd
def merge_sales_transactions(sales_path: str, trans_path: str) -> DataFrame:
"""Merge sales and transactions."""
sales: DataFrame = pd.read_csv(sales_path)
trans: DataFrame = pd.read_csv(trans_path)
merged: DataFrame = pd.merge(sales, trans, on="product", how="inner") # Fix
print(f"Merged DataFrame with {len(merged)} rows") # Debug
return merged
# Test
print(merge_sales_transactions("data/sales.csv", "data/transactions.csv"))
Explanation:
- Bug: Merging on price produces incorrect joins, as prices are not unique. Fixed by using product.
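A defensive alternative, not required by the exercise, is to let Pandas check the key relationship itself: pd.merge accepts a validate argument that raises pandas.errors.MergeError when the join keys are not unique as expected, which would have flagged the price-based merge.
# Optional guard: validate="one_to_one" raises MergeError on non-unique keys.
from pandas import DataFrame
import pandas as pd

def safe_merge(sales_path: str, trans_path: str) -> DataFrame:
    """Merge on product, asserting a one-to-one key relationship."""
    sales: DataFrame = pd.read_csv(sales_path)
    trans: DataFrame = pd.read_csv(trans_path)
    # Raises pandas.errors.MergeError if product is duplicated on either side
    return pd.merge(sales, trans, on="product", how="inner", validate="one_to_one")

# Test
print(safe_merge("data/sales.csv", "data/transactions.csv"))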
Exercise 6: Pandas vs. NumPy Comparison
Write a type-annotated function to sum sales amounts using Pandas or NumPy based on a flag, and explain performance trade-offs in a text file, with 4-space indentation.
Sample Input:
prices = [999.99, 24.99]
quantities = [2, 10]
use_numpy = True
Expected Output:
2249.88
Wrote explanation to de-onboarding/ex6_concepts.txt
Solution:
from pandas import DataFrame
import pandas as pd
import numpy as np
from typing import Union
def process_sales(prices: list[float], quantities: list[int], use_numpy: bool) -> Union[float, DataFrame]:
"""Process sales with Pandas or NumPy and explain trade-offs."""
if len(prices) != len(quantities):
print("Error: Mismatched lengths") # Debug
return 0.0
if use_numpy:
prices_array = np.array(prices)
quantities_array = np.array(quantities)
amounts = prices_array * quantities_array
total = np.sum(amounts)
explanation = (
"NumPy is chosen for numerical computations like summing sales amounts due to its fast, "
"C-based array operations, ideal for large datasets. Pandas’ column-oriented storage enables "
"fast column access but slower row iterations, making it better for structured data tasks."
)
with open("de-onboarding/ex6_concepts.txt", "w") as f:
f.write(explanation)
print("Wrote explanation to de-onboarding/ex6_concepts.txt") # Debug
return total
else:
df = pd.DataFrame({"price": prices, "quantity": quantities})
df["amount"] = df["price"] * df["quantity"]
explanation = (
"Pandas is chosen for structured data with labeled columns, ideal for filtering or grouping. "
"Its column-oriented storage enables fast column access but slower row iterations compared to "
"NumPy’s contiguous arrays for numerical tasks."
)
with open("de-onboarding/ex6_concepts.txt", "w") as f:
f.write(explanation)
print("Wrote explanation to de-onboarding/ex6_concepts.txt") # Debug
return df[["price", "quantity", "amount"]]
# Test
print(process_sales([999.99, 24.99], [2, 10], True))
Explanation (saved to de-onboarding/ex6_concepts.txt):
NumPy is chosen for numerical computations like summing sales amounts due to its fast, C-based array operations, ideal for large datasets. Pandas’ column-oriented storage enables fast column access but slower row iterations, making it better for structured data tasks.
Exercise 7: Handle Missing Values
Write a type-annotated function to detect and handle missing values in transactions.csv using fillna(), logging results to a file, with 4-space indentation.
Expected Output:
Handled missing values, logged to de-onboarding/missing_values.log
Solution:
from pandas import DataFrame
import pandas as pd
import logging
# Setup logging
logging.basicConfig(
filename="de-onboarding/missing_values.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
def handle_missing_values(csv_path: str) -> DataFrame:
"""Detect and handle missing values, logging results."""
df: DataFrame = pd.read_csv(csv_path)
missing = df.isna().sum()
logging.info(f"Missing values before: {missing.to_dict()}")
df = df.fillna({"product": "Unknown", "price": 0.0, "quantity": 0, "date": pd.Timestamp("1970-01-01")})
logging.info(f"Missing values after: {df.isna().sum().to_dict()}")
print("Handled missing values, logged to de-onboarding/missing_values.log") # Debug
return df
# Test
print(handle_missing_values("data/transactions.csv"))
Log Output (saved to de-onboarding/missing_values.log):
2025-04-25 12:00:00,000 - INFO - Missing values before: {'transaction_id': 0, 'product': 1, 'price': 0, 'quantity': 0, 'date': 0}
2025-04-25 12:00:00,001 - INFO - Missing values after: {'transaction_id': 0, 'product': 0, 'price': 0, 'quantity': 0, 'date': 0}
Exercise 8: Benchmark Chunking Performance
Write a type-annotated function to benchmark chunked vs. non-chunked CSV loading in sales.csv, saving results to a file, with 4-space indentation.
Expected Output:
Benchmark results saved to de-onboarding/chunk_benchmark.txt
Solution:
from pandas import DataFrame
import pandas as pd
import time
from typing import Iterator
def benchmark_chunking(csv_path: str, chunk_size: int = 1000) -> float:
"""Benchmark chunked vs. non-chunked CSV loading."""
# Non-chunked
start_time = time.time()
df_non_chunked = pd.read_csv(csv_path)
non_chunked_time = time.time() - start_time
# Chunked
start_time = time.time()
chunks: Iterator[DataFrame] = pd.read_csv(csv_path, chunksize=chunk_size)
df_chunked = pd.concat(chunks)
chunked_time = time.time() - start_time
# Save results
with open("de-onboarding/chunk_benchmark.txt", "w") as f:
f.write(f"Non-chunked time: {non_chunked_time:.4f} seconds\n")
f.write(f"Chunked time: {chunked_time:.4f} seconds\n")
f.write(f"Speedup: {non_chunked_time / chunked_time:.2f}x")
print("Benchmark results saved to de-onboarding/chunk_benchmark.txt") # Debug
return chunked_time
# Test
print(benchmark_chunking("data/sales.csv"))
Benchmark Output (saved to de-onboarding/chunk_benchmark.txt):
Non-chunked time: 0.0023 seconds
Chunked time: 0.0031 seconds
Speedup: 0.74x
Exercise 9: Visualize Validation Stats
Write a type-annotated function to visualize validation stats from validation_stats.csv as a bar plot, saving to a file, with 4-space indentation.
Expected Output:
Validation stats plot saved to de-onboarding/validation_stats_plot.png
Solution:
from pandas import DataFrame
import pandas as pd
import matplotlib.pyplot as plt
def plot_validation_stats(csv_path: str, plot_path: str) -> None:
"""Visualize validation stats as a bar plot."""
df: DataFrame = pd.read_csv(csv_path)
plt.figure(figsize=(10, 6))
df.drop(columns=["timestamp"]).plot(kind="bar")
plt.title("Validation Statistics")
plt.xlabel("Validation Type")
plt.ylabel("Count")
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.savefig(plot_path, dpi=100)
plt.close()
print(f"Validation stats plot saved to {plot_path}") # Debug
# Test
plot_validation_stats("data/validation_stats.csv", "de-onboarding/validation_stats_plot.png")  # the function prints its own confirmation and returns None
39.7 Chapter Summary and Connection to Chapter 40
You’ve mastered:
- Merging: Combining DataFrames (O(n log n)).
- Reshaping: Pivoting/melting for analytics (O(n)).
- Time-Series: Analyzing temporal data (O(n)).
- Optimization: Chunking/indexing for scalability.
- Testing: Ensuring robustness with pytest.
- Visualization: Plotting trends and quality metrics for reporting.
- Data Quality: Handling duplicates, missing values, and unique IDs.
- White-Space Sensitivity: Using 4-space indentation per PEP 8.
The micro-project built a type-safe pipeline, merging sales and transactions, analyzing and plotting daily/weekly trends, validating data freshness and quality, logging failures with lineage, benchmarking performance, visualizing quality metrics, and optimizing with chunking, all tested with pytest. These skills—merging, time-series analysis, and optimization—enable end-to-end pipelines in Chapters 67–71, integrating data lakes with FastAPI for real-time analytics. Plotting daily trends and validation stats prepares for interactive dashboards with Metabase (Chapter 51), enhancing stakeholder reporting. Pandas’ transformations prepare for dbt models (Chapter 54), enabling orchestrated data pipelines. Logging validation failures supports CI/CD pipelines (Chapter 66), ensuring traceability in production deployments. Type-safe Pandas operations prepare for Pydantic validation (Chapter 41), ensuring robust pipeline inputs. Comprehensive pytest tests prepare for pipeline test coverage metrics (Chapter 42), ensuring robust validation. Data quality metrics and logging prepare for Airflow monitoring (Chapter 56), enabling pipeline orchestration. This prepares for Chapter 40’s concurrency, where Pandas DataFrames will be processed in parallel using aiohttp, enhancing pipeline efficiency for Hijra Group’s real-time analytics.