23 - Type-Safe Database Integration
Complexity: Moderate (M)
23.0 Introduction: Why This Matters for Data Engineering
In data engineering, ensuring reliable and maintainable database interactions is critical for Hijra Group’s Sharia-compliant fintech analytics, where financial transaction data must be processed with precision and compliance. Type-safe programming reduces runtime errors by enforcing data type consistency at development time, catching issues like mismatched column types before they impact production pipelines. For example, a type-safe SQLite or PostgreSQL client prevents a string being passed where a float is expected for a price column, avoiding errors in analytics pipelines. This chapter builds on prior database fundamentals (Chapters 12–22) and static typing (Chapter 7), integrating SQLite and PostgreSQL with Pydantic for validation and Pyright for type checking, ensuring robust data pipelines.
This chapter aligns with Hijra Group’s need for scalable, type-safe data pipelines by combining SQLite for lightweight prototyping and PostgreSQL for production-grade databases. Using type annotations verified by Pyright, you’ll create a pipeline that processes sales and transaction data, leveraging data/sales.csv and data/transactions.csv from Appendix 1. The micro-project integrates both databases with Pydantic models, preparing for cloud analytics in Chapter 24. All Python code includes type annotations (per Chapter 7) and is tested with pytest (per Chapter 9), using 4-space indentation per PEP 8, preferring spaces over tabs to avoid IndentationError.
Data Engineering Workflow Context
This diagram illustrates how type-safe database integration fits into a data pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Type-Safe Python Client"]
B --> C{"Database Operations"}
C -->|Validate| D["Pydantic Models"]
C -->|Query| E["SQLite/PostgreSQL"]
D --> F["Validated Data"]
E --> F
F --> G["Output (JSON/Report)"]
G --> H["Cloud Analytics"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,F,G data
class B,C process
class E,H storage
Building On and Preparing For
- Building On:
- Chapter 7 (Static Typing): Extends Pyright-verified type annotations to database operations.
- Chapter 9 (Testing): Applies pytest for unit and integration tests.
- Chapters 12–15 (SQLite): Leverages SQLite basics, Python integration, and advanced operations.
- Chapters 16–17, 21–22 (PostgreSQL): Uses PostgreSQL querying and optimization.
- Chapters 19–20 (Advanced SQLite): Builds on advanced querying and indexing.
- Preparing For:
- Chapter 24 (Checkpoint 3B): Consolidates database skills for cloud analytics.
- Chapters 25–30 (Cloud Analytics): Prepares for BigQuery integration with type-safe data.
- Chapter 53 (FastAPI): Enables type-safe APIs using Pydantic models.
What You’ll Learn
This chapter covers:
- Type-Safe Models: Using Pydantic for data validation with type annotations.
- SQLite Integration: Type-safe queries with sqlite3.
- PostgreSQL Integration: Type-safe queries with psycopg2.
- Unified Pipeline: Combining both databases with Pydantic validation.
- Testing: Unit and integration tests with pytest.
- Logging: Basic logging for pipeline observability.
By the end, you’ll build a type-safe data pipeline that processes data/sales.csv and data/transactions.csv, validates data with Pydantic, stores it in SQLite and PostgreSQL, and exports results to JSON, all verified by Pyright and tested with pytest. The pipeline uses 4-space indentation per PEP 8, ensuring maintainability.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with files from Appendix 1 (sales.csv, transactions.csv, sales.db, config.yaml).
- Install libraries: pip install pydantic psycopg2-binary pyyaml pytest pyright.
- Set up PostgreSQL (e.g., via Docker: docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres). If Docker fails (e.g., port 5432 in use), run docker ps to check running containers and use docker stop <container_id> to free the port.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Use print statements (e.g., print(df.head())) for debugging.
- Verify database connections with sqlite3 data/sales.db "SELECT * FROM sales;" and psql -U postgres -h localhost (a quick Python check follows this list).
- Run python -tt script.py to detect tab/space mixing.
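The following minimal sketch is an assumed helper (not one of the chapter’s files, and the connection settings mirror the Docker example above) that checks both databases from Python before you start following along:

# File: de-onboarding/check_connections.py (hypothetical helper)
import sqlite3  # Standard-library SQLite driver
import psycopg2  # PostgreSQL driver

def check_connections() -> None:
    """Print a row count from SQLite and the PostgreSQL server version."""
    conn = sqlite3.connect("data/sales.db")  # Appendix 1 database
    count = conn.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
    print(f"SQLite OK: {count} sales rows")
    conn.close()
    pg = psycopg2.connect(
        host="localhost", database="postgres", user="postgres", password="postgres"
    )  # Matches the Docker setup above
    with pg.cursor() as cursor:
        cursor.execute("SELECT version()")
        print(f"PostgreSQL OK: {cursor.fetchone()[0]}")
    pg.close()

check_connections()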
23.1 Type-Safe Models with Pydantic
Pydantic enforces data validation using type annotations, ensuring fields like price (float) or quantity (int) match expected types. Pydantic models are Python classes that validate data at runtime, catching errors early. For example, a Sale model ensures price is a positive float, reducing database errors.
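For example, this minimal sketch (assuming the Sale model defined in Section 23.1.1 below) shows Pydantic rejecting a non-positive price at runtime:

from pydantic import ValidationError  # Raised when validation fails
from models import Sale  # Defined in Section 23.1.1

try:
    bad_sale = Sale(product="Halal Laptop", price=-10.0, quantity=2)  # price must be > 0
except ValidationError as e:
    print(e)  # Reports the failing field and the violated constraint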
23.1.1 Defining Pydantic Models
Create models for sales and transactions.
# File: de-onboarding/models.py
from pydantic import BaseModel, Field # Import Pydantic
from typing import Optional # For optional fields
from datetime import date # For date fields
class Sale(BaseModel): # Sale model
"""Pydantic model for sales data."""
product: str = Field(..., min_length=1) # Non-empty string
price: float = Field(..., gt=0) # Positive float
quantity: int = Field(..., gt=0) # Positive integer
class Transaction(BaseModel): # Transaction model
"""Pydantic model for transaction data."""
transaction_id: str = Field(..., min_length=1) # Non-empty string
product: Optional[str] = None # Optional product
price: float = Field(..., gt=0) # Positive float
quantity: int = Field(..., gt=0) # Positive integer
    date: date  # Date field
Follow-Along Instructions:
- Save as de-onboarding/models.py.
- Install Pydantic: pip install pydantic.
- Configure editor for 4-space indentation per PEP 8.
- Test with:
  from datetime import date
  from models import Sale, Transaction
  sale = Sale(product="Halal Laptop", price=999.99, quantity=2)
  trans = Transaction(transaction_id="T001", product="Halal Laptop", price=999.99, quantity=2, date=date(2023, 10, 1))
  print(sale, trans)
- Common Errors:
  - ValidationError: If price <= 0, Pydantic raises an error. Print sale.dict() to debug.
  - ModuleNotFoundError: Install pydantic.
  - IndentationError: Use 4 spaces. Run python -tt models.py.
Key Points:
- Pydantic Fields: Field(..., constraint) enforces rules (e.g., gt=0 for positive values).
- Type Annotations: str, float, int, and Optional[str] ensure type safety.
- Underlying Implementation: Pydantic uses Python’s type hints and runtime validation, integrating with Pyright for static checks. Validation is O(1) per field.
- Performance Considerations:
- Time Complexity: O(n) for validating n records (see the batch-validation sketch after this list).
- Space Complexity: O(1) per model instance.
- Implication: Ensures data integrity for Hijra Group’s transaction pipelines.
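To illustrate the O(n) validation cost noted above, this minimal sketch (reusing the Sale model; the records are made-up examples) validates a list of raw dictionaries and separates valid from invalid records:

from typing import Any, Dict, List
from pydantic import ValidationError
from models import Sale

raw_records: List[Dict[str, Any]] = [
    {"product": "Halal Laptop", "price": 999.99, "quantity": 2},
    {"product": "", "price": 29.99, "quantity": 3},  # Fails min_length=1
]
valid_sales: List[Sale] = []
for record in raw_records:  # O(n) loop, O(1) validation per record
    try:
        valid_sales.append(Sale(**record))
    except ValidationError:
        print(f"Skipping invalid record: {record}")
print(f"Validated {len(valid_sales)} of {len(raw_records)} records")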
23.2 Type-Safe SQLite Integration
Integrate SQLite with type-safe queries using sqlite3 and Pydantic models.
23.2.1 SQLite Client with Pydantic
Create a type-safe SQLite client.
# File: de-onboarding/sqlite_client.py
import sqlite3
from typing import List, Dict, Any
from models import Sale
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SQLiteClient:
"""Type-safe SQLite client for sales data."""
def __init__(self, db_path: str) -> None:
"""Initialize SQLite connection."""
self.db_path: str = db_path
self.conn: sqlite3.Connection = sqlite3.connect(db_path)
self.cursor: sqlite3.Cursor = self.conn.cursor()
logger.info(f"Connected to SQLite: {db_path}")
def insert_sale(self, sale: Sale) -> None:
"""Insert a sale into the database."""
query: str = """
INSERT INTO sales (product, price, quantity)
VALUES (?, ?, ?)
"""
params: tuple = (sale.product, sale.price, sale.quantity)
self.cursor.execute(query, params)
self.conn.commit()
logger.info(f"Inserted sale: {sale}")
def fetch_sales(self) -> List[Dict[str, Any]]:
"""Fetch all sales from the database."""
query: str = "SELECT product, price, quantity FROM sales"
self.cursor.execute(query)
rows: List[tuple] = self.cursor.fetchall()
return [
{"product": row[0], "price": row[1], "quantity": row[2]}
for row in rows
]
def close(self) -> None:
"""Close database connection."""
self.conn.close()
        logger.info(f"Closed SQLite connection: {self.db_path}")
Follow-Along Instructions:
- Save as de-onboarding/sqlite_client.py.
- Ensure data/sales.db exists per Appendix 1.
- Test with:
  from sqlite_client import SQLiteClient
  from models import Sale
  client = SQLiteClient("data/sales.db")
  sale = Sale(product="Halal Mouse", price=24.99, quantity=10)
  client.insert_sale(sale)
  print(client.fetch_sales())
  client.close()
- Common Errors:
  - DatabaseError: Verify the sales table exists with sqlite3 data/sales.db ".schema sales".
  - IndentationError: Use 4 spaces. Run python -tt sqlite_client.py.
Key Points:
- Type Annotations: Sale and List[Dict[str, Any]] ensure type safety.
- Logging: Tracks operations for observability.
- Time Complexity: O(1) for single-row inserts, O(n) for fetching or batch inserting n rows (a batch-insert sketch follows this list).
- Space Complexity: O(n) for storing n rows in memory.
- Implication: Lightweight SQLite client for prototyping pipelines.
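The batch case mentioned above can use executemany. This is a hypothetical extension (insert_sales_batch is not part of the SQLiteClient above), sketched under the same sales schema assumptions:

from typing import List
import sqlite3
from models import Sale

def insert_sales_batch(conn: sqlite3.Connection, sales: List[Sale]) -> None:
    """Insert n sales in one transaction: O(n) work with a single commit."""
    query = "INSERT INTO sales (product, price, quantity) VALUES (?, ?, ?)"
    params = [(s.product, s.price, s.quantity) for s in sales]  # Parameterized rows
    conn.executemany(query, params)  # Avoids n separate commits
    conn.commit()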
23.3 Type-Safe PostgreSQL Integration
Integrate PostgreSQL with psycopg2 and Pydantic models.
23.3.1 PostgreSQL Client with Pydantic
Create a type-safe PostgreSQL client for transactions.
# File: de-onboarding/postgres_client.py
import psycopg2
from typing import List, Dict, Any
from models import Transaction
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PostgresClient:
"""Type-safe PostgreSQL client for transaction data."""
def __init__(self, host: str, database: str, user: str, password: str) -> None:
"""Initialize PostgreSQL connection."""
self.conn: psycopg2.extensions.connection = psycopg2.connect(
host=host, database=database, user=user, password=password
)
self.cursor: psycopg2.extensions.cursor = self.conn.cursor()
self.create_table()
logger.info(f"Connected to PostgreSQL: {database}")
def create_table(self) -> None:
"""Create transactions table if not exists."""
query: str = """
CREATE TABLE IF NOT EXISTS transactions (
transaction_id TEXT PRIMARY KEY,
product TEXT,
price REAL CHECK (price > 0),
quantity INTEGER CHECK (quantity > 0),
date DATE
)
"""
self.cursor.execute(query)
self.conn.commit()
logger.info("Ensured transactions table exists")
def insert_transaction(self, trans: Transaction) -> None:
"""Insert a transaction into the database."""
query: str = """
INSERT INTO transactions (transaction_id, product, price, quantity, date)
VALUES (%s, %s, %s, %s, %s)
"""
params: tuple = (
trans.transaction_id, trans.product, trans.price, trans.quantity, trans.date
)
self.cursor.execute(query, params)
self.conn.commit()
logger.info(f"Inserted transaction: {trans}")
def fetch_transactions(self) -> List[Dict[str, Any]]:
"""Fetch all transactions from the database."""
query: str = """
SELECT transaction_id, product, price, quantity, date
FROM transactions
"""
self.cursor.execute(query)
rows: List[tuple] = self.cursor.fetchall()
return [
{
"transaction_id": row[0],
"product": row[1],
"price": row[2],
"quantity": row[3],
"date": row[4]
}
for row in rows
]
def close(self) -> None:
"""Close database connection."""
self.cursor.close()
self.conn.close()
        logger.info("Closed PostgreSQL connection")
Follow-Along Instructions:
- Save as de-onboarding/postgres_client.py.
- Set up PostgreSQL (e.g., Docker: docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres).
- Install psycopg2: pip install psycopg2-binary.
- Test with:
  from postgres_client import PostgresClient
  from models import Transaction
  from datetime import date
  client = PostgresClient("localhost", "postgres", "postgres", "postgres")
  trans = Transaction(
      transaction_id="T001", product="Halal Laptop", price=999.99, quantity=2, date=date(2023, 10, 1)
  )
  client.insert_transaction(trans)
  print(client.fetch_transactions())
  client.close()
- Common Errors:
  - OperationalError: Verify PostgreSQL is running and credentials are correct. Test with psql -U postgres -h localhost.
  - IndentationError: Use 4 spaces. Run python -tt postgres_client.py.
Key Points:
- Type Annotations: Ensure type safety for queries and parameters; the sketch after this list re-validates fetched rows into Transaction models.
- Table Creation: Includes constraints for data integrity.
- Time Complexity: O(1) for single-row inserts, O(n) for fetching or batch inserting n rows.
- Space Complexity: O(n) for storing n rows.
- Implication: Production-grade client for Hijra Group’s transaction data.
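Because fetch_transactions returns List[Dict[str, Any]], downstream code can re-validate rows into typed objects. A minimal sketch (the fetch_typed_transactions helper is an illustration, not part of PostgresClient):

from typing import List
from models import Transaction
from postgres_client import PostgresClient

def fetch_typed_transactions(client: PostgresClient) -> List[Transaction]:
    """Re-validate fetched rows so downstream code works with typed objects."""
    rows = client.fetch_transactions()  # List[Dict[str, Any]] from the client above
    return [Transaction(**row) for row in rows]  # Raises ValidationError on bad rows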
23.4 Micro-Project: Type-Safe Data Pipeline
Project Requirements
Build a type-safe data pipeline that:
- Loads data/sales.csv and data/transactions.csv with pandas.
- Validates data using Pydantic models (Sale, Transaction).
- Stores valid records in SQLite (sales.db) and PostgreSQL (transactions table).
- Exports summary metrics to data/pipeline_results.json.
- Includes logging for observability.
- Tests with pytest for unit, integration, and edge-case scenarios (e.g., empty CSVs, invalid headers).
- Uses type annotations verified by Pyright.
- Uses 4-space indentation per PEP 8, preferring spaces over tabs.
Sample Input Files
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
data/transactions.csv (Appendix 1):
transaction_id,product,price,quantity,date
T001,Halal Laptop,999.99,2,2023-10-01
T002,Halal Mouse,24.99,10,2023-10-02
T003,Halal Keyboard,49.99,5,2023-10-03
T004,,29.99,3,2023-10-04
T005,Monitor,199.99,2,2023-10-05
Data Processing Flow
flowchart TD
A["Input CSVs
sales.csv, transactions.csv"] --> B["Load CSVs
pandas.read_csv
load_and_validate_sales/transactions"]
B --> C["Pandas DataFrames"]
C --> D["Validate
Pydantic Models
load_and_validate_sales/transactions"]
D -->|Invalid| E["Log Warning"]
D -->|Valid| F["Insert into Databases
SQLite/PostgreSQL
insert_sale/insert_transaction"]
F --> G["Compute Metrics
Pandas
process_pipeline"]
G --> H["Export JSON
pipeline_results.json
export_results"]
E --> I["End Processing"]
H --> I
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C,G,H data
class B,D,F process
class E error
class I endpoint
Acceptance Criteria
- Go Criteria:
- Loads and validates CSVs with Pydantic models.
- Inserts valid records into SQLite and PostgreSQL.
- Computes total sales and transaction counts.
- Exports results to data/pipeline_results.json.
- Includes logging for all steps, including DEBUG-level validation logs.
- Passes pytest tests (unit, integration, edge cases).
- Uses type annotations verified by Pyright.
- Uses 4-space indentation per PEP 8.
- No-Go Criteria:
- Fails to load CSVs or connect to databases.
- Incorrect validation or calculations.
- Missing JSON export.
- Fails pytest tests.
- Uses tabs or inconsistent indentation.
Common Pitfalls to Avoid
- Pydantic Validation Errors:
  - Problem: Invalid data (e.g., negative price) raises ValidationError.
  - Solution: Log errors and skip invalid records. Print row.to_dict() to debug, and print str(e) in the except block to see the specific validation error (e.g., ‘price must be > 0’).
- Database Connection Issues:
  - Problem: SQLite/PostgreSQL connection fails.
  - Solution: Verify sales.db exists and PostgreSQL is running. Test with sqlite3 and psql.
- Type Mismatches:
  - Problem: Pyright reports type errors.
  - Solution: Ensure correct annotations (e.g., List[Dict[str, Any]]). Run pyright pipeline.py.
- Test Failures:
  - Problem: pytest tests fail due to setup issues.
  - Solution: Mock database connections (see the sketch after this list). Print test inputs to debug.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces per PEP 8. Run python -tt pipeline.py.
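One way to mock the database, sketched below (an assumed approach, not part of the project’s test file), is a small in-memory stand-in for SQLiteClient so unit tests never touch data/sales.db:

from typing import List
from models import Sale

class FakeSQLiteClient:
    """In-memory stand-in for SQLiteClient, used for unit tests."""
    def __init__(self, db_path: str) -> None:
        self.inserted: List[Sale] = []  # Records inserts instead of writing to disk

    def insert_sale(self, sale: Sale) -> None:
        self.inserted.append(sale)

    def close(self) -> None:
        pass  # Nothing to close

def test_insert_without_database() -> None:
    client = FakeSQLiteClient("unused.db")
    client.insert_sale(Sale(product="Halal Mouse", price=24.99, quantity=10))
    assert len(client.inserted) == 1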
How This Differs from Production
In production, this solution would include:
- Error Handling: Retry logic for database failures (Chapter 40); a minimal retry sketch follows this list.
- Scalability: Connection pooling for PostgreSQL (Chapter 63).
- Security: Encrypted connections and PII masking (Chapter 65).
- Monitoring: Observability with Jaeger/Grafana (Chapter 66).
- Deployment: Kubernetes with Helm Charts (Chapter 64).
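As a preview of the retry idea (a minimal sketch only; Chapter 40 covers production-grade error handling, and the connection settings are the chapter’s local defaults), a connection attempt might be retried a few times before failing:

import time
import psycopg2

def connect_with_retry(retries: int = 3, delay: float = 2.0) -> psycopg2.extensions.connection:
    """Attempt a PostgreSQL connection, retrying on transient failures."""
    for attempt in range(1, retries + 1):
        try:
            return psycopg2.connect(
                host="localhost", database="postgres", user="postgres", password="postgres"
            )
        except psycopg2.OperationalError as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == retries:
                raise  # Give up after the last attempt
            time.sleep(delay)  # Wait before retrying
    raise RuntimeError("Unreachable")  # Satisfies static return-type checks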
Implementation
# File: de-onboarding/pipeline.py
import pandas as pd
from typing import Dict, Any, Tuple
from models import Sale, Transaction
from sqlite_client import SQLiteClient
from postgres_client import PostgresClient
import yaml
import json
from datetime import date
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG) # Enable DEBUG level
logger = logging.getLogger(__name__)
def read_config(config_path: str) -> Dict[str, Any]:
"""Read YAML configuration."""
logger.info(f"Reading config: {config_path}")
with open(config_path, "r") as file:
config: Dict[str, Any] = yaml.safe_load(file)
return config
def load_and_validate_sales(csv_path: str) -> Tuple[pd.DataFrame, int, int]:
"""Load and validate sales CSV with Pydantic."""
logger.info(f"Loading sales CSV: {csv_path}")
df = pd.read_csv(csv_path)
valid_sales: int = 0
valid_records = []
for _, row in df.iterrows():
try:
sale = Sale(
product=row["product"] if pd.notna(row["product"]) else "",
price=float(row["price"]) if pd.notna(row["price"]) else 0.0,
quantity=int(row["quantity"]) if pd.notna(row["quantity"]) else 0
)
valid_records.append(row)
valid_sales += 1
logger.debug(f"Validated sale: {row.to_dict()}")
except Exception as e:
logger.warning(f"Invalid sale: {row.to_dict()}, error: {str(e)}")
valid_df = pd.DataFrame(valid_records)
return valid_df, valid_sales, len(df)
def load_and_validate_transactions(csv_path: str) -> Tuple[pd.DataFrame, int, int]:
"""Load and validate transactions CSV with Pydantic."""
logger.info(f"Loading transactions CSV: {csv_path}")
df = pd.read_csv(csv_path)
valid_transactions: int = 0
valid_records = []
for _, row in df.iterrows():
try:
trans = Transaction(
transaction_id=row["transaction_id"] if pd.notna(row["transaction_id"]) else "",
product=row["product"] if pd.notna(row["product"]) else None,
price=float(row["price"]) if pd.notna(row["price"]) else 0.0,
quantity=int(row["quantity"]) if pd.notna(row["quantity"]) else 0,
date=date.fromisoformat(row["date"]) if pd.notna(row["date"]) else date.today()
)
valid_records.append(row)
valid_transactions += 1
logger.debug(f"Validated transaction: {row.to_dict()}")
except Exception as e:
logger.warning(f"Invalid transaction: {row.to_dict()}, error: {str(e)}")
valid_df = pd.DataFrame(valid_records)
return valid_df, valid_transactions, len(df)
def process_pipeline(
sales_csv: str,
transactions_csv: str,
config: Dict[str, Any],
sqlite_db: str,
pg_host: str,
pg_db: str,
pg_user: str,
pg_password: str
) -> Dict[str, Any]:
"""Process sales and transactions in a type-safe pipeline."""
# Initialize clients
sqlite_client = SQLiteClient(sqlite_db)
pg_client = PostgresClient(pg_host, pg_db, pg_user, pg_password)
# Process sales
sales_df, valid_sales, total_sales = load_and_validate_sales(sales_csv)
for _, row in sales_df.iterrows():
try:
sale = Sale(
product=row["product"],
price=float(row["price"]),
quantity=int(row["quantity"])
)
sqlite_client.insert_sale(sale)
except Exception as e:
logger.error(f"Failed to insert sale: {row.to_dict()}, error: {str(e)}")
# Process transactions
trans_df, valid_trans, total_trans = load_and_validate_transactions(transactions_csv)
for _, row in trans_df.iterrows():
try:
trans = Transaction(
transaction_id=row["transaction_id"],
product=row["product"] if pd.notna(row["product"]) else None,
price=float(row["price"]),
quantity=int(row["quantity"]),
date=date.fromisoformat(row["date"])
)
pg_client.insert_transaction(trans)
except Exception as e:
logger.error(f"Failed to insert transaction: {row.to_dict()}, error: {str(e)}")
# Compute metrics
    sales_metrics = {
        # Cast to numeric: the raw price column may be read as strings when the CSV has bad values
        "total_sales_amount": float((sales_df["price"].astype(float) * sales_df["quantity"].astype(int)).sum()) if not sales_df.empty else 0.0,
"valid_sales": valid_sales,
"total_sales_records": total_sales
}
    trans_metrics = {
        # Same numeric cast for safety before computing the total
        "total_transaction_amount": float((trans_df["price"].astype(float) * trans_df["quantity"].astype(int)).sum()) if not trans_df.empty else 0.0,
"valid_transactions": valid_trans,
"total_transaction_records": total_trans
}
# Close connections
sqlite_client.close()
pg_client.close()
return {
"sales_metrics": sales_metrics,
"transaction_metrics": trans_metrics
}
def export_results(results: Dict[str, Any], json_path: str) -> None:
"""Export results to JSON."""
logger.info(f"Exporting results to {json_path}")
with open(json_path, "w") as file:
json.dump(results, file, indent=2, default=str)
def main() -> None:
"""Main function to run the pipeline."""
config_path: str = "data/config.yaml"
sales_csv: str = "data/sales.csv"
transactions_csv: str = "data/transactions.csv"
json_path: str = "data/pipeline_results.json"
sqlite_db: str = "data/sales.db"
pg_host: str = "localhost"
pg_db: str = "postgres"
pg_user: str = "postgres"
pg_password: str = "postgres"
config = read_config(config_path)
results = process_pipeline(
sales_csv, transactions_csv, config, sqlite_db,
pg_host, pg_db, pg_user, pg_password
)
export_results(results, json_path)
logger.info("Pipeline completed")
if __name__ == "__main__":
    main()
Test Implementation
# File: de-onboarding/tests/test_pipeline.py
import pytest
import sqlite3
import pandas as pd
from models import Sale, Transaction
from sqlite_client import SQLiteClient
from postgres_client import PostgresClient
from pipeline import load_and_validate_sales, load_and_validate_transactions
from datetime import date
from typing import Tuple
@pytest.fixture
def sample_sales_df() -> pd.DataFrame:
"""Fixture for sample sales data."""
return pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse", ""],
"price": [999.99, 24.99, 29.99],
"quantity": [2, 10, 3]
})
@pytest.fixture
def sample_transactions_df() -> pd.DataFrame:
"""Fixture for sample transactions data."""
return pd.DataFrame({
"transaction_id": ["T001", "T002", "T003"],
"product": ["Halal Laptop", "Halal Mouse", None],
"price": [999.99, 24.99, 29.99],
"quantity": [2, 10, 3],
"date": ["2023-10-01", "2023-10-02", "2023-10-03"]
})
@pytest.fixture
def empty_sales_df() -> pd.DataFrame:
"""Fixture for empty sales data."""
return pd.DataFrame(columns=["product", "price", "quantity"])
@pytest.fixture
def invalid_headers_df() -> pd.DataFrame:
"""Fixture for sales data with invalid headers."""
return pd.DataFrame({
"name": ["Halal Laptop"],
"price": [999.99],
"quantity": [2]
})
def test_load_and_validate_sales(sample_sales_df: pd.DataFrame) -> None:
"""Test sales validation."""
df, valid_sales, total_records = load_and_validate_sales("data/sales.csv")
assert valid_sales <= total_records
assert isinstance(df, pd.DataFrame)
def test_load_and_validate_transactions(sample_transactions_df: pd.DataFrame) -> None:
"""Test transactions validation."""
df, valid_trans, total_records = load_and_validate_transactions("data/transactions.csv")
assert valid_trans <= total_records
assert isinstance(df, pd.DataFrame)
def test_empty_csv_sales(empty_sales_df: pd.DataFrame) -> None:
"""Test handling of empty sales CSV."""
empty_sales_df.to_csv("data/temp_empty.csv", index=False)
df, valid_sales, total_records = load_and_validate_sales("data/temp_empty.csv")
assert valid_sales == 0
assert total_records == 0
assert df.empty
def test_invalid_headers_sales(invalid_headers_df: pd.DataFrame) -> None:
"""Test handling of sales CSV with invalid headers."""
invalid_headers_df.to_csv("data/temp_invalid.csv", index=False)
df, valid_sales, total_records = load_and_validate_sales("data/temp_invalid.csv")
assert valid_sales == 0
assert total_records == 1
assert df.empty
def test_invalid_db_path_sqlite() -> None:
    """Test handling of an invalid SQLite database path (nonexistent directory)."""
    with pytest.raises(sqlite3.OperationalError):
        client = SQLiteClient("nonexistent_dir/invalid.db")
        client.close()
def test_sqlite_insert() -> None:
"""Test SQLite insert."""
client = SQLiteClient("data/sales.db")
sale = Sale(product="Test Product", price=100.0, quantity=5)
client.insert_sale(sale)
sales = client.fetch_sales()
client.close()
assert any(s["product"] == "Test Product" for s in sales)
def test_postgres_insert() -> None:
"""Test PostgreSQL insert."""
client = PostgresClient("localhost", "postgres", "postgres", "postgres")
trans = Transaction(
transaction_id="TEST001",
product="Test Product",
price=100.0,
quantity=5,
date=date(2023, 10, 1)
)
client.insert_transaction(trans)
transactions = client.fetch_transactions()
client.close()
    assert any(t["transaction_id"] == "TEST001" for t in transactions)
Expected Outputs
data/pipeline_results.json:
{
"sales_metrics": {
"total_sales_amount": 2499.83,
"valid_sales": 3,
"total_sales_records": 6
},
"transaction_metrics": {
"total_transaction_amount": 2499.83,
"valid_transactions": 3,
"total_transaction_records": 5
}
}
Console Output (abridged):
INFO:__main__:Reading config: data/config.yaml
INFO:__main__:Loading sales CSV: data/sales.csv
DEBUG:__main__:Validated sale: {'product': 'Halal Laptop', 'price': 999.99, 'quantity': 2}
WARNING:__main__:Invalid sale: {...}, error: ...
INFO:__main__:Loading transactions CSV: data/transactions.csv
DEBUG:__main__:Validated transaction: {'transaction_id': 'T001', 'product': 'Halal Laptop', ...}
INFO:sqlite_client:Connected to SQLite: data/sales.db
INFO:postgres_client:Connected to PostgreSQL: postgres
INFO:__main__:Exporting results to data/pipeline_results.json
INFO:__main__:Pipeline completed
How to Run and Test
Setup:
- Create de-onboarding/data/ and populate with sales.csv, transactions.csv, sales.db, config.yaml per Appendix 1.
- Install libraries: pip install pandas pydantic psycopg2-binary pyyaml pytest pyright.
- Set up PostgreSQL (e.g., Docker: docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres). If Docker fails (e.g., port 5432 in use), run docker ps to check running containers and use docker stop <container_id> to free the port.
- Save models.py, sqlite_client.py, postgres_client.py, pipeline.py, and tests/test_pipeline.py.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
Run:
- Run: python pipeline.py.
- Outputs: data/pipeline_results.json, database updates, logs.
- To reduce log verbosity, change logging.basicConfig(level=logging.DEBUG) to logging.basicConfig(level=logging.INFO) in pipeline.py.
Test:
- Run: pytest tests/test_pipeline.py -v.
- Verify all tests pass.
- Ensure Pyright is installed (pip install pyright) and run pyright pipeline.py to catch type errors (e.g., an incorrect str vs. int) before executing the pipeline. If errors occur, create a pyproject.toml with a [tool.pyright] section in de-onboarding/ (a minimal example follows this list).
- Tests create temporary files in data/ (temp_empty.csv, temp_invalid.csv). Remove them with rm data/temp_*.csv (Unix/macOS) or del data\temp_*.csv (Windows) after testing.
- Troubleshooting:
  - Database Errors: Check connections with sqlite3 and psql.
  - Type Errors: Run pyright pipeline.py.
  - IndentationError: Run python -tt pipeline.py.
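A minimal pyproject.toml sketch for this purpose (the exact settings are an assumption; adjust to your setup):

# File: de-onboarding/pyproject.toml (assumed minimal configuration)
[tool.pyright]
include = ["pipeline.py", "models.py", "sqlite_client.py", "postgres_client.py", "tests"]
typeCheckingMode = "basic"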
23.5 Practice Exercises
Exercise 1: Pydantic Model Validation
Write a function to validate a sale using Pydantic, with 4-space indentation.
Sample Input:
sale_data = {"product": "Halal Laptop", "price": 999.99, "quantity": 2}
Expected Output:
Sale(product='Halal Laptop', price=999.99, quantity=2)
Instructions:
- Save as de-onboarding/ex1_pydantic.py.
- Run: python ex1_pydantic.py to test your solution.
- Test with invalid data (e.g., price=-1).
Exercise 2: SQLite Insert
Write a function to insert a sale into SQLite with type safety, with 4-space indentation.
Sample Input:
sale = Sale(product="Halal Mouse", price=24.99, quantity=10)
Expected Output:
Inserted sale: Halal Mouse
Instructions:
- Save as de-onboarding/ex2_sqlite.py.
- Ensure sales.db exists.
- Run: python ex2_sqlite.py to test your solution.
Exercise 3: PostgreSQL Insert
Write a function to insert a transaction into PostgreSQL with type safety, with 4-space indentation.
Sample Input:
trans = Transaction(transaction_id="T002", product="Halal Mouse", price=24.99, quantity=10, date=date(2023, 10, 2))
Expected Output:
Inserted transaction: T002
Instructions:
- Save as de-onboarding/ex3_postgres.py.
- Ensure PostgreSQL is running.
- Run: python ex3_postgres.py to test your solution.
Exercise 4: Pipeline Metrics
Write a function to compute metrics from a validated DataFrame, with 4-space indentation.
Sample Input:
df = pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse"],
"price": [999.99, 24.99],
"quantity": [2, 10]
})
Expected Output:
{'total_amount': 2249.88, 'record_count': 2}
Instructions:
- Save as de-onboarding/ex4_metrics.py.
- Run: python ex4_metrics.py to test your solution.
Exercise 5: Debug a Type Error
Fix a buggy PostgreSQL insert with incorrect type annotations, ensuring 4-space indentation.
Buggy Code:
def insert_transaction(client: PostgresClient, trans: Dict[str, Any]) -> None: # Wrong type
    client.insert_transaction(trans)
Expected Fix:
Use Transaction type for trans.
Instructions:
- Save as de-onboarding/ex5_debug.py.
- Run: pyright ex5_debug.py to verify the fix.
- Run: python ex5_debug.py to test your solution.
Exercise 6: Conceptual Analysis of PostgreSQL’s MVCC and SQLite’s Limitations
Answer the following questions to explain PostgreSQL’s concurrency advantages and SQLite’s limitations for Hijra Group’s data pipelines:
- How does PostgreSQL’s Multi-Version Concurrency Control (MVCC) support Hijra Group’s concurrent transaction processing compared to SQLite’s limitations?
- How might SQLite’s concurrency limitations impact testing a pipeline with concurrent users?
Save your answers in a text file.
Expected Output (in de-onboarding/ex6_concepts.txt):
1. PostgreSQL’s MVCC allows multiple transactions to access data concurrently by maintaining multiple versions of rows, ensuring consistency and scalability for Hijra Group’s high-volume transaction processing (e.g., millions of records). This supports simultaneous reads and writes, critical for real-time fintech analytics, as introduced in Chapter 16. SQLite, with its single-file, serverless design, lacks robust concurrency, making it suitable for prototyping small datasets like sales.csv but inadequate for production-scale concurrency.
2. SQLite’s concurrency limitations, as discussed in Chapter 12, restrict it to single-writer operations, causing delays or failures when testing pipelines with concurrent users (e.g., multiple processes inserting sales data). This makes SQLite less effective for simulating Hijra Group’s production environment with concurrent transaction processing.
Instructions:
- Save answers as de-onboarding/ex6_concepts.txt.
- Use concepts from Chapters 12–16, 21–22 (e.g., MVCC from Chapter 16, SQLite basics from Chapter 12).
- Verify content reflects trade-offs without introducing new concepts.
- Run: cat ex6_concepts.txt (Unix/macOS) or type ex6_concepts.txt (Windows) to review your answers.
23.6 Exercise Solutions
Solution to Exercise 1
from typing import Dict, Any
from models import Sale
def validate_sale(sale_data: Dict[str, Any]) -> Sale:
return Sale(**sale_data)
# Test
print(validate_sale({"product": "Halal Laptop", "price": 999.99, "quantity": 2}))
Solution to Exercise 2
from sqlite_client import SQLiteClient
from models import Sale
def insert_sale(db_path: str, sale: Sale) -> None:
client = SQLiteClient(db_path)
client.insert_sale(sale)
client.close()
print(f"Inserted sale: {sale.product}")
# Test
sale = Sale(product="Halal Mouse", price=24.99, quantity=10)
insert_sale("data/sales.db", sale)
Solution to Exercise 3
from postgres_client import PostgresClient
from models import Transaction
from datetime import date
def insert_transaction(trans: Transaction) -> None:
client = PostgresClient("localhost", "postgres", "postgres", "postgres")
client.insert_transaction(trans)
client.close()
print(f"Inserted transaction: {trans.transaction_id}")
# Test
trans = Transaction(transaction_id="T002", product="Halal Mouse", price=24.99, quantity=10, date=date(2023, 10, 2))
insert_transaction(trans)
Solution to Exercise 4
import pandas as pd
from typing import Dict, Any
def compute_metrics(df: pd.DataFrame) -> Dict[str, Any]:
total_amount = float((df["price"] * df["quantity"]).sum()) if not df.empty else 0.0
return {"total_amount": total_amount, "record_count": len(df)}
# Test
df = pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse"],
"price": [999.99, 24.99],
"quantity": [2, 10]
})
print(compute_metrics(df))
Solution to Exercise 5
from postgres_client import PostgresClient
from models import Transaction
def insert_transaction(client: PostgresClient, trans: Transaction) -> None:
    client.insert_transaction(trans)
Solution to Exercise 6
Content (save to de-onboarding/ex6_concepts.txt):
1. PostgreSQL’s MVCC allows multiple transactions to access data concurrently by maintaining multiple versions of rows, ensuring consistency and scalability for Hijra Group’s high-volume transaction processing (e.g., millions of records). This supports simultaneous reads and writes, critical for real-time fintech analytics, as introduced in Chapter 16. SQLite, with its single-file, serverless design, lacks robust concurrency, making it suitable for prototyping small datasets like sales.csv but inadequate for production-scale concurrency.
2. SQLite’s concurrency limitations, as discussed in Chapter 12, restrict it to single-writer operations, causing delays or failures when testing pipelines with concurrent users (e.g., multiple processes inserting sales data). This makes SQLite less effective for simulating Hijra Group’s production environment with concurrent transaction processing.
23.7 Chapter Summary and Connection to Chapter 24
You’ve mastered:
- Pydantic Models: Type-safe validation for sales and transactions.
- SQLite/PostgreSQL Integration: Type-safe clients with sqlite3 and psycopg2.
- Pipeline: Unified processing with logging and testing.
- Testing: pytest for robust verification, including edge cases (e.g., invalid headers).
- White-Space Sensitivity: 4-space indentation per PEP 8.
The micro-project built a type-safe pipeline, processing CSVs into databases with Pydantic validation, verified by Pyright and tested with pytest. This prepares for Chapter 24: Checkpoint 3B, which consolidates database skills and extends type-safe pipelines toward cloud analytics with BigQuery, maintaining 4-space indentation for maintainable code.