BigQuery
Query Google BigQuery from Valohai executions and save snapshots for reproducible ML pipelines.
Overview
Google BigQuery is a serverless data warehouse that you can query directly from Valohai executions. This guide shows you how to:
Set up authentication (service account or credentials)
Query BigQuery from your code
Save snapshots for reproducibility
Prerequisites
Before you begin:
Existing BigQuery dataset with data to query
GCP project with BigQuery API enabled
Service account or credentials for authentication
Authentication: Choose Your Method
Decision Tree
Are you running Valohai on GCP?
├─ Yes: Use Service Account **Recommended**
│ More secure, no credentials to manage
│
└─ No: Use stored credentials (JSON key file)
Works anywhere, requires credential management
Option 1: Service Account Authentication (Recommended)
Use your GCP compute instances' service account identity to authenticate without storing credentials.
How It Works
Valohai VMs run with an attached service account
Grant this service account BigQuery permissions
Executions automatically authenticate using the machine's identity
No credentials stored in Valohai
Benefits
No credentials to manage or rotate
More secure (no JSON keys in environment variables)
Automatic authentication
Fine-grained permissions per Valohai environment
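As a quick sanity check inside an execution, you can ask the Google auth library which credentials it resolves. A minimal sketch, assuming google-cloud-bigquery (which pulls in google-auth) is installed:
import google.auth
# Resolve the default credentials available on this machine.
# On a Valohai GCP VM this picks up the attached service account;
# it raises DefaultCredentialsError if nothing is configured.
credentials, project = google.auth.default()
print(f"Default GCP project: {project}")
print(f"Credential type: {type(credentials).__name__}")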
Step 1: Create Service Account
Go to your GCP project hosting Valohai resources
Navigate to IAM & Admin → Service Accounts
Click "Create Service Account"
Name it (e.g., valohai-bigquery-access)
Click "Create and Continue"
Step 2: Grant BigQuery Permissions
Grant the service account access to BigQuery:
In the service account creation wizard, click Grant this service account access to project
Add role: BigQuery User
Add role: BigQuery Data Viewer (if you only need read access)
Click Done
For write access: Add BigQuery Data Editor role
Step 3: Attach to Valohai Environment
Copy the service account email (e.g., valohai-bigquery-access@<project-id>.iam.gserviceaccount.com)
Share this with Valohai support to attach it to your environments
Different Valohai environments can use different service accounts (dev/staging/production)
💡 Multiple environments: Use separate service accounts with different permissions for each Valohai environment to control access granularly.
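Once attached, you can verify from inside an execution which service account the machine is actually running as by querying the GCE metadata server. A minimal sketch using only the standard library; it only works on a GCP VM:
import urllib.request
# Ask the GCE metadata server for the identity attached to this VM.
req = urllib.request.Request(
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/email",
    headers={"Metadata-Flavor": "Google"},
)
with urllib.request.urlopen(req, timeout=2) as resp:
    print(f"Attached service account: {resp.read().decode()}")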
Step 4: Query Using Service Account
Python example with automatic service account authentication:
from google.cloud import bigquery
import pandas as pd
# Client automatically uses attached service account
client = bigquery.Client(project='my-gcp-project')
# Query data
query = """
SELECT
customer_id,
order_date,
total_amount,
product_category
FROM `my-project.sales.orders`
WHERE order_date >= '2024-01-01'
ORDER BY order_date DESC
LIMIT 100000
"""
# Execute query and get results as DataFrame
df = client.query(query).result().to_dataframe()
print(f"Fetched {len(df)} rows from BigQuery")
# Save snapshot for reproducibility
snapshot_path = '/valohai/outputs/orders_snapshot.csv'
df.to_csv(snapshot_path, index=False)
print(f"Saved snapshot to {snapshot_path}")Install dependencies:
command:
- pip install google-cloud-bigquery pandas
- python query_bigquery.pyOr in Dockerfile:
RUN pip install google-cloud-bigquery pandas db-dtypes💡 Note:
db-dtypespackage improves BigQuery data type handling in pandas.
Accessing BigQuery from Another GCP Project
If your BigQuery data is in a different GCP project than your Valohai resources:
In the BigQuery data project:
Grant your Valohai service account BigQuery User permissions
Grant BigQuery Data Viewer (or Editor) on specific datasets
In the Valohai infrastructure project:
The service account doesn't need BigQuery permissions here
Only needs permissions in the project with actual data
Query cross-project data:
# Specify the project with your data
client = bigquery.Client(project='valohai-infrastructure-project')
# Query from different project
query = """
SELECT *
FROM `bigquery-data-project.dataset.table`
WHERE date >= '2024-01-01'
"""
df = client.query(query).result().to_dataframe()
Option 2: Credentials Authentication
Store BigQuery service account JSON key as a Valohai environment variable.
Step 1: Create Service Account Key
In GCP Console, go to IAM & Admin → Service Accounts
Find or create a service account with BigQuery permissions
Click on the service account
Go to Keys tab
Click Add Key → Create new key
Choose JSON format
Download the JSON key file
Step 2: Store Key in Valohai
Open your project in Valohai
Go to Settings → Env Variables
Add variables:
Name: GOOGLE_APPLICATION_CREDENTIALS_JSON
Value: the entire JSON key file content
Secret: Yes
Name: GCP_PROJECT_ID
Value: my-gcp-project
Secret: No
The Step 3 example below writes this JSON to a temporary file and points GOOGLE_APPLICATION_CREDENTIALS at that path, since the BigQuery client reads credentials from a key file path.
💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings.
Step 3: Query Using Credentials
Python example with JSON credentials:
from google.cloud import bigquery
import pandas as pd
import json
import os
# Load credentials from environment variable
credentials_json = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON')
# Write to temporary file (BigQuery client expects file path)
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
    f.write(credentials_json)
    credentials_path = f.name
# Set environment variable for BigQuery client
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
# Create client
client = bigquery.Client(project='my-gcp-project')
# Query data
query = """
SELECT
user_id,
event_name,
event_timestamp,
device_category
FROM `my-project.analytics.events`
WHERE event_date = CURRENT_DATE() - 1
"""
df = client.query(query).result().to_dataframe()
print(f"Fetched {len(df)} rows from BigQuery")
# Save snapshot
snapshot_path = '/valohai/outputs/events_snapshot.csv'
df.to_csv(snapshot_path, index=False)
print(f"Saved snapshot to {snapshot_path}")
# Cleanup
os.remove(credentials_path)
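If you prefer not to write the key to disk at all, the client also accepts an in-memory credentials object. A minimal sketch, assuming the same GOOGLE_APPLICATION_CREDENTIALS_JSON variable as above:
import json
import os
from google.cloud import bigquery
from google.oauth2 import service_account
# Build credentials directly from the JSON stored in the env variable,
# skipping the temporary file entirely.
info = json.loads(os.environ['GOOGLE_APPLICATION_CREDENTIALS_JSON'])
credentials = service_account.Credentials.from_service_account_info(info)
client = bigquery.Client(project=info['project_id'], credentials=credentials)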
Complete Workflow: Query → Snapshot → Train
Step 1: Query and Save Snapshot
fetch_data.py:
from google.cloud import bigquery
import pandas as pd
import json
from datetime import datetime
# Create BigQuery client (uses attached service account)
client = bigquery.Client(project='my-gcp-project')
# Query training data with proper filters
query = """
SELECT
customer_id,
age,
income,
lifetime_value,
transaction_count,
days_since_last_purchase,
churn_label
FROM `my-project.ml_features.customer_features`
WHERE feature_date = CURRENT_DATE() - 1
AND data_quality_flag = TRUE
"""
print("Executing BigQuery query...")
query_job = client.query(query)
# Get results as DataFrame
df = query_job.result().to_dataframe()
print(f"Fetched {len(df)} rows from BigQuery")
print(f"Columns: {list(df.columns)}")
# Basic data validation
assert len(df) > 0, "No data returned from query"
assert df['churn_label'].notna().all(), "Missing labels in data"
# Save snapshot
snapshot_path = '/valohai/outputs/training_data.csv'
df.to_csv(snapshot_path, index=False)
# Create dataset version with metadata
metadata = {
    "training_data.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://customer-churn/daily-features"
        }],
        "valohai.tags": ["bigquery", "daily", "production"],
        "query_date": datetime.now().isoformat(),
        "row_count": len(df),
        "source_table": "ml_features.customer_features",
        "gcp_project": "my-gcp-project",
        "bq_bytes_processed": query_job.total_bytes_processed,
        "bq_bytes_billed": query_job.total_bytes_billed
    }
}
metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
with open(metadata_path, 'w') as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write('\n')
print("Created dataset version: dataset://customer-churn/daily-features")
print(f"Query processed {query_job.total_bytes_processed / 1e9:.2f} GB")Step 2: Train on Snapshot
train.py:
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
import pickle
# Load data from snapshot (not live BigQuery!)
data_path = '/valohai/inputs/training-data/training_data.csv'
df = pd.read_csv(data_path)
print(f"Training on {len(df)} rows from snapshot")
# Prepare features
feature_columns = [
'age', 'income', 'lifetime_value',
'transaction_count', 'days_since_last_purchase'
]
X = df[feature_columns]
y = df['churn_label']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")
# Train model
model = GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=5,
random_state=42
)
model.fit(X_train, y_train)
# Evaluate
y_pred_proba = model.predict_proba(X_test)[:, 1]
auc_score = roc_auc_score(y_test, y_pred_proba)
print(f"ROC AUC Score: {auc_score:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, model.predict(X_test)))
# Save model
model_path = '/valohai/outputs/churn_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)
# Save metrics
import json
metrics = {
"auc_score": float(auc_score),
"training_rows": len(X_train),
"test_rows": len(X_test),
"features": feature_columns
}
with open('/valohai/outputs/metrics.json', 'w') as f:
    json.dump(metrics, f, indent=2)
print(f"Saved model to {model_path}")Step 3: Pipeline Configuration
valohai.yaml:
- step:
    name: fetch-from-bigquery
    image: python:3.9
    command:
      - pip install google-cloud-bigquery pandas db-dtypes
      - python fetch_data.py
    environment-variables:
      - name: GCP_PROJECT_ID
        default: my-gcp-project
- step:
    name: train-churn-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-features
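To chain the two steps so training always consumes the snapshot produced by the fetch step, you can also define a pipeline in the same valohai.yaml. A sketch with illustrative node names; the edge maps the fetch step's training_data.csv output to the train step's training-data input:
- pipeline:
    name: churn-training-pipeline
    nodes:
      - name: fetch
        type: execution
        step: fetch-from-bigquery
      - name: train
        type: execution
        step: train-churn-model
    edges:
      - [fetch.output.training_data.csv, train.input.training-data]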
Maintaining Reproducibility
⚠️ Critical: BigQuery data changes continuously. Query results today differ from results tomorrow.
The problem:
# Training today
df = client.query(query).result().to_dataframe() # 50,000 rows
model.fit(df[features], df[target])
# Retraining next week (same query)
df = client.query(query).result().to_dataframe() # 55,000 rows
# Different data = different model = can't reproduce original results
The solution:
# Always save snapshots
df = client.query(query).result().to_dataframe()
df.to_csv('/valohai/outputs/snapshot.csv') # Immutable snapshot
# Train on snapshot (stored in GCS/S3)
# Can reproduce exactly, even months later
Best practices:
Query once — Run query in dedicated execution
Snapshot immediately — Save to /valohai/outputs/
Version snapshots — Create dataset versions
Train on snapshots — Use dataset as input, never query directly in training
Schedule snapshots — Create fresh snapshots daily/weekly/monthly
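For scheduled snapshots, it also helps to pass the snapshot date as an explicit parameter instead of relying on CURRENT_DATE(), so rerunning the same execution queries the same slice of data. A sketch; the --snapshot-date parameter name is illustrative:
import argparse
import datetime
from google.cloud import bigquery
parser = argparse.ArgumentParser()
parser.add_argument('--snapshot-date', required=True, help='YYYY-MM-DD')
args = parser.parse_args()
snapshot_date = datetime.date.fromisoformat(args.snapshot_date)
client = bigquery.Client(project='my-gcp-project')
# Bind the date as a query parameter so reruns query the same slice of data.
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter('snapshot_date', 'DATE', snapshot_date)
    ]
)
query = """
SELECT *
FROM `my-project.ml_features.customer_features`
WHERE feature_date = @snapshot_date
"""
df = client.query(query, job_config=job_config).result().to_dataframe()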
See: Databases for complete reproducibility patterns.
Common Issues & Fixes
Authentication Failed
Symptom: google.auth.exceptions.DefaultCredentialsError
Causes & Fixes:
Service account not attached → Contact Valohai support to attach service account
Wrong project ID → Verify GCP project ID in client initialization
Missing credentials → Check GOOGLE_APPLICATION_CREDENTIALS or service account setup
Permission Denied on Query
Symptom: google.api_core.exceptions.Forbidden: 403 Access Denied
Causes & Fixes:
Service account missing BigQuery User role → Add role in IAM
Missing dataset permissions → Grant BigQuery Data Viewer on specific datasets
Cross-project access not configured → Grant permissions in data project
Out of Memory
Symptom: MemoryError when converting to DataFrame
Causes & Fixes:
Result set too large → Add LIMIT clause or process in chunks (see the sketch below)
Execution environment too small → Use larger machine type in Valohai
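One way to keep memory bounded is to stream the result one page at a time instead of materializing a single DataFrame. A minimal sketch reusing the earlier orders query; to_dataframe_iterable yields one DataFrame per result page:
from google.cloud import bigquery
client = bigquery.Client(project='my-gcp-project')
query = "SELECT * FROM `my-project.sales.orders` WHERE order_date >= '2024-01-01'"
snapshot_path = '/valohai/outputs/orders_snapshot.csv'
rows = client.query(query).result(page_size=50000)
# Append each page-sized DataFrame to the snapshot file so only
# one page is held in memory at a time.
first = True
for chunk in rows.to_dataframe_iterable():
    chunk.to_csv(snapshot_path, mode='w' if first else 'a', header=first, index=False)
    first = False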
Related Pages
Databases — Overview and reproducibility patterns
AWS Redshift — Connect to Amazon Redshift
Snowflake — Connect to Snowflake
Create and Manage Datasets — Version your snapshots
Next Steps
Set up service account authentication
Create a test query execution
Save your first snapshot as a dataset version
Build a reproducible training pipeline using snapshots
Optimize query costs with partitioning and filtering
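A quick way to estimate a query's cost before running it is a dry run, which validates the query and reports bytes scanned without executing it. A minimal sketch using the same client as above:
from google.cloud import bigquery
client = bigquery.Client(project='my-gcp-project')
# dry_run=True validates the query and estimates bytes scanned
# without running it or incurring query charges.
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(
    "SELECT customer_id, total_amount FROM `my-project.sales.orders` "
    "WHERE order_date >= '2024-01-01'",
    job_config=job_config,
)
print(f"This query would process {job.total_bytes_processed / 1e9:.2f} GB")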