Pipelines: Chain Your Jobs

Connect your existing jobs into automated workflows. Define how data flows between steps and let Valohai handle the orchestration.

💡 Already have working steps? You're ready to build pipelines. Just define how outputs connect to inputs.

How Pipelines Work

A pipeline is a recipe for connecting jobs:

  • Nodes = Your jobs (preprocessing, training, evaluation, etc.)

  • Edges = Data / information flow (e.g. which outputs become which inputs)

When you run a pipeline, Valohai automatically:

  • Executes jobs in the right order

  • Passes outputs between steps into defined inputs

  • Handles parallel execution where possible

  • Tracks the complete lineage

Quick Example

Connect three existing steps into a pipeline:

- pipeline:
    name: ml-workflow
    nodes:
      - name: preprocess
        type: execution
        step: preprocess-data
      - name: train
        type: execution
        step: train-model
      - name: evaluate
        type: execution
        step: evaluate-model
    edges:
      # Connect outputs → inputs
      - [preprocess.outputs.*, train.inputs.dataset]
      - [train.outputs.model*, evaluate.inputs.model]
      - [preprocess.outputs.*test*, evaluate.inputs.test-data]

Run it:

vh pipeline run ml-workflow --adhoc

💡 If you have pushed the valohai.yaml to Git and fetched the commit to your Valohai project, you can omit the --adhoc flag.

Complete Example

Let's build a real pipeline with three steps:

1. Define Your Steps (if not already done)

- step:
    name: preprocess-data
    image: python:3.10
    command:
      - pip install -r requirements.txt
      - python data-preprocess.py

- step:
    name: train-model
    image: tensorflow/tensorflow:2.6.0
    command:
      - python train_model.py
    inputs:
      - name: train-data
        default: dataset://images/latest-train
    parameters:
      - name: epochs
        default: 100

- step:
    name: evaluate-model
    image: tensorflow/tensorflow:2.6.0
    command:
      - python evaluate_model.py
    inputs:
      - name: test-data
        default: dataset://images/latest-test
      - name: model
        default: datum://production-latest

2. Connect as Pipeline

- pipeline:
    name: training-pipeline
    nodes:
      - name: prep-node
        type: execution
        step: preprocess-data
      
      - name: train-node
        type: execution
        step: train-model
        override:
          parameters:
            - name: epochs
              value: 200  # Override default
      
      - name: eval-node
        type: execution  
        step: evaluate-model
    
    edges:
      # Output wildcards → specific inputs
      - [prep-node.outputs.*, train-node.inputs.train-data]
      - [prep-node.outputs.*test*, eval-node.inputs.test-data]
      - [train-node.outputs.*.h5, eval-node.inputs.model]

Optional: Use the valohai-utils Python helper tool

Using valohai-utils, define pipelines in Python:

# pipeline.py
from valohai import Pipeline

def main(config) -> Pipeline:

    # Create a pipeline called "train-inference-pipeline".
    pipe = Pipeline(name="train-inference-pipeline", config=config)

    # Define the pipeline nodes.
    preprocess = pipe.execution("preprocess")
    train = pipe.execution("train-model")
    inference = pipe.execution("batch-inference")

    # Configure the pipeline, i.e. define the edges.
    preprocess.output("*").to(train.input("data"))
    preprocess.output("*.pkl").to(inference.input("model"))

    return pipe

Generate YAML:

vh yaml pipeline pipeline.py
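
Assuming steps named preprocess, train-model, and batch-inference already exist in your valohai.yaml, the command appends a pipeline section roughly equivalent to the hand-written YAML below; the node names and the exact edge notation emitted by the CLI may differ slightly:

- pipeline:
    name: train-inference-pipeline
    nodes:
      - name: preprocess
        type: execution
        step: preprocess
      - name: train-model
        type: execution
        step: train-model
      - name: batch-inference
        type: execution
        step: batch-inference
    edges:
      - [preprocess.outputs.*, train-model.inputs.data]
      - [train-model.outputs.*.pkl, batch-inference.inputs.model]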

Edge Patterns

Basic Output → Input

# All outputs from A go to B's 'data' input
- [nodeA.outputs.*, nodeB.inputs.data]

Wildcard Matching

# Only .pkl files go to model input
- [train.outputs.*.pkl, inference.inputs.model]

# Multiple patterns
- [prep.outputs.*train*, train.inputs.train-data]
- [prep.outputs.*val*, train.inputs.val-data]

Pass parameters and metrics between nodes

In addition to connecting outputs to inputs, edges can also pass parameter values and logged metrics (metadata) between nodes.

# Pass parameter values between nodes
- [train-model.parameter.user-id, test-model.parameter.user-id]

# Pass metrics into parameters
- [train.metadata.best_lr, finetune.parameter.learning_rate]

Multiple Targets

edges:
  # The same outputs go to multiple nodes
  - [preprocess.outputs.*, train.inputs.data]
  - [preprocess.outputs.*, evaluate.inputs.data]

Advanced Features

Conditional Execution

You can attach conditional actions to pipeline nodes, defined by a trigger event (when), a condition (if), and an action (then).

When: Actions trigger when certain events occur during pipeline execution. The available options include:

  • node-starting: When a node is about to start.

  • node-complete: When a node successfully completes.

  • node-error: When a node encounters an error.

If Condition: The condition that triggers the action can be based on either a metric or a parameter value.

Then: If the condition is met, one of the following actions is taken:

  • stop-pipeline: Halts the entire pipeline.

  • require-approval: Pauses the pipeline until a user manually approves the previous results.

- pipeline:
    name: conditional-workflow
    nodes:
      - name: train
        type: execution
        step: train-model
        actions:
          - when: node-complete
            if: metadata.foo >= 0.8
            then: stop-pipeline
      
      - name: test-model
        type: execution
        step: test-model
        actions:
          - when: node-starting
            then: require-approval
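
The example above gates on a metric after the node completes. As noted under If Condition, a parameter value can also serve as the condition; a minimal sketch, stopping the pipeline before training if the epochs parameter is misconfigured (the names and threshold are illustrative):

- name: train
  type: execution
  step: train-model
  actions:
    - when: node-starting
      if: parameter.epochs < 1
      then: stop-pipeline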

Parallel Execution

In the example below, the nodes train-model-a and train-model-b run in parallel. The ensemble node starts only after both of them have finished.

nodes:
  - name: train-model-a
    type: execution
    step: train
  
  - name: train-model-b  
    type: execution
    step: train
  
  - name: ensemble
    type: execution
    step: combine-models

edges:
  # Both training nodes run in parallel
  - [train-model-a.outputs.*, ensemble.inputs.model-a]
  - [train-model-b.outputs.*, ensemble.inputs.model-b]

It is also possible to run Task nodes inside pipelines:

nodes:
  # Other nodes omitted
  - name: train
    type: task
    step: Train model (MNIST)
    on-error: stop-all
    override:
      inputs:
        - name: training-set-images
        - name: training-set-labels
        - name: test-set-images
        - name: test-set-labels

edges:
  # Other edges omitted
  - [preprocess.output.*train-images*, train.input.training-set-images]
  - [preprocess.output.*train-labels*, train.input.training-set-labels]
  - [preprocess.output.*test-images*, train.input.test-set-images]
  - [preprocess.output.*test-labels*, train.input.test-set-labels]

Deployments

In addition to execution and Task nodes, it is possible to create deployments from pipelines.

nodes:  
  # Other nodes omitted
  - name: deploy
    type: deployment
    deployment: deployment-test
    actions:
      - when: node-starting
        then: require-approval
    endpoints:
      - predict-digit
    
edges:
  # Other edges omitted
  - [find-best-model.output.model*, deploy.file.predict-digit.model]

It is also possible to add pipeline nodes after a deployment node. This can be used, for example, to check the endpoint once it has been created or to clean up old endpoints within the pipeline.

edges:
  - [deploy.deployment.id, cleanup.parameters.deployment_id]
  - [deploy.deployment.version_id, cleanup.parameters.version_id]
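
For these edges to resolve, the downstream node must expose matching parameters. A minimal sketch of such a node and its step; the cleanup-deployments step, its script, and the deployment_id / version_id parameters are hypothetical placeholders for your own cleanup logic:

- step:
    name: cleanup-deployments
    image: python:3.10
    command:
      # {parameters} injects the parameter values passed in by the edges
      - python cleanup.py {parameters}
    parameters:
      - name: deployment_id
        type: string
      - name: version_id
        type: string

nodes:
  # Other nodes omitted
  - name: cleanup
    type: execution
    step: cleanup-deployments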

Running Pipelines

From CLI

# Run with local code
vh pipeline run training-pipeline --adhoc

# Run from Git
vh pipeline run training-pipeline

Quick Reference

Minimal Pipeline

- pipeline:
    name: my-pipeline
    nodes:
      - name: step1
        type: execution
        step: my-step-1
      - name: step2
        type: execution
        step: my-step-2
    edges:
      - [step1.outputs.*, step2.inputs.data]

Edge Syntax

Sources:

  • node-name.output.* — All outputs

  • node-name.output.*.csv — Only CSV files

  • node-name.output.name* — Starts with "name"

  • node-name.metadata.accuracy — Metadata value

  • node-name.parameter.learning_rate — Parameter value

  • deploy.deployment.id / deploy.deployment.version_id — Deployment ID / deployment version ID

Targets:

  • node-name.input.input-name — Any input available on the node

  • node-name.parameter.learning_rate — Parameter value

  • deploy.file.predict-digit.model — File for deployment nodes
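
Combined, an edges block using these source and target forms might look like the sketch below; the node, input, and parameter names are purely illustrative:

edges:
  # File outputs → input
  - [train.output.*.csv, report.input.metrics]
  # Logged metric → parameter of the next node
  - [train.metadata.accuracy, report.parameter.model-accuracy]
  # Parameter → parameter
  - [train.parameter.learning_rate, report.parameter.learning_rate]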

Node Types

  • execution — Run a step

  • task — Run parameter sweep

  • deployment — Create endpoint


Bottom line: If your steps work individually, connecting them into a pipeline takes just a few lines of YAML.
