48 - PostgreSQL Performance Optimization
Complexity: Moderate (M)
48.0 Introduction: Why This Matters for Data Engineering
Optimizing PostgreSQL performance is critical for Hijra Group’s data engineering pipelines, where financial transaction databases handle thousands to millions of records daily. Efficient queries and indexing reduce latency, ensuring real-time analytics for Sharia-compliant fintech applications. For example, a well-optimized PostgreSQL query on a 1-million-row table can execute in milliseconds instead of seconds, saving compute costs and improving user experience in dashboards (Chapter 52). Building on Chapters 16 (PostgreSQL Fundamentals), 17 (Python and PostgreSQL Integration), and 47 (Advanced PostgreSQL Features), this chapter introduces indexing, query optimization, and performance monitoring, using type-annotated Python with psycopg2 for robust, testable integrations.
This chapter assumes familiarity with SQL, Python, psycopg2, type annotations (Chapter 7), and testing (Chapter 9). It uses the data/tasks.db SQLite database from Chapter 47 for initial exercises, transitioning to a PostgreSQL tasks database for the micro-project to align with production-grade systems. All code adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError, ensuring compatibility with Hijra Group’s pipeline scripts. The micro-project optimizes a task database for fast queries, preparing for production-grade integrations in Chapters 49–51.
Data Engineering Workflow Context
This diagram illustrates how PostgreSQL optimization fits into a data engineering pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Python with psycopg2"]
B --> C{"PostgreSQL Database"}
C -->|Indexing| D["Optimized Tables"]
C -->|Query Tuning| E["Efficient Queries"]
D --> F["Fast Data Retrieval"]
E --> F
F --> G["Web Apps/Dashboards"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,F data
class B,C,D,E process
class G storage
Building On and Preparing For
- Building On:
  - Chapter 16: Uses PostgreSQL basics (e.g., `CREATE TABLE`, `SELECT`) for schema setup.
  - Chapter 17: Leverages `psycopg2` for type-safe Python integration with YAML configs.
  - Chapter 47: Applies advanced PostgreSQL features for optimization.
  - Chapter 9: Uses `pytest` for testing database operations.
- Preparing For:
  - Chapter 49: Extends optimization to BigQuery for cloud analytics.
  - Chapter 52: Prepares for Django-based dashboards with optimized queries.
  - Chapter 65: Supports secure pipeline deployments with performance metrics.
  - Chapter 67: Enables capstone projects with scalable database operations.
What You’ll Learn
This chapter covers:
- Indexing: Creating B-tree and composite indexes to speed up queries.
- Query Optimization: Using `EXPLAIN`, `EXPLAIN ANALYZE`, query rewriting, and statistics updates.
- Performance Monitoring: Analyzing query plans and resource usage.
- Type-Safe Integration: Building optimized, tested PostgreSQL queries with `psycopg2` and Pydantic.
- Testing: Validating performance improvements with `pytest`.
By the end, you’ll optimize a PostgreSQL tasks database for fast task status queries, using type annotations verified by Pyright and tests with pytest, producing a performance report. The micro-project uses a PostgreSQL database seeded from data/tasks.db (Appendix 1), ensuring robust, production-ready optimizations.
Follow-Along Tips:
- Install PostgreSQL and `psycopg2-binary`: `pip install psycopg2-binary`.
- Create `de-onboarding/data/` and populate with `tasks.db` and `config.yaml` per Appendix 1.
- Set up a PostgreSQL server (e.g., via Docker: `docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres`).
- Install libraries: `pip install pyyaml pydantic pytest`.
- Use 4-space indentation per PEP 8. Run `python -tt script.py` to detect tab/space mixing.
- Verify database connections with `psql -U postgres -h localhost`, or use the smoke test below.
- Use print statements (e.g., `print(cursor.fetchall())`) to debug queries.
- Save outputs to `data/` (e.g., `performance_report.json`, `ex7_tradeoffs.txt`).
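If `psql` is unavailable, a minimal Python smoke test (a sketch assuming the Docker credentials above) confirms the server is reachable before you start the examples:

```python
# Minimal connectivity smoke test, assuming the Docker setup above
# (user "postgres", password "postgres", localhost:5432).
import psycopg2

conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port="5432"
)
cursor = conn.cursor()
cursor.execute("SELECT version()")  # Ask the server for its version string
print("Connected:", cursor.fetchone()[0])
cursor.close()
conn.close()
```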
48.1 Indexing in PostgreSQL
Indexes improve query performance by reducing the data scanned, especially for WHERE, JOIN, and ORDER BY clauses. PostgreSQL’s default B-tree index supports equality and range queries, with O(log n) lookup time for n rows, compared to O(n) for full table scans. For a 1-million-row table, a B-tree index can reduce query time from seconds to milliseconds. However, indexes increase storage (e.g., ~10–20MB for a 1M-row integer index, depending on data distribution) and slow INSERT/UPDATE operations due to index maintenance (O(log n) per operation). Frequent writes, such as daily task updates in Hijra Group’s systems, may require index pruning, explored in Chapter 63.
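To see the write overhead described above, the following hedged sketch times 5,000 inserts into a throwaway table with and without an extra `status` index. The names `tasks_bench` and `idx_bench_status` are hypothetical and used only for this benchmark; the primary-key index exists in both runs, so the gap isolates the extra index’s maintenance cost. Absolute numbers vary by machine.

```python
# Hedged sketch: timing INSERTs with and without an extra status index.
# tasks_bench and idx_bench_status are throwaway names for this benchmark.
import time
import psycopg2

conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port="5432"
)
cursor = conn.cursor()
rows = [(f"B{i:05d}", f"Benchmark task {i}", "Pending") for i in range(5000)]

for label, indexed in [("without status index", False), ("with status index", True)]:
    cursor.execute("DROP TABLE IF EXISTS tasks_bench")
    cursor.execute(
        "CREATE TABLE tasks_bench (task_id TEXT PRIMARY KEY, description TEXT, status TEXT)"
    )
    if indexed:
        cursor.execute("CREATE INDEX idx_bench_status ON tasks_bench (status)")
    conn.commit()
    start = time.time()
    cursor.executemany(
        "INSERT INTO tasks_bench (task_id, description, status) VALUES (%s, %s, %s)",
        rows,
    )
    conn.commit()
    print(f"5000 inserts {label}: {time.time() - start:.4f}s")

cursor.execute("DROP TABLE IF EXISTS tasks_bench")  # Clean up the benchmark table
conn.commit()
cursor.close()
conn.close()
```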
48.1.1 Creating B-tree Indexes
Create a B-tree index on a frequently queried column, e.g., status in the tasks table. You can estimate index size using pg_relation_size('index_name'), which returns the size in bytes (explored in Exercise 1).
from typing import Any # Import Any for flexible type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like row access
# Connect to PostgreSQL database with connection parameters
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname="postgres", # Database name
user="postgres", # Username
password="postgres", # Password
host="localhost", # Host address
port="5432" # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary-based results
# Create tasks table if it doesn't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY, -- Unique identifier for each task
description TEXT, -- Task description
status TEXT -- Task status (e.g., Completed, Pending)
)
""")
# Create B-tree index on status column to speed up status-based queries
cursor.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks (status)")
conn.commit() # Commit changes to the database
# Insert sample task data for testing
cursor.executemany(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s)",
[
("T001", "Process Halal Laptop sales", "Completed"), # Task 1
("T002", "Validate Halal Mouse inventory", "Pending"), # Task 2
("T003", "Update Halal Keyboard pricing", "In Progress") # Task 3
]
)
conn.commit() # Commit inserted data
# Query tasks by status to test index performance
cursor.execute("SELECT * FROM tasks WHERE status = %s", ("Completed",))
results: list[dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert rows to dictionaries
print("Tasks with status 'Completed':", results) # Display results
# Close cursor and connection to free resources
cursor.close()
conn.close()
# Expected Output:
# Tasks with status 'Completed': [{'task_id': 'T001', 'description': 'Process Halal Laptop sales', 'status': 'Completed'}]
Follow-Along Instructions:
- Ensure PostgreSQL is running and accessible.
- Save as `de-onboarding/index_basic.py`.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Run: `python index_basic.py`.
- Verify output shows tasks with `Completed` status.
- Common Errors:
  - `OperationalError`: Check PostgreSQL connection details. Print `psycopg2.connect` parameters.
  - `ProgrammingError`: Verify table/column names. Print the query with `print(cursor.query)`.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt index_basic.py`.
Key Points:
- `CREATE INDEX`: Adds a B-tree index on `status`, speeding up `WHERE status = ...` queries.
- Time Complexity: O(log n) for indexed lookups, O(n) for index creation.
- Space Complexity: O(n) for index storage (~10–20MB for 1M rows, verifiable with `pg_relation_size`).
- Underlying Implementation: B-tree indexes organize data in a balanced tree, enabling logarithmic search. Each node contains keys and pointers, optimized for disk I/O.
- Implication: Indexes are critical for filtering tasks in Hijra Group’s task management systems.
48.1.2 Composite Indexes
Composite indexes optimize queries with multiple conditions, e.g., status and task_id.
from typing import Any # Import Any for flexible type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like row access
# Connect to PostgreSQL database
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname="postgres", # Database name
user="postgres", # Username
password="postgres", # Password
host="localhost", # Host address
port="5432" # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary-based results
# Create composite index on status and task_id for multi-condition queries
cursor.execute("CREATE INDEX IF NOT EXISTS idx_status_task_id ON tasks (status, task_id)")
conn.commit() # Commit index creation
# Query tasks with multiple conditions to leverage composite index
cursor.execute("SELECT * FROM tasks WHERE status = %s AND task_id LIKE %s", ("Pending", "T%"))
results: list[dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert rows to dictionaries
print("Filtered Tasks:", results) # Display filtered tasks
# Close cursor and connection to free resources
cursor.close()
conn.close()
# Expected Output:
# Filtered Tasks: [{'task_id': 'T002', 'description': 'Validate Halal Mouse inventory', 'status': 'Pending'}]
Follow-Along Instructions:
- Save as `de-onboarding/composite_index.py`.
- Ensure the `tasks` table exists from the previous example.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python composite_index.py`.
- Verify output shows tasks matching both conditions.
- Common Errors:
  - `IndexError`: Ensure the query returns results. Print `cursor.rowcount`.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt composite_index.py`.
Key Points:
- Composite indexes: Speed up queries with multiple `WHERE` conditions.
- Time Complexity: O(log n) for lookups, O(n) for index creation.
- Space Complexity: O(n) for index storage, slightly larger than single-column indexes.
- Column order matters: PostgreSQL’s leftmost-prefix rule applies, as shown in the sketch below.
- Implication: Useful for complex task queries in Hijra Group’s systems.
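Column order in a composite index follows PostgreSQL’s leftmost-prefix rule: `idx_status_task_id` can serve filters on `status` alone or on `status` plus `task_id`, but not efficiently on `task_id` alone (here `task_id` is also the primary key, so its own index covers that case). A minimal sketch, assuming the `tasks` table and indexes created above; note that on this chapter’s three-row sample the planner may prefer a Seq Scan everywhere, so the contrast is clearest after seeding a larger dataset (Exercise 6):

```python
# Hedged sketch: probing the leftmost-prefix rule with EXPLAIN.
# Assumes the tasks table, idx_status, and idx_status_task_id from above.
import psycopg2
from psycopg2.extras import DictCursor

conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port="5432"
)
cursor = conn.cursor(cursor_factory=DictCursor)

# Leading column only: idx_status_task_id (or idx_status) is usable
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE status = %s", ("Pending",))
print("status only:", [dict(row) for row in cursor.fetchall()])

# Both columns: the composite index matches the full key
cursor.execute(
    "EXPLAIN SELECT * FROM tasks WHERE status = %s AND task_id = %s",
    ("Pending", "T002"),
)
print("status + task_id:", [dict(row) for row in cursor.fetchall()])

# Trailing column only: idx_status_task_id is not an efficient match;
# the planner falls back to the primary-key index (or a Seq Scan)
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE task_id = %s", ("T002",))
print("task_id only:", [dict(row) for row in cursor.fetchall()])

cursor.close()
conn.close()
```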
48.2 Query Optimization
Query optimization reduces execution time by rewriting queries and leveraging indexes. PostgreSQL’s EXPLAIN command shows query plans, revealing scan types (e.g., Index Scan vs. Seq Scan) and costs (arbitrary planner estimates, not seconds, varying by system). Indexes are most effective for highly selective queries that return few rows (e.g., specific task statuses). For a 1M-row table, an Index Scan (O(log n)) is significantly faster than a Sequential Scan (O(n)).
48.2.1 Using EXPLAIN
Analyze a query plan with EXPLAIN.
from typing import Any # Import Any for flexible type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like row access
# Connect to PostgreSQL database
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname="postgres", # Database name
user="postgres", # Username
password="postgres", # Password
host="localhost", # Host address
port="5432" # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary-based results
# Analyze query plan for status-based query
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE status = %s", ("Completed",))
plan: list[dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan rows to dictionaries
print("Query Plan:", plan) # Display query plan
# Close cursor and connection to free resources
cursor.close()
conn.close()
# Expected Output (simplified):
# Query Plan: [{'QUERY PLAN': 'Index Scan using idx_status on tasks ...'}]
Follow-Along Instructions:
- Save as `de-onboarding/explain_query.py`.
- Ensure the `tasks` table and `idx_status` index exist.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python explain_query.py`.
- Verify output shows `Index Scan` if the index exists.
- Common Errors:
  - `ProgrammingError`: Ensure the table/index exists. Run `psql -U postgres -c "\d tasks"`.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt explain_query.py`.
Key Points:
- `EXPLAIN`: Shows the query plan, including scan type and cost.
- Index Scan: Uses the index (O(log n)), faster for selective queries.
- Seq Scan: Scans the entire table (O(n)), slower for large tables.
- Implication: Use `EXPLAIN` to verify index usage in pipelines.
48.2.2 Query Rewriting
Rewrite inefficient queries to use indexes and reduce joins.
from typing import Any # Import Any for flexible type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like row access
# Connect to PostgreSQL database
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname="postgres", # Database name
user="postgres", # Username
password="postgres", # Password
host="localhost", # Host address
port="5432" # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary-based results
# Analyze inefficient query plan (uses Seq Scan due to UPPER function)
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE UPPER(status) = %s", ("COMPLETED",))
inefficient_plan: list[dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
print("Inefficient Plan:", inefficient_plan) # Display inefficient plan
# Analyze optimized query plan (uses Index Scan)
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE status = %s", ("Completed",))
optimized_plan: list[dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
print("Optimized Plan:", optimized_plan) # Display optimized plan
# Close cursor and connection to free resources
cursor.close()
conn.close()
# Expected Output (simplified):
# Inefficient Plan: [{'QUERY PLAN': 'Seq Scan on tasks ...'}]
# Optimized Plan: [{'QUERY PLAN': 'Index Scan using idx_status on tasks ...'}]
Follow-Along Instructions:
- Save as `de-onboarding/query_rewrite.py`.
- Ensure the `tasks` table and index exist.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python query_rewrite.py`.
- Verify the optimized plan uses `Index Scan`.
- Common Errors:
  - `SyntaxError`: Check query syntax. Print `cursor.query`.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt query_rewrite.py`.
Key Points:
- Avoid functions on indexed columns (e.g., `UPPER(status)`), as they prevent index usage; if the function is unavoidable, an expression index helps (see the sketch below).
- Rewrite queries to match indexed conditions.
- Time Complexity: O(log n) for indexed queries, O(n) for unoptimized queries.
- Implication: Optimized queries reduce latency in Hijra Group’s dashboards.
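If a query genuinely needs a function on a column (e.g., case-insensitive status matching), rewriting is not the only option: PostgreSQL supports expression indexes. A minimal sketch, assuming the `tasks` table from this chapter; `idx_status_upper` is a hypothetical index name, and on the tiny sample table the planner may still choose a Seq Scan:

```python
# Hedged sketch: an expression index lets UPPER(status) filters use an index.
# Assumes the tasks table from this chapter; idx_status_upper is hypothetical.
import psycopg2
from psycopg2.extras import DictCursor

conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port="5432"
)
cursor = conn.cursor(cursor_factory=DictCursor)

# Index the expression itself, not the raw column
cursor.execute("CREATE INDEX IF NOT EXISTS idx_status_upper ON tasks (UPPER(status))")
conn.commit()

# The planner can now match UPPER(status) = ... against idx_status_upper
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE UPPER(status) = %s", ("COMPLETED",))
print("Plan with expression index:", [dict(row) for row in cursor.fetchall()])

cursor.close()
conn.close()
```

The trade-off mirrors Section 48.1: each extra index adds storage and write overhead, so prefer the rewrite in Section 48.2.2 when the stored data’s case is already consistent.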
48.3 Performance Monitoring
Monitor query performance using PostgreSQL’s pg_stat_statements extension and query execution times. This helps identify bottlenecks in production systems. In production, additional tools like pg_stat_activity provide real-time insights into active queries, explored in Chapter 66.
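A minimal sketch of querying `pg_stat_statements`, assuming the extension is preloaded on the server (`shared_preload_libraries = 'pg_stat_statements'` in `postgresql.conf`, or e.g. `docker run ... postgres -c shared_preload_libraries=pg_stat_statements`); the timing column shown (`mean_exec_time`) uses the PostgreSQL 13+ name:

```python
# Hedged sketch: inspecting slow queries via pg_stat_statements.
# Assumes the extension is preloaded via shared_preload_libraries and a
# PostgreSQL 13+ server, where the timing columns are named *_exec_time.
import psycopg2
from psycopg2.extras import DictCursor

conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port="5432"
)
cursor = conn.cursor(cursor_factory=DictCursor)

cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_stat_statements")
conn.commit()

# Top 5 statements by mean execution time (milliseconds)
cursor.execute("""
    SELECT query, calls, mean_exec_time
    FROM pg_stat_statements
    ORDER BY mean_exec_time DESC
    LIMIT 5
""")
for row in cursor.fetchall():
    print(dict(row))

cursor.close()
conn.close()
```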
48.3.1 Measuring Execution Time
Measure query execution time with Python’s time module. Note that time.time() measures wall-clock time, including network latency, which may inflate results. For precise query execution times, use EXPLAIN ANALYZE in production (explored in Exercise 2).
from typing import Any # Import Any for flexible type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like row access
import time # Import time for measuring execution duration
# Connect to PostgreSQL database
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname="postgres", # Database name
user="postgres", # Username
password="postgres", # Password
host="localhost", # Host address
port="5432" # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary-based results
# Record start time for query execution
start_time: float = time.time()
# Execute query to fetch tasks with specific status
cursor.execute("SELECT * FROM tasks WHERE status = %s", ("Completed",))
results: list[dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert rows to dictionaries
# Record end time and calculate duration
end_time: float = time.time()
execution_time: float = end_time - start_time # Calculate query execution time
print(f"Query Time: {execution_time:.4f} seconds") # Display execution time
print("Results:", results) # Display query results
# Close cursor and connection to free resources
cursor.close()
conn.close()
# Expected Output (example):
# Query Time: 0.0012 seconds
# Results: [{'task_id': 'T001', 'description': 'Process Halal Laptop sales', 'status': 'Completed'}]
Follow-Along Instructions:
- Save as `de-onboarding/query_time.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python query_time.py`.
- Verify execution time is low (e.g., <0.01 seconds for small tables).
- Common Errors:
  - `OperationalError`: Check database connection. Print connection parameters.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt query_time.py`.
Key Points:
- Measure execution time to compare query performance; for server-side timings that exclude client and network overhead, see the `EXPLAIN ANALYZE` sketch below.
- Implication: Low execution times ensure fast analytics for Hijra Group.
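Because `time.time()` includes client and network overhead, a hedged alternative is to read the server-reported timing from `EXPLAIN ANALYZE` (which actually executes the statement). A minimal sketch, assuming the `tasks` table from this chapter:

```python
# Hedged sketch: server-side timing with EXPLAIN ANALYZE, avoiding the
# wall-clock caveat above. Note: EXPLAIN ANALYZE executes the statement.
import psycopg2
from psycopg2.extras import DictCursor

conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port="5432"
)
cursor = conn.cursor(cursor_factory=DictCursor)

cursor.execute("EXPLAIN ANALYZE SELECT * FROM tasks WHERE status = %s", ("Completed",))
for row in cursor.fetchall():
    # The final plan lines report planning and execution time in
    # milliseconds, e.g. "Execution Time: 0.045 ms"
    print(row["QUERY PLAN"])

cursor.close()
conn.close()
```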
48.4 Micro-Project: Optimized Task Database
Project Requirements
Optimize a PostgreSQL tasks database for fast task status queries, supporting Hijra Group’s task management for Sharia-compliant product sales. The project uses type-annotated Python with psycopg2, Pydantic for validation, and pytest for testing, producing a performance report comparing query times before and after optimization.
- Seed Database: Create a PostgreSQL `tasks` table from `data/tasks.db` (Appendix 1).
- Indexing: Add B-tree and composite indexes for `status` and `task_id`.
- Query Optimization: Rewrite queries to use indexes and validate with `EXPLAIN`.
- Performance Monitoring: Measure query execution times and generate a report.
- Type Safety: Use Pydantic for configuration validation and Pyright-verified type annotations.
- Testing: Test optimizations with `pytest`, covering valid/invalid queries and Sharia-compliant audits.
- Output: Save the performance report to `data/performance_report.json`.
- Indentation: Use 4-space indentation per PEP 8, preferring spaces over tabs.
Sample Input Files
data/tasks.db (SQLite, Appendix 1):
CREATE TABLE tasks (
task_id TEXT,
description TEXT,
status TEXT
);
INSERT INTO tasks (task_id, description, status) VALUES
('T001', 'Process Halal Laptop sales', 'Completed'),
('T002', 'Validate Halal Mouse inventory', 'Pending'),
('T003', 'Update Halal Keyboard pricing', 'In Progress');
data/config.yaml (Appendix 1):
db_host: 'localhost'
db_port: 5432
db_name: 'postgres'
db_user: 'postgres'
db_password: 'postgres'
required_fields:
- task_id
- description
- status
valid_statuses:
- Completed
- Pending
- In Progress
Data Processing Flow
flowchart TD
A["tasks.db (SQLite)"] --> B["Load Config
config.yaml"]
B --> C["Seed PostgreSQL
tasks Table"]
C --> D["Create Indexes
B-tree, Composite"]
D --> E["Run Queries
Before Optimization"]
E --> F["Optimize Queries
EXPLAIN, Rewrite"]
F --> G["Measure Times
After Optimization"]
G --> H["Generate Report
performance_report.json"]
H --> I["Run Tests
pytest"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef output fill:#ddffdd,stroke:#363,stroke-width:1px
class A,B,H data
class C,D,E,F,G,I process
class H output
Acceptance Criteria
- Go Criteria:
  - Seeds PostgreSQL `tasks` table from `data/tasks.db`.
  - Creates B-tree index on `status` and composite index on `status`, `task_id`.
  - Optimizes queries to use indexes, verified by `EXPLAIN`.
  - Measures query times before/after optimization.
  - Validates config with Pydantic.
  - Exports report to `data/performance_report.json`.
  - Passes `pytest` tests for query performance, correctness, and audits.
  - Uses 4-space indentation per PEP 8.
- No-Go Criteria:
  - Fails to seed database or create indexes.
  - Queries don’t use indexes (e.g., Seq Scan in `EXPLAIN`).
  - Missing report or incorrect metrics.
  - Lacks type annotations or fails Pyright checks.
  - Fails tests or uses inconsistent indentation.
Common Pitfalls to Avoid
- Database Connection Errors:
  - Problem: `OperationalError` due to incorrect credentials.
  - Solution: Print `config` to verify `db_host`, `db_port`, etc. Ensure PostgreSQL is running.
- Index Not Used:
  - Problem: `EXPLAIN` shows Seq Scan.
  - Solution: Avoid functions on indexed columns (e.g., `UPPER(status)`). Print the query plan.
- Type Errors:
  - Problem: Pydantic validation fails.
  - Solution: Print `config` and check against the Pydantic model. Use `print(model.dict())`.
- Test Failures:
  - Problem: `pytest` fails due to incorrect query results.
  - Solution: Print `cursor.fetchall()` to debug query output.
- Outdated Statistics:
  - Problem: Poor query plans due to outdated table statistics.
  - Solution: Run `ANALYZE tasks` to update statistics. Print the query plan before/after.
- Large Dataset Performance:
  - Problem: Slow queries on large datasets (e.g., 10,000 rows from Exercise 6).
  - Solution: Verify index usage with `EXPLAIN` and run `ANALYZE`. Print execution times. Debug by printing row counts with `SELECT COUNT(*) FROM tasks`.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces per PEP 8. Run `python -tt task_optimizer.py`.
How This Differs from Production
In production, this solution would include:
- Scalability: Handle millions of rows with partitioning (Chapter 63).
- Monitoring: Use `pg_stat_statements` for real-time metrics (Chapter 66).
- Security: Encrypt connections with SSL (Chapter 65).
- CI/CD: Automate index creation in Helm charts (Chapter 64).
- Advanced Indexing: Use GIN or BRIN indexes for specific workloads (not covered).
Implementation
# File: de-onboarding/utils.py
from typing import Any, Dict # Import types for annotations
import yaml # Import PyYAML for parsing YAML configs
from pydantic import BaseModel, ValidationError # Import Pydantic for validation
def load_yaml(file_path: str) -> Dict[str, Any]:
"""Load YAML file into a dictionary."""
with open(file_path, "r") as file: # Open file in read mode
return yaml.safe_load(file) # Parse YAML content safely
class ConfigModel(BaseModel):
"""Pydantic model to validate database configuration."""
db_host: str # Database host address
db_port: int # Database port number
db_name: str # Database name
db_user: str # Database username
db_password: str # Database password
required_fields: list[str] # Required task table fields
valid_statuses: list[str] # Valid task statuses
def validate_config(config_path: str) -> ConfigModel:
"""Validate YAML config using Pydantic model."""
config = load_yaml(config_path) # Load YAML config
# Pydantic ensures type safety at runtime, preventing errors (Chapter 7)
try:
return ConfigModel(**config) # Validate and return Pydantic model
except ValidationError as e:
print(f"Config validation error: {e}") # Log validation errors
raise # Re-raise exception for handling
# File: de-onboarding/task_optimizer.py
from typing import Any, Dict, List # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
import sqlite3 # Import sqlite3 for reading tasks.db
import json # Import json for exporting reports
import time # Import time for measuring execution
from utils import validate_config # Import config validation function
from pydantic import BaseModel # Import Pydantic for data modeling
class PerformanceMetrics(BaseModel):
"""Pydantic model to store query performance metrics."""
query: str # SQL query text
plan: List[Dict[str, Any]] # Query execution plan
execution_time: float # Query execution time in seconds
uses_index: bool # Whether query uses an index
def seed_postgres(config: BaseModel) -> None:
"""Seed PostgreSQL tasks table from SQLite tasks.db."""
# Connect to SQLite database to read tasks
sqlite_conn = sqlite3.connect("data/tasks.db") # Open tasks.db
sqlite_cursor = sqlite_conn.cursor() # Create SQLite cursor
sqlite_cursor.execute("SELECT task_id, description, status FROM tasks") # Fetch all tasks
tasks = sqlite_cursor.fetchall() # Store tasks as list of tuples
sqlite_conn.close() # Close SQLite connection
# Connect to PostgreSQL database
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname=config.db_name, # Database name from config
user=config.db_user, # Username
password=config.db_password, # Password
host=config.db_host, # Host address
port=config.db_port # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary results
# Create tasks table if it doesn't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY, -- Unique task identifier
description TEXT, -- Task description
status TEXT -- Task status
)
""")
# Insert tasks into PostgreSQL, ignoring duplicates
cursor.executemany(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
tasks # List of task tuples
)
conn.commit() # Commit task insertions
# Create B-tree index on status for faster filtering
cursor.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks (status)")
# Create composite index on status and task_id for multi-condition queries
cursor.execute("CREATE INDEX IF NOT EXISTS idx_status_task_id ON tasks (status, task_id)")
conn.commit() # Commit index creations
cursor.close() # Close cursor
conn.close() # Close connection
print("Seeded PostgreSQL tasks table") # Confirm seeding
def run_query(config: BaseModel, query: str, params: tuple) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]:
"""Run query and return results, plan, and execution time."""
# Connect to PostgreSQL database
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname=config.db_name, # Database name
user=config.db_user, # Username
password=config.db_password, # Password
host=config.db_host, # Host address
port=config.db_port # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor for dictionary results
# Get query execution plan using EXPLAIN
cursor.execute(f"EXPLAIN {query}", params) # Prefix query with EXPLAIN
plan: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
# Measure query execution time
start_time = time.time() # Record start time
cursor.execute(query, params) # Execute query
results: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert results to dictionaries
execution_time = time.time() - start_time # Calculate duration
cursor.close() # Close cursor
conn.close() # Close connection
return results, plan, execution_time # Return results, plan, and time
def optimize_queries(config: BaseModel) -> Dict[str, List[PerformanceMetrics]]:
"""Run and optimize queries, returning performance metrics."""
metrics = {"before": [], "after": []} # Initialize metrics dictionary
# Run inefficient query (uses Seq Scan due to UPPER)
query_before = "SELECT * FROM tasks WHERE UPPER(status) = %s"
results_before, plan_before, time_before = run_query(config, query_before, ("COMPLETED",))
metrics["before"].append(PerformanceMetrics( # Store metrics
query=query_before, # Query text
plan=plan_before, # Execution plan
execution_time=time_before, # Execution time
uses_index="Index Scan" not in str(plan_before) # Check for index usage
))
# Run optimized query (uses Index Scan)
query_after = "SELECT * FROM tasks WHERE status = %s"
results_after, plan_after, time_after = run_query(config, query_after, ("Completed",))
metrics["after"].append(PerformanceMetrics( # Store metrics
query=query_after, # Query text
plan=plan_after, # Execution plan
execution_time=time_after, # Execution time
uses_index="Index Scan" in str(plan_after) # Check for index usage
))
# Display performance comparison
print(f"Before Optimization: {time_before:.4f}s, Plan: {plan_before}")
print(f"After Optimization: {time_after:.4f}s, Plan: {plan_after}")
return metrics # Return metrics dictionary
def export_report(metrics: Dict[str, List[PerformanceMetrics]], output_path: str) -> None:
"""Export performance metrics to JSON file."""
with open(output_path, "w") as file: # Open file in write mode
json.dump({k: [m.dict() for m in v] for k, v in metrics.items()}, file, indent=2) # Write JSON with indentation
print(f"Exported report to {output_path}") # Confirm export
def main() -> None:
"""Main function to optimize task database."""
config_path = "data/config.yaml" # Path to configuration file
output_path = "data/performance_report.json" # Path for performance report
config = validate_config(config_path) # Validate and load config
seed_postgres(config) # Seed PostgreSQL database
metrics = optimize_queries(config) # Optimize queries and collect metrics
export_report(metrics, output_path) # Export metrics to JSON
if __name__ == "__main__":
main() # Run main function
# File: de-onboarding/tests/test_task_optimizer.py
# Test suite for task_optimizer.py, covering database seeding, query optimization,
# correctness, large datasets, Sharia-compliant audits, index creation failures,
# and empty datasets, ensuring robust testing per Chapter 9's principles.
from typing import Any, Dict, Iterator, List # Import types for annotations
import pytest # Import pytest for testing
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
from task_optimizer import seed_postgres, run_query, optimize_queries, validate_config # Import project functions
from utils import ConfigModel # Import ConfigModel for validation
@pytest.fixture
def config() -> ConfigModel:
"""Fixture to provide validated configuration."""
return validate_config("data/config.yaml") # Load and validate config.yaml
@pytest.fixture
def db_connection(config: ConfigModel) -> Iterator[psycopg2.extensions.connection]:
"""Fixture to provide PostgreSQL connection."""
conn = psycopg2.connect( # Establish connection
dbname=config.db_name, # Database name
user=config.db_user, # Username
password=config.db_password, # Password
host=config.db_host, # Host address
port=config.db_port # Port number
)
yield conn # Provide connection to tests
conn.close() # Close connection after tests
def test_seed_postgres(config: ConfigModel, db_connection: psycopg2.extensions.connection) -> None:
"""Test database seeding with tasks from tasks.db."""
seed_postgres(config) # Seed PostgreSQL with tasks
cursor: DictCursor = db_connection.cursor(cursor_factory=DictCursor) # Create cursor
cursor.execute("SELECT COUNT(*) FROM tasks") # Count rows in tasks table
count: int = cursor.fetchone()[0] # Get row count
assert count >= 3, "Tasks table should have at least 3 rows" # Verify minimum rows
cursor.close() # Close cursor
def test_query_optimization(config: ConfigModel) -> None:
"""Test that query optimization improves performance."""
metrics = optimize_queries(config) # Run and optimize queries
assert len(metrics["before"]) == 1, "Should have one unoptimized query" # Verify unoptimized query count
assert len(metrics["after"]) == 1, "Should have one optimized query" # Verify optimized query count
assert metrics["after"][0].uses_index, "Optimized query should use index" # Verify index usage
assert metrics["before"][0].execution_time >= metrics["after"][0].execution_time, "Optimized query should be faster" # Verify performance improvement
def test_query_correctness(config: ConfigModel) -> None:
"""Test that query returns correct results."""
query = "SELECT * FROM tasks WHERE status = %s" # Query for Completed tasks
results, _, _ = run_query(config, query, ("Completed",)) # Execute query
assert len(results) == 1, "Should return one Completed task" # Verify result count
assert results[0]["status"] == "Completed", "Task status should be Completed" # Verify status
def test_large_dataset(config: ConfigModel, db_connection: psycopg2.extensions.connection) -> None:
"""Test seeding with larger dataset (from Exercise 6)."""
cursor: DictCursor = db_connection.cursor(cursor_factory=DictCursor) # Create cursor
cursor.execute("SELECT COUNT(*) FROM tasks") # Count initial rows
initial_count: int = cursor.fetchone()[0] # Store initial count
# Simulate large dataset seeding (1000 tasks for test)
tasks = [(f"T{i:04d}", f"Task {i} for Halal products", "Pending") for i in range(4, 1004)] # Generate tasks
cursor.executemany(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
tasks # Insert tasks
)
db_connection.commit() # Commit insertions
cursor.execute("SELECT COUNT(*) FROM tasks") # Count new rows
new_count: int = cursor.fetchone()[0] # Store new count
assert new_count >= initial_count + len(tasks), "Should add new tasks" # Verify task addition
cursor.close() # Close cursor
def test_audit_query(config: ConfigModel) -> None:
"""Test Sharia-compliant audit query (from Exercise 3)."""
query = "SELECT * FROM tasks WHERE description LIKE %s" # Query for Halal-related tasks
results, _, _ = run_query(config, query, ("Halal%",)) # Execute query
print(f"Audit query results: {results}") # Debug: display results
assert len(results) >= 1, "Should return at least one Halal-related task" # Verify result count
assert any("Halal" in r["description"] for r in results), "Tasks should include Halal-related descriptions" # Verify content
def test_index_creation_failure(config: ConfigModel) -> None:
"""Test index creation on non-existent table raises error."""
# Connect to PostgreSQL with config
conn: psycopg2.extensions.connection = psycopg2.connect(
dbname=config.db_name, # Database name
user=config.db_user, # Username
password=config.db_password, # Password
host=config.db_host, # Host address
port=config.db_port # Port number
)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
# Attempt to create index on non-existent table
with pytest.raises(psycopg2.errors.UndefinedTable): # Expect ProgrammingError
cursor.execute("CREATE INDEX idx_invalid ON nonexistent_table (column)") # Invalid query
conn.commit() # Attempt to commit
print("Index creation failed as expected") # Debug: confirm failure
cursor.close() # Close cursor
conn.close() # Close connection
def test_empty_dataset(config: ConfigModel, db_connection: psycopg2.extensions.connection) -> None:
"""Test query behavior on empty tasks table."""
cursor: DictCursor = db_connection.cursor(cursor_factory=DictCursor) # Create cursor
cursor.execute("TRUNCATE TABLE tasks") # Clear tasks table
db_connection.commit() # Commit truncation
query = "SELECT * FROM tasks WHERE status = %s" # Query for Completed tasks
results, _, _ = run_query(config, query, ("Completed",)) # Execute query
print(f"Empty dataset query results: {results}") # Debug: display results
assert len(results) == 0, "Should return zero rows for empty table" # Verify empty result
cursor.close() # Close cursor
Expected Outputs
data/performance_report.json:
{
"before": [
{
"query": "SELECT * FROM tasks WHERE UPPER(status) = %s",
"plan": [{ "QUERY PLAN": "Seq Scan on tasks ..." }],
"execution_time": 0.0021,
"uses_index": false
}
],
"after": [
{
"query": "SELECT * FROM tasks WHERE status = %s",
"plan": [{ "QUERY PLAN": "Index Scan using idx_status on tasks ..." }],
"execution_time": 0.0009,
"uses_index": true
}
]
}
data/ex7_tradeoffs.txt:
Indexing in PostgreSQL speeds up queries (e.g., filtering millions of tasks by status) with O(log n) lookup time but increases storage and slows INSERT/UPDATE operations due to index maintenance (O(log n)). For Hijra Group’s task management, indexes on status enable fast analytics for Sharia-compliant product tasks across large transaction volumes, but frequent task updates require careful index selection to balance performance.
Console Output (abridged):
Seeded PostgreSQL tasks table
Before Optimization: 0.0021s, Plan: [{'QUERY PLAN': 'Seq Scan on tasks ...'}]
After Optimization: 0.0009s, Plan: [{'QUERY PLAN': 'Index Scan using idx_status on tasks ...'}]
Exported report to data/performance_report.json
48.5 Practice Exercises
Exercise 1: Create a B-tree Index and Estimate Size for Halal Product Tasks
Write a function to create a B-tree index on description to support filtering tasks related to Halal products in Hijra Group’s task management system, and estimate its size using pg_relation_size, with 4-space indentation per PEP 8.
Expected Output:
Index idx_description created for Halal product tasks
Index Size: <size_in_bytes> bytes
Follow-Along Instructions:
- Save as `de-onboarding/ex1_index.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex1_index.py`.
- How to Test:
  - Verify the index exists: `psql -U postgres -c "\d tasks"`.
  - Check the size output is non-zero.
  - Test with a missing table: the function should handle the error gracefully.
Exercise 2: Analyze Query Plan with EXPLAIN ANALYZE
Write a function to analyze a query plan for a status filter using EXPLAIN and, optionally, EXPLAIN ANALYZE to compare estimated vs. actual costs, with 4-space indentation per PEP 8.
Expected Output:
EXPLAIN Plan: [{'QUERY PLAN': 'Index Scan using idx_status on tasks ...'}]
EXPLAIN ANALYZE Plan: [{'QUERY PLAN': 'Index Scan ... Actual time=0.123..0.456 rows=1 ...'}]
Follow-Along Instructions:
- Save as `de-onboarding/ex2_explain.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex2_explain.py`.
- How to Test:
  - Verify `EXPLAIN` shows `Index Scan`.
  - Verify `EXPLAIN ANALYZE` shows actual times (optional task).
  - Test with an unindexed query (e.g., `UPPER(status)`): should show `Seq Scan`.
Exercise 3: Optimize a Query for Sharia-Compliant Audit
Rewrite a query to use an index for auditing tasks related to Sharia-compliant Halal products, with 4-space indentation per PEP 8.
Sample Input:
query = "SELECT * FROM tasks WHERE UPPER(description) LIKE %s"Expected Output:
Optimized Query: SELECT * FROM tasks WHERE description LIKE %s
Plan: [{'QUERY PLAN': 'Index Scan ...'}]
Follow-Along Instructions:
- Save as `de-onboarding/ex3_optimize.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex3_optimize.py`.
- How to Test:
  - Verify the optimized query uses the index.
  - Compare execution times.
  - Test with `description LIKE 'Halal%'` to ensure fintech relevance.
Exercise 4: Measure Query Performance
Write a function to measure query execution time, with 4-space indentation per PEP 8.
Expected Output:
Query Time: 0.0010 seconds
Follow-Along Instructions:
- Save as `de-onboarding/ex4_time.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex4_time.py`.
- How to Test:
  - Verify time is low (<0.01s).
  - Test with a complex query to compare.
Exercise 5: Debug a Slow Query with Statistics
Fix a slow query that doesn’t use an index and debug poor performance due to outdated statistics, with 4-space indentation per PEP 8.
Buggy Code:
cursor.execute("SELECT * FROM tasks WHERE UPPER(status) = %s", ("COMPLETED",))Expected Output:
Optimized Plan: [{'QUERY PLAN': 'Index Scan using idx_status on tasks ...'}]
Statistics Updated: ANALYZE completed
Follow-Along Instructions:
- Save as `de-onboarding/ex5_debug.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex5_debug.py`.
- How to Test:
  - Verify the fixed query uses `Index Scan`.
  - Verify `ANALYZE` improves the plan.
  - Compare plans with `EXPLAIN`.
Exercise 6: Generate a Large Dataset (Optional)
Write a function to generate a large dataset (~10,000 rows) for the tasks table and measure query performance, with 4-space indentation per PEP 8. This optional advanced task simulates the millions of tasks in Hijra Group’s production systems at a manageable scale.
Expected Output:
Generated 10000 tasks for Hijra Group’s task management
Query Time: 0.0050 seconds
Follow-Along Instructions:
- Save as `de-onboarding/ex6_large_dataset.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex6_large_dataset.py`.
- How to Test:
  - Verify 10,000 rows were added: `psql -U postgres -c "SELECT COUNT(*) FROM tasks"`.
  - Compare query times with/without the index.
  - Test with invalid statuses: should skip invalid rows.
Exercise 7: Explain Indexing Trade-offs (Conceptual)
Write a short explanation of indexing trade-offs in the context of Hijra Group’s task management system, saving to data/ex7_tradeoffs.txt, with 4-space indentation per PEP 8.
Expected Output (data/ex7_tradeoffs.txt):
Indexing in PostgreSQL speeds up queries (e.g., filtering millions of tasks by status) with O(log n) lookup time but increases storage and slows INSERT/UPDATE operations due to index maintenance (O(log n)). For Hijra Group’s task management, indexes on status enable fast analytics for Sharia-compliant product tasks across large transaction volumes, but frequent task updates require careful index selection to balance performance.
Follow-Along Instructions:
- Save as `de-onboarding/ex7_tradeoffs.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex7_tradeoffs.py`.
- How to Test:
  - Verify `data/ex7_tradeoffs.txt` exists and contains the explanation.
  - Check the explanation addresses the fintech context and large-scale volumes.
48.6 Exercise Solutions
Solution to Exercise 1: Create a B-tree Index and Estimate Size for Halal Product Tasks
from typing import Any, Dict # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
def create_description_index(config: Dict[str, Any]) -> None:
"""Create B-tree index on description and estimate size for Halal product tasks."""
# Connect to PostgreSQL with provided config
conn: psycopg2.extensions.connection = psycopg2.connect(**config)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
# Create B-tree index on description column to speed up Halal task filtering
cursor.execute("CREATE INDEX IF NOT EXISTS idx_description ON tasks (description)")
conn.commit() # Commit index creation
# Estimate index size using pg_relation_size
cursor.execute("SELECT pg_relation_size('idx_description')")
size: int = cursor.fetchone()[0] # Get size in bytes
print("Index idx_description created for Halal product tasks") # Confirm index creation
print(f"Index Size: {size} bytes") # Display index size
cursor.close() # Close cursor
conn.close() # Close connection
# Test configuration
config = {
"dbname": "postgres", # Database name
"user": "postgres", # Username
"password": "postgres", # Password
"host": "localhost", # Host address
"port": "5432" # Port number
}
create_description_index(config) # Run function
Solution to Exercise 2: Analyze Query Plan with EXPLAIN ANALYZE
from typing import Any, Dict, List # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
def analyze_query_plan(config: Dict[str, Any], status: str) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Analyze query plan with EXPLAIN and EXPLAIN ANALYZE."""
# Connect to PostgreSQL with config
conn: psycopg2.extensions.connection = psycopg2.connect(**config)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
# Run EXPLAIN to get estimated query plan
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE status = %s", (status,))
explain_plan: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
# Run EXPLAIN ANALYZE to get actual execution details (optional)
cursor.execute("EXPLAIN ANALYZE SELECT * FROM tasks WHERE status = %s", (status,))
analyze_plan: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
cursor.close() # Close cursor
conn.close() # Close connection
print("EXPLAIN Plan:", explain_plan) # Display estimated plan
print("EXPLAIN ANALYZE Plan:", analyze_plan) # Display actual plan
return explain_plan, analyze_plan # Return both plans
# Test configuration
config = {
"dbname": "postgres", # Database name
"user": "postgres", # Username
"password": "postgres", # Password
"host": "localhost", # Host address
"port": "5432" # Port number
}
analyze_query_plan(config, "Completed") # Run function
Solution to Exercise 3: Optimize a Query for Sharia-Compliant Audit
from typing import Any, Dict, List # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
def optimize_query(config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Optimize query for Halal product audit to use index."""
# Connect to PostgreSQL with config
conn: psycopg2.extensions.connection = psycopg2.connect(**config)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
optimized_query = "SELECT * FROM tasks WHERE description LIKE %s" # Optimized query for Halal tasks
# Run EXPLAIN to analyze query plan
cursor.execute("EXPLAIN " + optimized_query, ("Halal%",))
plan: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
cursor.close() # Close cursor
conn.close() # Close connection
print("Optimized Query:", optimized_query) # Display query
print("Plan:", plan) # Display plan
return plan # Return query plan
# Test configuration
config = {
"dbname": "postgres", # Database name
"user": "postgres", # Username
"password": "postgres", # Password
"host": "localhost", # Host address
"port": "5432" # Port number
}
optimize_query(config) # Run function
Solution to Exercise 4: Measure Query Performance
from typing import Any, Dict, List # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
import time # Import time for measuring execution
def measure_query_time(config: Dict[str, Any], status: str) -> float:
"""Measure query execution time."""
# Connect to PostgreSQL with config
conn: psycopg2.extensions.connection = psycopg2.connect(**config)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
start_time = time.time() # Record start time
# Execute query to fetch tasks by status
cursor.execute("SELECT * FROM tasks WHERE status = %s", (status,))
results: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert results to dictionaries
execution_time = time.time() - start_time # Calculate duration
cursor.close() # Close cursor
conn.close() # Close connection
print(f"Query Time: {execution_time:.4f} seconds") # Display execution time
return execution_time # Return execution time
# Test configuration
config = {
"dbname": "postgres", # Database name
"user": "postgres", # Username
"password": "postgres", # Password
"host": "localhost", # Host address
"port": "5432" # Port number
}
measure_query_time(config, "Completed") # Run function
Solution to Exercise 5: Debug a Slow Query with Statistics
from typing import Any, Dict, List # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
def debug_slow_query(config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Fix slow query and update statistics."""
# Connect to PostgreSQL with config
conn: psycopg2.extensions.connection = psycopg2.connect(**config)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
# Update table statistics to improve query planning
cursor.execute("ANALYZE tasks")
print("Statistics Updated: ANALYZE completed") # Confirm statistics update
# Analyze optimized query plan
cursor.execute("EXPLAIN SELECT * FROM tasks WHERE status = %s", ("Completed",))
plan: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert plan to dictionaries
cursor.close() # Close cursor
conn.close() # Close connection
print("Optimized Plan:", plan) # Display plan
return plan # Return query plan
# Test configuration
config = {
"dbname": "postgres", # Database name
"user": "postgres", # Username
"password": "postgres", # Password
"host": "localhost", # Host address
"port": "5432" # Port number
}
debug_slow_query(config) # Run function
Solution to Exercise 6: Generate a Large Dataset (Optional)
from typing import Any, Dict, List # Import types for annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from psycopg2.extras import DictCursor # Import DictCursor for dictionary-like rows
import time # Import time for measuring execution
from random import choice # Import choice for random status selection
def generate_large_dataset(config: Dict[str, Any], num_rows: int = 10000) -> float:
"""Generate large dataset and measure query performance for Hijra Group’s task management."""
statuses = ["Completed", "Pending", "In Progress"] # Valid task statuses
# Generate tasks with unique IDs and Halal-related descriptions
tasks = [
(f"T{i:04d}", f"Task {i} for Halal products", choice(statuses))
for i in range(4, num_rows + 4)
]
# Connect to PostgreSQL with config
conn: psycopg2.extensions.connection = psycopg2.connect(**config)
cursor: DictCursor = conn.cursor(cursor_factory=DictCursor) # Create cursor
# Insert tasks, ignoring duplicates
cursor.executemany(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
tasks
)
conn.commit() # Commit insertions
print(f"Generated {num_rows} tasks for Hijra Group’s task management") # Confirm task generation
# Measure query performance on large dataset
start_time = time.time() # Record start time
cursor.execute("SELECT * FROM tasks WHERE status = %s", ("Pending",)) # Query tasks by status
results: List[Dict[str, Any]] = [dict(row) for row in cursor.fetchall()] # Convert results to dictionaries
execution_time = time.time() - start_time # Calculate duration
cursor.close() # Close cursor
conn.close() # Close connection
print(f"Query Time: {execution_time:.4f} seconds") # Display execution time
return execution_time # Return execution time
# Test configuration
config = {
"dbname": "postgres", # Database name
"user": "postgres", # Username
"password": "postgres", # Password
"host": "localhost", # Host address
"port": "5432" # Port number
}
generate_large_dataset(config) # Run function
Solution to Exercise 7: Explain Indexing Trade-offs (Conceptual)
def explain_tradeoffs(output_path: str) -> None:
"""Explain indexing trade-offs and save to file."""
# Define explanation for indexing trade-offs in Hijra Group’s context
explanation = (
"Indexing in PostgreSQL speeds up queries (e.g., filtering millions of tasks by status) with O(log n) lookup time "
"but increases storage and slows INSERT/UPDATE operations due to index maintenance (O(log n)). "
"For Hijra Group’s task management, indexes on status enable fast analytics for Sharia-compliant product tasks "
"across large transaction volumes, but frequent task updates require careful index selection to balance performance."
)
# Write explanation to file
with open(output_path, "w") as file: # Open file in write mode
file.write(explanation) # Write explanation
print(f"Saved explanation to {output_path}") # Confirm file creation
# Test
explain_tradeoffs("data/ex7_tradeoffs.txt") # Run function
48.7 Chapter Summary and Connection to Chapter 49
In this chapter, you’ve mastered:
- Indexing: B-tree and composite indexes for O(log n) lookups, with size estimation via `pg_relation_size` and considerations for maintenance overhead.
- Query Optimization: Using `EXPLAIN` and `EXPLAIN ANALYZE`, rewriting queries, and updating statistics with `ANALYZE` to handle large datasets.
- Performance Monitoring: Measuring execution times and understanding wall-clock limitations, with insights into production tools like `pg_stat_statements`.
- Type-Safe Integration: Building robust, tested PostgreSQL pipelines with `psycopg2` and Pydantic.
- Testing: Validating optimizations with `pytest`, including Sharia-compliant audit queries and edge cases.
The micro-project optimized a PostgreSQL tasks database, achieving faster queries through indexing and query rewriting, with a performance report saved to data/performance_report.json. Exercises reinforced fintech applications, such as auditing Halal product tasks, and explored larger datasets for realism. All code used 4-space indentation per PEP 8, ensuring maintainability. These skills prepare for scaling database operations in Kubernetes (Chapter 63) and securing pipelines (Chapter 65). Specifically, optimized PostgreSQL queries reduce latency in Django’s ORM, enabling efficient dashboards in Chapter 52, and lay the groundwork for BigQuery optimization in Chapter 49.
Connection to Chapter 49
Chapter 49 introduces BigQuery Advanced Optimization, building on this chapter:
- Query Optimization: Extends `EXPLAIN` to BigQuery’s query plans, leveraging `EXPLAIN ANALYZE` experience.
- Performance Monitoring: Applies time measurements to cloud analytics.
- Type Safety: Continues using Pydantic for configuration validation.
- Fintech Context: Optimizes large-scale transaction analytics for Hijra Group, maintaining PEP 8 compliance.