1. Introduction to distributed ML with PyTorch
Author(s): Matteo Bunino (CERN), Jarl Sondre Sæther (CERN), Linus Eickhoff (CERN)
In this tutorial we show how to use torch DistributedDataParallel (DDP), Horovod and
DeepSpeed from the same client code.
Note that the environment has been tested on the HDFML system at JSC. On other systems,
the module versions may need to be adapted accordingly.
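"Same client code" means the training function only calls a small set of generic hooks, and each backend (DDP, Horovod, DeepSpeed) implements them. The plain-Python sketch below shows that pattern. The Strategy protocol, the LoggingStrategy class and the wrap_model method are hypothetical stand-ins chosen for illustration. They are not the itwinai API used in train.py.

```python
from typing import List, Protocol


class Strategy(Protocol):
    """Hypothetical minimal interface (NOT the itwinai API)."""

    def init(self) -> None: ...

    def wrap_model(self, model: object) -> object: ...

    def clean_up(self) -> None: ...


class LoggingStrategy:
    """Toy backend that only records which hooks were called."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: List[str] = []

    def init(self) -> None:
        self.calls.append("init")

    def wrap_model(self, model: object) -> object:
        self.calls.append("wrap_model")
        # A real backend would return a DDP/Horovod/DeepSpeed wrapper here.
        return (self.name, model)

    def clean_up(self) -> None:
        self.calls.append("clean_up")


def client_code(strategy: Strategy, model: object) -> object:
    """Backend-agnostic entry point: it only talks to the interface."""
    strategy.init()
    wrapped = strategy.wrap_model(model)
    # ... the training loop would use `wrapped` here ...
    strategy.clean_up()
    return wrapped


if __name__ == "__main__":
    for backend in ("ddp", "horovod", "deepspeed"):
        strategy = LoggingStrategy(backend)
        print(client_code(strategy, "linear"), strategy.calls)
```

Switching backends only changes which object is passed in. client_code itself stays untouched, and this is the property train.py relies on.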
1.1. Setup
First, from the root of this repository, build the environment containing PyTorch, Horovod and DeepSpeed. For example:
# Creates a Python venv called envAI_hdfml
make torch-gpu-jsc
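You can optionally confirm that the new environment exposes all three frameworks. The minimal check below assumes the venv created above is activated. The helper name missing_packages is illustrative. It only tests whether each package can be found. It does not check whether Horovod was built with PyTorch or NCCL support.

```python
import importlib.util
from typing import Iterable, List


def missing_packages(names: Iterable[str] = ("torch", "horovod", "deepspeed")) -> List[str]:
    """Return the subset of `names` that cannot be found in the active environment."""
    # find_spec locates a top-level package without importing it.
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("torch, horovod and deepspeed are all importable")
```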
1.2. Distributed training on a single node (interactive)
If you want to use SLURM in interactive mode, do the following:
# Allocate resources
$ salloc --partition=develbooster --nodes=1 --account=intertwin --gres=gpu:4 --time=1:59:00
job ID is XXXX
# Get a shell in the compute node (if using SLURM)
$ srun --jobid XXXX --overlap --pty /bin/bash
# Now you are inside the compute node
# On JSC, you may need to load some modules...
ml --force purge
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py
# ...before activating the Python environment (adapt this to your env name/path)
source ../../../envAI_hdfml/bin/activate
To launch the training with torch DDP use:
torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s ddp
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s ddp
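With --nproc-per-node=gpu, torchrun starts one worker process per visible GPU. It exports environment variables such as RANK, LOCAL_RANK and WORLD_SIZE to each worker. train.py does not read these variables directly, and the DDP strategy is expected to handle them. For debugging, a worker could inspect them as sketched below. The WorkerInfo class and the single-process defaults are assumptions made for this illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class WorkerInfo:
    rank: int        # global rank across all nodes
    local_rank: int  # rank on this node, typically used as the GPU index
    world_size: int  # total number of worker processes


def worker_info_from_env(env: Optional[Mapping[str, str]] = None) -> WorkerInfo:
    """Read the rank variables that torchrun exports to each worker.

    Falls back to a single-process layout when they are absent,
    e.g. when the script is started with plain `python`.
    """
    env = os.environ if env is None else env
    return WorkerInfo(
        rank=int(env.get("RANK", 0)),
        local_rank=int(env.get("LOCAL_RANK", 0)),
        world_size=int(env.get("WORLD_SIZE", 1)),
    )


if __name__ == "__main__":
    print(worker_info_from_env())
```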
To launch the training with Microsoft DeepSpeed use:
torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s deepspeed
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 deepspeed train.py -s deepspeed --deepspeed
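In train.py, the DeepSpeed strategy receives config_params=dict(train_micro_batch_size_per_gpu=args.batch_size), so --batch-size is a per-GPU value. DeepSpeed's configuration documentation defines train_batch_size = train_micro_batch_size_per_gpu * gradient_accumulation_steps * number of GPUs. The small sketch below computes that product. The example assumes no gradient accumulation (1 step), because train.py does not set it.

```python
def deepspeed_global_batch_size(
    micro_batch_per_gpu: int, grad_accum_steps: int, world_size: int
) -> int:
    """Samples consumed per optimizer step under DeepSpeed's batch-size rule."""
    for name, value in (
        ("micro_batch_per_gpu", micro_batch_per_gpu),
        ("grad_accum_steps", grad_accum_steps),
        ("world_size", world_size),
    ):
        if value < 1:
            raise ValueError(f"{name} must be >= 1, got {value}")
    return micro_batch_per_gpu * grad_accum_steps * world_size


if __name__ == "__main__":
    # Default --batch-size 10, 4 GPUs, no gradient accumulation (assumed).
    print(deepspeed_global_batch_size(10, 1, 4))  # 40
```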
To launch the training with Horovod use:
Note
Assuming 4 GPUs are available.
If your setup has a different number of GPUs, change the -np 4 -H localhost:4 part.
Warning
Using horovodrun is the launch method recommended by the Horovod docs.
To use horovodrun, make sure that mpirun is available in your environment;
otherwise, you cannot launch Horovod with horovodrun in interactive mode.
On JSC JUWELS, mpirun is not available, so use the srun command below as a fallback.
You can find out if mpirun exists using:
which mpirun
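The decision between horovodrun and srun can be scripted. The sketch below uses shutil.which, the Python counterpart of the which command, and builds the single-node command line with one process per GPU. The function name horovod_launch_cmd and the injectable which parameter are illustrative assumptions. The flags are the ones used in the commands of this section.

```python
import shlex
import shutil
from typing import Callable, List, Optional


def horovod_launch_cmd(
    num_gpus: int,
    which: Callable[[str], Optional[str]] = shutil.which,
) -> List[str]:
    """Single-node Horovod launch: horovodrun if mpirun exists, else srun."""
    if num_gpus < 1:
        raise ValueError("num_gpus must be >= 1")
    script = ["python", "-u", "train.py", "-s", "horovod"]
    if which("mpirun") is not None:
        # -np: number of processes; -H localhost:N: N slots on this node.
        return ["horovodrun", "-np", str(num_gpus), "-H", f"localhost:{num_gpus}", *script]
    # Fallback for systems without mpirun, mirroring this section's srun example.
    return [
        "srun",
        "--cpu-bind=none",
        f"--ntasks={num_gpus}",
        f"--ntasks-per-node={num_gpus}",
        "--cpus-per-task=1",
        *script,
    ]


if __name__ == "__main__":
    # On a GPU node, torch.cuda.device_count() could supply num_gpus.
    print(shlex.join(horovod_launch_cmd(4)))
```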
Using horovodrun (if mpirun exists):
# Assuming 4 GPUs are available (-np 4)
horovodrun -np 4 -H localhost:4 python -u train.py -s horovod
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 horovodrun -np 4 -H localhost:4 python -u train.py -s horovod
Using srun (if mpirun doesn't exist, e.g. on JSC JUWELS):
# Assuming 4 GPUs are available
srun --cpu-bind=none --ntasks=4 --ntasks-per-node=4 --cpus-per-task=1 python -u train.py -s horovod
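With this srun command, each of the 4 SLURM tasks becomes one Horovod worker. SLURM exports SLURM_PROCID (global task ID), SLURM_LOCALID (task ID on the node) and SLURM_NTASKS (total tasks) to every task. Horovod itself obtains its ranks through MPI when launched this way, so the sketch below is only meant for inspection or debugging. It assumes one task per GPU, as in the command above. The helper name slurm_rank_and_gpu is hypothetical.

```python
import os
from typing import Mapping, Optional, Tuple


def slurm_rank_and_gpu(
    gpus_per_node: int, env: Optional[Mapping[str, str]] = None
) -> Tuple[int, int, int]:
    """Return (global_rank, world_size, gpu_index) for an srun-launched task."""
    env = os.environ if env is None else env
    global_rank = int(env.get("SLURM_PROCID", 0))
    world_size = int(env.get("SLURM_NTASKS", 1))
    local_id = int(env.get("SLURM_LOCALID", 0))
    # One task per GPU: the node-local task ID selects the GPU.
    return global_rank, world_size, local_id % gpus_per_node


if __name__ == "__main__":
    print(slurm_rank_and_gpu(gpus_per_node=4))
```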
1.3. Distributed training with SLURM (batch mode)
You can run your training with SLURM by using the itwinai SLURM Builder. Use the
slurm_config.yaml file to specify your SLURM parameters and then preview your script
with the following command:
itwinai generate-slurm -c slurm_config.yaml --no-save-script --no-submit-job
If you are happy with the script, you can then run it by omitting --no-submit-job:
itwinai generate-slurm -c slurm_config.yaml --no-save-script
If you want to store a copy of the script in a folder, then you can similarly omit
--no-save-script:
itwinai generate-slurm -c slurm_config.yaml
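The three invocations above differ only in two switches, "save the script" and "submit the job". The sketch below maps those two booleans onto the command line. It uses only the -c, --no-save-script and --no-submit-job flags shown in this section. The wrapper function itself is a hypothetical convenience, not part of itwinai.

```python
import shlex
from typing import List


def generate_slurm_cmd(
    config: str = "slurm_config.yaml", save_script: bool = True, submit_job: bool = True
) -> List[str]:
    """Compose an `itwinai generate-slurm` call from two booleans."""
    cmd = ["itwinai", "generate-slurm", "-c", config]
    if not save_script:
        cmd.append("--no-save-script")
    if not submit_job:
        cmd.append("--no-submit-job")
    return cmd


if __name__ == "__main__":
    # Preview only: neither save nor submit.
    print(shlex.join(generate_slurm_cmd(save_script=False, submit_job=False)))
```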
1.4. train.py
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
"""Show how to use DDP, Horovod and DeepSpeed strategies interchangeably
with an extremely simple neural network.
"""
import argparse
import time
from typing import Dict
import horovod.torch as hvd
import torch
from torch import nn
from torch.utils.data import Dataset
from itwinai.torch.distributed import (
    DeepSpeedStrategy,
    HorovodStrategy,
    NonDistributedStrategy,
    TorchDDPStrategy,
    TorchDistributedStrategy,
    distributed_resources_available,
)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--strategy", "-s", type=str, choices=["ddp", "horovod", "deepspeed"], default="ddp"
    )
    parser.add_argument("--shuffle_dataloader", action=argparse.BooleanOptionalAction)
    parser.add_argument(
        "--batch-size",
        type=int,
        default=10,
        help="input batch size for training (default: 10)",
    )

    # DeepSpeed: needs to be removed
    import deepspeed

    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


class UniformRndDataset(Dataset):
    """Dummy torch dataset."""

    def __init__(self, x_size: int, y_size: int, len: int = 100):
        super().__init__()
        self.x_size = x_size
        self.y_size = y_size
        self.len = len

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        return torch.rand(self.x_size), torch.rand(self.y_size)


def training_fn(
    args: argparse.Namespace, strategy: TorchDistributedStrategy, distribute_kwargs: Dict
) -> int:
    """Dummy training function."""
    strategy.init()

    # Local model
    model = nn.Linear(3, 4)
    optim = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    # Distributed model
    model, optim, lr_sched = strategy.distributed(
        model, optim, lr_scheduler=None, **distribute_kwargs
    )

    # Dataset
    train_set = UniformRndDataset(x_size=3, y_size=4)
    # Distributed dataloader
    train_loader = strategy.create_dataloader(
        train_set, batch_size=args.batch_size, num_workers=1, shuffle=True
    )

    # Device allocated for this worker
    device = strategy.device()

    for epoch in range(2):
        # IMPORTANT: set current epoch ID in distributed sampler
        if strategy.is_distributed:
            train_loader.sampler.set_epoch(epoch)

        for x, y in train_loader:
            # print(f"tensor to cuda:{device}")
            x = x.to(device)
            y = y.to(device)

            optim.zero_grad()
            y_pred = model(x)
            loss = loss_fn(y_pred, y)
            loss.backward()
            optim.step()

            if strategy.is_main_worker:
                print(f"Loss [epoch={epoch}]: {loss.item()}")
                # print(f"NNLoss [epoch={epoch}]: {loss.item()}")

        # Update scheduler
        if lr_sched:
            lr_sched.step()

    time.sleep(1)
    print(f"<Global rank: {strategy.global_rank()}> - TRAINING FINISHED")
    strategy.clean_up()
    return 123
if __name__ == "__main__":
args = parse_args()
# Instantiate Strategy
if not distributed_resources_available():
print("WARNING: falling back to non-distributed strategy.")
strategy = NonDistributedStrategy()
distribute_kwargs = {}
elif args.strategy == "ddp":
strategy = TorchDDPStrategy(backend="nccl")
distribute_kwargs = {}
elif args.strategy == "horovod":
strategy = HorovodStrategy()
distribute_kwargs = dict(
compression=hvd.Compression.none, op=hvd.Average, gradient_predivide_factor=1.0
)
elif args.strategy == "deepspeed":
strategy = DeepSpeedStrategy(backend="nccl")
distribute_kwargs = dict(
config_params=dict(train_micro_batch_size_per_gpu=args.batch_size)
)
else:
raise NotImplementedError(f"Strategy {args.strategy} is not recognized/implemented.")
# Launch distributed training
training_fn(args, strategy, distribute_kwargs)
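The training loop above marks train_loader.sampler.set_epoch(epoch) as IMPORTANT. One assumption here is that the sampler built by strategy.create_dataloader behaves like PyTorch's DistributedSampler. If so, it gives each rank a disjoint, equally sized slice of the dataset and seeds the shuffle with seed + epoch. Without set_epoch, every epoch would reuse the same order. The pure-Python sketch below mirrors that partitioning logic. It uses random.Random instead of torch.randperm, so the concrete index order differs from PyTorch's. The function name shard_indices is hypothetical.

```python
import math
import random
from typing import List


def shard_indices(
    dataset_len: int,
    num_replicas: int,
    rank: int,
    epoch: int,
    seed: int = 0,
    shuffle: bool = True,
) -> List[int]:
    """Indices one rank would see in a given epoch (DistributedSampler-like)."""
    indices = list(range(dataset_len))
    if shuffle:
        # Seeding with seed + epoch gives every rank the SAME permutation within
        # an epoch, and a DIFFERENT permutation once the epoch changes.
        random.Random(seed + epoch).shuffle(indices)
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    # Pad by wrapping around so every rank receives num_samples indices.
    padding = total_size - dataset_len
    indices += (indices * math.ceil(total_size / dataset_len))[:padding]
    # Strided split: rank r takes positions r, r + num_replicas, ...
    return indices[rank:total_size:num_replicas]


if __name__ == "__main__":
    # UniformRndDataset defaults to 100 samples; assume 4 workers.
    for epoch in range(2):
        print(epoch, shard_indices(100, 4, rank=0, epoch=epoch)[:5])
```

Because all ranks derive the permutation from the same seed + epoch, their slices never overlap within an epoch, and calling set_epoch reshuffles them consistently across ranks.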