1. Introduction to distributed ML with PyTorch
Author(s): Matteo Bunino (CERN)
In this tutorial we show how to use torch DistributedDataParallel (DDP), Horovod and
DeepSpeed from the same client code.
Note that the environment has been tested on the HDFML system at JSC. On other systems,
the module versions may need to be changed accordingly.
1.1. Setup
First, from the root of this repository, build the environment containing PyTorch, Horovod and DeepSpeed. For example:
# Creates a Python venv called envAI_hdfml
make torch-gpu-jsc
1.2. Distributed training on a single node (interactive)
If you want to use SLURM in interactive mode, do the following:
# Allocate resources
$ salloc --partition=batch --nodes=1 --account=intertwin --gres=gpu:4 --time=1:59:00
job ID is XXXX
# Get a shell in the compute node (if using SLURM)
$ srun --jobid XXXX --overlap --pty /bin/bash
# Now you are inside the compute node
# On JSC, you may need to load some modules...
ml --force purge
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py
# ...before activating the Python environment (adapt this to your env name/path)
source ../../../envAI_hdfml/bin/activate
To launch the training with torch DDP use:
torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s ddp
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py -s ddp
To launch the training with Microsoft DeepSpeed use:
deepspeed train.py -s deepspeed --deepspeed
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 deepspeed train.py -s deepspeed --deepspeed
To launch the training with Horovod use:
Note: the commands below assume that 4 GPUs are available.
If your setup has a different number of GPUs, change the -np 4 -H localhost:4 part.
Warning: to use horovodrun, make sure that mpirun is available in your environment. Otherwise you cannot use Horovod in interactive mode.
# Assuming 4 GPUs are available (-np=4)
horovodrun -np 4 -H localhost:4 python -u train.py -s horovod
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 horovodrun -np 4 -H localhost:4 python -u train.py -s horovod
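The values passed to -np and -H localhost:N have to match the number of GPUs on the node. As a minimal sketch, the command line can be assembled from a GPU count. The helper name `horovodrun_cmd` is hypothetical and not part of this repository; the sketch only builds and prints the command, it does not run it.

```python
import shlex


def horovodrun_cmd(n_gpus: int, script: str = "train.py") -> list[str]:
    """Build the single-node horovodrun command for `n_gpus` workers."""
    if n_gpus < 1:
        raise ValueError("n_gpus must be at least 1")
    return [
        "horovodrun",
        "-np", str(n_gpus),            # total number of worker processes
        "-H", f"localhost:{n_gpus}",   # one slot per GPU on this node
        "python", "-u", script, "-s", "horovod",
    ]


if __name__ == "__main__":
    # Example: a node with 2 GPUs instead of 4
    print(shlex.join(horovodrun_cmd(2)))
```

Keeping both values derived from a single number avoids the case where -np and the slot count in -H drift apart.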
1.3. Distributed training with SLURM (batch mode)
Before running any of the commands below on their own for the first time,
create the logs_slurm folder so that the output and error files can be stored correctly.
Skip this step if you run the runall.sh script, which creates the folder itself.
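From a shell, `mkdir -p logs_slurm` is enough. If the jobs are submitted from a Python driver instead, the same step could look like the following sketch; the function name `ensure_log_dir` is hypothetical.

```python
from pathlib import Path


def ensure_log_dir(path: str = "logs_slurm") -> Path:
    """Create the SLURM log folder if it does not exist yet."""
    log_dir = Path(path)
    # exist_ok=True makes the call safe to repeat before every submission
    log_dir.mkdir(parents=True, exist_ok=True)
    return log_dir


if __name__ == "__main__":
    print(f"SLURM logs will be written to: {ensure_log_dir()}")
```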
Each distributed strategy is launched through the slurm.sh job script, with the strategy selected by the DIST_MODE environment variable:
If you want to distribute the code in train.py with torch DDP, run from terminal:
export DIST_MODE="ddp"
export RUN_NAME="ddp-itwinai"
export TRAINING_CMD="train.py -s ddp"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
If you want to distribute the code in train.py with DeepSpeed, run from terminal:
export DIST_MODE="deepspeed"
export RUN_NAME="deepspeed-itwinai"
export TRAINING_CMD="train.py -s deepspeed"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
If you want to distribute the code in train.py with Horovod, run from terminal:
export DIST_MODE="horovod"
export RUN_NAME="horovod-itwinai"
export TRAINING_CMD="train.py -s horovod"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
You can run all of them with:
bash runall.sh
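The three submissions differ only in the strategy name. The sketch below builds the corresponding sbatch argument lists in one place, which makes that pattern explicit. It is an illustration rather than a replacement for runall.sh: the function name `sbatch_cmd` is hypothetical, the `-n$N` suffix is left out because `N` is not set in the commands above, and the commands are only printed, not submitted.

```python
import shlex

STRATEGIES = ("ddp", "deepspeed", "horovod")


def sbatch_cmd(dist_mode: str, python_venv: str = "../../../envAI_hdfml") -> list[str]:
    """Build the sbatch argument list for one distributed strategy."""
    if dist_mode not in STRATEGIES:
        raise ValueError(f"Unknown strategy: {dist_mode!r}")
    run_name = f"{dist_mode}-itwinai"
    training_cmd = f"train.py -s {dist_mode}"
    # Variables forwarded to slurm.sh through --export
    export = ",".join(
        [
            "ALL",
            f"DIST_MODE={dist_mode}",
            f"RUN_NAME={run_name}",
            f"TRAINING_CMD={training_cmd}",
            f"PYTHON_VENV={python_venv}",
        ]
    )
    return [
        "sbatch",
        f"--export={export}",
        f"--job-name={run_name}",
        f"--output=logs_slurm/job-{run_name}.out",
        f"--error=logs_slurm/job-{run_name}.err",
        "slurm.sh",
    ]


if __name__ == "__main__":
    for mode in STRATEGIES:
        # shlex.join quotes the arguments that contain spaces
        print(shlex.join(sbatch_cmd(mode)))
```

To actually submit, each list could be passed to `subprocess.run(cmd, check=True)` on a machine where sbatch is available.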
1.4. train.py
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
"""Show how to use DDP, Horovod and DeepSpeed strategies interchangeably
with an extremely simple neural network.
"""
import argparse
import time
from typing import Dict

import horovod.torch as hvd
import torch
from torch import nn
from torch.utils.data import Dataset

from itwinai.torch.distributed import (
    DeepSpeedStrategy,
    HorovodStrategy,
    NonDistributedStrategy,
    TorchDDPStrategy,
    TorchDistributedStrategy,
    distributed_resources_available,
)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--strategy", "-s", type=str, choices=["ddp", "horovod", "deepspeed"], default="ddp"
    )
    parser.add_argument("--shuffle_dataloader", action=argparse.BooleanOptionalAction)
    parser.add_argument(
        "--batch-size",
        type=int,
        default=10,
        help="input batch size for training (default: 10)",
    )

    # DeepSpeed: needs to be removed
    import deepspeed

    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


class UniformRndDataset(Dataset):
    """Dummy torch dataset."""

    def __init__(self, x_size: int, y_size: int, len: int = 100):
        super().__init__()
        self.x_size = x_size
        self.y_size = y_size
        self.len = len

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        return torch.rand(self.x_size), torch.rand(self.y_size)


def training_fn(
    args: argparse.Namespace, strategy: TorchDistributedStrategy, distribute_kwargs: Dict
) -> int:
    """Dummy training function."""
    strategy.init()

    # Local model
    model = nn.Linear(3, 4)
    optim = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    # Distributed model
    model, optim, lr_sched = strategy.distributed(
        model, optim, lr_scheduler=None, **distribute_kwargs
    )

    # Dataset
    train_set = UniformRndDataset(x_size=3, y_size=4)
    # Distributed dataloader
    train_loader = strategy.create_dataloader(
        train_set, batch_size=args.batch_size, num_workers=1, shuffle=True
    )

    # Device allocated for this worker
    device = strategy.device()

    for epoch in range(2):
        # IMPORTANT: set current epoch ID in distributed sampler
        train_loader.sampler.set_epoch(epoch)
        for x, y in train_loader:
            # print(f"tensor to cuda:{device}")
            x = x.to(device)
            y = y.to(device)

            optim.zero_grad()

            y_pred = model(x)

            loss = loss_fn(y_pred, y)
            loss.backward()

            optim.step()

            if strategy.is_main_worker:
                print(f"Loss [epoch={epoch}]: {loss.item()}")
            # print(f"NNLoss [epoch={epoch}]: {loss.item()}")

        # Update scheduler
        if lr_sched:
            lr_sched.step()

    time.sleep(1)
    print(f"<Global rank: {strategy.global_rank()}> - TRAINING FINISHED")
    strategy.clean_up()
    return 123


if __name__ == "__main__":
    args = parse_args()

    # Instantiate Strategy
    if not distributed_resources_available():
        print("WARNING: falling back to non-distributed strategy.")
        strategy = NonDistributedStrategy()
        distribute_kwargs = {}
    elif args.strategy == "ddp":
        strategy = TorchDDPStrategy(backend="nccl")
        distribute_kwargs = {}
    elif args.strategy == "horovod":
        strategy = HorovodStrategy()
        distribute_kwargs = dict(
            compression=hvd.Compression.none, op=hvd.Average, gradient_predivide_factor=1.0
        )
    elif args.strategy == "deepspeed":
        strategy = DeepSpeedStrategy(backend="nccl")
        distribute_kwargs = dict(
            config_params=dict(train_micro_batch_size_per_gpu=args.batch_size)
        )
    else:
        raise NotImplementedError(f"Strategy {args.strategy} is not recognized/implemented.")

    # Launch distributed training
    training_fn(args, strategy, distribute_kwargs)
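The call to train_loader.sampler.set_epoch(epoch) in train.py matters because a distributed sampler in the style of torch's DistributedSampler seeds its shuffle with the epoch number. If the epoch is never updated, the same ordering is reused in every epoch. The sketch below illustrates the idea with the standard library only. It is a simplified model, not the PyTorch or itwinai implementation, and `shard_indices` is a hypothetical name.

```python
import math
import random


def shard_indices(
    dataset_len: int, num_replicas: int, rank: int, epoch: int, seed: int = 0
) -> list[int]:
    """Return the sample indices that one worker sees in one epoch."""
    if dataset_len < 1 or num_replicas < 1 or not 0 <= rank < num_replicas:
        raise ValueError("invalid dataset_len, num_replicas or rank")
    indices = list(range(dataset_len))
    # All workers use the same seed + epoch, so they agree on the permutation
    random.Random(seed + epoch).shuffle(indices)
    # Pad by repeating samples so the list splits evenly across workers
    total = math.ceil(dataset_len / num_replicas) * num_replicas
    indices = (indices * math.ceil(total / dataset_len))[:total]
    # Each worker takes every num_replicas-th index, starting at its rank
    return indices[rank::num_replicas]


if __name__ == "__main__":
    for epoch in range(2):
        for rank in range(4):
            first = shard_indices(100, num_replicas=4, rank=rank, epoch=epoch)[:5]
            print(f"epoch={epoch} rank={rank} first indices: {first}")
```

Within one epoch the workers receive disjoint shards that together cover the dataset; changing the epoch changes the permutation and therefore the content of each shard.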
1.5. slurm.sh
#!/bin/bash
# SLURM jobscript for JSC systems
# Job configuration
#SBATCH --job-name=distributed_training
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00
# Resources allocation
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-gpu=4
#SBATCH --exclusive
# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4
# Load environment modules
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py
# Job info
echo "DEBUG: TIME: $(date)"
sysN="$(uname -n | cut -f2- -d.)"
sysN="${sysN%%[0-9]*}"
echo "Running on system: $sysN"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
if [ "$DEBUG" = true ] ; then
  echo "DEBUG: NCCL_DEBUG=INFO"
  export NCCL_DEBUG=INFO
fi
echo
# Setup env for distributed ML
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
if [ "$SLURM_CPUS_PER_GPU" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_GPU
fi
# Env variables check
if [ -z "$DIST_MODE" ]; then
  >&2 echo "ERROR: env variable DIST_MODE is not set. Allowed values are 'horovod', 'ddp' or 'deepspeed'"
  exit 1
fi
if [ -z "$RUN_NAME" ]; then
  >&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment."
  RUN_NAME=$DIST_MODE
fi
if [ -z "$TRAINING_CMD" ]; then
  >&2 echo "ERROR: env variable TRAINING_CMD is not set. It's the python command to execute."
  exit 1
fi
if [ -z "$PYTHON_VENV" ]; then
  >&2 echo "WARNING: env variable PYTHON_VENV is not set. It's the path to a python virtual environment."
else
  # Activate Python virtual env
  source "$PYTHON_VENV/bin/activate"
fi
# Get GPUs info per node
srun --cpu-bind=none --ntasks-per-node=1 bash -c 'echo -e "NODE hostname: $(hostname)\n$(nvidia-smi)\n\n"'
# Launch training
if [ "$DIST_MODE" == "ddp" ] ; then
  echo "DDP training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    bash -c "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD"
elif [ "$DIST_MODE" == "deepspeed" ] ; then
  echo "DEEPSPEED training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    bash -c "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD --deepspeed"
  # # Run with deepspeed launcher: set --ntasks-per-node=1
  # # https://www.deepspeed.ai/getting-started/#multi-node-environment-variables
  # export NCCL_IB_DISABLE=1
  # export NCCL_SOCKET_IFNAME=eth0
  # nodelist=$(scontrol show hostname $SLURM_NODELIST)
  # echo "$nodelist" | sed -e 's/$/ slots=4/' > .hostfile
  # # Requires passwordless SSH access among compute node
  # srun --cpu-bind=none deepspeed --hostfile=.hostfile $TRAINING_CMD --deepspeed
  # rm .hostfile
elif [ "$DIST_MODE" == "horovod" ] ; then
  echo "HOROVOD training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    python -u $TRAINING_CMD
else
  >&2 echo "ERROR: unrecognized \$DIST_MODE env variable"
  exit 1
fi
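In the DDP and DeepSpeed branches of slurm.sh, every node runs the same torchrun command and only the rendezvous role differs. The shell expression `((SLURM_NODEID)) && echo 0 || echo 1` yields 1 on node 0 and 0 on all other nodes, so the first node of the allocation hosts the rendezvous. The sketch below reproduces that logic to show what each node ends up executing. The function name `torchrun_cmd`, the hostnames and the job ID are hypothetical. The `i` appended to the hostname is copied as-is from slurm.sh; on JSC systems it is understood to select the node's high-speed interconnect interface, which should be verified for other clusters.

```python
import shlex


def torchrun_cmd(
    node_id: int,
    hostnames: list[str],
    gpus_per_node: int,
    job_id: str,
    training_cmd: str,
) -> list[str]:
    """Assemble the torchrun arguments that one node would receive."""
    # Mirrors `((SLURM_NODEID)) && echo 0 || echo 1`: only node 0 is the host
    is_host = 1 if node_id == 0 else 0
    # First node of the allocation, with the "i" suffix used in slurm.sh
    endpoint = f"{hostnames[0]}i:29500"
    return [
        "torchrun",
        "--log_dir=logs_torchrun",
        f"--nnodes={len(hostnames)}",
        f"--nproc_per_node={gpus_per_node}",
        f"--rdzv_id={job_id}",
        f"--rdzv_conf=is_host={is_host}",
        "--rdzv_backend=c10d",
        f"--rdzv_endpoint={endpoint}",
        *shlex.split(training_cmd),
    ]


if __name__ == "__main__":
    nodes = ["node01", "node02"]  # hypothetical hostnames
    for node_id in range(len(nodes)):
        cmd = torchrun_cmd(node_id, nodes, 4, "1234", "train.py -s ddp")
        print(f"node {node_id}: {shlex.join(cmd)}")
```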
1.6. runall.sh
#!/bin/bash
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
# Python virtual environment
PYTHON_VENV="../../../envAI_hdfml"
# Clear SLURM logs (*.out and *.err files)
rm -rf logs_slurm
mkdir logs_slurm
rm -rf logs_torchrun
# DDP itwinai
DIST_MODE="ddp"
RUN_NAME="ddp-itwinai"
TRAINING_CMD="train.py -s ddp"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
# DeepSpeed itwinai
DIST_MODE="deepspeed"
RUN_NAME="deepspeed-itwinai"
TRAINING_CMD="train.py -s deepspeed"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
# Horovod itwinai
DIST_MODE="horovod"
RUN_NAME="horovod-itwinai"
TRAINING_CMD="train.py -s horovod"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh