AWS Redshift

Query AWS Redshift from Valohai executions and save snapshots for reproducible ML pipelines.


Overview

AWS Redshift is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:

  1. Set up authentication (IAM roles or credentials)

  2. Query Redshift from your code

  3. Save snapshots for reproducibility


Prerequisites

Before you begin:

  • Existing Redshift cluster with data to query

  • Network access — Inbound rule allowing connections from Valohai workers' security group

  • Credentials or IAM role for authentication


Authentication: Choose Your Method

Decision Tree

Are you running Valohai on AWS?
├─ Yes: Use IAM Role (machine identity) **Recommended**
│       More secure, no credentials to manage
└─ No: Use Username/Password
        Works anywhere, requires credential management

Option 1: IAM Role Authentication

Use your EC2 instances' IAM identity to authenticate without storing credentials.

How It Works

  1. Your AWS account has an IAM Role for EC2 instances used by Valohai (e.g., ValohaiWorkerRole)

  2. Grant this role permission to access Redshift

  3. Executions automatically authenticate using the machine's identity

  4. No credentials stored in Valohai

Benefits

  • No credentials to manage or rotate

  • More secure (no secrets in environment variables)

  • Automatic authentication

  • Fine-grained permissions per Valohai environment


Step 1: Create IAM Policy

Grant the ValohaiWorkerRole permission to get Redshift credentials.

IAM Policy JSON:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "RedshiftAccess",
            "Effect": "Allow",
            "Action": "redshift:GetClusterCredentials",
            "Resource": [
                "arn:aws:redshift:<region>:<account-id>:dbuser:<cluster-identifier>/<db-username>",
                "arn:aws:redshift:*:<account-id>:dbgroup:*/*",
                "arn:aws:redshift:<region>:<account-id>:dbname:<cluster-identifier>/<database-name>"
            ]
        }
    ]
}

Replace these values:

  • <region> — AWS region (e.g., us-east-1)

  • <account-id> — Your AWS account ID

  • <cluster-identifier> — Redshift cluster identifier

  • <db-username> — Database user for Valohai

  • <database-name> — Database name

Policy explanation:

  • redshift:GetClusterCredentials — Allows temporary credential generation

  • dbuser resource — Specifies which database user Valohai can impersonate

  • dbname resource — Restricts access to specific database
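For context, redshift:GetClusterCredentials is the API that issues short-lived database credentials for the listed user and database; the redshift_connector example further below makes an equivalent call internally when iam=True, which is why this permission is required. Purely as a reference, here is a minimal sketch of the same call with boto3 (the cluster, user, and database names are placeholders):

import boto3

# Sketch only: the call that the policy above authorizes.
# 'my-cluster', 'valohai_user', and 'analytics' are placeholder values.
redshift = boto3.client('redshift', region_name='us-east-1')

creds = redshift.get_cluster_credentials(
    DbUser='valohai_user',            # must match the dbuser ARN in the policy
    DbName='analytics',               # must match the dbname ARN in the policy
    ClusterIdentifier='my-cluster',   # must match the cluster identifier in the ARNs
    DurationSeconds=900,              # temporary credentials valid for 15 minutes
    AutoCreate=False,
)

print(creds['DbUser'], creds['Expiration'])
# creds['DbPassword'] holds the temporary database password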


Step 2: Attach Policy to Valohai Role

  1. In AWS IAM, find your Valohai worker role (e.g., ValohaiWorkerRole)

  2. Attach the policy you created above

  3. Valohai executions can now authenticate to Redshift

💡 Multiple environments: Create separate IAM roles for dev/staging/production Valohai environments to control access granularly.


Step 3: Query Using IAM Authentication

Python example with IAM authentication:

import redshift_connector
import requests

# EC2 instance metadata service (IMDSv2); the worker's IAM role name and
# temporary credentials are discovered from here, so nothing is hard-coded.
aws_metadata_ip = '169.254.169.254'

def get_aws_credentials():
    with requests.Session() as session:
        token_resp = session.put(
            url=f"http://{aws_metadata_ip}/latest/api/token",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"},
            timeout=5,
        )
        
        req_headers = {"X-aws-ec2-metadata-token": token_resp.text}
        role_resp = session.get(
            url=f"http://{aws_metadata_ip}/latest/meta-data/iam/security-credentials/",
            headers=req_headers,
            timeout=5,
        )
        
        role_name = role_resp.text
        credentials_resp = session.get(
            url=f"http://{aws_metadata_ip}/latest/meta-data/iam/security-credentials/{role_name}",
            headers=req_headers,
            timeout=5,
        )
        
        return credentials_resp.json()

# Obtain AWS credentials
credentials = get_aws_credentials()

# Redshift connection details
host = 'my-cluster.abc123.us-east-1.redshift.amazonaws.com'
database = 'analytics'
db_user = 'valohai_user'
cluster_identifier = 'my-cluster'
region = 'us-east-1'

# Connect using IAM authentication
conn = redshift_connector.connect(
    iam=True,
    host=host,
    database=database,
    db_user=db_user,
    cluster_identifier=cluster_identifier,
    access_key_id=credentials["AccessKeyId"],
    secret_access_key=credentials["SecretAccessKey"],
    session_token=credentials["Token"],
    region=region
)

# Query data
query = """
    SELECT 
        customer_id,
        order_date,
        total_amount,
        product_category
    FROM orders
    WHERE order_date >= '2024-01-01'
"""

cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()

print(f"Fetched {len(results)} rows from Redshift")

# Save snapshot for reproducibility
import pandas as pd

df = pd.DataFrame(results, columns=[desc[0] for desc in cursor.description])
snapshot_path = '/valohai/outputs/orders_snapshot.csv'
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")

cursor.close()
conn.close()

⚠️ The example above shows only the "happy path": it assumes every request succeeds and returns the expected data.

For a full example, take a look at this GitHub page.
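As a rough sketch of what that error handling could look like, you can wrap the metadata lookup and the connection attempt along these lines (this continues the example above, reusing its get_aws_credentials helper and connection variables; retry counts and timeouts are arbitrary placeholders):

import time

import redshift_connector
import requests

def get_aws_credentials_with_retry(retries=3, delay=2):
    # Retry the instance metadata lookup a few times before giving up.
    for attempt in range(1, retries + 1):
        try:
            return get_aws_credentials()  # helper from the example above
        except requests.RequestException as err:
            print(f"Metadata request failed (attempt {attempt}/{retries}): {err}")
            time.sleep(delay)
    raise RuntimeError('Could not fetch AWS credentials from instance metadata')

credentials = get_aws_credentials_with_retry()

try:
    conn = redshift_connector.connect(
        iam=True,
        host=host,
        database=database,
        db_user=db_user,
        cluster_identifier=cluster_identifier,
        access_key_id=credentials['AccessKeyId'],
        secret_access_key=credentials['SecretAccessKey'],
        session_token=credentials['Token'],
        region=region,
    )
except Exception as err:  # narrow this to the connector's specific exceptions in real code
    raise SystemExit(f"Could not connect to Redshift: {err}")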

Install dependencies:

Add to your valohai.yaml command:

command:
  - pip install redshift_connector pandas
  - python query_redshift.py

Or include in your Docker image:

RUN pip install redshift_connector pandas

Option 2: Username/Password Authentication

Store Redshift credentials as Valohai environment variables.

Step 1: Store Credentials in Valohai

  1. Open your project in Valohai

  2. Go to Settings → Env Variables

  3. Add the following variables:

Name                Value                                                 Secret
REDSHIFT_HOST       my-cluster.abc123.us-east-1.redshift.amazonaws.com   No
REDSHIFT_DATABASE   analytics                                             No
REDSHIFT_PORT       5439                                                  No
REDSHIFT_USER       ml_pipeline_user                                      No
REDSHIFT_PASSWORD   your-secure-password                                  Yes

💡 Environment Variable Groups: Organization admins can create shared credential groups under Organization Settings instead of configuring each project separately.


Step 2: Query Using Credentials

Python example with psycopg2:

import psycopg2
import pandas as pd
import os

# Connect using environment variables
conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_HOST'),
    database=os.getenv('REDSHIFT_DATABASE'),
    port=os.getenv('REDSHIFT_PORT'),
    user=os.getenv('REDSHIFT_USER'),
    password=os.getenv('REDSHIFT_PASSWORD')
)

# Query data
query = """
    SELECT 
        customer_id,
        signup_date,
        total_purchases,
        churn_label
    FROM customers
    WHERE signup_date >= '2024-01-01'
    ORDER BY signup_date DESC
"""

df = pd.read_sql(query, conn)
conn.close()

print(f"Fetched {len(df)} rows from Redshift")

# Save snapshot for reproducibility
snapshot_path = '/valohai/outputs/customer_snapshot.csv'
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")

Install dependencies:

command:
  - pip install psycopg2-binary pandas
  - python query_redshift.py

Or in Dockerfile:

RUN pip install psycopg2-binary pandas

Using the psql Command-Line Client

For SQL-heavy workflows, use the psql command-line tool directly.

Example: Query to CSV Output

valohai.yaml:

- step:
    name: redshift-export
    image: postgres:15
    command:
      - psql -h $REDSHIFT_HOST -d $REDSHIFT_DATABASE -U $REDSHIFT_USER -p $REDSHIFT_PORT -A -f query.sql -F ',' -o /valohai/outputs/results.csv
    environment-variables:
      - name: REDSHIFT_HOST
        default: my-cluster.abc123.us-east-1.redshift.amazonaws.com
      - name: REDSHIFT_DATABASE
        default: analytics
      - name: REDSHIFT_USER
        default: ml_pipeline_user
      - name: REDSHIFT_PASSWORD
        default: secret-password
      - name: REDSHIFT_PORT
        default: "5439"
      - name: PGPASSWORD
        default: secret-password

query.sql (in your repository):

SELECT 
    date_trunc('day', order_date) as day,
    COUNT(*) as order_count,
    SUM(total_amount) as revenue
FROM orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1 DESC;

What happens:

  1. psql connects to Redshift, reading the password from the PGPASSWORD environment variable

  2. Executes query from query.sql file

  3. Outputs results as CSV to /valohai/outputs/results.csv

  4. File uploaded to your data store automatically

Install psql in custom image:

FROM python:3.9
RUN apt-get update && apt-get install -y postgresql-client
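Because the CSV is written to /valohai/outputs/, a later step can consume it as a regular Valohai input. A minimal sketch of such a step's code, assuming the input is named results:

import pandas as pd

# Assumes the step declares an input named "results" that points to the
# results.csv produced by the redshift-export step.
df = pd.read_csv('/valohai/inputs/results/results.csv')
print(f"Loaded {len(df)} rows of daily order metrics")
print(df.head())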

Complete Workflow: Query → Snapshot → Train

Step 1: Query and Save Snapshot

fetch_data.py:

import psycopg2
import pandas as pd
import json
import os

# Connect to Redshift
conn = psycopg2.connect(
    host=os.getenv('REDSHIFT_HOST'),
    database=os.getenv('REDSHIFT_DATABASE'),
    port=os.getenv('REDSHIFT_PORT'),
    user=os.getenv('REDSHIFT_USER'),
    password=os.getenv('REDSHIFT_PASSWORD')
)

# Query training data
query = """
    SELECT 
        customer_id,
        age,
        income,
        account_balance,
        transaction_count,
        churn_flag
    FROM customer_features
    WHERE feature_date = CURRENT_DATE - 1
"""

df = pd.read_sql(query, conn)
conn.close()

print(f"Fetched {len(df)} rows from Redshift")

# Save snapshot
snapshot_path = '/valohai/outputs/training_data.csv'
df.to_csv(snapshot_path, index=False)

# Create dataset version with metadata
from datetime import datetime

metadata = {
    "training_data.csv": {
        "valohai.dataset-versions": [{
            "uri": "dataset://customer-churn/daily-snapshot"
        }],
        "valohai.tags": ["redshift", "daily", "production"],
        "query_date": datetime.now().isoformat(),
        "row_count": len(df),
        "source_table": "customer_features",
        "redshift_cluster": os.getenv('REDSHIFT_HOST')
    }
}

metadata_path = '/valohai/outputs/valohai.metadata.jsonl'
with open(metadata_path, 'w') as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write('\n')

print("Created dataset version: dataset://customer-churn/daily-snapshot")

Step 2: Train on Snapshot

train.py:

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pickle

# Load data from snapshot (not live Redshift!)
data_path = '/valohai/inputs/training-data/training_data.csv'
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
X = df[['age', 'income', 'account_balance', 'transaction_count']]
y = df['churn_flag']

# Split and train
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate
accuracy = model.score(X_test, y_test)
print(f"Model accuracy: {accuracy:.4f}")

# Save model
model_path = '/valohai/outputs/churn_model.pkl'
with open(model_path, 'wb') as f:
    pickle.dump(model, f)

print(f"Saved model to {model_path}")

Step 3: Pipeline Configuration

valohai.yaml:

- step:
    name: fetch-from-redshift
    image: python:3.9
    command:
      - pip install psycopg2-binary pandas
      - python fetch_data.py
    environment-variables:
      - name: REDSHIFT_HOST
        default: my-cluster.abc123.us-east-1.redshift.amazonaws.com
      - name: REDSHIFT_DATABASE
        default: analytics
      - name: REDSHIFT_USER
        default: ml_pipeline
      - name: REDSHIFT_PASSWORD
        default: secret
      - name: REDSHIFT_PORT
        default: "5439"

- step:
    name: train-churn-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot

Maintaining Reproducibility

⚠️ Critical: Redshift data changes continuously. Query results today differ from results tomorrow.

The problem:

# Training today
df = pd.read_sql(query, conn)  # Returns 50,000 rows
model.fit(df[features], df[target])

# Retraining next week (same query)
df = pd.read_sql(query, conn)  # Returns 55,000 rows
# Different data = different model = can't reproduce original results

The solution:

# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv('/valohai/outputs/snapshot.csv')  # Immutable snapshot

# Train on snapshot (stored in S3/GCS)
# Can reproduce exactly, even months later

Best practices:

  1. Query once — Run query in dedicated execution

  2. Snapshot immediately — Save to /valohai/outputs/

  3. Version snapshots — Create dataset versions (see the sketch after this list)

  4. Train on snapshots — Use dataset as input, never query directly in training

  5. Schedule snapshots — Create fresh snapshots daily/weekly/monthly
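Practices 2 and 3 can be combined by recording the exact query text and a checksum of the snapshot file in its metadata, so any dataset version can be traced back to what was actually run. A minimal sketch, building on the metadata example in fetch_data.py (the dataset URI and the non-valohai.* keys are arbitrary placeholders):

import hashlib
import json

snapshot_path = '/valohai/outputs/snapshot.csv'
query = "SELECT ... FROM orders WHERE order_date >= '2024-01-01'"  # the query you actually ran

# Checksum of the snapshot file, so the exact bytes can be verified later
with open(snapshot_path, 'rb') as f:
    checksum = hashlib.sha256(f.read()).hexdigest()

metadata = {
    'valohai.dataset-versions': [{'uri': 'dataset://orders/daily-snapshot'}],  # placeholder URI
    'source_query': query,
    'sha256': checksum,
}

with open('/valohai/outputs/valohai.metadata.jsonl', 'w') as f:
    json.dump({'file': 'snapshot.csv', 'metadata': metadata}, f)
    f.write('\n')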

See: Databases for complete reproducibility patterns.


Common Issues & Fixes

Connection Timeout

Symptom: OperationalError: timeout expired

Causes & Fixes:

  • Security group blocking Valohai IPs → Add Valohai worker security group to Redshift inbound rules

  • Wrong host/port → Verify Redshift endpoint and port (usually 5439)

  • Network connectivity → Check VPC configuration
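A quick way to tell a network problem apart from a credential problem is to test raw TCP reachability from inside an execution; a minimal sketch (replace the host with your cluster endpoint):

import socket

host = 'my-cluster.abc123.us-east-1.redshift.amazonaws.com'  # your cluster endpoint
port = 5439

try:
    # A timeout here points to security groups or VPC routing, not credentials.
    with socket.create_connection((host, port), timeout=5):
        print(f"TCP connection to {host}:{port} succeeded")
except OSError as err:
    print(f"TCP connection to {host}:{port} failed: {err}")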


IAM Authentication Fails

Symptom: botocore.exceptions.ClientError: An error occurred (AccessDenied)

Causes & Fixes:

  • IAM policy missing → Verify policy attached to Valohai worker role

  • Wrong ARN in policy → Double-check cluster identifier, region, account ID

  • DB user doesn't exist → Create the user in Redshift, e.g. CREATE USER valohai_user PASSWORD DISABLE; (no database password is needed when authenticating through IAM)
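If the error persists, it can help to confirm which IAM identity the execution is actually running under; a minimal sketch with boto3 (install boto3 in the image or with pip in the step command):

import boto3

# Prints the IAM identity in use; the ARN should contain your Valohai worker
# role name (e.g. ValohaiWorkerRole) rather than some other role or user.
identity = boto3.client('sts').get_caller_identity()
print('Account:', identity['Account'])
print('ARN:', identity['Arn'])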



Next Steps

  • Set up IAM authentication or store credentials

  • Create a test query execution

  • Save your first snapshot as a dataset version

  • Build a reproducible training pipeline using snapshots
