# RAG and Context Pipelines in Valohai

Retrieval-Augmented Generation (RAG) systems combine vector search with large language models to produce grounded, explainable answers. Valohai provides the reproducible system behind these fast-moving architectures, so you can plug in new models, retrievers, or vector databases without breaking your workflow.

### What is RAG and why it matters

RAG enhances large language models by retrieving factual context from your own data.\
Instead of relying solely on the model's training corpus, a RAG system:

1. **Embeds** your documents into vectors,
2. **Retrieves** relevant chunks for each query,
3. **Generates** grounded responses, and
4. **Evaluates** factuality and relevance.

In Valohai, each stage becomes a versioned step in a reproducible pipeline —\
so as the ecosystem evolves (OpenAI → Anthropic → Meta → etc.), you can swap tools but keep lineage intact.

> 💡 *In a fast-moving GenAI landscape, your retrieval or model provider may change — your reproducible pipeline shouldn't.*

### RAG pipeline architecture in Valohai

A simple RAG workflow maps naturally onto a Valohai pipeline:

```
ingest → embed → retrieve → generate → evaluate
```

Each component runs as a step and you can replace or parallelize any of them independently.

### Embedding step

The embedding step converts documents into vectors and saves them as a versioned dataset.

```yaml
- step: embed-documents
  image: python:3.10
  command:
    - pip install openai numpy
    - python src/embed.py --input {inputs.docs} --output /valohai/outputs/vectors/
  inputs:
    - name: docs
      default: dataset://documentation-corpus/v1
  parameters:
    - name: embedding_model
      type: string
      default: "text-embedding-3-large"
```

#### Example `embed.py`

```python
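import json
import os
from pathlib import Path

import numpy as np
from openai import OpenAI  # requires openai>=1.0; reads OPENAI_API_KEY from the environment

input_file = "/valohai/inputs/docs/data.jsonl"

# Valohai writes step parameters to /valohai/config/parameters.json;
# fall back to an environment variable for local runs.
params_file = Path("/valohai/config/parameters.json")
params = json.loads(params_file.read_text()) if params_file.exists() else {}
model = params.get("embedding_model") or os.getenv("EMBEDDING_MODEL", "text-embedding-3-large")

# One JSON object per line, each with a "text" field.
with open(input_file) as f:
    docs = [json.loads(line)["text"] for line in f if line.strip()]

client = OpenAI()
vectors = [client.embeddings.create(model=model, input=chunk).data[0].embedding for chunk in docs]
np.save("/valohai/outputs/vectors.npy", np.array(vectors))
print("Saved vector embeddings.")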
```

> **Secrets:**\
> Store your API keys (e.g., `OPENAI_API_KEY`) as Valohai secrets in your project settings. They are injected at runtime, so they never appear in code or YAML.

Saving each run's output as a new dataset version (`rag-vectors/v2`, `v3`, …) lets you trace which document snapshot produced which embeddings.
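One way for a run to register its output as a dataset version is a metadata sidecar file written next to the output. The sketch below assumes Valohai's `<output>.metadata.json` convention with a `valohai.dataset-versions` key; the helper name `tag_as_dataset_version` and the dataset URI are illustrative, so check the dataset documentation for the current format before relying on it.

```python
import json
from pathlib import Path


def tag_as_dataset_version(output_file: str, dataset_uri: str) -> Path:
    """Write a `<output_file>.metadata.json` sidecar naming the target dataset version."""
    sidecar = Path(f"{output_file}.metadata.json")
    sidecar.parent.mkdir(parents=True, exist_ok=True)
    # Assumed sidecar format: a list of dataset version URIs under this key.
    sidecar.write_text(json.dumps({"valohai.dataset-versions": [dataset_uri]}))
    return sidecar
```

In `embed.py` this could be called right after `np.save`, for example `tag_as_dataset_version("/valohai/outputs/vectors.npy", "dataset://rag-vectors/v2")`.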

### Retrieval and generation step

Retrieve relevant chunks and generate responses using different LLM providers.

```yaml
- step: query-rag
  image: python:3.10
  command:
    - pip install openai anthropic llamaapi
    - python src/query.py
  inputs:
    - name: vectors
      default: dataset://rag-vectors/v3
  parameters:
    - name: provider
      default: "openai"  # openai | anthropic | meta
      type: string
    - name: model
      default: "gpt-4-turbo"
      type: string
    - name: query
      default: "Explain what a Valohai pipeline is"
      type: string
```

#### Example `query.py`

```python
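import json
import os
from pathlib import Path

# Valohai writes step parameters to /valohai/config/parameters.json;
# fall back to environment variables for local runs.
params_file = Path("/valohai/config/parameters.json")
params = json.loads(params_file.read_text()) if params_file.exists() else {}

provider = params.get("provider") or os.getenv("PROVIDER", "openai")
model = params.get("model") or os.getenv("MODEL", "gpt-4-turbo")
query = params.get("query") or os.getenv("QUERY", "What is a pipeline?")

context = "retrieved text chunks..."  # retrieved from your vector index
prompt = f"{context}\n\n{query}"

# SDKs are imported lazily so only the selected provider's package is needed.
if provider == "openai":
    from openai import OpenAI  # requires openai>=1.0

    response = OpenAI().chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    answer = response.choices[0].message.content

elif provider == "anthropic":
    import anthropic

    message = anthropic.Anthropic().messages.create(
        model=model,
        max_tokens=1024,  # required by the Messages API
        messages=[{"role": "user", "content": prompt}],
    )
    answer = message.content[0].text

elif provider == "meta":
    # Assumes the `llamaapi` package, whose client is `LlamaAPI` and whose
    # `run()` returns an HTTP response with an OpenAI-style JSON body.
    from llamaapi import LlamaAPI

    llama = LlamaAPI(os.getenv("LLAMA_API_KEY"))
    response = llama.run({"model": model, "messages": [{"role": "user", "content": prompt}], "stream": False})
    answer = response.json()["choices"][0]["message"]["content"]

else:
    raise ValueError(f"Unknown provider: {provider}")

Path("/valohai/outputs/responses.json").write_text(json.dumps({"query": query, "answer": answer}))
print(f"Saved answer from {provider}:{model}")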
```
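The `context` placeholder in `query.py` stands in for the retrieval itself. A minimal sketch of that part, assuming the embeddings are a 2-D NumPy array (as saved by the embedding step) and the query has been embedded with the same model; `top_k` and `build_prompt` are hypothetical helper names, and brute-force cosine similarity is used here in place of a real vector database.

```python
import numpy as np


def top_k(query_vec: np.ndarray, doc_vecs: np.ndarray, k: int = 3) -> list[int]:
    """Return indices of the k document vectors most similar to the query (cosine)."""
    q = query_vec / np.linalg.norm(query_vec)
    d = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True)
    scores = d @ q  # cosine similarity of each document against the query
    return np.argsort(-scores)[:k].tolist()


def build_prompt(chunks: list[str], query: str) -> str:
    """Number each retrieved chunk so the answer can point back to its sources."""
    numbered = "\n".join(f"[{i + 1}] {chunk}" for i, chunk in enumerate(chunks))
    return f"Answer using only the context below.\n\n{numbered}\n\nQuestion: {query}"
```

With `doc_vecs = np.load("/valohai/inputs/vectors/vectors.npy")` and the original texts in a list `docs`, the prompt could be built as `build_prompt([docs[i] for i in top_k(query_vec, doc_vecs)], query)`. Brute-force search is fine for small corpora; larger ones usually call for an approximate nearest-neighbor index.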

Run the same pipeline against multiple providers to benchmark quality, latency, and cost.
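To compare providers inside a single execution, you could wrap each provider call in a small timing harness. This is a sketch under assumptions: `benchmark` is a hypothetical helper, each provider is represented by a plain function that takes a prompt and returns an answer, and only wall-clock latency is measured (cost depends on each provider's token pricing and is not computed here).

```python
import time
from typing import Callable


def benchmark(providers: dict[str, Callable[[str], str]], prompt: str) -> list[dict]:
    """Call each provider once and record its answer and wall-clock latency."""
    results = []
    for name, generate in providers.items():
        start = time.perf_counter()
        answer = generate(prompt)
        results.append({
            "provider": name,
            "answer": answer,
            "latency_s": round(time.perf_counter() - start, 3),
        })
    return results
```

Running one execution per provider instead keeps each result in its own versioned run, at the price of more executions to launch.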

### Evaluation step

Use evaluation steps to track both retrieval and generation performance.

```yaml
- step: evaluate-rag
  image: python:3.10
  command:
    - pip install openai numpy
    - python src/evaluate.py --responses {inputs.responses}
  inputs:
    - name: responses
      default: dataset://rag-responses/v3
```

#### Example `evaluate.py`

```python
import json

import numpy as np
from openai import OpenAI  # requires openai>=1.0

# Assumes responses.json holds a list of records with "answer", "retrieved", and "gold" keys.
with open("/valohai/inputs/responses/responses.json") as f:
    data = json.load(f)

# 1. Retrieval quality (Recall@K): share of gold chunk ids that were retrieved
recall = float(np.mean([len(set(d["retrieved"]) & set(d["gold"])) / len(d["gold"]) for d in data]))

# 2. Factuality via LLM-as-a-judge (scores only the first record here)
prompt = f"Rate factual accuracy of this answer (1–5): {data[0]['answer']}"
judge = OpenAI().chat.completions.create(model="gpt-4-turbo", messages=[{"role": "user", "content": prompt}])
factuality = judge.choices[0].message.content.strip()

# 3. Cost and latency (placeholder values; measure these in your own code)
metrics = {
    "recall_at_k": recall,
    "factuality_score": factuality,
    "avg_latency": 1.2,
    "avg_cost_usd": 0.004,
}

# Valohai collects execution metadata from JSON objects printed to stdout.
print(json.dumps(metrics))
```
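The judge model replies with free text, so the raw reply is awkward to plot or compare across runs. A minimal sketch of turning it into a number, assuming the 1 to 5 scale from the prompt above; `parse_rating` is a hypothetical helper, not part of any SDK.

```python
import re
from typing import Optional


def parse_rating(text: str, low: int = 1, high: int = 5) -> Optional[int]:
    """Return the first integer within [low, high] found in the reply, else None."""
    for match in re.findall(r"\d+", text):
        value = int(match)
        if low <= value <= high:
            return value
    return None
```

Asking the judge to answer with a single integer makes the parse more reliable. A `None` result is worth logging separately so that unparseable replies do not silently skew the average.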

Evaluate both sides of RAG:

* **Retrieval metrics** (Recall\@K, MRR)
* **Generation metrics** (Factuality, BLEU, ROUGE, or GPT-judge)
* **Operational metrics** (Latency, Cost)
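The two retrieval metrics can be sketched in a few lines. This assumes each query has a ranked list of retrieved chunk ids and a set of gold (relevant) ids; the function names are illustrative.

```python
def recall_at_k(retrieved: list[str], gold: set[str], k: int) -> float:
    """Fraction of gold chunk ids that appear among the top-k retrieved ids."""
    if not gold:
        return 0.0
    return len(set(retrieved[:k]) & gold) / len(gold)


def reciprocal_rank(retrieved: list[str], gold: set[str]) -> float:
    """1 / rank of the first relevant id, or 0.0 if none was retrieved."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in gold:
            return 1.0 / rank
    return 0.0


def mean(values: list[float]) -> float:
    """Average over queries; MRR is the mean of the reciprocal ranks."""
    return sum(values) / len(values) if values else 0.0
```

Recall@K rewards finding all relevant chunks, while MRR rewards ranking the first relevant chunk near the top, so the two can move independently when you change the retriever.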

You can automate evaluations whenever:

* A new document dataset is uploaded
* A new embedding model is tested
* A new provider (e.g., Claude or Llama) is added

### Human approval and comparison

Add a pause for human approval after automated evaluation:

```yaml
- pipeline:
    name: evaluate
    nodes:
      - name: evaluate
        type: execution
        step: evaluate-rag
      - name: promote-model
        type: execution
        step: promote-model
        actions:
          - when: node-starting
            then: require-approval
    edges:
      - [evaluate.output.*, promote-model.input.results]
```

Review model outputs manually or through your own interface, then approve to continue.\
All human decisions are logged in the pipeline's audit trail.

Use the **Model Catalog** to compare:

| Provider  | Model       | Recall\@K | Factuality | Cost (USD) | Decision   |
| --------- | ----------- | --------- | ---------- | ---------- | ---------- |
| OpenAI    | GPT-4-Turbo | 0.89      | 4.6        | 0.004      | ✅          |
| Anthropic | Claude 3.5  | 0.87      | 4.8        | 0.003      | ✅          |
| Meta      | Llama-3-70B | 0.84      | 4.2        | 0.001      | 🔄 re-test |

### Integrations and triggers

RAG pipelines often connect multiple external tools.\
Valohai integrates easily with them through webhooks, notifications, and the REST API.

* **Webhooks:** Re-run embedding pipelines automatically when your document corpus updates.
* **Notifications:** Post evaluation summaries to Slack or Teams for reviewers.
* **REST API:** Programmatically launch, track, or fetch the latest RAG model metrics.

> All retrievals, embeddings, and evaluations stay reproducible, observable, and interchangeable.

### Summary

* RAG pipelines evolve quickly — models, embeddings, and retrievers come and go.
* Valohai provides the stable, auditable foundation beneath that evolution.
* Every step (embed → retrieve → generate → evaluate) is versioned, testable, and integrable with any provider.

### Learn More

* [GitHub: valohai/rag-doc-example](https://github.com/valohai/rag-doc-example)

