# Snowflake

Query Snowflake from Valohai executions and save snapshots for reproducible ML pipelines.

***

### Overview

Snowflake is a cloud data warehouse that you can query directly from Valohai executions. This guide shows you how to:

1. Store Snowflake credentials securely
2. Query Snowflake from your code
3. Save snapshots for reproducibility
4. Use Snowflake Time Travel features

***

### Prerequisites

Before you begin:

* **Existing Snowflake account** with a database containing your data
* **Snowflake credentials** (username, password, account identifier)
* **Database access** for the user account

***

### Store Credentials in Valohai

Authenticate to Snowflake using username, password, and account identifier stored as environment variables.

#### Step 1: Find Your Snowflake Account Identifier

Your account identifier format depends on your Snowflake deployment:

**Format examples:**

* `xy12345.us-east-1` (AWS)
* `xy12345.us-central1.gcp` (GCP)
* `xy12345.east-us-2.azure` (Azure)

Find it in your Snowflake URL: `https://<account_identifier>.snowflakecomputing.com`
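
If you only have the URL on hand, the identifier can be extracted programmatically. This is a minimal sketch using only the standard library. The helper name `account_identifier_from_url` is hypothetical, and it assumes the URL follows the `https://<account_identifier>.snowflakecomputing.com` pattern shown above.

```python
from urllib.parse import urlparse

SNOWFLAKE_DOMAIN = ".snowflakecomputing.com"


def account_identifier_from_url(url: str) -> str:
    """Return the account identifier embedded in a Snowflake URL (hypothetical helper)."""
    host = urlparse(url).hostname or ""
    if not host.endswith(SNOWFLAKE_DOMAIN):
        raise ValueError(f"Not a Snowflake URL: {url!r}")
    # Everything before the Snowflake domain is the account identifier.
    return host[: -len(SNOWFLAKE_DOMAIN)]
```

For example, `account_identifier_from_url("https://xy12345.us-east-1.snowflakecomputing.com")` returns `xy12345.us-east-1`, which is the value to store in `SNOWFLAKE_ACCOUNT`.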

***

#### Step 2: Add Environment Variables

1. Open your project in Valohai
2. Go to **Settings → Env Variables**
3. Add the following variables:

| Name                  | Value                                          | Secret  |
| --------------------- | ---------------------------------------------- | ------- |
| `SNOWFLAKE_USER`      | Your Snowflake username                        | No      |
| `SNOWFLAKE_PASSWORD`  | Your Snowflake password                        | **Yes** |
| `SNOWFLAKE_ACCOUNT`   | Account identifier (e.g., `xy12345.us-east-1`) | No      |
| `SNOWFLAKE_WAREHOUSE` | Warehouse name (e.g., `COMPUTE_WH`)            | No      |
| `SNOWFLAKE_DATABASE`  | Database name                                  | No      |
| `SNOWFLAKE_SCHEMA`    | Schema name (e.g., `PUBLIC`)                   | No      |

> 💡 **Environment Variable Groups:** Organization admins can create shared credential groups under **Organization Settings → Environment Variable Groups** instead of configuring each project separately.
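
A missing variable otherwise surfaces as a bare `KeyError` in the middle of the script. As a sketch, a check like the following can run at the start of an execution and report every missing name at once. The helper names `missing_snowflake_vars` and `check_snowflake_env` are illustrative and not part of Valohai or the Snowflake connector.

```python
import os

# The variables listed in the table above.
REQUIRED_VARS = (
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
)


def missing_snowflake_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def check_snowflake_env(env=None):
    """Raise a single, readable error if any required variable is missing."""
    missing = missing_snowflake_vars(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `check_snowflake_env()` before connecting fails fast with one message instead of one error per variable.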

***

### Install Snowflake Connector

The Snowflake Python connector requires Python 3.8+. Install the connector and its dependencies in your execution.

#### Option 1: Install in Command (Recommended)

**valohai.yaml:**

```yaml
- step:
    name: query-snowflake
    image: python:3.11
    command:
      # Install Snowflake connector
      - pip install snowflake-connector-python pandas
      # Run your script
      - python query_snowflake.py
```

***

#### Option 2: Include in Docker Image

**Dockerfile:**

```dockerfile
FROM python:3.11

# Install Snowflake dependencies
RUN pip install snowflake-connector-python pandas

# Copy your code
COPY . /app
WORKDIR /app
```

***

### Query Snowflake

#### Basic Query Example

**query\_snowflake.py:**

```python
import snowflake.connector
import pandas as pd
import os

# Connect to Snowflake using environment variables
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA"],
)

# Create cursor
cursor = conn.cursor()

try:
    # Query data
    query = """
        SELECT
            customer_id,
            order_date,
            total_amount,
            product_category
        FROM orders
        WHERE order_date >= '2024-01-01'
        ORDER BY order_date DESC
    """

    cursor.execute(query)

    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()

    print(f"Fetched {len(results)} rows from Snowflake")

    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)

    # Save snapshot for reproducibility
    snapshot_path = "/valohai/outputs/orders_snapshot.csv"
    df.to_csv(snapshot_path, index=False)

    print(f"Saved snapshot to {snapshot_path}")

finally:
    cursor.close()
    conn.close()
```
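
The date filter in the query above is hard-coded. One alternative is to bind it as a parameter so the same script can be reused with different cutoffs. The sketch below assumes the connector's default `pyformat` parameter style (`%s` placeholders). `fetch_rows` is a hypothetical helper name, and it accepts any DB-API style cursor.

```python
ORDERS_QUERY = """
    SELECT customer_id, order_date, total_amount, product_category
    FROM orders
    WHERE order_date >= %s
    ORDER BY order_date DESC
"""


def fetch_rows(cursor, query, params=()):
    """Run a parameterized query and return (column_names, rows).

    Hypothetical helper: `cursor` is any DB-API cursor, such as the one
    returned by `conn.cursor()` in the example above.
    """
    # The connector escapes and binds the values, which is safer than
    # building the SQL string by hand.
    cursor.execute(query, params)
    columns = [desc[0] for desc in cursor.description]
    return columns, cursor.fetchall()
```

With an open cursor, `columns, rows = fetch_rows(cursor, ORDERS_QUERY, ("2024-01-01",))` returns the same data as the basic example, ready to pass to `pd.DataFrame(rows, columns=columns)`.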

***

### Complete Workflow: Query → Snapshot → Train

#### Step 1: Query and Save Snapshot

**fetch\_data.py:**

```python
import snowflake.connector
import pandas as pd
import json
import os
from datetime import datetime

# Connect to Snowflake
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA"],
)

cursor = conn.cursor()

try:
    # Set warehouse
    cursor.execute(f"USE WAREHOUSE {os.environ['SNOWFLAKE_WAREHOUSE']}")

    # Query training data
    query = """
        SELECT
            customer_id,
            age,
            income,
            account_balance,
            transaction_count,
            last_transaction_date,
            churn_flag
        FROM customer_features
        WHERE feature_date = CURRENT_DATE() - 1
        AND data_quality_score > 0.95
    """

    print("Executing Snowflake query...")
    cursor.execute(query)

    # Fetch results
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()

    print(f"Fetched {len(results)} rows from Snowflake")

    # Convert to DataFrame
    df = pd.DataFrame(results, columns=columns)

    # Basic validation
    assert len(df) > 0, "No data returned from query"
    assert df["churn_flag"].notna().all(), "Missing labels in data"

    # Save snapshot
    snapshot_path = "/valohai/outputs/training_data.csv"
    df.to_csv(snapshot_path, index=False)

    # Create dataset version with metadata
    metadata = {
        "training_data.csv": {
            "valohai.dataset-versions": [
                {
                    "uri": "dataset://customer-churn/daily-snapshot",
                },
            ],
            "valohai.tags": ["snowflake", "daily", "production"],
            "query_date": datetime.now().isoformat(),
            "row_count": len(df),
            "source_table": "customer_features",
            "snowflake_account": os.environ["SNOWFLAKE_ACCOUNT"],
            "snowflake_warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
        },
    }

    metadata_path = "/valohai/outputs/valohai.metadata.jsonl"
    with open(metadata_path, "w") as f:
        for filename, file_metadata in metadata.items():
            json.dump({"file": filename, "metadata": file_metadata}, f)
            f.write("\n")

    print("Created dataset version: dataset://customer-churn/daily-snapshot")

finally:
    cursor.close()
    conn.close()
```
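
To make it easier to confirm later that a training run used exactly this snapshot, one option is to store a checksum alongside the other metadata. This is a sketch, not part of Valohai's metadata format: the `sha256` key and the `file_sha256` helper are assumptions, added as an ordinary custom metadata property.

```python
import hashlib


def file_sha256(path, chunk_size=1024 * 1024):
    """Return the SHA-256 hex digest of a file, read in chunks to limit memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps reading until f.read() returns b"".
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

In `fetch_data.py`, this could be used as `metadata["training_data.csv"]["sha256"] = file_sha256(snapshot_path)` after the CSV is written and before the metadata file is saved.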

***

#### Step 2: Train on Snapshot

**train.py:**

```python
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
import pickle

# Load data from snapshot (not live Snowflake!)
data_path = "/valohai/inputs/training-data/training_data.csv"
df = pd.read_csv(data_path)

print(f"Training on {len(df)} rows from snapshot")

# Prepare features
feature_columns = ["age", "income", "account_balance", "transaction_count"]
X = df[feature_columns]
y = df["churn_flag"]

# Split data
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

print(f"Training set: {len(X_train)} rows")
print(f"Test set: {len(X_test)} rows")

# Train model
model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    random_state=42,
)

model.fit(X_train, y_train)

# Evaluate
y_pred = model.predict(X_test)
y_pred_proba = model.predict_proba(X_test)[:, 1]

accuracy = model.score(X_test, y_test)
auc = roc_auc_score(y_test, y_pred_proba)

print(f"Accuracy: {accuracy:.4f}")
print(f"ROC AUC: {auc:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Save model
model_path = "/valohai/outputs/churn_model.pkl"
with open(model_path, "wb") as f:
    pickle.dump(model, f)

print(f"Saved model to {model_path}")
```

***

#### Step 3: Pipeline Configuration

**valohai.yaml:**

```yaml
- step:
    name: fetch-from-snowflake
    image: python:3.11
    command:
      - pip install snowflake-connector-python pandas
      - python fetch_data.py
    environment-variables:
      - name: SNOWFLAKE_USER
        default: ml_pipeline_user
      # SNOWFLAKE_PASSWORD is not declared here: keep it as a secret project
      # environment variable so the password never lands in version control.
      - name: SNOWFLAKE_ACCOUNT
        default: xy12345.us-east-1
      - name: SNOWFLAKE_WAREHOUSE
        default: COMPUTE_WH
      - name: SNOWFLAKE_DATABASE
        default: ANALYTICS
      - name: SNOWFLAKE_SCHEMA
        default: PUBLIC

- step:
    name: train-churn-model
    image: python:3.11
    command:
      - pip install pandas scikit-learn
      - python train.py
    inputs:
      - name: training-data
        default: dataset://customer-churn/daily-snapshot
```

***

### Maintaining Reproducibility

> ⚠️ **Critical:** Snowflake tables change continuously, so the same query can return different results tomorrow than it does today.

**The problem:**

```python
# Training today
df = pd.read_sql(query, conn)  # Returns 50,000 rows
model.fit(df[features], df[target])

# Retraining next month (same query)
df = pd.read_sql(query, conn)  # Returns 65,000 rows
# Different data = different model = can't reproduce original results
```

**The solution:**

```python
# Always save snapshots
df = pd.read_sql(query, conn)
df.to_csv("/valohai/outputs/snapshot.csv", index=False)  # Immutable snapshot

# Train on snapshot (stored in cloud storage)
# Can reproduce exactly, even months/years later
```

**Best practices:**

1. **Query once** — Run query in dedicated execution
2. **Snapshot immediately** — Save to `/valohai/outputs/`
3. **Version snapshots** — Create dataset versions
4. **Train on snapshots** — Use dataset as input, never query directly in training
5. **Use Time Travel for debugging** — But snapshot for reproducibility

**See:** [Databases](/data/data-databases.md) for complete reproducibility patterns.
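
For the debugging case in item 5, Snowflake Time Travel lets a query read a table as it existed at an earlier moment by adding an `AT(TIMESTAMP => ...)` clause. The sketch below only builds the SQL text. `time_travel_query` is a hypothetical helper, the table name is borrowed from the earlier example, and the timestamp literal format is an assumption that relies on Snowflake's automatic timestamp detection. Time Travel reaches back only as far as the table's data retention period (one day by default), which is why it complements snapshots rather than replacing them.

```python
import re
from datetime import datetime, timezone

# Plain unquoted identifiers, optionally qualified as database.schema.table.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*(?:\.[A-Za-z_][A-Za-z0-9_$]*){0,2}")


def time_travel_query(table: str, as_of: datetime) -> str:
    """Build a SELECT that reads `table` as it existed at `as_of` (hypothetical helper)."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"Unexpected table name: {table!r}")
    if as_of.tzinfo is None:
        raise ValueError("as_of must be timezone-aware")
    # Normalize to UTC so the literal is unambiguous.
    stamp = as_of.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S +0000")
    return f"SELECT * FROM {table} AT(TIMESTAMP => '{stamp}'::TIMESTAMP_TZ)"
```

The returned string can be passed to `cursor.execute()` to compare the table's earlier state with a saved snapshot.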

***

### Common Issues & Fixes

#### Connection Failed

**Symptom:** `snowflake.connector.errors.DatabaseError: 250001`

**Causes & Fixes:**

* Wrong account identifier → Verify format (include region and cloud)
* Wrong username/password → Check credentials in Snowflake UI
* Network connectivity → Check firewall/VPN settings

***

#### Warehouse Not Running

**Symptom:** `SQL compilation error: Object does not exist`

**Causes & Fixes:**

* Warehouse suspended → Resume warehouse: `ALTER WAREHOUSE COMPUTE_WH RESUME`
* Wrong warehouse name → Verify warehouse exists: `SHOW WAREHOUSES`

***

#### Insufficient Privileges

**Symptom:** `SQL access control error: Insufficient privileges`

**Causes & Fixes:**

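* User missing permissions → Grant the necessary privileges to the role, for example `GRANT USAGE ON DATABASE analytics TO ROLE ml_role`, `GRANT USAGE ON SCHEMA analytics.public TO ROLE ml_role`, and `GRANT SELECT ON ALL TABLES IN SCHEMA analytics.public TO ROLE ml_role`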
* Wrong role active → Use correct role: `cursor.execute("USE ROLE ml_role")`
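
Instead of issuing `USE ROLE` after connecting, the role can also be passed to `snowflake.connector.connect()` through its `role` argument. The sketch below builds the keyword arguments from the environment. `SNOWFLAKE_ROLE` is a hypothetical, optional variable that is not part of the table earlier on this page, and `connection_kwargs` is an illustrative helper name.

```python
import os


def connection_kwargs(env=None):
    """Build keyword arguments for snowflake.connector.connect() (illustrative helper)."""
    env = os.environ if env is None else env
    kwargs = {
        "user": env["SNOWFLAKE_USER"],
        "password": env["SNOWFLAKE_PASSWORD"],
        "account": env["SNOWFLAKE_ACCOUNT"],
        "warehouse": env["SNOWFLAKE_WAREHOUSE"],
        "database": env["SNOWFLAKE_DATABASE"],
        "schema": env["SNOWFLAKE_SCHEMA"],
    }
    # SNOWFLAKE_ROLE is an assumed, optional variable; when it is unset,
    # the session starts with the user's default role.
    role = env.get("SNOWFLAKE_ROLE")
    if role:
        kwargs["role"] = role
    return kwargs
```

A connection would then be opened with `conn = snowflake.connector.connect(**connection_kwargs())`, so the session starts in the intended role without an extra statement.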

***

#### Python Version Incompatibility

**Symptom:** Import errors or dependency conflicts

**Causes & Fixes:**

* Wrong requirements file → Use the tested requirements file that matches your Python version (e.g., `requirements_39.reqs` for Python 3.9)
* Missing dependencies → Install the tested requirements before installing the connector

***

### Related Pages

* [Databases](/data/data-databases.md) — Overview and reproducibility patterns
* [AWS Redshift](/data/data-databases/aws-redshift.md) — Connect to Amazon Redshift
* [BigQuery](/data/data-databases/bigquery.md) — Connect to Google BigQuery
* [Create and Manage Datasets](/data/datasets/creating-datasets.md) — Version your snapshots

***

### Next Steps

* Store Snowflake credentials in Valohai
* Create a test query execution
* Save your first snapshot as a [dataset version](/data/datasets/update-datasets.md)


