Running Distributed Tasks

Distributed Tasks enable training across multiple machines that communicate during execution.

This guide covers how to prepare your code and launch Distributed Tasks on Valohai.

Before you start

To use Distributed Tasks, you need:

  • Private workers or a full private Valohai installation

  • Network configuration allowing free communication between workers in the same subnet

  • Distributed features enabled on your account (contact Valohai support)

💡 Tip: See the distributed examples repository for complete code samples with PyTorch, Horovod, TensorFlow, and more.

Prepare your code for distributed execution

Distributed Tasks don't automatically make your code work in a distributed way. You need to:

  1. Detect whether the execution is part of a Distributed Task

  2. Initialize your distributed framework

  3. Read connection information from Valohai's configuration files

  4. Establish communication between workers

Detect distributed execution

Use valohai-utils to check if the execution is part of a Distributed Task:

import valohai

if valohai.distributed.is_distributed_task():
    # Part of a Distributed Task: set up your distributed framework
    # (distributed_init is a placeholder for your own setup code)
    distributed_init()
else:
    # Standalone execution: initialize as usual
    # (normal_init is also a placeholder)
    normal_init()

This lets your code run in both standard and distributed modes.
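For example, the same script can fall back to single-worker defaults when it isn't part of a Distributed Task. A minimal sketch using only the calls shown in this guide (the fallback values are assumptions for a standalone run):

import valohai

if valohai.distributed.is_distributed_task():
    rank = valohai.distributed.rank
    world_size = valohai.distributed.required_count
else:
    # Standalone execution: behave like a single-worker "cluster"
    rank = 0
    world_size = 1

print(f"Running as worker {rank} of {world_size}")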

Get connection information

Valohai provides connection details for all workers in the Distributed Task.

Get all workers:

members = valohai.distributed.members()
member_ips = ",".join([m.primary_local_ip for m in members])

Get the master worker:

master = valohai.distributed.master()
master_ip = master.primary_local_ip
master_url = f"tcp://{master_ip}:1234"  # 1234 is an example port; expose it (see "Configure network access")

The "master" is simply the first worker that announced itself—it's a convenience abstraction if your framework needs a coordinator.

Get current worker details:

me = valohai.distributed.me()
rank = valohai.distributed.rank  # 0, 1, 2, etc.
size = valohai.distributed.required_count  # Total worker count
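For instance, rank and required_count can be used to split work evenly across machines. A minimal sketch with a hypothetical list of input shards (the file names are made up for illustration):

import valohai

rank = valohai.distributed.rank
size = valohai.distributed.required_count

# Hypothetical input shards; replace with your real inputs
all_shards = [f"shard-{i:04d}.parquet" for i in range(128)]

# Each worker takes every size-th shard, offset by its rank,
# so the shards are divided evenly and without overlap
my_shards = all_shards[rank::size]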

💡 Tip: If you prefer not to use valohai-utils, see the distributed configuration files and read the same values directly from disk.
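If you go the file-based route, a minimal sketch might look like this. Both the path /valohai/config/distributed.json and the flat JSON loading are assumptions; check the distributed configuration files documentation for the actual location and schema:

import json

# Assumed location of the distributed configuration file; verify it
# against the distributed configuration files documentation
CONFIG_PATH = "/valohai/config/distributed.json"

with open(CONFIG_PATH) as f:
    distributed_config = json.load(f)

# Print the raw structure to see which fields (members, IPs, ranks)
# are available on your installation
print(json.dumps(distributed_config, indent=2))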

Example: PyTorch Distributed setup

import torch.distributed as dist
import valohai

if valohai.distributed.is_distributed_task():
    master = valohai.distributed.master()
    rank = valohai.distributed.rank
    world_size = valohai.distributed.required_count
    
    # Initialize PyTorch distributed
    dist.init_process_group(
        backend="nccl",
        init_method=f"tcp://{master.primary_local_ip}:29500",
        rank=rank,
        world_size=world_size
    )

This sets up PyTorch's distributed backend using Valohai's connection information.
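Once the process group is up, the model is typically wrapped in DistributedDataParallel and the group is destroyed when training ends. A short follow-up sketch, continuing from the snippet above and assuming one GPU per worker (the Linear layer stands in for your real network):

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# Assumes one GPU per Valohai worker; adjust device selection if your
# workers have several GPUs
device = torch.device("cuda:0")
model = torch.nn.Linear(128, 10).to(device)  # stand-in for your real model
ddp_model = DDP(model, device_ids=[0])

# ... run your training loop with ddp_model ...

dist.destroy_process_group()  # tear down the process group when finished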

Configure network access

Some distributed frameworks need specific ports or network modes.

Use environment variables to configure Docker networking:

Expose all ports:

- step:
    name: train-distributed
    image: pytorch/pytorch:latest
    environment-variables:
      - name: VH_DOCKER_NETWORK
        default: host

This exposes all container ports to the worker subnet.

Expose specific ports:

environment-variables:
  - name: VH_EXPOSE_PORTS
    default: 29500

This exposes only the specified port or port range.

Run a Distributed Task

Once your code is ready:

  1. Open your project in Valohai

  2. Go to the Tasks tab

  3. Click Create Task

  4. Select your distributed training step

  5. Under Configuration, select Distributed as the Task type

  6. Set Execution count to the number of workers you need (e.g., 4 for 4 machines)

  7. Configure parameters and environment variables as needed

  8. Click Create task

Valohai will queue all workers, wait for them to come online, share connection information, and start your distributed training job.

Example: Distributed Task blueprint

Define a Distributed Task in your valohai.yaml:

- step:
    name: train-distributed
    image: pytorch/pytorch:latest
    command:
      - pip install valohai-utils
      - python train_distributed.py {parameters}
    environment-variables:
      - name: VH_DOCKER_NETWORK
        default: host
    parameters:
      - name: epochs
        default: 10
        type: integer
    inputs:
      - name: dataset
        default: s3://mybucket/train-data.tar

- task:
    step: train-distributed
    name: multi-gpu-training
    type: distributed
    parameters:
      - name: epochs
        style: single
        rules:
          value: 20

Run this Task from the UI by selecting the blueprint.
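For reference, train_distributed.py can read the epochs parameter that the blueprint passes via {parameters} using valohai-utils. A minimal sketch; the prepare() defaults simply mirror the step definition above:

import valohai

# Mirror the step definition so the script also works for local runs
valohai.prepare(
    step="train-distributed",
    default_parameters={"epochs": 10},
)

epochs = valohai.parameters("epochs").value
print(f"Training for {epochs} epochs")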

Frameworks and examples

Valohai supports multiple distributed frameworks. Check the distributed examples repository for working code:

PyTorch:

  • PyTorch Distributed with NCCL backend

  • PyTorch with Gloo backend

  • PyTorch with OpenMPI

TensorFlow:

  • TensorFlow Distributed with MultiWorkerMirroredStrategy (see the TF_CONFIG sketch after this list)

  • Horovod with TensorFlow

Hugging Face:

  • Accelerate for seamless distribution

  • Transformers with Trainer API

Other:

  • Horovod with PyTorch or TensorFlow

  • DeepSpeed for large model training

  • Ray for reinforcement learning
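As one illustration of mapping Valohai's connection information onto another framework, here is a sketch of building TF_CONFIG for MultiWorkerMirroredStrategy. The port 25333 is an arbitrary example (it must be exposed), and the sketch assumes members() returns the workers in the same order on every machine; verify the details against the examples repository:

import json
import os

import valohai

if valohai.distributed.is_distributed_task():
    members = valohai.distributed.members()
    me = valohai.distributed.me()

    # Every worker must see the same ordered worker list
    workers = [f"{m.primary_local_ip}:25333" for m in members]

    os.environ["TF_CONFIG"] = json.dumps({
        "cluster": {"worker": workers},
        "task": {"type": "worker", "index": workers.index(f"{me.primary_local_ip}:25333")},
    })

    import tensorflow as tf
    strategy = tf.distribute.MultiWorkerMirroredStrategy()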

Troubleshooting

Workers stuck at "Waiting for the rest of the group"

All workers must come online before execution starts. Check:

  • Are all workers queued and starting?

  • Is your account's worker pool large enough?

  • Are there resource constraints (GPUs, memory)?

Connection errors between workers

Verify:

  • Network security allows communication between workers in the subnet

  • Ports are exposed correctly (VH_DOCKER_NETWORK or VH_EXPOSE_PORTS)

  • Firewall rules permit worker-to-worker traffic

Code runs in standard mode instead of distributed

Confirm:

  • You selected Distributed as the Task type (not Grid Search or Bayesian)

  • Your code checks valohai.distributed.is_distributed_task()
