BigQuery

Query Google BigQuery from Valohai executions and save snapshots for reproducible ML pipelines.


Overview

Google BigQuery is a serverless data warehouse that you can query directly from Valohai executions. This guide shows you how to:

  1. Set up authentication (service account or credentials)

  2. Query BigQuery from your code

  3. Save snapshots for reproducibility


Prerequisites

Before you begin:

  • Existing BigQuery dataset with data to query

  • GCP project with BigQuery API enabled

  • Service account or credentials for authentication


Authentication: Choose Your Method

Decision Tree

Are you running Valohai on GCP?
├─ Yes: Use Service Account **Recommended**
│       More secure, no credentials to manage

└─ No: Use stored credentials (JSON key file)
        Works anywhere, requires credential management

Use your GCP compute instances' service account identity to authenticate without storing credentials.

How It Works

  1. Valohai VMs run with an attached service account

  2. Grant this service account BigQuery permissions

  3. Executions automatically authenticate using the machine's identity

  4. No credentials stored in Valohai

Benefits

  • No credentials to manage or rotate

  • More secure (no JSON keys in environment variables)

  • Automatic authentication

  • Fine-grained permissions per Valohai environment


Step 1: Create Service Account

  1. Go to your GCP project hosting Valohai resources

  2. Navigate to IAM & Admin → Service Accounts

  3. Click "Create Service Account"

  4. Name it (e.g., valohai-bigquery-access)

  5. Click "Create and Continue"


Step 2: Grant BigQuery Permissions

Grant the service account access to BigQuery:

  1. In the service account creation wizard, click Grant this service account access to project

  2. Add role: BigQuery User

  3. Add role: BigQuery Data Viewer (if you only need read access)

  4. Click Done

For write access: Add BigQuery Data Editor role


Step 3: Attach to Valohai Environment

  1. Copy the service account email (e.g., [email protected])

  2. Share this with Valohai support to attach it to your environments

  3. Different Valohai environments can use different service accounts (dev/staging/production)

💡 Multiple environments: Use separate service accounts with different permissions for each Valohai environment to control access granularly.


Step 4: Query Using Service Account

Python example with automatic service account authentication:

from google.cloud import bigquery
import pandas as pd

# Client automatically uses attached service account
client = bigquery.Client(project='my-gcp-project')

# Query data
query = """
    SELECT 
        customer_id,
        order_date,
        total_amount,
        product_category
    FROM `my-project.sales.orders`
    WHERE order_date >= '2024-01-01'
    ORDER BY order_date DESC
    LIMIT 100000
"""

# Execute query and get results as DataFrame
df = client.query(query).result().to_dataframe()

print(f"Fetched {len(df)} rows from BigQuery")

# Save snapshot for reproducibility
snapshot_path = '/valohai/outputs/orders_snapshot.csv'
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")

Install dependencies:

command:
  - pip install google-cloud-bigquery pandas
  - python query_bigquery.py

Or in Dockerfile:

RUN pip install google-cloud-bigquery pandas db-dtypes

💡 Note: db-dtypes package improves BigQuery data type handling in pandas.


Accessing BigQuery from Another GCP Project

If your BigQuery data is in a different GCP project than your Valohai resources:

  1. In the BigQuery data project:

    • Grant your Valohai service account BigQuery User permissions

    • Grant BigQuery Data Viewer (or Editor) on specific datasets

  2. In the Valohai infrastructure project:

    • The service account doesn't need BigQuery permissions here

    • Only needs permissions in the project with actual data

Query cross-project data:

# Specify the project with your data
client = bigquery.Client(project='valohai-infrastructure-project')

# Query from different project
query = """
    SELECT *
    FROM `bigquery-data-project.dataset.table`
    WHERE date >= '2024-01-01'
"""

df = client.query(query).result().to_dataframe()

Option 2: Credentials Authentication

Store BigQuery service account JSON key as a Valohai environment variable.

Step 1: Create Service Account Key

  1. In GCP Console, go to IAM & Admin → Service Accounts

  2. Find or create a service account with BigQuery permissions

  3. Click on the service account

  4. Go to Keys tab

  5. Click Add Key → Create new key

  6. Choose JSON format

  7. Download the JSON key file


Step 2: Store Key in Valohai

  1. Open your project in Valohai

  2. Go to Settings → Env Variables

  3. Add variable:

Name
Value
Secret

GOOGLE_APPLICATION_CREDENTIALS_JSON

{entire JSON key file content}

Yes

Alternatively, for the key file path approach:

Name
Value
Secret

GCP_PROJECT_ID

my-gcp-project

No

💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings.


Step 3: Query Using Credentials

Python example with JSON credentials:

from google.cloud import bigquery
import pandas as pd
import json
import os

# Load credentials from environment variable
credentials_json = os.getenv('GOOGLE_APPLICATION_CREDENTIALS_JSON')

# Write to temporary file (BigQuery client expects file path)
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as f:
    f.write(credentials_json)
    credentials_path = f.name

# Set environment variable for BigQuery client
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path

# Create client
client = bigquery.Client(project='my-gcp-project')

# Query data
query = """
    SELECT 
        user_id,
        event_name,
        event_timestamp,
        device_category
    FROM `my-project.analytics.events`
    WHERE event_date = CURRENT_DATE() - 1
"""

df = client.query(query).result().to_dataframe()

print(f"Fetched {len(df)} rows from BigQuery")

# Save snapshot
snapshot_path = '/valohai/outputs/events_snapshot.csv'
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")

