Snowflake

Query Snowflake from Valohai executions and save snapshots for reproducible ML pipelines.


Overview

Snowflake is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:

  1. Store Snowflake credentials securely

  2. Query Snowflake from your code

  3. Save snapshots for reproducibility

  4. Use Snowflake Time Travel features


Prerequisites

Before you begin:

  • Existing Snowflake account with a database containing your data

  • Snowflake credentials (username, password, account identifier)

  • Database access for the user account


Store Credentials in Valohai

Authenticate to Snowflake using username, password, and account identifier stored as environment variables.

Step 1: Find Your Snowflake Account Identifier

The account identifier format depends on the cloud provider and region of your Snowflake deployment. For example:

  • xy12345.us-east-1 (AWS)

  • xy12345.us-central1.gcp (GCP)

  • xy12345.east-us-2.azure (Azure)

Find it in your Snowflake URL: https://<account_identifier>.snowflakecomputing.com
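
If you only have the URL at hand, the identifier can be pulled out programmatically. The following is a minimal sketch, assuming the URL follows the https://<account_identifier>.snowflakecomputing.com pattern shown above; the helper name account_from_url is hypothetical and not part of any Snowflake or Valohai API.

```python
from urllib.parse import urlparse

SNOWFLAKE_DOMAIN = ".snowflakecomputing.com"


def account_from_url(url: str) -> str:
    """Return the account identifier part of a Snowflake URL (hypothetical helper)."""
    host = urlparse(url).hostname or ""
    if not host.endswith(SNOWFLAKE_DOMAIN):
        raise ValueError(f"Not a Snowflake URL: {url!r}")
    # Everything before ".snowflakecomputing.com" is the account identifier
    return host[: -len(SNOWFLAKE_DOMAIN)]


print(account_from_url("https://xy12345.us-east-1.snowflakecomputing.com"))
# prints: xy12345.us-east-1
```

The returned string is the value to store in SNOWFLAKE_ACCOUNT.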


Step 2: Add Environment Variables

  1. Open your project in Valohai

  2. Go to Settings → Env Variables

  3. Add the following variables:

Name                | Value                                        | Secret
--------------------+----------------------------------------------+-------
SNOWFLAKE_USER      | Your Snowflake username                      | No
SNOWFLAKE_PASSWORD  | Your Snowflake password                      | Yes
SNOWFLAKE_ACCOUNT   | Account identifier (e.g., xy12345.us-east-1) | No
SNOWFLAKE_WAREHOUSE | Warehouse name (e.g., COMPUTE_WH)            | No
SNOWFLAKE_DATABASE  | Database name                                | No
SNOWFLAKE_SCHEMA    | Schema name (e.g., PUBLIC)                   | No

💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings → Environment Variable Groups instead of configuring each project separately.
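
A missing variable otherwise surfaces as a bare KeyError in the middle of a script, so it can help to check all six up front. The sketch below assumes the variable names from the table above; the function names missing_snowflake_vars and connection_kwargs are hypothetical helpers, not part of the Snowflake connector.

```python
import os

REQUIRED_VARS = (
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
)
PREFIX = "SNOWFLAKE_"


def missing_snowflake_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def connection_kwargs(env=None):
    """Build keyword arguments (user, password, account, ...) from the environment."""
    env = os.environ if env is None else env
    missing = missing_snowflake_vars(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # SNOWFLAKE_USER -> user, SNOWFLAKE_ACCOUNT -> account, and so on
    return {name[len(PREFIX):].lower(): env[name] for name in REQUIRED_VARS}


# Intended use inside an execution (not run here):
#   conn = snowflake.connector.connect(**connection_kwargs())
```

Failing early with the full list of missing names makes a misconfigured project easier to diagnose from the execution log.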


Install Snowflake Connector

The Snowflake Python connector requires Python 3.8+. Install it either at the start of each execution or by baking it into your Docker image.

Option 1: Install at Runtime

valohai.yaml:

- step:
    name: query-snowflake
    image: python:3.11
    command:
      # Install the Snowflake connector and pandas (query_snowflake.py imports both)
      - pip install snowflake-connector-python pandas
      # Run your script
      - python query_snowflake.py

Option 2: Include in Docker Image

Dockerfile:

FROM python:3.11

# Install Snowflake dependencies
RUN pip install snowflake-connector-python pandas

# Copy your code
COPY . /app
WORKDIR /app

Query Snowflake

Basic Query Example

query_snowflake.py:

import snowflake.connector
import pandas as pd
import os

# Connect to Snowflake using environment variables
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA"],
)

# Create cursor
cursor = conn.cursor()

try:
    # Query data
    query = """
        SELECT
            customer_id,
            order_date,
            total_amount,
            product_category
        FROM orders
        WHERE order_date >= '2024-01-01'
        ORDER BY order_date DESC
    """

    cursor.execute(query)

    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()

    print(f"Fetched {len(results)} rows from Snowflake")

    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)

    # Save snapshot for reproducibility
    snapshot_path = "/valohai/outputs/orders_snapshot.csv"
    df.to_csv(snapshot_path, index=False)

    print(f"Saved snapshot to {snapshot_path}")

finally:
    cursor.close()
    conn.close()
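
For result sets too large to hold in memory, fetchall() can be swapped for batched reads. The following is a rough sketch that relies only on the standard Python DB-API cursor methods (execute, description, fetchmany), which the Snowflake connector's cursor also provides. Here sqlite3 stands in for Snowflake so the example runs locally; the function name stream_query_to_csv and the sample table are illustrative assumptions.

```python
import csv
import os
import sqlite3
import tempfile


def stream_query_to_csv(cursor, query, path, batch_size=10_000):
    """Run a query and write the result to CSV in batches. Returns the row count."""
    cursor.execute(query)
    columns = [desc[0] for desc in cursor.description]
    row_count = 0
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            writer.writerows(rows)
            row_count += len(rows)
    return row_count


# Local demonstration: sqlite3 is only a stand-in for a Snowflake connection
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, total_amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 9.5), (2, 20.0), (3, 3.25)])
cur = conn.cursor()

with tempfile.TemporaryDirectory() as tmp:
    # In a Valohai execution this would be a path under /valohai/outputs/
    path = os.path.join(tmp, "orders_snapshot.csv")
    rows_written = stream_query_to_csv(
        cur,
        "SELECT customer_id, total_amount FROM orders ORDER BY customer_id",
        path,
        batch_size=2,
    )
    with open(path, newline="") as f:
        lines = list(csv.reader(f))

conn.close()
print(f"Wrote {rows_written} rows")
```

The trade-off is that the data never passes through a DataFrame, so any validation has to happen per batch or after the snapshot is written.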

Complete Workflow: Query → Snapshot → Train

Step 1: Query and Save Snapshot

fetch_data.py:

import snowflake.connector
import pandas as pd
import json
import os
from datetime import datetime

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA"],
)

cursor = conn.cursor()

try:
    # Set warehouse
    cursor.execute(f"USE WAREHOUSE {os.environ['SNOWFLAKE_WAREHOUSE']}")

    # Query training data
    query = """
        SELECT
            customer_id,
            age,
            income,
            account_balance,
            transaction_count,
            last_transaction_date,
            churn_flag
        FROM customer_features
        WHERE feature_date = CURRENT_DATE() - 1
        AND data_quality_score > 0.95
    """

    print("Executing Snowflake query...")
    cursor.execute(query)

    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()

    print(f"Fetched {len(results)} rows from Snowflake")

    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)

    # Basic validation
    assert len(df) > 0, "No data returned from query"
    assert df["churn_flag"].notna().all(), "Missing labels in data"

    # Save snapshot
    snapshot_path = "/valohai/outputs/training_data.csv"
    df.to_csv(snapshot_path, index=False)

    # Create dataset version with metadata
    metadata = {
        "training_data.csv": {
            "valohai.dataset-versions": [
                {
                    "uri": "dataset://customer-churn/daily-snapshot",
                },
            ],
            "valohai.tags": ["snowflake", "daily", "production"],
            "query_date": datetime.now().isoformat(),
            "row_count": len(df),
            "source_table": "customer_features",
            "snowflake_account": os.environ["SNOWFLAKE_ACCOUNT"],
            "snowflake_warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
        },
    }

    metadata_path = "/valohai/outputs/valohai.metadata.jsonl"
    with open(metadata_path, "w") as f:
        for filename, file_metadata in metadata.items():
            json.dump({"file": filename, "metadata": file_metadata}, f)
            f.write("\n")

    print("Created dataset version: dataset://customer-churn/daily-snapshot")

finally:
    cursor.close()
    conn.close()

Step 2: Train on Snapshot

train.py:

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
import pickle

# Load data from snapshot (not live Snowflake!)
data_path = "/valohai/inputs/training-data/training_data.csv"
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
feature_columns = ["age", "income", "account_balance", "transaction_count"]
X = df[feature_columns]
y = df["churn_flag"]

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")

# Train model
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42,
)

model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]

accuracy = model.score(X_test, y_test)
auc = roc_auc_score(y_test, y_pred_proba)

print(f"Accuracy: {accuracy:.4f}")
print(f"ROC AUC: {auc:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Save model
model_path = "/valohai/outputs/churn_model.pkl"
with open(model_path, "wb") as f:
    pickle.dump(model, f)

print(f"Saved model to {model_path}")

Step 3: Pipeline Configuration

valohai.yaml:

- step:
    name: fetch-from-snowflake
    image: python:3.11
    command:
      - pip install snowflake-connector-python pandas
      - python fetch_data.py
    environment-variables:
      - name: SNOWFLAKE_USER
        default: ml_pipeline_user
      # No default here: never commit the password to valohai.yaml.
      # Set it as a secret project environment variable instead.
      - name: SNOWFLAKE_PASSWORD
      - name: SNOWFLAKE_ACCOUNT
        default: xy12345.us-east-1
      - name: SNOWFLAKE_WAREHOUSE
        default: COMPUTE_WH
      - name: SNOWFLAKE_DATABASE
        default: ANALYTICS
      - name: SNOWFLAKE_SCHEMA
        default: PUBLIC

- step:
    name: train-churn-model
    image: python:3.11
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot

Maintaining Reproducibility

⚠️ Critical: Snowflake data changes continuously, so the same query can return different results tomorrow than it does today.

The problem:

# Training today
df = pd.read_sql(query, conn)  # Returns 50,000 rows
model.fit(df[features], df[target])

# Retraining next month (same query)
df = pd.read_sql(query, conn)  # Returns 65,000 rows
# Different data = different model = can't reproduce original results

The solution:

# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv("/valohai/outputs/snapshot.csv", index=False)  # Immutable snapshot

# Train on snapshot (stored in cloud storage)
# Can reproduce exactly, even months/years later
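
One way to make "can reproduce exactly" checkable is to record a checksum of the snapshot together with the query that produced it. This is a sketch under assumptions: the function names and the metadata keys snapshot_sha256 and source_query are hypothetical, chosen here for illustration.

```python
import hashlib
import os
import tempfile


def sha256_of_file(path, chunk_size=1024 * 1024):
    """Hash a file in chunks so large snapshots do not need to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def snapshot_fingerprint(path, query):
    """Return metadata fields describing exactly what was saved and how."""
    return {
        "snapshot_sha256": sha256_of_file(path),
        # Collapse whitespace so formatting changes do not alter the record
        "source_query": " ".join(query.split()),
    }


# Local demonstration with a tiny stand-in snapshot
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "snapshot.csv")
    with open(demo_path, "wb") as f:
        f.write(b"customer_id,churn_flag\n1,0\n2,1\n")
    fingerprint = snapshot_fingerprint(
        demo_path,
        "SELECT customer_id,\n       churn_flag\nFROM customer_features",
    )

print(fingerprint["source_query"])
```

These fields could be merged into the per-file metadata dictionary written to valohai.metadata.jsonl, alongside custom keys such as row_count.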

Best practices:

  1. Query once — Run query in dedicated execution

  2. Snapshot immediately — Save to /valohai/outputs/

  3. Version snapshots — Create dataset versions

  4. Train on snapshots — Use dataset as input, never query directly in training

  5. Use Time Travel for debugging — But snapshot for reproducibility

See: Databases for complete reproducibility patterns.
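
For the Time Travel item in the list above, Snowflake lets a query read a table as it existed at an earlier point by adding an AT clause, as long as that point is still inside the table's Time Travel retention period. The sketch below only builds the SQL string. The helper name time_travel_query is hypothetical, and the retention window depends on your Snowflake edition and table settings, so check that before relying on it.

```python
import re
from datetime import datetime, timezone

# Allows table, schema.table, or database.schema.table with unquoted identifiers
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}$")


def time_travel_query(table: str, as_of: datetime) -> str:
    """Build a SELECT that reads `table` as it existed at `as_of`."""
    if not _IDENTIFIER.match(table):
        raise ValueError(f"Unexpected table name: {table!r}")
    if as_of.tzinfo is None:
        raise ValueError("as_of must be timezone-aware")
    timestamp = as_of.astimezone(timezone.utc).isoformat(timespec="seconds")
    return f"SELECT * FROM {table} AT(TIMESTAMP => '{timestamp}'::TIMESTAMP_TZ)"


query = time_travel_query(
    "customer_features",
    datetime(2024, 6, 1, tzinfo=timezone.utc),
)
print(query)
# prints: SELECT * FROM customer_features AT(TIMESTAMP => '2024-06-01T00:00:00+00:00'::TIMESTAMP_TZ)
```

Because historical data expires once the retention period passes, this is useful for investigating a recent discrepancy but is no substitute for a saved snapshot.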


Common Issues & Fixes

Connection Failed

Symptom: snowflake.connector.errors.DatabaseError: 250001

Causes & Fixes:

  • Wrong account identifier → Verify format (include region and cloud)

  • Wrong username/password → Check credentials in Snowflake UI

  • Network connectivity → Check firewall/VPN settings
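
When the cause is a transient network problem rather than a wrong identifier or password, retrying with a short backoff may be enough. The wrapper below is a generic sketch: connect_with_retry is a hypothetical helper that accepts any zero-argument callable, and it catches Exception broadly for brevity. In real use you would likely narrow this to the connector's error classes, and avoid retrying on bad credentials, since repeated failed logins can lock an account.

```python
import time


def connect_with_retry(connect, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call connect() up to `attempts` times, doubling the wait after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except Exception as exc:  # narrow this in production code
            last_error = exc
            # Snowflake connector errors carry an errno attribute; others may not
            errno = getattr(exc, "errno", None)
            print(f"Attempt {attempt}/{attempts} failed (errno={errno}): {exc}")
            if attempt < attempts:
                sleep(base_delay * 2 ** (attempt - 1))
    raise last_error


# Intended use inside an execution (not run here):
#   conn = connect_with_retry(
#       lambda: snowflake.connector.connect(
#           user=os.environ["SNOWFLAKE_USER"],
#           password=os.environ["SNOWFLAKE_PASSWORD"],
#           account=os.environ["SNOWFLAKE_ACCOUNT"],
#       )
#   )
```

The logged errno can then be matched against the symptom listed above.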


Warehouse Not Running

Symptom: SQL compilation error: Object does not exist

Causes & Fixes:

  • Warehouse suspended → Resume warehouse: ALTER WAREHOUSE COMPUTE_WH RESUME

  • Wrong warehouse name → Verify warehouse exists: SHOW WAREHOUSES


Insufficient Privileges

Symptom: SQL access control error: Insufficient privileges

Causes & Fixes:

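  • User missing permissions → Grant the role access to the database, the schema, and its tables: GRANT USAGE ON DATABASE analytics TO ROLE ml_role; GRANT USAGE ON SCHEMA analytics.public TO ROLE ml_role; GRANT SELECT ON ALL TABLES IN SCHEMA analytics.public TO ROLE ml_role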

  • Wrong role active → Use correct role: cursor.execute("USE ROLE ml_role")


Python Version Incompatibility

Symptom: Import errors or dependency conflicts

Causes & Fixes:

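  • Wrong requirements file → If you pin dependencies using the tested requirements files that Snowflake publishes for the connector, use the one matching your Python version (e.g., requirements_39.reqs for Python 3.9)

  • Missing dependencies → Install the tested requirements before installing the connector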



Next Steps

  • Store Snowflake credentials in Valohai

  • Create a test query execution

  • Save your first snapshot as a dataset version
