18 - Checkpoint 3A: Database Fundamentals I Review
Complexity: Easy (E)
18.0 Introduction: Why This Matters for Data Engineering
In data engineering, robust database skills are essential for managing and querying financial transaction data to support Hijra Group’s Sharia-compliant fintech analytics. This chapter consolidates foundational database concepts from Chapters 12–17, focusing on SQL with SQLite, Python-SQLite integration, PostgreSQL basics, schema design, and type-safe programming. By mastering these skills, you ensure data integrity and efficient querying, critical for building scalable pipelines. For example, a well-designed sales database enables rapid retrieval of transaction metrics, supporting real-time analytics for Hijra Group’s stakeholders.
This checkpoint reviews SQL querying, database integration, and schema design using data/sales.db and data/sales.csv from Appendix 1, ensuring you can create normalized schemas, query data, and integrate databases with Python. All code uses PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError, and avoids advanced concepts like window functions (Chapter 21) or concurrency (Chapter 40). The micro-project builds a type-annotated database tool integrating SQLite and PostgreSQL, preparing you for advanced querying and optimization in Phase 3B.
Data Engineering Workflow Context
This diagram illustrates how database skills fit into a data engineering pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Python Scripts with SQLite/PostgreSQL"]
B --> C{"Database Operations"}
C -->|Load/Validate| D["Database Tables"]
C -->|Query| E["Aggregated Metrics"]
D --> F["Output (CSV/JSON)"]
E --> F
F --> G["Storage/Analysis"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,E,F data
class B,C process
class G storage
Building On and Preparing For
- Building On:
- Chapter 12: SQL fundamentals with SQLite for querying sales.db.
- Chapter 13: Python-SQLite integration with sqlite3 and PyYAML.
- Chapter 14: Advanced SQLite operations like transactions and views.
- Chapter 15: Type-safe SQLite programming with Pydantic.
- Chapter 16: PostgreSQL fundamentals with psycopg2.
- Chapter 17: Python-PostgreSQL integration with type annotations.
- Preparing For:
- Chapter 19: Advanced SQL querying for complex analytics.
- Chapter 20: SQLite indexing and optimization for performance.
- Chapter 23: Type-safe integration of SQLite and PostgreSQL.
- Chapter 24: Consolidates database skills for cloud analytics.
What You’ll Learn
This chapter covers:
- SQL Querying: Writing SQL queries for SQLite and PostgreSQL.
- Schema Design: Creating normalized schemas with entity-relationship (ER) diagrams.
- Python Integration: Loading and querying data with sqlite3 and psycopg2.
- Type Safety: Using Pydantic for validation and type annotations.
- Testing: Writing pytest tests for database operations.
By the end, you’ll build a type-annotated database tool that integrates SQLite and PostgreSQL, processes data/sales.csv, and queries data/sales.db, producing a JSON report and ensuring data integrity. All code adheres to 4-space indentation per PEP 8.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with sales.csv, config.yaml, and sales.db per Appendix 1.
- Install libraries: pip install pyyaml psycopg2-binary pydantic pytest (sqlite3 ships with Python and needs no install).
- Configure editor for 4-space indentation (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Use print statements (e.g., print(cursor.fetchall())) to debug queries.
- Verify file paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding to avoid UnicodeDecodeError.
18.1 Core Concepts: Database Fundamentals Review
18.1.1 SQL Querying with SQLite and PostgreSQL
SQL enables efficient data retrieval from relational databases. SQLite (lightweight, file-based) and PostgreSQL (enterprise-grade, server-based) support CRUD operations (Create, Read, Update, Delete). For example, querying sales data involves:
SELECT product, SUM(price * quantity) AS total_sales
FROM sales
WHERE product LIKE 'Halal%'
GROUP BY product;
Key Points:
- Time Complexity: O(n) for scanning n rows; indexing (Chapter 20) reduces to O(log n).
- Space Complexity: O(k) for k result rows.
- Implementation: SQLite stores data in a single file (sales.db), while PostgreSQL uses a client-server model.
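To try this aggregation hands-on, here is a minimal sketch using Python’s built-in sqlite3 module against data/sales.db (Appendix 1 data assumed):
import sqlite3

# Minimal sketch: run the aggregation above against data/sales.db (assumes Appendix 1 data)
conn = sqlite3.connect("data/sales.db")
cursor = conn.cursor()
cursor.execute("""
    SELECT product, SUM(price * quantity) AS total_sales
    FROM sales
    WHERE product LIKE 'Halal%'
    GROUP BY product;
""")
print(cursor.fetchall())  # Debug: e.g., [('Halal Keyboard', 249.95), ('Halal Laptop', 1999.98), ('Halal Mouse', 249.9)]
conn.close()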
18.1.2 Schema Design and Normalization
Normalization reduces data redundancy using entity-relationship (ER) diagrams. For sales data, a normalized schema separates products and sales:
erDiagram
PRODUCTS ||--o{ SALES : contains
PRODUCTS {
text product_id PK
text name
real price
}
SALES {
integer sale_id PK
text product_id FK
integer quantity
date sale_date
}
Key Points:
- 1NF: No repeating groups (e.g., one row per sale).
- 2NF: No partial dependencies (e.g., product details in PRODUCTS).
- 3NF: No transitive dependencies (e.g., price tied to product).
- Time Complexity: O(1) for normalized lookups with indexes; O(1) for creating tables, as it involves fixed DDL statements.
- Space Complexity: O(n + m) for n products and m sales; O(1) for schema metadata, excluding data.
- Performance Implication: Normalized schemas reduce storage but may slow real-time queries due to joins. For Hijra Group’s high-volume transactions, indexing (Chapter 20) mitigates this.
- Debugging Tip: Use tools like DBeaver or pgAdmin to visualize the ER diagram for sales.db. To set up DBeaver, install it (dbeaver.io), connect to sales.db via the SQLite driver, and use the ER Diagram view to visualize the PRODUCTS and SALES relationships. DBeaver shows PRODUCTS with a one-to-many relationship to SALES, confirming the foreign key (see the DBeaver documentation).
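As a sketch, the ER diagram above translates to DDL along these lines (SQLite syntax; the micro-project below creates essentially the same schema):
-- Sketch of the normalized schema implied by the ER diagram (SQLite syntax)
CREATE TABLE products (
    product_id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    price REAL NOT NULL
);
CREATE TABLE sales (
    sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
    product_id TEXT,
    quantity INTEGER NOT NULL,
    sale_date TEXT DEFAULT (date('now')),
    FOREIGN KEY (product_id) REFERENCES products (product_id)
);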
18.1.3 Python Integration
Python integrates with databases using sqlite3 (built-in) and psycopg2 (PostgreSQL). Example for SQLite:
import sqlite3
conn = sqlite3.connect("data/sales.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales WHERE quantity > 5")
results = cursor.fetchall()
print(results) # Debug
conn.close()
Key Points:
- Time Complexity: O(n) for querying n rows.
- Space Complexity: O(k) for k result rows.
- Implementation: sqlite3 uses file-based connections; psycopg2 connects to a PostgreSQL server.
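The PostgreSQL equivalent looks like the sketch below; the connection parameters are assumptions matching the micro-project setup later in this chapter (local server with a sales_db database), and psycopg2 uses %s placeholders rather than ?:
import psycopg2  # requires pip install psycopg2-binary

# Minimal sketch: same query against a PostgreSQL server (connection details are assumptions)
conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales WHERE quantity > %s", (5,))
print(cursor.fetchall())  # Debug
conn.close()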
18.1.4 Type-Safe Programming
Pydantic ensures type safety for database inputs. Example:
from pydantic import BaseModel
class Sale(BaseModel):
product: str
price: float
quantity: int
sale = Sale(product="Halal Laptop", price=999.99, quantity=2)
print(sale)  # Debug
Key Points:
- Validates types at runtime, preventing invalid data.
- Time Complexity: O(1) for validation.
- Space Complexity: O(1) per object.
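When a field cannot be coerced to the declared type, Pydantic raises ValidationError at construction time; a small sketch using the invalid Monitor row from sales.csv:
from pydantic import BaseModel, ValidationError

class Sale(BaseModel):
    product: str
    price: float
    quantity: int

try:
    Sale(product="Monitor", price="invalid", quantity=2)  # non-numeric price from sales.csv
except ValidationError as e:
    print(f"Validation error: {e}")  # reports which field failed and why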
18.2 Micro-Project: Type-Safe Database Tool
Project Requirements
Build a type-annotated database tool that integrates SQLite and PostgreSQL, processing data/sales.csv and querying data/sales.db for Hijra Group’s analytics. The tool must handle Hijra Group’s high transaction volumes, ensuring data integrity through normalization and type-safe validation, preparing for scalability in Chapter 40. It creates a normalized schema, loads data, queries metrics, and exports results to data/sales_report.json, with pytest tests for reliability.
- Load data/sales.csv and config.yaml.
- Create a normalized SQLite schema in data/sales.db.
- Load data into SQLite with type-safe validation.
- Query total sales and top products in SQLite.
- Replicate the schema in PostgreSQL and query it.
- Export results to data/sales_report.json.
- Write pytest tests for database operations.
- Use 4-space indentation per PEP 8.
- Test edge cases with empty.csv, invalid.csv, malformed.csv, and negative.csv.
Sample Input Files
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
data/config.yaml (Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
data/sales.db (Appendix 1):
CREATE TABLE sales (
product TEXT,
price REAL,
quantity INTEGER
);
INSERT INTO sales (product, price, quantity) VALUES
('Halal Laptop', 999.99, 2),
('Halal Mouse', 24.99, 10),
('Halal Keyboard', 49.99, 5);
Data Processing Flow
flowchart TD
A["Input CSV
sales.csv"] --> B["Load CSV
pandas.read_csv"]
B --> C["Pandas DataFrame"]
C --> D["Read YAML
config.yaml"]
D --> E["Validate DataFrame
Pydantic"]
E -->|Invalid| F["Log Warning
Skip Record"]
E -->|Valid| G["Load to SQLite
sqlite3"]
G --> H["Create PostgreSQL Schema
psycopg2"]
H --> I["Query Metrics
SQLite/PostgreSQL"]
I --> J["Export JSON
sales_report.json"]
F --> K["End Processing"]
J --> K
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C data
class B,D,E,G,H,I,J process
class F error
class K endpoint
Acceptance Criteria
- Go Criteria:
- Loads sales.csv and config.yaml correctly.
- Creates a normalized SQLite schema with PRODUCTS and SALES tables.
- Validates and loads data with Pydantic.
- Queries total sales and top products in SQLite and PostgreSQL.
- Exports results to data/sales_report.json.
- Includes pytest tests for schema, data loading, and queries.
- Uses 4-space indentation per PEP 8.
- Passes edge case tests with empty.csv, invalid.csv, malformed.csv, and negative.csv.
- No-Go Criteria:
- Fails to load files or create schema.
- Incorrect validation or query results.
- Missing JSON export or tests.
- Inconsistent indentation or tab/space mixing.
Common Pitfalls to Avoid
- Database Connection Errors:
  - Problem: sqlite3.OperationalError or psycopg2.OperationalError due to a missing sales.db or the PostgreSQL server not running.
  - Solution: Verify sales.db exists with ls data/sales.db. For PostgreSQL, print connection parameters with print(conn_params) and ensure the server is running (pg_ctl status).
- Schema Mismatches:
  - Problem: KeyError due to missing columns in sales.csv.
  - Solution: Print the schema with sqlite3 data/sales.db ".schema" or psql sales_db -c "\d products". Check CSV columns with print(df.columns).
- Type Validation Errors:
  - Problem: Pydantic raises ValidationError for non-numeric prices or quantities.
  - Solution: Print df.dtypes and inspect invalid rows with print(row.to_dict()) before validation. Ensure utils.is_numeric_value is applied correctly.
- Query Errors:
  - Problem: Incorrect query results due to join mismatches.
  - Solution: Print query results with print(cursor.fetchall()). Verify table data with SELECT * FROM products LIMIT 5.
- IndentationError:
  - Problem: Mixed spaces/tabs in Python code.
  - Solution: Use 4 spaces per PEP 8. Run python -tt database_tool.py to detect issues. Configure editor to replace tabs with spaces.
How This Differs from Production
In production, this solution would include:
- Error Handling: Try/except for robust errors (Chapter 7).
- Scalability: Chunked loading for large datasets (Chapter 40).
- Security: Encrypted connections and PII masking (Chapter 65).
- Monitoring: Logging and observability (Chapter 66).
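For illustration only, a minimal sketch of the try/except style Chapter 7 covers, wrapping a query so a database failure is reported instead of crashing the pipeline:
import sqlite3

# Sketch only: production code would catch and log database errors rather than crash
try:
    conn = sqlite3.connect("data/sales.db")
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM sales")
    print(cursor.fetchone()[0])
    conn.close()
except sqlite3.Error as e:
    print(f"Database error: {e}")  # e.g., missing file or table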
Implementation
# File: de-onboarding/utils.py
from pydantic import BaseModel, ValidationError, validator
def is_numeric(s, max_decimals=2):
"""Check if string is a decimal number with up to max_decimals."""
parts = s.split(".")
if len(parts) != 2 or not parts[0].replace("-", "").isdigit() or not parts[1].isdigit():
return False
return len(parts[1]) <= max_decimals
def clean_string(s):
"""Strip whitespace from string."""
return s.strip() if isinstance(s, str) else s
def is_numeric_value(x):
"""Check if value is numeric."""
return isinstance(x, (int, float))
def is_integer(x):
"""Check if value is an integer."""
return isinstance(x, int) or (isinstance(x, str) and x.isdigit())
def apply_valid_decimals(x, max_decimals):
"""Apply decimal validation."""
return is_numeric(str(x), max_decimals)
class Sale(BaseModel):
product: str
price: float
quantity: int
@validator("product")
def check_prefix(cls, v):
if not v.startswith("Halal"):
raise ValueError("Product must start with 'Halal'")
return v
def validate_sale(sale: dict, config: dict) -> bool:
"""Validate sale based on config rules."""
required_fields = config["required_fields"]
min_price = config["min_price"]
max_quantity = config["max_quantity"]
prefix = config["product_prefix"]
max_decimals = config["max_decimals"]
print(f"Validating sale: {sale}")
for field in required_fields:
if field not in sale or not sale[field]:
print(f"Invalid sale: missing {field}: {sale}")
return False
try:
validated_sale = Sale(**sale)
if validated_sale.price < min_price or validated_sale.price <= 0:
print(f"Invalid sale: invalid price: {sale}")
return False
if validated_sale.quantity > max_quantity:
print(f"Invalid sale: invalid quantity: {sale}")
return False
if not apply_valid_decimals(validated_sale.price, max_decimals):
print(f"Invalid sale: too many decimals: {sale}")
return False
return True
except ValidationError as e:
print(f"Invalid sale: validation error: {e}")
return False

# File: de-onboarding/database_tool.py
from typing import Dict, List, Tuple
import pandas as pd
import yaml
import json
import sqlite3
import psycopg2
from pydantic import ValidationError
import utils
def read_config(config_path: str) -> Dict:
"""Read YAML configuration."""
print(f"Opening config: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
print(f"Loaded config: {config}")
return config
def create_sqlite_schema(db_path: str) -> sqlite3.Connection:
"""Create normalized SQLite schema."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.executescript("""
DROP TABLE IF EXISTS products;
DROP TABLE IF EXISTS sales;
CREATE TABLE products (
product_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
price REAL NOT NULL
);
CREATE TABLE sales (
sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
quantity INTEGER NOT NULL,
sale_date TEXT DEFAULT (date('now')),
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
""")
conn.commit()
print("Created SQLite schema")
return conn
def create_postgresql_schema(conn_params: Dict) -> psycopg2.extensions.connection:
"""Create normalized PostgreSQL schema."""
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()
cursor.execute("""
DROP TABLE IF EXISTS sales;
DROP TABLE IF EXISTS products;
CREATE TABLE products (
product_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
price REAL NOT NULL
);
CREATE TABLE sales (
sale_id SERIAL PRIMARY KEY,
product_id TEXT,
quantity INTEGER NOT NULL,
sale_date DATE DEFAULT CURRENT_DATE,
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
""")
conn.commit()
print("Created PostgreSQL schema")
return conn
def load_and_validate_sales(csv_path: str, config: Dict) -> Tuple[pd.DataFrame, int, int]:
"""Load and validate sales CSV."""
print(f"Loading CSV: {csv_path}")
try:
df = pd.read_csv(csv_path)
except FileNotFoundError:
print(f"File not found: {csv_path}")
return pd.DataFrame(), 0, 0
print("Initial DataFrame:")
print(df.head())
required_fields = config["required_fields"]
missing_fields = [f for f in required_fields if f not in df.columns]
if missing_fields:
print(f"Missing columns: {missing_fields}")
return pd.DataFrame(), 0, len(df)
valid_rows = []
for _, row in df.iterrows():
sale = {k: utils.clean_string(v) for k, v in row.to_dict().items()}
if utils.validate_sale(sale, config):
valid_rows.append(sale)
valid_df = pd.DataFrame(valid_rows)
total_records = len(df)
valid_sales = len(valid_df)
print("Validated DataFrame:")
print(valid_df)
return valid_df, valid_sales, total_records
def load_to_sqlite(df: pd.DataFrame, conn: sqlite3.Connection) -> None:
"""Load validated data to SQLite."""
if df.empty:
print("No valid data to load")
return
cursor = conn.cursor()
for _, row in df.iterrows():
product_id = row["product"].replace(" ", "_").lower()
cursor.execute(
"INSERT OR IGNORE INTO products (product_id, name, price) VALUES (?, ?, ?)",
(product_id, row["product"], row["price"])
)
cursor.execute(
"INSERT INTO sales (product_id, quantity) VALUES (?, ?)",
(product_id, row["quantity"])
)
conn.commit()
print("Loaded data to SQLite")
def load_to_postgresql(df: pd.DataFrame, conn: psycopg2.extensions.connection) -> None:
"""Load validated data to PostgreSQL."""
if df.empty:
print("No valid data to load")
return
cursor = conn.cursor()
for _, row in df.iterrows():
product_id = row["product"].replace(" ", "_").lower()
cursor.execute(
"INSERT INTO products (product_id, name, price) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
(product_id, row["product"], row["price"])
)
cursor.execute(
"INSERT INTO sales (product_id, quantity) VALUES (%s, %s)",
(product_id, row["quantity"])
)
conn.commit()
print("Loaded data to PostgreSQL")
def query_sales_metrics(conn: sqlite3.Connection | psycopg2.extensions.connection, is_sqlite: bool = True) -> Dict:
"""Query total sales and top products."""
cursor = conn.cursor()
query = """
SELECT p.name, SUM(p.price * s.quantity) AS total_sales
FROM products p
JOIN sales s ON p.product_id = s.product_id
GROUP BY p.name
ORDER BY total_sales DESC
"""
cursor.execute(query)
results = cursor.fetchall()
print("Query Results:", results)
total_sales = sum(row[1] for row in results)
top_products = {row[0]: float(row[1]) for row in results[:3]}
unique_products = [row[0] for row in results]
return {
"total_sales": float(total_sales),
"unique_products": unique_products,
"top_products": top_products
}
def export_results(results: Dict, json_path: str) -> None:
"""Export results to JSON."""
print(f"Writing to: {json_path}")
with open(json_path, "w") as file:
json.dump(results, file, indent=2)
print(f"Exported results to {json_path}")
def main() -> None:
"""Main function to process sales data."""
csv_path = "data/sales.csv"
config_path = "data/config.yaml"
db_path = "data/sales.db"
json_path = "data/sales_report.json"
pg_conn_params = {
"dbname": "sales_db",
"user": "postgres",
"password": "password",
"host": "localhost",
"port": "5432"
}
config = read_config(config_path)
df, valid_sales, total_records = load_and_validate_sales(csv_path, config)
sqlite_conn = create_sqlite_schema(db_path)
load_to_sqlite(df, sqlite_conn)
sqlite_results = query_sales_metrics(sqlite_conn, is_sqlite=True)
sqlite_conn.close()
pg_conn = create_postgresql_schema(pg_conn_params)
load_to_postgresql(df, pg_conn)
pg_results = query_sales_metrics(pg_conn, is_sqlite=False)
pg_conn.close()
combined_results = {
"sqlite_results": sqlite_results,
"postgresql_results": pg_results,
"valid_sales": valid_sales,
"total_records": total_records,
"invalid_sales": total_records - valid_sales
}
export_results(combined_results, json_path)
print("\nSales Report:")
print(f"Total Records Processed: {total_records}")
print(f"Valid Sales: {valid_sales}")
print(f"Invalid Sales: {total_records - valid_sales}")
print(f"SQLite Total Sales: ${round(sqlite_results['total_sales'], 2)}")
print(f"PostgreSQL Total Sales: ${round(pg_results['total_sales'], 2)}")
print(f"Unique Products: {sqlite_results['unique_products']}")
print(f"Top Products: {sqlite_results['top_products']}")
print("Processing completed")
if __name__ == "__main__":
main()

# File: de-onboarding/test_database_tool.py
import pytest
import sqlite3
import psycopg2
import pandas as pd
from database_tool import (
read_config, create_sqlite_schema, create_postgresql_schema,
load_and_validate_sales, load_to_sqlite, load_to_postgresql,
query_sales_metrics, export_results
)
@pytest.fixture
def config():
return read_config("data/config.yaml")
@pytest.fixture
def sqlite_conn():
conn = sqlite3.connect(":memory:")
yield conn
conn.close()
@pytest.fixture
def pg_conn():
conn = psycopg2.connect(
dbname="sales_db", user="postgres", password="password",
host="localhost", port="5432"
)
yield conn
conn.close()
def test_create_sqlite_schema(sqlite_conn):
conn = create_sqlite_schema(":memory:")
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [row[0] for row in cursor.fetchall()]
assert "products" in tables
assert "sales" in tables
conn.close()
def test_load_and_validate_sales(config):
df, valid_sales, total_records = load_and_validate_sales("data/sales.csv", config)
assert valid_sales == 3
assert total_records == 6
assert len(df) == 3
assert all(df["product"].str.startswith("Halal"))
def test_load_to_sqlite(sqlite_conn, config):
df, _, _ = load_and_validate_sales("data/sales.csv", config)
load_to_sqlite(df, sqlite_conn)
cursor = sqlite_conn.cursor()
cursor.execute("SELECT COUNT(*) FROM products")
assert cursor.fetchone()[0] == 3
cursor.execute("SELECT COUNT(*) FROM sales")
assert cursor.fetchone()[0] == 3
def test_load_to_postgresql(pg_conn, config):
df, _, _ = load_and_validate_sales("data/sales.csv", config)
load_to_postgresql(df, pg_conn)
cursor = pg_conn.cursor()
cursor.execute("SELECT COUNT(*) FROM products")
assert cursor.fetchone()[0] == 3
cursor.execute("SELECT COUNT(*) FROM sales")
assert cursor.fetchone()[0] == 3
results = query_sales_metrics(pg_conn, is_sqlite=False)
assert results["total_sales"] == 2499.83
assert len(results["unique_products"]) == 3
assert results["top_products"]["Halal Laptop"] == 1999.98
def test_query_sales_metrics(sqlite_conn, config):
df, _, _ = load_and_validate_sales("data/sales.csv", config)
load_to_sqlite(df, sqlite_conn)
results = query_sales_metrics(sqlite_conn, is_sqlite=True)
assert results["total_sales"] == 2499.83
assert len(results["unique_products"]) == 3
assert results["top_products"]["Halal Laptop"] == 1999.98
def test_empty_csv(config):
df, valid_sales, total_records = load_and_validate_sales("data/empty.csv", config)
assert df.empty
assert valid_sales == 0
assert total_records == 0
Expected Outputs
data/sales_report.json:
{
"sqlite_results": {
"total_sales": 2499.83,
"unique_products": ["Halal Laptop", "Halal Mouse", "Halal Keyboard"],
"top_products": {
"Halal Laptop": 1999.98,
"Halal Mouse": 249.9,
"Halal Keyboard": 249.95
}
},
"postgresql_results": {
"total_sales": 2499.83,
"unique_products": ["Halal Laptop", "Halal Mouse", "Halal Keyboard"],
"top_products": {
"Halal Laptop": 1999.98,
"Halal Mouse": 249.9,
"Halal Keyboard": 249.95
}
},
"valid_sales": 3,
"total_records": 6,
"invalid_sales": 3
}
Console Output (abridged):
Opening config: data/config.yaml
Loaded config: {'min_price': 10.0, 'max_quantity': 100, ...}
Loading CSV: data/sales.csv
Initial DataFrame:
product price quantity
0 Halal Laptop 999.99 2
...
Validated DataFrame:
product price quantity
0 Halal Laptop 999.99 2
1 Halal Mouse 24.99 10
2 Halal Keyboard 49.99 5
Created SQLite schema
Loaded data to SQLite
Created PostgreSQL schema
Loaded data to PostgreSQL
Query Results: [('Halal Laptop', 1999.98), ('Halal Keyboard', 249.95), ('Halal Mouse', 249.9)]
Writing to: data/sales_report.json
Exported results to data/sales_report.json
Sales Report:
Total Records Processed: 6
Valid Sales: 3
Invalid Sales: 3
SQLite Total Sales: $2499.83
PostgreSQL Total Sales: $2499.83
Unique Products: ['Halal Laptop', 'Halal Keyboard', 'Halal Mouse']
Top Products: {'Halal Laptop': 1999.98, 'Halal Keyboard': 249.95, 'Halal Mouse': 249.9}
Processing completed
How to Run and Test
Setup:
- Setup Checklist:
  - Create the de-onboarding/data/ directory and populate with sales.csv, config.yaml, sales.db, empty.csv, invalid.csv, malformed.csv, and negative.csv per Appendix 1.
  - Install libraries: pip install pyyaml psycopg2-binary pydantic pytest (sqlite3 ships with Python and needs no install).
  - Set up PostgreSQL server:
    - Install PostgreSQL (e.g., brew install postgresql on macOS, sudo apt-get install postgresql on Ubuntu; see the PostgreSQL Installation Guide).
    - Start server: pg_ctl start or sudo service postgresql start.
    - Create database: createdb sales_db.
    - Set up user: psql -c "CREATE USER postgres WITH PASSWORD 'password';".
    - Verify: psql -d sales_db -U postgres -c "SELECT 1;".
  - Create virtual environment: python -m venv venv, then activate (Windows: venv\Scripts\activate, Unix: source venv/bin/activate).
  - Verify Python 3.10+: python --version.
  - Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
  - Save utils.py, database_tool.py, and test_database_tool.py in de-onboarding/.
- Troubleshooting:
  - If FileNotFoundError or PermissionError occurs, check paths with ls data/ (Unix/macOS) or dir data\ (Windows).
  - If psycopg2.OperationalError, print the connection parameters with print(pg_conn_params) and verify the PostgreSQL server with pg_ctl status.
  - If IndentationError, use 4 spaces (not tabs). Run python -tt database_tool.py.
  - If UnicodeDecodeError, ensure UTF-8 encoding for all files.
  - If yaml.YAMLError, check for missing colons (e.g., min_price 10.0 instead of min_price: 10.0) or incorrect indentation (e.g., min_price: 10.0 with extra leading spaces). Print open(config_path).read() to inspect, and use an online YAML linter (e.g., yamllint.com) to validate.
Run:
- Open terminal in de-onboarding/.
- Run: python database_tool.py.
- Outputs: data/sales_report.json, console logs.
Test Scenarios:
- Valid Data:
  config = read_config("data/config.yaml")
  df, valid_sales, total_records = load_and_validate_sales("data/sales.csv", config)
  sqlite_conn = create_sqlite_schema("data/sales.db")
  load_to_sqlite(df, sqlite_conn)
  results = query_sales_metrics(sqlite_conn, is_sqlite=True)
  print(results)  # Expected: {'total_sales': 2499.83, 'unique_products': ['Halal Laptop', 'Halal Keyboard', 'Halal Mouse'], 'top_products': {'Halal Laptop': 1999.98, 'Halal Keyboard': 249.95, 'Halal Mouse': 249.9}}
  sqlite_conn.close()
- Empty CSV:
  config = read_config("data/config.yaml")
  df, valid_sales, total_records = load_and_validate_sales("data/empty.csv", config)
  print(df, valid_sales, total_records)  # Expected: Empty DataFrame, 0, 0
- Invalid Headers:
  config = read_config("data/config.yaml")
  df, valid_sales, total_records = load_and_validate_sales("data/invalid.csv", config)
  print(df, valid_sales, total_records)  # Expected: Empty DataFrame, 0, 2
- Malformed Data:
  config = read_config("data/config.yaml")
  df, valid_sales, total_records = load_and_validate_sales("data/malformed.csv", config)
  print(df)  # Expected: DataFrame with only the Halal Mouse row (price: 24.99, quantity: 10)
- Negative Prices:
  config = read_config("data/config.yaml")
  df, valid_sales, total_records = load_and_validate_sales("data/negative.csv", config)
  print(df)  # Expected: DataFrame with only the Halal Mouse row (price: 24.99, quantity: 10)
- Run Tests:
  - Run: pytest test_database_tool.py -v.
  - Verify all tests pass.
  - If test_query_sales_metrics fails, check the error message (e.g., AssertionError: 2499.83 != 5000.0), add print(results) inside the test to inspect the query output, and compare with the expected values.
18.3 Practice Exercises
Exercise 1: SQLite Schema Creation
Write a function to create a normalized SQLite schema, with 4-space indentation per PEP 8.
Expected Output:
Created tables: ['products', 'sales']
Follow-Along Instructions:
- Save as de-onboarding/ex1_sqlite_schema.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex1_sqlite_schema.py.
- How to Test:
  - Add: create_schema(":memory:").
  - Verify output: Created tables: ['products', 'sales'].
  - To inspect tables with the sqlite3 CLI (.tables), pass a file path such as data/test.db instead of :memory:, since an in-memory database disappears when the connection closes.
  - Common Errors:
    - OperationalError: Print db_path to verify the database path.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex1_sqlite_schema.py.
Exercise 2: PostgreSQL Query
Write a function to query total sales in PostgreSQL, with 4-space indentation per PEP 8.
Sample Input:
- PostgreSQL database sales_db with populated products and sales tables.
Expected Output:
Total Sales: 2499.83
Follow-Along Instructions:
- Save as de-onboarding/ex2_pg_query.py.
- Ensure the PostgreSQL server is running with sales_db.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex2_pg_query.py.
- How to Test:
  - Add: print(query_total_sales(pg_conn_params)).
  - Verify output: Total Sales: 2499.83.
  - Test with empty tables: should return 0.0.
  - Common Errors:
    - OperationalError: Print conn_params to check credentials.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex2_pg_query.py.
Exercise 3: Type-Safe Validation
Write a function to validate sales data with Pydantic, ensuring products start with “Halal,” with 4-space indentation per PEP 8.
Sample Input:
sale = {"product": "Halal Laptop", "price": 999.99, "quantity": 2}
Expected Output:
Validated sale: {'product': 'Halal Laptop', 'price': 999.99, 'quantity': 2}
Follow-Along Instructions:
- Save as de-onboarding/ex3_validation.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex3_validation.py.
- How to Test:
  - Add: print(validate_sale({"product": "Halal Laptop", "price": 999.99, "quantity": 2})).
  - Verify output matches expected.
  - Test with a non-Halal product: should return False.
  - Common Errors:
    - ValidationError: Print sale to inspect input data.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex3_validation.py.
Exercise 4: Debug SQL Query Error
Fix this buggy SQL query that produces incorrect sales totals due to a missing JOIN condition, ensuring 4-space indentation per PEP 8. The buggy query builds a Cartesian product, pairing every product with every sale, so the sums are inflated (e.g., [('Halal Laptop', 16999.83), ...]). Run print(cursor.fetchall()) to see the incorrect results, then focus on fixing the JOIN condition. If schema errors occur, verify tables with sqlite3 data/sales.db '.schema'.
Buggy Code:
import sqlite3
def query_sales_metrics(conn):
cursor = conn.cursor()
cursor.execute("""
SELECT p.name, SUM(p.price * s.quantity) AS total_sales
FROM products p, sales s  -- Bug: Missing JOIN condition
GROUP BY p.name
ORDER BY total_sales DESC
""")
results = cursor.fetchall()
total_sales = sum(row[1] for row in results)
return {"total_sales": total_sales}
conn = sqlite3.connect("data/sales.db")
print(query_sales_metrics(conn))
conn.close()
Sample Input:
- SQLite database sales.db with populated products and sales tables.
Expected Output:
{'total_sales': 2499.83}
Follow-Along Instructions:
- Save as de-onboarding/ex4_debug_query.py.
- Ensure data/sales.db contains the normalized products and sales tables (e.g., by running database_tool.py from the micro-project first).
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex4_debug_query.py to see the incorrect output.
- Fix and re-run.
- How to Test:
  - Verify output matches expected.
  - Test with additional products to ensure correct totals.
  - Common Errors:
    - OperationalError: Print cursor.fetchall() to debug query results.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex4_debug_query.py.
Exercise 5: Normalization and Database Choice Trade-offs (Conceptual)
Compare normalized vs. denormalized schemas and SQLite vs. PostgreSQL for Hijra Group’s real-time sales analytics, considering transaction volume, reporting speed, and database scalability. Save your answer to de-onboarding/ex5_concepts.txt.
Expected Output (in ex5_concepts.txt):
Normalized Schema:
- Pros: Reduces redundancy, ensures data integrity (e.g., single price per product), supports efficient updates for high transaction volumes.
- Cons: Requires joins (O(n) complexity), slower for complex real-time queries.
Denormalized Schema:
- Pros: Faster queries (O(n) without joins), simpler for real-time analytics (e.g., total sales for dashboards).
- Cons: Increases redundancy, risks inconsistencies (e.g., mismatched prices), harder to update in high-volume systems.
SQLite:
- Pros: Lightweight, file-based, ideal for small-scale testing or prototyping (e.g., local sales analytics).
- Cons: Limited concurrency, not suited for high-volume production systems.
PostgreSQL:
- Pros: Enterprise-grade, supports high concurrency and complex queries, suitable for production analytics.
- Cons: Requires server setup, higher resource usage.
For Hijra Group’s real-time analytics, normalization and PostgreSQL are preferred for data integrity and scalability in high-volume transaction processing, but denormalization and SQLite may be used for faster reporting in data marts (Chapter 32) or testing.
Follow-Along Instructions:
- Create de-onboarding/ex5_concepts.txt.
- Write your comparison, addressing schemas and databases for Hijra Group’s context.
- How to Test:
  - Verify the file exists with cat de-onboarding/ex5_concepts.txt (Unix/macOS) or type de-onboarding\ex5_concepts.txt (Windows).
  - Check that the content covers normalization, denormalization, SQLite, PostgreSQL, and the fintech context.
  - Common Errors:
    - FileNotFoundError: Ensure the path is correct. Print os.path.exists("de-onboarding/ex5_concepts.txt").
18.4 Exercise Solutions
Solution to Exercise 1: SQLite Schema Creation
import sqlite3
def create_schema(db_path: str) -> None:
"""Create normalized SQLite schema."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.executescript("""
CREATE TABLE products (
product_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
price REAL NOT NULL
);
CREATE TABLE sales (
sale_id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
quantity INTEGER NOT NULL,
FOREIGN KEY (product_id) REFERENCES products (product_id)
);
""")
conn.commit()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [row[0] for row in cursor.fetchall()]
print(f"Created tables: {tables}")
conn.close()
create_schema(":memory:")
Solution to Exercise 2: PostgreSQL Query
import psycopg2
def query_total_sales(conn_params: dict) -> float:
"""Query total sales in PostgreSQL."""
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()
cursor.execute("""
SELECT SUM(p.price * s.quantity)
FROM products p
JOIN sales s ON p.product_id = s.product_id
""")
total = cursor.fetchone()[0] or 0.0
print(f"Total Sales: {total}")
conn.close()
return total
conn_params = {
"dbname": "sales_db",
"user": "postgres",
"password": "password",
"host": "localhost",
"port": "5432"
}
print(query_total_sales(conn_params))
Solution to Exercise 3: Type-Safe Validation
from pydantic import BaseModel, ValidationError, validator
from typing import Dict
class Sale(BaseModel):
product: str
price: float
quantity: int
@validator("product")
def check_prefix(cls, v):
if not v.startswith("Halal"):
raise ValueError("Product must start with 'Halal'")
return v
def validate_sale(sale: Dict) -> bool:
"""Validate sale with Pydantic."""
try:
validated_sale = Sale(**sale)
print(f"Validated sale: {validated_sale.dict()}")
return True
except ValidationError as e:
print(f"Validation error: {e}")
return False
sale = {"product": "Halal Laptop", "price": 999.99, "quantity": 2}
validate_sale(sale)
Solution to Exercise 4: Debug SQL Query Error
import sqlite3
def query_sales_metrics(conn):
"""Query total sales with corrected JOIN."""
cursor = conn.cursor()
cursor.execute("""
SELECT p.name, SUM(p.price * s.quantity) AS total_sales
FROM products p
JOIN sales s ON p.product_id = s.product_id  -- Fix: Added JOIN condition
GROUP BY p.name
ORDER BY total_sales DESC
""")
results = cursor.fetchall()
total_sales = sum(row[1] for row in results)
return {"total_sales": total_sales}
conn = sqlite3.connect("data/sales.db")
print(query_sales_metrics(conn))
conn.close()
Explanation:
- Bug: The original query used a Cartesian product (FROM products p, sales s) without a JOIN condition, cross-joining every product with every sale and inflating the totals.
- Fix: Added JOIN sales s ON p.product_id = s.product_id to match each sale with its product.
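To see the Cartesian product directly, compare row counts with and without the JOIN condition; a small diagnostic sketch, assuming sales.db already contains the normalized products and sales tables (3 rows each) from the micro-project:
import sqlite3

# Diagnostic sketch: 3 products x 3 sales cross-join to 9 rows; the JOIN keeps only matching pairs
conn = sqlite3.connect("data/sales.db")
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM products p, sales s")  # no JOIN condition: Cartesian product
print(cursor.fetchone()[0])  # expected: 9
cursor.execute("SELECT COUNT(*) FROM products p JOIN sales s ON p.product_id = s.product_id")
print(cursor.fetchone()[0])  # expected: 3
conn.close()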
Solution to Exercise 5: Normalization and Database Choice Trade-offs (Conceptual)
File: de-onboarding/ex5_concepts.txt
Normalized Schema:
- Pros: Reduces redundancy, ensures data integrity (e.g., single price per product), supports efficient updates for high transaction volumes.
- Cons: Requires joins (O(n) complexity), slower for complex real-time queries.
Denormalized Schema:
- Pros: Faster queries (O(n) without joins), simpler for real-time analytics (e.g., total sales for dashboards).
- Cons: Increases redundancy, risks inconsistencies (e.g., mismatched prices), harder to update in high-volume systems.
SQLite:
- Pros: Lightweight, file-based, ideal for small-scale testing or prototyping (e.g., local sales analytics).
- Cons: Limited concurrency, not suited for high-volume production systems.
PostgreSQL:
- Pros: Enterprise-grade, supports high concurrency and complex queries, suitable for production analytics.
- Cons: Requires server setup, higher resource usage.
For Hijra Group’s real-time analytics, normalization and PostgreSQL are preferred for data integrity and scalability in high-volume transaction processing, but denormalization and SQLite may be used for faster reporting in data marts (Chapter 32) or testing.
18.5 Chapter Summary and Connection to Chapter 19
In this chapter, you’ve mastered:
- SQL Querying: Efficient data retrieval with SQLite and PostgreSQL, including debugging query errors.
- Schema Design: Normalized schemas with ER diagrams, validated visually with tools like DBeaver, optimized for Hijra Group’s high-volume transactions.
- Python Integration: Database operations with sqlite3 and psycopg2.
- Type Safety: Pydantic for validated data loading, including Sharia-compliant Halal prefix checks.
- Testing: pytest for reliable database operations, covering both SQLite and PostgreSQL.
- Database Selection: Understanding trade-offs between normalized/denormalized schemas and SQLite/PostgreSQL for analytics.
The micro-project built a type-annotated database tool integrating SQLite and PostgreSQL, processing data/sales.csv, and querying data/sales.db, with pytest tests ensuring reliability. The normalized schema, type-safe validation, and database trade-offs support Hijra Group’s transaction analytics, preparing you for advanced SQL querying in Chapter 19, which introduces joins, subqueries, and aggregations for complex analytics, building on this chapter’s foundational skills.