# AWS Redshift

Query AWS Redshift from Valohai executions and save snapshots for reproducible ML pipelines.

***

### Overview

AWS Redshift is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:

1. Set up authentication (IAM roles or credentials)
2. Query Redshift from your code
3. Save snapshots for reproducibility

***

### Prerequisites

Before you begin:

* **Existing Redshift cluster** with data to query
* **Network access** — Inbound rule allowing connections from Valohai workers' security group
* **Credentials or IAM role** for authentication

***

### Authentication: Choose Your Method

#### Decision Tree

```
Are you running Valohai on AWS?
├─ Yes: Use IAM Role (machine identity) **Recommended**
│       More secure, no credentials to manage
│
└─ No: Use Username/Password
        Works anywhere, requires credential management
```

***

### Option 1: IAM Role Authentication (Recommended)

Use your EC2 instances' IAM identity to authenticate without storing credentials.

#### How It Works

1. Your AWS account has an IAM Role for EC2 instances used by Valohai (e.g., `ValohaiWorkerRole`)
2. Grant this role permission to access Redshift
3. Executions automatically authenticate using the machine's identity
4. No credentials stored in Valohai

#### Benefits

* No credentials to manage or rotate
* More secure (no secrets in environment variables)
* Automatic authentication
* Fine-grained permissions per Valohai environment

***

#### Step 1: Create IAM Policy

Grant the `ValohaiWorkerRole` permission to get Redshift credentials.

**IAM Policy JSON:**

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "RedshiftAccess",
      "Effect": "Allow",
      "Action": "redshift:GetClusterCredentials",
      "Resource": [
        "arn:aws:redshift:<region>:<account-id>:dbuser:<cluster-identifier>/<db-username>",
        "arn:aws:redshift:*:<account-id>:dbgroup:*/*",
        "arn:aws:redshift:<region>:<account-id>:dbname:<cluster-identifier>/<database-name>"
      ]
    }
  ]
}
```

**Replace these values:**

* `<region>` — AWS region (e.g., `us-east-1`)
* `<account-id>` — Your AWS account ID
* `<cluster-identifier>` — Redshift cluster identifier
* `<db-username>` — Database user for Valohai
* `<database-name>` — Database name

**Policy explanation:**

* `redshift:GetClusterCredentials` — Allows temporary credential generation
* `dbuser` resource — Specifies which database user Valohai can impersonate
* `dbname` resource — Restricts access to specific database

***

#### Step 2: Attach Policy to Valohai Role

1. In AWS IAM, find your Valohai worker role (e.g., `ValohaiWorkerRole`)
2. Attach the policy you created above
3. Valohai executions can now authenticate to Redshift

> 💡 **Multiple environments:** Create separate IAM roles for dev/staging/production Valohai environments to control access granularly.

***

#### Step 3: Query Using IAM Authentication

**Python example with IAM authentication:**

```python
import redshift_connector
import requests
import json

# Fetch credentials from EC2 instance metadata
aws_metadata_ip = "169.254.169.254"
role_name = "ValohaiWorkerRole"


def get_aws_credentials():
    with requests.Session() as session:
        token_resp = session.put(
            url=f"http://{aws_metadata_ip}/latest/api/token",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"},
            timeout=5,
        )

        req_headers = {"X-aws-ec2-metadata-token": token_resp.text}
        role_resp = session.get(
            url=f"http://{aws_metadata_ip}/latest/meta-data/iam/security-credentials/",
            headers=req_headers,
            timeout=5,
        )

        role_name = role_resp.text
        credentials_resp = session.get(
            url=f"http://{aws_metadata_ip}/latest/meta-data/iam/security-credentials/{role_name}",
            headers=req_headers,
            timeout=5,
        )

        return credentials_resp.json()


# Obtain AWS credentials
credentials = get_aws_credentials()

# Redshift connection details
host = "my-cluster.abc123.us-east-1.redshift.amazonaws.com"
database = "analytics"
db_user = "valohai_user"
cluster_identifier = "my-cluster"
region = "us-east-1"

# Connect using IAM authentication
conn = redshift_connector.connect(
    iam=True,
    host=host,
    database=database,
    db_user=db_user,
    cluster_identifier=cluster_identifier,
    access_key_id=credentials["AccessKeyId"],
    secret_access_key=credentials["SecretAccessKey"],
    session_token=credentials["Token"],
    region=region,
)

# Query data
query = """
    SELECT
        customer_id,
        order_date,
        total_amount,
        product_category
    FROM orders
    WHERE order_date >= '2024-01-01'
"""

cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()

print(f"Fetched {len(results)} rows from Redshift")

# Save snapshot for reproducibility
import pandas as pd

df = pd.DataFrame(results, columns=[desc[0] for desc in cursor.description])
snapshot_path = "/valohai/outputs/orders_snapshot.csv"
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")

