# Running Distributed Tasks

Distributed Tasks enable training across multiple machines that communicate during execution.

This guide covers how to prepare your code and launch Distributed Tasks on Valohai.

### Before you start

To use Distributed Tasks, you need:

* **Private workers** or a full private Valohai installation
* **Network configuration** allowing free communication between workers in the same subnet
* **Distributed features enabled** on your account (contact Valohai support)

> 💡 **Tip:** See the [distributed examples repository](https://github.com/valohai/distributed-examples) for complete code samples with PyTorch, Horovod, TensorFlow, and more.

### Prepare your code for distributed execution

Distributed Tasks don't automatically make your code work in a distributed way. You need to:

1. Detect whether the execution is part of a Distributed Task
2. Initialize your distributed framework
3. Read connection information from Valohai's configuration files
4. Establish communication between workers

#### Detect distributed execution

Use `valohai-utils` to check if the execution is part of a Distributed Task:

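```python
import valohai

def distributed_init():
    ...  # Your distributed setup: framework init, connection info, etc.

def normal_init():
    ...  # Your single-machine setup

if valohai.distributed.is_distributed_task():
    distributed_init()
else:
    normal_init()
```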

This lets your code run in both standard and distributed modes.
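If you also run the same script where valohai-utils isn't installed (a laptop or a CI job), a small wrapper keeps the check from failing on import. This is a sketch of one way to do it: `running_distributed` is a hypothetical helper name, and treating a missing package as "not distributed" is an assumption you may want to change.

```python
import importlib.util


def running_distributed() -> bool:
    """Return True only when valohai-utils is installed and reports a Distributed Task."""
    if importlib.util.find_spec("valohai") is None:
        # valohai-utils is not installed: treat this as a plain local run.
        return False
    import valohai

    return bool(valohai.distributed.is_distributed_task())
```

Use it in place of the direct call, for example `if running_distributed(): distributed_init()`.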

#### Get connection information

Valohai provides connection details for all workers in the Distributed Task.

**Get all workers:**

```python
import valohai

members = valohai.distributed.members()
# Comma-separated private IPs of every worker, e.g. for a host list
member_ips = ",".join(m.primary_local_ip for m in members)
```
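Launchers often expect hosts in a specific format. As one example, Horovod's `horovodrun -H` flag takes a comma-separated list of `host:slots` entries. The sketch below builds that string from the member list. The helper name `horovod_hosts` and the `slots_per_host` value (typically one slot per GPU) are illustrative assumptions, and the helper accepts any objects with a `primary_local_ip` attribute so you can try it outside Valohai.

```python
from types import SimpleNamespace
from typing import Iterable


def horovod_hosts(members: Iterable, slots_per_host: int = 1) -> str:
    """Build a Horovod-style host list such as "10.0.0.1:4,10.0.0.2:4".

    `members` can be the result of valohai.distributed.members() or any
    objects exposing a `primary_local_ip` attribute.
    """
    if slots_per_host < 1:
        raise ValueError("slots_per_host must be at least 1")
    return ",".join(f"{m.primary_local_ip}:{slots_per_host}" for m in members)


# Stand-in members for trying the helper outside a Distributed Task.
fake_members = [
    SimpleNamespace(primary_local_ip="10.0.0.1"),
    SimpleNamespace(primary_local_ip="10.0.0.2"),
]
print(horovod_hosts(fake_members, slots_per_host=4))  # 10.0.0.1:4,10.0.0.2:4
```

Inside a Distributed Task you would pass `valohai.distributed.members()` instead of the stand-ins; the total process count for `-np` is then the number of members times `slots_per_host`.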

**Get the master worker:**

```python
import valohai

master = valohai.distributed.master()
master_ip = master.primary_local_ip
# 1234 is an example; use the port your framework listens on
master_url = f"tcp://{master_ip}:1234"
```

The "master" is simply the first worker that announced itself. It's a convenience abstraction for frameworks that need a coordinator.

**Get current worker details:**

```python
import valohai

me = valohai.distributed.me()  # This worker's own member details
rank = valohai.distributed.rank  # 0, 1, 2, etc.
size = valohai.distributed.required_count  # Total worker count
```
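A common use of `rank` and `size` is giving each worker a disjoint slice of the work. The sketch below uses round-robin (strided) sharding. `shard_for_worker` is a hypothetical helper, not part of valohai-utils; framework utilities such as PyTorch's `DistributedSampler` do the same job for their own data loaders.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def shard_for_worker(items: Sequence[T], rank: int, size: int) -> List[T]:
    """Return the items assigned to worker `rank` out of `size` workers.

    Items are dealt round-robin: worker 0 gets indices 0, size, 2*size, ...
    so every item lands on exactly one worker.
    """
    if size < 1 or not 0 <= rank < size:
        raise ValueError(f"invalid rank {rank} for size {size}")
    return list(items[rank::size])


files = [f"part-{i:03d}.tar" for i in range(10)]
# Inside a Distributed Task, pass valohai.distributed.rank and
# valohai.distributed.required_count instead of these literals.
print(shard_for_worker(files, rank=1, size=4))
# ['part-001.tar', 'part-005.tar', 'part-009.tar']
```

Strided sharding keeps shard sizes within one item of each other without knowing the data in advance; contiguous chunks are an alternative when neighboring items should stay together.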

> 💡 **Tip:** If you prefer to read these values directly from files instead of using valohai-utils, see the [distributed configuration files](/distributed-training/distributed-config.md).

#### Example: PyTorch Distributed setup

```python
import torch.distributed as dist
import valohai

if valohai.distributed.is_distributed_task():
    master = valohai.distributed.master()
    rank = valohai.distributed.rank
    world_size = valohai.distributed.required_count

    # Initialize PyTorch distributed. NCCL requires CUDA GPUs on every worker,
    # and port 29500 on the master must be reachable from the other workers.
    dist.init_process_group(
        backend="nccl",
        init_method=f"tcp://{master.primary_local_ip}:29500",
        rank=rank,
        world_size=world_size,
    )
```

This sets up PyTorch's distributed backend using Valohai's connection information.
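PyTorch also accepts `init_method="env://"`, which reads `MASTER_ADDR`, `MASTER_PORT`, `RANK`, and `WORLD_SIZE` from the environment. Setting these once is convenient when other tools in the same process read the standard variables too. The sketch below builds that environment from Valohai's values. `torch_env` and `init_from_env` are hypothetical helper names, port `29500` is an assumption that must match the port you expose, and falling back to Gloo when no GPU is present is a design choice, since NCCL requires CUDA.

```python
import os
from typing import Dict


def torch_env(master_ip: str, rank: int, world_size: int, port: int = 29500) -> Dict[str, str]:
    """Environment variables read by torch.distributed's "env://" init method."""
    return {
        "MASTER_ADDR": master_ip,
        "MASTER_PORT": str(port),
        "RANK": str(rank),
        "WORLD_SIZE": str(world_size),
    }


def init_from_env() -> None:
    """Initialize torch.distributed inside a Valohai Distributed Task."""
    # Imported lazily so torch_env() can be used without these packages.
    import torch
    import torch.distributed as dist
    import valohai

    os.environ.update(
        torch_env(
            master_ip=valohai.distributed.master().primary_local_ip,
            rank=valohai.distributed.rank,
            world_size=valohai.distributed.required_count,
        )
    )
    # NCCL needs CUDA GPUs; Gloo also works on CPU-only workers.
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend, init_method="env://")
```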

### Configure network access

Some distributed frameworks need specific ports or network modes.

Use environment variables to configure Docker networking:

**Expose all ports:**

```yaml
- step:
    name: train-distributed
    image: pytorch/pytorch:latest
    environment-variables:
      - name: VH_DOCKER_NETWORK
        default: host
```

This runs the container in the host's network namespace, so every port the container listens on is reachable from the worker subnet.

**Expose specific ports:**

```yaml
environment-variables:
  - name: VH_EXPOSE_PORTS
    default: 29500
```

This exposes only the specified port or port range.

### Run a Distributed Task

Once your code is ready:

1. Open your project in Valohai
2. Go to the **Tasks** tab
3. Click **Create Task**
4. Select your distributed training step
5. Under **Configuration**, select **Distributed** as the Task type
6. Set **Execution count** to the number of workers you need (e.g., `4` for 4 machines)
7. Configure parameters and environment variables as needed
8. Click **Create task**

Valohai will queue all workers, wait for them to come online, share connection information, and start your distributed training job.

### Example: Distributed Task blueprint

Define a Distributed Task in your `valohai.yaml`:

```yaml
- step:
    name: train-distributed
    image: pytorch/pytorch:latest
    command:
      - pip install valohai-utils
      - python train_distributed.py {parameters}
    environment-variables:
      - name: VH_DOCKER_NETWORK
        default: host
    parameters:
      - name: epochs
        default: 10
        type: integer
    inputs:
      - name: dataset
        default: s3://mybucket/train-data.tar

- task:
    step: train-distributed
    name: multi-gpu-training
    type: distributed
    parameters:
      - name: epochs
        style: single
        rules:
          value: 20
```

Run this Task from the UI by selecting the blueprint.
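With the default parameter style, Valohai expands `{parameters}` in the command to flags such as `--epochs=20`. A minimal sketch of the entry point in `train_distributed.py` could parse that flag with `argparse`. The `parse_args` and `main` functions are placeholders for your own code, and the default of `10` simply mirrors the step definition.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the flags Valohai generates from the step's parameters."""
    parser = argparse.ArgumentParser(description="Distributed training entry point")
    parser.add_argument("--epochs", type=int, default=10)
    return parser.parse_args(argv)


def main() -> None:
    args = parse_args()
    import valohai  # installed by the step's `pip install valohai-utils` command

    if valohai.distributed.is_distributed_task():
        rank = valohai.distributed.rank
        world_size = valohai.distributed.required_count
    else:
        rank, world_size = 0, 1
    print(f"worker {rank}/{world_size}: training for {args.epochs} epochs")
    # ... initialize your distributed framework and train here ...
```

Call `main()` from the script's `if __name__ == "__main__":` block. Falling back to rank 0 of 1 lets the same script run as a normal, non-distributed execution.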

### Frameworks and examples

Valohai supports multiple distributed frameworks. Check the [distributed examples repository](https://github.com/valohai/distributed-examples) for working code:

**PyTorch:**

* PyTorch Distributed with NCCL backend
* PyTorch with Gloo backend
* PyTorch with OpenMPI

**TensorFlow:**

* TensorFlow Distributed with MultiWorkerMirroredStrategy
* Horovod with TensorFlow

**Hugging Face:**

* Accelerate for multi-GPU and multi-node training
* Transformers with Trainer API

**Other:**

* Horovod with PyTorch or TensorFlow
* DeepSpeed for large model training
* Ray for reinforcement learning

### Troubleshooting

**Workers stuck at "Waiting for the rest of the group"**

All workers must come online before execution starts. Check:

* Are all workers queued and starting?
* Is your account's worker pool large enough?
* Are there resource constraints (GPUs, memory)?

**Connection errors between workers**

Verify:

* Network security allows communication between workers in the subnet
* Ports are exposed correctly (`VH_DOCKER_NETWORK` or `VH_EXPOSE_PORTS`)
* Firewall rules permit worker-to-worker traffic
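To tell a blocked port apart from a bug in your training code, a worker can attempt a plain TCP connection to the master before initializing the framework. The sketch below retries with the standard-library `socket` module. `wait_for_port` is a hypothetical helper and the timeout values are arbitrary. It succeeds only once a process on the master is listening on that port, so it works as a pre-flight check on non-master workers while the master starts up.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 2.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused, unreachable, or timed out: retry until the deadline.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port(valohai.distributed.master().primary_local_ip, 29500)` returning `False` after the timeout points to network or port configuration rather than to your framework code.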

**Code runs in standard mode instead of distributed**

Confirm:

* You selected **Distributed** as the Task type (not Grid Search or Bayesian)
* Your code checks `valohai.distributed.is_distributed_task()`

### Next steps

* [Distributed Configuration Files](/distributed-training/distributed-config.md) for details on connection information
* [Distributed Examples Repository](https://github.com/valohai/distributed-examples) for framework-specific code
* [Tasks & Parallel Execution](/tasks.md) for standard Task types

