1. PyTorch basics example

1.1. Tutorial: distributed strategies for PyTorch

In this tutorial we show how to use torch DistributedDataParallel (DDP), Horovod and DeepSpeed from the same client code. Note that the environment is tested on the HDFML system at JSC. For other systems, the module versions might need change accordingly.

1.1.1. Setup

First, from the root of this repository, build the environment containing pytorch, horovod and deepspeed. You can try with:

# Creates a Python venv called envAI_hdfml
make torch-gpu-jsc

1.1.2. Distributed training

Each distributed strategy has its own SLURM job script, which should be used to run it:

If you want to distribute the code in train.py with torch DDP, run from terminal:

export DIST_MODE="ddp"
export RUN_NAME="ddp-itwinai"
export TRAINING_CMD="train.py -s ddp"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

If you want to distribute the code in train.py with DeepSpeed, run from terminal:

export DIST_MODE="deepspeed"
export RUN_NAME="deepspeed-itwinai"
export TRAINING_CMD="train.py -s deepspeed"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

If you want to distribute the code in train.py with Horovod, run from terminal:

export DIST_MODE="deepspeed"
export RUN_NAME="deepspeed-itwinai"
export TRAINING_CMD="train.py -s deepspeed"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

You can run all of them with:

bash runall.sh

1.1.3. train.py

"""
Show how to use DDP, Horovod and DeepSpeed strategies interchangeably
with an extremely simple neural network.
"""
from typing import Dict
import argparse
import time

import torch
from torch import nn
from torch.utils.data import Dataset

import horovod.torch as hvd

from itwinai.torch.distributed import (
    distributed_resources_available,
    TorchDistributedStrategy,
    TorchDDPStrategy,
    HorovodStrategy,
    DeepSpeedStrategy,
    NonDistributedStrategy
)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--strategy", "-s", type=str,
        choices=['ddp', 'horovod', 'deepspeed'],
        default='ddp'
    )
    parser.add_argument(
        "--shuffle_dataloader",
        action=argparse.BooleanOptionalAction
    )
    parser.add_argument(
        '--batch-size', type=int, default=10,
        help='input batch size for training (default: 10)')

    # DeepSpeed: needs to be removed
    import deepspeed
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args


class UniformRndDataset(Dataset):
    """Dummy torch dataset."""

    def __init__(self, x_size: int, y_size: int, len: int = 100):
        super().__init__()
        self.x_size = x_size
        self.y_size = y_size
        self.len = len

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        return torch.rand(self.x_size), torch.rand(self.y_size)


