1. Introduction to distributed ML with PyTorch

Author(s): Matteo Bunino (CERN), Jarl Sondre Sæther (CERN), Linus Eickhoff (CERN)

In this tutorial we show how to use torch DistributedDataParallel (DDP), Horovod and DeepSpeed from the same client code. Note that the environment has been tested on the HDFML system at JSC. On other systems, the module versions may need to be changed accordingly.

1.1. Setup

First, from the root of this repository, build the Python environment containing PyTorch, Horovod and DeepSpeed, for example with:

# Creates a Python venv called envAI_hdfml
make torch-gpu-jsc

1.2. Distributed training on a single node (interactive)

If you want to use SLURM in interactive mode, do the following:

# Allocate resources
$ salloc --partition=develbooster --nodes=1 --account=intertwin --gres=gpu:4 --time=1:59:00
job ID is XXXX
# Get a shell in the compute node (if using SLURM)
$ srun --jobid XXXX --overlap --pty /bin/bash
# Now you are inside the compute node

# On JSC, you may need to load some modules...
ml --force purge
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py

# ...before activating the Python environment (adapt this to your env name/path)
source ../../../envAI_hdfml/bin/activate

To launch the training with torch DDP use:

torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s ddp

# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s ddp
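
When a script is started through torchrun, each worker process receives its coordinates through environment variables such as RANK, LOCAL_RANK and WORLD_SIZE. The sketch below is not part of train.py; it only illustrates one way to read those variables. The helper name read_torchrun_env and the single-process fallback values are assumptions made for this example. In this tutorial the strategy objects are expected to take care of this step.

```python
import os
from typing import Dict, Mapping, Optional


def read_torchrun_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, int]:
    """Read the worker coordinates exported by torchrun (hypothetical helper).

    Falls back to single-process values when the variables are missing,
    e.g. when the script is started with plain `python`.
    """
    env = os.environ if env is None else env
    return {
        "rank": int(env.get("RANK", 0)),  # global rank of this worker
        "local_rank": int(env.get("LOCAL_RANK", 0)),  # rank within the node
        "world_size": int(env.get("WORLD_SIZE", 1)),  # total number of workers
    }


if __name__ == "__main__":
    info = read_torchrun_env()
    print(
        f"worker {info['rank']} of {info['world_size']} "
        f"(local rank {info['local_rank']})"
    )
```

Saved under a file name of your choice and launched with the torchrun command above, each worker should print its own rank.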

To launch the training with Microsoft DeepSpeed use:

torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s deepspeed

# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 deepspeed train.py -s deepspeed --deepspeed
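
The train.py script in this tutorial passes train_micro_batch_size_per_gpu to DeepSpeed. In DeepSpeed's configuration, the global batch size per optimizer step is the product of the micro-batch size per GPU, the number of gradient accumulation steps and the number of GPUs. A minimal sketch of that arithmetic follows; the helper name effective_batch_size is hypothetical and exists only for this example.

```python
def effective_batch_size(
    micro_batch_per_gpu: int, n_gpus: int, grad_accum_steps: int = 1
) -> int:
    """Global batch size seen by one optimizer step (hypothetical helper).

    Mirrors the DeepSpeed relation:
    train_batch_size = micro batch per GPU * accumulation steps * number of GPUs
    """
    if min(micro_batch_per_gpu, n_gpus, grad_accum_steps) < 1:
        raise ValueError("all arguments must be positive integers")
    return micro_batch_per_gpu * grad_accum_steps * n_gpus


if __name__ == "__main__":
    # Tutorial default (--batch-size 10) on 4 GPUs, no gradient accumulation
    print(effective_batch_size(micro_batch_per_gpu=10, n_gpus=4))
```

With the tutorial default of --batch-size 10 on 4 GPUs and no gradient accumulation, this gives 40 samples per optimizer step.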

To launch the training with Horovod use:

Note

The commands below assume that 4 GPUs are available.

If your setup has a different number of GPUs, change the -np 4 -H localhost:4 arguments accordingly.

Warning

Using horovodrun is the approach recommended by the Horovod docs. To use horovodrun, make sure that mpirun is available in your environment; otherwise you cannot launch Horovod this way in interactive mode. On JUWELS at JSC, mpirun is not available, so use the srun command as a fallback instead.

You can find out if mpirun exists using:

which mpirun

Using horovodrun (if mpirun exists):

# Assuming 4 GPUs are available (-np 4)
horovodrun -np 4 -H localhost:4 python -u train.py -s horovod

# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 horovodrun -np 4 -H localhost:4 python -u train.py -s horovod

Using srun (if mpirun doesn't exist, e.g. on JUWELS at JSC):

# Assuming 4 GPUs are available
srun --cpu-bind=none --ntasks=4 --ntasks-per-node=4 --cpus-per-task=1 python -u train.py -s horovod
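
The choice between horovodrun and srun described above can also be scripted. The sketch below checks for mpirun with shutil.which and assembles the matching command line for a given number of GPUs. The helper name horovod_launch_cmd is hypothetical, and the flags are simply copied from the commands in this section, so treat it as an illustration rather than a supported launcher.

```python
import shutil
from typing import List, Optional


def horovod_launch_cmd(
    n_gpus: int, mpirun_available: Optional[bool] = None
) -> List[str]:
    """Build the Horovod launch command for one node (hypothetical helper)."""
    if n_gpus < 1:
        raise ValueError("n_gpus must be at least 1")
    if mpirun_available is None:
        # Equivalent of running `which mpirun` in the shell
        mpirun_available = shutil.which("mpirun") is not None

    script = ["python", "-u", "train.py", "-s", "horovod"]
    if mpirun_available:
        # horovodrun relies on mpirun being present in the environment
        return ["horovodrun", "-np", str(n_gpus), "-H", f"localhost:{n_gpus}", *script]
    # Fallback for systems without mpirun
    return [
        "srun",
        "--cpu-bind=none",
        f"--ntasks={n_gpus}",
        f"--ntasks-per-node={n_gpus}",
        "--cpus-per-task=1",
        *script,
    ]


if __name__ == "__main__":
    print(" ".join(horovod_launch_cmd(n_gpus=4)))
```

The returned list can be passed to subprocess.run, or joined into a string and pasted into the shell.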

1.3. Distributed training with SLURM (batch mode)

You can run your training with SLURM by using the itwinai SLURM Builder. Use the slurm_config.yaml file to specify your SLURM parameters and then preview your script with the following command:

itwinai generate-slurm -c slurm_config.yaml --no-save-script --no-submit-job

If you are happy with the script, you can then submit it by omitting --no-submit-job:

itwinai generate-slurm -c slurm_config.yaml --no-save-script

If you want to store a copy of the script in a folder, then you can similarly omit --no-save-script:

itwinai generate-slurm -c slurm_config.yaml

1.4. train.py

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

"""Show how to use DDP, Horovod and DeepSpeed strategies interchangeably
with an extremely simple neural network.
"""

import argparse
import time
from typing import Dict

import horovod.torch as hvd
import torch
from torch import nn
from torch.utils.data import Dataset

from itwinai.torch.distributed import (
    DeepSpeedStrategy,
    HorovodStrategy,
    NonDistributedStrategy,
    TorchDDPStrategy,
    TorchDistributedStrategy,
    distributed_resources_available,
)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--strategy", "-s", type=str, choices=["ddp", "horovod", "deepspeed"], default="ddp"
    )
    parser.add_argument("--shuffle_dataloader", action=argparse.BooleanOptionalAction)
    parser.add_argument(
        "--batch-size",
        type=int,
        default=10,
        help="input batch size for training (default: 10)",
    )

    # DeepSpeed: arguments expected by the deepspeed launcher (flagged for removal)
    import deepspeed

    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


class UniformRndDataset(Dataset):
    """Dummy torch dataset."""

    def __init__(self, x_size: int, y_size: int, length: int = 100):
        super().__init__()
        self.x_size = x_size
        self.y_size = y_size
        # Named `length` to avoid shadowing the built-in len()
        self.length = length

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        return torch.rand(self.x_size), torch.rand(self.y_size)


def training_fn(
    args: argparse.Namespace, strategy: TorchDistributedStrategy, distribute_kwargs: Dict
) -> int:
    """Dummy training function."""
    strategy.init()

    # Local model
    model = nn.Linear(3, 4)
    optim = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    # Distributed model
    model, optim, lr_sched = strategy.distributed(
        model, optim, lr_scheduler=None, **distribute_kwargs
    )

    # Dataset
    train_set = UniformRndDataset(x_size=3, y_size=4)
    # Distributed dataloader
    train_loader = strategy.create_dataloader(
        train_set, batch_size=args.batch_size, num_workers=1, shuffle=True
    )

    # Device allocated for this worker
    device = strategy.device()

    for epoch in range(2):
        # IMPORTANT: set current epoch ID in distributed sampler
        if strategy.is_distributed:
            train_loader.sampler.set_epoch(epoch)

        for x, y in train_loader:
            # print(f"tensor to cuda:{device}")
            x = x.to(device)
            y = y.to(device)

            optim.zero_grad()

            y_pred = model(x)

            loss = loss_fn(y_pred, y)
            loss.backward()

            optim.step()

            if strategy.is_main_worker:
                print(f"Loss [epoch={epoch}]: {loss.item()}")
            # print(f"NNLoss [epoch={epoch}]: {loss.item()}")

        # Update scheduler
        if lr_sched:
            lr_sched.step()

    time.sleep(1)
    print(f"<Global rank: {strategy.global_rank()}> - TRAINING FINISHED")
    strategy.clean_up()
    return 123


if __name__ == "__main__":
    args = parse_args()

    # Instantiate Strategy
    if not distributed_resources_available():
        print("WARNING: falling back to non-distributed strategy.")
        strategy = NonDistributedStrategy()
        distribute_kwargs = {}
    elif args.strategy == "ddp":
        strategy = TorchDDPStrategy(backend="nccl")
        distribute_kwargs = {}
    elif args.strategy == "horovod":
        strategy = HorovodStrategy()
        distribute_kwargs = dict(
            compression=hvd.Compression.none, op=hvd.Average, gradient_predivide_factor=1.0
        )
    elif args.strategy == "deepspeed":
        strategy = DeepSpeedStrategy(backend="nccl")
        distribute_kwargs = dict(
            config_params=dict(train_micro_batch_size_per_gpu=args.batch_size)
        )
    else:
        raise NotImplementedError(f"Strategy {args.strategy} is not recognized/implemented.")
    # Launch distributed training
    training_fn(args, strategy, distribute_kwargs)
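
The training loop in train.py calls set_epoch on the dataloader's sampler at the start of every epoch. The pure-Python sketch below imitates, in simplified form, what a distributed sampler such as torch.utils.data.DistributedSampler does: every worker builds the same epoch-dependent permutation and then keeps only its own slice. It is an illustration, not the PyTorch or itwinai implementation, and the function name shard_indices is hypothetical.

```python
import random
from typing import List


def shard_indices(
    dataset_len: int, world_size: int, rank: int, epoch: int, seed: int = 0
) -> List[int]:
    """Return the sample indices assigned to `rank` for a given epoch.

    Simplified imitation of a distributed sampler; assumes
    dataset_len >= world_size.
    """
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    if dataset_len < world_size:
        raise ValueError("this sketch assumes dataset_len >= world_size")

    indices = list(range(dataset_len))
    # Same seed on every worker, offset by the epoch: all workers agree on
    # the permutation, and the permutation changes from epoch to epoch.
    random.Random(seed + epoch).shuffle(indices)

    # Pad with repeated samples so the indices split evenly across workers
    remainder = len(indices) % world_size
    if remainder:
        indices += indices[: world_size - remainder]

    # Each worker takes every world_size-th index, starting at its rank
    return indices[rank::world_size]


if __name__ == "__main__":
    for epoch in range(2):
        for rank in range(4):
            print(epoch, rank, shard_indices(12, world_size=4, rank=rank, epoch=epoch))
```

Without the epoch offset in the seed, every epoch would reuse the same permutation, which is why the set_epoch call in the training loop matters when shuffling is enabled.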