Snowflake

Query Snowflake from Valohai executions and save snapshots for reproducible ML pipelines.


Overview

Snowflake is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:

  1. Store Snowflake credentials securely

  2. Query Snowflake from your code

  3. Save snapshots for reproducibility

  4. Use Snowflake Time Travel features


Prerequisites

Before you begin:

  • Existing Snowflake account with a database containing your data

  • Snowflake credentials (username, password, account identifier)

  • Database access for the user account


Store Credentials in Valohai

Authenticate to Snowflake using username, password, and account identifier stored as environment variables.

Step 1: Find Your Snowflake Account Identifier

Your account identifier format depends on your Snowflake deployment:

Format examples:

  • xy12345.us-east-1 (AWS)

  • xy12345.us-central1.gcp (GCP)

  • xy12345.east-us-2.azure (Azure)

Find it in your Snowflake URL: https://<account_identifier>.snowflakecomputing.com


Step 2: Add Environment Variables

  1. Open your project in Valohai

  2. Go to Settings → Env Variables

  3. Add the following variables:

| Name | Value | Secret |
| --- | --- | --- |
| SNOWFLAKE_USER | Your Snowflake username | No |
| SNOWFLAKE_PASSWORD | Your Snowflake password | Yes |
| SNOWFLAKE_ACCOUNT | Account identifier (e.g., xy12345.us-east-1) | No |
| SNOWFLAKE_WAREHOUSE | Warehouse name (e.g., COMPUTE_WH) | No |
| SNOWFLAKE_DATABASE | Database name | No |
| SNOWFLAKE_SCHEMA | Schema name (e.g., PUBLIC) | No |

💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings → Environment Variable Groups instead of configuring each project separately.
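
Once these variables are set, every execution in the project can read them from the environment. As a quick sanity check, a script can fail fast if any of them is missing (a minimal sketch; the variable names match the table above):

import os

# Variables this guide expects to be configured in Valohai (see table above)
REQUIRED_VARS = [
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
]

# Fail fast with a clear error instead of a KeyError deep inside the connector
missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing Snowflake environment variables: {', '.join(missing)}")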


Install Snowflake Connector

The Snowflake Python connector requires Python 3.8+. Install the connector and its dependencies in your execution.

Option 1: Install at Runtime

valohai.yaml:

- step:
    name: query-snowflake
    image: python:3.11
    command:
      # Install Snowflake connector
      - pip install snowflake-connector-python
      # Run your script
      - python query_snowflake.py

Option 2: Include in Docker Image

Dockerfile:

FROM python:3.11

# Install Snowflake dependencies
RUN pip install snowflake-connector-python pandas

# Copy your code
COPY . /app
WORKDIR /app

Query Snowflake

Basic Query Example

query_snowflake.py:

import snowflake.connector
import pandas as pd
import os

# Connect to Snowflake using environment variables
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
    database=os.environ['SNOWFLAKE_DATABASE'],
    schema=os.environ['SNOWFLAKE_SCHEMA']
)

# Create cursor
cursor = conn.cursor()

try:
    # Query data
    query = """
        SELECT 
            customer_id,
            order_date,
            total_amount,
            product_category
        FROM orders
        WHERE order_date >= '2024-01-01'
        ORDER BY order_date DESC
    """
    
    cursor.execute(query)
    
    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()
    
    print(f"Fetched {len(results)} rows from Snowflake")
    
    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)
    
    # Save snapshot for reproducibility
    snapshot_path = '/valohai/outputs/orders_snapshot.csv'
    df.to_csv(snapshot_path, index=False)
    
    print(f"Saved snapshot to {snapshot_path}")

finally:
    cursor.close()
    conn.close()
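
If you install the connector with its pandas extra (pip install "snowflake-connector-python[pandas]"), the cursor can return a DataFrame directly instead of building one by hand. A minimal variant of the fetch above:

# Requires: pip install "snowflake-connector-python[pandas]"
cursor = conn.cursor()
try:
    cursor.execute(query)
    # fetch_pandas_all() returns the whole result set as a pandas DataFrame
    df = cursor.fetch_pandas_all()
    df.to_csv('/valohai/outputs/orders_snapshot.csv', index=False)
finally:
    cursor.close()

Note that fetch_pandas_all() returns column names exactly as Snowflake reports them, typically in uppercase.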

Complete Workflow: Query → Snapshot → Train

Step 1: Query and Save Snapshot

fetch_data.py:

import snowflake.connector
import pandas as pd
import json
import os
from datetime import datetime

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
    database=os.environ['SNOWFLAKE_DATABASE'],
    schema=os.environ['SNOWFLAKE_SCHEMA']
)

cursor = conn.cursor()

try:
    # Set warehouse
    cursor.execute(f"USE WAREHOUSE {os.environ['SNOWFLAKE_WAREHOUSE']}")
    
    # Query training data
    query = """
        SELECT 
            customer_id,
            age,
            income,
            account_balance,
            transaction_count,
            last_transaction_date,
            churn_flag
        FROM customer_features
        WHERE feature_date = CURRENT_DATE() - 1
        AND data_quality_score > 0.95
    """
    
    print("Executing Snowflake query...")
    cursor.execute(query)
    
    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()
    
    print(f"Fetched {len(results)} rows from Snowflake")
    
    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)
    
    # Basic validation
    assert len(df) > 0, "No data returned from query"
    assert df['churn_flag'].notna().all(), "Missing labels in data"
    
    # Save snapshot
    snapshot_path = '/valohai/outputs/training_data.csv'
    df.to_csv(snapshot_path, index=False)
    
    # Create dataset version with metadata
    metadata = {
        "training_data.csv": {
            "valohai.dataset-versions": [{
                "uri": "dataset://customer-churn/daily-snapshot"
            }],
            "valohai.tags": ["snowflake", "daily", "production"],
            "query_date": datetime.now().isoformat(),
            "row_count": len(df),
            "source_table": "customer_features",
            "snowflake_account": os.environ['SNOWFLAKE_ACCOUNT'],
            "snowflake_warehouse": os.environ['SNOWFLAKE_WAREHOUSE']
        }
    }
    
    metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
    with open(metadata_path, 'w') as f:
        for filename, file_metadata in metadata.items():
            json.dump({"file": filename, "metadata": file_metadata}, f)
            f.write('\n')
    
    print("Created dataset version: dataset://customer-churn/daily-snapshot")

finally:
    cursor.close()
    conn.close()

Step 2: Train on Snapshot

train.py:

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
import pickle

# Load data from snapshot (not live Snowflake!)
data_path = '/valohai/inputs/training-data/training_data.csv'
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
feature_columns = ['age', 'income', 'account_balance', 'transaction_count']
X = df[feature_columns]
y = df['churn_flag']

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")

# Train model
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42
)

model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]

accuracy = model.score(X_test, y_test)
auc = roc_auc_score(y_test, y_pred_proba)

print(f"Accuracy: {accuracy:.4f}")
print(f"ROC AUC: {auc:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Save model
model_path = '/valohai/outputs/churn_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)

print(f"Saved model to {model_path}")

Step 3: Pipeline Configuration

valohai.yaml:

- step:
    name: fetch-from-snowflake
    image: python:3.11
    command:
      - pip install snowflake-connector-python pandas
      - python fetch_data.py
    environment-variables:
      - name: SNOWFLAKE_USER
        default: ml_pipeline_user
      # SNOWFLAKE_PASSWORD is supplied by the project's secret environment variable,
      # so it is not given a plaintext default here
      - name: SNOWFLAKE_PASSWORD
      - name: SNOWFLAKE_ACCOUNT
        default: xy12345.us-east-1
      - name: SNOWFLAKE_WAREHOUSE
        default: COMPUTE_WH
      - name: SNOWFLAKE_DATABASE
        default: ANALYTICS
      - name: SNOWFLAKE_SCHEMA
        default: PUBLIC

- step:
    name: train-churn-model
    image: python:3.11
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot
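
The two steps above can also be chained into a Valohai pipeline so that the snapshot produced by fetch-from-snowflake feeds train-churn-model automatically. A minimal sketch (the pipeline and node names are illustrative):

- pipeline:
    name: churn-training-pipeline
    nodes:
      - name: fetch
        type: execution
        step: fetch-from-snowflake
      - name: train
        type: execution
        step: train-churn-model
    edges:
      # Route the snapshot from the fetch node into the training node's input
      - [fetch.output.training_data.csv, train.input.training-data]

When the pipeline runs, the edge overrides the training step's default dataset input with the snapshot produced by the fetch node.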

Maintaining Reproducibility

⚠️ Critical: Snowflake data changes continuously, so the same query can return different results tomorrow than it does today.

The problem:

# Training today
df = pd.read_sql(query, conn)  # Returns 50,000 rows
model.fit(df[features], df[target])

# Retraining next month (same query)
df = pd.read_sql(query, conn)  # Returns 65,000 rows
# Different data = different model = can't reproduce original results

The solution:

# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv('/valohai/outputs/snapshot.csv')  # Immutable snapshot

# Train on snapshot (stored in cloud storage)
# Can reproduce exactly, even months/years later

Best practices:

  1. Query once — Run query in dedicated execution

  2. Snapshot immediately — Save to /valohai/outputs/

  3. Version snapshots — Create dataset versions

  4. Train on snapshots — Use dataset as input, never query directly in training

  5. Use Time Travel for debugging — but snapshot for reproducibility (see the sketch below)
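
Snowflake Time Travel lets you query a table as it was at an earlier point in time, which is useful when debugging how a snapshot was produced. A minimal sketch (the table name and offset are illustrative, and Time Travel retention must cover the period you query):

# Query the table as it was one hour ago (OFFSET is given in seconds)
cursor.execute("""
    SELECT *
    FROM customer_features AT(OFFSET => -3600)
""")
historical_rows = cursor.fetchall()
print(f"Rows one hour ago: {len(historical_rows)}")

# Even with Time Travel available, save results as snapshots for long-term reproducibility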

See: Databases for complete reproducibility patterns.


Common Issues & Fixes

Connection Failed

Symptom: snowflake.connector.errors.DatabaseError: 250001

Causes & Fixes:

  • Wrong account identifier → Verify format (include region and cloud)

  • Wrong username/password → Check credentials in Snowflake UI

  • Network connectivity → Check firewall/VPN settings
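
To narrow down which of these causes applies, you can run a minimal connection test that only checks authentication and prints the server version (a sketch using the same environment variables as above):

import os
import snowflake.connector

# Connect with credentials only; no warehouse or database is needed for this check
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
)
cursor = conn.cursor()
try:
    cursor.execute("SELECT CURRENT_VERSION()")
    print("Connected to Snowflake, server version:", cursor.fetchone()[0])
finally:
    cursor.close()
    conn.close()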


Warehouse Not Running

Symptom: SQL compilation error: Object does not exist

Causes & Fixes:

  • Warehouse suspended → Resume warehouse: ALTER WAREHOUSE COMPUTE_WH RESUME

  • Wrong warehouse name → Verify warehouse exists: SHOW WAREHOUSES


Insufficient Privileges

Symptom: SQL access control error: Insufficient privileges

Causes & Fixes:

  • User missing permissions → Grant necessary roles: GRANT SELECT ON DATABASE analytics TO ROLE ml_role

  • Wrong role active → Use correct role: cursor.execute("USE ROLE ml_role")


Python Version Incompatibility

Symptom: Import errors or dependency conflicts

Causes & Fixes:

  • Wrong requirements file → Use correct version for your Python (e.g., requirements_39.reqs for Python 3.9)

  • Missing dependencies → Install tested requirements before connector



Next Steps

  • Store Snowflake credentials in Valohai

  • Create a test query execution

  • Save your first snapshot as a dataset version