def training_fn(
        args: argparse.Namespace,
        strategy: TorchDistributedStrategy,
        distribute_kwargs: Dict
) -> int:
    """Dummy training function."""
    strategy.init()

    # Local model
    model = nn.Linear(3, 4)
    optim = torch.optim.Adam(model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()
    # Distributed model
    model, optim, lr_sched = strategy.distributed(
        model, optim, lr_scheduler=None, **distribute_kwargs
    )

    # Data
    train_set = UniformRndDataset(x_size=3, y_size=4)
    # Distributed dataloader
    train_loader = strategy.create_dataloader(
        train_set, batch_size=args.batch_size, num_workers=1)

    # Device allocated for this worker
    device = strategy.device()

    for epoch in range(2):
        for (x, y) in train_loader:
            # print(f"tensor to cuda:{device}")
            x = x.to(device)
            y = y.to(device)

            optim.zero_grad()

            y_pred = model(x)

            loss = loss_fn(y_pred, y)
            loss.backward()

            optim.step()

            if strategy.is_main_worker:
                print(f"Loss [epoch={epoch}]: {loss.item()}")
            # print(f"NNLoss [epoch={epoch}]: {loss.item()}")

        # Update scheduler
        if lr_sched:
            lr_sched.step()

    time.sleep(1)
    print(f"<Global rank: {strategy.global_rank()}> - TRAINING FINISHED")
    strategy.clean_up()
    return 123


if __name__ == "__main__":

    args = parse_args()

    # Instantiate Strategy
    if not distributed_resources_available():
        print("WARNING: falling back to non-distributed strategy.")
        strategy = NonDistributedStrategy()
        distribute_kwargs = {}
    elif args.strategy == 'ddp':
        strategy = TorchDDPStrategy(backend='nccl')
        distribute_kwargs = {}
    elif args.strategy == 'horovod':
        strategy = HorovodStrategy()
        distribute_kwargs = dict(
            compression=hvd.Compression.none,
            op=hvd.Average,
            gradient_predivide_factor=1.0
        )
    elif args.strategy == 'deepspeed':
        strategy = DeepSpeedStrategy(backend='nccl')
        distribute_kwargs = dict(
            config_params=dict(train_micro_batch_size_per_gpu=args.batch_size)
        )
    else:
        raise NotImplementedError(
            f"Strategy {args.strategy} is not recognized/implemented.")
    # Launch distributed training
    training_fn(args, strategy, distribute_kwargs)

1.1.4. runall.sh

#!/bin/bash

# Python virtual environment
PYTHON_VENV="../../../envAI_hdfml"

# Clear SLURM logs (*.out and *.err files)
rm -rf logs_slurm
mkdir logs_slurm
rm -rf logs_torchrun

# DDP itwinai
DIST_MODE="ddp"
RUN_NAME="ddp-itwinai"
TRAINING_CMD="train.py -s ddp"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

# DeepSpeed itwinai
DIST_MODE="deepspeed"
RUN_NAME="deepspeed-itwinai"
TRAINING_CMD="train.py -s deepspeed"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

# Horovod itwinai
DIST_MODE="horovod"
RUN_NAME="horovod-itwinai"
TRAINING_CMD="train.py -s horovod"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

1.1.5. slurm.sh

#!/bin/bash

# SLURM jobscript for JSC systems

# Job configuration
#SBATCH --job-name=distributed_training
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00

# Resources allocation
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-gpu=4
#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# Load environment modules
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py

# Job info
echo "DEBUG: TIME: $(date)"
sysN="$(uname -n | cut -f2- -d.)"
sysN="${sysN%%[0-9]*}"
echo "Running on system: $sysN"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
if [ "$DEBUG" = true ] ; then
  echo "DEBUG: NCCL_DEBUG=INFO" 
  export NCCL_DEBUG=INFO
fi
echo

# Setup env for distributed ML
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
if [ "$SLURM_CPUS_PER_GPU" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_GPU
fi

# Env vairables check
if [ -z "$DIST_MODE" ]; then 
  >&2 echo "ERROR: env variable DIST_MODE is not set. Allowed values are 'horovod', 'ddp' or 'deepspeed'"
  exit 1
fi
if [ -z "$RUN_NAME" ]; then 
  >&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment."
  RUN_NAME=$DIST_MODE
fi
if [ -z "$TRAINING_CMD" ]; then 
  >&2 echo "ERROR: env variable TRAINING_CMD is not set. It's the python command to execute."
  exit 1
fi
if [ -z "$PYTHON_VENV" ]; then 
  >&2 echo "WARNING: env variable PYTHON_VENV is not set. It's the path to a python virtual environment."
else
  # Activate Python virtual env
  source $PYTHON_VENV/bin/activate
fi

# Get GPUs info per node
srun --cpu-bind=none --ntasks-per-node=1 bash -c 'echo -e "NODE hostname: $(hostname)\n$(nvidia-smi)\n\n"'

# Launch training
if [ "$DIST_MODE" == "ddp" ] ; then
  echo "DDP training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    bash -c "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD"
elif [ "$DIST_MODE" == "deepspeed" ] ; then
  echo "DEEPSPEED training: $TRAINING_CMD"
  MASTER_ADDR=$(scontrol show hostnames "\$SLURM_JOB_NODELIST" | head -n 1)i
  export MASTER_ADDR
  export MASTER_PORT=29500 

  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    python -u $TRAINING_CMD --deepspeed

  # # Run with deepspeed launcher: set --ntasks-per-node=1
  # # https://www.deepspeed.ai/getting-started/#multi-node-environment-variables
  # export NCCL_IB_DISABLE=1
  # export NCCL_SOCKET_IFNAME=eth0
  # nodelist=$(scontrol show hostname $SLURM_NODELIST)
  # echo "$nodelist" | sed -e 's/$/ slots=4/' > .hostfile
  # # Requires passwordless SSH access among compute node
  # srun --cpu-bind=none deepspeed --hostfile=.hostfile $TRAINING_CMD --deepspeed
  # rm .hostfile
elif [ "$DIST_MODE" == "horovod" ] ; then
  echo "HOROVOD training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    python -u $TRAINING_CMD
else
  >&2 echo "ERROR: unrecognized \$DIST_MODE env variable"
  exit 1
fi