RAG and Context Pipelines in Valohai
Learn how to build and evaluate Retrieval-Augmented Generation (RAG) pipelines in Valohai using embeddings, vector databases, and LLMs like GPT, Claude, and Llama.
Retrieval-Augmented Generation (RAG) systems combine vector search with large language models to produce grounded, explainable answers. Valohai provides the reproducible system behind these fast-moving architectures, so you can plug in new models, retrievers, or vector databases without breaking your workflow.
What is RAG and why it matters
RAG enhances large language models by retrieving factual context from your own data. Instead of relying solely on the model’s training corpus, a RAG system:
Embeds your documents into vectors,
Retrieves relevant chunks for each query,
Generates grounded responses, and
Evaluates factuality and relevance.
In Valohai, each stage becomes a versioned step in a reproducible pipeline — so as the ecosystem evolves (OpenAI → Anthropic → Meta → etc.), you can swap tools but keep lineage intact.
💡 In a fast-moving GenAI landscape, your retrieval or model provider may change — your reproducible pipeline shouldn’t.
RAG pipeline architecture in Valohai
A simple RAG workflow maps naturally onto a Valohai pipeline:
ingest → embed → retrieve → generate → evaluate
Each component runs as a step, and you can replace or parallelize any of them independently.
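As a sketch, the steps defined in the sections below could be wired together into a single pipeline like this. This is a minimal valohai.yaml example, not a definitive layout: the node names and edge targets are illustrative, and an ingest node would precede embed if you fetch documents inside the pipeline.
- pipeline:
    name: rag-pipeline
    nodes:
      - name: embed
        type: execution
        step: embed-documents
      - name: query
        type: execution
        step: query-rag
      - name: evaluate
        type: execution
        step: evaluate-rag
    edges:
      # Pass the versioned vectors to the query node, and the responses to evaluation.
      # Edge notation follows the approval example later on this page; match the names to your own inputs.
      - [embed.outputs.*, query.inputs.vectors]
      - [query.outputs.*, evaluate.inputs.responses]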
Embedding step
The embedding step converts documents into vectors and saves them as a versioned dataset.
- step:
    name: embed-documents
    image: python:3.10
    command:
      - python src/embed.py --input {inputs.docs} --output /valohai/outputs/vectors/
    inputs:
      - name: docs
        default: dataset://documentation-corpus/v1
    parameters:
      - name: embedding_model
        type: string
        default: "text-embedding-3-large"
Example embed.py
import argparse, json, os
import numpy as np
from openai import OpenAI

# Match the flags passed in the step's command
parser = argparse.ArgumentParser()
parser.add_argument("--input", default="/valohai/inputs/docs/data.jsonl")
parser.add_argument("--output", default="/valohai/outputs/vectors/")
args = parser.parse_args()

model = os.getenv("EMBEDDING_MODEL", "text-embedding-3-large")
client = OpenAI()  # reads OPENAI_API_KEY from the environment

with open(args.input) as f:
    docs = [json.loads(line)["text"] for line in f]

# The v1 OpenAI SDK returns objects, so use attribute access instead of dict keys
vectors = [
    client.embeddings.create(model=model, input=chunk).data[0].embedding
    for chunk in docs
]

os.makedirs(args.output, exist_ok=True)
np.save(os.path.join(args.output, "vectors.npy"), np.array(vectors))
print("Saved vector embeddings.")
Secrets: Store your API keys (e.g., OPENAI_API_KEY) as Valohai secrets in your project settings; they’ll be injected automatically at runtime without exposing them in code or YAML.
Each new run produces a new dataset version (rag-vectors/v2, v3, …), ensuring you can trace which document snapshot produced which embeddings.
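If you want the embedding run to register its output as a new dataset version automatically, one option is Valohai's sidecar metadata file convention. Below is a minimal sketch assuming the dataset is named rag-vectors; the version label is a placeholder, and you should double-check the exact metadata key against the dataset documentation for your version of the platform.
import json

# Attach the saved vectors to a new version of the "rag-vectors" dataset
# by writing a sidecar metadata file next to the output.
output_path = "/valohai/outputs/vectors/vectors.npy"
metadata = {"valohai.dataset-versions": ["dataset://rag-vectors/my-new-version"]}  # placeholder version name

with open(output_path + ".metadata.json", "w") as f:
    json.dump(metadata, f)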
Retrieval and generation step
Retrieve relevant chunks and generate responses using different LLM providers.
- step:
    name: query-rag
    image: python:3.10
    command:
      - python src/query.py {parameters}
    inputs:
      - name: vectors
        default: dataset://rag-vectors/v3
    parameters:
      - name: provider
        default: "openai"  # openai | anthropic | meta
        type: string
      - name: model
        default: "gpt-4-turbo"
        type: string
      - name: query
        default: "Explain what a Valohai pipeline is"
        type: string
Example query.py
import argparse
import json
import os
from pathlib import Path

import anthropic
from openai import OpenAI

# Valohai expands {parameters} in the command into --provider/--model/--query flags
parser = argparse.ArgumentParser()
parser.add_argument("--provider", default="openai")
parser.add_argument("--model", default="gpt-4-turbo")
parser.add_argument("--query", default="What is a pipeline?")
args = parser.parse_args()

context = "retrieved text chunks..."  # placeholder, see the retrieval sketch below
prompt = f"{context}\n\n{args.query}"

if args.provider == "openai":
    response = OpenAI().chat.completions.create(
        model=args.model, messages=[{"role": "user", "content": prompt}]
    )
    answer = response.choices[0].message.content
elif args.provider == "anthropic":
    response = anthropic.Anthropic().messages.create(
        model=args.model, max_tokens=1024, messages=[{"role": "user", "content": prompt}]
    )
    answer = response.content[0].text
elif args.provider == "meta":
    # The client and method here depend on your Llama hosting provider's SDK
    from llamaapi import Llama
    answer = Llama(api_key=os.getenv("LLAMA_API_KEY")).generate(prompt=prompt)

Path("/valohai/outputs/responses.json").write_text(json.dumps({"query": args.query, "answer": answer}))
print(f"Saved answer from {args.provider}:{args.model}")
Run the same pipeline against multiple providers to benchmark quality, latency, and cost.
Evaluation step
Use evaluation steps to track both retrieval and generation performance.
- step:
    name: evaluate-rag
    image: python:3.10
    command:
      - python src/evaluate.py --responses {inputs.responses}
    inputs:
      - name: responses
        default: dataset://rag-responses/v3
Example evaluate.py
import json

import numpy as np
from openai import OpenAI

# Expected schema: a list of records, each with "answer", "retrieved", and "gold" chunk IDs
with open("/valohai/inputs/responses/responses.json") as f:
    data = json.load(f)

# 1. Retrieval quality (Recall@K)
recall = float(np.mean([len(set(d["retrieved"]) & set(d["gold"])) / len(d["gold"]) for d in data]))

# 2. Factuality via LLM-as-a-judge
prompt = f"Rate the factual accuracy of this answer (1–5): {data[0]['answer']}"
judge = OpenAI().chat.completions.create(model="gpt-4-turbo", messages=[{"role": "user", "content": prompt}])
factuality = judge.choices[0].message.content

# 3. Cost and latency (pseudo values); Valohai collects JSON printed to stdout as execution metadata
print(json.dumps({"recall_at_k": recall, "factuality_score": factuality}))
print(json.dumps({"avg_latency": 1.2, "avg_cost_usd": 0.004}))
print("Metrics logged to execution metadata.")
Evaluate both sides of RAG:
Retrieval metrics (Recall@K, MRR; a minimal MRR sketch follows this list)
Generation metrics (Factuality, BLEU, ROUGE, or GPT-judge)
Operational metrics (Latency, Cost)
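As a complement to the Recall@K computation in evaluate.py, here is a minimal Mean Reciprocal Rank (MRR) sketch. It assumes each record's retrieved list is ordered by relevance score and gold holds the reference chunk IDs:
import numpy as np

def mean_reciprocal_rank(records):
    """MRR over records with an ordered "retrieved" list and a "gold" set of relevant IDs."""
    reciprocal_ranks = []
    for r in records:
        gold = set(r["gold"])
        # Rank (1-based) of the first retrieved chunk that is actually relevant
        rank = next((i + 1 for i, c in enumerate(r["retrieved"]) if c in gold), None)
        reciprocal_ranks.append(1.0 / rank if rank else 0.0)
    return float(np.mean(reciprocal_ranks))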
You can automate evaluations whenever:
A new document dataset is uploaded
A new embedding model is tested
A new provider (e.g., Claude or Llama) is added
Human approval and comparison
Add a pause for human approval after automated evaluation:
- pipeline:
    name: evaluate
    nodes:
      - name: evaluate
        type: execution
        step: evaluate-rag
      - name: promote-model
        type: execution
        step: promote-model
        actions:
          - when: node-starting
            then: require-approval
    edges:
      - [evaluate.outputs.*, promote-model.inputs.results]
Review model outputs manually or through your own interface, then approve to continue. All human decisions are logged in the pipeline’s audit trail.
Use the Model Catalog to compare:

| Provider  | Model       | Recall@K | Factuality (1-5) | Cost / query (USD) | Status     |
|-----------|-------------|----------|------------------|--------------------|------------|
| OpenAI    | GPT-4-Turbo | 0.89     | 4.6              | 0.004              | ✅         |
| Anthropic | Claude 3.5  | 0.87     | 4.8              | 0.003              | ✅         |
| Meta      | Llama-3-70B | 0.84     | 4.2              | 0.001              | 🔄 re-test |
Integrations and triggers
RAG pipelines often connect multiple external tools. Valohai integrates easily with them through webhooks, notifications, and the REST API.
Webhooks: Re-run embedding pipelines automatically when your document corpus updates.
Notifications: Post evaluation summaries to Slack or Teams for reviewers.
REST API: Programmatically launch, track, or fetch the latest RAG model metrics (a minimal launch sketch follows below).
All retrievals, embeddings, and evaluations stay reproducible, observable, and interchangeable.
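For the REST API item above, a minimal sketch of launching a RAG run programmatically might look like the following. The endpoint shape follows Valohai's public API, but the project identifier, commit, inputs, and token handling are placeholders you would replace with your own values; consult the API reference for the exact payload.
import os
import requests

API_TOKEN = os.environ["VALOHAI_API_TOKEN"]  # your personal API token, stored outside the code
headers = {"Authorization": f"Token {API_TOKEN}"}

# Launch a new execution of the query-rag step (IDs and values below are placeholders)
payload = {
    "project": "<project-id>",
    "commit": "main",
    "step": "query-rag",
    "inputs": {"vectors": ["dataset://rag-vectors/v3"]},
    "parameters": {"provider": "openai", "model": "gpt-4-turbo", "query": "Explain what a Valohai pipeline is"},
}
resp = requests.post("https://app.valohai.com/api/v0/executions/", json=payload, headers=headers)
resp.raise_for_status()
print("Launched execution:", resp.json()["id"])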
Summary
RAG pipelines evolve quickly — models, embeddings, and retrievers come and go.
Valohai provides the stable, auditable foundation beneath that evolution.
Every step (embed → retrieve → generate → evaluate) is versioned, testable, and works with any provider.