Summary
This guide demonstrates how to launch a pipeline that processes all the files in a dataset version using a dataset version trigger. This involves parsing notification payloads, handling dataset inputs, and configuring pipeline and trigger settings.
Example Python Scripts
Dependencies
Add the following line to the `requirements.txt` file to include the `valohai-utils` package in your project (optional, but this example uses it for parsing the input payload):

```
valohai-utils
```
Parse Notification Payload
An execution doesn’t receive the dataset version files directly as inputs; instead, you can add a script that parses the dataset version URI from the notification payload, and passes it on to the next node.
parse-notification.py
```python
"""Parse the dataset version URI from the notification payload.

The payload is a JSON file that Valohai sends to the step.
"""
import json

import valohai

# the notification payload is provided as a Valohai input file
input_file = valohai.inputs("payload").path()

# read the JSON payload content from the input file
with open(input_file) as file:
    payload = json.load(file)

# retrieve the new dataset version URI from the payload
dataset_version_uri = payload["data"]["version"]["uri"]

# output the URI as execution metadata;
# this will be available to the next step
print(json.dumps({"dataset": dataset_version_uri}))
```
Datum Handler Script
This is a simple demo script that loops through the files included in the dataset version and lists their paths.
list-inputs.py
```python
"""List all inputs given to the current step."""
import valohai

valohai.prepare(step="list-inputs")

for file_path in valohai.inputs("dataset").paths():
    print(file_path)
```
YAML Configuration
Create a `valohai.yaml` file with the following definitions:
- step: `parse-notification`
  - Parsing step: receives the notification body with the dataset version information as input (in the example, the input name is `payload`).
  - The step outputs the dataset version URL as metadata.
- step: `list-inputs`
  - Action step; name this step according to what you do with the inputs.
  - This step receives the dataset version URL as a parameter.
  - The parameter value is passed to an input (e.g. `dataset`).
- pipeline: `Dataset handling automation`
  - Give the pipeline a descriptive name; this is used in the trigger action to run the pipeline.
  - Connect the parsing step output metadata to the action step parameter.
You can name steps and parameters however you like; just remember to use the correct names in the following section.
```yaml
- step:
    name: parse-notification
    image: python:3.12
    command:
      - pip install -r requirements.txt
      - python ./parse-notification.py {parameters}
    inputs:
      - name: payload

- step:
    name: list-inputs
    image: python:3.12
    command:
      - pip install -r requirements.txt
      - python ./list-inputs.py {parameters}
    parameters:
      - name: dataset_url
        type: string
    inputs:
      - name: dataset
        default: "{parameter:dataset_url}"

- pipeline:
    name: Dataset handling automation
    nodes:
      - name: parse-notification
        step: parse-notification
        type: execution
      - name: list-inputs
        step: list-inputs
        type: execution
    edges:
      - [parse-notification.metadata.dataset, list-inputs.parameter.dataset_url]
```
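The `edges` entry is the critical wiring: each edge is a `[source, target]` pair of dotted `node.type.key` references. As a quick sanity check, that wiring can be verified in plain Python. This is only a sketch with the configuration mirrored as Python data; in a real project you would load `valohai.yaml` with a YAML parser instead.

```python
"""Sanity-check pipeline edge wiring (illustrative; config mirrored as Python data)."""

# Trimmed-down mirror of the valohai.yaml definitions above.
steps = {
    "parse-notification": {"inputs": ["payload"], "parameters": []},
    "list-inputs": {"inputs": ["dataset"], "parameters": ["dataset_url"]},
}
edges = [
    ("parse-notification.metadata.dataset", "list-inputs.parameter.dataset_url"),
]


def check_edge(source: str, target: str) -> str:
    """Validate one edge: both endpoints must reference known nodes and keys."""
    src_node, _src_kind, _src_key = source.split(".", 2)
    tgt_node, tgt_kind, tgt_key = target.split(".", 2)
    assert src_node in steps, f"unknown source node {src_node!r}"
    assert tgt_node in steps, f"unknown target node {tgt_node!r}"
    if tgt_kind == "parameter":
        assert tgt_key in steps[tgt_node]["parameters"], (
            f"node {tgt_node!r} has no parameter {tgt_key!r}"
        )
    return f"{source} -> {target}: ok"


for src, tgt in edges:
    print(check_edge(src, tgt))
```

A check like this catches the most common pipeline mistake: renaming a step or parameter in one place but not in the edge definition.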
Trigger Setup
Create a trigger with the following values:
- Title: a descriptive title, e.g. `Dataset version -> new data handler pipeline`
- Trigger type: `Notification`
- Actions: `Run Pipeline`
  - Source Commit Reference: `main` (or e.g. a reference to a specific commit)
  - Pipeline Name: `Dataset handling automation` (the name used in the `valohai.yaml` file)
  - Pipeline Title: the title for the pipeline runs created by this trigger
  - Payload input name: `parse-notification.payload` (step and input names from `valohai.yaml`)
A Managed Trigger Channel notification channel is automatically created for you when you save the trigger.
By default, the workflow is triggered whenever any dataset version is created. If you want, you can add a payload filter condition so that the trigger only fires for dataset versions matching your condition, e.g. versions of one specific dataset.
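Conceptually, a payload filter compares a field in the notification payload against a value you choose. The actual filter is configured in the Valohai trigger UI; the sketch below only illustrates the idea in Python, and the `data.dataset.name` field is an assumption about the payload shape; inspect a real notification payload to find the field your filter needs.

```python
"""Illustration of a payload filter condition (not Valohai's actual filter syntax)."""


def should_trigger(payload: dict, dataset_name: str) -> bool:
    """Return True only for dataset-version-created events for the named dataset.

    Assumption: the payload carries the dataset name under data.dataset.name;
    verify this against a real payload before relying on it.
    """
    if payload.get("type") != "dataset_version_created":
        return False
    return payload.get("data", {}).get("dataset", {}).get("name") == dataset_name


example = {
    "type": "dataset_version_created",
    "data": {"dataset": {"name": "images"}, "version": {"uri": "datum://..."}},
}
print(should_trigger(example, "images"))   # -> True
print(should_trigger(example, "tabular"))  # -> False
```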
Notification Setup
Go to Settings > Notifications > Project Notifications and create a new notification routing:
- Event: `dataset version is created`
- Filter events by users: `All users`
- Channel: select the `Launches trigger: TRIGGER_TITLE` channel (for the trigger you just created).
Testing Your Workflow
If everything works as it should, creating a new dataset version should trigger the pipeline.
You can also test the workflow nodes individually.
Parse Notification Step
Create a dataset and a dataset version.
Create a payload input file and add it to your project. A minimal example of the payload file:
```json
{
  "type": "dataset_version_created",
  "data": {
    "version": {
      "uri": "DATASET_VERSION_DATUM_URL"
    }
  }
}
```
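Before wiring anything up in Valohai, you can rehearse the extraction logic against this minimal payload directly in Python. This is a standalone dry run of what `parse-notification.py` does; the datum URL is the same placeholder as above.

```python
"""Dry run of the parse-notification logic against the minimal payload."""
import json

payload_text = """
{
  "type": "dataset_version_created",
  "data": {
    "version": {
      "uri": "DATASET_VERSION_DATUM_URL"
    }
  }
}
"""

# same extraction as in parse-notification.py
payload = json.loads(payload_text)
dataset_version_uri = payload["data"]["version"]["uri"]

# this printed JSON line is what becomes the execution metadata
metadata_line = json.dumps({"dataset": dataset_version_uri})
print(metadata_line)  # -> {"dataset": "DATASET_VERSION_DATUM_URL"}
```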
- Create a new execution:
  - Select the `parse-notification` step.
  - Add the payload input file as the execution input.
  - Run the execution.
- The dataset version URL is printed out in the execution log.
- The execution metadata includes the dataset version URI (under the `dataset` key).
Handle Inputs Step
The input handler step requires a dataset version URL as input.
- Create a dataset, then create a dataset version and add files to it.
- Go to Executions > Create execution.
- Select the action step (`list-inputs` in this example).
- Paste the dataset version datum URL into the Data > Inputs > your step's input > URL field.
- Set the `dataset_url` parameter to an empty string (`""`).
- Run the execution.