cursor.close()
conn.close()
```

> :warning: ​The example above only shows the "happy path" - assuming that all the requests will be successful and return expected data.
>
> For a full example, take a look at this [GitHub page](https://github.com/valohai/library-sandbox/blob/master/connectors/steps/redshift/query.py).

**Install dependencies:**

Add to your `valohai.yaml` command:

```yaml
command:
  - pip install redshift_connector pandas
  - python query_redshift.py
```

Or include in your Docker image:

```dockerfile
RUN pip install redshift_connector pandas
```

***

### Option 2: Username/Password Authentication

Store Redshift credentials as Valohai environment variables.

#### Step 1: Store Credentials in Valohai

1. Open your project in Valohai
2. Go to **Settings → Env Variables**
3. Add the following variables:

<table><thead><tr><th width="169.25">Name</th><th width="460.25">Value</th><th width="86.75">Secret</th></tr></thead><tbody><tr><td><code>REDSHIFT_HOST</code></td><td><code>my-cluster.abc123.us-east-1.redshift.amazonaws.com</code></td><td>No</td></tr><tr><td><code>REDSHIFT_DATABASE</code></td><td><code>analytics</code></td><td>No</td></tr><tr><td><code>REDSHIFT_PORT</code></td><td><code>5439</code></td><td>No</td></tr><tr><td><code>REDSHIFT_USER</code></td><td><code>ml_pipeline_user</code></td><td>No</td></tr><tr><td><code>REDSHIFT_PASSWORD</code></td><td><code>your-secure-password</code></td><td><strong>Yes</strong></td></tr></tbody></table>

> 💡 **Environment Variable Groups:** Organization admins can create shared credential groups under Organization Settings instead of configuring each project separately.

***

#### Step 2: Query Using Credentials

**Python example with psycopg2:**

```python
import psycopg2
import pandas as pd
import os

# Connect using environment variables
conn = psycopg2.connect(
    host=os.getenv("REDSHIFT_HOST"),
    database=os.getenv("REDSHIFT_DATABASE"),
    port=os.getenv("REDSHIFT_PORT"),
    user=os.getenv("REDSHIFT_USER"),
    password=os.getenv("REDSHIFT_PASSWORD"),
)

# Query data
query = """
    SELECT
        customer_id,
        signup_date,
        total_purchases,
        churn_label
    FROM customers
    WHERE signup_date >= '2024-01-01'
    ORDER BY signup_date DESC
"""

df = pd.read_sql(query, conn)
conn.close()

print(f"Fetched {len(df)} rows from Redshift")

# Save snapshot for reproducibility
snapshot_path = "/valohai/outputs/customer_snapshot.csv"
df.to_csv(snapshot_path, index=False)

print(f"Saved snapshot to {snapshot_path}")
```

**Install dependencies:**

```yaml
command:
  - pip install psycopg2-binary pandas
  - python query_redshift.py
```

Or in Dockerfile:

```dockerfile
RUN pip install psycopg2-binary pandas
```

***

### Using psql Command-Line Client

For SQL-heavy workflows, use the `psql` command-line tool directly.

#### Example: Query to CSV Output

**valohai.yaml:**

```yaml
- step:
    name: redshift-export
    image: postgres:15
    command:
      - psql -h $REDSHIFT_HOST -d $REDSHIFT_DATABASE -U $REDSHIFT_USER -p $REDSHIFT_PORT -A -f query.sql -F ',' -o /valohai/outputs/results.csv
    environment-variables:
      - name: REDSHIFT_HOST
        default: my-cluster.abc123.us-east-1.redshift.amazonaws.com
      - name: REDSHIFT_DATABASE
        default: analytics
      - name: REDSHIFT_USER
        default: ml_pipeline_user
      - name: REDSHIFT_PASSWORD
        default: secret-password
      - name: REDSHIFT_PORT
        default: "5439"
      - name: PGPASSWORD
        default: secret-password
```

**query.sql (in your repository):**

```sql
SELECT
    date_trunc('day', order_date) as day,
    COUNT(*) as order_count,
    SUM(total_amount) as revenue
FROM orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1 DESC;
```

**What happens:**

1. `psql` connects to Redshift
2. Executes query from `query.sql` file
3. Outputs results as CSV to `/valohai/outputs/results.csv`
4. File uploaded to your data store automatically

**Install psql in custom image:**

```dockerfile
FROM python:3.9
RUN apt-get update && apt-get install -y postgresql-client
```

***

### Complete Workflow: Query → Snapshot → Train

#### Step 1: Query and Save Snapshot

**fetch\_data.py:**

```python
import psycopg2
import pandas as pd
import json
import os

# Connect to Redshift
conn = psycopg2.connect(
    host=os.getenv("REDSHIFT_HOST"),
    database=os.getenv("REDSHIFT_DATABASE"),
    port=os.getenv("REDSHIFT_PORT"),
    user=os.getenv("REDSHIFT_USER"),
    password=os.getenv("REDSHIFT_PASSWORD"),
)

