# Pipelines: Chain Your Jobs

Connect your existing jobs into automated workflows. Define how data flows between steps and let Valohai handle the orchestration.

> 💡 **Already have working steps?** You're ready to build pipelines. Just define how outputs connect to inputs.

### How Pipelines Work

A pipeline is a recipe for connecting jobs:

* **Nodes** = Your jobs (preprocessing, training, evaluation, etc.)
* **Edges** = Data / information flow (e.g. which outputs become which inputs)

When you run a pipeline, Valohai automatically:

* Executes jobs in the right order
* Passes each node's outputs into the inputs defined by your edges
* Handles parallel execution where possible
* Tracks the complete lineage
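
To make the ordering idea concrete, here is a minimal Python sketch of how an execution order with parallel batches can be derived from a list of edges. It uses the standard library `graphlib` module (Python 3.9+). The node names and the `execution_batches` helper are hypothetical, and this is only an illustration of the concept, not a description of how Valohai schedules nodes internally.

```python
from graphlib import TopologicalSorter


def execution_batches(edges):
    """Group nodes into batches; every node in a batch has all dependencies met."""
    sorter = TopologicalSorter()
    for source, target in edges:
        # The target node depends on the source node.
        sorter.add(target, source)
    sorter.prepare()

    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes that could run in parallel
        batches.append(ready)
        sorter.done(*ready)
    return batches


# Hypothetical node-level edges: (source node, target node)
edges = [
    ("preprocess", "train-a"),
    ("preprocess", "train-b"),
    ("train-a", "evaluate"),
    ("train-b", "evaluate"),
]

for batch in execution_batches(edges):
    print(batch)
```

In this sketch the two training nodes land in the same batch because neither depends on the other, while `evaluate` waits for both.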

### Quick Example

Connect three existing steps into a pipeline:

```yaml
- pipeline:
    name: ml-workflow
    nodes:
      - name: preprocess
        type: execution
        step: preprocess-data
      - name: train
        type: execution
        step: train-model
      - name: evaluate
        type: execution
        step: evaluate-model
    edges:
      # Connect outputs → inputs
      - [preprocess.outputs.*, train.inputs.dataset]
      - [train.outputs.model*, evaluate.inputs.model]
      - [preprocess.outputs.*test*, evaluate.inputs.test-data]
```

Run it:

```shell
vh pipeline run ml-workflow --adhoc
```

> 💡 If you have pushed `valohai.yaml` to Git and fetched the commit into your Valohai project, you can omit the `--adhoc` flag.

### Complete Example

Let's build a real pipeline with three steps:

#### 1. Define Your Steps (if not already done)

```yaml
- step:
    name: preprocess-data
    image: python:3.10
    command:
      - pip install -r requirements.txt
      - python data-preprocess.py

- step:
    name: train-model
    image: tensorflow/tensorflow:2.6.0
    command:
      - python train_model.py
    inputs:
      - name: train-data
        default: dataset://images/latest-train
    parameters:
      - name: epochs
        default: 100

- step:
    name: evaluate-model
    image: tensorflow/tensorflow:2.6.0
    command:
      - python evaluate_model.py
    inputs:
      - name: test-data
        default: dataset://images/latest-test
      - name: model
        default: datum://production-latest
```

#### 2. Connect as Pipeline

```yaml
- pipeline:
    name: training-pipeline
    nodes:
      - name: prep-node
        type: execution
        step: preprocess-data

      - name: train-node
        type: execution
        step: train-model
        override:
          parameters:
            - name: epochs
              value: 200  # Override default

      - name: eval-node
        type: execution
        step: evaluate-model

    edges:
      # Output wildcards → specific inputs
      - [prep-node.outputs.*, train-node.inputs.train-data]
      - [prep-node.outputs.*test*, eval-node.inputs.test-data]
      - [train-node.outputs.*.h5, eval-node.inputs.model]
```
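
The wildcard edges above select files by name, so the file names a step writes matter. As a rough sketch, a preprocessing script could save its splits as shown below. It assumes the Valohai convention that files saved to the outputs directory (`/valohai/outputs`, also exposed through the `VH_OUTPUTS_DIR` environment variable) become the execution's outputs. The file names, the `save_splits` helper, and the `outputs_dir` argument are hypothetical.

```python
import csv
import os
from pathlib import Path


def save_splits(train_rows, test_rows, outputs_dir=None):
    """Write train/test splits as CSV files into the outputs directory."""
    # Assumption: Valohai collects every file in this directory as an output.
    out = Path(outputs_dir or os.environ.get("VH_OUTPUTS_DIR", "/valohai/outputs"))
    out.mkdir(parents=True, exist_ok=True)

    written = []
    for name, rows in (("train.csv", train_rows), ("test.csv", test_rows)):
        path = out / name
        with path.open("w", newline="") as handle:
            csv.writer(handle).writerows(rows)
        written.append(path.name)
    return written
```

With these file names, a pattern such as `*test*` would pick up only `test.csv`, while `*` would pick up both files.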

<details>

<summary>Optional: Use the <code>valohai-utils</code> Python helper tool</summary>

Using `valohai-utils`, define pipelines in Python:

```python
# pipeline.py
from valohai import Pipeline


def main(config) -> Pipeline:
    # Create a pipeline called "train-inference-pipeline".
    pipe = Pipeline(name="train-inference-pipeline", config=config)

    # Define the pipeline nodes.
    preprocess = pipe.execution("preprocess")
    train = pipe.execution("train-model")
    inference = pipe.execution("batch-inference")

    # Configure the pipeline, i.e. define the edges.
    preprocess.output("*").to(train.input("data"))
    train.output("*.pkl").to(inference.input("model"))

    return pipe
```

Generate YAML:

```shell
vh yaml pipeline pipeline.py
```

</details>

### Edge Patterns

#### Basic Output → Input

```yaml
# All outputs from A go to B's 'data' input
- [nodeA.outputs.*, nodeB.inputs.data]
```

#### Wildcard Matching

```yaml
# Only .pkl files go to model input
- [train.outputs.*.pkl, inference.inputs.model]

# Multiple patterns
- [prep.outputs.*train*, train.inputs.train-data]
- [prep.outputs.*val*, train.inputs.val-data]
```
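
To get a feel for which files a wildcard pattern would select, the following Python sketch applies shell-style patterns to a list of hypothetical output file names using the standard `fnmatch` module. It assumes the patterns behave like ordinary globs; Valohai's own matching may differ in details, so treat this as an approximation.

```python
from fnmatch import fnmatchcase


def select_outputs(filenames, pattern):
    """Return the file names that a glob-style pattern would match."""
    return [name for name in filenames if fnmatchcase(name, pattern)]


# Hypothetical output files from a node
outputs = ["train.csv", "val.csv", "test.csv", "model.pkl"]

print(select_outputs(outputs, "*train*"))  # ['train.csv']
print(select_outputs(outputs, "*.pkl"))    # ['model.pkl']
print(select_outputs(outputs, "*"))        # all four files
```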

#### Pass Parameters and Metrics Between Nodes

Edges are not limited to files. They can also pass parameter values and metadata (metrics) from one node to another node's parameters.

```yaml
# Pass parameter values between nodes
- [train-model.parameter.user-id, test-model.parameter.user-id]

# Pass metrics into parameters
- [train.metadata.best_lr, finetune.parameter.learning_rate]
```
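
For a metadata edge such as `train.metadata.best_lr` to carry a value, the source step has to report that key. Valohai collects execution metadata from JSON printed to standard output, so the reporting side might look like the sketch below. The `report_metadata` helper and the values are hypothetical.

```python
import json


def report_metadata(**metrics):
    """Print metrics as a single JSON line and return the printed line."""
    line = json.dumps(metrics)
    print(line)
    return line


# Hypothetical result of a learning-rate search
report_metadata(best_lr=0.001, accuracy=0.93)
```

The key name printed here (`best_lr`) is what the edge source refers to after `metadata.`.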

#### Multiple Targets

```yaml
edges:
  # One output goes to multiple nodes
  - [preprocess.outputs.*, train.inputs.data]
  - [preprocess.outputs.*, evaluate.inputs.data]
```

### Advanced Features

#### Conditional Execution

You can define specific conditions for pipeline nodes.

**When**: Actions trigger when certain events occur during pipeline execution. The available options include:

* `node-starting`: When a node is about to start.
* `node-complete`: When a node successfully completes.
* `node-error`: When a node encounters an error.

**If Condition**: The condition that triggers the action can be based on either a metric or a parameter value.

**Then**: Depending on the condition being met, you can take one of the following actions:

* `stop-pipeline`: Halts the entire pipeline.
* `require-approval`: Pauses the pipeline until a user manually approves the previous results.

```yaml
- pipeline:
    name: conditional-workflow
    nodes:
      - name: train
        type: execution
        step: train-model
        actions:
          - when: node-complete
            if: metadata.foo >= 0.8
            then: stop-pipeline

      - name: test-model
        type: execution
        step: test-model
        actions:
          - when: node-starting
            then: require-approval
```
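
The when / if / then logic can be pictured with a small Python sketch. It only handles conditions of the form `metadata.<key> <operator> <number>` and is not Valohai's expression parser; the `condition_holds` and `triggered_actions` helpers are hypothetical and exist purely to show how the three fields interact.

```python
import operator

OPERATORS = {
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
    "==": operator.eq,
}


def condition_holds(condition, metadata):
    """Evaluate a condition of the form 'metadata.<key> <op> <number>'."""
    left, op, right = condition.split()
    key = left.removeprefix("metadata.")
    return OPERATORS[op](metadata[key], float(right))


def triggered_actions(actions, event, metadata):
    """Return the 'then' values of actions whose 'when' and 'if' both match."""
    result = []
    for action in actions:
        if action["when"] != event:
            continue
        condition = action.get("if")
        # An action without an 'if' fires whenever its event occurs.
        if condition is None or condition_holds(condition, metadata):
            result.append(action["then"])
    return result


actions = [
    {"when": "node-complete", "if": "metadata.foo >= 0.8", "then": "stop-pipeline"},
]
print(triggered_actions(actions, "node-complete", {"foo": 0.85}))  # ['stop-pipeline']
print(triggered_actions(actions, "node-complete", {"foo": 0.5}))   # []
```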

#### Parallel Execution

In the example below the nodes `train-model-a` and `train-model-b` will run in parallel. The `ensemble` node will only start once both of them are finished.

```yaml
nodes:
  - name: train-model-a
    type: execution
    step: train

  - name: train-model-b
    type: execution
    step: train

  - name: ensemble
    type: execution
    step: combine-models

edges:
  # Both training nodes run in parallel
  - [train-model-a.outputs.*, ensemble.inputs.model-a]
  - [train-model-b.outputs.*, ensemble.inputs.model-b]
```

It is also possible to run Task nodes inside pipelines:

```yaml
nodes:
  # Other nodes omitted
  - name: train
    type: task
    step: Train model (MNIST)
    on-error: stop-all
    override:
      inputs:
        - name: training-set-images
        - name: training-set-labels
        - name: test-set-images
        - name: test-set-labels

edges:
  # Other edges omitted
  - [preprocess.output.*train-images*, train.input.training-set-images]
  - [preprocess.output.*train-labels*, train.input.training-set-labels]
  - [preprocess.output.*test-images*, train.input.test-set-images]
  - [preprocess.output.*test-labels*, train.input.test-set-labels]
```

#### Deployments

In addition to execution and Task nodes, it is possible to create deployments from pipelines.

```yaml
nodes:
  # Other nodes omitted
  - name: deploy
    type: deployment
    deployment: deployment-test
    actions:
      - when: node-starting
        then: require-approval
    endpoints:
      - predict-digit

edges:
  # Other edges omitted
  - [find-best-model.output.model*, deploy.file.predict-digit.model]
```

It is possible to create pipeline nodes after a deployment node. This can be used, for example, to check the endpoint once it has been created or to clean up old endpoints within the pipeline.

```yaml
    edges:
      - [deploy.deployment.id, cleanup.parameters.deployment_id]
      - [deploy.deployment.version_id, cleanup.parameters.version_id]
```

### Running Pipelines

#### From CLI

```shell
# Run with local code
vh pipeline run training-pipeline --adhoc

# Run from Git
vh pipeline run training-pipeline
```

### Quick Reference

#### Minimal Pipeline

```yaml
- pipeline:
    name: my-pipeline
    nodes:
      - name: step1
        type: execution
        step: my-step-1
      - name: step2
        type: execution
        step: my-step-2
    edges:
      - [step1.outputs.*, step2.inputs.data]
```

#### Edge Syntax

Sources:

* `node-name.output.*` — All outputs
* `node-name.output.*.csv` — Only CSV files
* `node-name.output.name*` — Starts with "name"
* `node-name.metadata.accuracy` — Metadata value
* `node-name.parameter.learning_rate` — Parameter value
* `deploy.deployment.id` / `deploy.deployment.version_id` — Deployment / deployment version id

Targets:

* `node-name.input.input-name` — Any input available on the node
* `node-name.parameter.learning_rate` — Parameter value
* `deploy.file.predict-digit.model` — File for deployment nodes
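
Judging from the examples above, each edge endpoint reads as `node.type.key`, where the key itself may contain dots (as in `*.csv` or `predict-digit.model`). The Python sketch below splits an endpoint on that assumption; the `Endpoint` type and `parse_endpoint` helper are hypothetical and not part of any Valohai library.

```python
from typing import NamedTuple


class Endpoint(NamedTuple):
    node: str
    kind: str
    key: str


def parse_endpoint(text):
    """Split 'node.kind.key' into its parts; the key may itself contain dots."""
    node, kind, key = text.split(".", 2)
    return Endpoint(node, kind, key)


print(parse_endpoint("node-name.output.*.csv"))
print(parse_endpoint("deploy.file.predict-digit.model"))
```

Splitting at most twice keeps everything after the second dot together, which is why wildcard patterns with file extensions stay intact.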

#### Node Types

* `execution` — Run a step
* `task` — Run parameter sweep
* `deployment` — Create endpoint

***

**Bottom line:** If your steps work individually, connecting them into a pipeline takes just a few lines of YAML.

