Databases

Connect to production databases from your ML pipelines to query fresh data, but snapshot results to maintain reproducibility as data evolves.


The Challenge: Databases Change

Production databases are constantly updated with new data. This creates a reproducibility problem for ML:

Today's query:

SELECT * FROM customer_data WHERE signup_date >= '2024-01-01'
-- Returns: 50,000 rows

Same query next month:

SELECT * FROM customer_data WHERE signup_date >= '2024-01-01'
-- Returns: 65,000 rows (15,000 new customers!)

The problem:

  • Rerunning your training pipeline produces different results

  • Can't reproduce experiments from last week

  • Hard to debug: "Did the model improve, or did the data change?"

  • Compliance issues: "Which data was used to train the production model?"


The Solution: Query + Snapshot Pattern

Valohai solves this with a simple workflow:

  1. Query — Fetch data from your database

  2. Snapshot — Save results to /valohai/outputs/

  3. Version — Create dataset version with snapshot

  4. Train — Use versioned dataset as input

[Database] → [Query Execution] → [Snapshot in S3/GCS/Azure]
                                           ↓
                                   [Dataset Version]
                                           ↓
                                  [Training Execution]

Benefits:

  • Reproducible: Same dataset version = same results forever

  • Traceable: Know exactly which data trained each model

  • Fast: Reuse snapshots instead of requerying database

  • Compliant: Audit trail of data used in production models


Authentication: Machine Identity vs. Credentials

Choose your authentication method based on security requirements and available infrastructure.

Decision Tree

Do you run on AWS and use Redshift?
├─ Yes: Use IAM Role (machine identity) **Recommended**
└─ No: ↓

Do you run on GCP and use BigQuery?
├─ Yes: Use Service Account (machine identity) **Recommended**
└─ No: ↓

Use stored credentials (username/password)

Machine Identity

What it is: Your execution environment's cloud identity (an IAM role or service account) authenticates automatically.

Pros:

  • No credentials to manage

  • More secure (no secrets in code or environment variables)

  • Automatic rotation via cloud IAM

  • Fine-grained permissions per environment

Cons:

  • Cloud-specific (AWS IAM, GCP Service Accounts)

  • Requires infrastructure setup

When to use: Production pipelines on AWS (Redshift) or GCP (BigQuery)

Supported:

  • AWS Redshift → IAM Roles

  • GCP BigQuery → Service Accounts
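
As a rough sketch of both flavors (the cluster, region, and project names below are placeholders, not values from this guide), machine identity means no password is stored anywhere:

import boto3
import psycopg2
from google.cloud import bigquery

# AWS Redshift: the execution's IAM role requests short-lived credentials
redshift = boto3.client('redshift', region_name='us-east-1')
creds = redshift.get_cluster_credentials(
    DbUser='ml_pipeline',
    DbName='analytics',
    ClusterIdentifier='my-redshift-cluster',
    DurationSeconds=900,
)
conn = psycopg2.connect(
    host='my-redshift-cluster.abc123.us-east-1.redshift.amazonaws.com',
    port=5439,
    dbname='analytics',
    user=creds['DbUser'],
    password=creds['DbPassword'],
)

# GCP BigQuery: the client library picks up the environment's service account
bq_client = bigquery.Client(project='my-gcp-project')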


Stored Credentials

What it is: Store database username/password as Valohai environment variables.

Pros:

  • Works with any database

  • Simple setup

  • Cloud-agnostic

Cons:

  • Credentials to manage and rotate

  • Stored as environment variables (marked secret)

When to use:

  • Databases without machine identity support

  • Quick prototypes

  • On-premises databases

  • Cross-cloud scenarios

Supported: All databases (Redshift, BigQuery, Snowflake, PostgreSQL, MySQL, etc.)
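
For example, with stored credentials a Snowflake connection might look like this sketch (the environment variable names simply follow the convention used later on this page):

import os

import snowflake.connector

conn = snowflake.connector.connect(
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    user=os.environ['DB_USER'],
    password=os.environ['DB_PASSWORD'],
    database=os.environ['DB_NAME'],
)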


Complete Workflow Example

Here's the full pattern: query database → save snapshot → create dataset → train model.

Step 1: Query Database and Save Snapshot

import os

import pandas as pd
import psycopg2  # or your database's connector (BigQuery, Snowflake, etc.)

# Connect to the database (see the database-specific guides for authentication)
conn = psycopg2.connect(
    host=os.environ['DB_HOST'],
    dbname=os.environ['DB_NAME'],
    user=os.environ['DB_USER'],
    password=os.environ['DB_PASSWORD'],
    port=os.environ.get('DB_PORT', '5432'),
)

# Query data
query = """
    SELECT 
        customer_id,
        signup_date,
        total_purchases,
        churn_label
    FROM customer_data
    WHERE signup_date >= '2024-01-01'
"""

df = pd.read_sql(query, conn)
conn.close()

print(f"Fetched {len(df)} rows from database")

# Snapshot: Save to Valohai outputs
output_path = '/valohai/outputs/customer_data_snapshot.csv'
df.to_csv(output_path, index=False)

print(f"Saved snapshot to {output_path}")

Step 2: Create Dataset Version with Snapshot

import json

# Add snapshot to dataset version
metadata = {
    "customer_data_snapshot.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://customer-data/2024-q1"
        }],
        "valohai.tags": ["snapshot", "2024-q1", "production"],
        "query_date": "2024-01-15",
        "row_count": len(df)
    }
}

# Save metadata
metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
with open(metadata_path, 'w') as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write('\n')

print("Created dataset version: dataset://customer-data/2024-q1")

Step 3: Train Using Versioned Dataset

valohai.yaml:

- step:
    name: query-database
    image: python:3.9
    command:
      - pip install pandas psycopg2-binary  # or your database connector
      - python fetch_data.py
    environment-variables:
      - name: DB_HOST
        default: my-database.example.com
      - name: DB_USER
        default: ml_pipeline

- step:
    name: train-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-data/2024-q1  # Uses snapshot, not live database

train.py:

import pickle

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Load data from snapshot (not database!)
data_path = '/valohai/inputs/training-data/customer_data_snapshot.csv'
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# The snapshot holds customer_id, signup_date, total_purchases, and churn_label;
# derive days_since_signup from a fixed reference date so reruns stay reproducible
reference_date = pd.Timestamp('2024-04-01')
df['days_since_signup'] = (reference_date - pd.to_datetime(df['signup_date'])).dt.days

# Train model (fixed random_state so the split and model are reproducible)
X = df[['total_purchases', 'days_since_signup']]
y = df['churn_label']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

accuracy = model.score(X_test, y_test)
print(f"Model accuracy: {accuracy:.4f}")

# Save model
with open('/valohai/outputs/model.pkl', 'wb') as f:
    pickle.dump(model, f)

Step 4: Update Snapshot (Monthly/Weekly)

Create new dataset versions as data evolves:

# Next month: Create new snapshot
metadata = {
    "customer_data_snapshot.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://customer-data/2024-q2"  # New version
        }],
        "valohai.tags": ["snapshot", "2024-q2", "production"],
        "query_date": "2024-04-15",
        "row_count": len(df)
    }
}

Result:

  • 2024-q1 version: 50,000 rows (immutable)

  • 2024-q2 version: 65,000 rows (immutable)

  • Can train on either version reproducibly

  • Can compare model performance across data versions
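
To compare, point the training step's input at either version (a sketch reusing the step definition above):

inputs:
  - name: training-data
    default: dataset://customer-data/2024-q2  # or 2024-q1 to reproduce the earlier run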


Why Snapshots Matter: Real Example

Without snapshots:

Week 1: Train model → 85% accuracy
Week 2: Retrain "same" model → 87% accuracy
Question: Did the model improve, or did the data change?

With snapshots:

Week 1: Train on dataset://data/week1 → 85% accuracy
Week 2:
  - Retrain on dataset://data/week1 → 85% accuracy (reproducible)
  - Train on dataset://data/week2 → 87% accuracy (data improved model)
Answer: The data helped! We can prove it. 🎯

Best Practices

Snapshot Every Query

# Good: Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv('/valohai/outputs/data_snapshot.csv', index=False)

# Bad: Query database directly in training
# (Can't reproduce if data changes)
df = pd.read_sql(query, conn)
model.fit(df[features], df[target])

Version Your Snapshots

# Good: Clear versioning by time period
"uri": "dataset://sales-data/2024-q1"
"uri": "dataset://sales-data/2024-q2"

# Avoid: Arbitrary version names
"uri": "dataset://sales-data/working"
"uri": "dataset://sales-data/last"
"uri": "dataset://sales-data/last_1"

Tag Snapshots with Metadata

metadata = {
    "snapshot.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://data/2024-q1"
        }],
        "valohai.tags": ["production", "validated", "2024-q1"],
        "query_date": "2024-01-15T10:30:00Z",
        "query": "SELECT * FROM customers WHERE ...",
        "row_count": 50000,
        "date_range": "2024-01-01 to 2024-03-31",
        "source_table": "customers",
        "source_database": "production_db"
    }
}

This metadata helps with:

  • Debugging ("Which data was this?")

  • Compliance ("Prove what data trained this model")

  • Optimization ("Do we need to requery or can we reuse?")


Schedule Regular Snapshots

Create a pipeline that runs automatically:

- step:
    name: weekly-snapshot
    image: python:3.9
    command:
      - pip install pandas psycopg2-binary
      - python create_weekly_snapshot.py
    environment-variables:
      - name: SNAPSHOT_DATE
        default: "2024-01-15"  # Parameterize for automation

Run weekly or monthly via Valohai scheduling or external orchestration (Airflow, GitHub Actions, etc.).
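
A minimal sketch of how the (hypothetical) create_weekly_snapshot.py script could consume SNAPSHOT_DATE, falling back to today's date for manual runs:

import os
from datetime import datetime

snapshot_date = os.environ.get('SNAPSHOT_DATE', datetime.now().strftime('%Y-%m-%d'))
query = f"SELECT * FROM events WHERE event_date <= '{snapshot_date}'"  # table name is illustrative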


Separate Query from Training

# Good: Two-step pipeline
- step:
    name: snapshot-data
    image: python:3.9
    command: python fetch_and_save.py

- step:
    name: train
    image: python:3.9
    command: python train.py
    inputs:
      - name: data
        default: dataset://data/latest

# Avoid: Query inside training step
# (Harder to reproduce, slower iteration)

Benefits:

  • Reuse snapshots across experiments

  • Iterate on models without requerying database

  • Clear separation of concerns
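
To wire the two steps together, a Valohai pipeline definition could look like this sketch (node, step, and input names assume the step definitions above):

- pipeline:
    name: snapshot-and-train
    nodes:
      - name: snapshot-data
        step: snapshot-data
        type: execution
      - name: train
        step: train
        type: execution
    edges:
      - [snapshot-data.output.*, train.input.data]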


Environment Variables for Credentials

All database pages use environment variables for storing credentials. Here's the standard pattern:

Project-Level Variables

  1. Open your project in Valohai

  2. Go to Settings → Env Variables

  3. Add database credentials

  4. Mark sensitive values (passwords, tokens) as Secret

Example for any database:

Name           Value                      Secret
DB_HOST        my-database.example.com    No
DB_NAME        production_db              No
DB_USER        ml_pipeline_user           No
DB_PASSWORD    your-secure-password       Yes
DB_PORT        5432                       No


Organization-Level Variable Groups

For shared credentials across projects:

  1. Organization admin goes to Organization Settings → Environment Variables

  2. Create a new environment variable group (e.g., "Production Database Credentials")

  3. Add the shared credential variables to the group

  4. Projects can inherit these groups

Benefits:

  • Central credential management

  • One update affects all projects

  • Easier rotation

  • Consistent naming


Common Patterns

Daily Snapshot for Real-Time Features

# Daily snapshot with date in filename
from datetime import datetime

today = datetime.now().strftime('%Y-%m-%d')
snapshot_path = f'/valohai/outputs/daily_snapshot_{today}.csv'

df.to_csv(snapshot_path, index=False)

metadata = {
    f"daily_snapshot_{today}.csv": {
        "valohai.dataset-versions": [{
            "uri": f"dataset://daily-features/{today}"
        }]
    }
}

Incremental Snapshots

# Only fetch new data added since the last snapshot
last_snapshot_date = '2024-01-15'  # e.g. read from the previous version's metadata
query = f"""
    SELECT * FROM events
    WHERE event_date > '{last_snapshot_date}'
"""

# Append to existing dataset version
metadata = {
    "incremental_data.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://events/2024-q1",
            "from": "dataset://events/2024-q1-partial"
        }]
    }
}

Train/Validation/Test Splits from Database

# Create three snapshots from one query
from sklearn.model_selection import train_test_split

df = pd.read_sql(query, conn)

train, temp = train_test_split(df, test_size=0.3, random_state=42)
val, test = train_test_split(temp, test_size=0.5, random_state=42)

train.to_csv('/valohai/outputs/train.csv', index=False)
val.to_csv('/valohai/outputs/validation.csv', index=False)
test.to_csv('/valohai/outputs/test.csv', index=False)

# Create three dataset versions
metadata = {
    "train.csv": {
        "valohai.dataset-versions": [{"uri": "dataset://data/train-2024-q1"}]
    },
    "validation.csv": {
        "valohai.dataset-versions": [{"uri": "dataset://data/val-2024-q1"}]
    },
    "test.csv": {
        "valohai.dataset-versions": [{"uri": "dataset://data/test-2024-q1"}]
    }
}

Database-Specific Guides

Each database has unique setup requirements and best practices:

Amazon Redshift

  • IAM Role authentication (recommended)

  • Username/password authentication

  • Security group configuration

  • Python and psql examples

Google BigQuery

  • Service Account authentication (recommended)

  • Cross-project access

  • Python client setup

  • Query optimization

Snowflake

  • Username/password authentication

  • Time Travel feature integration

  • Python connector setup

  • Dependency management



Next Steps

  • Choose your database from the guides above

  • Set up authentication (machine identity or credentials)

  • Create your first query → snapshot → train pipeline

  • Schedule regular snapshots for fresh data

  • Build reproducible ML pipelines with versioned data
