Pipelines: Chain Your Jobs
Connect your existing jobs into automated workflows. Define how data flows between steps and let Valohai handle the orchestration.
💡 Already have working steps? You're ready to build pipelines. Just define how outputs connect to inputs.
How Pipelines Work
A pipeline is a recipe for connecting jobs:
Nodes = Your jobs (preprocessing, training, evaluation, etc.)
Edges = Data / information flow (e.g. which outputs become which inputs)
When you run a pipeline, Valohai automatically:
Executes jobs in the right order
Passes outputs between steps into defined inputs
Handles parallel execution where possible
Tracks the complete lineage
Quick Example
Connect three existing steps into a pipeline:
- pipeline:
    name: ml-workflow
    nodes:
      - name: preprocess
        type: execution
        step: preprocess-data
      - name: train
        type: execution
        step: train-model
      - name: evaluate
        type: execution
        step: evaluate-model
    edges:
      # Connect outputs → inputs
      - [preprocess.outputs.*, train.inputs.dataset]
      - [train.outputs.model*, evaluate.inputs.model]
      - [preprocess.outputs.*test*, evaluate.inputs.test-data]
Run it:
vh pipeline run ml-workflow --adhoc
💡 If you have pushed the valohai.yaml to Git and fetched the commit to your Valohai project, you can omit the --adhoc flag.
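For the Git-based flow, the steps might look like this (a sketch; the commit message is illustrative, and vh project fetch assumes the project is already linked to your repository):

# Commit and push the pipeline definition
git add valohai.yaml
git commit -m "Add ml-workflow pipeline"
git push
# Fetch the new commit into the Valohai project, then run without --adhoc
vh project fetch
vh pipeline run ml-workflow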
Complete Example
Let's build a real pipeline with three steps:
1. Define Your Steps (if not already done)
- step:
    name: preprocess-data
    image: python:3.10
    command:
      - pip install -r requirements.txt
      - python data-preprocess.py

- step:
    name: train-model
    image: tensorflow/tensorflow:2.6.0
    command:
      - python train_model.py
    inputs:
      - name: train-data
        default: dataset://images/latest-train
    parameters:
      - name: epochs
        default: 100

- step:
    name: evaluate-model
    image: tensorflow/tensorflow:2.6.0
    command:
      - python evaluate_model.py
    inputs:
      - name: test-data
        default: dataset://images/latest-test
      - name: model
        default: datum://production-latest
2. Connect as Pipeline
- pipeline:
    name: training-pipeline
    nodes:
      - name: prep-node
        type: execution
        step: preprocess-data
      - name: train-node
        type: execution
        step: train-model
        override:
          parameters:
            - name: epochs
              value: 200  # Override default
      - name: eval-node
        type: execution
        step: evaluate-model
    edges:
      # Output wildcards → specific inputs
      - [prep-node.outputs.*, train-node.inputs.train-data]
      - [prep-node.outputs.*test*, eval-node.inputs.test-data]
      - [train-node.outputs.*.h5, eval-node.inputs.model]
Edge Patterns
Basic Output → Input
# All outputs from A go to B's 'data' input
- [nodeA.outputs.*, nodeB.inputs.data]
Wildcard Matching
# Only .pkl files go to model input
- [train.outputs.*.pkl, inference.inputs.model]
# Multiple patterns
- [prep.outputs.*train*, train.inputs.train-data]
- [prep.outputs.*val*, train.inputs.val-data]
Pass parameters and metrics between nodes
In addition to routing outputs into inputs, edges can also pass parameter values and metadata (metrics) between nodes.
# Pass parameter values between nodes
- [train-model.parameter.user-id, test-model.parameter.user-id]
# Pass metrics into parameters
- [train.metadata.best_lr, finetune.parameter.learning_rate]
Multiple Targets
edges:
  # One output goes to multiple nodes
  - [preprocess.outputs.*, train.inputs.data]
  - [preprocess.outputs.*, evaluate.inputs.data]
Advanced Features
Conditional Execution
You can define specific conditions for pipeline nodes.
When: Actions trigger when certain events occur during pipeline execution. The available options include:
node-starting: When a node is about to start.
node-complete: When a node successfully completes.
node-error: When a node encounters an error.
If Condition: The condition that triggers the action can be based on either a metric or a parameter value.
Then: When the condition is met, you can take one of the following actions:
stop-pipeline: Halts the entire pipeline.
require-approval: Pauses the pipeline until a user manually approves the previous results.
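A condition can also reference a parameter instead of a metric. A minimal sketch, assuming the step defines an epochs parameter and that parameters are referenced as parameter.<name> in conditions:

- name: train
  type: execution
  step: train-model
  actions:
    - when: node-complete
      if: parameter.epochs < 50  # assumed parameter.<name> reference syntax
      then: require-approval

The full example below combines a metric-based stop condition with a manual approval gate: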
- pipeline:
    name: conditional-workflow
    nodes:
      - name: train
        type: execution
        step: train-model
        actions:
          - when: node-complete
            if: metadata.foo >= 0.8
            then: stop-pipeline
      - name: test-model
        type: execution
        step: test-model
        actions:
          - when: node-starting
            then: require-approval
Parallel Execution
In the example below, the nodes train-model-a and train-model-b will run in parallel. The ensemble node will only start once both of them have finished.
nodes:
  - name: train-model-a
    type: execution
    step: train
  - name: train-model-b
    type: execution
    step: train
  - name: ensemble
    type: execution
    step: combine-models
edges:
  # Both training nodes run in parallel
  - [train-model-a.outputs.*, ensemble.inputs.model-a]
  - [train-model-b.outputs.*, ensemble.inputs.model-b]
It is also possible to run Task nodes inside pipelines:
nodes:
  # Other nodes omitted
  - name: train
    type: task
    step: Train model (MNIST)
    on-error: stop-all
    override:
      inputs:
        - name: training-set-images
        - name: training-set-labels
        - name: test-set-images
        - name: test-set-labels
edges:
  # Other edges omitted
  - [preprocess.output.*train-images*, train.input.training-set-images]
  - [preprocess.output.*train-labels*, train.input.training-set-labels]
  - [preprocess.output.*test-images*, train.input.test-set-images]
  - [preprocess.output.*test-labels*, train.input.test-set-labels]
Deployments
In addition to execution and Task nodes, it is possible to create deployments from pipelines.
nodes:
  # Other nodes omitted
  - name: deploy
    type: deployment
    deployment: deployment-test
    actions:
      - when: node-starting
        then: require-approval
    endpoints:
      - predict-digit
edges:
  # Other edges omitted
  - [find-best-model.output.model*, deploy.file.predict-digit.model]
It is possible to create pipeline nodes after a deployment node. This can be used, for example, to check the endpoint once it has been created or to clean up old endpoints within the pipeline.
edges:
  - [deploy.deployment.id, cleanup.parameters.deployment_id]
  - [deploy.deployment.version_id, cleanup.parameters.version_id]
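The receiving cleanup node then needs matching parameters defined on its step. A minimal sketch of such a step; the step name, script, and parameter names are illustrative, and it assumes Valohai's {parameter:name} command interpolation:

- step:
    name: cleanup-endpoints
    image: python:3.10
    command:
      # Pass the ids received from the deployment node into the script
      - python cleanup.py --deployment-id {parameter:deployment_id} --version-id {parameter:version_id}
    parameters:
      - name: deployment_id
        type: string
      - name: version_id
        type: string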
Running Pipelines
From CLI
# Run with local code
vh pipeline run training-pipeline --adhoc

# Run from Git
vh pipeline run training-pipeline
Quick Reference
Minimal Pipeline
- pipeline:
    name: my-pipeline
    nodes:
      - name: step1
        type: execution
        step: my-step-1
      - name: step2
        type: execution
        step: my-step-2
    edges:
      - [step1.outputs.*, step2.inputs.data]
Edge Syntax
Sources:
node-name.output.* — All outputs
node-name.output.*.csv — Only CSV files
node-name.output.name* — Starts with "name"
node-name.metadata.accuracy — Metadata value
node-name.parameter.learning_rate — Parameter value
deploy.deployment.id / deploy.deployment.version_id — Deployment / deployment version id
Targets:
node-name.input.input-name — Any input available on the node
node-name.parameter.learning_rate — Parameter value
deploy.file.predict-digit.model — File for deployment nodes
Node Types
execution — Run a step
task — Run a parameter sweep
deployment — Create an endpoint
Bottom line: If your steps work individually, connecting them into a pipeline takes just a few lines of YAML.