# Query training data
query = """
    SELECT
        customer_id,
        age,
        income,
        account_balance,
        transaction_count,
        churn_flag
    FROM customer_features
    WHERE feature_date = CURRENT_DATE - 1
"""

df = pd.read_sql(query, conn)
conn.close()

print(f"Fetched {len(df)} rows from Redshift")

# Save snapshot
snapshot_path = "/valohai/outputs/training_data.csv"
df.to_csv(snapshot_path, index=False)

# Create dataset version with metadata
from datetime import datetime

metadata = {
    "training_data.csv": {
        "valohai.dataset-versions": [
            {
                "uri": "dataset://customer-churn/daily-snapshot",
            },
        ],
        "valohai.tags": ["redshift", "daily", "production"],
        "query_date": datetime.now().isoformat(),
        "row_count": len(df),
        "source_table": "customer_features",
        "redshift_cluster": os.getenv("REDSHIFT_HOST"),
    },
}

metadata_path = "/valohai/outputs/valohai.metadata.jsonl"
with open(metadata_path, "w") as f:
    for filename, file_metadata in metadata.items():
        json.dump({"file": filename, "metadata": file_metadata}, f)
        f.write("\n")

print("Created dataset version: dataset://customer-churn/daily-snapshot")
```

***

#### Step 2: Train on Snapshot

**train.py:**

```python
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pickle

# Load data from snapshot (not live Redshift!)
data_path = "/valohai/inputs/training-data/training_data.csv"
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
X = df[["age", "income", "account_balance", "transaction_count"]]
y = df["churn_flag"]

# Split and train
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Evaluate
accuracy = model.score(X_test, y_test)
print(f"Model accuracy: {accuracy:.4f}")

# Save model
model_path = "/valohai/outputs/churn_model.pkl"
with open(model_path, "wb") as f:
    pickle.dump(model, f)

print(f"Saved model to {model_path}")
```

***

#### Step 3: Pipeline Configuration

**valohai.yaml:**

```yaml
- step:
    name: fetch-from-redshift
    image: python:3.9
    command:
      - pip install psycopg2-binary pandas
      - python fetch_data.py
    environment-variables:
      - name: REDSHIFT_HOST
        default: my-cluster.abc123.us-east-1.redshift.amazonaws.com
      - name: REDSHIFT_DATABASE
        default: analytics
      - name: REDSHIFT_USER
        default: ml_pipeline
      - name: REDSHIFT_PASSWORD
        default: secret
      - name: REDSHIFT_PORT
        default: "5439"

- step:
    name: train-churn-model
    image: python:3.9
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot
```

***

### Maintaining Reproducibility

> ⚠️ **Critical:** Redshift data changes continuously. Query results today differ from results tomorrow.

**The problem:**

```python
# Training today
df = pd.read_sql(query, conn)  # Returns 50,000 rows
model.fit(df[features], df[target])

# Retraining next week (same query)
df = pd.read_sql(query, conn)  # Returns 55,000 rows
# Different data = different model = can't reproduce original results
```

**The solution:**

```python
# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv("/valohai/outputs/snapshot.csv")  # Immutable snapshot

# Train on snapshot (stored in S3/GCS)
# Can reproduce exactly, even months later
```

**Best practices:**

1. **Query once** — Run query in dedicated execution
2. **Snapshot immediately** — Save to `/valohai/outputs/`
3. **Version snapshots** — Create dataset versions
4. **Train on snapshots** — Use dataset as input, never query directly in training
5. **Schedule snapshots** — Create fresh snapshots daily/weekly/monthly

**See:** [Databases](/data/data-databases.md) for complete reproducibility patterns.

***

### Common Issues & Fixes

#### Connection Timeout

**Symptom:** `OperationalError: timeout expired`

**Causes & Fixes:**

* Security group blocking Valohai IPs → Add Valohai worker security group to Redshift inbound rules
* Wrong host/port → Verify Redshift endpoint and port (usually 5439)
* Network connectivity → Check VPC configuration

***

#### IAM Authentication Fails

**Symptom:** `botocore.exceptions.ClientError: An error occurred (AccessDenied)`

**Causes & Fixes:**

* IAM policy missing → Verify policy attached to Valohai worker role
* Wrong ARN in policy → Double-check cluster identifier, region, account ID
* DB user doesn't exist → Create user in Redshift: `CREATE USER valohai_user`

***

### Related Pages

* [Databases](/data/data-databases.md) — Overview and reproducibility patterns
* [BigQuery](/data/data-databases/bigquery.md) — Connect to Google BigQuery
* [Snowflake](/data/data-databases/snowflake.md) — Connect to Snowflake
* [Create and Manage Datasets](/data/datasets/creating-datasets.md) — Version your snapshots

***

### Next Steps

* Set up IAM authentication or store credentials
* Create a test query execution
* Save your first snapshot as a [dataset version](/data/datasets/update-datasets.md)
* Build a reproducible training [pipeline](/pipelines.md) using snapshots


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.valohai.com/data/data-databases/aws-redshift.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
