21 - Advanced PostgreSQL Querying
Complexity: Moderate (M)
21.0 Introduction: Why This Matters for Data Engineering
Advanced PostgreSQL querying is a cornerstone of data engineering at Hijra Group, enabling complex analytics on financial transaction data for Sharia-compliant fintech solutions. PostgreSQL’s advanced features, such as Common Table Expressions (CTEs), window functions, and JSONB querying, allow sophisticated data transformations within the database, reducing external processing and improving performance. For a dataset like transactions.csv with 1 million rows (~100MB), structuring queries with CTEs and window functions often lets PostgreSQL’s query planner produce more efficient plans than equivalent nested subqueries, with efficient joins and aggregations. This chapter builds on Chapters 16 (PostgreSQL Fundamentals) and 17 (Python and PostgreSQL Integration), introducing advanced querying techniques to prepare for indexing and optimization in Chapter 22 and type-safe integration in Chapter 23.
This chapter uses type-annotated Python code verified by Pyright (introduced in Chapter 7) and includes pytest tests (introduced in Chapter 9) to ensure query reliability. All code adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError. The micro-project processes data/transactions.csv to generate analytical reports, aligning with Hijra Group’s need for actionable insights from transaction data.
Data Engineering Workflow Context
The following diagram illustrates how advanced PostgreSQL querying fits into a data engineering pipeline:
flowchart TD
A["Raw Data
transactions.csv"] --> B["Python with psycopg2"]
B --> C{"PostgreSQL Database"}
C -->|Load Data| D["Transaction Table"]
C -->|Advanced Queries| E["CTEs, Window Functions, JSONB"]
E --> F["Analytical Metrics"]
F --> G["Output (JSON/CSV)"]
F --> H["Visualization"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,F,G data
class B,C,E process
class H storage

Building On and Preparing For
- Building On:
- Chapter 12: SQL Fundamentals with SQLite (basic SELECT, JOIN).
- Chapter 16: PostgreSQL Fundamentals (basic queries, psycopg2).
- Chapter 17: Python and PostgreSQL Integration (programmatic queries, YAML configs).
- Chapter 19: Advanced SQL Querying with SQLite (joins, subqueries).
- Preparing For:
- Chapter 22: PostgreSQL Indexing and Optimization (query performance).
- Chapter 23: Type-Safe Database Integration (Pydantic validation).
- Chapter 24: Checkpoint 3B (consolidates database skills).
- Chapter 53: FastAPI (API integration with PostgreSQL).
What You’ll Learn
This chapter covers:
- Common Table Expressions (CTEs): Simplify complex queries.
- Window Functions: Compute running totals and rankings.
- JSONB Querying: Handle semi-structured data.
- Type-Safe Query Execution: Use psycopg2 with type annotations.
- Testing Queries: Validate results with pytest.
The micro-project builds a type-annotated query tool to analyze data/transactions.csv, producing metrics like daily sales trends and top products, exported to JSON and visualized with Matplotlib, all using 4-space indentation per PEP 8.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with transactions.csv and config.yaml per Appendix 1.
- Install libraries: pip install psycopg2-binary pandas matplotlib pyyaml pytest.
- Set up PostgreSQL (version 15+ recommended) and create a database (hijra_db).
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Use print statements (e.g., print(query)) to debug SQL queries.
- Verify file paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding to avoid UnicodeDecodeError.
21.1 Common Table Expressions (CTEs)
CTEs provide a way to define temporary result sets within a query, improving readability and modularity. This chapter focuses on non-recursive CTEs. Since PostgreSQL 12, the planner inlines a non-recursive CTE into the outer query when it is referenced once (the MATERIALIZED keyword forces the older compute-once-and-store behavior), so CTEs generally perform on par with equivalent subqueries while remaining far easier to read; scanning a CTE’s result is O(n) in its row count. This makes them ideal for aggregating sales data in Hijra Group’s analytics.
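Before running the PostgreSQL examples, the CTE pattern itself can be tried with Python’s built-in sqlite3 module (SQLite also supports non-recursive CTEs, per Chapter 19). This is a minimal, hypothetical sketch — the in-memory table and sample rows stand in for the real transactions table:

```python
import sqlite3

# Minimal in-memory sketch of the CTE pattern; the table and rows are
# hypothetical stand-ins for the PostgreSQL transactions table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions (product TEXT, price REAL, quantity INTEGER, date TEXT)"
)
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?)",
    [
        ("Halal Laptop", 999.99, 2, "2023-10-01"),
        ("Halal Mouse", 24.99, 10, "2023-10-02"),
        (None, 29.99, 3, "2023-10-04"),  # invalid: missing product, filtered by the CTE
    ],
)
query = """
WITH valid_transactions AS (
    SELECT product, price, quantity, date
    FROM transactions
    WHERE product IS NOT NULL AND price > 0 AND quantity > 0
)
SELECT date, ROUND(SUM(price * quantity), 2) AS total_sales
FROM valid_transactions
GROUP BY date
ORDER BY date;
"""
rows = conn.execute(query).fetchall()
conn.close()
print(rows)  # [('2023-10-01', 1999.98), ('2023-10-02', 249.9)]
```

The same WITH cte_name AS (...) syntax runs unchanged in psql against the real table.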
21.1.1 Using CTEs for Sales Aggregation
Compute total sales by date using a CTE to filter valid transactions.
from typing import List, Dict
import psycopg2
from psycopg2.extras import RealDictCursor
# Type-annotated connection function
def get_db_connection(config: Dict[str, str]) -> psycopg2.extensions.connection:
"""Connect to PostgreSQL database."""
return psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
# Query using CTE
def query_sales_by_date(config: Dict[str, str]) -> List[Dict]:
"""Query total sales by date using CTE."""
query = """
WITH valid_transactions AS (
SELECT product, price, quantity, date
FROM transactions
WHERE product IS NOT NULL
AND price > 0
AND quantity > 0
)
SELECT date, SUM(price * quantity) as total_sales
FROM valid_transactions
GROUP BY date
ORDER BY date;
"""
print(f"Executing query:\n{query}") # Debug: print query for inspection
conn = get_db_connection(config)
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
print(f"Query results: {results}") # Debug: print results
return results
# Example usage (guarded so importing this module does not run a query)
if __name__ == "__main__":
    config = {
        "dbname": "hijra_db",
        "user": "postgres",
        "password": "password",
        "host": "localhost",
    }
    results = query_sales_by_date(config)

Follow-Along Instructions:
- Ensure PostgreSQL is running and hijra_db exists.
- Create the transactions table and load data/transactions.csv (see micro-project).
- Save as de-onboarding/cte_query.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python cte_query.py.
- Verify output shows daily sales (e.g., {'date': '2023-10-01', 'total_sales': 1999.98}).
- Common Errors:
  - psycopg2.OperationalError: Check PostgreSQL connection details. Print config.
  - psycopg2.ProgrammingError: Verify table/column names. Print query and test in psql.
  - IndentationError: Use 4 spaces (not tabs); Python 3 raises TabError on mixed indentation.
Key Points:
- Syntax: WITH cte_name AS (SELECT ...) SELECT ... FROM cte_name.
- Performance: O(n) for scanning n rows, optimized by PostgreSQL’s query planner for joins.
- Space Complexity: O(k) for k rows in CTE result.
- Implication: Simplifies complex analytics, e.g., daily sales aggregation for Hijra Group’s reporting.
21.2 Window Functions
Window functions compute values over a set of rows (window) without collapsing the result, ideal for running totals or rankings. They have O(n log n) complexity for sorting operations (e.g., RANK()), but PostgreSQL optimizes with index usage, which is explored in Chapter 22.
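The running-total pattern can also be tried without a PostgreSQL server using stdlib sqlite3 (SQLite has supported window functions since version 3.25); the table and rows below are hypothetical stand-ins:

```python
import sqlite3

# Sketch of a cumulative-sum window function using stdlib sqlite3
# (requires SQLite >= 3.25); data is a hypothetical stand-in.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions (product TEXT, price REAL, quantity INTEGER, date TEXT)"
)
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?)",
    [
        ("Halal Laptop", 999.99, 2, "2023-10-01"),
        ("Halal Mouse", 24.99, 10, "2023-10-02"),
    ],
)
query = """
SELECT date,
       ROUND(price * quantity, 2) AS sale_amount,
       ROUND(SUM(price * quantity) OVER (
           ORDER BY date
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
       ), 2) AS running_total
FROM transactions
ORDER BY date;
"""
rows = conn.execute(query).fetchall()
conn.close()
for row in rows:
    print(row)
# ('2023-10-01', 1999.98, 1999.98)
# ('2023-10-02', 249.9, 2249.88)
```

The OVER (...) clause is identical in PostgreSQL; only the connection code differs.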
21.2.1 Running Total with Window Functions
Calculate a running total of sales by date.
from typing import Dict, List
import psycopg2
from psycopg2.extras import RealDictCursor
from cte_query import get_db_connection  # reuse the connection helper from Section 21.1.1
def query_running_total(config: Dict[str, str]) -> List[Dict]:
"""Query running total of sales by date."""
query = """
SELECT
date,
product,
price * quantity as sale_amount,
SUM(price * quantity) OVER (
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total
FROM transactions
WHERE product IS NOT NULL
AND price > 0
AND quantity > 0
ORDER BY date;
"""
print(f"Executing query:\n{query}") # Debug: print query
conn = get_db_connection(config)
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
print(f"Query results: {results}") # Debug: print results
    return results

Follow-Along Instructions:
- Save as de-onboarding/window_query.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python window_query.py.
- Verify output shows running totals (e.g., running_total: 2499.83 for the last row).
- Common Errors:
  - psycopg2.ProgrammingError: Check window function syntax. Print query and test in psql.
  - IndentationError: Use 4 spaces (not tabs).
Key Points:
- Syntax: SUM(...) OVER (PARTITION BY ... ORDER BY ...) defines the window.
- Performance: O(n log n) for sorting, O(n) for computation.
- Space Complexity: O(n) for output rows.
- Implication: Enables time-series analytics, e.g., cumulative sales for Hijra Group’s dashboards.
21.3 JSONB Querying
JSONB stores semi-structured data, allowing flexible querying with operators like ->> (extract value) and @> (contains key-value pair). This is particularly useful for customer segmentation in Hijra Group’s Sharia-compliant fintech analytics, enabling profiling of customer transactions without rigid schemas. Querying JSONB has O(n) complexity without indexes, reducible to O(log n) with GIN indexes (covered in Chapter 22). Note: The metadata JSONB column used in examples is created during the micro-project’s database setup (see Section 21.4).
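Before querying JSONB in PostgreSQL, the semantics of ->> and @> can be sketched in plain Python with the json module. This illustration is a simplification (it only mimics top-level key-value containment, whereas @> also handles nested objects and arrays); the helper names and sample documents are hypothetical:

```python
import json
from typing import Dict, Optional

# Hypothetical documents standing in for a JSONB metadata column.
docs = [json.loads(s) for s in (
    '{"customer_id": "C001", "region": "ID"}',
    '{"customer_id": "C002"}',
)]

def extract_text(doc: Dict, key: str) -> Optional[str]:
    """Mimic ->>: extract a top-level value as text (None if absent)."""
    value = doc.get(key)
    return None if value is None else str(value)

def contains(doc: Dict, pattern: Dict) -> bool:
    """Mimic @> for flat documents: every pattern pair must appear in doc."""
    return all(doc.get(k) == v for k, v in pattern.items())

matches = [d for d in docs if contains(d, {"customer_id": "C001"})]
print(extract_text(matches[0], "customer_id"))  # C001
print(contains(docs[1], {"region": "ID"}))      # False
```

In PostgreSQL the equivalent filter is WHERE metadata @> '{"customer_id": "C001"}'::jsonb, with metadata->>'customer_id' extracting the value as text.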
21.3.1 Querying JSONB Data
Query transactions with JSONB metadata (e.g., customer info).
from typing import Dict, List
import psycopg2
from psycopg2.extras import RealDictCursor
from cte_query import get_db_connection  # reuse the connection helper from Section 21.1.1
def query_jsonb_data(config: Dict[str, str]) -> List[Dict]:
"""Query transactions with JSONB metadata."""
query = """
SELECT
product,
price,
quantity,
metadata->>'customer_id' as customer_id
FROM transactions
WHERE metadata @> '{"customer_id": "C001"}'::jsonb
AND price > 0;
"""
print(f"Executing query:\n{query}") # Debug: print query
conn = get_db_connection(config)
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
print(f"Query results: {results}") # Debug: print results
    return results

Follow-Along Instructions:
- Ensure the transactions table has a metadata JSONB column (created in micro-project).
- Save as de-onboarding/jsonb_query.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python jsonb_query.py.
- Verify output shows transactions for customer_id: C001.
- Common Errors:
  - psycopg2.DataError: Check JSONB syntax. Print query and inspect the metadata column in psql.
  - IndentationError: Use 4 spaces (not tabs).
Key Points:
- Operators: ->> extracts values as text, @> checks containment of key-value pairs.
- Performance: O(n) without indexes, O(log n) with GIN indexes.
- Space Complexity: O(n) for output rows.
- Implication: Supports flexible analytics, e.g., customer segmentation for Sharia-compliant profiling.
21.4 Micro-Project: Transaction Data Query Tool
Project Requirements
Build a type-annotated PostgreSQL query tool to analyze data/transactions.csv, generating metrics like daily sales, running totals, and customer-specific sales, exported to JSON and visualized with Matplotlib. The tool produces a daily sales plot to support stakeholder reporting for Hijra Group’s financial oversight, ensuring compliance with Islamic Financial Services Board (IFSB) standards by validating data integrity. The utils.py module, reused from Chapter 3, encapsulates reusable validation functions (e.g., is_numeric), which could be encapsulated as methods in a Validator class in Chapter 5’s Object-Oriented Programming, preparing for modular design.
- Load Data: Import data/transactions.csv into a PostgreSQL transactions table with a metadata JSONB column.
- Read Config: Parse data/config.yaml for database settings.
- Execute Queries:
- CTE: Daily sales by product.
- Window Function: Running total by date.
- JSONB: Customer-specific sales.
- Export Results: Save to data/transaction_results.json.
- Visualize: Plot daily sales to data/daily_sales.png.
- Test: Use pytest to validate queries, including edge cases.
- Use 4-space indentation per PEP 8, preferring spaces over tabs.
Sample Input Files
data/transactions.csv (from Appendix 1):
transaction_id,product,price,quantity,date
T001,Halal Laptop,999.99,2,2023-10-01
T002,Halal Mouse,24.99,10,2023-10-02
T003,Halal Keyboard,49.99,5,2023-10-03
T004,,29.99,3,2023-10-04
T005,Monitor,199.99,2,2023-10-05

data/config.yaml (from Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
dbname: hijra_db
user: postgres
password: password
host: localhost

Data Processing Flow
flowchart TD
A["Input CSV
transactions.csv"] --> B["Load CSV
pandas/psycopg2"]
B --> C["PostgreSQL Table"]
C --> D["Read YAML
config.yaml"]
D --> E["Execute Queries
CTE/Window/JSONB"]
E -->|Invalid| F["Log Warning"]
E -->|Valid| G["Compute Metrics"]
G --> H["Export JSON
transaction_results.json"]
G --> I["Generate Plot
daily_sales.png"]
F --> J["End Processing"]
H --> J
I --> J
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C,G,H data
class B,D,E,I process
class F error
class J endpoint

Acceptance Criteria
- Go Criteria:
- Loads transactions.csv into PostgreSQL with a metadata JSONB column.
- Reads config.yaml for database settings.
- Executes CTE, window function, and JSONB queries correctly.
- Exports results to data/transaction_results.json.
- Saves daily sales plot to data/daily_sales.png with dpi=100.
- Logs query execution and errors.
- Includes type annotations verified by Pyright.
- Passes pytest tests for valid and edge-case scenarios.
- Uses 4-space indentation per PEP 8.
- No-Go Criteria:
- Fails to load data or connect to PostgreSQL.
- Incorrect query results or missing outputs.
- Lacks type annotations or tests.
- Inconsistent indentation or tab/space mixing.
Common Pitfalls to Avoid
- Connection Errors:
  - Problem: psycopg2.OperationalError due to wrong credentials.
  - Solution: Print config and verify PostgreSQL settings in psql.
- Query Syntax Errors:
  - Problem: psycopg2.ProgrammingError from invalid SQL.
  - Solution: Print query and test in psql. Use EXPLAIN in psql to inspect query plans for performance issues.
- JSONB Data Issues:
  - Problem: psycopg2.DataError from invalid JSONB.
  - Solution: Validate JSONB data. In psql, run SELECT transaction_id, metadata FROM transactions WHERE metadata IS NOT NULL; to inspect stored metadata.
- Empty Results:
  - Problem: Queries return no data.
  - Solution: Check table data with SELECT COUNT(*) FROM transactions; in psql.
- Plotting Issues:
  - Problem: Plot not saved.
  - Solution: Check permissions with ls -l data/ (Unix/macOS) or dir data\ (Windows). Print os.path.exists(plot_path). See “Debugging Tip” in the implementation section for plot inspection.
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces (not tabs); Python 3 raises TabError on mixed indentation.
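The EXPLAIN habit from the pitfalls above can be practiced locally. PostgreSQL’s EXPLAIN runs in psql; as a rough stand-in, SQLite exposes EXPLAIN QUERY PLAN through stdlib sqlite3 (the plan format differs between engines, and the table here is a hypothetical placeholder):

```python
import sqlite3

# Inspect a query plan with stdlib sqlite3; PostgreSQL's EXPLAIN in psql
# serves the same purpose with a different output format.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transactions (product TEXT, price REAL, quantity INTEGER)"
)
plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT product, SUM(price * quantity) FROM transactions GROUP BY product"
).fetchall()
conn.close()
for step in plan:
    print(step)  # e.g. a 'SCAN transactions' step: a full scan, no index used
```

Chapter 22 shows how indexes change these plans in PostgreSQL.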
Implementation
Debugging Tip for Visualization
When debugging the plot_daily_sales function, temporarily add plt.show() before plt.close() to inspect the plot interactively, then remove it to comply with Pyodide’s non-interactive requirements. For example:
plt.savefig(plot_path, dpi=100)
plt.show() # Temporary for debugging
plt.close()

Remove plt.show() after debugging to ensure compatibility.
# File: de-onboarding/utils.py (reused from Chapter 3)
from typing import Union, Dict
def is_numeric(s: str, max_decimals: int = 2) -> bool:
    """Check if string is a number with at most max_decimals decimal places."""
    parts = s.split(".")
    if len(parts) == 1:  # no decimal point: accept plain integers like "10"
        return parts[0].removeprefix("-").isdigit()
    if len(parts) != 2 or not parts[0].removeprefix("-").isdigit() or not parts[1].isdigit():
        return False
    return len(parts[1]) <= max_decimals
def clean_string(s: str) -> str:
"""Strip whitespace from string."""
return s.strip()
def is_numeric_value(x: Union[int, float, str]) -> bool:
"""Check if value is numeric."""
return isinstance(x, (int, float))
def has_valid_decimals(x: Union[int, float, str], max_decimals: int) -> bool:
"""Check if value has valid decimal places."""
return is_numeric(str(x), max_decimals)
def apply_valid_decimals(x: Union[int, float, str], max_decimals: int) -> bool:
"""Apply has_valid_decimals to a value."""
return has_valid_decimals(x, max_decimals)
def is_integer(x: Union[int, float, str]) -> bool:
"""Check if value is an integer when converted to string."""
    return str(x).isdigit()

# File: de-onboarding/query_tool.py
from typing import List, Dict, Any
import pandas as pd
import yaml
import psycopg2
from psycopg2.extras import RealDictCursor
import matplotlib.pyplot as plt
import json
import os
def read_config(config_path: str) -> Dict[str, Any]:
"""Read YAML configuration."""
print(f"Opening config: {config_path}") # Debug: print path
with open(config_path, "r") as file:
config = yaml.safe_load(file)
print(f"Loaded config: {config}") # Debug: print config
return config
def setup_database(config: Dict[str, Any], csv_path: str) -> None:
"""Set up transactions table and load CSV data."""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"]
)
cursor = conn.cursor()
# Create table with JSONB column
cursor.execute("""
DROP TABLE IF EXISTS transactions;
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
product TEXT,
price DOUBLE PRECISION,  -- REAL (float4) would lose precision in sales math
quantity INTEGER,
date DATE,
metadata JSONB
);
""")
# Load CSV and add sample JSONB data
df = pd.read_csv(csv_path)
for _, row in df.iterrows():
metadata = {
"customer_id": f"C{row['transaction_id'][-3:]}",
"is_high_value": row["transaction_id"] == "T001" # For Exercise 3
}
cursor.execute("""
INSERT INTO transactions (transaction_id, product, price, quantity, date, metadata)
VALUES (%s, %s, %s, %s, %s, %s);
""", (
row["transaction_id"],
row["product"],
row["price"],
row["quantity"],
row["date"],
json.dumps(metadata)
))
conn.commit()
cursor.close()
conn.close()
print(f"Loaded {len(df)} rows into transactions table")
def query_daily_sales(config: Dict[str, Any]) -> List[Dict]:
"""Query daily sales by product using CTE."""
query = """
WITH valid_transactions AS (
SELECT product, price, quantity, date
FROM transactions
WHERE product IS NOT NULL
AND price > 0
AND quantity > 0
AND product LIKE %s
)
SELECT date, product, SUM(price * quantity) as total_sales
FROM valid_transactions
GROUP BY date, product
ORDER BY date, product;
"""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
cursor = conn.cursor()
cursor.execute(query, (f"{config['product_prefix']}%",))
results = cursor.fetchall()
cursor.close()
conn.close()
print(f"Daily sales results: {results}") # Debug: print results
return results
def query_running_total(config: Dict[str, Any]) -> List[Dict]:
"""Query running total of sales by date."""
query = """
SELECT
date,
product,
price * quantity as sale_amount,
SUM(price * quantity) OVER (
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total
FROM transactions
WHERE product IS NOT NULL
AND price > 0
AND quantity > 0
AND product LIKE %s
ORDER BY date;
"""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
cursor = conn.cursor()
cursor.execute(query, (f"{config['product_prefix']}%",))
results = cursor.fetchall()
cursor.close()
conn.close()
print(f"Running total results: {results}") # Debug: print results
return results
def query_customer_sales(config: Dict[str, Any]) -> List[Dict]:
"""Query sales for a specific customer using JSONB."""
query = """
SELECT
product,
price,
quantity,
metadata->>'customer_id' as customer_id
FROM transactions
WHERE metadata @> %s
AND price > 0
AND product IS NOT NULL;
"""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
cursor = conn.cursor()
cursor.execute(query, (json.dumps({"customer_id": "C001"}),))
results = cursor.fetchall()
cursor.close()
conn.close()
print(f"Customer sales results: {results}") # Debug: print results
return results
def export_results(results: Dict[str, List[Dict]], json_path: str) -> None:
"""Export results to JSON."""
print(f"Writing to: {json_path}") # Debug: print path
with open(json_path, "w") as file:
json.dump(results, file, indent=2, default=str)
print(f"Exported results to {json_path}")
def plot_daily_sales(daily_sales: List[Dict], plot_path: str) -> None:
"""Generate daily sales plot."""
if not daily_sales:
print("No data to plot")
return
df = pd.DataFrame(daily_sales)
df["date"] = pd.to_datetime(df["date"])
df = df.groupby("date")["total_sales"].sum().reset_index()
plt.figure(figsize=(10, 6))
plt.plot(df["date"], df["total_sales"], marker="o")
plt.title("Daily Sales Trend")
plt.xlabel("Date")
plt.ylabel("Total Sales ($)")
plt.grid(True)
plt.tight_layout()
plt.savefig(plot_path, dpi=100)
plt.close()
print(f"Plot saved to {plot_path}")
print(f"File exists: {os.path.exists(plot_path)}")
def main() -> None:
"""Main function to run query tool."""
csv_path = "data/transactions.csv"
config_path = "data/config.yaml"
json_path = "data/transaction_results.json"
plot_path = "data/daily_sales.png"
config = read_config(config_path)
setup_database(config, csv_path)
daily_sales = query_daily_sales(config)
running_total = query_running_total(config)
customer_sales = query_customer_sales(config)
results = {
"daily_sales": daily_sales,
"running_total": running_total,
"customer_sales": customer_sales
}
export_results(results, json_path)
plot_daily_sales(daily_sales, plot_path)
print("\nTransaction Report:")
print(f"Daily Sales Records: {len(daily_sales)}")
print(f"Running Total Records: {len(running_total)}")
print(f"Customer Sales Records: {len(customer_sales)}")
if __name__ == "__main__":
    main()

# File: de-onboarding/test_query_tool.py
import pytest
from query_tool import read_config, setup_database, query_daily_sales, query_running_total, query_customer_sales
import psycopg2
from psycopg2.extras import RealDictCursor
@pytest.fixture
def config():
"""Fixture for config."""
return read_config("data/config.yaml")
@pytest.fixture
def setup_db(config):
"""Fixture to set up database."""
setup_database(config, "data/transactions.csv")
return config
@pytest.fixture
def empty_db(config):
"""Fixture to set up empty database."""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"]
)
cursor = conn.cursor()
cursor.execute("""
DROP TABLE IF EXISTS transactions;
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
product TEXT,
price DOUBLE PRECISION,
quantity INTEGER,
date DATE,
metadata JSONB
);
""")
conn.commit()
cursor.close()
conn.close()
return config
def test_daily_sales(setup_db):
"""Test daily sales query."""
results = query_daily_sales(setup_db)
assert len(results) == 3 # 3 valid Halal products
assert all("date" in r and "total_sales" in r for r in results)
assert any(r["total_sales"] == 1999.98 for r in results) # Halal Laptop
def test_running_total(setup_db):
"""Test running total query."""
results = query_running_total(setup_db)
assert len(results) == 3 # 3 valid Halal products
assert all("running_total" in r for r in results)
assert results[-1]["running_total"] == pytest.approx(2499.83)  # Total of valid sales
def test_customer_sales(setup_db):
"""Test customer sales query."""
results = query_customer_sales(setup_db)
assert len(results) == 1
assert results[0]["customer_id"] == "C001"
def test_empty_table(empty_db):
"""Test daily sales with empty table."""
results = query_daily_sales(empty_db)
print("Empty table returned:", results) # Debug: print results
assert results == [] # Explicitly check empty list
def test_invalid_jsonb(setup_db):
    """Test that PostgreSQL rejects invalid JSONB at insert, leaving queries unaffected."""
    conn = psycopg2.connect(
        dbname=setup_db["dbname"],
        user=setup_db["user"],
        password=setup_db["password"],
        host=setup_db["host"]
    )
    cursor = conn.cursor()
    with pytest.raises(psycopg2.DataError):  # JSONB input is validated on insert
        cursor.execute("""
            INSERT INTO transactions (transaction_id, product, price, quantity, date, metadata)
            VALUES (%s, %s, %s, %s, %s, %s);
        """, ("T006", "Halal Monitor", 299.99, 1, "2023-10-06", "{invalid}"))
    conn.rollback()  # Discard the failed transaction
    cursor.close()
    conn.close()
    results = query_customer_sales(setup_db)
    print("Valid JSONB records:", results)  # Debug: print results
    assert len(results) == 1  # Only valid JSONB records exist
    assert results[0]["customer_id"] == "C001"

Expected Outputs
data/transaction_results.json:
{
"daily_sales": [
{ "date": "2023-10-01", "product": "Halal Laptop", "total_sales": 1999.98 },
{ "date": "2023-10-02", "product": "Halal Mouse", "total_sales": 249.9 },
{ "date": "2023-10-03", "product": "Halal Keyboard", "total_sales": 249.95 }
],
"running_total": [
{
"date": "2023-10-01",
"product": "Halal Laptop",
"sale_amount": 1999.98,
"running_total": 1999.98
},
{
"date": "2023-10-02",
"product": "Halal Mouse",
"sale_amount": 249.9,
"running_total": 2249.88
},
{
"date": "2023-10-03",
"product": "Halal Keyboard",
"sale_amount": 249.95,
"running_total": 2499.83
}
],
"customer_sales": [
{
"product": "Halal Laptop",
"price": 999.99,
"quantity": 2,
"customer_id": "C001"
}
]
}

data/daily_sales.png: Line plot of daily sales totals with dpi=100.
Console Output (abridged):
Opening config: data/config.yaml
Loaded config: {...}
Loaded 5 rows into transactions table
Daily sales results: [{'date': '2023-10-01', 'product': 'Halal Laptop', 'total_sales': 1999.98}, ...]
Running total results: [{'date': '2023-10-01', 'product': 'Halal Laptop', 'sale_amount': 1999.98, 'running_total': 1999.98}, ...]
Customer sales results: [{'product': 'Halal Laptop', 'price': 999.99, 'quantity': 2, 'customer_id': 'C001'}]
Writing to: data/transaction_results.json
Exported results to data/transaction_results.json
Plot saved to data/daily_sales.png
File exists: True
Transaction Report:
Daily Sales Records: 3
Running Total Records: 3
Customer Sales Records: 1

How to Run and Test
Setup:
- Create de-onboarding/data/ and populate with transactions.csv and config.yaml per Appendix 1.
- Install: pip install psycopg2-binary pandas matplotlib pyyaml pytest.
- Set up PostgreSQL: Create hijra_db and update config.yaml with credentials.
- Save utils.py, query_tool.py, test_query_tool.py.
- Configure editor for 4-space indentation per PEP 8.
Run:
- Run: python query_tool.py.
- Outputs: data/transaction_results.json, data/daily_sales.png.
Test:
- Run: pytest test_query_tool.py -v.
- Verify all tests pass, including test_empty_table and test_invalid_jsonb.
21.5 Practice Exercises
Exercise 1: CTE for Product Sales
Write a type-annotated function to query total sales by product using a CTE.
Expected Output:
[{'product': 'Halal Laptop', 'total_sales': 1999.98}, ...]

Follow-Along Instructions:
- Save as de-onboarding/ex1_cte.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex1_cte.py.
- How to Test:
  - Add: print(query_product_sales(config)).
  - Verify output includes {'product': 'Halal Laptop', 'total_sales': 1999.98}.
  - Test with empty table: Should return [].
Solution:
from typing import Any, Dict, List
import psycopg2
from psycopg2.extras import RealDictCursor
def query_product_sales(config: Dict[str, Any]) -> List[Dict]:
"""Query total sales by product using CTE."""
query = """
WITH valid_transactions AS (
SELECT product, price, quantity
FROM transactions
WHERE product LIKE %s
AND price > 0
AND quantity > 0
)
SELECT product, SUM(price * quantity) as total_sales
FROM valid_transactions
GROUP BY product
ORDER BY total_sales DESC;
"""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
cursor = conn.cursor()
cursor.execute(query, (f"{config['product_prefix']}%",))
results = cursor.fetchall()
cursor.close()
conn.close()
    return results

Exercise 2: Window Function for Rankings
Write a type-annotated function to rank products by sales using a window function.
Expected Output:
[{'product': 'Halal Laptop', 'sale_amount': 1999.98, 'sales_rank': 1}, ...]

Follow-Along Instructions:
- Save as de-onboarding/ex2_rankings.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex2_rankings.py.
- How to Test:
  - Add: print(query_product_rankings(config)).
  - Verify output includes {'product': 'Halal Laptop', 'sale_amount': 1999.98, 'sales_rank': 1}.
  - Test with empty table: Should return [].
Solution:
from typing import Any, Dict, List
import psycopg2
from psycopg2.extras import RealDictCursor
def query_product_rankings(config: Dict[str, Any]) -> List[Dict]:
"""Query product sales rankings."""
query = """
SELECT
product,
price * quantity as sale_amount,
RANK() OVER (
ORDER BY price * quantity DESC
) as sales_rank
FROM transactions
WHERE product LIKE %s
AND price > 0
AND quantity > 0
ORDER BY sales_rank;
"""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
cursor = conn.cursor()
cursor.execute(query, (f"{config['product_prefix']}%",))
results = cursor.fetchall()
cursor.close()
conn.close()
    return results

Exercise 3: JSONB Query and Conceptual Analysis
Write a type-annotated function to query transactions with metadata indicating high-value customers (e.g., {"is_high_value": true}) and explain when to use JSONB vs. normalized tables, saving the explanation to de-onboarding/ex3_concepts.txt. Use the provided setup_ex3_database function to seed the necessary data.
Expected Output:
[{'product': 'Halal Laptop', 'customer_id': 'C001', 'is_high_value': 'true'}, ...]
Explanation saved to de-onboarding/ex3_concepts.txt

Follow-Along Instructions:
- Save as de-onboarding/ex3_jsonb.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex3_jsonb.py.
- How to Test:
  - Add: print(query_high_value_customers(config)).
  - Verify output includes {'product': 'Halal Laptop', 'is_high_value': 'true'}.
  - Check ex3_concepts.txt for the explanation.
  - Test with invalid JSONB: Should exclude invalid records.
Solution:
from typing import Any, Dict, List
import psycopg2
from psycopg2.extras import RealDictCursor
import json
def setup_ex3_database(config: Dict[str, Any]) -> None:
"""Set up transactions table for Exercise 3 with high-value customer data."""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"]
)
cursor = conn.cursor()
# Create table with JSONB column
cursor.execute("""
DROP TABLE IF EXISTS transactions;
CREATE TABLE transactions (
transaction_id TEXT PRIMARY KEY,
product TEXT,
price DOUBLE PRECISION,
quantity INTEGER,
date DATE,
metadata JSONB
);
""")
# Insert sample high-value customer data
metadata = {"customer_id": "C001", "is_high_value": True}
cursor.execute("""
INSERT INTO transactions (transaction_id, product, price, quantity, date, metadata)
VALUES (%s, %s, %s, %s, %s, %s);
""", ("T001", "Halal Laptop", 999.99, 2, "2023-10-01", json.dumps(metadata)))
conn.commit()
cursor.close()
conn.close()
print("Loaded high-value customer data for Exercise 3")
def query_high_value_customers(config: Dict[str, Any]) -> List[Dict]:
"""Query transactions for high-value customers using JSONB."""
setup_ex3_database(config) # Seed data for exercise
query = """
SELECT
product,
metadata->>'customer_id' as customer_id,
metadata->>'is_high_value' as is_high_value
FROM transactions
WHERE metadata @> '{"is_high_value": true}'::jsonb
AND price > 0
AND product IS NOT NULL;
"""
conn = psycopg2.connect(
dbname=config["dbname"],
user=config["user"],
password=config["password"],
host=config["host"],
cursor_factory=RealDictCursor
)
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
# Write conceptual explanation
explanation = """
JSONB is ideal for flexible, semi-structured data like customer metadata in Hijra Group’s analytics, enabling dynamic queries. Normalized tables are better for structured, performance-critical data like product prices, offering faster queries with indexes. Use JSONB for infrequent, flexible queries; use normalized tables for frequent, structured access.
"""
with open("de-onboarding/ex3_concepts.txt", "w") as file:
file.write(explanation)
print(f"Explanation saved to de-onboarding/ex3_concepts.txt")
return resultsExercise 4: Visualization
Write a type-annotated function to plot running totals from a window function query.
Expected Output:

```
Plot saved to data/running_total.png
```

Follow-Along Instructions:

- Save as `de-onboarding/ex4_plot.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex4_plot.py`.
- How to Test:
  - Verify `data/running_total.png` exists with a line plot at `dpi=100`.
  - Test with empty data: Should print "No data to plot".
Solution:

```python
from typing import Dict, List

import matplotlib.pyplot as plt
import pandas as pd


def plot_running_total(running_total: List[Dict], plot_path: str) -> None:
    """Plot running total of sales."""
    if not running_total:
        print("No data to plot")
        return
    df = pd.DataFrame(running_total)
    df["date"] = pd.to_datetime(df["date"])
    plt.figure(figsize=(10, 6))
    plt.plot(df["date"], df["running_total"], marker="o")
    plt.title("Running Total Sales")
    plt.xlabel("Date")
    plt.ylabel("Running Total ($)")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(plot_path, dpi=100)
    plt.close()
    print(f"Plot saved to {plot_path}")
```

### Exercise 5: Debug a CTE Bug
Fix this buggy CTE query that groups by the wrong column.
Buggy Code:

```python
def query_buggy_sales(config: Dict[str, Any]) -> List[Dict]:
    query = """
        WITH valid_transactions AS (
            SELECT product, price, quantity, date
            FROM transactions
            WHERE product IS NOT NULL
        )
        SELECT date, SUM(price * quantity) as total_sales
        FROM valid_transactions
        GROUP BY price; -- Bug: wrong column
    """
    conn = psycopg2.connect(
        dbname=config["dbname"],
        user=config["user"],
        password=config["password"],
        host=config["host"],
        cursor_factory=RealDictCursor,
    )
    cursor = conn.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    cursor.close()
    conn.close()
    return results
```

Expected Output:
```
[{'date': '2023-10-01', 'total_sales': 1999.98}, ...]
```

Follow-Along Instructions:

- Save as `de-onboarding/ex5_debug.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run buggy code: `python ex5_debug.py` to see the failure.
- Fix and re-run.
- How to Test:
  - Verify output includes `{'date': '2023-10-01', 'total_sales': 1999.98}`.
  - Test with empty table: Should return `[]`.
Solution:

```python
from typing import Any, Dict, List

import psycopg2
from psycopg2.extras import RealDictCursor


def query_fixed_sales(config: Dict[str, Any]) -> List[Dict]:
    """Query total sales by date using a CTE."""
    query = """
        WITH valid_transactions AS (
            SELECT product, price, quantity, date
            FROM transactions
            WHERE product IS NOT NULL
                AND price > 0
                AND quantity > 0
        )
        SELECT date, SUM(price * quantity) as total_sales
        FROM valid_transactions
        GROUP BY date
        ORDER BY date;
    """
    conn = psycopg2.connect(
        dbname=config["dbname"],
        user=config["user"],
        password=config["password"],
        host=config["host"],
        cursor_factory=RealDictCursor,
    )
    cursor = conn.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    cursor.close()
    conn.close()
    return results
```

Explanation:
- Bug: Grouping by `price` instead of `date` produces incorrect aggregates, as prices are not unique identifiers; PostgreSQL in fact rejects the query outright because `date` appears in the SELECT list without being grouped or aggregated.
- Fix: Group by `date` and add validation for positive price/quantity to ensure data integrity.
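The effect of the bug can be reproduced in plain Python without a database. A minimal sketch (hypothetical in-memory rows, not the chapter's transactions table) shows that grouping by price merges unrelated rows that happen to share a price, while grouping by date yields the intended daily totals:

```python
from collections import defaultdict

# Hypothetical rows: (product, price, quantity, date)
rows = [
    ("Halal Laptop", 999.99, 2, "2023-10-01"),
    ("Halal Mouse", 24.99, 10, "2023-10-01"),
    ("Halal Keyboard", 24.99, 5, "2023-10-02"),  # same price as the mouse, different day
]

def total_sales_by(key_index: int) -> dict:
    """Sum price * quantity, grouped by the column at key_index."""
    totals: dict = defaultdict(float)
    for row in rows:
        totals[row[key_index]] += row[1] * row[2]
    return {key: round(value, 2) for key, value in totals.items()}

print(total_sales_by(1))  # by price: {999.99: 1999.98, 24.99: 374.85} -- two days merged
print(total_sales_by(3))  # by date: {'2023-10-01': 2249.88, '2023-10-02': 124.95}
```

Grouping by `price` collapses the mouse and keyboard rows into one bucket even though they belong to different days, which is exactly why the daily report comes out wrong.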
### Exercise 6: Conceptual Question on CTEs vs. Subqueries
Answer why Hijra Group might prefer CTEs over subqueries for daily sales reports, saving the answer to de-onboarding/ex6_concepts.txt.
Expected Output:

```
Answer saved to de-onboarding/ex6_concepts.txt
```

Follow-Along Instructions:

- Save as `de-onboarding/ex6_concepts.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex6_concepts.py`.
- How to Test:
  - Check `ex6_concepts.txt` for a clear, concise answer (e.g., "CTEs improve readability and modularity for complex daily sales reports, making maintenance easier for Hijra Group’s analytics team.").
Solution:

```python
def explain_cte_vs_subquery() -> None:
    """Explain why CTEs are preferred over subqueries for daily sales reports."""
    answer = """
CTEs improve readability and modularity for complex daily sales reports, making maintenance easier for Hijra Group's analytics team. Unlike subqueries, CTEs allow reusable temporary result sets, simplifying query structure. They also enable PostgreSQL's query planner to optimize joins efficiently, enhancing performance for large datasets.
"""
    with open("de-onboarding/ex6_concepts.txt", "w") as file:
        file.write(answer)
    print("Answer saved to de-onboarding/ex6_concepts.txt")
```

## 21.6 Chapter Summary and Connection to Chapter 22
This chapter covered:
- CTEs: Simplified complex queries with non-recursive CTEs (O(n) complexity), ideal for sales aggregation.
- Window Functions: Enabled running totals and rankings (O(n log n) for sorting), supporting time-series analytics.
- JSONB Querying: Handled semi-structured data (O(n) without indexes), enabling customer segmentation for Sharia-compliant profiling.
- Type-Safe Queries: Used `psycopg2` with Pyright-verified type annotations for robust database interactions.
- Testing: Validated queries with `pytest`, including edge cases like empty tables and invalid JSONB data.
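The first two ideas above (a CTE feeding a window function) can be demonstrated in a single self-contained sketch. As an illustration of the SQL concepts only, it uses Python's built-in `sqlite3` module so it runs without a PostgreSQL server (window functions require SQLite 3.25+); the chapter's real code uses `psycopg2` against PostgreSQL:

```python
import sqlite3

# In-memory database standing in for the transactions table
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute(
    "CREATE TABLE transactions (product TEXT, price REAL, quantity INTEGER, date TEXT)"
)
cursor.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?)",
    [
        ("Halal Laptop", 999.99, 2, "2023-10-01"),
        ("Halal Mouse", 24.99, 10, "2023-10-02"),
        (None, 29.99, 3, "2023-10-03"),  # invalid row: missing product
    ],
)
# CTE filters invalid rows; the window function accumulates a running total by date
query = """
WITH valid_transactions AS (
    SELECT product, price, quantity, date
    FROM transactions
    WHERE product IS NOT NULL AND price > 0 AND quantity > 0
)
SELECT date, SUM(price * quantity) OVER (ORDER BY date) AS running_total
FROM valid_transactions
ORDER BY date;
"""
rows = cursor.execute(query).fetchall()
conn.close()
for date, running_total in rows:
    print(date, round(running_total, 2))  # 2023-10-01 1999.98, then 2023-10-02 2249.88
```

The invalid row is dropped by the CTE before the window function ever sees it, mirroring the validate-then-analyze pattern used throughout the micro-project.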
The micro-project built a type-annotated query tool for data/transactions.csv, producing analytical metrics (daily sales, running totals, customer sales) and visualizations for stakeholder reporting, all with 4-space indentation per PEP 8. The modular use of utils.py, reused from Chapter 3, prepares for Chapter 5’s Object-Oriented Programming, where functions like is_numeric can be encapsulated in a Validator class. The visualization skills lay the groundwork for dashboards in Chapter 51.
### Connection to Chapter 22
Chapter 22, PostgreSQL Indexing and Optimization, builds on this chapter by:
- Optimizing Queries: Introduces indexes (e.g., B-tree, GIN) to reduce CTE and JSONB query complexity from O(n) to O(log n), critical for scaling Hijra Group’s transaction analytics.
- Partitioning: Enhances performance for large datasets, extending the micro-project’s queries.
- Query Planning: Leverages `EXPLAIN` (previewed in debugging tips) to analyze and tune query performance.
- Fintech Context: Ensures efficient analytics for real-time reporting, maintaining PEP 8’s 4-space indentation for maintainable pipeline scripts.
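As a zero-setup preview of reading a query plan, the sketch below uses SQLite's `EXPLAIN QUERY PLAN` (an analogue of PostgreSQL's `EXPLAIN`, with different syntax and output) to show how adding an index turns a full table scan into an index search. This is an illustration only; Chapter 22 covers the PostgreSQL version:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE transactions (transaction_id TEXT, date TEXT)")

def plan(sql: str) -> str:
    """Return the textual plan details for a query."""
    rows = cursor.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(str(row[-1]) for row in rows)

query = "SELECT * FROM transactions WHERE date = '2023-10-01'"
before = plan(query)  # no index yet: the plan is a full table scan
cursor.execute("CREATE INDEX idx_date ON transactions (date)")
after = plan(query)  # now the plan searches using idx_date
conn.close()
print(before)
print(after)
```

The before/after contrast is the core workflow Chapter 22 formalizes with PostgreSQL's `EXPLAIN`: inspect the plan, add an index, and confirm the planner actually uses it.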