# RAG and Context Pipelines in Valohai

Retrieval-Augmented Generation (RAG) systems combine vector search with large language models to produce grounded, explainable answers. Valohai provides the reproducible system behind these fast-moving architectures, so you can plug in new models, retrievers, or vector databases without breaking your workflow.

### What is RAG and why it matters

RAG enhances large language models by retrieving factual context from your own data.\
Instead of relying solely on the model's training corpus, a RAG system:

1. **Embeds** your documents into vectors,
2. **Retrieves** relevant chunks for each query,
3. **Generates** grounded responses, and
4. **Evaluates** factuality and relevance.
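
As a plain-Python sketch of that loop (the function names are illustrative, not a Valohai or library API):

```python
# Illustrative shape of a RAG loop; each function maps to a pipeline step below.
def answer(query, documents, embed, retrieve, generate, evaluate):
    index = [embed(doc) for doc in documents]     # 1. embed the corpus
    context = retrieve(index, embed(query), k=5)  # 2. retrieve relevant chunks
    response = generate(context, query)           # 3. generate a grounded answer
    scores = evaluate(response, context)          # 4. evaluate the result
    return response, scores
```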

In Valohai, each stage becomes a versioned step in a reproducible pipeline —\
so as the ecosystem evolves (OpenAI → Anthropic → Meta → etc.), you can swap tools but keep lineage intact.

> 💡 *In a fast-moving GenAI landscape, your retrieval or model provider may change — your reproducible pipeline shouldn't.*

### RAG pipeline architecture in Valohai

A simple RAG workflow maps naturally onto a Valohai pipeline:

```
ingest → embed → retrieve → generate → evaluate
```

Each component runs as its own step, and you can replace or parallelize any of them independently.
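
As a sketch, the three steps defined in the following sections could be wired together like this in `valohai.yaml` (node names are illustrative):

```yaml
- pipeline:
    name: rag-pipeline
    nodes:
      - name: embed
        type: execution
        step: embed-documents
      - name: generate
        type: execution
        step: query-rag
      - name: evaluate
        type: execution
        step: evaluate-rag
    edges:
      - [embed.output.*, generate.input.vectors]
      - [generate.output.*, evaluate.input.responses]
```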

### Embedding step

The embedding step converts documents into vectors and saves them as a versioned dataset.

```yaml
- step: embed-documents
  image: python:3.10
  command:
    - pip install openai numpy
    - python src/embed.py {parameters}
  inputs:
    - name: docs
      default: dataset://documentation-corpus/v1
  parameters:
    - name: embedding_model
      type: string
      default: "text-embedding-3-large"
```

#### Example `embed.py`

```python
import argparse
import json

import numpy as np
from openai import OpenAI

# Valohai expands {parameters} into --embedding_model=<value>
parser = argparse.ArgumentParser()
parser.add_argument("--embedding_model", default="text-embedding-3-large")
args = parser.parse_args()

# Valohai downloads the "docs" input to /valohai/inputs/docs/
with open("/valohai/inputs/docs/data.jsonl") as f:
    docs = [json.loads(line)["text"] for line in f]

client = OpenAI()  # reads OPENAI_API_KEY from the environment
vectors = [
    client.embeddings.create(model=args.embedding_model, input=chunk).data[0].embedding
    for chunk in docs
]
np.save("/valohai/outputs/vectors.npy", np.array(vectors))
print(f"Saved {len(vectors)} vector embeddings.")
```

> **Secrets:**\
> Store your API keys (e.g., `OPENAI_API_KEY`) as Valohai secrets in your project settings; they'll be injected automatically at runtime without being exposed in code or YAML.

Each new run produces a new dataset version (`rag-vectors/v2`, `v3`, …), ensuring you can trace which document snapshot produced which embeddings.
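
To attach an output file to a dataset version from inside the step itself, you can save a sidecar metadata file next to it; a minimal sketch, with `rag-vectors/v2` as an illustrative version:

```python
import json
from pathlib import Path

# A "<output>.metadata.json" sidecar tells Valohai to add the output file
# to a dataset version once the execution finishes.
metadata = {"valohai.dataset-versions": ["dataset://rag-vectors/v2"]}
Path("/valohai/outputs/vectors.npy.metadata.json").write_text(json.dumps(metadata))
```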

### Retrieval and generation step

Retrieve relevant chunks and generate responses using different LLM providers.

```yaml
- step: query-rag
  image: python:3.10
  command:
    - pip install openai anthropic llamaapi
    - python src/query.py {parameters}
  inputs:
    - name: vectors
      default: dataset://rag-vectors/v3
  parameters:
    - name: provider
      type: string
      default: "openai"  # openai | anthropic | meta
    - name: model
      type: string
      default: "gpt-4-turbo"
    - name: query
      type: string
      default: "Explain what a Valohai pipeline is"
```

#### Example `query.py`

```python
import argparse
import json
import os
from pathlib import Path

import anthropic
from openai import OpenAI

# Valohai expands {parameters} into --provider=... --model=... --query=...
parser = argparse.ArgumentParser()
parser.add_argument("--provider", default="openai")
parser.add_argument("--model", default="gpt-4-turbo")
parser.add_argument("--query", default="What is a pipeline?")
args = parser.parse_args()

# Retrieval elided: load /valohai/inputs/vectors/ into your vector index
# and fetch the top-k chunks for the query here.
context = "retrieved text chunks..."
prompt = f"{context}\n\n{args.query}"

if args.provider == "openai":
    client = OpenAI()
    response = client.chat.completions.create(
        model=args.model,
        messages=[{"role": "user", "content": prompt}],
    )
    answer = response.choices[0].message.content

elif args.provider == "anthropic":
    client = anthropic.Anthropic()
    response = client.messages.create(
        model=args.model,
        max_tokens=1024,  # required by the Anthropic Messages API
        messages=[{"role": "user", "content": prompt}],
    )
    answer = response.content[0].text

elif args.provider == "meta":
    # Llama is hosted by many providers; adapt this branch to the client your
    # host exposes. This sketch assumes the llamaapi package and an
    # OpenAI-style response payload.
    from llamaapi import LlamaAPI

    llama = LlamaAPI(os.getenv("LLAMA_API_KEY"))
    response = llama.run({
        "model": args.model,
        "messages": [{"role": "user", "content": prompt}],
    })
    answer = response.json()["choices"][0]["message"]["content"]

Path("/valohai/outputs/responses.json").write_text(
    json.dumps({"query": args.query, "answer": answer})
)
print(f"Saved answer from {args.provider}:{args.model}")
```

Run the same pipeline against multiple providers to benchmark quality, latency, and cost.
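
For example, you could launch one execution per provider with the Valohai CLI (the model identifiers here are illustrative):

```bash
vh execution run query-rag --provider=openai --model=gpt-4-turbo
vh execution run query-rag --provider=anthropic --model=claude-3-5-sonnet-20240620
vh execution run query-rag --provider=meta --model=llama-3-70b
```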

### Evaluation step

Use evaluation steps to track both retrieval and generation performance.

```yaml
- step: evaluate-rag
  image: python:3.10
  command:
    - pip install openai numpy
    - python src/evaluate.py
  inputs:
    - name: responses
      default: dataset://rag-responses/v3
```

#### Example `evaluate.py`

```python
import json

import numpy as np
from openai import OpenAI

# Each record carries the generated answer plus the retrieved and gold
# document ids: {"answer": ..., "retrieved": [...], "gold": [...]}
with open("/valohai/inputs/responses/responses.json") as f:
    data = json.load(f)

# 1. Retrieval quality (Recall@K)
recall = float(np.mean(
    [len(set(d["retrieved"]) & set(d["gold"])) / len(d["gold"]) for d in data]
))

# 2. Factuality via LLM-as-a-judge
client = OpenAI()
prompt = f"Rate the factual accuracy of this answer on a scale of 1-5: {data[0]['answer']}"
judge = client.chat.completions.create(
    model="gpt-4-turbo",
    messages=[{"role": "user", "content": prompt}],
)

# 3. Print JSON to stdout so Valohai collects it as execution metadata
print(json.dumps({
    "recall_at_k": recall,
    "factuality_score": judge.choices[0].message.content,
    "avg_latency": 1.2,      # placeholder values; measure these in practice
    "avg_cost_usd": 0.004,
}))
```

Evaluate both sides of RAG:

* **Retrieval metrics** (Recall@K, MRR; a short MRR sketch follows this list)
* **Generation metrics** (Factuality, BLEU, ROUGE, or GPT-judge)
* **Operational metrics** (Latency, Cost)
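
The `evaluate.py` example above computes Recall@K; here is a minimal MRR sketch you could log alongside it, assuming the same `retrieved`/`gold` fields:

```python
import numpy as np

def mean_reciprocal_rank(records):
    """records: dicts with 'retrieved' (rank-ordered ids) and 'gold' (relevant ids)."""
    reciprocal_ranks = []
    for record in records:
        gold = set(record["gold"])
        # Rank (1-based) of the first retrieved chunk that is actually relevant
        rank = next((i + 1 for i, doc in enumerate(record["retrieved"]) if doc in gold), None)
        reciprocal_ranks.append(1.0 / rank if rank else 0.0)
    return float(np.mean(reciprocal_ranks))

# e.g. mean_reciprocal_rank([{"retrieved": ["d3", "d1"], "gold": ["d1"]}]) == 0.5
```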

You can automate evaluations whenever:

* A new document dataset is uploaded
* A new embedding model is tested
* A new provider (e.g., Claude or Llama) is added

### Human approval and comparison

Add a pause for human approval after automated evaluation:

```yaml
- pipeline:
    name: evaluate
    nodes:
      - name: evaluate
        type: execution
        step: evaluate-rag
      - name: promote-model
        type: execution
        step: promote-model
        actions:
          - when: node-starting
            then: require-approval
    edges:
      - [evaluate.output.*, promote-model.input.results]
```

Review model outputs manually or through your own interface, then approve to continue.\
All human decisions are logged in the pipeline's audit trail.

Use the **Model Catalog** to compare:

| Provider  | Model       | Recall@K | Factuality | Avg. Cost (USD) | Decision   |
| --------- | ----------- | -------- | ---------- | --------------- | ---------- |
| OpenAI    | GPT-4-Turbo | 0.89     | 4.6        | 0.004           | ✅          |
| Anthropic | Claude 3.5  | 0.87     | 4.8        | 0.003           | ✅          |
| Meta      | Llama-3-70B | 0.84     | 4.2        | 0.001           | 🔄 re-test |

### Integrations and triggers

RAG pipelines often connect multiple external tools.\
Valohai integrates easily with them through webhooks, notifications, and the REST API.

* **Webhooks:** Re-run embedding pipelines automatically when your document corpus updates.
* **Notifications:** Post evaluation summaries to Slack or Teams for reviewers.
* **REST API:** Programmatically launch, track, or fetch the latest RAG model metrics (see the sketch below).
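
As a sketch of the REST API route, assuming a `VALOHAI_API_TOKEN` environment variable and with the project ID left as a placeholder (check the current API docs for exact payload fields):

```python
import os
import requests

# Launch a query-rag execution through the Valohai REST API.
response = requests.post(
    "https://app.valohai.com/api/v0/executions/",
    headers={"Authorization": f"Token {os.environ['VALOHAI_API_TOKEN']}"},
    json={
        "project": "<project-id>",  # your project's ID
        "commit": "main",           # commit or branch to run
        "step": "query-rag",
        "parameters": {"provider": "anthropic", "model": "claude-3-5-sonnet-20240620"},
    },
)
response.raise_for_status()
print("Launched execution:", response.json()["id"])
```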

> All retrievals, embeddings, and evaluations stay reproducible, observable, and interchangeable.

### Summary

* RAG pipelines evolve quickly — models, embeddings, and retrievers come and go.
* Valohai provides the stable, auditable foundation beneath that evolution.
* Every step (embed → retrieve → evaluate) is versioned, testable, and works with any provider.

### Learn More

* [GitHub: valohai/rag-doc-example](https://github.com/valohai/rag-doc-example)