# Cleanup
os.remove(credentials_path)

Complete Workflow: Query → Snapshot → Train

Step 1: Query and Save Snapshot

fetch_data.py:

from google.cloud import bigquery
import pandas as pd
import json
from datetime import datetime

# Create BigQuery client (uses attached service account)
client = bigquery.Client(project='my-gcp-project')

# Query training data with proper filters
query = """
    SELECT 
        customer_id,
        age,
        income,
        lifetime_value,
        transaction_count,
        days_since_last_purchase,
        churn_label
    FROM `my-project.ml_features.customer_features`
    WHERE feature_date = CURRENT_DATE() - 1
    AND data_quality_flag = TRUE
"""

print("Executing BigQuery query...")
query_job = client.query(query)

# Get results as DataFrame
df = query_job.result().to_dataframe()

print(f"Fetched {len(df)} rows from BigQuery")
print(f"Columns: {list(df.columns)}")

# Basic data validation
assert len(df) > 0, "No data returned from query"
assert df['churn_label'].notna().all(), "Missing labels in data"

# Save snapshot
snapshot_path = '/valohai/outputs/training_data.csv'
df.to_csv(snapshot_path, index=False)

# Create dataset version with metadata
metadata = {
    "training_data.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://customer-churn/daily-features"
        }],
        "valohai.tags": ["bigquery", "daily", "production"],
        "query_date": datetime.now().isoformat(),
        "row_count": len(df),
        "source_table": "ml_features.customer_features",
        "gcp_project": "my-gcp-project",
        "bq_bytes_processed": query_job.total_bytes_processed,
        "bq_bytes_billed": query_job.total_bytes_billed
    }
}

metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
with open(metadata_path, 'w') as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write('\n')

print("Created dataset version: dataset://customer-churn/daily-features")
print(f"Query processed {query_job.total_bytes_processed / 1e9:.2f} GB")

Step 2: Train on Snapshot

train.py:

import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
import pickle

# Load data from snapshot (not live BigQuery!)
data_path = '/valohai/inputs/training-data/training_data.csv'
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
feature_columns = [
    'age', 'income', 'lifetime_value',
    'transaction_count', 'days_since_last_purchase'
]
X = df[feature_columns]
y = df['churn_label']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")

# Train model
model = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=5,
    random_state=42
)

model.fit(X_train, y_train)

# Evaluate
y_pred_proba = model.predict_proba(X_test)[:, 1]
auc_score = roc_auc_score(y_test, y_pred_proba)

print(f"ROC AUC Score: {auc_score:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, model.predict(X_test)))

# Save model
model_path = '/valohai/outputs/churn_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)

# Save metrics
import json
metrics = {
    "auc_score": float(auc_score),
    "training_rows": len(X_train),
    "test_rows": len(X_test),
    "features": feature_columns
}

with open('/valohai/outputs/metrics.json', 'w') as f:
    json.dump(metrics, f, indent=2)

print(f"Saved model to {model_path}")

Step 3: Pipeline Configuration

valohai.yaml:

- step:
    name: fetch-from-bigquery
    image: python:3.9
    command:
      - pip install google-cloud-bigquery pandas db-dtypes
      - python fetch_data.py
    environment-variables:
      - name: GCP_PROJECT_ID
        default: my-gcp-project

- step:
    name: train-churn-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-features

Maintaining Reproducibility

⚠️ Critical: BigQuery data changes continuously. Query results today differ from results tomorrow.

The problem:

# Training today
df = client.query(query).result().to_dataframe()  # 50,000 rows
model.fit(df[features], df[target])

# Retraining next week (same query)
df = client.query(query).result().to_dataframe()  # 55,000 rows
# Different data = different model = can't reproduce original results

The solution:

# Always save snapshots
df = client.query(query).result().to_dataframe()
df.to_csv('/valohai/outputs/snapshot.csv')  # Immutable snapshot

# Train on snapshot (stored in GCS/S3)
# Can reproduce exactly, even months later

Best practices:

  1. Query once — Run query in dedicated execution

  2. Snapshot immediately — Save to /valohai/outputs/

  3. Version snapshots — Create dataset versions

  4. Train on snapshots — Use dataset as input, never query directly in training

  5. Schedule snapshots — Create fresh snapshots daily/weekly/monthly

See: Databases for complete reproducibility patterns.


Common Issues & Fixes

Authentication Failed

Symptom: google.auth.exceptions.DefaultCredentialsError

Causes & Fixes:

  • Service account not attached → Contact Valohai support to attach service account

  • Wrong project ID → Verify GCP project ID in client initialization

  • Missing credentials → Check GOOGLE_APPLICATION_CREDENTIALS or service account setup


Permission Denied on Query

Symptom: google.api_core.exceptions.Forbidden: 403 Access Denied

Causes & Fixes:

  • Service account missing BigQuery User role → Add role in IAM

  • Missing dataset permissions → Grant BigQuery Data Viewer on specific datasets

  • Cross-project access not configured → Grant permissions in data project


Out of Memory

Symptom: MemoryError when converting to DataFrame

Causes & Fixes:

  • Result set too large → Add LIMIT clause or process in chunks

  • Execution environment too small → Use larger machine type in Valohai



Next Steps

  • Set up service account authentication

  • Create a test query execution

  • Save your first snapshot as a dataset version

  • Build a reproducible training pipeline using snapshots

  • Optimize query costs with partitioning and filtering

Last updated

Was this helpful?