22 - PostgreSQL Indexing and Optimization
Complexity: Moderate (M)
22.0 Introduction: Why This Matters for Data Engineering
In data engineering, optimizing database performance is critical for handling large-scale financial transaction data at Hijra Group, ensuring fast query execution for Sharia-compliant analytics. PostgreSQL, a robust open-source RDBMS, supports advanced indexing techniques like B-tree and GIN, reducing query times from O(n) to O(log n) for lookups on indexed columns. For a 1 million-row transaction table, a B-tree index can cut SELECT query times from seconds to milliseconds, while proper optimization minimizes storage overhead (e.g., ~10–20MB for a B-tree index on a numeric column). GIN indexes excel for JSONB or full-text search, critical for querying transaction metadata in fintech applications. This chapter builds on Chapters 16 (PostgreSQL Fundamentals), 17 (Python and PostgreSQL Integration), and 21 (Advanced PostgreSQL Querying), focusing on indexing and query optimization to enhance pipeline efficiency.
This chapter uses type-annotated Python with psycopg2 for database interactions, verified by Pyright (per Chapter 7), and includes pytest tests (per Chapter 9) to validate performance improvements. All code adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError, ensuring compatibility with Hijra Group’s pipeline scripts. The micro-project optimizes a transaction database using data/transactions.csv, preparing for type-safe integration (Chapter 23) and cloud analytics (Phase 4).
Data Engineering Workflow Context
This diagram illustrates how indexing and optimization fit into a data engineering pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["PostgreSQL Database"]
B --> C{"Indexing & Optimization"}
C -->|Create Indexes| D["B-tree/GIN Indexes"]
C -->|Optimize Queries| E["Query Plans (EXPLAIN)"]
D --> F["Fast Query Execution"]
E --> F
F --> G["Analytics/Reporting"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,B,D data
class C,E,F process
class G storage

Building On and Preparing For
- Building On:
  - Chapter 16: Uses psycopg2 for PostgreSQL connections and basic queries.
  - Chapter 17: Integrates Python with PostgreSQL using type-annotated code and YAML configs.
  - Chapter 21: Leverages advanced querying (CTEs, window functions) for complex analytics, now optimized.
  - Chapter 3: Uses Matplotlib for query plan visualization.
- Preparing For:
  - Chapter 23: Prepares for type-safe database integration by optimizing query performance.
  - Chapter 24: Supports Checkpoint 3B by ensuring efficient database operations.
  - Chapter 48: Introduces advanced PostgreSQL features like expression indexes for JSONB.
  - Chapter 63: Enables PostgreSQL deployment in Kubernetes with optimized schemas.
  - Chapters 68–71: Supports capstone projects with optimized pipeline performance.
What You’ll Learn
This chapter covers:
- Indexing Basics: B-tree and GIN indexes for fast lookups and JSONB searches.
- Query Optimization: Using EXPLAIN to analyze and improve query plans.
- Performance Tuning: Vacuuming and analyzing tables for maintenance.
- Type-Safe Integration: Type-annotated psycopg2 queries with Pydantic validation.
- Testing: pytest tests to verify performance gains.
- Visualization: Matplotlib plots for query plan costs.
By the end, you’ll optimize a PostgreSQL database for transaction data, reducing query times, reporting index sizes, and validating performance with tests, using data/transactions.csv and config.yaml per Appendix 1. All code uses 4-space indentation per PEP 8.
Follow-Along Tips:
- Install PostgreSQL (psql --version) and ensure a local server is running (pg_ctl status).
- Create de-onboarding/data/ and populate with config.yaml per Appendix 1; generate transactions.csv with the provided script.
- Install libraries: pip install psycopg2-binary pyyaml pydantic pytest matplotlib.
- Use print statements (e.g., print(cursor.fetchall())) to debug queries.
- Verify database connections with psql -U postgres -d sales_db.
- Configure editor for 4-space indentation (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- If IndentationError occurs, run python -tt script.py to detect tab/space mixing.
22.1 Indexing Basics
Indexes in PostgreSQL improve query performance by creating data structures for fast lookups. B-tree indexes are ideal for equality and range queries (e.g., WHERE transaction_id = 'T001'), with O(log n) time complexity. GIN indexes optimize JSONB fields or full-text search, useful for transaction metadata (e.g., searching {"details": {"subcategory": "laptop"}}). Indexes increase storage (e.g., ~10MB for a B-tree on a 1M-row numeric column, ~15–20MB for GIN on JSONB) and slow INSERT/UPDATE operations (O(log n) per operation).
22.1.1 Creating B-tree Indexes
Create a B-tree index on transaction_id for fast lookups.
# File: de-onboarding/index_basics.py
from typing import Any # For type annotations
import psycopg2 # For PostgreSQL connection
# Connect to database
# psycopg2 exposes its connection/cursor classes under psycopg2.extensions
conn: psycopg2.extensions.connection = psycopg2.connect(
    dbname="sales_db",
    user="postgres",
    password="password",
    host="localhost",
    port="5432"
)
cursor: psycopg2.extensions.cursor = conn.cursor()
# Create transactions table with JSONB column
cursor.execute("""
CREATE TABLE IF NOT EXISTS transactions (
transaction_id TEXT PRIMARY KEY,
product TEXT,
price REAL,
quantity INTEGER,
date DATE,
metadata JSONB
)
""")
# Insert sample data (ON CONFLICT keeps re-runs idempotent)
cursor.execute("""
    INSERT INTO transactions (transaction_id, product, price, quantity, date, metadata)
    VALUES
        ('T001', 'Halal Laptop', 999.99, 2, '2023-10-01', '{"category": "electronics", "details": {"subcategory": "laptop"}}'),
        ('T002', 'Halal Mouse', 24.99, 10, '2023-10-02', '{"category": "accessories", "details": {"subcategory": "mouse"}}'),
        ('T003', 'Halal Keyboard', 49.99, 5, '2023-10-03', '{"category": "accessories", "details": {"subcategory": "keyboard"}}')
    ON CONFLICT (transaction_id) DO NOTHING
""")
# Create B-tree index
cursor.execute("CREATE INDEX IF NOT EXISTS idx_transaction_id ON transactions (transaction_id)")
conn.commit()
# Query with index
cursor.execute("SELECT * FROM transactions WHERE transaction_id = 'T001'")
result: list[Any] = cursor.fetchall()
print("Query Result:", result) # Debug: print result
# Close connection
cursor.close()
conn.close()
# Expected Output:
# Query Result: [('T001', 'Halal Laptop', 999.99, 2, datetime.date(2023, 10, 1), {'category': 'electronics', 'details': {'subcategory': 'laptop'}})]

Follow-Along Instructions:
- Ensure PostgreSQL is running (pg_ctl start).
- Create database: createdb -U postgres sales_db.
- Save as de-onboarding/index_basics.py.
- Configure editor for 4-space indentation per PEP 8.
- Update password in psycopg2.connect to match your setup.
- Run: python index_basics.py.
- Verify output shows transaction T001.
- Common Errors:
  - OperationalError: Check PostgreSQL is running and credentials are correct. Print psycopg2.connect parameters.
  - IndentationError: Use 4 spaces. Run python -tt index_basics.py.
  - ProgrammingError: Print cursor.description to debug query schema.
Key Points:
- B-tree Index: O(log n) for lookups, O(log n) for inserts/updates.
- Storage: ~10MB for 1M rows (numeric column).
- Time Complexity: O(log n) for indexed queries vs. O(n) for table scans.
- Space Complexity: O(n) for index storage.
- Implication: Use B-tree for frequent lookups, e.g., transaction IDs in Hijra Group’s analytics. The sketch below shows how to check an index’s actual size.
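The ~10MB figure above is a heuristic. You can measure the real storage cost with PostgreSQL’s built-in size functions; the following is a minimal sketch, assuming the sales_db connection used throughout this chapter and a hypothetical filename.
# File: de-onboarding/index_size_check.py (hypothetical helper, not part of the micro-project)
import psycopg2

conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
# pg_relation_size returns bytes; pg_size_pretty formats them for humans
cursor.execute("SELECT pg_size_pretty(pg_relation_size('idx_transaction_id'))")
print("Index size:", cursor.fetchone()[0])  # e.g., "16 kB" on the 3-row sample table
cursor.close()
conn.close()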
22.1.2 Analyzing Query Plans with EXPLAIN
Use EXPLAIN to verify index usage.
# File: de-onboarding/explain_query.py
from typing import Any
import psycopg2
conn: psycopg2.extensions.connection = psycopg2.connect(
    dbname="sales_db",
    user="postgres",
    password="password",
    host="localhost",
    port="5432"
)
cursor: psycopg2.extensions.cursor = conn.cursor()
# Analyze query plan
cursor.execute("EXPLAIN SELECT * FROM transactions WHERE transaction_id = 'T001'")
plan: list[Any] = cursor.fetchall()
print("Query Plan:") # Debug
for row in plan:
print(row[0]) # Print plan steps
# Close connection
cursor.close()
conn.close()
# Expected Output (abridged):
# Query Plan:
# Index Scan using idx_transaction_id on transactions (cost=0.15..8.17 rows=1 width=64)

Follow-Along Instructions:
- Run after index_basics.py.
- Save as de-onboarding/explain_query.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python explain_query.py.
- Verify output shows “Index Scan” on idx_transaction_id.
- Common Errors:
  - Seq Scan: If plan shows sequential scan, ensure index exists (\d transactions in psql). Drop and recreate the index.
Key Points:
- EXPLAIN: Shows the query plan, e.g., “Index Scan” vs. “Seq Scan”.
- Implication: Use EXPLAIN to confirm index usage in pipelines; the sketch below shows how EXPLAIN ANALYZE compares estimates with actual execution times.
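EXPLAIN reports only the planner’s estimates; EXPLAIN ANALYZE executes the statement and adds actual times and row counts, which helps when estimates and reality diverge. Below is a minimal sketch, assuming the same table and credentials as above; because EXPLAIN ANALYZE runs the query, avoid it on destructive statements.
# File: de-onboarding/explain_analyze.py (hypothetical companion to explain_query.py)
import psycopg2

conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
# EXPLAIN ANALYZE executes the query and reports actual time/rows per plan node
cursor.execute("EXPLAIN ANALYZE SELECT * FROM transactions WHERE transaction_id = 'T001'")
for row in cursor.fetchall():
    print(row[0])  # Each line pairs "cost=..." estimates with "actual time=..."
cursor.close()
conn.close()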
22.1.3 Creating GIN Indexes for JSONB
Create a GIN index on the metadata JSONB column for fast searches, including nested keys.
# File: de-onboarding/gin_index.py
from typing import Any
import psycopg2
conn: psycopg2.extensions.connection = psycopg2.connect(
    dbname="sales_db",
    user="postgres",
    password="password",
    host="localhost",
    port="5432"
)
cursor: psycopg2.extensions.cursor = conn.cursor()
# Create GIN index
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata ON transactions USING GIN (metadata)")
conn.commit()
# Simple JSONB query
cursor.execute("SELECT * FROM transactions WHERE metadata @> '{\"category\": \"electronics\"}'")
simple_result: list[Any] = cursor.fetchall()
print("Simple JSONB Query Result:", simple_result) # Debug: print result
# Nested JSONB query
cursor.execute("SELECT * FROM transactions WHERE metadata->'details'->>'subcategory' = 'laptop'")
nested_result: list[Any] = cursor.fetchall()
print("Nested JSONB Query Result:", nested_result) # Debug: print result
# Analyze plan for nested query
cursor.execute("EXPLAIN SELECT * FROM transactions WHERE metadata->'details'->>'subcategory' = 'laptop'")
plan: list[Any] = cursor.fetchall()
print("Nested Query Plan:") # Debug
for row in plan:
print(row[0])
print("Note: Nested JSONB queries with ->> may use sequential scans, as GIN indexes optimize @> queries. Expression indexes (Chapter 48) can optimize such queries.")
# Close connection
cursor.close()
conn.close()
# Expected Output (abridged):
# Simple JSONB Query Result: [('T001', 'Halal Laptop', 999.99, 2, datetime.date(2023, 10, 1), {'category': 'electronics', 'details': {'subcategory': 'laptop'}})]
# Nested JSONB Query Result: [('T001', 'Halal Laptop', 999.99, 2, datetime.date(2023, 10, 1), {'category': 'electronics', 'details': {'subcategory': 'laptop'}})]
# Nested Query Plan:
# Seq Scan on transactions (cost=0.00..1.03 rows=1 width=64)
# Filter: ((metadata -> 'details' ->> 'subcategory'::text) = 'laptop'::text)
# Note: Nested JSONB queries with ->> may use sequential scans, as GIN indexes optimize @> queries. Expression indexes (Chapter 48) can optimize such queries.

Follow-Along Instructions:
- Run after index_basics.py.
- Save as de-onboarding/gin_index.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python gin_index.py.
- Verify output shows electronics/laptop transactions and “Bitmap Index Scan” for the simple query, “Seq Scan” for the nested query.
- Common Errors:
  - ProgrammingError: Ensure metadata is JSONB. Check \d transactions in psql.
  - Seq Scan: GIN indexes don’t optimize ->> queries; expression indexes are covered in Chapter 48.
Key Points:
- GIN Index: Optimizes JSONB searches with O(log n) complexity for @> queries; ->> queries may require expression indexes (Chapter 48).
- Storage: ~15–20MB for 1M rows (JSONB column).
- Time Complexity: O(log n) for indexed @> queries, O(n) for unindexed nested searches.
- Space Complexity: O(n) for index storage.
- Implication: Use GIN for metadata searches in Hijra Group’s analytics; rewriting a nested filter as a containment query, shown below, keeps the index in play.
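If you control the query shape, you can often keep the GIN index usable for nested lookups by expressing the filter as containment (@>) with a nested document instead of ->> extraction, since jsonb GIN indexes serve @> natively. A hedged sketch, assuming the table above; on a 3-row table the planner may still choose a sequential scan.
# File: de-onboarding/gin_containment.py (hypothetical companion to gin_index.py)
import psycopg2

conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
# Containment with a nested document is index-assisted by GIN, unlike ->> extraction
cursor.execute(
    "SELECT transaction_id, product FROM transactions "
    "WHERE metadata @> '{\"details\": {\"subcategory\": \"laptop\"}}'"
)
print("Containment Query Result:", cursor.fetchall())
cursor.close()
conn.close()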
22.2 Query Optimization
Optimize queries by rewriting them to leverage indexes and reduce I/O. For example, filtering before joining reduces the rows the join must process, and keeping predicates sargable (plain comparisons on indexed columns rather than function calls over them) lets the planner push conditions down to an index scan; the sketch below contrasts the two forms.
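To make the sargability point concrete, this sketch EXPLAINs two logically equivalent joins. It assumes the inventory table created in Exercise 5 exists, so treat it as illustrative rather than part of the micro-project.
# File: de-onboarding/sargable_join.py (hypothetical; requires the Exercise 5 inventory table)
import psycopg2

conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
queries = {
    # EXTRACT(...) hides the indexed column behind a function, so idx_date cannot be used
    "Non-sargable": """
        EXPLAIN SELECT t.product, t.quantity
        FROM transactions t JOIN inventory i ON t.product = i.product
        WHERE EXTRACT(YEAR FROM t.date) = 2023
    """,
    # A plain range predicate keeps idx_date usable, filtering rows before the join
    "Sargable": """
        EXPLAIN SELECT t.product, t.quantity
        FROM transactions t JOIN inventory i ON t.product = i.product
        WHERE t.date >= '2023-01-01' AND t.date < '2024-01-01'
    """,
}
for label, query in queries.items():
    cursor.execute(query)
    print(f"{label} plan:")
    for row in cursor.fetchall():
        print(" ", row[0])
cursor.close()
conn.close()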
22.2.1 Optimizing a Range Query
Optimize a date range query with an index.
# File: de-onboarding/range_query.py
from typing import Any
import psycopg2
conn: psycopg2.extensions.connection = psycopg2.connect(
    dbname="sales_db",
    user="postgres",
    password="password",
    host="localhost",
    port="5432"
)
cursor: psycopg2.extensions.cursor = conn.cursor()
# Create index on date
cursor.execute("CREATE INDEX IF NOT EXISTS idx_date ON transactions (date)")
# Optimized range query
cursor.execute("""
SELECT product, SUM(price * quantity) as total_sales
FROM transactions
WHERE date BETWEEN '2023-10-01' AND '2023-10-03'
GROUP BY product
""")
results: list[Any] = cursor.fetchall()
print("Range Query Results:", results) # Debug
# Analyze plan
cursor.execute("""
EXPLAIN SELECT product, SUM(price * quantity) as total_sales
FROM transactions
WHERE date BETWEEN '2023-10-01' AND '2023-10-03'
GROUP BY product
""")
plan: list[Any] = cursor.fetchall()
print("Query Plan:") # Debug
for row in plan:
print(row[0])
conn.commit()
cursor.close()
conn.close()
# Expected Output (abridged):
# Range Query Results: [('Halal Laptop', 1999.98), ('Halal Mouse', 249.9), ('Halal Keyboard', 249.95)]
# Query Plan:
# GroupAggregate (cost=0.15..8.45 rows=3 width=36)
# -> Index Scan using idx_date on transactions (cost=0.15..8.27 rows=3 width=28)

Follow-Along Instructions:
- Run after index_basics.py.
- Save as de-onboarding/range_query.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python range_query.py.
- Verify output shows results and “Index Scan” on idx_date.
- Common Errors:
  - Seq Scan: Ensure idx_date exists. Check with \d transactions in psql.
Key Points:
- Range Query: Benefits from B-tree index on date.
- Time Complexity: O(log n + k) for k matching rows.
- Implication: Optimize date-based analytics for Hijra Group’s reporting; a composite index, sketched below, can sometimes serve such aggregations entirely from the index.
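For a hot date-range aggregation like this one, a composite index can sometimes let PostgreSQL answer from the index alone (an index-only scan). This is a sketch under stated assumptions: INCLUDE requires PostgreSQL 11+, idx_date_product is a hypothetical name, and whether the planner uses it depends on statistics and the visibility map. Drop it afterward (DROP INDEX idx_date_product) if you want later EXPLAIN output to match the chapter.
# File: de-onboarding/composite_index.py (hypothetical companion to range_query.py)
import psycopg2

conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
# Key columns cover the WHERE (date) and GROUP BY (product); INCLUDE stores price
# and quantity in the leaf pages so the aggregate may need no heap access at all
cursor.execute("""
    CREATE INDEX IF NOT EXISTS idx_date_product
    ON transactions (date, product) INCLUDE (price, quantity)
""")
conn.commit()
print("Composite index idx_date_product created")
cursor.close()
conn.close()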
22.3 Performance Tuning
Maintain database performance with VACUUM and ANALYZE to update statistics and reclaim space.
22.3.1 Vacuuming and Analyzing
Run maintenance tasks to optimize performance.
# File: de-onboarding/maintenance.py
from typing import Any
import psycopg2
conn: psycopg2.extensions.connection = psycopg2.connect(
    dbname="sales_db",
    user="postgres",
    password="password",
    host="localhost",
    port="5432"
)
conn.set_session(autocommit=True)  # Enable autocommit for VACUUM
cursor: psycopg2.extensions.cursor = conn.cursor()
# Vacuum and analyze
cursor.execute("VACUUM ANALYZE transactions")
print("Maintenance completed") # Confirm
cursor.close()
conn.close()
# Expected Output:
# Maintenance completed

Follow-Along Instructions:
- Run after range_query.py.
- Save as de-onboarding/maintenance.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python maintenance.py.
- Verify output confirms completion.
- Common Errors:
  - OperationalError: Ensure autocommit is enabled for VACUUM. Print conn.autocommit.
Key Points:
- VACUUM: Reclaims space from deleted rows.
- ANALYZE: Updates query planner statistics.
- Implication: Schedule maintenance in pipelines to ensure performance. The sketch below shows how to check when a table was last vacuumed or analyzed.
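Before scheduling extra maintenance, check whether autovacuum is already keeping up. The built-in pg_stat_user_tables statistics view records the last (auto)vacuum and (auto)analyze times per table; a minimal sketch:
# File: de-onboarding/maintenance_stats.py (hypothetical companion to maintenance.py)
import psycopg2

conn = psycopg2.connect(
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
cursor = conn.cursor()
cursor.execute("""
    SELECT relname, last_vacuum, last_autovacuum, last_analyze, last_autoanalyze
    FROM pg_stat_user_tables
    WHERE relname = 'transactions'
""")
print("Maintenance stats:", cursor.fetchone())  # None values mean that task has never run
cursor.close()
conn.close()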
22.4 Micro-Project: Optimized Transaction Database
Project Requirements
Build a type-safe PostgreSQL pipeline to load, index, and optimize a transaction database using data/transactions.csv, supporting Hijra Group’s analytics. The pipeline creates B-tree and GIN indexes, optimizes queries, tests performance improvements, handles a larger dataset (1000 rows), reports actual and estimated index sizes in a formatted table, and reduces Airflow DAG execution times, ensuring compliance with Islamic Financial Services Board (IFSB) standards.
- Generate a larger data/transactions.csv (1000 rows) with nested JSONB metadata.
- Load data/transactions.csv with pandas.read_csv and insert into PostgreSQL.
- Read data/config.yaml with PyYAML for validation rules.
- Create B-tree indexes on transaction_id and date, and a GIN index on metadata.
- Optimize a query for total sales by product in a date range and a nested JSONB query.
- Use EXPLAIN to verify index usage.
- Run VACUUM ANALYZE for maintenance.
- Export results to data/optimized_results.json.
- Report actual and estimated index sizes in a formatted table.
- Write pytest tests to validate performance (indexed queries 2x faster than non-indexed), malformed JSONB, invalid dates, and duplicates.
- Use type annotations verified by Pyright and 4-space indentation per PEP 8.
Sample Input Files
data/transactions.csv (generated, first 5 rows shown):
transaction_id,product,price,quantity,date,metadata
T001,Halal Laptop,999.99,2,2023-10-01,{"category": "electronics", "details": {"subcategory": "laptop"}}
T002,Halal Mouse,24.99,10,2023-10-02,{"category": "accessories", "details": {"subcategory": "mouse"}}
T003,Halal Keyboard,49.99,5,2023-10-03,{"category": "accessories", "details": {"subcategory": "keyboard"}}
T004,,29.99,3,2023-10-04,{"category": "misc", "details": {"subcategory": "other"}}
T005,Monitor,199.99,2,2023-10-05,{"category": "electronics", "details": {"subcategory": "monitor"}}

data/config.yaml (from Appendix 1, updated):
min_price: 10.0
max_quantity: 100
required_fields:
- transaction_id
- product
- price
- quantity
- date
- metadata
product_prefix: 'Halal'
max_decimals: 2

Dataset Seeding Script
Generate a larger transactions.csv with 1000 rows and nested JSONB metadata.
# File: de-onboarding/seed_transactions.py
import pandas as pd
import random
from datetime import datetime, timedelta
import json
def generate_transactions(n: int = 1000) -> None:
"""Generate transactions.csv with 1000 rows."""
products = ["Halal Laptop", "Halal Mouse", "Halal Keyboard", "Monitor", ""]
categories = ["electronics", "accessories", "misc"]
subcategories = ["laptop", "mouse", "keyboard", "monitor", "other"]
start_date = datetime(2023, 10, 1)
data = {
"transaction_id": [f"T{i:03d}" for i in range(1, n + 1)],
"product": [random.choice(products) for _ in range(n)],
"price": [round(random.uniform(5.0, 1000.0), 2) for _ in range(n)],
"quantity": [random.randint(1, 150) for _ in range(n)],
"date": [(start_date + timedelta(days=random.randint(0, 30))).strftime("%Y-%m-%d") for _ in range(n)],
"metadata": [json.dumps({
"category": random.choice(categories),
"details": {"subcategory": random.choice(subcategories)}
}) for _ in range(n)]
}
df = pd.DataFrame(data)
df.to_csv("data/transactions.csv", index=False)
print(f"Generated data/transactions.csv with {n} rows")
if __name__ == "__main__":
    generate_transactions()

Follow-Along Instructions:
- Save as de-onboarding/seed_transactions.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python seed_transactions.py.
- Verify data/transactions.csv exists with 1000 rows.
Data Processing Flow
flowchart TD
A["Input CSV
transactions.csv"] --> B["Load CSV
pandas.read_csv"]
B --> C["Pandas DataFrame"]
C --> D["Read YAML
config.yaml"]
D --> E["Validate & Insert
PostgreSQL"]
E -->|Invalid| F["Log Warning"]
E -->|Valid| G["Create Indexes
B-tree/GIN"]
G --> H["Optimize Queries
EXPLAIN"]
H --> I["Run Maintenance
VACUUM ANALYZE"]
I --> J["Export JSON
optimized_results.json"]
F --> K["End Processing"]
J --> K
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C data
class B,D,E,G,H,I,J process
class F error
class K endpoint

Acceptance Criteria
- Go Criteria:
  - Generates and loads transactions.csv (1000 rows) and config.yaml.
  - Inserts valid records into PostgreSQL, handling malformed JSONB and invalid dates.
  - Creates B-tree indexes on transaction_id and date, and a GIN index on metadata.
  - Optimizes sales and nested JSONB queries with EXPLAIN showing index usage where applicable.
  - Runs VACUUM ANALYZE.
  - Exports results to data/optimized_results.json.
  - Reports actual and estimated index sizes in a formatted table.
  - Passes pytest tests for correctness, performance (indexed queries 2x faster), malformed data, and duplicates.
  - Uses type annotations and 4-space indentation per PEP 8.
- No-Go Criteria:
  - Fails to load or insert data.
  - Missing indexes or non-optimized queries.
  - No pytest tests or failing tests.
  - Inconsistent indentation or missing type annotations.
Common Pitfalls to Avoid
- Database Connection Issues:
  - Problem: OperationalError due to wrong credentials.
  - Solution: Verify psycopg2.connect parameters. Print connection details.
- Index Not Used:
  - Problem: Query uses sequential scan.
  - Solution: Check EXPLAIN output. Ensure indexes exist (\d transactions in psql).
- Data Validation Errors:
  - Problem: Malformed JSONB or duplicate transaction_id causes insert failures.
  - Solution: Print df.head() and validate with Pydantic. Check duplicates with df["transaction_id"].duplicated().sum().
- Type Annotation Errors:
  - Problem: Pyright flags missing types.
  - Solution: Use Any for dynamic query results. Run pyright script.py.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces. Run python -tt script.py.
- Test Failures:
  - Problem: pytest tests fail due to schema mismatches or performance variability.
  - Solution: Print cursor.description and query execution times; use relative thresholds.
How This Differs from Production
In production, this solution would include:
- Connection Pooling: Use psycopg2.pool for scalability (Chapter 63); see the sketch after this list.
- Automated Maintenance: Schedule VACUUM ANALYZE with Airflow (Chapter 56) to maintain index efficiency, reducing DAG execution times for data processing tasks.
- Monitoring: Track query performance with Grafana (Chapter 66).
- Security: Encrypt connections with SSL (Chapter 65).
- Scalability: Use partial indexes (e.g., WHERE product LIKE 'Halal%') to reduce index size and partitioned tables for large datasets (Chapter 70).
- Index Maintenance: Regularly rebuild indexes with REINDEX to handle fragmentation in high-update scenarios.
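As a taste of the pooling point above (covered properly in Chapter 63), here is a minimal sketch using psycopg2.pool. The pool sizes are illustrative, and production code would add retries and error handling.
# File: de-onboarding/pool_sketch.py (hypothetical preview of Chapter 63)
from psycopg2 import pool

db_pool = pool.SimpleConnectionPool(
    minconn=1, maxconn=5,
    dbname="sales_db", user="postgres", password="password",
    host="localhost", port="5432"
)
conn = db_pool.getconn()  # Borrow an open connection instead of creating a new one
try:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM transactions")
    print("Row count:", cursor.fetchone()[0])
    cursor.close()
finally:
    db_pool.putconn(conn)  # Return the connection to the pool rather than closing it
db_pool.closeall()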
Implementation
# File: de-onboarding/utils.py (updated from Chapter 17)
from typing import Any, Dict
from pydantic import BaseModel, ValidationError
import json
import re
def clean_string(s: str) -> str:
"""Strip whitespace from string."""
return s.strip()
def is_numeric(s: str, max_decimals: int = 2) -> bool:
"""Check if string is a decimal number with up to max_decimals."""
parts = s.split(".")
if len(parts) != 2 or not parts[0].isdigit() or not parts[1].isdigit():
return False
return len(parts[1]) <= max_decimals
def is_numeric_value(x: Any) -> bool:
"""Check if value is numeric."""
return isinstance(x, (int, float))
def is_integer(x: Any) -> bool:
"""Check if value is an integer."""
return str(x).isdigit()
def apply_valid_decimals(x: Any, max_decimals: int) -> bool:
"""Apply decimal validation."""
return is_numeric(str(x), max_decimals)
def is_valid_date(s: str) -> bool:
"""Check if string is a valid date in YYYY-MM-DD format."""
try:
from datetime import datetime
datetime.strptime(s, "%Y-%m-%d")
return True
except ValueError:
return False
def parse_explain_cost(plan: list[Any]) -> float:
"""Parse total cost from EXPLAIN output."""
for line in plan:
        match = re.search(r"cost=[\d.]+\.\.([\d.]+)", line[0])  # Escape the ".." separating startup and total cost
if match:
return float(match.group(1))
return 0.0
class Transaction(BaseModel):
"""Pydantic model for transaction validation."""
transaction_id: str
product: str
price: float
quantity: int
date: str
metadata: dict
def validate_transaction(row: Dict[str, Any], config: Dict[str, Any]) -> bool:
"""Validate transaction using Pydantic and config rules."""
try:
# Clean data
metadata = row["metadata"]
if isinstance(metadata, str):
metadata = json.loads(metadata)
cleaned = {
"transaction_id": clean_string(str(row["transaction_id"])),
"product": clean_string(str(row["product"])),
"price": float(row["price"]),
"quantity": int(row["quantity"]),
"date": str(row["date"]),
"metadata": metadata
}
# Validate with Pydantic
Transaction(**cleaned)
# Config-based validation
if not cleaned["product"].startswith(config["product_prefix"]):
print(f"Invalid transaction: product lacks '{config['product_prefix']}' prefix: {row}")
return False
if cleaned["price"] < config["min_price"] or cleaned["price"] <= 0:
print(f"Invalid transaction: invalid price: {row}")
return False
if cleaned["quantity"] > config["max_quantity"]:
print(f"Invalid transaction: invalid quantity: {row}")
return False
if not apply_valid_decimals(cleaned["price"], config["max_decimals"]):
print(f"Invalid transaction: too many decimals: {row}")
return False
if not is_valid_date(cleaned["date"]):
print(f"Invalid transaction: invalid date: {row}")
return False
print(f"Valid transaction: {row}")
return True
except (ValidationError, ValueError, json.JSONDecodeError) as e:
print(f"Invalid transaction: {e}: {row}")
        return False

# File: de-onboarding/optimized_processor.py
from typing import Any, Dict, List, Tuple
import pandas as pd
import psycopg2
import yaml
import json
import os
import time
from pydantic import BaseModel
import utils
def read_config(config_path: str) -> Dict[str, Any]:
"""Read YAML configuration."""
print(f"Opening config: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
print(f"Loaded config: {config}")
return config
def setup_database(conn: psycopg2.extensions.connection) -> None:
"""Create transactions table."""
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS transactions (
transaction_id TEXT PRIMARY KEY,
product TEXT,
price REAL,
quantity INTEGER,
date DATE,
metadata JSONB
)
""")
cursor.execute("TRUNCATE TABLE transactions") # Clear table for testing
conn.commit()
cursor.close()
def load_and_insert_transactions(csv_path: str, config: Dict[str, Any], conn: psycopg2.extensions.connection) -> Tuple[pd.DataFrame, int, int]:
"""Load and insert valid transactions."""
print(f"Loading CSV: {csv_path}")
    df = pd.read_csv(csv_path)
    total_count = len(df)  # Record the original row count before any filtering
print("Initial DataFrame:")
print(df.head())
# Validate required fields
required_fields = config["required_fields"]
missing_fields = [f for f in required_fields if f not in df.columns]
if missing_fields:
print(f"Missing columns: {missing_fields}")
return pd.DataFrame(), 0, len(df)
# Check for duplicates
if df["transaction_id"].duplicated().any():
print("Duplicate transaction IDs found")
df = df.drop_duplicates(subset="transaction_id", keep="first")
# Filter valid rows
df = df.dropna(subset=["product", "price", "quantity", "date", "metadata"])
df = df[df.apply(lambda row: utils.validate_transaction(row, config), axis=1)]
print("Validated DataFrame:")
print(df.head())
    # Insert into PostgreSQL, counting rows that actually land in the table
    cursor = conn.cursor()
    inserted_count = 0
    for _, row in df.iterrows():
        try:
            metadata = row["metadata"]
            if isinstance(metadata, str):
                metadata = json.loads(metadata)
            cursor.execute("""
                INSERT INTO transactions (transaction_id, product, price, quantity, date, metadata)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (row["transaction_id"], row["product"], row["price"], row["quantity"], row["date"], json.dumps(metadata)))
            conn.commit()  # Commit per row so a rollback discards only the failed insert
            inserted_count += 1
        except psycopg2.IntegrityError:
            print(f"Duplicate transaction_id: {row['transaction_id']}")
            conn.rollback()
            continue
    cursor.close()
    return df, inserted_count, total_count
def create_indexes(conn: psycopg2.extensions.connection) -> None:
"""Create B-tree and GIN indexes."""
cursor = conn.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_transaction_id ON transactions (transaction_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_date ON transactions (date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata ON transactions USING GIN (metadata)")
conn.commit()
cursor.close()
print("Indexes created")
def estimate_index_sizes(conn: psycopg2.extensions.connection, row_count: int) -> Dict[str, Dict[str, float]]:
"""Estimate and query actual index sizes in MB."""
cursor = conn.cursor()
    # Filter to the chapter's idx_* indexes so the report matches the estimates
    cursor.execute("""
        SELECT indexname, pg_relation_size(indexname::regclass) / 1048576.0 as size_mb
        FROM pg_indexes
        WHERE tablename = 'transactions' AND indexname LIKE 'idx_%'
    """)
actual_sizes = {row[0]: row[1] for row in cursor.fetchall()}
cursor.close()
# Heuristic estimates
btree_size_mb = (row_count * 10) / 1_000_000 # ~10 bytes per row
gin_size_mb = (row_count * 15) / 1_000_000 # ~15 bytes per row
estimated_sizes = {
"idx_transaction_id": btree_size_mb,
"idx_date": btree_size_mb,
"idx_metadata": gin_size_mb
}
return {
"actual": actual_sizes,
"estimated": estimated_sizes
}
def format_index_sizes(index_sizes: Dict[str, Dict[str, float]]) -> str:
"""Format index sizes as a table."""
header = f"{'Index Name':<25} {'Actual Size (MB)':<20} {'Estimated Size (MB)':<20}"
rows = []
for index in index_sizes["actual"]:
actual = index_sizes["actual"].get(index, 0.0)
estimated = index_sizes["estimated"].get(index, 0.0)
rows.append(f"{index:<25} {actual:<20.2f} {estimated:<20.2f}")
return "\n".join([header, "-" * 65] + rows)
def optimize_queries(conn: psycopg2.extensions.connection) -> Tuple[List[Any], List[Any], List[Any], List[Any], float, float]:
"""Run optimized queries and analyze plans."""
cursor = conn.cursor()
# Sales query
start_time = time.time()
cursor.execute("""
SELECT product, SUM(price * quantity) as total_sales
FROM transactions
WHERE date BETWEEN '2023-10-01' AND '2023-10-03'
GROUP BY product
""")
sales_results = cursor.fetchall()
sales_time = time.time() - start_time
print(f"Sales Query Results: {sales_results} (Time: {sales_time:.3f}s)")
cursor.execute("""
EXPLAIN SELECT product, SUM(price * quantity) as total_sales
FROM transactions
WHERE date BETWEEN '2023-10-01' AND '2023-10-03'
GROUP BY product
""")
sales_plan = cursor.fetchall()
print("Sales Query Plan:")
for row in sales_plan:
print(row[0])
# Nested JSONB query
start_time = time.time()
cursor.execute("""
SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
jsonb_results = cursor.fetchall()
jsonb_time = time.time() - start_time
print(f"Nested JSONB Query Results: {jsonb_results} (Time: {jsonb_time:.3f}s)")
cursor.execute("""
EXPLAIN SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
jsonb_plan = cursor.fetchall()
print("Nested JSONB Query Plan:")
for row in jsonb_plan:
print(row[0])
cursor.close()
return sales_results, sales_plan, jsonb_results, jsonb_plan, sales_time, jsonb_time
def run_maintenance(conn: psycopg2.extensions.connection) -> None:
"""Run VACUUM ANALYZE."""
conn.set_session(autocommit=True)
cursor = conn.cursor()
cursor.execute("VACUUM ANALYZE transactions")
cursor.close()
print("Maintenance completed")
def export_results(sales_results: List[Any], jsonb_results: List[Any], json_path: str) -> None:
"""Export results to JSON."""
result_dict = {
"total_sales_by_product": [
{"product": row[0], "total_sales": float(row[1])} for row in sales_results
],
"laptop_products": [
{"product": row[0], "count": int(row[1])} for row in jsonb_results
]
}
print(f"Writing to: {json_path}")
with open(json_path, "w") as file:
json.dump(result_dict, file, indent=2)
print(f"Exported to {json_path}")
def main() -> None:
"""Main function to optimize transaction database."""
csv_path = "data/transactions.csv"
config_path = "data/config.yaml"
json_path = "data/optimized_results.json"
# Generate dataset if not exists
if not os.path.exists(csv_path):
print("Generating transactions.csv")
from seed_transactions import generate_transactions
generate_transactions()
config = read_config(config_path)
conn = psycopg2.connect(
dbname="sales_db",
user="postgres",
password="password",
host="localhost",
port="5432"
)
setup_database(conn)
df, valid_count, total_count = load_and_insert_transactions(csv_path, config, conn)
create_indexes(conn)
index_sizes = estimate_index_sizes(conn, total_count)
sales_results, sales_plan, jsonb_results, jsonb_plan, sales_time, jsonb_time = optimize_queries(conn)
run_maintenance(conn)
export_results(sales_results, jsonb_results, json_path)
print("\nOptimization Report:")
print(f"Total Records Processed: {total_count}")
print(f"Valid Records: {valid_count}")
print(f"Invalid Records: {total_count - valid_count}")
print(f"Sales Query Time: {sales_time:.3f}s")
print(f"Nested JSONB Query Time: {jsonb_time:.3f}s")
print("Index Sizes:")
print(format_index_sizes(index_sizes))
conn.close()
if __name__ == "__main__":
    main()

Test Implementation
# File: de-onboarding/test_optimized_processor.py
import pytest
import psycopg2
import pandas as pd
import time
from optimized_processor import setup_database, load_and_insert_transactions, create_indexes, optimize_queries, run_maintenance
from typing import Any
@pytest.fixture
def db_connection():
"""Create a test database connection."""
conn = psycopg2.connect(
dbname="sales_db",
user="postgres",
password="password",
host="localhost",
port="5432"
)
yield conn
conn.close()
def test_pipeline(db_connection: psycopg2.extensions.connection) -> None:
"""Test the full optimization pipeline."""
config = {
"min_price": 10.0,
"max_quantity": 100,
"required_fields": ["transaction_id", "product", "price", "quantity", "date", "metadata"],
"product_prefix": "Halal",
"max_decimals": 2
}
setup_database(db_connection)
df, valid_count, total_count = load_and_insert_transactions("data/transactions.csv", config, db_connection)
assert valid_count > 0, f"Expected valid records, got {valid_count}"
    # Test performance without indexes (drop leftovers from prior runs so the
    # baseline timing really runs without idx_date)
    cursor = db_connection.cursor()
    cursor.execute("DROP INDEX IF EXISTS idx_date")
    cursor.execute("DROP INDEX IF EXISTS idx_metadata")
    db_connection.commit()
start_time = time.time()
cursor.execute("""
SELECT product, SUM(price * quantity) as total_sales
FROM transactions
WHERE date BETWEEN '2023-10-01' AND '2023-10-03'
GROUP BY product
""")
no_index_time = time.time() - start_time
cursor.close()
create_indexes(db_connection)
cursor = db_connection.cursor()
cursor.execute("SELECT indexname FROM pg_indexes WHERE tablename = 'transactions'")
indexes = [row[0] for row in cursor.fetchall()]
assert "idx_transaction_id" in indexes, "Transaction ID index missing"
assert "idx_date" in indexes, "Date index missing"
assert "idx_metadata" in indexes, "Metadata index missing"
# Test performance with indexes
sales_results, sales_plan, jsonb_results, jsonb_plan, sales_time, jsonb_time = optimize_queries(db_connection)
assert sales_time < no_index_time / 2, f"Indexed sales query ({sales_time:.3f}s) not 2x faster than non-indexed ({no_index_time:.3f}s)"
assert len(sales_results) > 0, f"Expected sales results, got {len(sales_results)}"
plan_text = " ".join(row[0] for row in sales_plan)
assert "Index Scan" in plan_text, "Sales query not using index scan"
assert len(jsonb_results) > 0, f"Expected JSONB results, got {len(jsonb_results)}"
# Test duplicate handling
df_duplicate = pd.DataFrame({
"transaction_id": ["T001"],
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"date": ["2023-10-01"],
"metadata": ['{"category": "electronics", "details": {"subcategory": "laptop"}}']
})
    df_duplicate.to_csv("data/duplicate.csv", index=False)
    load_and_insert_transactions("data/duplicate.csv", config, db_connection)  # First load ensures T001 exists
    _, valid_count, _ = load_and_insert_transactions("data/duplicate.csv", config, db_connection)
    assert valid_count == 0, f"Expected 0 valid records for duplicates, got {valid_count}"
# Test malformed JSONB and invalid date
df_invalid = pd.DataFrame({
"transaction_id": ["T999"],
"product": ["Halal Laptop"],
"price": [999.99],
"quantity": [2],
"date": ["2023-13-01"],
"metadata": ['{category: electronics}']
})
df_invalid.to_csv("data/invalid.csv", index=False)
_, valid_count, _ = load_and_insert_transactions("data/invalid.csv", config, db_connection)
assert valid_count == 0, f"Expected 0 valid records for invalid data, got {valid_count}"
run_maintenance(db_connection)
    cursor.close()

Expected Outputs
data/optimized_results.json (sample):
{
"total_sales_by_product": [
{ "product": "Halal Laptop", "total_sales": 1999.98 },
{ "product": "Halal Mouse", "total_sales": 249.9 },
{ "product": "Halal Keyboard", "total_sales": 249.95 }
],
"laptop_products": [{ "product": "Halal Laptop", "count": 200 }]
}

Console Output (abridged):
Opening config: data/config.yaml
Loaded config: {'min_price': 10.0, 'max_quantity': 100, ...}
Loading CSV: data/transactions.csv
Initial DataFrame:
transaction_id product price quantity date metadata
0 T001 Halal Laptop 999.99 2 2023-10-01 {"category": "electronics", "details": {"subcategory": "laptop"}}
...
Validated DataFrame:
transaction_id product price quantity date metadata
0 T001 Halal Laptop 999.99 2 2023-10-01 {"category": "electronics", "details": {"subcategory": "laptop"}}
...
Indexes created
Sales Query Results: [('Halal Laptop', 1999.98), ...] (Time: 0.002s)
Sales Query Plan:
GroupAggregate (cost=0.15..8.45 rows=3 width=36)
-> Index Scan using idx_date on transactions (cost=0.15..8.27 rows=3 width=28)
Nested JSONB Query Results: [('Halal Laptop', 200)] (Time: 0.003s)
Nested JSONB Query Plan:
Seq Scan on transactions (cost=0.00..1.03 rows=1 width=64)
Maintenance completed
Exported to data/optimized_results.json
Optimization Report:
Total Records Processed: 1000
Valid Records: 600
Invalid Records: 400
Sales Query Time: 0.002s
Nested JSONB Query Time: 0.003s
Index Sizes:
Index Name                Actual Size (MB)     Estimated Size (MB)
-----------------------------------------------------------------
idx_transaction_id        0.01                 0.01
idx_date                  0.01                 0.01
idx_metadata              0.02                 0.02

How to Run and Test
Setup:
- Checklist:
  - Install PostgreSQL and start server (pg_ctl start).
  - Create database: createdb -U postgres sales_db.
  - Create de-onboarding/data/ and populate with config.yaml per Appendix 1.
  - Run seed_transactions.py to generate transactions.csv.
  - Install libraries: pip install psycopg2-binary pyyaml pydantic pytest matplotlib.
  - Create virtual environment: python -m venv venv, activate (Windows: venv\Scripts\activate, Unix: source venv/bin/activate).
  - Verify Python 3.10+: python --version.
  - Configure editor for 4-space indentation per PEP 8.
  - Save utils.py, seed_transactions.py, optimized_processor.py, test_optimized_processor.py.
- Troubleshooting:
  - OperationalError: Check PostgreSQL status and credentials. Print psycopg2.connect parameters.
  - FileNotFoundError: Verify data/transactions.csv exists. Print path.
  - IndentationError: Use 4 spaces. Run python -tt optimized_processor.py.
  - pytest Failure: Print cursor.description to debug schema.

Run:
- Open terminal in de-onboarding/.
- Run: python optimized_processor.py.
- Outputs: data/optimized_results.json, console logs with formatted index size table.

Test:
- Run: pytest test_optimized_processor.py -v.
- Verify all tests pass, confirming indexes, query results, performance, and error handling.
22.5 Practice Exercises
Exercise 1: Create a B-tree Index
Write a function to create a B-tree index on product, with 4-space indentation per PEP 8.
Expected Output:
Index idx_product created

Follow-Along Instructions:
- Save as de-onboarding/ex1_index.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex1_index.py.
- How to Test:
  - Check index: psql -U postgres -d sales_db -c "\d transactions".
  - Verify index idx_product exists.
Exercise 2: Analyze Query Plan
Write a function to analyze a query plan for a product filter, with 4-space indentation per PEP 8.
Expected Output:
Query Plan:
Index Scan using idx_product on transactions ...

Follow-Along Instructions:
- Save as de-onboarding/ex2_explain.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex2_explain.py.
- How to Test:
  - Verify “Index Scan” in output.
Exercise 3: Optimize a Nested JSONB Query
Write a function to query transactions with metadata subcategory “laptop”, with 4-space indentation per PEP 8.
Expected Output:
Query Results: [('Halal Laptop', 200)]

Follow-Along Instructions:
- Save as de-onboarding/ex3_jsonb.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex3_jsonb.py.
- How to Test:
  - Verify results and EXPLAIN shows query plan (may be Seq Scan without expression index).
Exercise 4: Run Maintenance
Write a function to run VACUUM ANALYZE, with 4-space indentation per PEP 8.
Expected Output:
Maintenance completed

Follow-Along Instructions:
- Save as de-onboarding/ex4_maintenance.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex4_maintenance.py.
- How to Test:
  - Verify output confirms completion.
Exercise 5: Debug a Slow Join Query
Fix a slow join query that falls back to a sequential scan because of a non-sargable date filter, ensuring 4-space indentation per PEP 8.
Buggy Code:
cursor.execute("""
SELECT t.product, t.quantity
FROM transactions t
JOIN inventory i ON t.product = i.product
WHERE EXTRACT(YEAR FROM t.date) = 2023
""")Expected Output:
Query Plan:
Index Scan using idx_date on transactions ...

Follow-Along Instructions:
- Save as de-onboarding/ex5_debug.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex5_debug.py to see the slow plan.
- Fix and re-run.
- How to Test:
  - Verify “Index Scan” in plan.
Exercise 6: Analyze Indexing Trade-Offs with Index Size
Write a script to analyze B-tree vs. GIN index trade-offs for a 1M-row transaction table and calculate actual index sizes using pg_indexes, saving to ex6_concepts.txt, with 4-space indentation per PEP 8.
Expected Output (ex6_concepts.txt):
B-tree indexes are ideal for equality and range queries (e.g., transaction_id, date), offering O(log n) lookups but require ~10MB for 1M rows. GIN indexes optimize JSONB searches (e.g., metadata), with ~15–20MB storage and slower updates due to complex structures. Use B-tree for structured columns, GIN for JSONB in Hijra Group’s analytics.
Actual Index Sizes:
idx_transaction_id: 0.01 MB
idx_date: 0.01 MB
idx_metadata: 0.02 MB

Follow-Along Instructions:
- Save as de-onboarding/ex6_concepts.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex6_concepts.py.
- How to Test:
  - Verify ex6_concepts.txt contains analysis and sizes.
Exercise 7: Extend Micro-Project with GIN Index
Extend the micro-project by adding a GIN index on metadata and querying laptop transactions, with 4-space indentation per PEP 8.
Expected Output:
Extended Query Results: [('Halal Laptop', 200)]

Follow-Along Instructions:
- Save as de-onboarding/ex7_extend.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex7_extend.py.
- How to Test:
  - Verify results and EXPLAIN shows query plan.
Exercise 8: Visualize Query Plan Costs
Write a function to visualize EXPLAIN cost estimates for sales and JSONB queries using Matplotlib, saving to data/query_plan_plot.png, with 4-space indentation per PEP 8.
Expected Output:
Plot saved to data/query_plan_plot.png

Follow-Along Instructions:
- Save as de-onboarding/ex8_visualize.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex8_visualize.py.
- How to Test:
  - Verify data/query_plan_plot.png shows bar plot of query costs.
Exercise 9: Create an Expression Index
Write a function to create an expression index on metadata->'details'->>'subcategory' and re-run the nested JSONB query, noting this previews Chapter 48, with 4-space indentation per PEP 8.
Expected Output:
Expression index created
Query Results: [('Halal Laptop', 200)]

Follow-Along Instructions:
- Save as de-onboarding/ex9_expression.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex9_expression.py.
- How to Test:
  - Verify results and EXPLAIN shows index usage.
22.6 Exercise Solutions
Solution to Exercise 1: Create a B-tree Index
from typing import Any
import psycopg2
def create_product_index() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_product ON transactions (product)")
conn.commit()
cursor.close()
conn.close()
print("Index idx_product created")
create_product_index()

Solution to Exercise 2: Analyze Query Plan
from typing import Any
import psycopg2
def analyze_product_query() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("EXPLAIN SELECT * FROM transactions WHERE product = 'Halal Laptop'")
plan: list[Any] = cursor.fetchall()
print("Query Plan:")
for row in plan:
print(row[0])
cursor.close()
conn.close()
analyze_product_query()

Solution to Exercise 3: Optimize a Nested JSONB Query
from typing import Any
import psycopg2
def optimize_jsonb_query() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("""
SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
results: list[Any] = cursor.fetchall()
print("Query Results:", results)
cursor.execute("""
EXPLAIN SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
plan: list[Any] = cursor.fetchall()
print("Query Plan:")
for row in plan:
print(row[0])
cursor.close()
conn.close()
optimize_jsonb_query()

Solution to Exercise 4: Run Maintenance
from typing import Any
import psycopg2
def run_maintenance() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    conn.set_session(autocommit=True)
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("VACUUM ANALYZE transactions")
cursor.close()
conn.close()
print("Maintenance completed")
run_maintenance()

Solution to Exercise 5: Debug a Slow Join Query
from typing import Any
import psycopg2
def fix_slow_query() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS inventory (
product TEXT PRIMARY KEY,
stock INTEGER
)
""")
cursor.execute("INSERT INTO inventory (product, stock) VALUES ('Halal Laptop', 10) ON CONFLICT DO NOTHING")
cursor.execute("""
SELECT t.product, t.quantity
FROM transactions t
JOIN inventory i ON t.product = i.product
WHERE t.date >= '2023-01-01' AND t.date < '2024-01-01'
""")
results: list[Any] = cursor.fetchall()
print("Query Results:", results)
cursor.execute("""
EXPLAIN SELECT t.product, t.quantity
FROM transactions t
JOIN inventory i ON t.product = i.product
WHERE t.date >= '2023-01-01' AND t.date < '2024-01-01'
""")
plan: list[Any] = cursor.fetchall()
print("Query Plan:")
for row in plan:
print(row[0])
conn.commit()
cursor.close()
conn.close()
fix_slow_query()

Solution to Exercise 6: Analyze Indexing Trade-Offs with Index Size
from typing import Any
import psycopg2
def analyze_index_tradeoffs() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("""
SELECT indexname, pg_size_pretty(pg_relation_size(indexname::regclass)) as size
FROM pg_indexes
WHERE tablename = 'transactions'
""")
sizes = cursor.fetchall()
cursor.close()
conn.close()
analysis = """
B-tree indexes are ideal for equality and range queries (e.g., transaction_id, date), offering O(log n) lookups but require ~10MB for 1M rows. GIN indexes optimize JSONB searches (e.g., metadata), with ~15–20MB storage and slower updates due to complex structures. Use B-tree for structured columns, GIN for JSONB in Hijra Group’s analytics.
Actual Index Sizes:
"""
for index, size in sizes:
analysis += f" {index}: {size}\n"
with open("ex6_concepts.txt", "w") as file:
file.write(analysis.strip())
print("Analysis saved to ex6_concepts.txt")
analyze_index_tradeoffs()

Solution to Exercise 7: Extend Micro-Project with GIN Index
from typing import Any
import psycopg2
def extend_with_gin_index() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata ON transactions USING GIN (metadata)")
cursor.execute("""
SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
results: list[Any] = cursor.fetchall()
print("Extended Query Results:", results)
cursor.execute("""
EXPLAIN SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
plan: list[Any] = cursor.fetchall()
print("Query Plan:")
for row in plan:
print(row[0])
conn.commit()
cursor.close()
conn.close()
extend_with_gin_index()

Solution to Exercise 8: Visualize Query Plan Costs
from typing import Any
import psycopg2
import matplotlib.pyplot as plt
import utils
def visualize_query_plans() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
# Get cost for sales query
cursor.execute("""
EXPLAIN SELECT product, SUM(price * quantity) as total_sales
FROM transactions
WHERE date BETWEEN '2023-10-01' AND '2023-10-03'
GROUP BY product
""")
sales_plan = cursor.fetchall()
sales_cost = utils.parse_explain_cost(sales_plan)
# Get cost for JSONB query
cursor.execute("""
EXPLAIN SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
jsonb_plan = cursor.fetchall()
jsonb_cost = utils.parse_explain_cost(jsonb_plan)
# Plot costs
plt.figure(figsize=(8, 6))
plt.bar(["Sales Query", "JSONB Query"], [sales_cost, jsonb_cost])
plt.title("Query Plan Cost Estimates")
plt.ylabel("Cost (arbitrary units)")
plt.grid(True)
plt.tight_layout()
plt.savefig("data/query_plan_plot.png", dpi=100)
plt.close()
print("Plot saved to data/query_plan_plot.png")
cursor.close()
conn.close()
visualize_query_plans()

Solution to Exercise 9: Create an Expression Index
from typing import Any
import psycopg2
def create_expression_index() -> None:
    conn: psycopg2.extensions.connection = psycopg2.connect(
        dbname="sales_db", user="postgres", password="password", host="localhost", port="5432"
    )
    cursor: psycopg2.extensions.cursor = conn.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_subcategory ON transactions ((metadata->'details'->>'subcategory'))")
print("Expression index created")
cursor.execute("""
SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
results: list[Any] = cursor.fetchall()
print("Query Results:", results)
cursor.execute("""
EXPLAIN SELECT product, COUNT(*) as count
FROM transactions
WHERE metadata->'details'->>'subcategory' = 'laptop'
GROUP BY product
""")
plan: list[Any] = cursor.fetchall()
print("Query Plan:")
for row in plan:
print(row[0])
conn.commit()
cursor.close()
conn.close()
create_expression_index()

22.7 Chapter Summary and Connection to Chapter 23
In this chapter, you’ve mastered:
- Indexing: B-tree indexes for O(log n) lookups, GIN for JSONB searches, and expression indexes for nested queries.
- Query Optimization: Using EXPLAIN to ensure index usage and optimize plans.
- Performance Tuning: VACUUM ANALYZE for maintenance.
- Type-Safe Integration: Type-annotated psycopg2 with Pydantic validation.
- Testing: pytest tests for performance (indexed queries 2x faster), correctness, malformed data, and duplicates.
- Visualization: Matplotlib plots for query plan costs.
- White-Space Sensitivity and PEP 8: Using 4-space indentation to avoid IndentationError.
The micro-project optimized a transaction database with a 1000-row dataset, reducing query times with B-tree and GIN indexes, reporting actual and estimated index sizes in a formatted table, validating performance with pytest, and handling errors, all with 4-space indentation per PEP 8. This prepares for Chapter 23’s type-safe integration and supports end-to-end pipeline performance in capstone projects (Chapters 68–71), where optimized indexes enhance BigQuery/FastAPI integration and Airflow DAG efficiency.
Connection to Chapter 23
Chapter 23 advances Type-Safe Database Integration, building on this chapter:
- Type Safety: Extends type-annotated psycopg2 with integrated SQLite/PostgreSQL pipelines.
- Performance: Leverages optimized queries and indexes for robust pipelines, critical for BigQuery/FastAPI integration in capstone projects.
- Validation: Continues Pydantic validation for data integrity, including JSONB and date handling.
- Testing: Expands pytest testing for integrated systems, maintaining 4-space indentation per PEP 8.