Snowflake
Query Snowflake from Valohai executions and save snapshots for reproducible ML pipelines.
Overview
Snowflake is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:
Store Snowflake credentials securely
Query Snowflake from your code
Save snapshots for reproducibility
Use Snowflake Time Travel features
Prerequisites
Before you begin:
Existing Snowflake account with a database containing your data
Snowflake credentials (username, password, account identifier)
Database access for the user account
Store Credentials in Valohai
Authenticate to Snowflake using username, password, and account identifier stored as environment variables.
Step 1: Find Your Snowflake Account Identifier
Your account identifier format depends on your Snowflake deployment:
Format examples:
xy12345.us-east-1 (AWS)
xy12345.us-central1.gcp (GCP)
xy12345.east-us-2.azure (Azure)
Find it in your Snowflake URL: https://<account_identifier>.snowflakecomputing.com
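If you prefer to derive it programmatically, here is a minimal sketch that strips the snowflakecomputing.com suffix from the hostname (the URL below is only an example):
# Sketch: derive the account identifier from a Snowflake URL (example values).
from urllib.parse import urlparse

url = "https://xy12345.us-east-1.snowflakecomputing.com"
host = urlparse(url).hostname  # xy12345.us-east-1.snowflakecomputing.com
account_identifier = host.replace(".snowflakecomputing.com", "")
print(account_identifier)  # xy12345.us-east-1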
Step 2: Add Environment Variables
Open your project in Valohai
Go to Settings → Env Variables
Add the following variables:
SNOWFLAKE_USER: your Snowflake username (secret: no)
SNOWFLAKE_PASSWORD: your Snowflake password (secret: yes)
SNOWFLAKE_ACCOUNT: account identifier, e.g. xy12345.us-east-1 (secret: no)
SNOWFLAKE_WAREHOUSE: warehouse name, e.g. COMPUTE_WH (secret: no)
SNOWFLAKE_DATABASE: database name (secret: no)
SNOWFLAKE_SCHEMA: schema name, e.g. PUBLIC (secret: no)
💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings → Environment Variable Groups instead of configuring each project separately.
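Once the variables are configured, it can help to fail fast in your execution if any of them are missing; a minimal sketch using the names above:
# Sketch: verify the expected Snowflake environment variables before connecting.
import os

REQUIRED_VARS = [
    "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA",
]
missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing Snowflake environment variables: {', '.join(missing)}")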
Install Snowflake Connector
The Snowflake Python connector requires Python 3.8+. Install the connector and its dependencies in your execution.
Option 1: Install in Command (Recommended)
valohai.yaml:
- step:
    name: query-snowflake
    image: python:3.11
    command:
      # Install Snowflake connector
      - pip install snowflake-connector-python
      # Run your script
      - python query_snowflake.py
Option 2: Include in Docker Image
Dockerfile:
FROM python:3.11
# Install Snowflake dependencies
RUN pip install snowflake-connector-python pandas
# Copy your code
COPY . /app
WORKDIR /app
Query Snowflake
Basic Query Example
query_snowflake.py:
import snowflake.connector
import pandas as pd
import os
# Connect to Snowflake using environment variables
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
    database=os.environ['SNOWFLAKE_DATABASE'],
    schema=os.environ['SNOWFLAKE_SCHEMA']
)
# Create cursor
cursor = conn.cursor()
try:
    # Query data
    query = """
        SELECT
            customer_id,
            order_date,
            total_amount,
            product_category
        FROM orders
        WHERE order_date >= '2024-01-01'
        ORDER BY order_date DESC
    """
    cursor.execute(query)
    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()
    print(f"Fetched {len(results)} rows from Snowflake")
    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)
    # Save snapshot for reproducibility
    snapshot_path = '/valohai/outputs/orders_snapshot.csv'
    df.to_csv(snapshot_path, index=False)
    print(f"Saved snapshot to {snapshot_path}")
finally:
    cursor.close()
    conn.close()
Complete Workflow: Query → Snapshot → Train
Step 1: Query and Save Snapshot
fetch_data.py:
import snowflake.connector
import pandas as pd
import json
import os
from datetime import datetime
# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
    database=os.environ['SNOWFLAKE_DATABASE'],
    schema=os.environ['SNOWFLAKE_SCHEMA']
)
cursor = conn.cursor()
try:
    # Set warehouse
    cursor.execute(f"USE WAREHOUSE {os.environ['SNOWFLAKE_WAREHOUSE']}")
    # Query training data
    query = """
        SELECT
            customer_id,
            age,
            income,
            account_balance,
            transaction_count,
            last_transaction_date,
            churn_flag
        FROM customer_features
        WHERE feature_date = CURRENT_DATE() - 1
          AND data_quality_score > 0.95
    """
    print("Executing Snowflake query...")
    cursor.execute(query)
    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()
    print(f"Fetched {len(results)} rows from Snowflake")
    # Convert to DataFrame
    # Snowflake returns unquoted identifiers in uppercase; normalize to
    # lowercase so the downstream training step can use these column names.
    df = pd.DataFrame(results, columns=[col.lower() for col in columns])
    # Basic validation
    assert len(df) > 0, "No data returned from query"
    assert df['churn_flag'].notna().all(), "Missing labels in data"
    # Save snapshot
    snapshot_path = '/valohai/outputs/training_data.csv'
    df.to_csv(snapshot_path, index=False)
    # Create dataset version with metadata
    metadata = {
        "training_data.csv": {
            "valohai.dataset-versions": [{
                "uri": "dataset://customer-churn/daily-snapshot"
            }],
            "valohai.tags": ["snowflake", "daily", "production"],
            "query_date": datetime.now().isoformat(),
            "row_count": len(df),
            "source_table": "customer_features",
            "snowflake_account": os.environ['SNOWFLAKE_ACCOUNT'],
            "snowflake_warehouse": os.environ['SNOWFLAKE_WAREHOUSE']
        }
    }
    metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
    with open(metadata_path, 'w') as f:
        for filename, file_metadata in metadata.items():
            json.dump({"file": filename, "metadata": file_metadata}, f)
            f.write('\n')
    print("Created dataset version: dataset://customer-churn/daily-snapshot")
finally:
    cursor.close()
    conn.close()
Step 2: Train on Snapshot
train.py:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
import pickle
# Load data from snapshot (not live Snowflake!)
data_path = '/valohai/inputs/training-data/training_data.csv'
df = pd.read_csv(data_path)
print(f"Training on {len(df)} rows from snapshot")
# Prepare features
feature_columns = ['age', 'income', 'account_balance', 'transaction_count']
X = df[feature_columns]
y = df['churn_flag']
# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")
# Train model
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42
)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]
accuracy = model.score(X_test, y_test)
auc = roc_auc_score(y_test, y_pred_proba)
print(f"Accuracy: {accuracy:.4f}")
print(f"ROC AUC: {auc:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))
# Save model
model_path = '/valohai/outputs/churn_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)
print(f"Saved model to {model_path}")
Step 3: Pipeline Configuration
valohai.yaml:
- step:
    name: fetch-from-snowflake
    image: python:3.11
    command:
      - pip install snowflake-connector-python pandas
      - python fetch_data.py
    environment-variables:
      - name: SNOWFLAKE_USER
        default: ml_pipeline_user
      - name: SNOWFLAKE_PASSWORD
        # Placeholder only: set the real value as a secret project environment variable
        default: secret-password
      - name: SNOWFLAKE_ACCOUNT
        default: xy12345.us-east-1
      - name: SNOWFLAKE_WAREHOUSE
        default: COMPUTE_WH
      - name: SNOWFLAKE_DATABASE
        default: ANALYTICS
      - name: SNOWFLAKE_SCHEMA
        default: PUBLIC
- step:
    name: train-churn-model
    image: python:3.11
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot
Maintaining Reproducibility
⚠️ Critical: Snowflake data changes continuously. Query results today differ from results tomorrow.
The problem:
# Training today
df = pd.read_sql(query, conn) # Returns 50,000 rows
model.fit(df[features], df[target])
# Retraining next month (same query)
df = pd.read_sql(query, conn) # Returns 65,000 rows
# Different data = different model = can't reproduce original results
The solution:
# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv('/valohai/outputs/snapshot.csv') # Immutable snapshot
# Train on snapshot (stored in cloud storage)
# Can reproduce exactly, even months/years later
Best practices:
Query once — Run query in dedicated execution
Snapshot immediately — Save to /valohai/outputs/
Version snapshots — Create dataset versions
Train on snapshots — Use dataset as input, never query directly in training
Use Time Travel for debugging — But snapshot for reproducibility (see the sketch below)
See: Databases for complete reproducibility patterns.
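The list above mentions Time Travel for debugging: Snowflake can query a table as it existed at an earlier point in time, within the table's data retention period. A minimal sketch, reusing the connection pattern from the earlier examples (the table name and offset are illustrative):
# Sketch: use Snowflake Time Travel to inspect yesterday's state of a table
# while debugging. Keep using snapshots, not Time Travel, as the source of
# truth for reproducible training data.
import os
import snowflake.connector

conn = snowflake.connector.connect(
    user=os.environ['SNOWFLAKE_USER'],
    password=os.environ['SNOWFLAKE_PASSWORD'],
    account=os.environ['SNOWFLAKE_ACCOUNT'],
    warehouse=os.environ['SNOWFLAKE_WAREHOUSE'],
    database=os.environ['SNOWFLAKE_DATABASE'],
    schema=os.environ['SNOWFLAKE_SCHEMA']
)
cursor = conn.cursor()
try:
    cursor.execute("""
        SELECT COUNT(*) AS row_count
        FROM customer_features
        AT(OFFSET => -60*60*24)  -- table state 24 hours ago
    """)
    print("Rows 24 hours ago:", cursor.fetchone()[0])
finally:
    cursor.close()
    conn.close()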
Common Issues & Fixes
Connection Failed
Symptom: snowflake.connector.errors.DatabaseError: 250001
Causes & Fixes:
Wrong account identifier → Verify format (include region and cloud)
Wrong username/password → Check credentials in Snowflake UI
Network connectivity → Check firewall/VPN settings
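To narrow this down, a bare connection test that surfaces the connector's error details is often enough; a minimal sketch:
# Sketch: minimal connection test that prints the connector's error details.
import os
import snowflake.connector

try:
    conn = snowflake.connector.connect(
        user=os.environ['SNOWFLAKE_USER'],
        password=os.environ['SNOWFLAKE_PASSWORD'],
        account=os.environ['SNOWFLAKE_ACCOUNT'],
    )
    print("Connected, server version:", conn.cursor().execute("SELECT CURRENT_VERSION()").fetchone()[0])
    conn.close()
except snowflake.connector.errors.Error as exc:
    # errno and msg indicate the failing layer (account, credentials, network)
    print(f"Connection failed: errno={exc.errno}, msg={exc.msg}")
    raise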
Warehouse Not Running
Symptom: SQL compilation error: Object does not exist
Causes & Fixes:
Warehouse suspended → Resume warehouse:
ALTER WAREHOUSE COMPUTE_WH RESUME
Wrong warehouse name → Verify the warehouse exists:
SHOW WAREHOUSES
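From Python, you can check the warehouse state and resume it in one place; a minimal sketch (assumes an open connection as in the examples above):
# Sketch: list warehouses with their state, then resume the configured one.
# `conn` is an open snowflake.connector connection from the earlier examples.
import os

cursor = conn.cursor()
for row in cursor.execute("SHOW WAREHOUSES"):
    print(row[0], row[1])  # warehouse name, state (e.g. STARTED, SUSPENDED)
cursor.execute(f"ALTER WAREHOUSE {os.environ['SNOWFLAKE_WAREHOUSE']} RESUME IF SUSPENDED")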
Insufficient Privileges
Symptom: SQL access control error: Insufficient privileges
Causes & Fixes:
User missing permissions → Grant necessary roles:
GRANT SELECT ON ALL TABLES IN DATABASE analytics TO ROLE ml_role
Wrong role active → Use the correct role:
cursor.execute("USE ROLE ml_role")
Python Version Incompatibility
Symptom: Import errors or dependency conflicts
Causes & Fixes:
Wrong requirements file → Use the requirements file matching your Python version (e.g., requirements_39.reqs for Python 3.9)
Missing dependencies → Install the tested requirements before installing the connector
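A small guard at the top of your script makes a version mismatch explicit instead of failing with an obscure import error; a minimal sketch:
# Sketch: fail early if the interpreter is older than the connector supports
# (Python 3.8+ per this guide).
import sys

if sys.version_info < (3, 8):
    raise RuntimeError(
        f"Python {sys.version.split()[0]} is too old for snowflake-connector-python; use 3.8 or newer"
    )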
Related Pages
Databases — Overview and reproducibility patterns
AWS Redshift — Connect to Amazon Redshift
BigQuery — Connect to Google BigQuery
Create and Manage Datasets — Version your snapshots
Next Steps
Store Snowflake credentials in Valohai
Create a test query execution
Save your first snapshot as a dataset version