# Distributed Data Parallel

Distributed Data Parallel, usually abbreviated as DDP, is PyTorch’s primary system for synchronous multi-GPU training. DDP extends ordinary data parallelism to distributed environments while minimizing Python overhead and communication inefficiency.

The central design principle is simple: one process controls one device. Each process owns a complete replica of the model, computes gradients locally, and synchronizes gradients with other processes during backpropagation.

Compared with older single-process approaches such as `nn.DataParallel`, DDP achieves substantially better scaling because computation and communication are distributed across independent worker processes.

### From Single-GPU Training to DDP

A standard single-GPU training loop looks like this:

```python id="q4tt4m"
model = MyModel().cuda()

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

for x, y in loader:
    x = x.cuda()
    y = y.cuda()

    logits = model(x)
    loss = loss_fn(logits, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
```

In DDP, the core logic remains nearly identical. The main differences are:

1. Multiple processes are launched.
2. Each process is assigned one GPU.
3. The model is wrapped with `DistributedDataParallel`.
4. The dataset is partitioned across processes.

The training loop itself changes very little.

This is one reason DDP became the standard distributed training system in PyTorch. It preserves the mental model of ordinary training while scaling to many devices and machines.

### Distributed Process Groups

DDP depends on a distributed process group. A process group is a collection of worker processes that can communicate with each other using collective operations such as:

- all-reduce
- broadcast
- gather
- scatter
- barrier
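
The semantics of these collectives can be sketched with a small pure-Python simulation, where each list element stands in for one worker's tensor. This illustrates the semantics only; it is not the `torch.distributed` API:

```python
# Toy simulation of collective semantics: each list element plays the role
# of one worker's value. Real collectives move tensors between processes.

def all_reduce(values):
    """Every worker ends up with the sum over all workers."""
    total = sum(values)
    return [total] * len(values)

def broadcast(values, src=0):
    """Every worker ends up with the src worker's value."""
    return [values[src]] * len(values)

def gather(values, dst=0):
    """The dst worker collects every worker's value."""
    return {dst: list(values)}

def scatter(chunks, src=0):
    """The src worker hands one chunk to each worker."""
    return list(chunks)

print(all_reduce([1.0, 2.0, 3.0, 4.0]))        # [10.0, 10.0, 10.0, 10.0]
print(broadcast([1.0, 2.0, 3.0, 4.0], src=2))  # [3.0, 3.0, 3.0, 3.0]
```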

PyTorch initializes the process group with:

```python id="s21j7g"
import torch.distributed as dist

dist.init_process_group(
    backend="nccl"
)
```

The backend determines how communication is implemented.

Common backends include:

| Backend | Typical use |
|---|---|
| `nccl` | GPU training on NVIDIA hardware |
| `gloo` | CPU training and debugging |
| `mpi` | HPC clusters with MPI |
| `ucc` | Unified communication systems |

For modern GPU training, `nccl` is almost always preferred because it is optimized for high-bandwidth GPU communication.

### World Size and Rank

Every process in DDP receives two important identifiers.

The world size is the total number of participating processes.

The rank is the unique process ID.

If we launch training on 8 GPUs:

| GPU | Rank |
|---|---:|
| GPU 0 | 0 |
| GPU 1 | 1 |
| GPU 2 | 2 |
| GPU 3 | 3 |
| GPU 4 | 4 |
| GPU 5 | 5 |
| GPU 6 | 6 |
| GPU 7 | 7 |

Rank 0 is usually treated as the primary worker. It commonly handles:

- checkpoint saving
- logging
- metric printing
- validation summaries

Example:

```python id="z45w5n"
if dist.get_rank() == 0:
    print("Saving checkpoint")
```

Without this condition, every process may try to write the same file simultaneously.

### Launching Distributed Training

DDP training is normally launched with `torchrun`.

Example:

```bash id="uh0l98"
torchrun --nproc_per_node=8 train.py
```

This command launches 8 independent Python processes.

PyTorch automatically sets several environment variables:

| Variable | Meaning |
|---|---|
| `RANK` | Global process rank |
| `WORLD_SIZE` | Total number of processes |
| `LOCAL_RANK` | Process index on the current machine, typically used as the GPU index |
| `MASTER_ADDR` | Address of primary node |
| `MASTER_PORT` | Communication port |

The training script reads these variables:

```python id="x0a2mc"
import os

rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
local_rank = int(os.environ["LOCAL_RANK"])
```

Then the correct GPU is selected:

```python id="5zjlwm"
torch.cuda.set_device(local_rank)
```

Each process must exclusively control its assigned GPU.

### Wrapping Models with DDP

After creating the model, we wrap it:

```python id="yl1vut"
from torch.nn.parallel import DistributedDataParallel as DDP

model = MyModel().cuda(local_rank)

model = DDP(
    model,
    device_ids=[local_rank]
)
```

The wrapped model behaves almost exactly like the original model.

Forward pass:

```python id="l9yrmn"
logits = model(x)
```

Backward pass:

```python id="jyfxgo"
loss.backward()
```

DDP automatically intercepts gradients during backpropagation and synchronizes them across workers.

The optimizer remains unchanged:

```python id="4h9fn8"
optimizer.step()
```

This design keeps distributed training close to ordinary PyTorch code.

### How Gradient Synchronization Works

The key operation inside DDP is gradient all-reduce.

Suppose parameter tensor $W$ exists on every worker. During backpropagation, each worker computes its local gradient:

$$
g_1,\quad g_2,\quad \ldots,\quad g_K.
$$

DDP computes:

$$
g =
\frac{1}{K}
\sum_{k=1}^{K} g_k.
$$

Then every worker replaces its local gradient with the averaged result.

Because all workers start with identical parameters and apply identical updates, parameter replicas remain synchronized.

Conceptually:

```python id="x0zm5v"
for param in model.parameters():
    dist.all_reduce(param.grad)   # sums gradients across workers by default

    param.grad /= world_size      # turn the sum into a mean
```

DDP performs this automatically and efficiently.
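
The synchronization argument can be checked numerically. In the following pure-Python sketch (scalars stand in for parameter tensors), identical replicas plus an averaged gradient yield identical replicas after the update:

```python
# One simulated DDP step for a single scalar parameter on 4 workers.

def ddp_step(params, local_grads, lr=0.1):
    avg = sum(local_grads) / len(local_grads)   # all-reduce, then divide by K
    return [p - lr * avg for p in params]       # identical update everywhere

params = [1.0, 1.0, 1.0, 1.0]    # identical replicas
grads = [0.2, 0.4, 0.6, 0.8]     # different local gradients
params = ddp_step(params, grads)

# Every replica applied the same update 1.0 - 0.1 * 0.5, so they stay equal.
assert len(set(params)) == 1
```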

### Bucketing and Communication Overlap

Naively synchronizing gradients after the entire backward pass would waste time. GPUs would sit idle waiting for communication.

DDP avoids this by using gradient bucketing.

Parameters are grouped into buckets. As soon as gradients for one bucket are ready, DDP begins communicating them while backpropagation continues for remaining layers.

This overlaps:

- gradient computation
- communication
- parameter synchronization

The result is much better scaling efficiency.

Large transformer models depend heavily on this overlap. Without it, communication overhead would dominate training time.
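
The payoff from overlap can be estimated with simple arithmetic. In the simplified model below (hypothetical timings), each bucket's all-reduce hides behind the next bucket's computation, so only the final bucket's communication remains exposed:

```python
# Back-of-the-envelope model of bucketing overlap (hypothetical numbers).

n_buckets = 8
compute_ms = 10.0   # backward compute per bucket, assumed
comm_ms = 6.0       # all-reduce time per bucket, assumed

# No overlap: communicate everything after the backward pass finishes.
serial = n_buckets * (compute_ms + comm_ms)

# Perfect overlap (possible here because comm_ms < compute_ms): only the
# last bucket's all-reduce cannot be hidden behind computation.
overlapped = n_buckets * compute_ms + comm_ms

print(serial, overlapped)   # 128.0 86.0
```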

### Backward Hooks

DDP internally uses autograd hooks.

A hook is attached to each parameter tensor. When autograd finishes computing a parameter gradient, the hook triggers communication for that parameter’s bucket.

Conceptually:

```python id="m0z5r2"
def hook(grad):
    synchronize_gradient(grad)

param.register_hook(hook)
```

This integration with autograd is one reason DDP scales efficiently while preserving PyTorch’s eager execution model.

### Static Graph Assumptions

DDP assumes that the same parameters participate in the backward pass across workers.

If one worker skips a parameter while another uses it, synchronization can deadlock.

Dynamic control flow sometimes violates this assumption:

```python id="7gm7f2"
import random

if random.random() > 0.5:
    y = branch_a(x)
else:
    y = branch_b(x)
```

If different workers take different branches, some gradients may be missing.

PyTorch provides:

```python id="9wpk2h"
find_unused_parameters=True
```

Example:

```python id="zv89wa"
model = DDP(
    model,
    device_ids=[local_rank],
    find_unused_parameters=True
)
```

This allows DDP to track unused parameters, but it introduces extra overhead.

Whenever possible, distributed training graphs should remain structurally consistent across workers.
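
The failure mode is easy to illustrate without any distributed machinery. The sketch below (pure Python, not DDP internals) compares which parameters each worker produced gradients for; a collective all-reduce requires these sets to match:

```python
# Toy consistency check: collectives hang if workers disagree on which
# parameters participate in the backward pass.

def check_participation(per_worker_params):
    reference = per_worker_params[0]
    for rank, used in enumerate(per_worker_params[1:], start=1):
        if used != reference:
            diff = sorted(used ^ reference)   # symmetric difference
            return f"mismatch on rank {rank}: {diff}"
    return "consistent"

# Workers 0 and 1 took branch_a; worker 2 took branch_b.
used = [{"branch_a.w"}, {"branch_a.w"}, {"branch_b.w"}]
print(check_participation(used))   # reports the mismatch on rank 2
```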

### Distributed Data Loading

Each worker must receive different data.

DDP training normally uses:

```python id="w9a6c7"
from torch.utils.data.distributed import DistributedSampler
```

Example:

```python id="i6g42q"
sampler = DistributedSampler(
    dataset,
    shuffle=True
)

loader = DataLoader(
    dataset,
    batch_size=32,
    sampler=sampler
)
```

Without a distributed sampler, every worker may process the same mini-batch, wasting compute and harming optimization.

At the beginning of each epoch:

```python id="h8xn2d"
sampler.set_epoch(epoch)
```

This reseeds the shuffle for each epoch; without it, every epoch visits the data in the same order. Because all ranks derive the shuffle from the same seed and epoch, the partition stays consistent across workers.
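
A simplified model of the sampler (pure Python; the real `DistributedSampler` uses a torch generator seeded from a base seed plus the epoch) shows how every rank performs the same shuffle and then takes a disjoint slice:

```python
import random

def shard_indices(n, rank, world_size, epoch, seed=0):
    indices = list(range(n))
    # Same seed on every rank => same shuffle everywhere; adding the epoch
    # reshuffles the data each epoch (this is what set_epoch enables).
    random.Random(seed + epoch).shuffle(indices)
    return indices[rank::world_size]   # every world_size-th index

world_size = 4
shards = [shard_indices(16, r, world_size, epoch=0) for r in range(world_size)]

# The shards are disjoint and together cover the whole dataset.
assert sorted(i for s in shards for i in s) == list(range(16))
```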

### Initialization Synchronization

At startup, DDP ensures all workers begin with identical parameters.

Typically rank 0 initializes the model, then parameters are broadcast:

```python id="pm0l7v"
for param in model.parameters():
    dist.broadcast(param.data, src=0)
```

This guarantees consistency across replicas.

If workers start with different random initialization, averaging gradients would no longer correspond to valid synchronized optimization.

### Distributed Loss Computation

Each worker computes its own local loss.

Suppose worker $k$ computes:

$$
L_k.
$$

The backward pass uses the local loss directly. DDP synchronizes gradients, not losses.

Therefore this is correct:

```python id="5n4f9g"
loss.backward()
```

No explicit averaging of the loss is required for optimization correctness.

However, for logging and metrics, losses are often averaged across workers:

```python id="ybxvlq"
loss_tensor = torch.tensor(
    loss.item(),
    device=local_rank
)

dist.all_reduce(loss_tensor)

loss_tensor /= world_size
```

This produces a global mean loss for reporting.

### Validation in Distributed Training

Validation may also be distributed.

Each worker evaluates a subset of validation data:

```python id="aqvw7v"
with torch.no_grad():
    logits = model(x)
```

Metrics are then aggregated across workers.

For example, total correct predictions:

```python id="fq5j2h"
correct = torch.tensor(correct_count).cuda()

dist.all_reduce(correct)
```

This allows validation throughput to scale with the number of GPUs.
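
The aggregation arithmetic can be checked with a pure-Python sketch (hypothetical counts; summation plays the role of `dist.all_reduce`):

```python
# Each worker evaluates its shard; summing the counts gives global accuracy.

local_correct = [45, 48, 50, 47]   # hypothetical per-worker correct counts
local_total = [64, 64, 64, 64]     # examples per shard

global_correct = sum(local_correct)   # what the all-reduce computes
global_total = sum(local_total)

accuracy = global_correct / global_total
print(accuracy)   # 190 / 256 = 0.7421875
```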

Some projects instead run validation only on rank 0 to simplify implementation.

### Checkpointing with DDP

The DDP wrapper stores the original model in:

```python id="2z86kp"
model.module
```

Therefore checkpoints usually save:

```python id="rm4uqq"
torch.save(
    model.module.state_dict(),
    "checkpoint.pt"
)
```

Loading:

```python id="lt8aqf"
state_dict = torch.load("checkpoint.pt")

model.module.load_state_dict(state_dict)
```

Because the checkpoint holds the unwrapped model's state dict, it is loaded through `model.module` (or into the plain model before wrapping it with DDP).

Only rank 0 should save checkpoints:

```python id="3wqvqb"
if dist.get_rank() == 0:
    save_checkpoint()
```

Otherwise multiple workers may overwrite the same file.

### Multi-Node DDP

DDP also supports training across multiple machines.

Example cluster:

| Node | GPUs |
|---|---:|
| Node 0 | 8 |
| Node 1 | 8 |
| Node 2 | 8 |
| Node 3 | 8 |

Total world size:

$$
4 \times 8 = 32.
$$

Workers communicate over interconnects such as:

- InfiniBand
- Ethernet
- RoCE
- NVLink (between GPUs within a node)

Multi-node training introduces additional concerns:

- network bandwidth
- latency
- node failure
- clock skew
- distributed filesystem performance

At large scale, communication topology becomes critically important.

### Communication Bottlenecks

DDP scales well only when communication cost remains manageable.

Communication overhead grows with:

- parameter count
- gradient size
- synchronization frequency
- network latency

Large transformer models may contain billions of parameters. Synchronizing gradients for every step can dominate runtime.

Common mitigation strategies include:

| Technique | Purpose |
|---|---|
| Mixed precision | Reduce communication volume |
| Gradient compression | Compress transmitted gradients |
| Larger batches | Increase computation-to-communication ratio |
| Faster interconnects | Reduce transfer latency |
| Bucket tuning | Improve overlap efficiency |

Modern large-scale training systems spend enormous engineering effort minimizing communication overhead.

### Failure Handling

Distributed systems fail more often than single-device systems.

Possible failures include:

- GPU out-of-memory
- network interruption
- worker crashes
- NCCL hangs
- deadlocks
- filesystem corruption

A common debugging tool:

```bash id="d0o2rw"
export NCCL_DEBUG=INFO
```

This enables verbose NCCL logging.

Barrier synchronization is also useful:

```python id="95s6xq"
dist.barrier()
```

A barrier blocks all workers until every process reaches the synchronization point.

Barriers help isolate where distributed programs hang.

### DDP Versus Fully Sharded Training

DDP replicates the full model on every GPU.

If the model contains $P$ parameters, then every GPU stores:

- parameters
- gradients
- optimizer states

Memory usage scales poorly for very large models.
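
A rough estimate makes the limitation concrete. Assuming fp32 parameters and Adam (two moment buffers), and ignoring activations, DDP keeps about 16 bytes per parameter on every GPU:

```python
# Per-GPU training state for plain DDP with fp32 Adam (activations excluded).

P = 1_000_000_000            # hypothetical 1B-parameter model
bytes_per_param = 4 + 4 + 8  # fp32 params + fp32 grads + two Adam moments

gib = P * bytes_per_param / 2**30
print(gib)                   # roughly 14.9 GiB on every GPU
```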

This limitation motivated:

- Fully Sharded Data Parallel (FSDP)
- ZeRO optimization
- tensor parallelism
- pipeline parallelism

These systems partition model state across devices instead of fully replicating it.

Still, ordinary DDP remains the standard baseline because it is simple, stable, and highly effective when the model fits on one GPU.

### Why DDP Became the Standard

DDP succeeded because it aligns closely with the structure of deep learning workloads.

Neural networks naturally process batches independently. Gradients are additive across examples. Autograd systems already organize computation as graphs. GPUs excel at dense tensor operations.

DDP exploits all of these properties while preserving PyTorch’s imperative programming style.

From the user perspective, distributed training often requires only four conceptual changes:

1. Launch multiple processes.
2. Initialize a process group.
3. Shard the dataset.
4. Wrap the model with DDP.

Everything else remains close to ordinary PyTorch training.

That simplicity made DDP one of the most important infrastructure abstractions in modern deep learning.

