RAG and Context Pipelines in Valohai

Learn how to build and evaluate Retrieval-Augmented Generation (RAG) pipelines in Valohai using embeddings, vector databases, and LLMs like GPT, Claude, and Llama.

Retrieval-Augmented Generation (RAG) systems combine vector search with large language models to produce grounded, explainable answers. Valohai provides the reproducible system behind these fast-moving architectures, so you can plug in new models, retrievers, or vector databases without breaking your workflow.

What is RAG and why it matters

RAG enhances large language models by retrieving factual context from your own data. Instead of relying solely on the model’s training corpus, a RAG system:

  1. Embeds your documents into vectors,

  2. Retrieves relevant chunks for each query,

  3. Generates grounded responses, and

  4. Evaluates factuality and relevance.

In Valohai, each stage becomes a versioned step in a reproducible pipeline — so as the ecosystem evolves (OpenAI → Anthropic → Meta → etc.), you can swap tools but keep lineage intact.

💡 In a fast-moving GenAI landscape, your retrieval or model provider may change — your reproducible pipeline shouldn’t.

RAG pipeline architecture in Valohai

A simple RAG workflow maps naturally onto a Valohai pipeline:

ingest → embed → retrieve → generate → evaluate

Each component runs as its own step, and you can replace or parallelize any of them independently.
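For example, the steps defined later on this page can be chained together in valohai.yaml roughly like this (a sketch only; the ingest step is omitted and the edge file patterns are illustrative):

- pipeline:
    name: rag-pipeline
    nodes:
      - name: embed
        type: execution
        step: embed-documents
      - name: query
        type: execution
        step: query-rag
      - name: evaluate
        type: execution
        step: evaluate-rag
    edges:
      - [embed.output.*, query.input.vectors]
      - [query.output.responses.json, evaluate.input.responses]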

Embedding step

The embedding step converts documents into vectors and saves them as a versioned dataset.

- step: embed-documents
  image: python:3.10
  command:
    - python src/embed.py --input {inputs.docs} --output /valohai/outputs/vectors/
  inputs:
    - name: docs
      default: dataset://documentation-corpus/v1
  parameters:
    - name: embedding_model
      type: string
      default: "text-embedding-3-large"

Example embed.py

import argparse
import json
import os

import numpy as np
from openai import OpenAI

# The step command passes the mounted input path via --input (see YAML above).
parser = argparse.ArgumentParser()
parser.add_argument("--input", default="/valohai/inputs/docs/data.jsonl")
parser.add_argument("--output", default="/valohai/outputs/vectors/")
args = parser.parse_args()

model = os.getenv("EMBEDDING_MODEL", "text-embedding-3-large")

with open(args.input) as f:
    docs = [json.loads(line)["text"] for line in f]

# OPENAI_API_KEY is read from the environment (injected as a Valohai secret).
client = OpenAI()
vectors = [
    client.embeddings.create(model=model, input=chunk).data[0].embedding
    for chunk in docs
]

# Files under /valohai/outputs/ are uploaded and versioned automatically.
os.makedirs(args.output, exist_ok=True)
np.save(os.path.join(args.output, "vectors.npy"), np.array(vectors))
print("Saved vector embeddings.")

Secrets: Store your API keys (e.g., OPENAI_API_KEY) as Valohai secrets in your project settings; they’ll be injected automatically at runtime without exposing them in code or YAML.

Each new run produces a new dataset version (rag-vectors/v2, v3, …), ensuring you can trace which document snapshot produced which embeddings.
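If you want each run’s embeddings to join a named dataset version automatically, one approach (assuming Valohai’s output metadata sidecar convention) is to write a <filename>.metadata.json file next to the output listing the dataset version it belongs to:

import json

# Sidecar metadata next to an output file can assign it to a dataset version.
# The dataset URI below is illustrative; bump the version per run as needed.
sidecar = {"valohai.dataset-versions": ["dataset://rag-vectors/v2"]}

with open("/valohai/outputs/vectors/vectors.npy.metadata.json", "w") as f:
    json.dump(sidecar, f)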

Retrieval and generation step

Retrieve relevant chunks and generate responses using different LLM providers.

- step: query-rag
  image: python:3.10
  command:
    - python src/query.py
  inputs:
    - name: vectors
      default: dataset://rag-vectors/v3
  parameters:
    - name: provider
      default: "openai"  # openai | anthropic | meta
      type: string
    - name: model
      default: "gpt-4-turbo"
      type: string
    - name: query
      default: "Explain what a Valohai pipeline is"
      type: string

Example query.py

import json
import os
from pathlib import Path

import anthropic
from openai import OpenAI

# PROVIDER, MODEL, and QUERY can be set as execution environment variables.
provider = os.getenv("PROVIDER", "openai")
model = os.getenv("MODEL", "gpt-4-turbo")

query = os.getenv("QUERY", "What is a pipeline?")
context = "retrieved text chunks..."  # retrieved from your vector index
prompt = f"{context}\n\n{query}"

if provider == "openai":
    client = OpenAI()
    response = client.chat.completions.create(
        model=model, messages=[{"role": "user", "content": prompt}]
    )
    answer = response.choices[0].message.content

elif provider == "anthropic":
    client = anthropic.Anthropic()
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    answer = response.content[0].text

elif provider == "meta":
    # Example for the llamaapi client; adapt to whichever Llama host you use.
    from llamaapi import LlamaAPI

    llama = LlamaAPI(os.getenv("LLAMA_API_KEY"))
    response = llama.run({"model": model, "messages": [{"role": "user", "content": prompt}]})
    answer = response.json()["choices"][0]["message"]["content"]

else:
    raise ValueError(f"Unsupported provider: {provider}")

Path("/valohai/outputs/responses.json").write_text(json.dumps({"query": query, "answer": answer}))
print(f"Saved answer from {provider}:{model}")

Run the same pipeline against multiple providers to benchmark quality, latency, and cost.

Evaluation step

Use evaluation steps to track both retrieval and generation performance.

- step: evaluate-rag
  image: python:3.10
  command:
    - python src/evaluate.py --responses {inputs.responses}
  inputs:
    - name: responses
      default: dataset://rag-responses/v3

Example evaluate.py

import json

import numpy as np
from openai import OpenAI

# Expects a list of records with "answer", "retrieved", and "gold" fields.
with open("/valohai/inputs/responses/responses.json") as f:
    data = json.load(f)

# 1. Retrieval quality (Recall@K)
recall = np.mean([len(set(d["retrieved"]) & set(d["gold"])) / len(d["gold"]) for d in data])

# 2. Factuality via LLM-as-a-judge
client = OpenAI()
prompt = f"Rate the factual accuracy of this answer on a scale of 1-5: {data[0]['answer']}"
judge = client.chat.completions.create(
    model="gpt-4-turbo", messages=[{"role": "user", "content": prompt}]
)

# 3. Cost and latency (placeholder values; measure these in your own pipeline)
metrics = {
    "recall_at_k": float(recall),
    "factuality_score": judge.choices[0].message.content,
    "avg_latency": 1.2,
    "avg_cost_usd": 0.004,
}

# Printing a single-line JSON object logs it as Valohai execution metadata.
print(json.dumps(metrics))
print("Metrics logged to execution metadata.")

Evaluate both sides of RAG:

  • Retrieval metrics (Recall@K, MRR; an MRR sketch follows this list)

  • Generation metrics (Factuality, BLEU, ROUGE, or GPT-judge)

  • Operational metrics (Latency, Cost)
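Recall@K is computed in evaluate.py above; MRR can be logged the same way. A small sketch, assuming each record carries an ordered retrieved list and a gold set of relevant chunk IDs:

def mean_reciprocal_rank(records) -> float:
    """Average 1/rank of the first relevant chunk in each ranked retrieval list."""
    reciprocal_ranks = []
    for record in records:
        gold = set(record["gold"])
        rank = next((i + 1 for i, doc in enumerate(record["retrieved"]) if doc in gold), None)
        reciprocal_ranks.append(1.0 / rank if rank else 0.0)
    return sum(reciprocal_ranks) / len(reciprocal_ranks)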

You can automate evaluations whenever:

  • A new document dataset is uploaded

  • A new embedding model is tested

  • A new provider (e.g., Claude or Llama) is added

Human approval and comparison

Add a pause for human approval after automated evaluation:

- pipeline:
    name: evaluate
    nodes:
      - name: evaluate
        type: execution
        step: evaluate-rag
      - name: promote-model
        type: execution
        step: promote-model
        actions:
          - when: node-starting
            then: require-approval
    edges:
      - [evaluate.output.*, promote-model.input.results]

Review model outputs manually or through your own interface, then approve to continue. All human decisions are logged in the pipeline’s audit trail.

Use the Model Catalog to compare:

Provider     Model          Recall@K   Factuality   Cost (USD)   Decision
OpenAI       GPT-4-Turbo    0.89       4.6          0.004
Anthropic    Claude 3.5     0.87       4.8          0.003
Meta         Llama-3-70B    0.84       4.2          0.001        🔄 re-test

Integrations and triggers

RAG pipelines often connect multiple external tools. Valohai integrates easily with them through webhooks, notifications, and the REST API.

  • Webhooks: Re-run embedding pipelines automatically when your document corpus updates.

  • Notifications: Post evaluation summaries to Slack or Teams for reviewers.

  • REST API: Programmatically launch, track, or fetch the latest RAG model metrics.
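For example, a short script using the REST API to list a project’s recent executions and their statuses (the project ID is a placeholder; authenticate with an API token from your Valohai profile):

import os

import requests

API = "https://app.valohai.com/api/v0"
headers = {"Authorization": f"Token {os.environ['VALOHAI_API_TOKEN']}"}

# List executions for a project; the project ID below is a placeholder.
response = requests.get(
    f"{API}/executions/",
    headers=headers,
    params={"project": "<project-id>"},
)
response.raise_for_status()

for execution in response.json()["results"]:
    print(execution["counter"], execution["status"])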

All retrievals, embeddings, and evaluations stay reproducible, observable, and interchangeable.

Summary

  • RAG pipelines evolve quickly — models, embeddings, and retrievers come and go.

  • Valohai provides the stable, auditable foundation beneath that evolution.

  • Every step (embed → retrieve → evaluate) is versioned, testable, and integrable with any provider.
