# BigQuery

Query Google BigQuery from Valohai executions and save snapshots for reproducible ML pipelines.

***

### Overview

Google BigQuery is a serverless data warehouse that you can query directly from Valohai executions. This guide shows you how to:

1. Set up authentication (service account or credentials)
2. Query BigQuery from your code
3. Save snapshots for reproducibility

***

### Prerequisites

Before you begin:

* **Existing BigQuery dataset** with data to query
* **GCP project** with BigQuery API enabled
* **Service account or credentials** for authentication

***

### Authentication: Choose Your Method

#### Decision Tree

```
Are you running Valohai on GCP?
├─ Yes: Use Service Account (Recommended)
│       More secure, no credentials to manage
│
└─ No: Use stored credentials (JSON key file)
        Works anywhere, requires credential management
```

***

### Option 1: Service Account Authentication (Recommended)

Use your GCP compute instances' service account identity to authenticate without storing credentials.

#### How It Works

1. Valohai VMs run with an attached service account
2. Grant this service account BigQuery permissions
3. Executions automatically authenticate using the machine's identity
4. No credentials stored in Valohai

#### Benefits

* No credentials to manage or rotate
* More secure (no JSON keys in environment variables)
* Automatic authentication
* Fine-grained permissions per Valohai environment

***

#### Step 1: Create Service Account

1. Go to your GCP project hosting Valohai resources
2. Navigate to **IAM & Admin → Service Accounts**
3. Click **"Create Service Account"**
4. Name it (e.g., `valohai-bigquery-access`)
5. Click **"Create and Continue"**

***

#### Step 2: Grant BigQuery Permissions

Grant the service account access to BigQuery:

1. In the service account creation wizard, click **Grant this service account access to project**
2. Add role: **BigQuery User**
3. Add role: **BigQuery Data Viewer** (if you only need read access)
4. Click **Done**

**For write access:** Add the **BigQuery Data Editor** role instead of **BigQuery Data Viewer**.

***

#### Step 3: Attach to Valohai Environment

1. Copy the service account email (e.g., `valohai-bigquery-access@my-project.iam.gserviceaccount.com`)
2. Share this with Valohai support to attach it to your environments
3. Different Valohai environments can use different service accounts (dev/staging/production)

> 💡 **Multiple environments:** Use separate service accounts with different permissions for each Valohai environment to control access granularly.

***

#### Step 4: Query Using Service Account

**Python example with automatic service account authentication:**

```python
from google.cloud import bigquery
import pandas as pd

# Client automatically uses attached service account
client = bigquery.Client(project="my-gcp-project")

# Query data
query = """
    SELECT
        customer_id,
        order_date,
        total_amount,
        product_category
    FROM `my-project.sales.orders`
    WHERE order_date >= '2024-01-01'
    ORDER BY order_date DESC
    LIMIT 100000
"""

# Execute query and get results as DataFrame
df = client.query(query).result().to_dataframe()

print(f"Fetched {len(df)} rows from BigQuery")

# Save snapshot for reproducibility
snapshot_path = "/valohai/outputs/orders_snapshot.csv"
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")
```

**Install dependencies:**

```yaml
command:
  - pip install google-cloud-bigquery pandas db-dtypes
  - python query_bigquery.py
```

Or in Dockerfile:

```dockerfile
RUN pip install google-cloud-bigquery pandas db-dtypes
```

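> 💡 **Note:** Recent versions of `google-cloud-bigquery` require the `db-dtypes` package for `to_dataframe()`. It provides pandas types for BigQuery columns such as `DATE` and `TIME`.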

***

#### Accessing BigQuery from Another GCP Project

If your BigQuery data is in a different GCP project than your Valohai resources:

1. **In the BigQuery data project:**
   * Grant your Valohai service account **BigQuery User** permissions
   * Grant **BigQuery Data Viewer** (or Editor) on specific datasets
2. **In the Valohai infrastructure project:**
   * The service account doesn't need BigQuery permissions here
   * It only needs permissions in the project that holds the data and runs the query jobs

**Query cross-project data:**

```python
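from google.cloud import bigquery

# Run (and bill) query jobs in the project where the service account
# has the BigQuery User role, i.e. the data project from step 1
client = bigquery.Client(project="bigquery-data-project")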

# Query from different project
query = """
    SELECT *
    FROM `bigquery-data-project.dataset.table`
    WHERE date >= '2024-01-01'
"""

df = client.query(query).result().to_dataframe()
```

***

### Option 2: Credentials Authentication

Store a service account JSON key as a secret Valohai environment variable.

#### Step 1: Create Service Account Key

1. In GCP Console, go to **IAM & Admin → Service Accounts**
2. Find or create a service account with BigQuery permissions
3. Click on the service account
4. Go to **Keys** tab
5. Click **Add Key → Create new key**
6. Choose **JSON** format
7. Download the JSON key file

***

#### Step 2: Store Key in Valohai

1. Open your project in Valohai
2. Go to **Settings → Env Variables**
3. Add variable:

<table><thead><tr><th width="325.5">Name</th><th width="270.25">Value</th><th width="127.25">Secret</th></tr></thead><tbody><tr><td><code>GOOGLE_APPLICATION_CREDENTIALS_JSON</code></td><td><code>{entire JSON key file content}</code></td><td><strong>Yes</strong></td></tr></tbody></table>

Optionally, add the GCP project ID as a second variable so your code does not need to hard-code it:

<table><thead><tr><th width="326.5">Name</th><th width="269.5">Value</th><th width="127">Secret</th></tr></thead><tbody><tr><td><code>GCP_PROJECT_ID</code></td><td><code>my-gcp-project</code></td><td>No</td></tr></tbody></table>

> 💡 **Environment Variable Groups:** Organization admins can create shared credential groups under Organization Settings.

***

#### Step 3: Query Using Credentials

**Python example with JSON credentials:**

```python
import os
import tempfile

import pandas as pd
from google.cloud import bigquery

# Load credentials from environment variable
credentials_json = os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON")
if not credentials_json:
    raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS_JSON is not set")

# Write the key to a temporary file: GOOGLE_APPLICATION_CREDENTIALS
# must point to a file path, not contain the JSON itself
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
    f.write(credentials_json)
    credentials_path = f.name

# Point Application Default Credentials at the key file
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

# Create client
client = bigquery.Client(project="my-gcp-project")

# Query data
query = """
    SELECT
        user_id,
        event_name,
        event_timestamp,
        device_category
    FROM `my-project.analytics.events`
    WHERE event_date = CURRENT_DATE() - 1
"""

df = client.query(query).result().to_dataframe()

print(f"Fetched {len(df)} rows from BigQuery")

# Save snapshot
snapshot_path = "/valohai/outputs/events_snapshot.csv"
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")

# Cleanup
os.remove(credentials_path)
```
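
As an alternative to the temporary file, the key can be parsed and checked in memory. The sketch below is one possible approach, not a documented Valohai method: `load_service_account_info` and `make_client` are hypothetical helpers, and the list of required fields is an assumption based on the usual layout of a service account key file.

```python
import json
import os

# Fields a service account key file normally contains (assumed minimum set).
REQUIRED_FIELDS = ("type", "project_id", "private_key", "client_email")


def load_service_account_info(
    env_var="GOOGLE_APPLICATION_CREDENTIALS_JSON", environ=None
):
    """Parse the JSON key stored in an environment variable and sanity-check it."""
    environ = os.environ if environ is None else environ
    raw = environ.get(env_var)
    if not raw:
        raise RuntimeError(f"{env_var} is not set")
    try:
        info = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"{env_var} does not contain valid JSON") from exc
    if not isinstance(info, dict):
        raise RuntimeError(f"{env_var} must contain a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in info]
    if missing:
        raise RuntimeError(f"{env_var} is missing fields: {', '.join(missing)}")
    return info


def make_client(info):
    """Build a BigQuery client from parsed key data.

    Imports are local so the parsing helper above works without
    google-cloud-bigquery installed.
    """
    from google.cloud import bigquery
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_info(info)
    return bigquery.Client(credentials=credentials, project=info["project_id"])
```

In an execution this could be used as `client = make_client(load_service_account_info())`. The key never touches the disk, so no cleanup step is needed.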

***

### Complete Workflow: Query → Snapshot → Train

#### Step 1: Query and Save Snapshot

**fetch\_data.py:**

```python
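import json
import os
from datetime import datetime

import pandas as pd
from google.cloud import bigquery

# Create BigQuery client (uses attached service account).
# GCP_PROJECT_ID is set in valohai.yaml; fall back to a placeholder project.
project_id = os.getenv("GCP_PROJECT_ID", "my-gcp-project")
client = bigquery.Client(project=project_id)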

# Query training data with proper filters
query = """
    SELECT
        customer_id,
        age,
        income,
        lifetime_value,
        transaction_count,
        days_since_last_purchase,
        churn_label
    FROM `my-project.ml_features.customer_features`
    WHERE feature_date = CURRENT_DATE() - 1
    AND data_quality_flag = TRUE
"""

print("Executing BigQuery query...")
query_job = client.query(query)

# Get results as DataFrame
df = query_job.result().to_dataframe()

print(f"Fetched {len(df)} rows from BigQuery")
print(f"Columns: {list(df.columns)}")

# Basic data validation
assert len(df) > 0, "No data returned from query"
assert df["churn_label"].notna().all(), "Missing labels in data"

# Save snapshot
snapshot_path = "/valohai/outputs/training_data.csv"
df.to_csv(snapshot_path, index=False)

# Create dataset version with metadata
metadata = {
    "training_data.csv": {
        "valohai.dataset-versions": [
            {
                "uri": "dataset://customer-churn/daily-features",
            },
        ],
        "valohai.tags": ["bigquery", "daily", "production"],
        "query_date": datetime.now().isoformat(),
        "row_count": len(df),
        "source_table": "ml_features.customer_features",
        "gcp_project": "my-gcp-project",
        "bq_bytes_processed": query_job.total_bytes_processed,
        "bq_bytes_billed": query_job.total_bytes_billed,
    },
}

metadata_path = "/valohai/outputs/valohai.metadata.jsonl"
with open(metadata_path, "w") as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write("\n")

print("Created dataset version: dataset://customer-churn/daily-features")
print(f"Query processed {query_job.total_bytes_processed / 1e9:.2f} GB")
```
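
`CURRENT_DATE() - 1` resolves to a different day on every run, so the query text alone does not identify the data it returned. One option is to resolve the date once in Python, pass it to the query, and record it with the snapshot. The sketch below is a hedged illustration: `resolve_feature_date` and `snapshot_name` are hypothetical helpers, and UTC is used on the assumption that the query relies on BigQuery's default time zone.

```python
from datetime import date, datetime, timedelta, timezone


def resolve_feature_date(days_back=1, today=None):
    """Return the date to query, fixed once at the start of the run (UTC)."""
    if today is None:
        today = datetime.now(timezone.utc).date()
    return today - timedelta(days=days_back)


def snapshot_name(prefix, feature_date):
    """Build a date-stamped file name for the snapshot."""
    return f"{prefix}_{feature_date.isoformat()}.csv"


# Example with a fixed "today" so the result is deterministic.
feature_date = resolve_feature_date(today=date(2024, 1, 15))
print(feature_date.isoformat())                      # 2024-01-14
print(snapshot_name("training_data", feature_date))  # training_data_2024-01-14.csv
```

To use the pinned date in the query, `CURRENT_DATE() - 1` could be replaced with `@feature_date` and the value passed through `bigquery.QueryJobConfig(query_parameters=[bigquery.ScalarQueryParameter("feature_date", "DATE", feature_date)])`. Storing `feature_date.isoformat()` in the metadata next to `query_date` records which day the snapshot covers.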

***

#### Step 2: Train on Snapshot

**train.py:**

```python
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
import pickle

# Load data from snapshot (not live BigQuery!)
data_path = "/valohai/inputs/training-data/training_data.csv"
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
feature_columns = [
    "age",
    "income",
    "lifetime_value",
    "transaction_count",
    "days_since_last_purchase",
]
X = df[feature_columns]
y = df["churn_label"]

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")

# Train model
model = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=0.1,
    max_depth=5,
    random_state=42,
)

model.fit(X_train, y_train)

# Evaluate
y_pred_proba = model.predict_proba(X_test)[:, 1]
auc_score = roc_auc_score(y_test, y_pred_proba)

print(f"ROC AUC Score: {auc_score:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, model.predict(X_test)))

# Save model
model_path = "/valohai/outputs/churn_model.pkl"
with open(model_path, "wb") as f:
    pickle.dump(model, f)

# Save metrics
import json

metrics = {
    "auc_score": float(auc_score),
    "training_rows": len(X_train),
    "test_rows": len(X_test),
    "features": feature_columns,
}

with open("/valohai/outputs/metrics.json", "w") as f:
    json.dump(metrics, f, indent=2)

print(f"Saved model to {model_path}")
```

***

#### Step 3: Pipeline Configuration

**valohai.yaml:**

```yaml
- step:
    name: fetch-from-bigquery
    image: python:3.9
    command:
      - pip install google-cloud-bigquery pandas db-dtypes
      - python fetch_data.py
    environment-variables:
      - name: GCP_PROJECT_ID
        default: my-gcp-project

- step:
    name: train-churn-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-features
```

***

### Maintaining Reproducibility

> ⚠️ **Critical:** BigQuery data changes continuously. The same query can return different results tomorrow than it does today.

**The problem:**

```python
# Training today
df = client.query(query).result().to_dataframe()  # 50,000 rows
model.fit(df[features], df[target])

# Retraining next week (same query)
df = client.query(query).result().to_dataframe()  # 55,000 rows
# Different data = different model = can't reproduce original results
```

**The solution:**

```python
# Always save snapshots
df = client.query(query).result().to_dataframe()
df.to_csv("/valohai/outputs/snapshot.csv", index=False)  # Immutable snapshot

# Train on snapshot (stored in GCS/S3)
# Can reproduce exactly, even months later
```

**Best practices:**

1. **Query once** — Run query in dedicated execution
2. **Snapshot immediately** — Save to `/valohai/outputs/`
3. **Version snapshots** — Create dataset versions
4. **Train on snapshots** — Use dataset as input, never query directly in training
5. **Schedule snapshots** — Create fresh snapshots daily/weekly/monthly
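
To make "train on snapshots" checkable, a snapshot can be fingerprinted when it is written and the digest stored with it. A minimal sketch, assuming a CSV snapshot with a header row; `fingerprint_snapshot` is a hypothetical helper and not part of Valohai or BigQuery:

```python
import csv
import hashlib


def fingerprint_snapshot(path, chunk_size=1024 * 1024):
    """Return the SHA-256 digest and data-row count of a CSV snapshot."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Hash the file in chunks so large snapshots need not fit in memory.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    with open(path, newline="") as f:
        # Count parsed CSV records (not raw lines) and exclude the header.
        row_count = max(sum(1 for _ in csv.reader(f)) - 1, 0)
    return {"sha256": digest.hexdigest(), "row_count": row_count}
```

The returned values could be added to the file's entry in `valohai.metadata.jsonl`. A training step can then recompute the digest of its input and compare it before fitting.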

**See:** [Databases](/data/data-databases.md) for complete reproducibility patterns.

***

### Common Issues & Fixes

#### Authentication Failed

**Symptom:** `google.auth.exceptions.DefaultCredentialsError`

**Causes & Fixes:**

* Service account not attached → Contact Valohai support to attach service account
* Wrong project ID → Verify GCP project ID in client initialization
* Missing credentials → Check `GOOGLE_APPLICATION_CREDENTIALS` or service account setup
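
When the cause is not obvious, it can help to print which credential sources are visible inside the execution. The following is a small diagnostic sketch using only the standard library. `describe_credential_sources` is a hypothetical helper, and it checks only the two environment variables used in this guide.

```python
import os


def describe_credential_sources(environ=None):
    """Report which of this guide's credential variables are usable."""
    environ = os.environ if environ is None else environ
    findings = []

    key_path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not key_path:
        findings.append("GOOGLE_APPLICATION_CREDENTIALS: not set")
    elif os.path.isfile(key_path):
        findings.append(f"GOOGLE_APPLICATION_CREDENTIALS: file found at {key_path}")
    else:
        findings.append(f"GOOGLE_APPLICATION_CREDENTIALS: no file at {key_path}")

    if environ.get("GOOGLE_APPLICATION_CREDENTIALS_JSON"):
        findings.append("GOOGLE_APPLICATION_CREDENTIALS_JSON: set")
    else:
        findings.append("GOOGLE_APPLICATION_CREDENTIALS_JSON: not set")
    return findings


# Print the findings at the start of an execution, before creating the client.
for line in describe_credential_sources():
    print(line)
```

If both variables are unset, authentication depends on the service account attached to the machine (Option 1).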

***

#### Permission Denied on Query

**Symptom:** `google.api_core.exceptions.Forbidden: 403 Access Denied`

**Causes & Fixes:**

* Service account missing BigQuery User role → Add role in IAM
* Missing dataset permissions → Grant BigQuery Data Viewer on specific datasets
* Cross-project access not configured → Grant permissions in data project

***

#### Out of Memory

**Symptom:** `MemoryError` when converting to DataFrame

**Causes & Fixes:**

* Result set too large → Select only the columns you need, add a `LIMIT` clause, or process results in chunks
* Execution environment too small → Use larger machine type in Valohai
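
One way to process results in chunks is to write rows to the snapshot file as they arrive instead of building a DataFrame first. The sketch below is an illustration under assumptions: `stream_rows_to_csv` is a hypothetical helper that accepts any iterable of mapping-like rows, so it can be tried with plain dictionaries. With BigQuery, the iterable would be the result of `client.query(query).result(page_size=...)`, which fetches rows page by page.

```python
import csv


def stream_rows_to_csv(rows, path, fieldnames):
    """Write rows to a CSV file one at a time and return the row count."""
    count = 0
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            # dict(row.items()) works for dicts and for BigQuery Row objects.
            writer.writerow(dict(row.items()))
            count += 1
    return count
```

With BigQuery, a call could look like `stream_rows_to_csv(client.query(query).result(page_size=50_000), "/valohai/outputs/snapshot.csv", ["customer_id", "order_date"])`, where the field names must match the selected columns. For DataFrame-based processing, `to_dataframe_iterable()` on the query result yields one DataFrame per page.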

***

### Related Pages

* [Databases](/data/data-databases.md) — Overview and reproducibility patterns
* [AWS Redshift](/data/data-databases/aws-redshift.md) — Connect to Amazon Redshift
* [Snowflake](/data/data-databases/snowflake.md) — Connect to Snowflake
* [Create and Manage Datasets](/data/datasets/creating-datasets.md) — Version your snapshots

***

### Next Steps

* Set up service account authentication
* Create a test query execution
* Save your first snapshot as a [dataset version](/data/datasets/update-datasets.md)
* Build a reproducible training [pipeline](/pipelines.md) using snapshots
* Optimize query costs with partitioning and filtering


