25 - BigQuery Fundamentals
Complexity: Moderate (M)
25.0 Introduction: Why This Matters for Data Engineering
In data engineering, cloud-based analytics platforms like Google BigQuery are critical for processing large-scale financial transaction data, enabling Hijra Group to deliver Sharia-compliant fintech insights efficiently. BigQuery, a serverless data warehouse, supports petabyte-scale datasets with SQL-based querying, achieving query execution times of seconds for millions of rows due to its columnar storage and distributed architecture. This chapter introduces BigQuery fundamentals, including dataset creation, table management, and basic SQL queries, building on prior database knowledge from Chapters 12–24 (SQLite, PostgreSQL) and preparing for Python integration in Chapter 26.
BigQuery’s columnar storage optimizes analytical queries (e.g., aggregations) with O(n) complexity for n rows, leveraging Google’s Dremel engine for parallel processing. For Hijra Group’s analytics, BigQuery enables rapid sales data analysis, crucial for stakeholder reporting. This chapter uses data/sales.csv from Appendix 1, uploading it to BigQuery for querying, and avoids advanced concepts like type annotations (introduced in Chapter 7) or testing (Chapter 9) to focus on core operations. All code adheres to PEP 8’s 4-space indentation, preferring spaces over tabs to prevent IndentationError.
25.0.1 Setup Instructions
To work with BigQuery, set up your environment as follows:
1. Verify Python Version:
   - Run python --version to ensure Python 3.10 or higher, as required by the curriculum.
2. Install Google Cloud SDK:
   - Download and install from Google Cloud SDK for your OS (Windows, macOS, Linux).
   - Initialize with gcloud init and follow prompts to select your Google Cloud project.
3. Create a Google Cloud Project:
   - Go to Google Cloud Console, create a new project (e.g., hijra-analytics), and note the project ID (e.g., hijra-analytics-123).
4. Enable Billing:
   - Ensure billing is enabled for your project (see Billing Setup); the free tier covers 10 GB/month of queries.
5. Enable BigQuery API:
   - In the Console, navigate to “APIs & Services” > “Library,” search for “BigQuery API,” and enable it.
6. Authenticate:
   - Run gcloud auth application-default login in a terminal to authenticate with your Google account. Follow the browser prompt to log in.
7. Create and Activate Virtual Environment:
   - Create: python -m venv venv.
   - Activate: Windows (venv\Scripts\activate), Unix/macOS (source venv/bin/activate). Verify with which python (Unix/macOS) or where python (Windows) showing the virtual environment path.
8. Install Python Libraries:
   - Run pip install google-cloud-bigquery pandas pyyaml in the activated virtual environment.
9. Verify Setup:
   - Run gcloud config list to confirm your project ID and authentication.
   - Expected output includes [core] project = your-project-id.
10. Prepare Data:
    - Create de-onboarding/data/ and populate with sales.csv, config.yaml, sample.csv, and sales.db per Appendix 1.
11. Navigate BigQuery Console:
    - Access the BigQuery Console at https://console.cloud.google.com/bigquery.
    - In “SQL Workspace,” select your project from the left panel, then view datasets and tables to verify creation (e.g., sales_data.sales).
Troubleshooting:
- If python --version shows less than 3.10, install Python 3.10+ from python.org.
- If gcloud: command not found, ensure the SDK is installed and added to PATH.
- If authentication fails, re-run gcloud auth application-default login.
- If pip errors occur, ensure the virtual environment is activated (see step 7).
- If console navigation is unclear, refer to the BigQuery Console Guide.
- If billing errors occur (e.g., “Billing not enabled for project”), navigate to Billing in Google Cloud Console and link a payment method.
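Once setup is complete, a quick connectivity check can catch authentication or project issues before you create any datasets. Below is a minimal sketch, assuming you have authenticated with gcloud and replaced your-project-id with your own project ID:

from google.cloud import bigquery  # BigQuery client library

client = bigquery.Client(project="your-project-id")  # Uses application-default credentials
query_job = client.query("SELECT 1 AS ok")  # Trivial query that scans no table data
for row in query_job.result():  # Waits for the job and iterates result rows
    print(f"Connection check returned: {row.ok}")  # Expected: 1

If this prints 1, the SDK, your credentials, and the BigQuery API are all working.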
Data Engineering Workflow Context
This diagram illustrates BigQuery’s role in a cloud analytics pipeline:
flowchart TD
A["Raw Data (CSV)"] --> B["Python Script"]
B --> C{"BigQuery Operations"}
C -->|Upload| D["BigQuery Dataset/Table"]
C -->|Query| E["Aggregated Metrics"]
D --> F["SQL Queries"]
E --> G["Output (JSON/CSV)"]
F --> G
G --> H["Storage/Analysis"]
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef storage fill:#ddffdd,stroke:#363,stroke-width:1px
class A,D,E,G data
class B,C,F process
class H storage

Building On and Preparing For
- Building On:
- Chapters 12–15, 19–20: Leverages SQL skills from SQLite for BigQuery’s SQL syntax.
- Chapters 16–17, 21–23: Applies PostgreSQL querying and schema design to BigQuery tables.
- Chapter 24: Extends database fundamentals for cloud-based analytics.
- Preparing For:
- Chapter 26: Enables Python integration with BigQuery for programmatic analytics.
- Chapters 27–29: Prepares for advanced querying, warehousing, and optimization.
- Chapter 32: Supports data mart creation with BigQuery.
- Chapter 51: Lays groundwork for BI visualizations using BigQuery data.
What You’ll Learn
This chapter covers:
- BigQuery Setup: Configuring Google Cloud SDK and creating datasets.
- Table Management: Uploading CSVs and defining schemas.
- Basic SQL Queries: Filtering, grouping, and aggregating sales data.
- Query Execution: Running queries via BigQuery Console and Python.
- Output Handling: Exporting query results to JSON.
By the end, you’ll create a BigQuery dataset, upload data/sales.csv, query sales metrics, and export results to data/sales_metrics.json, all with 4-space indentation per PEP 8. The micro-project ensures robust handling of sales data, aligning with Hijra Group’s analytics needs.
Follow-Along Tips:
- Complete setup instructions above before running code.
- Use print statements (e.g., print(query_job.result())) to debug BigQuery queries.
- Verify file paths with ls data/ (Unix/macOS) or dir data\ (Windows).
- Use UTF-8 encoding to avoid UnicodeDecodeError.
- Configure editor for 4-space indentation per PEP 8 (VS Code: “Editor: Tab Size” = 4, “Editor: Insert Spaces” = true, “Editor: Detect Indentation” = false).
- If IndentationError occurs, run python -tt script.py to detect tab/space mixing.
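To make the print-based debugging tip concrete, the sketch below shows which query-job attributes are worth printing. It is a minimal example, assuming an authenticated client; the SELECT 1 query is a placeholder for whatever query you are debugging:

from google.cloud import bigquery  # BigQuery client

client = bigquery.Client()  # Connect to BigQuery
query_job = client.query("SELECT 1 AS ok")  # Placeholder query; substitute your own SQL
print(f"Job ID: {query_job.job_id}, state: {query_job.state}")  # Job metadata for debugging
print(f"Rows: {list(query_job.result())}")  # result() waits for completion and returns rows
print(f"Bytes processed: {query_job.total_bytes_processed}")  # Scan size once the job is done
print(f"Errors: {query_job.errors}")  # None when the job succeeded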
25.1 BigQuery Setup
BigQuery organizes data into projects, datasets, and tables. A project is a billing and access control unit, a dataset is a container for tables, and a table stores structured data. BigQuery’s serverless model eliminates infrastructure management, with costs based on data scanned (approximately $5 per terabyte queried, with a free tier of 10 GB/month; see BigQuery Pricing). BigQuery integrates with Google Cloud Storage for loading data lakes, covered in Chapter 31. For production, authentication typically uses service accounts, covered in Chapter 65 (Security Best Practices).
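Because costs are driven by bytes scanned, you can estimate a query’s scan size before running it by submitting a dry-run job. Below is a minimal sketch, assuming an authenticated client and using the sales table created later in this chapter as a placeholder table ID:

from google.cloud import bigquery  # BigQuery client

client = bigquery.Client()  # Connect to BigQuery
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)  # Estimate only; nothing runs or is billed
query = "SELECT SUM(price * quantity) FROM `your-project-id.sales_data.sales`"  # Placeholder table ID
job = client.query(query, job_config=job_config)  # Dry-run jobs return immediately
mb_scanned = job.total_bytes_processed / (1024 * 1024)  # Bytes the query would scan
print(f"Estimated scan: {mb_scanned:.2f} MB (~${job.total_bytes_processed / 1e12 * 5:.6f} at $5/TB)")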
25.1.1 Creating a Dataset
Create a dataset using Python and the google-cloud-bigquery library.
from google.cloud import bigquery # Import BigQuery client
# Initialize client
client = bigquery.Client() # Connect to BigQuery
# Define dataset
project_id = "your-project-id" # Replace with your project ID
dataset_id = f"{project_id}.sales_data" # Dataset name
dataset = bigquery.Dataset(dataset_id) # Create dataset object
dataset.location = "US" # Set location (e.g., US, EU)
# Create dataset
client.create_dataset(dataset, exists_ok=True) # Create if not exists
print(f"Created dataset: {dataset_id}") # Confirm creation
# Expected Output:
# Created dataset: your-project-id.sales_data

Follow-Along Instructions:
- Install library: pip install google-cloud-bigquery.
- Replace your-project-id with your Google Cloud project ID (from gcloud config list).
- Save as de-onboarding/create_dataset.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python create_dataset.py.
- Verify dataset in BigQuery Console (https://console.cloud.google.com/bigquery).
- Common Errors:
  - AuthenticationError: Ensure gcloud auth application-default login was run. Verify with gcloud config list.
  - NotFound: Confirm project ID. Print client.project.
  - IndentationError: Use 4 spaces (not tabs). Run python -tt create_dataset.py.
Key Points:
- Dataset Creation: O(1) operation, metadata-based.
- Underlying Implementation: BigQuery datasets are logical containers, stored in Google’s Colossus filesystem, enabling scalable metadata operations.
- Implication: Datasets organize sales data for Hijra Group’s analytics.
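To confirm the dataset exists without opening the Console, you can list datasets programmatically. A minimal sketch, assuming the client is authenticated against your project:

from google.cloud import bigquery  # BigQuery client

client = bigquery.Client()  # Connect to BigQuery
datasets = list(client.list_datasets())  # Fetch dataset metadata for the current project
if datasets:
    for dataset in datasets:
        print(f"Found dataset: {dataset.dataset_id}")  # e.g., sales_data
else:
    print(f"No datasets found in project {client.project}")  # Nothing created yet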
25.2 Table Management
Upload data/sales.csv to a BigQuery table with a defined schema.
25.2.1 Uploading a CSV
Define a schema and load data/sales.csv into a table. Note: Ensure the CSV data matches the schema (e.g., numeric price); malformed values, such as a non-numeric price ('invalid'), cause BadRequest errors during the load. Resolve by cleaning the CSV or by changing the column to STRING and casting in queries. Inspect the file with pd.read_csv(csv_path).head() to verify.
from google.cloud import bigquery # Import BigQuery client
# Initialize client
client = bigquery.Client() # Connect to BigQuery
# Define table
dataset_id = "your-project-id.sales_data" # Replace with your dataset
table_id = f"{dataset_id}.sales" # Table name
table = bigquery.Table(table_id) # Create table object
# Define schema
schema = [
bigquery.SchemaField("product", "STRING"), # Product name
bigquery.SchemaField("price", "FLOAT"), # Sale price
bigquery.SchemaField("quantity", "INTEGER") # Quantity sold
]
table.schema = schema # Assign schema
client.create_table(table, exists_ok=True) # Create table if not exists
# Load CSV
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV, # CSV format
skip_leading_rows=1, # Skip header
schema=schema, # Apply schema
write_disposition="WRITE_TRUNCATE" # Overwrite table
)
with open("data/sales.csv", "rb") as source_file:
job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result() # Wait for job to complete
print(f"Loaded {job.output_rows} rows into {table_id}") # Confirm load
# Expected Output:
# Loaded 6 rows into your-project-id.sales_data.sales

Follow-Along Instructions:
- Ensure data/sales.csv exists in de-onboarding/data/ per Appendix 1.
- Replace your-project-id with your project ID.
- Save as de-onboarding/load_table.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python load_table.py.
- Verify table in BigQuery Console.
- Common Errors:
  - FileNotFoundError: Ensure data/sales.csv exists. Print os.path.exists("data/sales.csv").
  - SchemaMismatch: Verify CSV with pd.read_csv("data/sales.csv").head().
  - IndentationError: Use 4 spaces (not tabs). Run python -tt load_table.py.
Key Points:
- Schema: Ensures data types (STRING, FLOAT, INTEGER).
- Load Operation: O(n) for n rows, distributed across BigQuery nodes.
- Underlying Implementation: BigQuery stores tables in columnar format, optimizing analytical queries.
- Space Complexity: O(n) for n rows, with compression reducing storage.
- Implication: Efficient for loading sales data into Hijra Group’s warehouse.
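Before and after a load job, it helps to cross-check the CSV against the table. Below is a minimal sketch, assuming the sales table was loaded as above; the table ID is a placeholder:

import pandas as pd  # For local CSV inspection
from google.cloud import bigquery  # BigQuery client

client = bigquery.Client()  # Connect to BigQuery
table_id = "your-project-id.sales_data.sales"  # Placeholder table ID

df = pd.read_csv("data/sales.csv")  # Inspect the source CSV locally
print(f"CSV rows: {len(df)}")  # Row count includes malformed rows (e.g., missing product)
print(df.dtypes)  # Check which columns parse as numeric locally

table = client.get_table(table_id)  # Fetch table metadata
print(f"Table rows: {table.num_rows}")  # Rows BigQuery accepted
print([(field.name, field.field_type) for field in table.schema])  # Confirm STRING/FLOAT/INTEGER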
25.3 Basic SQL Queries
Run SQL queries to analyze sales data, focusing on filtering, grouping, and aggregation. Queries use parameters to improve reusability, aligning with best practices. Note: Before querying, verify the table schema in BigQuery Console or print it using print(client.get_table(table_id).schema) to ensure data types match (e.g., price as FLOAT). The or 0.0 in result handling (e.g., next(results).total_sales or 0.0) ensures robustness for empty results. For example, test an empty table with:
client = bigquery.Client()
query = f"SELECT SUM(price * quantity) AS total_sales FROM `{table_id}` WHERE FALSE"
job = client.query(query)
result = next(job.result()).total_sales or 0.0
print(result) # Expected: 0.0

25.3.1 Querying Sales Data
Query total sales and top products, starting with a simple non-parameterized query followed by a parameterized version for reusability.
Non-Parameterized Example:
from google.cloud import bigquery # Import BigQuery client
# Initialize client
client = bigquery.Client() # Connect to BigQuery
# Define simple query
query = """
SELECT
product,
SUM(price * quantity) AS total_sales
FROM
`your-project-id.sales_data.sales`
WHERE
product IS NOT NULL
AND product LIKE 'Halal%'
GROUP BY
product
ORDER BY
total_sales DESC
LIMIT
3
"""
# Run query
query_job = client.query(query) # Execute query
results = query_job.result() # Get results
# Print results
print("Top Products by Sales (Simple Query):") # Debug
for row in results:
print(f"Product: {row.product}, Total Sales: {row.total_sales}") # Show results
# Expected Output:
# Top Products by Sales (Simple Query):
# Product: Halal Laptop, Total Sales: 1999.98
# Product: Halal Keyboard, Total Sales: 249.95
# Product: Halal Mouse, Total Sales: 249.9

Parameterized Example (enhances reusability):
from google.cloud import bigquery # Import BigQuery client
# Initialize client
client = bigquery.Client() # Connect to BigQuery
# Define query with parameters
query = """
SELECT
product,
SUM(price * quantity) AS total_sales
FROM
`your-project-id.sales_data.sales`
WHERE
product IS NOT NULL
AND product LIKE @prefix
AND quantity <= @max_quantity
GROUP BY
product
ORDER BY
total_sales DESC
LIMIT
3
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("prefix", "STRING", "Halal%"),
bigquery.ScalarQueryParameter("max_quantity", "INTEGER", 100)
]
)
# Run query
query_job = client.query(query, job_config=job_config) # Execute query
results = query_job.result() # Get results
# Print results
print("Top Products by Sales (Parameterized Query):") # Debug
for row in results:
print(f"Product: {row.product}, Total Sales: {row.total_sales}") # Show results
print("Parameterized queries improve security by preventing SQL injection and enhance reusability by allowing dynamic inputs, preparing for Chapter 26’s automation.")
# Expected Output:
# Top Products by Sales (Parameterized Query):
# Product: Halal Laptop, Total Sales: 1999.98
# Product: Halal Keyboard, Total Sales: 249.95
# Product: Halal Mouse, Total Sales: 249.9
# Parameterized queries improve security by preventing SQL injection and enhance reusability by allowing dynamic inputs, preparing for Chapter 26’s automation.

Follow-Along Instructions:
- Replace your-project-id with your project ID.
- Save as de-onboarding/query_sales.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python query_sales.py.
- Verify output matches expected.
- Common Errors:
  - QueryError: Check SQL syntax. Print query and test in BigQuery Console.
  - NotFound: Ensure table exists. Run client.get_table("your-project-id.sales_data.sales").
  - IndentationError: Use 4 spaces (not tabs). Run python -tt query_sales.py.
Key Points:
- SQL Syntax: BigQuery uses Standard SQL, similar to PostgreSQL (Chapter 21).
- Parameters: @prefix and @max_quantity improve query reusability, building on the simple query.
- Time Complexity: O(n) for scanning n rows, optimized by columnar storage.
- Space Complexity: O(k) for k output rows.
- Implication: Efficient for aggregating sales metrics.
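Query results can also be pulled into pandas for further analysis or export, previewing the Python integration in Chapter 26. A minimal sketch, assuming pandas is installed (recent client-library versions may also require the db-dtypes package) and the sales table exists; the table ID is a placeholder:

import pandas as pd  # For tabular result handling
from google.cloud import bigquery  # BigQuery client

client = bigquery.Client()  # Connect to BigQuery
query = """
SELECT product, SUM(price * quantity) AS total_sales
FROM `your-project-id.sales_data.sales`
WHERE product LIKE 'Halal%'
GROUP BY product
"""
df = client.query(query).result().to_dataframe()  # Materialize result rows as a DataFrame
print(df.head())  # Inspect product/total_sales columns
print(f"Grand total: {df['total_sales'].sum():.2f}")  # Aggregate locally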
25.4 Micro-Project: Sales Data Analytics Tool
Project Requirements
Build a BigQuery-based tool to analyze data/sales.csv, creating a dataset, uploading data, querying metrics, and exporting results to data/sales_metrics.json. This tool supports Hijra Group’s cloud analytics, ensuring Sharia-compliant sales reporting. The YAML validation ensures Sharia compliance by filtering Halal products (via product_prefix: 'Halal') and enforcing financial constraints (e.g., min_price, max_quantity), ensuring only compliant transactions are analyzed, a key requirement for Hijra Group’s fintech analytics.
- Create a BigQuery dataset (sales_data).
- Upload data/sales.csv to a table (sales).
- Read data/config.yaml with PyYAML for validation rules.
- Query total sales and top 3 Halal products (quantity ≤ 100) using parameterized SQL.
- Export results to data/sales_metrics.json.
- Log steps using print statements.
- Use 4-space indentation per PEP 8, preferring spaces over tabs.
Sample Input Files
data/sales.csv (from Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10
Halal Keyboard,49.99,5
,29.99,3
Monitor,invalid,2
Headphones,5.00,150

data/config.yaml (from Appendix 1):
min_price: 10.0
max_quantity: 100
required_fields:
- product
- price
- quantity
product_prefix: 'Halal'
max_decimals: 2

Data Processing Flow
flowchart TD
A["Input CSV
sales.csv"] --> B["Load CSV
BigQuery"]
B --> C["BigQuery Table"]
C --> D["Read YAML
config.yaml"]
D --> E["Run SQL Query
BigQuery"]
E -->|Invalid| F["Log Warning"]
E -->|Valid| G["Extract Metrics"]
G --> H["Export JSON
sales_metrics.json"]
F --> I["End Processing"]
H --> I
classDef data fill:#f9f9f9,stroke:#333,stroke-width:2px
classDef process fill:#d0e0ff,stroke:#336,stroke-width:1px
classDef error fill:#ffdddd,stroke:#933,stroke-width:1px
classDef endpoint fill:#ddffdd,stroke:#363,stroke-width:1px
class A,C,G,H data
class B,D,E process
class F error
class I endpoint

Acceptance Criteria
- Go Criteria:
- Creates dataset and table correctly.
- Uploads sales.csv with defined schema.
- Queries total sales and top 3 Halal products using parameters.
- Exports results to data/sales_metrics.json.
- Logs steps and invalid data.
- Uses 4-space indentation per PEP 8.
- No-Go Criteria:
- Fails to create dataset/table.
- Incorrect query results.
- Missing JSON export.
- Uses try/except or type annotations.
- Inconsistent indentation.
Common Pitfalls to Avoid
- Authentication Issues:
  - Problem: google.api_core.exceptions.Forbidden.
  - Solution: Run gcloud auth application-default login. Print client.project.
- Schema Mismatches:
  - Problem: BadRequest during CSV load.
  - Solution: Verify CSV with pd.read_csv("data/sales.csv").head().
- Query Errors:
  - Problem: Invalid SQL syntax.
  - Solution: Print query and test in BigQuery Console.
- FileNotFoundError:
  - Problem: Missing sales.csv or config.yaml.
  - Solution: Ensure files are in de-onboarding/data/. Print os.path.exists("data/sales.csv").
- IndentationError:
  - Problem: Mixed spaces/tabs.
  - Solution: Use 4 spaces per PEP 8. Run python -tt sales_analytics.py.
How This Differs from Production
In production, this solution would include:
- Error Handling: Try/except (Chapter 7).
- Type Safety: Type annotations (Chapter 7).
- Testing: Pytest tests (Chapter 9).
- Scalability: Partitioned tables (Chapter 29).
- Logging: File-based logging (Chapter 52).
- Security: IAM roles and encryption (Chapter 65).
- Data Marts: Extending the dataset into a data mart with fact/dimension tables (Chapter 32).
Implementation
# File: de-onboarding/sales_analytics.py
import pandas as pd # For CSV inspection
import yaml # For YAML parsing
import json # For JSON export
import os # For file checks
from google.cloud import bigquery # For BigQuery operations
# Define function to read YAML configuration
def read_config(config_path): # Takes config file path
"""Read YAML configuration."""
print(f"Opening config: {config_path}") # Debug
file = open(config_path, "r") # Open file
config = yaml.safe_load(file) # Parse YAML
file.close() # Close file
print(f"Loaded config: {config}") # Debug
return config # Return config
# Define function to create BigQuery dataset
def create_dataset(project_id, dataset_name): # Takes project and dataset names
"""Create BigQuery dataset."""
client = bigquery.Client() # Initialize client
dataset_id = f"{project_id}.{dataset_name}" # Full dataset ID
dataset = bigquery.Dataset(dataset_id) # Create dataset object
dataset.location = "US" # Set location
client.create_dataset(dataset, exists_ok=True) # Create if not exists
print(f"Created dataset: {dataset_id}") # Confirm
return dataset_id # Return dataset ID
# Define function to load CSV to BigQuery
def load_csv_to_table(dataset_id, table_name, csv_path): # Takes dataset, table, CSV path
"""Load CSV to BigQuery table."""
client = bigquery.Client() # Initialize client
table_id = f"{dataset_id}.{table_name}" # Full table ID
table = bigquery.Table(table_id) # Create table object
# Define schema
schema = [
bigquery.SchemaField("product", "STRING"),
bigquery.SchemaField("price", "FLOAT"),
bigquery.SchemaField("quantity", "INTEGER")
]
table.schema = schema # Assign schema
client.create_table(table, exists_ok=True) # Create table
# Load CSV
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
schema=schema,
write_disposition="WRITE_TRUNCATE"
)
print(f"Loading CSV: {csv_path}") # Debug
with open(csv_path, "rb") as source_file:
job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result() # Wait for completion
print(f"Loaded {job.output_rows} rows into {table_id}") # Confirm
return table_id # Return table ID
# Define function to query sales data
def query_sales(table_id, config): # Takes table ID and config
"""Query sales data for metrics."""
client = bigquery.Client() # Initialize client
prefix = config["product_prefix"] # Get prefix
max_quantity = config["max_quantity"] # Get max quantity
min_price = config["min_price"] # Get min price
# Define query with parameters
query = f"""
SELECT
product,
SUM(price * quantity) AS total_sales
FROM
`{table_id}`
WHERE
product IS NOT NULL
AND product LIKE @prefix
AND quantity <= @max_quantity
AND price >= @min_price
GROUP BY
product
ORDER BY
total_sales DESC
LIMIT
3
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("prefix", "STRING", f"{prefix}%"),
bigquery.ScalarQueryParameter("max_quantity", "INTEGER", max_quantity),
bigquery.ScalarQueryParameter("min_price", "FLOAT", min_price)
]
)
print(f"Running query:\n{query}") # Debug
query_job = client.query(query, job_config=job_config) # Execute query
results = query_job.result() # Get results
# Process results
total_sales = 0.0
top_products = {}
for row in results:
total_sales += row.total_sales # Accumulate total
top_products[row.product] = float(row.total_sales) # Store top products
print(f"Query results: {top_products}") # Debug
return {
"total_sales": float(total_sales),
"top_products": top_products
}, len(top_products) # Return results and count
# Define function to export results
def export_results(results, json_path): # Takes results and file path
"""Export results to JSON."""
print(f"Writing to: {json_path}") # Debug
print(f"Results: {results}") # Debug
file = open(json_path, "w") # Open file
json.dump(results, file, indent=2) # Write JSON
file.close() # Close file
print(f"Exported results to {json_path}") # Confirm
# Define main function
def main(): # No parameters
"""Main function to analyze sales data."""
project_id = "your-project-id" # Replace with your project ID
dataset_name = "sales_data" # Dataset name
table_name = "sales" # Table name
csv_path = "data/sales.csv" # CSV path
config_path = "data/config.yaml" # YAML path
json_path = "data/sales_metrics.json" # JSON output path
# Validate file existence
if not os.path.exists(csv_path):
print(f"Error: {csv_path} not found")
return
if not os.path.exists(config_path):
print(f"Error: {config_path} not found")
return
config = read_config(config_path) # Read config
dataset_id = create_dataset(project_id, dataset_name) # Create dataset
table_id = load_csv_to_table(dataset_id, table_name, csv_path) # Load CSV
results, valid_records = query_sales(table_id, config) # Query data
export_results(results, json_path) # Export results
# Print report
print("\nSales Analytics Report:")
print(f"Valid Records: {valid_records}")
print(f"Total Sales: ${round(results['total_sales'], 2)}")
print(f"Top Products: {results['top_products']}")
print("Processing completed")
if __name__ == "__main__":
main() # Run main function

Expected Outputs
data/sales_metrics.json:
{
"total_sales": 2499.83,
"top_products": {
"Halal Laptop": 1999.98,
"Halal Keyboard": 249.95,
"Halal Mouse": 249.9
}
}

Console Output (abridged):
Opening config: data/config.yaml
Loaded config: {'min_price': 10.0, 'max_quantity': 100, 'required_fields': ['product', 'price', 'quantity'], 'product_prefix': 'Halal', 'max_decimals': 2}
Created dataset: your-project-id.sales_data
Loading CSV: data/sales.csv
Loaded 6 rows into your-project-id.sales_data.sales
Running query:
SELECT
product,
SUM(price * quantity) AS total_sales
FROM
`your-project-id.sales_data.sales`
WHERE
product IS NOT NULL
AND product LIKE @prefix
AND quantity <= @max_quantity
AND price >= @min_price
GROUP BY
product
ORDER BY
total_sales DESC
LIMIT
3
Query results: {'Halal Laptop': 1999.98, 'Halal Keyboard': 249.95, 'Halal Mouse': 249.9}
Writing to: data/sales_metrics.json
Exported results to data/sales_metrics.json
Sales Analytics Report:
Valid Records: 3
Total Sales: $2499.83
Top Products: {'Halal Laptop': 1999.98, 'Halal Keyboard': 249.95, 'Halal Mouse': 249.9}
Processing completed

How to Run and Test
Setup:
- Setup Checklist:
  - Complete setup instructions in Section 25.0.1 (verify Python 3.10+, install Google Cloud SDK, create project, enable billing, enable BigQuery API, authenticate, create/activate virtual environment, install libraries).
  - Create de-onboarding/data/ with sales.csv, config.yaml, sample.csv, and sales.db per Appendix 1.
  - Configure editor for 4-space indentation per PEP 8.
  - Save sales_analytics.py in de-onboarding/.
- Troubleshooting:
  - If python --version shows less than 3.10, install Python 3.10+ from python.org.
  - If FileNotFoundError, check files with ls data/ (Unix/macOS) or dir data\ (Windows).
  - If google.api_core.exceptions.Forbidden, re-authenticate with gcloud auth application-default login.
  - If IndentationError, use 4 spaces. Run python -tt sales_analytics.py.
  - If yaml.YAMLError, print open(config_path).read() to check syntax.
  - If billing errors occur, verify billing setup in Google Cloud Console.

Run:
- Open terminal in de-onboarding/.
- Replace your-project-id in sales_analytics.py with your project ID.
- Run: python sales_analytics.py.
- Outputs: data/sales_metrics.json, console logs.

Test Scenarios:
- Valid Data: Verify sales_metrics.json shows total_sales: 2499.83 and top products.
- Empty Table: Manually truncate the table in BigQuery Console, then re-run the query:

client = bigquery.Client()
query = f"SELECT * FROM `{table_id}` WHERE FALSE"
query_job = client.query(query)
results = query_job.result()
print(list(results)) # Expected: []
25.5 Practice Exercises
Exercise 1: Create BigQuery Dataset
Write a function to create a BigQuery dataset, with 4-space indentation per PEP 8.
Expected Output:
Created dataset: your-project-id.test_data

Follow-Along Instructions:
- Save as de-onboarding/ex1_dataset.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex1_dataset.py.
- How to Test:
  - Verify dataset in BigQuery Console.
  - Test with invalid project ID: Should raise NotFound. Print client.project to debug.
  - Common Errors:
    - AuthenticationError: Re-run gcloud auth application-default login.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex1_dataset.py.
Exercise 2: Load CSV to BigQuery
Write a function to load data/sales.csv to a BigQuery table, with 4-space indentation per PEP 8.
Expected Output:
Loaded 6 rows into your-project-id.sales_data.sales

Follow-Along Instructions:
- Save as de-onboarding/ex2_load.py.
- Ensure data/sales.csv exists per Appendix 1.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex2_load.py.
- How to Test:
  - Verify table in BigQuery Console.
  - Test with invalid CSV path: Should raise FileNotFoundError. Print os.path.exists(csv_path).
  - Common Errors:
    - SchemaMismatch: Print pd.read_csv(csv_path).head() to inspect the CSV.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex2_load.py.
Exercise 3: Query Total Sales
Write a function to query total sales from a BigQuery table using parameterized SQL, with 4-space indentation per PEP 8.
Expected Output:
Total Sales: 2499.83

Follow-Along Instructions:
- Save as de-onboarding/ex3_query.py.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex3_query.py.
- How to Test:
  - Verify output matches expected.
  - Test with empty table: Should return 0.0. Truncate the table in BigQuery Console.
  - Common Errors:
    - QueryError: Print query to check syntax.
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex3_query.py.
Exercise 4: Debug a Query Bug
Fix this buggy code that assumes price is a STRING instead of FLOAT, causing a type mismatch, with 4-space indentation per PEP 8.
Buggy Code:
from google.cloud import bigquery
def query_sales(table_id):
client = bigquery.Client()
query = f"""
SELECT
product,
SUM(CAST(price AS FLOAT) * quantity) AS total_sales
FROM
`{table_id}`
GROUP BY
product
"""
query_job = client.query(query)
results = query_job.result()
total = sum(row.total_sales for row in results)
return total
print(query_sales("your-project-id.sales_data.sales"))Expected Output:
2499.83Follow-Along Instructions:
- Save as
de-onboarding/ex4_debug.py. - Configure editor for 4-space indentation per PEP 8.
- Run:
python ex4_debug.pyto see error (if schema is correct, it may run but is inefficient). - Fix and re-run.
- How to Test:
- Verify output matches expected.
- Check schema in BigQuery Console to confirm
priceisFLOAT. - Common Errors:
- QueryError: Print
queryand test in BigQuery Console. - IndentationError: Use 4 spaces (not tabs). Run
python -tt ex4_debug.py.
- QueryError: Print
Exercise 5: Conceptual Analysis with Performance Comparison
Compare BigQuery’s columnar storage to SQLite’s row-based storage for sales data analytics by writing a script that queries total sales from both a BigQuery table (your-project-id.sales_data.sales) and a SQLite database (data/sales.db, created in Chapter 12’s SQLite setup), measuring execution time with time.time(). Save the comparison and timing results to ex5_concepts.txt. Note: BigQuery times may vary due to network latency; focus on relative differences, since SQLite runs locally. Use 4-space indentation per PEP 8.
Expected Output (in ex5_concepts.txt):
BigQuery Time: [time] seconds
SQLite Time: [time] seconds
Comparison: BigQuery’s columnar storage optimizes analytical queries (e.g., SUM(price * quantity)) with O(n) complexity for n rows, ideal for Hijra Group’s sales aggregations. SQLite’s row-based storage is better for transactional queries (e.g., INSERT, UPDATE) but slower for aggregations due to row scanning. BigQuery’s distributed architecture scales to petabytes, while SQLite is limited to local databases.

Follow-Along Instructions:
- Save as de-onboarding/ex5_concepts.py.
- Ensure data/sales.db exists per Appendix 1 (created in Chapter 12). If missing, run the setup script from Appendix 1 (e.g., python create_sales_db.py).
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex5_concepts.py.
- How to Test:
  - Verify ex5_concepts.txt contains timing results and comparison.
  - Expect SQLite to be faster for the small dataset (sales.csv), since it runs locally while BigQuery adds network latency.
  - Test with empty BigQuery table: Should return 0.0 for BigQuery.
  - Common Errors:
    - FileNotFoundError: Check data/sales.db with os.path.exists("data/sales.db").
    - DatabaseError: Verify the sales table in SQLite with sqlite3 data/sales.db "SELECT * FROM sales;".
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex5_concepts.py.
Exercise 6: Synthesize BigQuery Operations with Cost Optimization
Write a script that creates a BigQuery dataset (test_data), loads data/sample.csv into a table (sample_sales), queries total sales for Halal products with and without a price > 10.0 filter, and compares data scanned in BigQuery Console (via query details). Save query results to ex6_results.json and cost analysis to ex6_cost.txt. Use 4-space indentation per PEP 8.
Sample Input (data/sample.csv from Appendix 1):
product,price,quantity
Halal Laptop,999.99,2
Halal Mouse,24.99,10

Expected Output:
ex6_results.json:
{
"total_sales_with_filter": 2249.88,
"total_sales_without_filter": 2249.88
}

ex6_cost.txt:
Data Scanned With Filter: [X] MB
Data Scanned Without Filter: [Y] MB
Analysis: Adding a filter like 'price > 10.0' reduces data scanned from [Y] MB to [X] MB, lowering costs since BigQuery charges ~$5/TB scanned.

Follow-Along Instructions:
- Save as de-onboarding/ex6_synthesize.py.
- Ensure data/sample.csv exists per Appendix 1.
- Configure editor for 4-space indentation per PEP 8.
- Run: python ex6_synthesize.py.
- How to Test:
  - Verify ex6_results.json shows total_sales_with_filter: 2249.88 and total_sales_without_filter: 2249.88 (same for sample.csv because all prices exceed 10.0).
  - Verify ex6_cost.txt reports data scanned (check BigQuery Console query details) and analysis.
  - Check dataset and table in BigQuery Console.
  - Test with empty CSV (e.g., data/empty.csv): Should return total_sales: 0.0 for both queries.
  - Common Errors:
    - FileNotFoundError: Print os.path.exists("data/sample.csv").
    - SchemaMismatch: Print pd.read_csv("data/sample.csv").head().
    - IndentationError: Use 4 spaces (not tabs). Run python -tt ex6_synthesize.py.
25.6 Exercise Solutions
Solution to Exercise 1: Create BigQuery Dataset
from google.cloud import bigquery # Import BigQuery client
def create_dataset(project_id, dataset_name): # Takes project and dataset names
"""Create BigQuery dataset."""
client = bigquery.Client() # Initialize client
dataset_id = f"{project_id}.{dataset_name}" # Full ID
dataset = bigquery.Dataset(dataset_id) # Create dataset
dataset.location = "US" # Set location
client.create_dataset(dataset, exists_ok=True) # Create
print(f"Created dataset: {dataset_id}") # Confirm
# Test
create_dataset("your-project-id", "test_data") # Call functionSolution to Exercise 2: Load CSV to BigQuery
from google.cloud import bigquery # Import BigQuery client
def load_csv_to_table(dataset_id, table_name, csv_path): # Takes dataset, table, CSV
"""Load CSV to BigQuery table."""
client = bigquery.Client() # Initialize client
table_id = f"{dataset_id}.{table_name}" # Full ID
table = bigquery.Table(table_id) # Create table
schema = [
bigquery.SchemaField("product", "STRING"),
bigquery.SchemaField("price", "FLOAT"),
bigquery.SchemaField("quantity", "INTEGER")
]
table.schema = schema # Assign schema
client.create_table(table, exists_ok=True) # Create
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
schema=schema,
write_disposition="WRITE_TRUNCATE"
)
with open(csv_path, "rb") as source_file:
job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result() # Wait
print(f"Loaded {job.output_rows} rows into {table_id}") # Confirm
# Test
load_csv_to_table("your-project-id.sales_data", "sales", "data/sales.csv")Solution to Exercise 3: Query Total Sales
from google.cloud import bigquery # Import BigQuery client
def query_total_sales(table_id): # Takes table ID
"""Query total sales."""
client = bigquery.Client() # Initialize client
query = f"""
SELECT
SUM(price * quantity) AS total_sales
FROM
`{table_id}`
WHERE
product IS NOT NULL
AND product LIKE @prefix
AND quantity <= @max_quantity
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("prefix", "STRING", "Halal%"),
bigquery.ScalarQueryParameter("max_quantity", "INTEGER", 100)
]
)
query_job = client.query(query, job_config=job_config) # Execute
results = query_job.result() # Get results
total = next(results).total_sales or 0.0 # Extract total
print(f"Total Sales: {total}") # Confirm
return total # Return total
# Test
print(query_total_sales("your-project-id.sales_data.sales"))

Solution to Exercise 4: Debug a Query Bug
from google.cloud import bigquery # Import BigQuery client
def query_sales(table_id): # Takes table ID
"""Query total sales."""
client = bigquery.Client() # Initialize client
query = f"""
SELECT
product,
SUM(price * quantity) AS total_sales
FROM
`{table_id}`
GROUP BY
product
"""
query_job = client.query(query) # Execute
results = query_job.result() # Get results
total = sum(row.total_sales for row in results) # Sum
return total # Return total
# Test
print(query_sales("your-project-id.sales_data.sales"))Explanation:
- Bug: The query used CAST(price AS FLOAT), assuming price was STRING, which is unnecessary since the schema defines price as FLOAT.
- Fix: Removed CAST, using price directly.
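To avoid this class of bug, you can check the column types programmatically before writing a query. A minimal sketch, assuming the sales table exists; the table ID is a placeholder:

from google.cloud import bigquery  # BigQuery client

client = bigquery.Client()  # Connect to BigQuery
table = client.get_table("your-project-id.sales_data.sales")  # Fetch table metadata
for field in table.schema:  # Iterate schema fields
    print(f"{field.name}: {field.field_type}")  # Expected: product: STRING, price: FLOAT, quantity: INTEGER
# Only add CAST(...) in queries when the stored type differs from the type you need.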
Solution to Exercise 5: Conceptual Analysis with Performance Comparison
import time # For timing
import sqlite3 # For SQLite
from google.cloud import bigquery # For BigQuery
def compare_storage_performance(bigquery_table_id, sqlite_db_path, output_path):
"""Compare BigQuery and SQLite query performance."""
# BigQuery query
client = bigquery.Client()
bq_query = f"""
SELECT
SUM(price * quantity) AS total_sales
FROM
`{bigquery_table_id}`
WHERE
product IS NOT NULL
AND product LIKE 'Halal%'
"""
start_time = time.time()
bq_job = client.query(bq_query)
bq_result = bq_job.result()
bq_total = next(bq_result).total_sales or 0.0
bq_time = time.time() - start_time
# SQLite query
conn = sqlite3.connect(sqlite_db_path)
cursor = conn.cursor()
sqlite_query = """
SELECT
SUM(price * quantity) AS total_sales
FROM
sales
WHERE
product IS NOT NULL
AND product LIKE 'Halal%'
"""
start_time = time.time()
cursor.execute(sqlite_query)
sqlite_total = cursor.fetchone()[0] or 0.0
sqlite_time = time.time() - start_time
conn.close()
# Write results
analysis = f"""
BigQuery Time: {bq_time:.4f} seconds
SQLite Time: {sqlite_time:.4f} seconds
Comparison: BigQuery’s columnar storage optimizes analytical queries (e.g., SUM(price * quantity)) with O(n) complexity for n rows, ideal for Hijra Group’s sales aggregations. SQLite’s row-based storage is better for transactional queries (e.g., INSERT, UPDATE) but slower for aggregations due to row scanning. BigQuery’s distributed architecture scales to petabytes, while SQLite is limited to local databases.
"""
print(f"Writing analysis to: {output_path}") # Debug
file = open(output_path, "w") # Open file
file.write(analysis.strip()) # Write analysis
file.close() # Close file
print(f"Saved to {output_path}") # Confirm
# Test
compare_storage_performance("your-project-id.sales_data.sales", "data/sales.db", "ex5_concepts.txt")

Solution to Exercise 6: Synthesize BigQuery Operations with Cost Optimization
import json # For JSON export
import os # For file checks
from google.cloud import bigquery # For BigQuery operations
def synthesize_bigquery_operations(project_id, dataset_name, table_name, csv_path, json_path, cost_path):
"""Create dataset, load CSV, query total sales with/without filter, and analyze cost."""
client = bigquery.Client()
# Create dataset
dataset_id = f"{project_id}.{dataset_name}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
client.create_dataset(dataset, exists_ok=True)
print(f"Created dataset: {dataset_id}")
# Load CSV
table_id = f"{dataset_id}.{table_name}"
table = bigquery.Table(table_id)
schema = [
bigquery.SchemaField("product", "STRING"),
bigquery.SchemaField("price", "FLOAT"),
bigquery.SchemaField("quantity", "INTEGER")
]
table.schema = schema
client.create_table(table, exists_ok=True)
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
schema=schema,
write_disposition="WRITE_TRUNCATE"
)
with open(csv_path, "rb") as source_file:
job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result()
print(f"Loaded {job.output_rows} rows into {table_id}")
# Query total sales without filter
query_no_filter = f"""
SELECT
SUM(price * quantity) AS total_sales
FROM
`{table_id}`
WHERE
product IS NOT NULL
AND product LIKE 'Halal%'
"""
query_job_no_filter = client.query(query_no_filter)
results_no_filter = query_job_no_filter.result()
total_sales_no_filter = next(results_no_filter).total_sales or 0.0
data_scanned_no_filter = query_job_no_filter.total_bytes_billed / (1024 * 1024) # Convert to MB
print(f"Total Sales (No Filter): {total_sales_no_filter}, Data Scanned: {data_scanned_no_filter:.2f} MB")
# Query total sales with filter
query_with_filter = f"""
SELECT
SUM(price * quantity) AS total_sales
FROM
`{table_id}`
WHERE
product IS NOT NULL
AND product LIKE 'Halal%'
AND price > 10.0
"""
query_job_with_filter = client.query(query_with_filter)
results_with_filter = query_job_with_filter.result()
total_sales_with_filter = next(results_with_filter).total_sales or 0.0
data_scanned_with_filter = query_job_with_filter.total_bytes_billed / (1024 * 1024) # Convert to MB
print(f"Total Sales (With Filter): {total_sales_with_filter}, Data Scanned: {data_scanned_with_filter:.2f} MB")
# Save query results
results = {
"total_sales_with_filter": float(total_sales_with_filter),
"total_sales_without_filter": float(total_sales_no_filter)
}
file = open(json_path, "w")
json.dump(results, file, indent=2)
file.close()
print(f"Saved results to {json_path}")
# Analyze cost optimization
cost_analysis = f"""
Data Scanned With Filter: {data_scanned_with_filter:.2f} MB
Data Scanned Without Filter: {data_scanned_no_filter:.2f} MB
Analysis: Adding a filter like 'price > 10.0' reduces data scanned from {data_scanned_no_filter:.2f} MB to {data_scanned_with_filter:.2f} MB, lowering costs since BigQuery charges ~$5/TB scanned.
"""
file = open(cost_path, "w")
file.write(cost_analysis.strip())
file.close()
print(f"Saved cost analysis to {cost_path}")
# Test
if os.path.exists("data/sample.csv"):
synthesize_bigquery_operations(
"your-project-id", "test_data", "sample_sales", "data/sample.csv", "ex6_results.json", "ex6_cost.txt"
)
else:
print("Error: data/sample.csv not found")25.7 Chapter Summary and Connection to Chapter 26
In this chapter, you’ve mastered:
- BigQuery Setup: Creating datasets (O(1) metadata operations) with Google Cloud SDK.
- Table Management: Uploading CSVs (O(n) for n rows) with schemas, handling malformed data.
- SQL Queries: Filtering and aggregating data (O(n) scanning) using simple and parameterized SQL.
- White-Space Sensitivity and PEP 8: Using 4-space indentation to avoid IndentationError.
The micro-project built a sales analytics tool, creating a BigQuery dataset, uploading data/sales.csv, querying metrics with parameterized SQL, and exporting results, aligning with Hijra Group’s Sharia-compliant cloud analytics needs. The exercises reinforced these concepts, including a performance comparison of BigQuery and SQLite and a synthesized task with practical cost optimization analysis. BigQuery tables can serve as sources for data lakes and marts, explored in Chapters 31–32.
Connection to Chapter 26
Chapter 26 introduces Python and BigQuery Integration, building on this chapter:
- Data Loading: Extends CSV uploads to programmatic pipelines.
- SQL Queries: Automates parameterized queries with Python scripts.
- Modules: Reuses utilities for validation, preparing for OOP (Chapter 5).
- Fintech Context: Enables automated analytics for Hijra Group, maintaining PEP 8 compliance.