Pass Data Between Pipeline Nodes
Pipeline edges define how data flows between nodes. This guide covers the three types of connections: file outputs, parameters, and metadata.
File outputs to inputs
The most common pipeline pattern: one node produces files, the next consumes them.
Basic file passing
```yaml
- step:
    name: train-model
    image: tensorflow/tensorflow:2.4.1
    command: python train.py

- step:
    name: test-model
    image: tensorflow/tensorflow:2.4.1
    command: python test.py
    inputs:
      - name: model

- pipeline:
    name: training-pipeline
    nodes:
      - name: train-model
        step: train-model
        type: execution
      - name: test-model
        step: test-model
        type: execution
    edges:
      - [train-model.output.model.pkl, test-model.input.model]
```
The train-model step saves model.pkl to /valohai/outputs/. The edge passes this file to test-model as its model input, where it is available at /valohai/inputs/model/model.pkl.
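For reference, here is a minimal sketch of what the two scripts could look like. The pickle usage and the placeholder model object are illustrative; only the /valohai/outputs/ and /valohai/inputs/model/ paths come from the example above.

```python
# train.py -- save the trained model to Valohai's output directory
import pickle

model = {"weights": [0.1, 0.2]}  # placeholder for a real trained model
with open("/valohai/outputs/model.pkl", "wb") as f:
    pickle.dump(model, f)
```

```python
# test.py -- load the file delivered through the pipeline edge
import pickle

with open("/valohai/inputs/model/model.pkl", "rb") as f:
    model = pickle.load(f)
```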
Edges can also reference files in subdirectories. For example, if train-model saves model.pkl in a subdirectory called results, i.e. at /valohai/outputs/results/model.pkl, the edge looks like this:
```yaml
edges:
  - [train-model.output.results/model.pkl, test-model.input.model]
```
💡 Use wildcards to pass multiple files: train-model.output.*.pkl passes all pickle files. For files under a subdirectory called results, use train-model.output.results/*.pkl.
Edge merge modes
Control how edges interact with default inputs using edge-merge-mode:
```yaml
- step:
    name: train-model
    image: tensorflow/tensorflow:2.6.0
    command: python train.py
    inputs:
      - name: preprocessed_dataset
        default: s3://mybucket/preprocessed_data/*

- pipeline:
    name: merge-example
    nodes:
      - name: preprocess
        step: preprocess
        type: execution
      - name: train-model
        step: train-model
        type: execution
        edge-merge-mode: append # Add edge files to defaults
    edges:
      - [preprocess.output.*, train-model.input.preprocessed_dataset]
```
Merge modes:
replace (default): edge data replaces the default inputs
append: edge data adds to the default inputs
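With append, both the default files from S3 and the files coming through the edge land in the same input directory. As a rough sketch (assuming the standard /valohai/inputs/<input-name>/ layout shown earlier), train.py could iterate over everything it received like this:

```python
# train.py -- list every file delivered to the preprocessed_dataset input,
# whether it came from the default S3 location or from the preprocess node
from pathlib import Path

input_dir = Path("/valohai/inputs/preprocessed_dataset")
for file_path in sorted(input_dir.glob("*")):
    print("found input file:", file_path.name)
```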
Parameter passing
Share configuration values between nodes without modifying code.
Static parameter passing
Pass a parameter value from one node to another:
```yaml
- step:
    name: train-model
    image: tensorflow/tensorflow:2.4.1
    command: python train.py {parameters}
    parameters:
      - name: user-id
        default: 345345
        type: integer

- step:
    name: test-model
    image: tensorflow/tensorflow:2.4.1
    command: python test.py {parameters}
    parameters:
      - name: user-id
        default: 3
        type: integer

- pipeline:
    name: parameter-pipeline
    nodes:
      - name: train-model
        step: train-model
        type: execution
      - name: test-model
        step: test-model
        type: execution
    edges:
      - [train-model.parameter.user-id, test-model.parameter.user-id]
```
The test-model node inherits the user-id value from train-model instead of using its own default.
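Valohai expands {parameters} in the command into command-line flags (for example --user-id=345345), so both scripts can read the value with a standard argument parser. A minimal sketch, assuming nothing beyond the parameter defined above:

```python
# train.py / test.py -- read the user-id parameter injected via {parameters}
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--user-id", type=int, default=3)
args = parser.parse_args()

print("processing user:", args.user_id)
```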
Metadata to parameters
Use runtime-generated values to configure downstream nodes.
Single value metadata
Generate metadata in your code:
```python
import json

# Find optimal learning rate during training
optimal_lr = 0.0003
print(json.dumps({"learning_rate": optimal_lr}))
```
Pass it to the next node:
```yaml
- pipeline:
    name: dynamic-parameters
    nodes:
      - name: hyperparameter-search
        step: find-best-params
        type: execution
      - name: final-training
        step: train-model
        type: execution
    edges:
      - [hyperparameter-search.metadata.learning_rate, final-training.parameter.lr]
```
Multi-value metadata for tasks
Generate multiple values to create parallel task executions:
```python
import json

# Generate user IDs for parallel processing
user_ids = [463, 674, 888, 233]
print(json.dumps({"user": user_ids}))
```
```yaml
- pipeline:
    name: parallel-processing
    nodes:
      - name: generate-ids
        step: prepare-data
        type: execution
      - name: process-users
        step: process-user
        type: task # Creates parallel executions
    edges:
      - [generate-ids.metadata.user, process-users.parameter.user-id]
```
💡 Tasks create one execution per value in the metadata list. With 4 user IDs, you get 4 parallel executions.
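Each of those parallel executions receives exactly one value from the list through its user-id parameter. A minimal sketch of the consuming script (the file name process_user.py is illustrative):

```python
# process_user.py -- each parallel execution handles a single user ID
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--user-id", type=int, required=True)
args = parser.parse_args()

print("processing user", args.user_id)  # e.g. 463 in the first execution
```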
For multi-dimensional parameters:
```python
import json

# Each sublist becomes one execution's parameters
configs = [[204, 302], [593, 120]]
print(json.dumps({"params": configs}))
```
Best practices
Name outputs descriptively: use model.pkl instead of output.pkl
Validate inputs exist: always check for file existence in consuming nodes (see the sketch after this list)
Log metadata early: print metadata as soon as values are determined
Use type-specific nodes: execution for single runs, task for parallel processing with metadata lists
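A minimal sketch of the input-existence check, reusing the model input from the earlier examples (the error handling is illustrative):

```python
# test.py -- fail fast if the expected input file was not delivered
import sys
from pathlib import Path

model_path = Path("/valohai/inputs/model/model.pkl")
if not model_path.exists():
    sys.exit(f"expected input file missing: {model_path}")
```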
Next steps
Learn about conditional execution based on metadata values
Explore pipeline scheduling for automated workflows
Set up pipeline notifications for status updates