Dataset Version Trigger

Automatically launch pipelines when new dataset versions are created

Launch a pipeline automatically when a new dataset version is created. This guide shows you how to parse notification payloads, handle dataset inputs, and configure pipeline and trigger settings for event-driven data processing.

Use Case

Process newly uploaded datasets without manual intervention. When a user uploads data to Valohai and creates a new dataset version from it, your preprocessing pipeline launches immediately and processes every file in that version.

Perfect for:

  • Auto-validating uploaded data

  • Preprocessing raw data for training

  • Generating data quality reports

  • Extracting features from new samples


How It Works

  1. Upload dataset: User creates a new dataset version

  2. Notification fires: Valohai generates "dataset version created" event

  3. Trigger launches: Pipeline starts automatically

  4. Parse payload: First node extracts dataset URI from notification

  5. Process data: Second node receives URI and processes files


Step 1: Create the Scripts

Parse Notification Payload

This script extracts the dataset version URI from the notification and outputs it as metadata for the next pipeline node.

Create parse-notification.py:

"""Parse dataset version URL from notification payload.

The payload is a JSON file that Valohai sends to the step.
"""

import json
import valohai

# notification payload is provided in a Valohai input file
input_file = valohai.inputs("payload").path()

# get the json "payload" content from the input file
with open(input_file) as file:
    payload = json.load(file)

# retrieve the new dataset version URI from the payload
dataset_version_uri = payload["data"]["version"]["uri"]

# output the URI as execution metadata
# this will be available to the next step

print(json.dumps({"dataset": dataset_version_uri}))

Why metadata? Printing the URI as JSON metadata (rather than writing it to an output file) lets you connect it directly to a parameter in the next pipeline node using an edge.
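
For example, the node's execution log would then contain a single JSON line such as the following (using the placeholder URI from the test payload in Step 5):

{"dataset": "datum://your-dataset/version-id"}

Valohai records that line as execution metadata under the key dataset, which the edge in Step 2 references as parse-notification.metadata.dataset.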


Data Processing Script

This example simply lists all the files in the dataset version.

Create list-inputs.py:

"""List all inputs given to the current step."""

import valohai

valohai.prepare(step="list-inputs")

for file_path in valohai.inputs("dataset").paths():
    print(file_path)

Replace this script with your actual data processing code; the only requirement is that the step receives the dataset version URI through its dataset input.
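
For instance, here is a minimal validation sketch that counts rows in CSV files. It assumes the dataset version contains CSV files (the row-count check is illustrative); it keeps the list-inputs step name, the dataset input name, and the list-inputs.py file name so the YAML in Step 2 still applies:

"""Validate CSV files in the dataset version by counting their rows."""

import csv
import json

import valohai

valohai.prepare(step="list-inputs")

total_rows = 0
for file_path in valohai.inputs("dataset").paths():
    if not file_path.endswith(".csv"):
        print(f"Skipping non-CSV file: {file_path}")
        continue
    with open(file_path, newline="") as csv_file:
        row_count = sum(1 for _ in csv.reader(csv_file))
    print(f"{file_path}: {row_count} rows")
    total_rows += row_count

# print the total as execution metadata, just like the parsing node does
print(json.dumps({"total_rows": total_rows}))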


Dependencies

Add to requirements.txt:

valohai-utils

Step 2: Configure Pipeline in YAML

Create your valohai.yaml with two steps and a pipeline that connects them:

- step:
    name: parse-notification
    image: python:3.12
    command:
      - pip install -r requirements.txt
      - python ./parse-notification.py {parameters}
    inputs:
      - name: payload
- step:
    name: list-inputs
    image: python:3.12
    command:
      - pip install -r requirements.txt
      - python ./list-inputs.py {parameters}
    parameters:
      - name: dataset_url
        type: string
    inputs:
      - name: dataset
        default: "{parameter:dataset_url}"
- pipeline:
    name: Dataset handling automation
    nodes:
      - name: parse-notification
        step: parse-notification
        type: execution
      - name: list-inputs
        step: list-inputs
        type: execution
    edges:
      - [parse-notification.metadata.dataset, list-inputs.parameter.dataset_url]

How the Pipeline Works

Node 1: parse-notification

  • Receives webhook payload as input

  • Extracts dataset version URI

  • Outputs URI as metadata (dataset)

Edge: Connect metadata to parameter

  • Takes parse-notification.metadata.dataset

  • Passes it to list-inputs.parameter.dataset_url

Node 2: list-inputs

  • Receives dataset URI via parameter

  • Parameter becomes the input URL via {parameter:dataset_url} (illustrated below)

  • Processes all files in the dataset
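
Conceptually, once the edge has delivered the value, the list-inputs node behaves as if its dataset input had been written with the concrete URI (the placeholder URI below is illustrative):

inputs:
  - name: dataset
    default: "datum://your-dataset/version-id"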


Step 3: Create the Trigger

  1. Go to Project Settings → Triggers

  2. Click Create Trigger

  3. Configure the trigger:

Basic Settings:

  • Title: Dataset version -> new data handler pipeline

  • Trigger Type: Notification

Conditions (optional):

  • Payload Filter if you want to filter by specific dataset:

    • Lookup Path: data.dataset.name

    • Operation: Equals

    • Invariant: your-dataset-name

Actions:

  • Action Type: Run Pipeline

  • Source Commit Reference: main (or your branch)

  • Pipeline Name: Dataset handling automation

  • Pipeline Title: (optional title for pipeline runs)

  • Payload Input Name: parse-notification.payload

Payload Input Name format: <node-name>.<input-name> specifies which pipeline node receives the notification payload.

  4. Click Save Trigger

A Managed Trigger Channel is automatically created for this trigger.


Step 4: Set Up Notification Routing

Connect the "dataset version created" event to your trigger:

  1. Go to Project Settings → Notifications → Project Notifications

  2. Click Create new notification routing

  3. Configure routing:

    • Event: dataset version is created

    • Filter events by users: All users (or select specific users)

    • Channel: Select Launches trigger: Dataset version -> new data handler pipeline

  4. Click Save

Now your automation is live!


Step 5: Test the Workflow

Test Parsing Node Manually

Before testing the full trigger, verify the parsing step works:

  1. Create a test payload file test-payload.json:

{
  "type": "dataset_version_created",
  "data": {
    "version": {
      "uri": "datum://your-dataset/version-id"
    }
  }
}

  2. Create a new execution:

    • Select step: parse-notification

    • Add input: Upload your test payload file to the payload input

  3. Run execution

  4. Check logs: The dataset version URI should be printed

  5. Check metadata: Should show extracted URI
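
Optionally, you can sanity-check the extraction logic locally before creating the execution. A minimal sketch that reads test-payload.json directly, without valohai-utils:

import json

with open("test-payload.json") as file:
    payload = json.load(file)

# same extraction as parse-notification.py
print(json.dumps({"dataset": payload["data"]["version"]["uri"]}))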


Test Processing Node Manually

Verify the data processing step works:

  1. Create a dataset and add files to it

  2. Copy the dataset version datum URL (from dataset page)

  3. Create execution:

    • Select step: list-inputs

    • Paste dataset URI into Data → Inputs → dataset → URL field

    • Set dataset_url parameter to empty string ("")

  4. Run execution

  5. Check logs: Should list all files in dataset


Test Full Trigger

Once manual tests pass, trigger the real automation:

  1. Go to your dataset

  2. Create a new version (upload files or create empty version)

  3. Check Executions page: Pipeline should launch automatically

  4. Verify both nodes execute correctly

  5. Check trigger logs if anything fails: Project Settings → Triggers → ... menu → View Logs


Troubleshooting

Trigger Not Launching

Check notification routing:

  • Verify event type is "dataset version is created"

  • Confirm channel is selected correctly

  • Check if user filter is too restrictive

Check trigger status:

  • Ensure trigger is enabled

  • Look at trigger logs for errors

  • Verify payload filters aren't blocking events


Pipeline Fails at Parsing Node

Common issues:

  • Payload input not configured correctly

  • Input name mismatch between trigger config and YAML

  • JSON parsing error in script

Debug steps:

  • Check execution logs for Python errors

  • Verify payload file exists and is valid JSON

  • Print the payload content to see its structure (see the sketch below)
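
A quick way to do that last check is to temporarily extend parse-notification.py with a debug dump (a minimal sketch; remove it once the payload parses cleanly):

import json

import valohai

with open(valohai.inputs("payload").path()) as file:
    payload = json.load(file)

# dump the whole payload so its structure is visible in the execution log
print(json.dumps(payload, indent=2))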


Pipeline Fails at Processing Node

Common issues:

  • Dataset URI not passed correctly through edge

  • Parameter name mismatch

  • Input configuration error

Debug steps:

  • Check if parameter received correct value

  • Verify edge configuration in pipeline YAML

  • Test processing node manually with known-good dataset URI


Trigger Fires Too Often

Solutions:

  • Add payload filter to only trigger for specific datasets

  • Add user filter to only trigger for certain team members

  • Check if multiple notification routings are configured

