46 - Data Access Patterns for Applications
Complexity: Moderate (M)
46.0 Introduction: Why This Matters for Data Engineering
In data engineering, effective database access is critical for building scalable, maintainable applications that power Hijra Group’s Sharia-compliant fintech analytics. Data access patterns like Data Access Object (DAO) and Repository provide modular, reusable interfaces to interact with databases, reducing code duplication and improving testability. These patterns ensure business logic remains separate from database operations, aligning with Hijra Group’s need for robust, testable pipelines integrated with web frameworks like Django (Chapter 52) and FastAPI (Chapter 53). This chapter is a key component of a larger analytics pipeline, preparing for end-to-end solutions in capstone projects (Chapters 67–70). Building on Chapters 12–24 (database fundamentals), Chapter 17 (Python-PostgreSQL integration), Chapter 7 (type annotations), and Chapter 9 (testing), this chapter introduces type-annotated DAO and Repository patterns using psycopg2 to manage task data related to financial operations (e.g., sales processing). While earlier chapters (12–15) used SQLite with data/tasks.db (Appendix 1), Chapter 46 transitions to PostgreSQL for production-grade task management, preparing for advanced PostgreSQL features in Chapter 47.
This chapter uses type annotations verified by Pyright and tests with pytest, ensuring type-safe, reliable code. It avoids advanced concepts like async programming (Chapter 40), Kubernetes (Chapter 61), or try/except error handling (not yet introduced), focusing on synchronous database operations and modular design. All code adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError, ensuring compatibility with Hijra Group’s pipeline scripts.
Data Engineering Workflow Context
This diagram illustrates how data access patterns fit into a data engineering pipeline:
flowchart TD
A["Web App (Django/FastAPI)"] --> B["Business Logic"]
B --> C["Data Access Layer (DAO/Repository)"]
C --> D["Database (PostgreSQL)"]
D --> E["Task Data"]
C --> F["Test Suite (pytest)"]
E --> C
classDef app fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef logic fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef data fill:#ddffdd,stroke:#363,stroke-width:1px
classDef test fill:#ffdddd,stroke:#933,stroke-width:1px
class A,B logic
class C,D,E data
class F test
Building On and Preparing For
Building On:
- Chapters 12–24: Leverages SQL and PostgreSQL skills for querying task data.
- Chapter 17: Extends Python-PostgreSQL integration with psycopg2 for type-safe queries.
- Chapter 7: Uses type annotations for type-safe database interactions.
- Chapter 9: Incorporates pytest for testing data access logic.
- Chapter 5: Applies OOP principles for modular class design in task_dao.py.
Preparing For:
- Chapter 47: Prepares for advanced PostgreSQL features (e.g., JSONB, search queries) by establishing robust access patterns.
- Chapters 52–53: Enables integration with Django and FastAPI for web applications.
- Chapters 67–70: Supports capstone projects by providing scalable database access and analytics for end-to-end pipelines.
What You’ll Learn
This chapter covers:
- DAO Pattern: Encapsulates database operations in a type-annotated class.
- Repository Pattern: Abstracts data access with a higher-level interface.
- Type-Safe Integration: Uses psycopg2 with Pydantic for Sharia-compliant validation.
- Testing: Validates data access and analytics with pytest and a test database.
- Modular Design: Organizes code in task_dao.py and task_repository.py modules.
- Financial Analytics: Integrates tasks with sales data for fintech insights.
By the end, you’ll implement a type-annotated DAO and Repository to manage tasks in a PostgreSQL database, producing a tested data access layer for Hijra Group’s task management system linked to financial analytics (e.g., sales processing tasks from data/sales.csv). The micro-project ensures modularity, testability, and Sharia compliance, adhering to PEP 8’s 4-space indentation.
Follow-Along Tips:
- Create de-onboarding/data/ and populate with config.yaml and sales.csv per Appendix 1.
- Install libraries: pip install psycopg2-binary pydantic pyyaml pytest pyright pandas.
- Use 4 spaces (not tabs) per PEP 8. Run python -tt script.py to detect tab/space mixing.
- Use print statements (e.g., print(cursor.fetchall())) to debug queries.
- Verify file paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding for all files to avoid UnicodeDecodeError.
- Run PostgreSQL locally (e.g., docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres).
46.1 Data Access Object (DAO) Pattern
The DAO pattern encapsulates database operations (CRUD: Create, Read, Update, Delete) in a single class, isolating SQL queries from business logic. It provides a type-safe interface for interacting with PostgreSQL, using psycopg2 for connection management and Pydantic for data validation. Each operation commits changes explicitly to ensure data consistency, a process called transaction management. PostgreSQL uses implicit transactions for each query, requiring commit() to persist changes, unlike SQLite’s simpler model used in earlier chapters (e.g., with data/tasks.db). Chapter 47 will explore advanced transaction features like rollbacks and savepoints, building on this foundation. For large datasets (e.g., thousands of tasks), operations like get_all_tasks may consume significant memory (O(n)), which is tested in the micro-project with runtime benchmarks to highlight performance considerations.
46.1.1 DAO Implementation
Create a TaskDAO class to manage tasks in a PostgreSQL database.
# File: de-onboarding/task_dao.py
from typing import List, Optional # Import typing for type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from pydantic import BaseModel, validator # Import Pydantic for data validation
class Task(BaseModel): # Define Pydantic model for task validation
"""Pydantic model to validate task data with type safety."""
task_id: str # Task ID as string
description: str # Task description as string
status: str # Task status as string
@validator("description") # Validate description field
def description_must_be_halal(cls, v: str) -> str:
"""Ensure task description starts with 'Halal' for Sharia compliance."""
if not v.startswith("Halal"): # Check if description starts with 'Halal'
raise ValueError("Description must start with 'Halal'") # Raise error if invalid
return v # Return valid description
class TaskDAO:
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize DAO with PostgreSQL database configuration."""
print(f"Connecting with config: {db_config}") # Debug: Log config to diagnose connection issues
self.conn = psycopg2.connect(**db_config) # Connect to PostgreSQL using config
self.cursor = self.conn.cursor() # Create cursor for executing queries
print("TaskDAO initialized") # Debug: Confirm initialization
def create_task(self, task: Task) -> None: # Create a new task
"""Insert a task into the tasks table."""
query = """
INSERT INTO tasks (task_id, description, status)
VALUES (%s, %s, %s)
""" # SQL query to insert task data
print(f"Executing query: {query}") # Debug: Log query to diagnose SQL errors
self.cursor.execute(query, (task.task_id, task.description, task.status)) # Execute query with task data
self.conn.commit() # Commit transaction to save changes
print(f"Created task: {task.task_id}") # Debug: Confirm task creation
def get_task(self, task_id: str) -> Optional[Task]: # Retrieve a task by ID
"""Get a task by its task_id from the tasks table."""
query = "SELECT task_id, description, status FROM tasks WHERE task_id = %s" # SQL query to select task
print(f"Executing query: {query}") # Debug: Log query for troubleshooting
self.cursor.execute(query, (task_id,)) # Execute query with task_id
result = self.cursor.fetchone() # Fetch one result
if result: # If task exists
return Task(task_id=result[0], description=result[1], status=result[2]) # Return Task object
print(f"Task not found: {task_id}") # Debug: Log missing task
return None # Return None if task not found
def update_task(self, task: Task) -> bool: # Update an existing task
"""Update a task's description and status in the tasks table."""
query = """
UPDATE tasks SET description = %s, status = %s
WHERE task_id = %s
""" # SQL query to update task
print(f"Executing query: {query}") # Debug: Log query to diagnose errors
self.cursor.execute(query, (task.description, task.status, task.task_id)) # Execute query with task data
self.conn.commit() # Commit transaction to save changes
rows_affected = self.cursor.rowcount # Get number of updated rows
print(f"Updated task: {task.task_id}, rows affected: {rows_affected}") # Debug: Log update
return rows_affected > 0 # Return True if update occurred
def delete_task(self, task_id: str) -> bool: # Delete a task by ID
"""Delete a task by its task_id from the tasks table."""
query = "DELETE FROM tasks WHERE task_id = %s" # SQL query to delete task
print(f"Executing query: {query}") # Debug: Log query for troubleshooting
self.cursor.execute(query, (task_id,)) # Execute query with task_id
self.conn.commit() # Commit transaction to save changes
rows_affected = self.cursor.rowcount # Get number of deleted rows
print(f"Deleted task: {task_id}, rows affected: {rows_affected}") # Debug: Log deletion
return rows_affected > 0 # Return True if deletion occurred
def get_all_tasks(self) -> List[Task]: # Retrieve all tasks
"""Get all tasks from the tasks table."""
query = "SELECT task_id, description, status FROM tasks" # SQL query to select all tasks
print(f"Executing query: {query}") # Debug: Log query to diagnose errors
self.cursor.execute(query) # Execute query
results = self.cursor.fetchall() # Fetch all results
tasks = [Task(task_id=row[0], description=row[1], status=row[2]) for row in results] # Convert results to Task objects
print(f"Retrieved {len(tasks)} tasks") # Debug: Log number of tasks
return tasks # Return list of tasks
def close(self) -> None: # Close database connection
"""Close cursor and connection to PostgreSQL."""
self.cursor.close() # Close cursor
self.conn.close() # Close connection
print("TaskDAO connection closed") # Debug: Confirm closure
Follow-Along Instructions:
- Ensure de-onboarding/ exists in your working directory.
- Install dependencies: pip install psycopg2-binary pydantic.
- Save as de-onboarding/task_dao.py.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Common Errors:
  - ModuleNotFoundError: Install psycopg2-binary and pydantic with pip install psycopg2-binary pydantic.
  - psycopg2.OperationalError: Ensure PostgreSQL is running and db_config is correct (e.g., {"dbname": "postgres", "user": "postgres", "password": "postgres", "host": "localhost", "port": "5432"}). Test connectivity with psql -h localhost -U postgres. Print db_config to debug.
  - IndentationError: Use 4 spaces (not tabs). Run python -tt task_dao.py to detect issues.
  - psycopg2.ProgrammingError: Ensure the tasks table exists. Run create_tasks_table_postgres.py and print query to debug SQL errors.
Key Points:
- Pydantic: Validates task data, ensuring Sharia-compliant descriptions (e.g., “Halal” prefix) for Hijra Group’s fintech context.
- psycopg2: Executes parameterized queries to prevent SQL injection, ensuring secure database interactions (see the sketch after this list).
- Transaction Management: conn.commit() saves changes, critical for PostgreSQL’s implicit transactions. Chapter 47 covers advanced features like rollbacks.
- Time Complexity:
  - Single-row operations (create, get, update, delete): O(log n) via the B-tree index on task_id (the primary key), effectively constant in practice.
  - get_all_tasks: O(n) for n rows, scanning the entire table, with runtime benchmarks in tests highlighting performance.
- Space Complexity:
  - Single-task operations: O(1) for temporary variables.
  - get_all_tasks: O(n) for n rows, potentially high for large datasets (e.g., thousands of tasks), as tested.
- Underlying Implementation: psycopg2 uses C-based bindings for efficient PostgreSQL queries. Pydantic validates data at runtime, ensuring type safety. The task_id primary key is backed by a B-tree index, giving O(log n) lookups; unindexed queries would be O(n).
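To make the parameterized-query point concrete, the sketch below contrasts unsafe string formatting with the %s placeholders psycopg2 expects. It is an illustrative, hypothetical helper module, not part of TaskDAO; the key idea is that values passed separately to cursor.execute are bound and escaped by the driver, so malicious input cannot rewrite the SQL.
# File: de-onboarding/examples/parameterized_query_demo.py (illustrative sketch, not part of the micro-project)
def fetch_task_unsafe(cursor, task_id: str) -> None:
    """UNSAFE: string formatting splices user input directly into the SQL text."""
    query = f"SELECT task_id, description, status FROM tasks WHERE task_id = '{task_id}'"
    # A task_id like "T001' OR '1'='1" would return every row (SQL injection)
    cursor.execute(query)

def fetch_task_safe(cursor, task_id: str) -> None:
    """SAFE: the %s placeholder lets psycopg2 bind and escape the value."""
    query = "SELECT task_id, description, status FROM tasks WHERE task_id = %s"
    cursor.execute(query, (task_id,))  # Value is passed separately, never interpolated into the query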
46.2 Repository Pattern
The Repository pattern abstracts the DAO, providing a higher-level interface that mimics an in-memory collection. It decouples business logic from database specifics, making it easier to switch databases (e.g., to another RDBMS) or mock data for testing, which is crucial for scalable fintech applications.
46.2.1 Repository Implementation
Create a TaskRepository class to wrap TaskDAO.
# File: de-onboarding/task_repository.py
from typing import List, Optional # Import typing for type annotations
from task_dao import TaskDAO, Task # Import TaskDAO and Task for repository operations
class TaskRepository:
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize repository with DAO for database access."""
self.dao = TaskDAO(db_config) # Create DAO instance with config
print("TaskRepository initialized") # Debug: Confirm initialization
def add(self, task: Task) -> None: # Add a new task
"""Add a task to the repository."""
self.dao.create_task(task) # Call DAO to insert task
print(f"Repository: Added task {task.task_id}") # Debug: Log task addition
def find_by_id(self, task_id: str) -> Optional[Task]: # Find a task by ID
"""Find a task by its task_id."""
task = self.dao.get_task(task_id) # Call DAO to retrieve task
print(f"Repository: Found task {task_id}: {task}") # Debug: Log retrieval
return task # Return task or None
def update(self, task: Task) -> bool: # Update a task
"""Update a task's description and status."""
success = self.dao.update_task(task) # Call DAO to update task
print(f"Repository: Updated task {task.task_id}: {success}") # Debug: Log update
return success # Return True if update succeeded
def delete(self, task_id: str) -> bool: # Delete a task
"""Delete a task by its task_id."""
success = self.dao.delete_task(task_id) # Call DAO to delete task
print(f"Repository: Deleted task {task_id}: {success}") # Debug: Log deletion
return success # Return True if deletion succeeded
def find_all(self) -> List[Task]: # Get all tasks
"""Get all tasks from the repository."""
tasks = self.dao.get_all_tasks() # Call DAO to retrieve all tasks
print(f"Repository: Retrieved {len(tasks)} tasks") # Debug: Log number of tasks
return tasks # Return list of tasks
def close(self) -> None: # Close repository
"""Close the underlying DAO connection."""
self.dao.close() # Call DAO to close connection
print("TaskRepository closed") # Debug: Confirm closure
Follow-Along Instructions:
- Save as de-onboarding/task_repository.py.
- Ensure task_dao.py is in de-onboarding/.
- Configure editor for 4-space indentation per PEP 8.
- Common Errors:
  - ImportError: Ensure task_dao.py exists and is importable. Check with ls task_dao.py.
  - IndentationError: Use 4 spaces (not tabs). Run python -tt task_repository.py.
Key Points:
- Abstraction: TaskRepository mimics a collection (e.g., add, find_by_id), hiding SQL details for cleaner business logic.
- Testability: Simplifies mocking for unit tests (Chapter 42), crucial for fintech applications; see the sketch after this list.
- Time/Space Complexity: Inherits from TaskDAO (O(log n) indexed lookups for single operations, O(n) for find_all with large datasets, tested with benchmarks).
- Implication: Enables seamless integration with web frameworks, aligning with Hijra Group’s modular architecture.
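To show what that testability buys, here is a minimal sketch of a unit test that swaps the DAO for a mock, assuming only Python’s built-in unittest.mock and the modules defined above; the test file name is hypothetical, and Chapter 42 covers mocking in depth. Because TaskDAO is patched inside the task_repository module, no PostgreSQL instance is needed.
# File: de-onboarding/tests/test_repository_mock_sketch.py (illustrative sketch, not part of the micro-project suite)
from unittest.mock import patch  # Standard-library mocking tools
from task_repository import TaskRepository  # Repository under test
from task_dao import Task  # Pydantic model for task data

def test_find_by_id_with_mocked_dao() -> None:
    """Exercise TaskRepository.find_by_id without a live PostgreSQL connection."""
    with patch("task_repository.TaskDAO") as MockDAO:  # Replace the DAO class the repository instantiates
        MockDAO.return_value.get_task.return_value = Task(
            task_id="T001", description="Halal Laptop sales processing", status="Completed"
        )  # Canned DAO response
        repo = TaskRepository({"dbname": "unused"})  # Config is never used by the mocked DAO
        task = repo.find_by_id("T001")  # Delegates to the mocked get_task
        assert task is not None and task.status == "Completed"  # Business-level assertion
        MockDAO.return_value.get_task.assert_called_once_with("T001")  # Verify delegation to the DAO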
46.3 Micro-Project: Task Management Data Access Layer
Project Requirements
Implement a type-annotated data access layer for a task management system using DAO and Repository patterns, managing tasks in a PostgreSQL database for Hijra Group’s Sharia-compliant operational analytics. The system is a component of a larger fintech analytics pipeline, handling tasks related to financial operations (e.g., processing sales data from data/sales.csv) and providing task status summaries by product, ensuring descriptions align with Sharia compliance (e.g., “Halal” prefix). It uses config.yaml for database configuration, ensuring modularity, testability, and compliance with Islamic Financial Services Board (IFSB) standards, preparing for capstone projects (Chapters 67–70).
- Implement TaskDAO and TaskRepository in task_dao.py and task_repository.py.
- Use psycopg2 for PostgreSQL connectivity and Pydantic for Sharia-compliant validation.
- Read data/config.yaml for database configuration.
- Initialize the tasks table with create_tasks_table_postgres.py.
- Test CRUD operations, task-sales analytics, and large datasets with pytest using a test database.
- Log operations using print statements for debugging.
- Use 4-space indentation per PEP 8, preferring spaces over tabs.
- Validate tasks and test edge cases (e.g., missing tasks, non-Halal descriptions, null products, large datasets).
Sample Input Files
data/config.yaml (Appendix 1, modified for PostgreSQL):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
db_config:
dbname: 'postgres'
user: 'postgres'
password: 'postgres'
host: 'localhost'
port: '5432'
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
Task Data (seeded via create_tasks_table_postgres.py):
- Table: tasks
- Columns: task_id (TEXT, PRIMARY KEY), description (TEXT), status (TEXT)
- Data: INSERT INTO tasks (task_id, description, status) VALUES ('T001', 'Halal Laptop sales processing', 'Completed'), ('T002', 'Halal Mouse inventory validation', 'Pending'), ('T003', 'Halal Keyboard pricing update', 'In Progress');
Note: While data/tasks.db (SQLite) is referenced in Appendix 1 for earlier chapters (12–15), this project uses PostgreSQL with psycopg2 for production-grade task management, initialized by create_tasks_table_postgres.py, as clarified in the Introduction.
Data Processing Flow
flowchart TD
A["Web App Logic"] --> B["TaskRepository"]
B --> C["TaskDAO"]
C --> D["Read YAML
config.yaml"]
D --> E["Connect to PostgreSQL"]
E --> F["Execute CRUD
Tasks Table"]
F --> G["Return Results"]
F --> H["Log Operations"]
C --> I["Pydantic Validation"]
B --> J["pytest Tests"]
B --> K["Task-Sales Analytics"]
classDef app fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef logic fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef data fill:#ddffdd,stroke:#363,stroke-width:1px
classDef test fill:#ffdddd,stroke:#933,stroke-width:1px
class A,B logic
class C,D,E,F,G,I,K data
class H,J test
Acceptance Criteria
- Go Criteria:
- Loads config.yaml and connects to PostgreSQL.
- Implements type-annotated TaskDAO and TaskRepository with CRUD operations.
- Validates tasks with Pydantic, ensuring Sharia-compliant descriptions.
- Summarizes task statuses by product from sales.csv, handling null entries.
- Tests all operations, analytics, and large datasets with pytest using a test database.
- Logs operations and errors with print statements.
- Uses 4-space indentation per PEP 8, preferring spaces over tabs.
- Handles edge cases (e.g., missing tasks, non-Halal descriptions, null products, large datasets).
- No-Go Criteria:
- Fails to connect to PostgreSQL.
- Missing type annotations, tests, or analytics output.
- Incorrect CRUD operations, validation, or analytics.
- Inconsistent indentation or tab/space mixing.
Common Pitfalls to Avoid
- Database Connection Errors:
  - Problem: psycopg2.OperationalError due to incorrect db_config.
  - Solution: Print db_config with print(db_config) and verify PostgreSQL is running. Test connectivity with psql -h localhost -U postgres.
- Pydantic Validation Errors:
  - Problem: Non-Halal descriptions cause ValidationError.
  - Solution: Print task data before validation with print(task.dict()).
- SQL Injection:
  - Problem: Unsafe queries expose vulnerabilities.
  - Solution: Use parameterized queries (e.g., %s with cursor.execute) to ensure security.
- Test Database Conflicts:
  - Problem: Tests modify production data.
  - Solution: Use a separate test database (test_tasks) and drop it after tests.
- IndentationError:
  - Problem: Mixed spaces/tabs in code.
  - Solution: Use 4 spaces per PEP 8. Run python -tt task_manager.py to detect issues.
- CSV Parsing Errors:
  - Problem: Malformed sales.csv causes pandas errors.
  - Solution: Print df.head() to inspect the DataFrame and verify CSV format.
How This Differs from Production
In production, this solution would include:
- Connection Pooling: Use psycopg2.pool for scalable connections (Chapter 63); see the sketch after this list.
- Error Handling: Try/except for robust error management (post-Chapter 46).
- Logging: File-based logging with the logging module (Chapter 52).
- Security: Encrypted credentials and secrets management (Chapter 65).
- Observability: Metrics for query performance monitoring (Chapter 66).
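As a preview of Chapter 63, the sketch below shows how a connection pool could replace the per-DAO connection, using psycopg2’s bundled psycopg2.pool module. The file name, pool sizes, and helper function are illustrative assumptions, not part of the micro-project.
# File: de-onboarding/examples/pooling_sketch.py (illustrative preview of Chapter 63, not required here)
from psycopg2 import pool  # Connection pooling utilities shipped with psycopg2

def count_tasks_with_pool(db_config: dict) -> int:
    """Borrow a connection from a small pool, run one query, and return the connection."""
    conn_pool = pool.SimpleConnectionPool(minconn=1, maxconn=5, **db_config)  # Pool of 1–5 connections
    conn = conn_pool.getconn()  # Borrow a connection from the pool
    cursor = conn.cursor()  # Create cursor as usual
    cursor.execute("SELECT COUNT(*) FROM tasks")  # Any query runs unchanged
    count = cursor.fetchone()[0]  # Read the single count value
    cursor.close()  # Close cursor
    conn_pool.putconn(conn)  # Return the connection to the pool instead of closing it
    conn_pool.closeall()  # Close all pooled connections when the app shuts down
    return count  # Number of rows in the tasks table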
Implementation
# File: de-onboarding/create_tasks_table_postgres.py
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
def create_tasks_table(db_config: dict) -> None:
"""Create tasks table in PostgreSQL if it doesn't exist."""
conn = psycopg2.connect(**db_config) # Connect to PostgreSQL using config
conn.autocommit = True # Enable autocommit for table creation
cursor = conn.cursor() # Create cursor for executing queries
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
task_id TEXT PRIMARY KEY,
description TEXT,
status TEXT
)
""") # Create tasks table if it doesn't exist
# Check if table is empty
cursor.execute("SELECT COUNT(*) FROM tasks") # Count rows in tasks table
if cursor.fetchone()[0] == 0: # If table is empty
cursor.executemany(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s)",
[
("T001", "Halal Laptop sales processing", "Completed"), # Initial task 1
("T002", "Halal Mouse inventory validation", "Pending"), # Initial task 2
("T003", "Halal Keyboard pricing update", "In Progress"), # Initial task 3
]
) # Insert initial data
conn.commit() # Commit transaction to save changes
cursor.close() # Close cursor
conn.close() # Close connection
print(f"Initialized tasks table in PostgreSQL") # Debug: Confirm initialization
if __name__ == "__main__":
# Default config for standalone execution
db_config = {
"dbname": "postgres",
"user": "postgres",
"password": "postgres",
"host": "localhost",
"port": "5432"
}
create_tasks_table(db_config) # Run table creation
# File: de-onboarding/task_dao.py
from typing import List, Optional # Import typing for type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from pydantic import BaseModel, validator # Import Pydantic for data validation
class Task(BaseModel): # Define Pydantic model for task validation
"""Pydantic model to validate task data with type safety."""
task_id: str # Task ID as string
description: str # Task description as string
status: str # Task status as string
@validator("description") # Validate description field
def description_must_be_halal(cls, v: str) -> str:
"""Ensure task description starts with 'Halal' for Sharia compliance."""
if not v.startswith("Halal"): # Check if description starts with 'Halal'
raise ValueError("Description must start with 'Halal'") # Raise error if invalid
return v # Return valid description
class TaskDAO:
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize DAO with PostgreSQL database configuration."""
print(f"Connecting with config: {db_config}") # Debug: Log config to diagnose connection issues
self.conn = psycopg2.connect(**db_config) # Connect to PostgreSQL using config
self.cursor = self.conn.cursor() # Create cursor for executing queries
print("TaskDAO initialized") # Debug: Confirm initialization
def create_task(self, task: Task) -> None: # Create a new task
"""Insert a task into the tasks table."""
query = """
INSERT INTO tasks (task_id, description, status)
VALUES (%s, %s, %s)
""" # SQL query to insert task data
print(f"Executing query: {query}") # Debug: Log query to diagnose SQL errors
self.cursor.execute(query, (task.task_id, task.description, task.status)) # Execute query with task data
self.conn.commit() # Commit transaction to save changes
print(f"Created task: {task.task_id}") # Debug: Confirm task creation
def get_task(self, task_id: str) -> Optional[Task]: # Retrieve a task by ID
"""Get a task by its task_id from the tasks table."""
query = "SELECT task_id, description, status FROM tasks WHERE task_id = %s" # SQL query to select task
print(f"Executing query: {query}") # Debug: Log query for troubleshooting
self.cursor.execute(query, (task_id,)) # Execute query with task_id
result = self.cursor.fetchone() # Fetch one result
if result: # If task exists
return Task(task_id=result[0], description=result[1], status=result[2]) # Return Task object
print(f"Task not found: {task_id}") # Debug: Log missing task
return None # Return None if task not found
def update_task(self, task: Task) -> bool: # Update an existing task
"""Update a task's description and status in the tasks table."""
query = """
UPDATE tasks SET description = %s, status = %s
WHERE task_id = %s
""" # SQL query to update task
print(f"Executing query: {query}") # Debug: Log query to diagnose errors
self.cursor.execute(query, (task.description, task.status, task.task_id)) # Execute query with task data
self.conn.commit() # Commit transaction to save changes
rows_affected = self.cursor.rowcount # Get number of updated rows
print(f"Updated task: {task.task_id}, rows affected: {rows_affected}") # Debug: Log update
return rows_affected > 0 # Return True if update occurred
def delete_task(self, task_id: str) -> bool: # Delete a task by ID
"""Delete a task by its task_id from the tasks table."""
query = "DELETE FROM tasks WHERE task_id = %s" # SQL query to delete task
print(f"Executing query: {query}") # Debug: Log query for troubleshooting
self.cursor.execute(query, (task_id,)) # Execute query with task_id
self.conn.commit() # Commit transaction to save changes
rows_affected = self.cursor.rowcount # Get number of deleted rows
print(f"Deleted task: {task_id}, rows affected: {rows_affected}") # Debug: Log deletion
return rows_affected > 0 # Return True if deletion occurred
def get_all_tasks(self) -> List[Task]: # Retrieve all tasks
"""Get all tasks from the tasks table."""
query = "SELECT task_id, description, status FROM tasks" # SQL query to select all tasks
print(f"Executing query: {query}") # Debug: Log query to diagnose errors
self.cursor.execute(query) # Execute query
results = self.cursor.fetchall() # Fetch all results
tasks = [Task(task_id=row[0], description=row[1], status=row[2]) for row in results] # Convert results to Task objects
print(f"Retrieved {len(tasks)} tasks") # Debug: Log number of tasks
return tasks # Return list of tasks
def close(self) -> None: # Close database connection
"""Close cursor and connection to PostgreSQL."""
self.cursor.close() # Close cursor
self.conn.close() # Close connection
print("TaskDAO connection closed") # Debug: Confirm closure
# File: de-onboarding/task_repository.py
from typing import List, Optional # Import typing for type annotations
from task_dao import TaskDAO, Task # Import TaskDAO and Task for repository operations
class TaskRepository:
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize repository with DAO for database access."""
self.dao = TaskDAO(db_config) # Create DAO instance with config
print("TaskRepository initialized") # Debug: Confirm initialization
def add(self, task: Task) -> None: # Add a new task
"""Add a task to the repository."""
self.dao.create_task(task) # Call DAO to insert task
print(f"Repository: Added task {task.task_id}") # Debug: Log task addition
def find_by_id(self, task_id: str) -> Optional[Task]: # Find a task by ID
"""Find a task by its task_id."""
task = self.dao.get_task(task_id) # Call DAO to retrieve task
print(f"Repository: Found task {task_id}: {task}") # Debug: Log retrieval
return task # Return task or None
def update(self, task: Task) -> bool: # Update a task
"""Update a task's description and status."""
success = self.dao.update_task(task) # Call DAO to update task
print(f"Repository: Updated task {task.task_id}: {success}") # Debug: Log update
return success # Return True if update succeeded
def delete(self, task_id: str) -> bool: # Delete a task
"""Delete a task by its task_id."""
success = self.dao.delete_task(task_id) # Call DAO to delete task
print(f"Repository: Deleted task {task_id}: {success}") # Debug: Log deletion
return success # Return True if deletion succeeded
def find_all(self) -> List[Task]: # Get all tasks
"""Get all tasks from the repository."""
tasks = self.dao.get_all_tasks() # Call DAO to retrieve all tasks
print(f"Repository: Retrieved {len(tasks)} tasks") # Debug: Log number of tasks
return tasks # Return list of tasks
def close(self) -> None: # Close repository
"""Close the underlying DAO connection."""
self.dao.close() # Call DAO to close connection
print("TaskRepository closed") # Debug: Confirm closure
# File: de-onboarding/task_manager.py
from typing import List, Dict # Import typing for type annotations
import yaml # Import PyYAML for config parsing
import pandas as pd # Import pandas for sales data processing
from task_repository import TaskRepository, Task # Import repository and Task model
from create_tasks_table_postgres import create_tasks_table # Import table creation script
def read_config(config_path: str) -> dict: # Read YAML config
"""Read database configuration from YAML file."""
print(f"Loading config: {config_path}") # Debug: Log config path
with open(config_path, "r") as file: # Open YAML file
config = yaml.safe_load(file) # Parse YAML content
print(f"Config loaded: {config['db_config']}") # Debug: Log loaded config
return config["db_config"] # Return database config
def summarize_task_statuses(repo: TaskRepository, csv_path: str) -> Dict[str, Dict[str, int]]:
"""Summarize task statuses by product from sales.csv."""
df = pd.read_csv(csv_path) # Load sales CSV into DataFrame
print(f"Sales data head:\n{df.head()}") # Debug: Inspect DataFrame for issues
products = df["product"].dropna().unique() # Get unique non-null products, dropping nulls (e.g., ',29.99,3')
result = {product: {"Completed": 0, "Pending": 0, "In Progress": 0} for product in products} # Initialize result
tasks = repo.find_all() # Retrieve all tasks
for task in tasks: # Iterate through tasks
for product in products: # Check each product
if product.lower() in task.description.lower(): # Match product in description
result[product][task.status] += 1 # Increment status count
print(f"Task status summary: {result}") # Debug: Log summary
return result # Return status counts
def main() -> None: # Main function
"""Manage tasks and summarize statuses as part of fintech analytics pipeline."""
config_path = "data/config.yaml" # Path to config file
sales_path = "data/sales.csv" # Path to sales CSV
print(f"Starting task manager with config: {config_path}, sales: {sales_path}") # Debug: Log paths
db_config = read_config(config_path) # Load database config
# Initialize tasks table
create_tasks_table(db_config) # Create tasks table if it doesn't exist
# Initialize repository
repo = TaskRepository(db_config) # Create repository instance
# Create a new task
new_task = Task(
task_id="T004",
description="Halal Monitor sales analysis",
status="Pending"
) # Define new task for sales analysis
repo.add(new_task) # Add task to repository
# Retrieve a task
task = repo.find_by_id("T004") # Find task by ID
if task: # If task exists
print(f"Retrieved task: {task.dict()}") # Debug: Log retrieved task
# Update a task
updated_task = Task(
task_id="T004",
description="Halal Monitor sales data analysis",
status="In Progress"
) # Define updated task
success = repo.update(updated_task) # Update task in repository
print(f"Update successful: {success}") # Debug: Log update status
# Delete a task
success = repo.delete("T004") # Delete task by ID
print(f"Delete successful: {success}") # Debug: Log deletion status
# Get all tasks
tasks = repo.find_all() # Retrieve all tasks
print(f"All tasks: {[task.dict() for task in tasks]}") # Debug: Log all tasks
# Summarize task statuses by product
status_summary = summarize_task_statuses(repo, sales_path) # Generate task status summary
print(f"Task status summary by product: {status_summary}") # Debug: Log summary
# Close repository
repo.close() # Close repository connection
if __name__ == "__main__":
main() # Run main function
# File: de-onboarding/tests/test_task_repository.py
import pytest # Import pytest for testing
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
import pandas as pd # Import pandas for sales data simulation
import time # Import time for runtime benchmarks
import pydantic # Import pydantic so ValidationError can be referenced in assertions
from task_repository import TaskRepository, Task # Import repository and Task model
from task_manager import summarize_task_statuses # Import analytics function
@pytest.fixture
def test_db(): # Setup test database
"""Create a temporary test database for testing."""
db_config = {
"dbname": "test_tasks",
"user": "postgres",
"password": "postgres",
"host": "localhost",
"port": "5432"
} # Define test database config
# Create test database
conn = psycopg2.connect(
dbname="postgres", user="postgres", password="postgres", host="localhost", port="5432"
) # Connect to default postgres database
conn.autocommit = True # Enable autocommit for database creation
cursor = conn.cursor() # Create cursor
cursor.execute("DROP DATABASE IF EXISTS test_tasks") # Drop test database if exists
cursor.execute("CREATE DATABASE test_tasks") # Create test database
cursor.close() # Close cursor
conn.close() # Close connection
# Create tasks table and insert initial data
conn = psycopg2.connect(**db_config) # Connect to test database
cursor = conn.cursor() # Create cursor
cursor.execute("""
CREATE TABLE tasks (
task_id TEXT PRIMARY KEY,
description TEXT,
status TEXT
)
""") # Create tasks table
cursor.executemany(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s)",
[
("T001", "Halal Laptop sales processing", "Completed"), # Initial task 1
("T002", "Halal Mouse inventory validation", "Pending"), # Initial task 2
("T003", "Halal Keyboard pricing update", "In Progress"), # Initial task 3
]
) # Insert initial data
conn.commit() # Commit transaction
cursor.close() # Close cursor
conn.close() # Close connection
yield db_config # Provide db_config to tests
# Teardown: Drop test database
conn = psycopg2.connect(
dbname="postgres", user="postgres", password="postgres", host="localhost", port="5432"
) # Reconnect to default database
conn.autocommit = True # Enable autocommit
cursor = conn.cursor() # Create cursor
cursor.execute("DROP DATABASE test_tasks") # Drop test database
cursor.close() # Close cursor
conn.close() # Close connection
def test_crud_operations(test_db): # Test CRUD operations
"""Test TaskRepository CRUD operations."""
repo = TaskRepository(test_db) # Create repository instance
# Test create
task = Task(task_id="T004", description="Halal Test task", status="Pending") # Define new task
repo.add(task) # Add task to repository
# Test read
retrieved = repo.find_by_id("T004") # Retrieve task by ID
assert retrieved is not None # Ensure task exists
assert retrieved.task_id == "T004" # Verify task ID
assert retrieved.description == "Halal Test task" # Verify description
assert retrieved.status == "Pending" # Verify status
# Test update
updated_task = Task(task_id="T004", description="Halal Updated task", status="Completed") # Define updated task
success = repo.update(updated_task) # Update task
assert success # Ensure update succeeded
retrieved = repo.find_by_id("T004") # Retrieve updated task
assert retrieved.description == "Halal Updated task" # Verify updated description
assert retrieved.status == "Completed" # Verify updated status
# Test delete
success = repo.delete("T004") # Delete task
assert success # Ensure deletion succeeded
retrieved = repo.find_by_id("T004") # Try to retrieve deleted task
assert retrieved is None # Ensure task is gone
# Test find_all
tasks = repo.find_all() # Retrieve all tasks
assert len(tasks) == 3 # Verify only initial data remains
assert any(task.task_id == "T002" for task in tasks) # Verify T002 exists
repo.close() # Close repository
def test_missing_task(test_db): # Test missing task
"""Test handling of missing task."""
repo = TaskRepository(test_db) # Create repository instance
task = repo.find_by_id("T999") # Try to find non-existent task
assert task is None # Ensure task is not found
success = repo.delete("T999") # Try to delete non-existent task
assert not success # Ensure deletion fails
repo.close() # Close repository
def test_invalid_description(test_db): # Test Sharia-compliant validator
"""Test that non-Halal descriptions raise ValidationError."""
repo = TaskRepository(test_db) # Create repository instance
with pytest.raises(pydantic.ValidationError): # Expect validation error
repo.add(Task(task_id="T005", description="Invalid task", status="Pending")) # Try invalid description
repo.close() # Close repository
def test_large_dataset(test_db): # Test large dataset performance
"""Test retrieving large number of tasks with runtime benchmark."""
repo = TaskRepository(test_db) # Create repository instance
# Insert 1000 tasks
conn = psycopg2.connect(**test_db) # Connect to test database
cursor = conn.cursor() # Create cursor
for i in range(4, 1004): # Insert tasks T004 to T1003
cursor.execute(
"INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s)",
(f"T{i:03d}", f"Halal Task {i}", "Pending")
) # Insert task
conn.commit() # Commit transaction
cursor.close() # Close cursor
conn.close() # Close connection
start_time = time.time() # Start runtime measurement
tasks = repo.find_all() # Retrieve all tasks
runtime = time.time() - start_time # Calculate runtime
assert len(tasks) == 1003 # Verify 1000 new tasks + 3 initial
assert any(task.task_id == "T500" for task in tasks) # Verify a sample task
assert runtime < 1.0 # Ensure runtime is reasonable (<1 second)
print(f"Runtime for 1003 tasks: {runtime:.3f} seconds") # Debug: Log runtime
repo.close() # Close repository
def test_task_sales_summary(test_db, tmp_path): # Test task-sales analytics
"""Test summarizing task statuses by product, including null product handling."""
# Create temporary sales.csv with null product
sales_path = tmp_path / "sales.csv" # Define temporary file path
df = pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse", "Halal Keyboard", ""],
"price": [999.99, 24.99, 49.99, 29.99],
"quantity": [2, 10, 5, 3]
}) # Create sample sales data with null product
df.to_csv(sales_path, index=False) # Save to temporary file
repo = TaskRepository(test_db) # Create repository instance
summary = summarize_task_statuses(repo, sales_path) # Generate summary
assert summary["Halal Laptop"]["Completed"] == 1 # Verify completed task for Laptop
assert summary["Halal Mouse"]["Pending"] == 1 # Verify pending task for Mouse
assert summary["Halal Keyboard"]["In Progress"] == 1 # Verify in-progress task for Keyboard
assert "" not in summary # Verify null product is excluded
repo.close() # Close repository
Expected Outputs
Console Output (abridged):
Starting task manager with config: data/config.yaml, sales: data/sales.csv
Loading config: data/config.yaml
Config loaded: {'dbname': 'postgres', 'user': 'postgres', 'password': 'postgres', 'host': 'localhost', 'port': '5432'}
Initialized tasks table in PostgreSQL
Connecting with config: {'dbname': 'postgres', 'user': 'postgres', 'password': 'postgres', 'host': 'localhost', 'port': '5432'}
TaskDAO initialized
TaskRepository initialized
Executing query:
INSERT INTO tasks (task_id, description, status)
VALUES (%s, %s, %s)
Created task: T004
Repository: Added task T004
Executing query: SELECT task_id, description, status FROM tasks WHERE task_id = %s
Repository: Found task T004: task_id='T004' description='Halal Monitor sales analysis' status='Pending'
Retrieved task: {'task_id': 'T004', 'description': 'Halal Monitor sales analysis', 'status': 'Pending'}
Executing query:
UPDATE tasks SET description = %s, status = %s
WHERE task_id = %s
Updated task: T004, rows affected: 1
Repository: Updated task T004: True
Update successful: True
Executing query: DELETE FROM tasks WHERE task_id = %s
Deleted task: T004, rows affected: 1
Repository: Deleted task T004: True
Delete successful: True
Executing query: SELECT task_id, description, status FROM tasks
Retrieved 3 tasks
Repository: Retrieved 3 tasks
All tasks: [{'task_id': 'T001', 'description': 'Halal Laptop sales processing', 'status': 'Completed'}, ...]
Sales data head:
product price quantity
0 Halal Laptop 999.99 2
1 Halal Mouse 24.99 10
2 Halal Keyboard 49.99 5
3 29.99 3
4 Monitor NaN 2
Task status summary: {'Halal Laptop': {'Completed': 1, 'Pending': 0, 'In Progress': 0}, ...}
Task status summary by product: {'Halal Laptop': {'Completed': 1, 'Pending': 0, 'In Progress': 0}, ...}
TaskDAO connection closed
TaskRepository closed
Test Output:
pytest tests/test_task_repository.py -v
# Expected: All tests pass
How to Run and Test
Setup:
Setup Checklist:
- Create de-onboarding/data/ and populate with config.yaml and sales.csv per Appendix 1.
- Install libraries: pip install psycopg2-binary pydantic pyyaml pytest pyright pandas.
- Create virtual environment: python -m venv venv, activate (Windows: venv\Scripts\activate, Unix: source venv/bin/activate).
- Verify Python 3.10+: python --version.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Save create_tasks_table_postgres.py, task_dao.py, task_repository.py, task_manager.py, and tests/test_task_repository.py.
- Run PostgreSQL: docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres.
- Initialize tasks table: python create_tasks_table_postgres.py.
- Verify type safety: pyright task_dao.py task_repository.py task_manager.py.
- Ensure data/sales.csv exists for task-sales analytics.
- Review the Introduction’s explanation of the SQLite-to-PostgreSQL transition.
Troubleshooting:
- psycopg2.OperationalError: Print db_config with print(db_config) and verify PostgreSQL connectivity with psql -h localhost -U postgres.
- FileNotFoundError: Ensure config.yaml and sales.csv exist. Print paths with print(config_path) and print(sales_path).
- IndentationError: Use 4 spaces (not tabs). Run python -tt task_manager.py.
- yaml.YAMLError: Print open(config_path).read() to check YAML syntax.
- pydantic.ValidationError: Print task data with print(task.dict()) to debug.
- psycopg2.ProgrammingError: Ensure the tasks table exists. Run create_tasks_table_postgres.py and print query to debug SQL errors.
- pandas.errors.ParserError: Print df.head() to inspect sales.csv for malformed rows or missing columns.
Run:
- Open terminal in de-onboarding/.
- Run: python task_manager.py.
- Outputs: Console logs of CRUD operations and task-sales status summary.
Test:
- Run: pytest tests/test_task_repository.py -v.
- Verify all tests pass.
- Test Scenarios:
  - Valid CRUD: test_crud_operations verifies create, read, update, delete.
  - Missing Task: test_missing_task ensures find_by_id and delete handle non-existent tasks.
  - Sharia Compliance: test_invalid_description verifies non-Halal descriptions raise ValidationError.
  - Large Dataset: test_large_dataset verifies find_all with 1000 tasks and a runtime benchmark.
  - Task-Sales Analytics: test_task_sales_summary verifies status summary by product, including null product handling.
46.4 Practice Exercises
Exercise 1: Extend TaskDAO with Status Filter
Add a get_tasks_by_status method to TaskDAO to filter tasks by status, using a PostgreSQL database, with 4-space indentation per PEP 8.
Expected Output:
tasks = dao.get_tasks_by_status("Pending")
# Returns: [Task(task_id="T002", description="Halal Mouse inventory validation", status="Pending")]
Follow-Along Instructions:
- Save as de-onboarding/ex1_task_dao.py.
- Ensure PostgreSQL is running (docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres).
- Configure editor for 4-space indentation per PEP 8.
- Run tests with pytest.
- How to Test:
  - Add test to test_task_repository.py:
    def test_get_tasks_by_status(test_db):
        repo = TaskRepository(test_db)
        tasks = repo.dao.get_tasks_by_status("Pending")
        assert len(tasks) == 1
        assert tasks[0].task_id == "T002"
        assert tasks[0].status == "Pending"
        repo.close()
Exercise 2: Repository Bulk Add
Add a bulk_add method to TaskRepository to add multiple tasks, using a PostgreSQL database, with 4-space indentation per PEP 8.
Expected Output:
repo.bulk_add([Task(task_id="T005", ...), Task(task_id="T006", ...)])
# Adds both tasks
Follow-Along Instructions:
- Save as de-onboarding/ex2_repository.py.
- Ensure PostgreSQL is running.
- Configure editor for 4-space indentation per PEP 8.
- Run tests with pytest.
- How to Test:
  - Add test:
    def test_bulk_add(test_db):
        repo = TaskRepository(test_db)
        tasks = [
            Task(task_id="T005", description="Halal Task 5", status="Pending"),
            Task(task_id="T006", description="Halal Task 6", status="Completed")
        ]
        repo.bulk_add(tasks)
        retrieved = repo.find_all()
        assert len(retrieved) == 5  # 3 initial + 2 new
        assert any(task.task_id == "T005" for task in retrieved)
        repo.close()
Exercise 3: Debug DAO Connection Bug
Fix buggy TaskDAO code with incorrect connection handling, ensuring 4-space indentation per PEP 8.
Buggy Code:
class TaskDAO:
def __init__(self, db_config: dict):
self.conn = psycopg2.connect(db_config) # Bug: Incorrect argument
self.cursor = self.conn.cursor()
Follow-Along Instructions:
- Save as de-onboarding/ex3_debug_dao.py.
- Ensure PostgreSQL is running.
- Configure editor for 4-space indentation per PEP 8.
- Run tests to verify fix.
- How to Test:
  - Test with valid db_config and verify connection.
Exercise 4: Pydantic Validation
Add validation to Task model to ensure status is one of [“Pending”, “In Progress”, “Completed”], with 4-space indentation per PEP 8.
Expected Output:
Task(task_id="T001", description="Halal Test", status="Invalid") # Raises ValidationError
Follow-Along Instructions:
- Save as de-onboarding/ex4_pydantic.py.
- Configure editor for 4-space indentation per PEP 8.
- Run tests with pytest.
- How to Test:
  - Add test:
    def test_invalid_status(test_db):
        with pytest.raises(pydantic.ValidationError):
            Task(task_id="T001", description="Halal Test", status="Invalid")
Exercise 5: Conceptual Analysis
Explain the trade-offs between DAO and Repository patterns, saving to ex5_concepts.txt, with 4-space indentation per PEP 8 in code examples.
Expected Output (ex5_concepts.txt):
DAO provides low-level database access, tightly coupled to SQL, while Repository abstracts data as a collection, decoupling business logic. DAO is simpler but less flexible; Repository is more testable but adds complexity.
Follow-Along Instructions:
- Save as de-onboarding/ex5_concepts.py.
- Configure editor for 4-space indentation per PEP 8.
- Write explanation and save to ex5_concepts.txt.
Exercise 6: Task and Sales Data Integration
Write a function to count completed tasks per product from data/sales.csv, using the TaskRepository with a PostgreSQL database, with 4-space indentation per PEP 8.
Sample Input (data/sales.csv):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
Expected Output:
# {'Halal Laptop': 1, 'Halal Mouse': 0, 'Halal Keyboard': 0}
Follow-Along Instructions:
- Save as de-onboarding/ex6_task_sales.py.
- Ensure PostgreSQL is running and data/sales.csv exists.
- Configure editor for 4-space indentation per PEP 8.
- Run tests with pytest.
- How to Test:
  - Add test:
    def test_task_sales_integration(test_db):
        repo = TaskRepository(test_db)
        result = count_completed_tasks_per_product("data/sales.csv", repo)
        assert result["Halal Laptop"] == 1
        assert result["Halal Mouse"] == 0
        repo.close()
46.5 Exercise Solutions
Solution to Exercise 1: Extend TaskDAO with Status Filter
# File: de-onboarding/ex1_task_dao.py
from typing import List, Optional # Import typing for type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from pydantic import BaseModel, validator # Import Pydantic for validation
class Task(BaseModel): # Define Pydantic model for task validation
"""Pydantic model to validate task data."""
task_id: str # Task ID as string
description: str # Task description as string
status: str # Task status as string
@validator("description") # Validate description field
def description_must_be_halal(cls, v: str) -> str:
"""Ensure description starts with 'Halal' for Sharia compliance."""
if not v.startswith("Halal"): # Check for 'Halal' prefix
raise ValueError("Description must start with 'Halal'") # Raise error if invalid
return v # Return valid description
class TaskDAO: # Define DAO class for database operations
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize DAO with PostgreSQL configuration."""
print(f"Connecting with config: {db_config}") # Debug: Log config to diagnose issues
self.conn = psycopg2.connect(**db_config) # Connect to PostgreSQL
self.cursor = self.conn.cursor() # Create cursor for queries
def get_tasks_by_status(self, status: str) -> List[Task]: # Filter tasks by status
"""Get tasks with the specified status from the tasks table."""
query = "SELECT task_id, description, status FROM tasks WHERE status = %s" # SQL query to filter by status
print(f"Executing query: {query}") # Debug: Log query for troubleshooting
self.cursor.execute(query, (status,)) # Execute query with status
results = self.cursor.fetchall() # Fetch all matching tasks
tasks = [Task(task_id=row[0], description=row[1], status=row[2]) for row in results] # Convert to Task objects
print(f"Retrieved {len(tasks)} tasks with status {status}") # Debug: Log number of tasks
return tasks # Return list of tasks
def close(self) -> None: # Close database connection
"""Close cursor and connection to PostgreSQL."""
self.cursor.close() # Close cursor
self.conn.close() # Close connection
Solution to Exercise 2: Repository Bulk Add
# File: de-onboarding/ex2_repository.py
from typing import List # Import typing for type annotations
from task_dao import TaskDAO, Task # Import TaskDAO and Task model
class TaskRepository: # Define Repository class for abstract data access
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize repository with DAO."""
self.dao = TaskDAO(db_config) # Create DAO instance
def bulk_add(self, tasks: List[Task]) -> None: # Add multiple tasks
"""Add multiple tasks to the repository."""
for task in tasks: # Iterate through tasks
self.dao.create_task(task) # Call DAO to insert each task
print(f"Repository: Added {len(tasks)} tasks") # Debug: Log number of tasks added
def close(self) -> None: # Close repository
"""Close the underlying DAO connection."""
self.dao.close() # Call DAO to close connection
Solution to Exercise 3: Debug DAO Connection Bug
# File: de-onboarding/ex3_debug_dao.py
from typing import List, Optional # Import typing for type annotations
import psycopg2 # Import psycopg2 for PostgreSQL connectivity
from pydantic import BaseModel # Import Pydantic for validation
class Task(BaseModel): # Define Pydantic model for task validation
"""Pydantic model to validate task data."""
task_id: str # Task ID as string
description: str # Task description as string
status: str # Task status as string
class TaskDAO: # Define DAO class for database operations
def __init__(self, db_config: dict): # Initialize with database config
"""Initialize DAO with PostgreSQL configuration."""
print(f"Connecting with config: {db_config}") # Debug: Log config to diagnose issues
self.conn = psycopg2.connect(**db_config) # Fix: Unpack dict to connect
self.cursor = self.conn.cursor() # Create cursor for queries
print("TaskDAO initialized") # Debug: Confirm initialization
def close(self) -> None: # Close database connection
"""Close cursor and connection to PostgreSQL."""
self.cursor.close() # Close cursor
self.conn.close() # Close connection
Explanation:
- Bug: psycopg2.connect(db_config) passed the dict directly, causing a TypeError. Fixed by unpacking with **db_config to pass the parameters correctly.
Solution to Exercise 4: Pydantic Validation
# File: de-onboarding/ex4_pydantic.py
from pydantic import BaseModel, validator # Import Pydantic for validation
class Task(BaseModel): # Define Pydantic model for task validation
"""Pydantic model to validate task data with type safety."""
task_id: str # Task ID as string
description: str # Task description as string
status: str # Task status as string
@validator("description") # Validate description field
def description_must_be_halal(cls, v: str) -> str:
"""Ensure description starts with 'Halal' for Sharia compliance."""
if not v.startswith("Halal"): # Check for 'Halal' prefix
raise ValueError("Description must start with 'Halal'") # Raise error if invalid
return v # Return valid description
@validator("status") # Validate status field
def status_must_be_valid(cls, v: str) -> str:
"""Ensure status is one of allowed values."""
valid_statuses = ["Pending", "In Progress", "Completed"] # Define valid statuses
if v not in valid_statuses: # Check if status is valid
raise ValueError(f"Status must be one of {valid_statuses}") # Raise error if invalid
return v # Return valid status
Solution to Exercise 5: Conceptual Analysis
# File: de-onboarding/ex5_concepts.py
def write_concepts(): # Write conceptual analysis to file
"""Write trade-offs between DAO and Repository patterns to ex5_concepts.txt."""
explanation = """
DAO provides low-level database access, tightly coupled to SQL, while Repository abstracts data as a collection, decoupling business logic. DAO is simpler but less flexible; Repository is more testable but adds complexity.
Example DAO:
def create_task(self, task: Task) -> None:
query = "INSERT INTO tasks (task_id, description, status) VALUES (%s, %s, %s)"
self.cursor.execute(query, (task.task_id, task.description, task.status))
Example Repository:
def add(self, task: Task) -> None:
self.dao.create_task(task)
""" # Define explanation with code examples
with open("ex5_concepts.txt", "w") as f: # Open file for writing
f.write(explanation) # Write explanation to file
write_concepts() # Run function
Solution to Exercise 6: Task and Sales Data Integration
# File: de-onboarding/ex6_task_sales.py
import pandas as pd # Import pandas for CSV processing
from task_repository import TaskRepository # Import TaskRepository for task access
def count_completed_tasks_per_product(csv_path: str, repo: TaskRepository) -> dict:
"""Count completed tasks per product from sales.csv."""
df = pd.read_csv(csv_path) # Load sales CSV into DataFrame
print(f"Sales data head:\n{df.head()}") # Debug: Inspect DataFrame for issues
products = df["product"].dropna().unique() # Get unique non-null products
result = {product: 0 for product in products} # Initialize result dictionary
tasks = repo.find_all() # Retrieve all tasks from repository
for task in tasks: # Iterate through tasks
if task.status == "Completed": # Check if task is completed
for product in products: # Check each product
if product.lower() in task.description.lower(): # Match product in description
result[product] += 1 # Increment count for product
print(f"Completed tasks per product: {result}") # Debug: Log result
return result # Return dictionary of counts
# Test code for standalone execution
if __name__ == "__main__":
db_config = {
"dbname": "postgres",
"user": "postgres",
"password": "postgres",
"host": "localhost",
"port": "5432"
} # Define test database config
repo = TaskRepository(db_config) # Create repository instance
result = count_completed_tasks_per_product("data/sales.csv", repo) # Run function
print(result) # Print result
repo.close() # Close repository
46.6 Chapter Summary and Connection to Chapter 47
In this chapter, you’ve mastered:
- DAO Pattern: Encapsulated database operations with type-safe TaskDAO (O(log n) indexed lookups on task_id for single operations, O(n) for retrieving all tasks, benchmarked with large datasets).
- Repository Pattern: Abstracted data access with TaskRepository, improving testability and modularity.
- Type-Safe Integration: Used psycopg2 and Pydantic for reliable, Sharia-compliant database interactions, validated with comprehensive tests.
- Testing: Validated operations, analytics, and large datasets with pytest, ensuring robustness for production scenarios.
- Task-Sales Analytics: Integrated tasks with sales.csv for financial analytics, handling edge cases like null products, aligning with Hijra Group’s fintech pipeline.
- White-Space Sensitivity and PEP 8: Ensured 4-space indentation, with detailed comments explaining each operation, as in Chapter 3.
The micro-project implemented a modular data access layer for a Sharia-compliant task management system, a key component of Hijra Group’s analytics pipeline, using a PostgreSQL database and config.yaml. It included task status summaries by product, tested with pytest for reliability, clarified the transition from data/tasks.db (SQLite) to PostgreSQL, and prepared for capstone projects. This sets the stage for integrating with web frameworks in Chapters 52–53, where TaskRepository can be used in Django/FastAPI endpoints to manage financial tasks.
Connection to Chapter 47
Chapter 47 introduces Jupyter Notebooks for Data Development, building on this chapter:
- Data Access: Extends TaskRepository to load task data into Jupyter for interactive analysis, using sales.csv for sales data exploration (previewed in the sketch below).
- Modularity: Reuses task_dao.py and task_repository.py in notebooks for task analytics, maintaining modularity.
- Fintech Context: Prepares for interactive analytics of sales and task data, aligning with Hijra Group’s operational reporting, maintaining PEP 8’s 4-space indentation for consistent code.
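A minimal preview of that hand-off, written as a hypothetical notebook cell that assumes the modules from this chapter are importable and PostgreSQL is running with the default local configuration:
# Hypothetical Jupyter cell: load tasks into a DataFrame for interactive analysis (previewing Chapter 47)
import pandas as pd  # DataFrame tooling for notebook analysis
from task_repository import TaskRepository  # Reuse the data access layer from this chapter

db_config = {"dbname": "postgres", "user": "postgres", "password": "postgres", "host": "localhost", "port": "5432"}  # Local defaults
repo = TaskRepository(db_config)  # Open the repository
tasks_df = pd.DataFrame([task.dict() for task in repo.find_all()])  # One row per task
print(tasks_df.groupby("status").size())  # Interactive count of tasks per status
repo.close()  # Release the connection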