AWS Redshift
Query AWS Redshift from Valohai executions and save snapshots for reproducible ML pipelines.
Overview
AWS Redshift is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:
Set up authentication (IAM roles or credentials)
Query Redshift from your code
Save snapshots for reproducibility
Prerequisites
Before you begin:
Existing Redshift cluster with data to query
Network access — Inbound rule allowing connections from Valohai workers' security group
Credentials or IAM role for authentication
Authentication: Choose Your Method
Decision Tree
Are you running Valohai on AWS?
├─ Yes: Use IAM Role (machine identity) **Recommended**
│ More secure, no credentials to manage
│
└─ No: Use Username/Password
       Works anywhere, requires credential management
Option 1: IAM Role Authentication (Recommended)
Use your EC2 instances' IAM identity to authenticate without storing credentials.
How It Works
Your AWS account has an IAM Role for the EC2 instances used by Valohai (e.g., ValohaiWorkerRole)
Grant this role permission to access Redshift
Executions automatically authenticate using the machine's identity
No credentials stored in Valohai
Benefits
No credentials to manage or rotate
More secure (no secrets in environment variables)
Automatic authentication
Fine-grained permissions per Valohai environment
Step 1: Create IAM Policy
Grant the ValohaiWorkerRole permission to get Redshift credentials.
IAM Policy JSON:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "RedshiftAccess",
      "Effect": "Allow",
      "Action": "redshift:GetClusterCredentials",
      "Resource": [
        "arn:aws:redshift:<region>:<account-id>:dbuser:<cluster-identifier>/<db-username>",
        "arn:aws:redshift:*:<account-id>:dbgroup:*/*",
        "arn:aws:redshift:<region>:<account-id>:dbname:<cluster-identifier>/<database-name>"
      ]
    }
  ]
}
Replace these values:
<region> — AWS region (e.g., us-east-1)
<account-id> — Your AWS account ID
<cluster-identifier> — Redshift cluster identifier
<db-username> — Database user for Valohai
<database-name> — Database name
Policy explanation:
redshift:GetClusterCredentials — Allows temporary credential generation
dbuser resource — Specifies which database user Valohai can impersonate
dbname resource — Restricts access to a specific database
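If you prefer to fetch the temporary credentials yourself (instead of letting the driver do it, as in Step 3 below), the same permission can be exercised with boto3's get_cluster_credentials call. A minimal sketch, reusing the placeholder values from the policy above:
import boto3
# On a Valohai worker, boto3 picks up the instance profile (e.g. ValohaiWorkerRole)
# automatically; no access keys need to be configured in the execution.
client = boto3.client('redshift', region_name='us-east-1')
creds = client.get_cluster_credentials(
    DbUser='valohai_user',
    DbName='analytics',
    ClusterIdentifier='my-cluster',
    DurationSeconds=900,
    AutoCreate=False,
)
# creds['DbUser'] and creds['DbPassword'] are short-lived and can be passed to
# any PostgreSQL-compatible driver as a normal username/password pair.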
Step 2: Attach Policy to Valohai Role
In AWS IAM, find your Valohai worker role (e.g., ValohaiWorkerRole)
Attach the policy you created above
Valohai executions can now authenticate to Redshift
💡 Multiple environments: Create separate IAM roles for dev/staging/production Valohai environments to control access granularly.
Step 3: Query Using IAM Authentication
Python example with IAM authentication:
import redshift_connector
import requests
# Fetch temporary credentials from the EC2 instance metadata service (IMDSv2);
# the attached IAM role name is discovered from the metadata endpoint below
aws_metadata_ip = '169.254.169.254'
def get_aws_credentials():
    with requests.Session() as session:
        token_resp = session.put(
            url=f"http://{aws_metadata_ip}/latest/api/token",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"},
            timeout=5,
        )
        req_headers = {"X-aws-ec2-metadata-token": token_resp.text}
        role_resp = session.get(
            url=f"http://{aws_metadata_ip}/latest/meta-data/iam/security-credentials/",
            headers=req_headers,
            timeout=5,
        )
        role_name = role_resp.text
        credentials_resp = session.get(
            url=f"http://{aws_metadata_ip}/latest/meta-data/iam/security-credentials/{role_name}",
            headers=req_headers,
            timeout=5,
        )
        return credentials_resp.json()
# Obtain AWS credentials
credentials = get_aws_credentials()
# Redshift connection details
host = 'my-cluster.abc123.us-east-1.redshift.amazonaws.com'
database = 'analytics'
db_user = 'valohai_user'
cluster_identifier = 'my-cluster'
region = 'us-east-1'
# Connect using IAM authentication
conn = redshift_connector.connect(
iam=True,
host=host,
database=database,
db_user=db_user,
cluster_identifier=cluster_identifier,
access_key_id=credentials["AccessKeyId"],
secret_access_key=credentials["SecretAccessKey"],
session_token=credentials["Token"],
region=region
)
# Query data
query = """
SELECT
customer_id,
order_date,
total_amount,
product_category
FROM orders
WHERE order_date >= '2024-01-01'
"""
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
print(f"Fetched {len(results)} rows from Redshift")
# Save snapshot for reproducibility
import pandas as pd
df = pd.DataFrame(results, columns=[desc[0] for desc in cursor.description])
snapshot_path = '/valohai/outputs/orders_snapshot.csv'
df.to_csv(snapshot_path, index=False)
print(f"Saved snapshot to {snapshot_path}")
cursor.close()
conn.close()
⚠️ The example above only shows the "happy path", assuming every request succeeds and returns the expected data.
For a full example, take a look at this GitHub page.
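If you want the execution to fail with a clearer message when the metadata service is unreachable or returns something unexpected, the credential lookup can be wrapped in basic error handling. A small sketch building on the get_aws_credentials function above:
import sys
import requests
def get_aws_credentials_or_exit():
    # Wraps the happy-path lookup above with readable failure messages
    try:
        return get_aws_credentials()
    except requests.RequestException as exc:
        sys.exit(f"Could not reach the EC2 metadata service: {exc}")
    except ValueError as exc:
        sys.exit(f"Metadata service returned an unexpected response: {exc}")
credentials = get_aws_credentials_or_exit()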
Install dependencies:
Add to your valohai.yaml command:
command:
  - pip install redshift_connector pandas
  - python query_redshift.py
Or include in your Docker image:
RUN pip install redshift_connector pandas
Option 2: Username/Password Authentication
Store Redshift credentials as Valohai environment variables.
Step 1: Store Credentials in Valohai
Open your project in Valohai
Go to Settings → Env Variables
Add the following variables:
| Name | Example value | Secret |
| --- | --- | --- |
| REDSHIFT_HOST | my-cluster.abc123.us-east-1.redshift.amazonaws.com | No |
| REDSHIFT_DATABASE | analytics | No |
| REDSHIFT_PORT | 5439 | No |
| REDSHIFT_USER | ml_pipeline_user | No |
| REDSHIFT_PASSWORD | your-secure-password | Yes |
💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings instead of configuring each project separately.
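A missing variable otherwise shows up later as a confusing connection error, so it can be worth failing fast at the top of the script — a small sketch:
import os
REQUIRED_VARS = ['REDSHIFT_HOST', 'REDSHIFT_DATABASE', 'REDSHIFT_PORT',
                 'REDSHIFT_USER', 'REDSHIFT_PASSWORD']
# Fail early with a clear message instead of a cryptic driver error
missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")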
Step 2: Query Using Credentials
Python example with psycopg2:
import psycopg2
import pandas as pd
import os
# Connect using environment variables
conn = psycopg2.connect(
host=os.getenv('REDSHIFT_HOST'),
database=os.getenv('REDSHIFT_DATABASE'),
port=os.getenv('REDSHIFT_PORT'),
user=os.getenv('REDSHIFT_USER'),
password=os.getenv('REDSHIFT_PASSWORD')
)
# Query data
query = """
SELECT
customer_id,
signup_date,
total_purchases,
churn_label
FROM customers
WHERE signup_date >= '2024-01-01'
ORDER BY signup_date DESC
"""
df = pd.read_sql(query, conn)
conn.close()
print(f"Fetched {len(df)} rows from Redshift")
# Save snapshot for reproducibility
snapshot_path = '/valohai/outputs/customer_snapshot.csv'
df.to_csv(snapshot_path, index=False)
print(f"Saved snapshot to {snapshot_path}")Install dependencies:
command:
  - pip install psycopg2-binary pandas
  - python query_redshift.py
Or in Dockerfile:
RUN pip install psycopg2-binary pandas
Using psql Command-Line Client
For SQL-heavy workflows, use the psql command-line tool directly.
Example: Query to CSV Output
valohai.yaml:
- step:
    name: redshift-export
    image: postgres:15
    command:
      - psql -h $REDSHIFT_HOST -d $REDSHIFT_DATABASE -U $REDSHIFT_USER -p $REDSHIFT_PORT -A -f query.sql -F ',' -o /valohai/outputs/results.csv
    environment-variables:
      - name: REDSHIFT_HOST
        default: my-cluster.abc123.us-east-1.redshift.amazonaws.com
      - name: REDSHIFT_DATABASE
        default: analytics
      - name: REDSHIFT_USER
        default: ml_pipeline_user
      - name: REDSHIFT_PASSWORD
        default: secret-password
      - name: REDSHIFT_PORT
        default: "5439"
      - name: PGPASSWORD
        default: secret-password
query.sql (in your repository):
SELECT
date_trunc('day', order_date) as day,
COUNT(*) as order_count,
SUM(total_amount) as revenue
FROM orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1 DESC;
What happens:
psql connects to Redshift
Executes the query from the query.sql file
Outputs results as CSV to /valohai/outputs/results.csv
The file is uploaded to your data store automatically
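If the query needs a parameter (for example a configurable look-back window), psql client-side variables can be passed on the command line and referenced in the SQL file. A hedged sketch; the variable name and arithmetic are illustrative:
command:
  - psql -h $REDSHIFT_HOST -d $REDSHIFT_DATABASE -U $REDSHIFT_USER -p $REDSHIFT_PORT -v days=30 -A -F ',' -f query.sql -o /valohai/outputs/results.csv
Then in query.sql, reference the variable with psql's :name substitution, for example:
WHERE order_date >= CURRENT_DATE - :days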
Install psql in custom image:
FROM python:3.9
RUN apt-get update && apt-get install -y postgresql-client
Complete Workflow: Query → Snapshot → Train
Step 1: Query and Save Snapshot
fetch_data.py:
import psycopg2
import pandas as pd
import json
import os
# Connect to Redshift
conn = psycopg2.connect(
host=os.getenv('REDSHIFT_HOST'),
database=os.getenv('REDSHIFT_DATABASE'),
port=os.getenv('REDSHIFT_PORT'),
user=os.getenv('REDSHIFT_USER'),
password=os.getenv('REDSHIFT_PASSWORD')
)
# Query training data
query = """
SELECT
customer_id,
age,
income,
account_balance,
transaction_count,
churn_flag
FROM customer_features
WHERE feature_date = CURRENT_DATE - 1
"""
df = pd.read_sql(query, conn)
conn.close()
print(f"Fetched {len(df)} rows from Redshift")
# Save snapshot
snapshot_path = '/valohai/outputs/training_data.csv'
df.to_csv(snapshot_path, index=False)
# Create dataset version with metadata
from datetime import datetime
metadata = {
    "training_data.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://customer-churn/daily-snapshot"
        }],
        "valohai.tags": ["redshift", "daily", "production"],
        "query_date": datetime.now().isoformat(),
        "row_count": len(df),
        "source_table": "customer_features",
        "redshift_cluster": os.getenv('REDSHIFT_HOST')
    }
}
metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
with open(metadata_path, 'w') as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write('\n')
print("Created dataset version: dataset://customer-churn/daily-snapshot")
Step 2: Train on Snapshot
train.py:
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pickle
# Load data from snapshot (not live Redshift!)
data_path = '/valohai/inputs/training-data/training_data.csv'
df = pd.read_csv(data_path)
print(f"Training on {len(df)} rows from snapshot")
# Prepare features
X = df[['age', 'income', 'account_balance', 'transaction_count']]
y = df['churn_flag']
# Split and train
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate
accuracy = model.score(X_test, y_test)
print(f"Model accuracy: {accuracy:.4f}")
# Save model
model_path = '/valohai/outputs/churn_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)
print(f"Saved model to {model_path}")
Step 3: Pipeline Configuration
valohai.yaml:
- step:
    name: fetch-from-redshift
    image: python:3.9
    command:
      - pip install psycopg2-binary pandas
      - python fetch_data.py
    environment-variables:
      - name: REDSHIFT_HOST
        default: my-cluster.abc123.us-east-1.redshift.amazonaws.com
      - name: REDSHIFT_DATABASE
        default: analytics
      - name: REDSHIFT_USER
        default: ml_pipeline
      - name: REDSHIFT_PASSWORD
        default: secret
      - name: REDSHIFT_PORT
        default: "5439"
- step:
    name: train-churn-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot
Maintaining Reproducibility
⚠️ Critical: Redshift data changes continuously. Query results today differ from results tomorrow.
The problem:
# Training today
df = pd.read_sql(query, conn) # Returns 50,000 rows
model.fit(df[features], df[target])
# Retraining next week (same query)
df = pd.read_sql(query, conn) # Returns 55,000 rows
# Different data = different model = can't reproduce original results
The solution:
# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv('/valohai/outputs/snapshot.csv') # Immutable snapshot
# Train on snapshot (stored in S3/GCS)
# Can reproduce exactly, even months later
Best practices:
Query once — Run query in dedicated execution
Snapshot immediately — Save to /valohai/outputs/
Version snapshots — Create dataset versions
Train on snapshots — Use dataset as input, never query directly in training
Schedule snapshots — Create fresh snapshots daily/weekly/monthly
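To wire these practices together, the fetch and training steps from the workflow above can be chained into a Valohai pipeline so each training run records exactly which snapshot it consumed. A minimal sketch, assuming the step names defined earlier (node names and the edge mapping are illustrative; adjust to your valohai.yaml):
- pipeline:
    name: redshift-churn-training
    nodes:
      - name: fetch
        type: execution
        step: fetch-from-redshift
      - name: train
        type: execution
        step: train-churn-model
    edges:
      - [fetch.output.training_data.csv, train.input.training-data]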
See: Databases for complete reproducibility patterns.
Common Issues & Fixes
Connection Timeout
Symptom: OperationalError: timeout expired
Causes & Fixes:
Security group blocking Valohai IPs → Add Valohai worker security group to Redshift inbound rules
Wrong host/port → Verify Redshift endpoint and port (usually 5439)
Network connectivity → Check VPC configuration
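To tell a network problem apart from an authentication problem, a plain TCP check against the cluster endpoint is usually enough — a minimal sketch:
import os
import socket
host = os.getenv('REDSHIFT_HOST')
port = int(os.getenv('REDSHIFT_PORT', '5439'))
# If this fails, fix security groups / VPC routing before debugging credentials
try:
    with socket.create_connection((host, port), timeout=5):
        print(f"TCP connection to {host}:{port} succeeded")
except OSError as exc:
    print(f"Cannot reach {host}:{port}: {exc}")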
IAM Authentication Fails
Symptom: botocore.exceptions.ClientError: An error occurred (AccessDenied)
Causes & Fixes:
IAM policy missing → Verify policy attached to Valohai worker role
Wrong ARN in policy → Double-check cluster identifier, region, account ID
DB user doesn't exist → Create user in Redshift:
CREATE USER valohai_user PASSWORD DISABLE; (a database password is not needed when the user authenticates through IAM)
Related Pages
Databases — Overview and reproducibility patterns
BigQuery — Connect to Google BigQuery
Snowflake — Connect to Snowflake
Create and Manage Datasets — Version your snapshots
Next Steps
Set up IAM authentication or store credentials
Create a test query execution
Save your first snapshot as a dataset version
Build a reproducible training pipeline using snapshots