26 - Python and BigQuery Integration
Complexity: Moderate (M)
26.0 Introduction: Why This Matters for Data Engineering
In data engineering, integrating Python with cloud-based data warehouses like Google BigQuery is essential for automating large-scale analytics, particularly for Hijra Group’s Sharia-compliant fintech operations. BigQuery, a serverless data warehouse, delivers interactive query performance on petabyte-scale data by leveraging Google’s Dremel engine for distributed query execution. Python integration via the google-cloud-bigquery library enables programmatic data loading, querying, and transformation, which is critical for building scalable data pipelines. Building on Chapter 25 (BigQuery Fundamentals), this chapter focuses on type-annotated Python code to automate sales data processing, ensuring robust, testable pipelines with Pyright verification and pytest tests, aligned with Hijra Group’s need for secure, compliant analytics.
This chapter uses data/sales.csv and config.yaml (Appendix 1) for data loading and validation, avoiding advanced concepts like concurrency (Chapter 40) or dbt transformations (Chapter 54). All code includes type annotations (introduced in Chapter 7), is verified by Pyright, and is tested with pytest (introduced in Chapter 9). It adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to avoid IndentationError, ensuring compatibility with Hijra Group’s pipeline standards.
Data Engineering Workflow Context
This diagram illustrates Python and BigQuery integration in a data pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Python Script with google-cloud-bigquery"]
B --> C{"BigQuery Operations"}
C -->|Load Data| D["BigQuery Table"]
C -->|Execute Query| E["Query Results"]
D --> F["Processed Data"]
E --> F
F --> G["Storage/Reporting"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,E,F data
class B,C process
class G storage
Building On and Preparing For
- Building On:
  - Chapter 3: Builds on Pandas for data loading (`pd.read_csv`), used for preprocessing before BigQuery upload.
  - Chapter 7: Applies type annotations for type-safe code, verified by Pyright.
  - Chapter 9: Incorporates pytest for testing pipeline components.
  - Chapter 25: Extends BigQuery table creation and querying with Python automation.
- Preparing For:
  - Chapter 27: Enables advanced BigQuery querying (e.g., CTEs, window functions).
  - Chapter 28: Supports data warehouse design with star schemas.
  - Chapter 29: Prepares for BigQuery optimization techniques.
  - Chapter 31: Lays groundwork for data lakes with Google Cloud Storage.
What You’ll Learn
This chapter covers:
- BigQuery Client Setup: Initializing `google-cloud-bigquery` with type annotations.
- Data Loading: Uploading Pandas DataFrames to BigQuery tables.
- Query Execution: Running parameterized queries with type-safe results.
- Validation and Testing: Ensuring data integrity with Pydantic and pytest.
- Micro-Project: A type-annotated pipeline to load and query sales data, exporting results to JSON.
By the end, you’ll build a type-safe pipeline that loads data/sales.csv into BigQuery, validates data using config.yaml, queries sales metrics, and exports results, all with 4-space indentation per PEP 8. The pipeline is tested with pytest, including edge cases (empty.csv, invalid.csv, malformed.csv, negative.csv), ensuring robustness for Hijra Group’s analytics.
Follow-Along Tips:
- Create `de-onboarding/data/` and populate with `sales.csv`, `config.yaml`, `empty.csv`, `invalid.csv`, `malformed.csv`, and `negative.csv` per Appendix 1.
- Install libraries: `pip install google-cloud-bigquery pandas pyyaml pydantic pytest`.
- Set up Google Cloud credentials (see micro-project setup).
- Use print statements (e.g., `print(df.head())`) to debug DataFrames.
- Verify file paths with `ls data/` (Unix/macOS) or `dir data\` (Windows).
- Use UTF-8 encoding to avoid `UnicodeDecodeError`.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- Run `python -tt script.py` to detect tab/space mixing.
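These tips can also be checked programmatically. Below is a minimal sketch (a hypothetical helper, assuming the `de-onboarding/data/` layout from Appendix 1) that confirms the required files exist before you run any pipeline code:

```python
# File: de-onboarding/check_setup.py (hypothetical helper, not part of the micro-project)
from pathlib import Path
from typing import List

REQUIRED_FILES: List[str] = [
    "sales.csv", "config.yaml", "empty.csv",
    "invalid.csv", "malformed.csv", "negative.csv",
]

def check_data_files(data_dir: str = "data") -> bool:
    """Return True if every required data file exists in data_dir."""
    missing = [name for name in REQUIRED_FILES if not (Path(data_dir) / name).exists()]
    for name in missing:
        print(f"Missing: {data_dir}/{name}")  # Debug: list missing files
    return not missing

if __name__ == "__main__":
    print("All data files present" if check_data_files() else "Populate data/ per Appendix 1")
```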
26.1 BigQuery Client Setup
The google-cloud-bigquery library provides a type-safe client for BigQuery operations. The client requires Google Cloud credentials (a JSON key file) and a project ID, ensuring secure access to BigQuery resources.
26.1.1 Initializing the BigQuery Client
Create a type-annotated client to interact with BigQuery.
# File: de-onboarding/bigquery_setup.py
from google.cloud import bigquery # Import BigQuery client
from typing import Optional # For type annotations
def init_bigquery_client(project_id: str, credentials_path: Optional[str] = None) -> bigquery.Client:
"""Initialize BigQuery client with credentials."""
print(f"Initializing BigQuery client for project: {project_id}") # Debug
if credentials_path:
print(f"Using credentials: {credentials_path}") # Debug
client = bigquery.Client.from_service_account_json(credentials_path, project=project_id)
else:
print("Using default credentials") # Debug
client = bigquery.Client(project=project_id)
print("Client initialized successfully") # Confirm
return client
# Example usage
if __name__ == "__main__":
project_id = "your-project-id" # Replace with your Google Cloud project ID
credentials_path = "path/to/credentials.json" # Replace with your credentials path
client = init_bigquery_client(project_id, credentials_path)
print(f"Client project: {client.project}") # Debug
# Expected Output:
# Initializing BigQuery client for project: your-project-id
# Using credentials: path/to/credentials.json
# Client initialized successfully
# Client project: your-project-id
Follow-Along Instructions:
- Install library: `pip install google-cloud-bigquery`.
- Save as `de-onboarding/bigquery_setup.py`.
- Configure editor for 4-space indentation per PEP 8.
- Replace `project_id` and `credentials_path` with your Google Cloud details (see micro-project setup).
- Run: `python bigquery_setup.py`.
- Verify output confirms client initialization.
- Common Errors:
  - `GoogleAuthError`: Ensure `credentials_path` points to a valid JSON key file. Print `os.path.exists(credentials_path)`.
  - `ModuleNotFoundError`: Install `google-cloud-bigquery`.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt bigquery_setup.py`.
Key Points:
- Type Annotations: `bigquery.Client` ensures type-safe operations, verified by Pyright.
- Credentials: A JSON service account key from the Google Cloud Console authenticates the client.
- Underlying Implementation: The client uses OAuth 2.0 for authentication, connecting to BigQuery’s REST API.
- Performance Considerations:
- Time Complexity: O(1) for client initialization.
- Space Complexity: O(1) for client object.
- Implication: Secure client setup enables programmatic BigQuery access for Hijra Group’s pipelines.
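A quick way to confirm the client is authenticated and pointed at the correct project is to list its datasets. A minimal sketch, assuming credentials are configured (either a key file or `GOOGLE_APPLICATION_CREDENTIALS`):

```python
# Minimal connectivity check; assumes credentials are already configured.
from google.cloud import bigquery
from typing import List

def list_dataset_ids(project_id: str) -> List[str]:
    """Return the dataset IDs visible to the authenticated client."""
    client = bigquery.Client(project=project_id)  # Uses default credentials
    return [dataset.dataset_id for dataset in client.list_datasets()]

if __name__ == "__main__":
    print(list_dataset_ids("your-project-id"))  # e.g., ['sales_dataset']
```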
26.2 Data Loading to BigQuery
Load Pandas DataFrames into BigQuery tables using type-annotated code. BigQuery supports schema auto-detection, but explicit schemas ensure type safety.
26.2.1 Uploading a DataFrame
Upload a cleaned DataFrame to a BigQuery table.
# File: de-onboarding/bigquery_load.py
from google.cloud import bigquery # Import BigQuery client
import pandas as pd # Import Pandas
from typing import List # For type annotations
def load_dataframe_to_bigquery(
client: bigquery.Client,
df: pd.DataFrame,
dataset_id: str,
table_id: str
) -> None:
"""Load DataFrame to BigQuery table."""
print(f"Loading DataFrame to {dataset_id}.{table_id}") # Debug
table_ref = f"{client.project}.{dataset_id}.{table_id}" # Full table path
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_TRUNCATE", # Overwrite table
schema=[
bigquery.SchemaField("product", "STRING"),
bigquery.SchemaField("price", "FLOAT"),
bigquery.SchemaField("quantity", "INTEGER")
]
)
print(f"DataFrame rows: {len(df)}") # Debug
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result() # Wait for job completion
print(f"Loaded {len(df)} rows to {table_ref}") # Confirm
# Example usage
if __name__ == "__main__":
project_id = "your-project-id"
client = bigquery.Client(project=project_id)
df = pd.read_csv("data/sales.csv")  # Load CSV
df["price"] = pd.to_numeric(df["price"], errors="coerce")  # Coerce non-numeric prices (e.g., "invalid") to NaN
df = df.dropna()  # Drop rows with missing product or non-numeric price
load_dataframe_to_bigquery(client, df, "sales_dataset", "sales_table")
# Expected Output:
# Loading DataFrame to sales_dataset.sales_table
# DataFrame rows: 4
# Loaded 4 rows to your-project-id.sales_dataset.sales_table
Follow-Along Instructions:
- Ensure `data/sales.csv` exists per Appendix 1.
- Save as `de-onboarding/bigquery_load.py`.
- Configure editor for 4-space indentation per PEP 8.
- Replace `project_id` and ensure the dataset exists in BigQuery.
- Run: `python bigquery_load.py`.
- Verify the table in the BigQuery Console.
- Common Errors:
  - `NotFound`: Ensure the dataset exists. Create it via the BigQuery Console or programmatically (see micro-project setup).
  - `ValueError`: Check the DataFrame schema matches `job_config.schema`. Print `df.dtypes`.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt bigquery_load.py`.
Key Points:
- Type Annotations: `pd.DataFrame` and `bigquery.Client` ensure type safety.
- Schema Definition: Explicit schemas prevent type mismatches.
- Underlying Implementation: `load_table_from_dataframe` serializes the DataFrame to Parquet and uploads it via a BigQuery load job (not the streaming insert API).
- Performance Considerations:
- Time Complexity: O(n) for n rows, dominated by network transfer.
- Space Complexity: O(n) for DataFrame in memory.
- Implication: Efficient for loading sales data into BigQuery for analytics.
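If you prefer BigQuery to infer column types instead of declaring them, `LoadJobConfig` supports schema auto-detection. A minimal sketch, assuming `client`, `df`, and `table_ref` are prepared as above; the micro-project keeps explicit schemas because inferred types can drift from what Pyright and Pydantic expect:

```python
from google.cloud import bigquery
import pandas as pd

def load_with_autodetect(client: bigquery.Client, df: pd.DataFrame, table_ref: str) -> None:
    """Load a DataFrame, letting BigQuery infer column types from the data."""
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_TRUNCATE",  # Overwrite the table
        autodetect=True,  # Infer schema instead of listing SchemaField entries
    )
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Wait for the load job to finish
    print(f"Loaded {job.output_rows} rows to {table_ref}")  # Confirm
```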
26.3 Query Execution
Execute parameterized queries to retrieve metrics, ensuring type-safe results with Pydantic.
26.3.1 Running Queries
Run a query to compute total sales by product.
# File: de-onboarding/bigquery_query.py
from google.cloud import bigquery # Import BigQuery client
from pydantic import BaseModel # For result validation
from typing import List # For type annotations
class SalesResult(BaseModel): # Pydantic model for query results
product: str
total_sales: float
def query_sales(
client: bigquery.Client,
dataset_id: str,
table_id: str,
max_quantity: int
) -> List[SalesResult]:
"""Query total sales by product with quantity filter."""
query = """
SELECT product, SUM(price * quantity) AS total_sales
FROM `@project_id.@dataset_id.@table_id`
WHERE quantity <= @max_quantity
GROUP BY product
"""
query = query.replace("@project_id", client.project).replace(
"@dataset_id", dataset_id
).replace("@table_id", table_id)
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("max_quantity", "INT64", max_quantity)]
)
print(f"Running query: {query}") # Debug
query_job = client.query(query, job_config=job_config)
results = [SalesResult(**row) for row in query_job.result()]
print(f"Retrieved {len(results)} results") # Confirm
return results
# Example usage
if __name__ == "__main__":
project_id = "your-project-id"
client = bigquery.Client(project=project_id)
results = query_sales(client, "sales_dataset", "sales_table", 100)
for result in results:
print(f"{result.product}: ${result.total_sales}") # Debug
# Expected Output:
# Running query: SELECT product, SUM(price * quantity) AS total_sales ...
# Retrieved 3 results
# Halal Laptop: $1999.98
# Halal Mouse: $249.9
# Halal Keyboard: $249.95
Follow-Along Instructions:
- Install library: `pip install pydantic`.
- Save as `de-onboarding/bigquery_query.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python bigquery_query.py`.
- Verify output shows sales metrics.
- Common Errors:
  - `NotFound`: Ensure the table exists. Print `client.list_tables(dataset_id)`.
  - `ValidationError`: Check the Pydantic model matches the query schema. Print raw query results.
  - `IndentationError`: Use 4 spaces (not tabs). Run `python -tt bigquery_query.py`.
Key Points:
- Type Annotations: `List[SalesResult]` ensures type-safe results.
- Pydantic: Validates query results, catching schema mismatches.
- Parameterized Queries: Prevent SQL injection, improving security.
- Underlying Implementation: Queries are executed via BigQuery’s Dremel engine, distributing computation across nodes.
- Optimization Note: BigQuery’s Dremel engine distributes queries across nodes using columnar storage, optimizing O(n) scans. Partitioning (Chapter 29) can further reduce scanned data.
- Performance Considerations:
- Time Complexity: O(n) for scanning n rows, optimized by BigQuery’s columnar storage.
- Space Complexity: O(k) for k result rows.
- Implication: Efficient for aggregating sales metrics in Hijra Group’s pipelines.
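Query results can also be materialized directly as a Pandas DataFrame for further analysis. A minimal sketch, assuming the `sales_table` from Section 26.2 exists and that `pyarrow` (or `db-dtypes`) is installed, which `to_dataframe()` relies on:

```python
from google.cloud import bigquery
import pandas as pd

def query_to_dataframe(client: bigquery.Client, dataset_id: str, table_id: str) -> pd.DataFrame:
    """Run an aggregation query and return the results as a DataFrame."""
    query = f"""
    SELECT product, SUM(price * quantity) AS total_sales
    FROM `{client.project}.{dataset_id}.{table_id}`
    GROUP BY product
    """
    df = client.query(query).result().to_dataframe()  # Materialize result rows as a DataFrame
    print(df.head())  # Debug: inspect results
    return df
```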
26.4 Micro-Project: Type-Safe Sales Data Pipeline
Project Requirements
Build a type-annotated pipeline to load data/sales.csv into BigQuery, validate data using config.yaml, query sales metrics, and export results to data/sales_metrics.json. The pipeline supports Hijra Group’s analytics by ensuring Sharia-compliant data processing, with pytest tests for reliability, including edge cases (empty.csv, invalid.csv, malformed.csv, negative.csv).
- Load: Read `sales.csv` with Pandas and `config.yaml` with PyYAML.
- Validate: Use Pydantic and Pandas to enforce config rules (e.g., Halal prefix, max quantity).
- Upload: Load the validated DataFrame to BigQuery.
- Query: Compute total sales by product, filtering by `max_quantity`.
- Export: Save results to JSON.
- Test: Use pytest with mocking to verify pipeline components and edge cases.
- Use: Type annotations, Pyright verification, and 4-space indentation per PEP 8.
Sample Input Files
data/sales.csv (Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150
data/config.yaml (Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2
Data Processing Flow
flowchart TD
A["Input CSV
sales.csv"] --> B["Load CSV
pandas.read_csv"]
B --> C["Pandas DataFrame"]
C --> D["Read YAML
config.yaml"]
D --> E["Validate DataFrame
Pydantic/Pandas"]
E -->|Invalid| F["Log Warning
Skip Record"]
E -->|Valid| G["Load to BigQuery
google-cloud-bigquery"]
G --> H["Query Metrics
BigQuery Client"]
H --> I["Export JSON
sales_metrics.json"]
F --> J["End Processing"]
I --> J
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C data
class B,D,E,G,H,I process
class F error
class J endpoint
Acceptance Criteria
- Go Criteria:
  - Loads `sales.csv` and `config.yaml` correctly.
  - Validates data for Halal prefix, numeric price/quantity, positive prices, and config rules.
  - Uploads validated data to a BigQuery table.
  - Queries total sales by product, filtering by `max_quantity`.
  - Exports results to `data/sales_metrics.json`.
  - Includes type annotations, verified by Pyright.
  - Passes pytest tests for loading, validation, querying, and edge cases (`empty.csv`, `invalid.csv`, `malformed.csv`, `negative.csv`).
  - Uses 4-space indentation per PEP 8, preferring spaces over tabs.
- No-Go Criteria:
  - Fails to load files or upload to BigQuery.
  - Incorrect validation or query results.
  - Missing JSON export.
  - Lacks type annotations or fails Pyright.
  - Inconsistent indentation or tab/space mixing.
Common Pitfalls to Avoid
- Authentication Errors:
  - Problem: Invalid credentials cause `GoogleAuthError`.
  - Solution: Verify the credentials file path. Print `os.path.exists(credentials_path)`.
- Credential Setup Errors:
  - Problem: `GoogleAuthError` due to a missing environment variable or an incorrect JSON key.
  - Solution: Set `export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json` (Unix) or `set GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json` (Windows). Verify the JSON key format in the Google Cloud Console.
- Dataset Not Found:
  - Problem: `NotFound` error due to a missing BigQuery dataset.
  - Solution: Create the dataset via the BigQuery Console or programmatically (see setup instructions). Print `client.list_datasets()`.
- Schema Mismatches:
  - Problem: DataFrame schema doesn’t match the BigQuery table.
  - Solution: Print `df.dtypes` and compare with `job_config.schema`.
- Query Errors:
  - Problem: Invalid SQL syntax or missing table.
  - Solution: Print the query string and verify table existence with `client.list_tables(dataset_id)`.
- Pydantic Validation:
  - Problem: Query results don’t match the Pydantic model.
  - Solution: Print raw query results to check the schema.
- SQL Injection Risk:
  - Problem: Using string concatenation instead of parameterized queries risks SQL injection.
  - Solution: Use `bigquery.QueryJobConfig` with `ScalarQueryParameter` for safe query parameterization (see the sketch after this list).
- Mock Testing Errors:
  - Problem: `ValidationError` in mock tests due to incorrect mock results.
  - Solution: Print mock results (`print(mock_client.query.return_value.result.return_value)`) and ensure they match the `SalesResult` model (`product`, `total_sales`).
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces per PEP 8. Run `python -tt sales_pipeline.py`.
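To make the SQL injection pitfall concrete, the sketch below contrasts unsafe string interpolation of a value with the parameterized form used in this chapter. Table identifiers still come from trusted config, since BigQuery parameters cannot replace identifiers; this is an illustration, not part of the micro-project:

```python
from google.cloud import bigquery

def unsafe_filter(client: bigquery.Client, table_ref: str, max_quantity: str) -> None:
    """Anti-pattern: splicing a user-supplied value into the SQL string."""
    query = f"SELECT product FROM `{table_ref}` WHERE quantity <= {max_quantity}"  # Injectable
    client.query(query)  # e.g., max_quantity = "100 OR TRUE" silently changes the filter

def safe_filter(client: bigquery.Client, table_ref: str, max_quantity: int) -> None:
    """Preferred: bind the value as a typed query parameter."""
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("max_quantity", "INT64", max_quantity)]
    )
    query = f"SELECT product FROM `{table_ref}` WHERE quantity <= @max_quantity"
    client.query(query, job_config=job_config)  # Value is bound, never spliced into SQL
```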
How This Differs from Production
In production, this solution would include:
- Error Handling: Robust try/except with retries (Chapter 40).
- Scalability: Chunked loading for large datasets (Chapter 40).
- Logging: Structured logging to files (Chapter 52).
- Orchestration: Airflow for scheduling (Chapter 56).
- Security: Encrypted credentials with Secret Manager (Chapter 65).
- Schema Evolution: Support for schema updates (e.g., adding columns like `transaction_date`) via `LoadJobConfig.schema_update_options` such as `ALLOW_FIELD_ADDITION` (Chapter 28).
Performance Considerations
- Validation:
- Time Complexity: O(n) for iterating n rows, dominated by Pandas filtering and Pydantic validation.
- Space Complexity: O(n) for storing validated DataFrame.
- Data Loading:
- Time Complexity: O(n) for n rows, dominated by network transfer.
- Space Complexity: O(n) for DataFrame in memory.
- Querying:
- Time Complexity: O(n) for scanning n rows, optimized by BigQuery’s columnar storage.
- Space Complexity: O(k) for k result rows.
- Cost Model: BigQuery charges separately for storage ($0.02/GB/month) and query processing ($5/TB scanned). The free tier (10 GB storage, 1 TB queries/month) lacks high-availability guarantees, so monitor usage for production needs. Minimize scanned data using filters and partitioning (Chapter 29) to reduce costs.
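Because query cost scales with bytes scanned, it is worth estimating a query before running it. A minimal sketch using BigQuery’s dry-run mode, assuming the micro-project’s table exists; selecting only the needed columns lowers the estimate, reflecting the columnar storage discussed in Section 26.3:

```python
from google.cloud import bigquery

def estimate_scanned_bytes(client: bigquery.Client, query: str) -> int:
    """Return the bytes a query would scan, without running it or incurring query cost."""
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    job = client.query(query, job_config=job_config)  # Dry run returns immediately
    print(f"Query would scan {job.total_bytes_processed} bytes")  # Debug
    return job.total_bytes_processed

# Example usage (assumes the client and table exist):
# estimate_scanned_bytes(client, "SELECT product FROM `your-project-id.sales_dataset.sales_table`")
```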
Implementation
# File: de-onboarding/utils.py (updated from Chapter 3)
from typing import Union, Dict, Any # For type annotations
def is_numeric(s: str, max_decimals: int = 2) -> bool:
"""Check if string is a decimal number with up to max_decimals. Allows negative numbers, filtered by price > 0."""
parts = s.split(".") # Split on decimal point
if len(parts) != 2 or not parts[0].replace("-", "").isdigit() or not parts[1].isdigit():
return False # Invalid format
return len(parts[1]) <= max_decimals # Check decimal places
def clean_string(s: Union[str, Any]) -> str:
"""Strip whitespace from string or convert to string."""
return str(s).strip()
def is_numeric_value(x: Any) -> bool:
"""Check if value is an integer or float."""
return isinstance(x, (int, float))
def has_valid_decimals(x: Any, max_decimals: int) -> bool:
"""Check if value has valid decimal places."""
return is_numeric(str(x), max_decimals)
def apply_valid_decimals(x: Any, max_decimals: int) -> bool:
"""Apply has_valid_decimals to a value."""
return has_valid_decimals(x, max_decimals)
def is_integer(x: Any) -> bool:
"""Check if value is an integer when converted to string."""
return str(x).isdigit()
def validate_sale(sale: Dict[str, Any], config: Dict[str, Any]) -> bool:
"""Validate sale based on config rules."""
required_fields = config["required_fields"]
min_price = config["min_price"]
max_quantity = config["max_quantity"]
prefix = config["product_prefix"]
max_decimals = config["max_decimals"]
print(f"Validating sale: {sale}") # Debug
for field in required_fields:
if field not in sale or not sale[field] or clean_string(sale[field]) == "":
print(f"Invalid sale: missing {field}: {sale}")
return False
product = clean_string(sale["product"])
if not product.startswith(prefix):
print(f"Invalid sale: product lacks '{prefix}' prefix: {sale}")
return False
price = clean_string(sale["price"])
if not is_numeric(price, max_decimals) or float(price) < min_price or float(price) <= 0:
print(f"Invalid sale: invalid price: {sale}")
return False
quantity = clean_string(sale["quantity"])
if not quantity.isdigit() or int(quantity) > max_quantity:
print(f"Invalid sale: invalid quantity: {sale}")
return False
return True
# File: de-onboarding/sales_pipeline.py
from google.cloud import bigquery # Import BigQuery client
import pandas as pd # Import Pandas
import yaml # For YAML parsing
import json # For JSON export
from pydantic import BaseModel # For validation
from typing import List, Dict, Any, Optional # For type annotations
import os # For file operations
import utils # Import custom utils
class Sale(BaseModel): # Pydantic model for sales data
product: str
price: float
quantity: int
class SalesResult(BaseModel): # Pydantic model for query results
product: str
total_sales: float
def init_bigquery_client(project_id: str, credentials_path: Optional[str] = None) -> bigquery.Client:
"""Initialize BigQuery client with credentials."""
print(f"Initializing BigQuery client for project: {project_id}")
if credentials_path:
print(f"Using credentials: {credentials_path}")
client = bigquery.Client.from_service_account_json(credentials_path, project=project_id)
else:
print("Using default credentials")
client = bigquery.Client(project=project_id)
print("Client initialized successfully")
return client
def create_dataset(client: bigquery.Client, dataset_id: str) -> None:
"""Create a BigQuery dataset."""
dataset_ref = f"{client.project}.{dataset_id}"
dataset = bigquery.Dataset(dataset_ref)
client.create_dataset(dataset, exists_ok=True) # Idempotent creation
print(f"Dataset {dataset_id} created")
def read_config(config_path: str) -> Dict[str, Any]:
"""Read YAML configuration."""
print(f"Opening config: {config_path}")
with open(config_path, "r") as file:
config = yaml.safe_load(file)
print(f"Loaded config: {config}")
return config
def load_and_validate_sales(csv_path: str, config: Dict[str, Any]) -> pd.DataFrame:
"""Load and validate sales CSV."""
print(f"Loading CSV: {csv_path}")
df = pd.read_csv(csv_path)
print("Initial DataFrame:")
print(df.head())
required_fields = config["required_fields"]
missing_fields = [f for f in required_fields if f not in df.columns]
if missing_fields:
print(f"Missing columns: {missing_fields}")
return pd.DataFrame()
df = df.dropna(subset=["product"])
df = df[df["product"].str.startswith(config["product_prefix"])]
df = df[df["quantity"].apply(utils.is_integer)]
df["quantity"] = df["quantity"].astype(int)
df = df[df["quantity"] <= config["max_quantity"]]
df = df[df["price"].apply(utils.is_numeric_value)]
df = df[df["price"] > 0]
df = df[df["price"] >= config["min_price"]]
df = df[df["price"].apply(lambda x: utils.apply_valid_decimals(x, config["max_decimals"]))]
# Validate with Pydantic
valid_rows = []
for _, row in df.iterrows():
sale = {"product": row["product"], "price": row["price"], "quantity": row["quantity"]}
try:
Sale(**sale)
valid_rows.append(row)
except ValueError as e:
print(f"Invalid sale: {sale}, Error: {e}")
df = pd.DataFrame(valid_rows)
print("Validated DataFrame:")
print(df)
return df
def load_dataframe_to_bigquery(
client: bigquery.Client,
df: pd.DataFrame,
dataset_id: str,
table_id: str
) -> None:
"""Load DataFrame to BigQuery table."""
print(f"Loading DataFrame to {dataset_id}.{table_id}")
table_ref = f"{client.project}.{dataset_id}.{table_id}"
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_TRUNCATE",
schema=[
bigquery.SchemaField("product", "STRING"),
bigquery.SchemaField("price", "FLOAT"),
bigquery.SchemaField("quantity", "INTEGER")
]
)
print(f"DataFrame rows: {len(df)}")
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
print(f"Loaded {len(df)} rows to {table_ref}")
def query_sales(
client: bigquery.Client,
dataset_id: str,
table_id: str,
max_quantity: int
) -> List[SalesResult]:
"""Query total sales by product."""
query = """
SELECT product, SUM(price * quantity) AS total_sales
FROM `@project_id.@dataset_id.@table_id`
WHERE quantity <= @max_quantity
GROUP BY product
"""
query = query.replace("@project_id", client.project).replace(
"@dataset_id", dataset_id
).replace("@table_id", table_id)
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("max_quantity", "INT64", max_quantity)]
)
print(f"Running query: {query}")
query_job = client.query(query, job_config=job_config)
results = [SalesResult(**row) for row in query_job.result()]
print(f"Retrieved {len(results)} results")
return results
def export_results(results: List[SalesResult], json_path: str) -> None:
"""Export query results to JSON."""
print(f"Writing to: {json_path}")
results_dict = [result.dict() for result in results]
print(f"Results: {results_dict}")
with open(json_path, "w") as file:
json.dump(results_dict, file, indent=2)
print(f"Exported results to {json_path}")
def main() -> None:
"""Main function to run the sales pipeline."""
project_id = "your-project-id" # Replace with your project ID
credentials_path = "path/to/credentials.json" # Replace with your credentials
csv_path = "data/sales.csv"
config_path = "data/config.yaml"
dataset_id = "sales_dataset"
table_id = "sales_table"
json_path = "data/sales_metrics.json"
client = init_bigquery_client(project_id, credentials_path)
create_dataset(client, dataset_id) # Create dataset if not exists
config = read_config(config_path)
df = load_and_validate_sales(csv_path, config)
if not df.empty:
load_dataframe_to_bigquery(client, df, dataset_id, table_id)
results = query_sales(client, dataset_id, table_id, config["max_quantity"])
export_results(results, json_path)
print("\nSales Report:")
for result in results:
print(f"{result.product}: ${result.total_sales}")
else:
print("No valid data to process")
if __name__ == "__main__":
main()
Test Implementation
# File: de-onboarding/tests/test_sales_pipeline.py
import pytest
import pandas as pd
from google.cloud import bigquery
from sales_pipeline import init_bigquery_client, load_and_validate_sales, query_sales, SalesResult
from typing import List
import yaml
from unittest.mock import MagicMock
@pytest.fixture
def config() -> dict:
"""Load config.yaml."""
with open("data/config.yaml", "r") as file:
return yaml.safe_load(file)
@pytest.fixture
def mock_client() -> bigquery.Client:
"""Mock BigQuery client for testing."""
client = MagicMock(spec=bigquery.Client)
client.project = "your-project-id"
return client
def test_load_and_validate_sales(config: dict) -> None:
"""Test loading and validating sales.csv."""
df = load_and_validate_sales("data/sales.csv", config)
assert len(df) == 3 # Expect 3 valid rows
assert all(df["product"].str.startswith("Halal"))
assert all(df["price"] >= config["min_price"])
assert all(df["quantity"] <= config["max_quantity"])
def test_empty_csv(config: dict) -> None:
"""Test loading empty.csv."""
df = load_and_validate_sales("data/empty.csv", config)
assert df.empty, "Empty CSV should return empty DataFrame"
def test_invalid_csv(config: dict) -> None:
"""Test loading invalid.csv with incorrect headers."""
df = load_and_validate_sales("data/invalid.csv", config)
assert df.empty, "Invalid headers should return empty DataFrame"
def test_malformed_csv(config: dict) -> None:
"""Test loading malformed.csv with non-integer quantity."""
df = load_and_validate_sales("data/malformed.csv", config)
assert len(df) == 1 # Expect 1 valid row (Halal Mouse)
assert df["product"].iloc[0] == "Halal Mouse"
def test_negative_csv(config: dict) -> None:
"""Test loading negative.csv with negative price."""
df = load_and_validate_sales("data/negative.csv", config)
assert len(df) == 1 # Expect 1 valid row (Halal Mouse)
assert df["product"].iloc[0] == "Halal Mouse"
def test_query_sales(mock_client: bigquery.Client, config: dict) -> None:
"""Test querying sales with mocked client."""
# Mock query results
mock_results = [
{"product": "Halal Laptop", "total_sales": 1999.98},
{"product": "Halal Mouse", "total_sales": 249.9},
{"product": "Halal Keyboard", "total_sales": 249.95}
]
mock_client.query.return_value.result.return_value = mock_results
results: List[SalesResult] = query_sales(mock_client, "sales_dataset", "sales_table", config["max_quantity"])
assert len(results) == 3
assert any(result.product == "Halal Laptop" and abs(result.total_sales - 1999.98) < 0.01 for result in results)
Expected Outputs
data/sales_metrics.json:
[
{
"product": "Halal Laptop",
"total_sales": 1999.98
},
{
"product": "Halal Mouse",
"total_sales": 249.9
},
{
"product": "Halal Keyboard",
"total_sales": 249.95
}
]
Console Output (abridged):
Initializing BigQuery client for project: your-project-id
Using credentials: path/to/credentials.json
Client initialized successfully
Dataset sales_dataset created
Opening config: data/config.yaml
Loaded config: {'min_price': 10.0, 'max_quantity': 100, ...}
Loading CSV: data/sales.csv
Initial DataFrame:
product price quantity
0 Halal Laptop 999.99 2
...
Validated DataFrame:
product price quantity
0 Halal Laptop 999.99 2
1 Halal Mouse 24.99 10
2 Halal Keyboard 49.99 5
Loading DataFrame to sales_dataset.sales_table
DataFrame rows: 3
Loaded 3 rows to your-project-id.sales_dataset.sales_table
Running query: SELECT product, SUM(price * quantity) AS total_sales ...
Retrieved 3 results
Writing to: data/sales_metrics.json
Exported results to data/sales_metrics.json
Sales Report:
Halal Laptop: $1999.98
Halal Mouse: $249.9
Halal Keyboard: $249.95
How to Run and Test
Setup:
- Setup Checklist:
  - Create `de-onboarding/data/` and save `sales.csv`, `config.yaml`, `empty.csv`, `invalid.csv`, `malformed.csv`, `negative.csv` per Appendix 1.
  - Install libraries: `pip install google-cloud-bigquery pandas pyyaml pydantic pytest`.
  - Create a Google Cloud project and download a service account JSON key to `path/to/credentials.json`.
  - Set environment variable: `export GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json` (Unix) or `set GOOGLE_APPLICATION_CREDENTIALS=path/to/credentials.json` (Windows).
  - Create virtual environment: `python -m venv venv`, activate (Windows: `venv\Scripts\activate`, Unix: `source venv/bin/activate`).
  - Verify Python 3.10+: `python --version`.
  - Configure editor for 4-space indentation per PEP 8.
  - Save `utils.py`, `sales_pipeline.py`, and `tests/test_sales_pipeline.py`.
  - Create BigQuery Dataset: Run the following to create `sales_dataset`:
    from google.cloud import bigquery
    client = bigquery.Client(project="your-project-id")
    dataset_ref = f"{client.project}.sales_dataset"
    dataset = bigquery.Dataset(dataset_ref)
    client.create_dataset(dataset, exists_ok=True)
    print("Dataset sales_dataset created")
  - Note on Costs: BigQuery charges for storage and query processing (e.g., $5/TB queried). Use the free tier (10 GB storage, 1 TB queries/month) and monitor usage in the Google Cloud Console.
  - Best Practices: Review Google Cloud’s BigQuery best practices (https://cloud.google.com/bigquery/docs/best-practices) for naming conventions and query optimization.
  - Documentation: Refer to the BigQuery Python client documentation (https://googleapis.dev/python/bigquery/latest/index.html) for detailed API usage, alongside best practices.
- Troubleshooting:
  - If `GoogleAuthError`, verify the credentials path and `GOOGLE_APPLICATION_CREDENTIALS`.
  - If `NotFound`, create the dataset as shown above.
  - If `yaml.YAMLError`, print `open(config_path).read()` to check syntax.
  - If `IndentationError`, use 4 spaces. Run `python -tt sales_pipeline.py`.
Run:
- Open terminal in `de-onboarding/`.
- Run: `python sales_pipeline.py`.
- Outputs: `data/sales_metrics.json`, console logs.
Test:
- Run: `pytest tests/test_sales_pipeline.py -v`.
- Verify tests pass, confirming loading, querying, and edge cases.
- Mock Testing Setup: Ensure `unittest.mock` is available (included in Python’s standard library). The `mock_client` fixture in `test_sales_pipeline.py` simulates BigQuery interactions, allowing tests without a live connection.
26.5 Practice Exercises
Exercise 1: BigQuery Client Initialization
Write a type-annotated function to initialize a BigQuery client, with 4-space indentation per PEP 8.
Expected Output:
Client initialized for project: your-project-id
Follow-Along Instructions:
- Save as `de-onboarding/ex1_bigquery_client.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex1_bigquery_client.py`.
- How to Test:
  - Add: `client = init_client("your-project-id", "path/to/credentials.json"); print(f"Client initialized for project: {client.project}")`.
  - Verify output matches expected.
  - Test with invalid credentials: Should raise `GoogleAuthError`.
Exercise 2: DataFrame Upload
Write a type-annotated function to upload a DataFrame to BigQuery, with 4-space indentation per PEP 8.
Sample Input:
df = pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse"],
"price": [999.99, 24.99],
"quantity": [2, 10]
})
Expected Output:
Loaded 2 rows to your-project-id.sales_dataset.sales_table
Note: Before uploading, print `df.dtypes` to ensure the DataFrame schema matches `job_config.schema` (e.g., product: STRING, price: FLOAT, quantity: INTEGER).
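A quick way to run that check, assuming `df` is the sample DataFrame above:

```python
print(df.dtypes)  # Expect: product -> object (STRING), price -> float64 (FLOAT), quantity -> int64 (INTEGER)
```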
Follow-Along Instructions:
- Save as `de-onboarding/ex2_upload.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex2_upload.py`.
- How to Test:
- Verify table in BigQuery Console.
- Test with empty DataFrame: Should load no rows.
Exercise 3: Analyze BigQuery’s Columnar Data Model
Explain how BigQuery’s columnar storage optimizes queries compared to row-based storage, with 4-space indentation per PEP 8.
Expected Output (save to de-onboarding/ex3_concepts.txt):
BigQuery’s columnar storage stores data by column, enabling faster queries for analytics by scanning only relevant columns. Row-based storage scans entire rows, increasing I/O for large datasets.
Follow-Along Instructions:
- Save explanation as `de-onboarding/ex3_concepts.txt`.
- Configure editor for 4-space indentation per PEP 8.
- How to Test:
- Verify explanation addresses columnar storage benefits.
- Compare with BigQuery’s optimization note in Section 26.3.
Exercise 4: Debug Dataset Creation
Fix buggy code that fails to create a BigQuery dataset, with 4-space indentation per PEP 8.
Buggy Code:
from google.cloud import bigquery
def create_dataset(client: bigquery.Client, dataset_id: str) -> None:
dataset_ref = f"{client.project}.{dataset_id}"
dataset = bigquery.Dataset(dataset_ref)
client.create_dataset(dataset) # Bug: Missing exists_ok
print(f"Dataset {dataset_id} created")Expected Output:
Dataset sales_dataset createdFollow-Along Instructions:
- Save as
de-onboarding/ex4_debug_dataset.py. - Configure editor for 4-space indentation per PEP 8.
- Run:
python ex4_debug_dataset.pyto see error. - Fix and re-run.
- How to Test:
- Verify dataset in BigQuery Console.
- Test with existing dataset: Should not raise an error (idempotent).
- Test with invalid dataset ID (e.g.,
invalid@dataset): Should raiseInvalidArgument.
Exercise 5: Debug a Query Bug
Fix buggy code that uses incorrect query parameters, with 4-space indentation per PEP 8.
Buggy Code:
from google.cloud import bigquery
from pydantic import BaseModel
from typing import List
class SalesResult(BaseModel):
product: str
total_sales: float
def query_sales(client: bigquery.Client, dataset_id: str, table_id: str) -> List[SalesResult]:
query = """
SELECT product, SUM(price * quantity) AS total_sales
FROM `@project_id.@dataset_id.@table_id`
WHERE quantity <= @max_quantity
GROUP BY product
"""
query = query.replace("@project_id", client.project).replace(
"@dataset_id", dataset_id
).replace("@table_id", table_id)
job_config = bigquery.QueryJobConfig() # Bug: Missing query parameter
query_job = client.query(query, job_config=job_config)
return [SalesResult(**row) for row in query_job.result()]
Expected Output:
[
{"product": "Halal Laptop", "total_sales": 1999.98},
{"product": "Halal Mouse", "total_sales": 249.9}
]
Follow-Along Instructions:
- Save as `de-onboarding/ex5_debug.py`.
- Configure editor for 4-space indentation per PEP 8.
- Run: `python ex5_debug.py` to see error.
- Fix and re-run.
- How to Test:
  - Verify output matches expected.
  - Test with different `max_quantity` values.
26.6 Exercise Solutions
Solution to Exercise 1: BigQuery Client Initialization
from google.cloud import bigquery
from typing import Optional
def init_client(project_id: str, credentials_path: Optional[str] = None) -> bigquery.Client:
"""Initialize BigQuery client."""
print(f"Initializing client for project: {project_id}")
if credentials_path:
client = bigquery.Client.from_service_account_json(credentials_path, project=project_id)
else:
client = bigquery.Client(project=project_id)
print(f"Client initialized for project: {client.project}")
return client
# Test
client = init_client("your-project-id", "path/to/credentials.json")
Solution to Exercise 2: DataFrame Upload
from google.cloud import bigquery
import pandas as pd
from typing import List
def upload_dataframe(
client: bigquery.Client,
df: pd.DataFrame,
dataset_id: str,
table_id: str
) -> None:
"""Upload DataFrame to BigQuery."""
print(f"Uploading to {dataset_id}.{table_id}")
table_ref = f"{client.project}.{dataset_id}.{table_id}"
job_config = bigquery.LoadJobConfig(
write_disposition="WRITE_TRUNCATE",
schema=[
bigquery.SchemaField("product", "STRING"),
bigquery.SchemaField("price", "FLOAT"),
bigquery.SchemaField("quantity", "INTEGER")
]
)
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
print(f"Loaded {len(df)} rows to {table_ref}")
# Test
client = bigquery.Client(project="your-project-id")
df = pd.DataFrame({
"product": ["Halal Laptop", "Halal Mouse"],
"price": [999.99, 24.99],
"quantity": [2, 10]
})
upload_dataframe(client, df, "sales_dataset", "sales_table")
Solution to Exercise 3: Analyze BigQuery’s Columnar Data Model
# File: de-onboarding/ex3_concepts.txt
BigQuery’s columnar storage stores data by column, enabling faster queries for analytics by scanning only relevant columns. Row-based storage scans entire rows, increasing I/O for large datasets.
Explanation:
- BigQuery’s columnar storage optimizes analytics by reducing I/O, as only queried columns are scanned, unlike row-based storage, which processes entire rows, increasing data access time for large datasets.
Solution to Exercise 4: Debug Dataset Creation
from google.cloud import bigquery
from typing import Optional
def create_dataset(client: bigquery.Client, dataset_id: str) -> None:
"""Create a BigQuery dataset."""
dataset_ref = f"{client.project}.{dataset_id}"
dataset = bigquery.Dataset(dataset_ref)
client.create_dataset(dataset, exists_ok=True) # Idempotent creation
print(f"Dataset {dataset_id} created")
# Test
client = bigquery.Client(project="your-project-id")
create_dataset(client, "sales_dataset")
Explanation:
- Bug: Missing `exists_ok=True` caused an error if the dataset already existed.
- Fix: Added `exists_ok=True` to make creation idempotent, preventing errors.
Solution to Exercise 5: Debug a Query Bug
from google.cloud import bigquery
from pydantic import BaseModel
from typing import List
class SalesResult(BaseModel):
product: str
total_sales: float
def query_sales(client: bigquery.Client, dataset_id: str, table_id: str, max_quantity: int) -> List[SalesResult]:
"""Query sales with fixed parameters."""
query = """
SELECT product, SUM(price * quantity) AS total_sales
FROM `@project_id.@dataset_id.@table_id`
WHERE quantity <= @max_quantity
GROUP BY product
"""
query = query.replace("@project_id", client.project).replace(
"@dataset_id", dataset_id
).replace("@table_id", table_id)
job_config = bigquery.QueryJobConfig(
query_parameters=[bigquery.ScalarQueryParameter("max_quantity", "INT64", max_quantity)]
)
query_job = client.query(query, job_config=job_config)
return [SalesResult(**row) for row in query_job.result()]
# Test
client = bigquery.Client(project="your-project-id")
results = query_sales(client, "sales_dataset", "sales_table", 100)
print([result.dict() for result in results])
Explanation:
- Bug: Missing `query_parameters` in `job_config` caused a query error.
- Fix: Added a `ScalarQueryParameter` for `max_quantity` and a matching `max_quantity` function parameter.
26.7 Chapter Summary and Connection to Chapter 27
In this chapter, you’ve mastered:
- BigQuery Client: Type-safe initialization with `google-cloud-bigquery`.
- Data Loading: Uploading Pandas DataFrames to BigQuery (O(n) time).
- Query Execution: Parameterized queries with Pydantic validation (O(n) scan time).
- Testing: Pytest with mocking for pipeline reliability, including edge cases (`empty.csv`, `invalid.csv`, `malformed.csv`, `negative.csv`).
- White-Space Sensitivity and PEP 8: 4-space indentation, avoiding `IndentationError`.
The micro-project built a type-annotated pipeline to create a BigQuery dataset, load and validate sales data, query metrics using secure parameterized queries, and export results to JSON, verified by Pyright and tested with pytest; mocked clients keep the tests runnable without a live BigQuery connection. Edge-case tests ensure robustness, the cost model adds practical context, and the columnar storage analysis deepens understanding of BigQuery’s efficiency. This prepares for Chapter 27’s advanced querying techniques, such as CTEs and window functions, enabling complex analytics for Hijra Group’s pipelines.
Connection to Chapter 27
Chapter 27 (BigQuery Advanced Querying) builds on this chapter:
- Querying: Extends parameterized queries to complex analytics (e.g., window functions, CTEs).
- Data Structures: Uses DataFrames for query results, building on Pandas integration.
- Type Safety: Continues type annotations for robust code.
- Fintech Context: Prepares for advanced sales trend analysis, maintaining PEP 8 compliance.