Pass Data Between Pipeline Nodes

Pipeline edges define how data flows between nodes. This guide covers the three types of connections: file outputs, parameters, and metadata.

File outputs to inputs

The most common pipeline pattern: one node produces files, the next consumes them.

Basic file passing

- step:
    name: train-model
    image: tensorflow/tensorflow:2.4.1
    command: python train.py

- step:
    name: test-model
    image: tensorflow/tensorflow:2.4.1
    command: python test.py
    inputs:
      - name: model

- pipeline:
    name: training-pipeline
    nodes:
      - name: train-model
        step: train-model
        type: execution
      - name: test-model
        step: test-model
        type: execution
    edges:
    - [train-model.output.model.pkl, test-model.input.model]

The train-model step saves model.pkl to /valohai/outputs/. The edge passes this file to test-model as its model input, where it will be available at /valohai/inputs/model/model.pkl.
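
In code, no pipeline-specific logic is needed; each script simply reads and writes the Valohai directories. Below is a minimal sketch of the two scripts, assuming a picklable model object (the model itself is a placeholder):

train.py:

import pickle

# ... training happens here; this dict stands in for a real model object
model = {"weights": [0.1, 0.2, 0.3]}

# Anything saved under /valohai/outputs/ is collected as an output of the execution
with open("/valohai/outputs/model.pkl", "wb") as f:
    pickle.dump(model, f)

test.py:

import pickle

# The pipeline edge delivers the file under the input's name
with open("/valohai/inputs/model/model.pkl", "rb") as f:
    model = pickle.load(f)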

You can also reference files in subdirectories. For example, if the file is saved to /valohai/outputs/results/model.pkl (a results subdirectory), the edge would look like this:

    edges:
    - [train-model.output.results/model.pkl, test-model.input.model]

💡 Use wildcards to pass multiple files: train-model.output.*.pkl passes all pickle files. For files under a subdirectory called results: train-model.output.results/*.pkl

Edge merge modes

Control how edges interact with default inputs using edge-merge-mode:

- step:
    name: train-model
    image: tensorflow/tensorflow:2.6.0
    command: python train.py
    inputs:
      - name: preprocessed_dataset
        default: s3://mybucket/preprocessed_data/*

- pipeline:
    name: merge-example
    nodes:
      - name: preprocess
        step: preprocess
        type: execution
      - name: train-model
        step: train-model
        type: execution
        edge-merge-mode: append  # Add edge files to defaults
    edges:
    - [preprocess.output.*, train-model.input.preprocessed_dataset]

Merge modes:

  • replace (default): Edge data replaces default inputs

  • append: Edge data adds to default inputs
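
With append, the files produced by the preprocess node are downloaded into the same directory as the default inputs, so the training code can treat them uniformly. Here is a minimal sketch that iterates over everything delivered to the preprocessed_dataset input (the actual processing is a placeholder):

from pathlib import Path

# Defaults and edge files alike end up under the input's directory
input_dir = Path("/valohai/inputs/preprocessed_dataset")

for file_path in sorted(input_dir.iterdir()):
    print(f"Loading {file_path.name}")
    # ... read and preprocess the file here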

Parameter passing

Share configuration values between nodes without modifying code.

Static parameter passing

Pass a parameter value from one node to another:

- step:
    name: train-model
    image: tensorflow/tensorflow:2.4.1
    command: python train.py {parameters}
    parameters:
      - name: user-id
        default: 345345
        type: integer

- step:
    name: test-model
    image: tensorflow/tensorflow:2.4.1
    command: python test.py {parameters}
    parameters:
      - name: user-id
        default: 3
        type: integer

- pipeline:
    name: parameter-pipeline
    nodes:
    - name: train-model
      step: train-model
      type: execution
    - name: test-model
      step: test-model
      type: execution
    edges:
    - [train-model.parameter.user-id, test-model.parameter.user-id]

The test-model node inherits the user-id value from train-model.
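
At runtime, Valohai expands the {parameters} placeholder into command-line flags, so both scripts can read the value with standard argument parsing. A minimal sketch using argparse, assuming the default flag naming (--user-id):

import argparse

# {parameters} expands to flags such as --user-id=345345
parser = argparse.ArgumentParser()
parser.add_argument("--user-id", type=int, default=3)
args = parser.parse_args()

print(f"Processing data for user {args.user_id}")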

Metadata to parameters

Use runtime-generated values to configure downstream nodes.

Single value metadata

Generate metadata in your code:

import json

# Find optimal learning rate during training
optimal_lr = 0.0003
print(json.dumps({"learning_rate": optimal_lr}))

Pass it to the next node:

- pipeline:
    name: dynamic-parameters
    nodes:
    - name: hyperparameter-search
      step: find-best-params
      type: execution
    - name: final-training
      step: train-model
      type: execution
    edges:
    - [hyperparameter-search.metadata.learning_rate, final-training.parameter.lr]

Multi-value metadata for tasks

Generate multiple values to create parallel task executions:

import json

# Generate user IDs for parallel processing
user_ids = [463, 674, 888, 233]
print(json.dumps({"user": user_ids}))

Pass the list to a task node:

- pipeline:
    name: parallel-processing
    nodes:
    - name: generate-ids
      step: prepare-data
      type: execution
    - name: process-users
      step: process-user
      type: task  # Creates parallel executions
    edges:
    - [generate-ids.metadata.user, process-users.parameter.user-id]

💡 Tasks create one execution per value in the metadata list. With 4 user IDs, you get 4 parallel executions.

For multi-dimensional parameters:

import json

# Each sublist becomes one execution's parameters
configs = [[204, 302], [593, 120]]
print(json.dumps({"params": configs}))

Common issues and fixes

Parameter not passed

Symptom: Downstream node uses default value instead of edge value

Fix: Verify parameter names match exactly between edge definition and step parameters:

# Wrong - names don't match
edges:
- [node1.parameter.userId, node2.parameter.user-id]
# Correct - exact match
edges:
- [node1.parameter.user-id, node2.parameter.user-id]

Outputs not available

Error: FileNotFoundError: /valohai/inputs/model/model.pkl

Fix: Ensure the upstream node saves files to /valohai/outputs/:

# Wrong - local directory
model.save("output/model.pkl")

# Correct - Valohai outputs
model.save("/valohai/outputs/model.pkl")

Conditional output handling

Issue: Optional outputs cause downstream failures

Fix: Add existence checks in consuming nodes:

import os

model_path = "/valohai/inputs/model/model.pkl"
if os.path.exists(model_path):
    model = load_model(model_path)
else:
    print("No model provided, using defaults")
    model = create_default_model()

Metadata not recognized

Symptom: Metadata edge doesn't populate parameter

Fix: Ensure metadata is valid JSON printed to stdout:

# Wrong - not JSON
print(f"Best accuracy: {accuracy}")

# Correct - proper JSON
print(json.dumps({"accuracy": accuracy}))

Best practices

  1. Name outputs descriptively: Use model.pkl instead of output.pkl

  2. Validate inputs exist: Always check for file existence in consuming nodes

  3. Log metadata early: Print metadata as soon as values are determined

  4. Use type-specific nodes:

    • execution for single runs

    • task for parallel processing with metadata lists
