6. itwinai and containers (Docker and Singularity)

In this tutorial you will learn how to use itwinai’s container images to run your ML workflows without having to set up a Python environment by means of virtual environments.

6.1. Using itwinai container to run training on MNIST

Author(s): Matteo Bunino (CERN)

In this tutorial we show how to use containers to run machine learning workflows with itwinai.

Container images are Docker images pulled from the GitHub Container Registry (GHCR) associated with our GitHub repository. Although Docker is generally not supported in HPC environments, Singularity supports Docker images and converts them to Singularity images (SIF files) upon pull.

The examples will showcase training of a simple neural network defined in model.py on the MNIST benchmark dataset. The ML workflow is defined using itwinai Pipeline, the training algorithm is implemented by the itwinai TorchTrainer, and the training parameters are defined in config.yaml.

In this tutorial we use general-purpose itwinai container images to execute the use case code. This is possible only when the use case does not depend on packages missing from the container image. If you need additional dependencies, create a new container image using itwinai as the base image. A minimal example of a custom Dockerfile:

FROM ghcr.io/intertwin-eu/itwinai:0.2.2-torch-2.1
RUN pip install --no-cache-dir PYTHON_PACKAGE

6.1.1. Docker (non-HPC environments)

When executing a Docker container, you need to explicitly mount the current working directory in the container, so that the script executed in the container can use existing files and create new files in the current directory (or in another location). This can be achieved by bind mounting the current working directory at some location in the container, and moving to that location in the container before executing the desired command.

bash run_docker.sh

The script above runs the following command in the itwinai torch container in this folder:

itwinai exec-pipeline +pipe_key=training_pipeline
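
The bind-mount-then-change-directory pattern described above can be sketched in Python as a function that only assembles the `docker run` argument list, without executing it. The function name `docker_run_command` and its parameters are hypothetical and are not part of itwinai; the mount point `/use-case` and the image tag are taken from run_docker.sh.

```python
import os
import shlex


def docker_run_command(image, inner_cmd, host_dir=None,
                       mount_point="/use-case", gpu=False):
    """Build a `docker run` argument list (hypothetical helper, not itwinai API).

    The host directory is bind mounted at `mount_point`, and the inner
    command is executed from that location inside the container.
    """
    host_dir = os.path.abspath(host_dir or os.getcwd())
    # Run as the calling user so that new files are not owned by root.
    args = ["docker", "run", "--rm", "--user", f"{os.getuid()}:{os.getgid()}"]
    if gpu:
        args += ["--gpus", "all"]
    args += [
        "-v", f"{host_dir}:{mount_point}",
        image,
        "/bin/bash", "-c", f"cd {shlex.quote(mount_point)} && {inner_cmd}",
    ]
    return args


cmd = docker_run_command(
    "ghcr.io/intertwin-eu/itwinai:0.2.2-torch-2.1",
    "itwinai exec-pipeline +pipe_key=training_pipeline",
)
# Print the command as a copy-pasteable shell line.
print(shlex.join(cmd))
```

Returning a list rather than a string keeps each argument separate, so the result could be passed to `subprocess.run` without additional quoting.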

Warning

When using Docker, if your container does not recognize the GPUs of your VM, you may need to install the Nvidia container toolkit, if not already installed.

6.1.2. Singularity (HPC environments)

With Singularity there is no need to explicitly bind mount the current working directory (CWD) in the container, as Singularity does this automatically. Moreover, the CWD inside the container coincides with the CWD outside the container, so there is no need to change directory before executing a command inside the container. However, unlike Docker, Singularity does not allow writing to locations inside the container by default. It is therefore suggested to save results in the CWD, or in other locations mounted in the container.
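
Since results should land in the CWD or in another mounted location, a use case script may want to verify that its output directory is writable before training starts. The following is a minimal sketch under that assumption; the helper name `writable_results_dir` and the default directory name `results` are hypothetical.

```python
import os
import tempfile


def writable_results_dir(name="results", base=None):
    """Create `name` under `base` (default: the CWD) and check write access.

    Raises OSError if the location is read-only, as is typically the case
    for paths that belong to the container image itself.
    """
    base = os.path.abspath(base or os.getcwd())
    path = os.path.join(base, name)
    os.makedirs(path, exist_ok=True)
    # Creating (and automatically deleting) a temporary file proves that
    # the directory accepts writes.
    with tempfile.TemporaryFile(dir=path):
        pass
    return path


print(writable_results_dir())
```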

First of all, pull the Docker image and convert it to a Singularity image:

# If needed, remove existing Singularity image before proceeding
rm -rf itwinai_torch.sif

# Pull Docker image and convert it to Singularity on login node
singularity pull itwinai_torch.sif docker://ghcr.io/intertwin-eu/itwinai:0.2.2-torch-2.1

Before running distributed ML on the compute nodes of an HPC cluster, make sure to download the dataset, as compute nodes usually have no internet connection:

# Run only the first step on the HPC login node, which downloads the datasets if not present
singularity run itwinai_torch.sif /bin/bash -c \
    "itwinai exec-pipeline +pipe_key=training_pipeline +pipe_steps=[dataloading_step]"

Now run distributed ML on multiple compute nodes using both Torch DDP and Microsoft DeepSpeed:

# Run one distributed ML job (torch DDP is the default one)
sbatch slurm.sh

# Alternatively, run all distributed jobs
bash runall.sh

Note

Please note that at the moment Horovod distributed training using containerized environments is not supported.

6.2. Shell scripts

6.2.1. run_docker.sh

#!/bin/bash

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

CMD="itwinai exec-pipeline"

# Run command in the itwinai torch Docker container
if [ -z "$1" ]; then
    # CPU only execution
    # Note: bash sets $UID but not $GID, so both IDs are queried with `id`
    docker run -it --rm --name mnist-training --user "$(id -u):$(id -g)" \
        --ipc=host --ulimit memlock=-1 --ulimit stack=67108864 \
        -v "$PWD":/use-case  ghcr.io/intertwin-eu/itwinai:0.2.2-torch-2.1 \
        /bin/bash -c "cd /use-case && $CMD"
elif [ "$1" == "gpu" ]; then
    # With GPU support: --gpus all
    docker run -it --rm --name mnist-training --user "$(id -u):$(id -g)" \
        --gpus all --ipc=host --ulimit memlock=-1 --ulimit stack=67108864 \
        -v "$PWD":/use-case  ghcr.io/intertwin-eu/itwinai:0.2.2-torch-2.1 \
        /bin/bash -c "cd /use-case && $CMD"
else
    # Fail loudly instead of silently doing nothing
    >&2 echo "ERROR: unrecognized argument '$1'. Use no argument (CPU) or 'gpu'."
    exit 1
fi

6.2.2. slurm.sh

#!/bin/bash

# SLURM jobscript for JSC systems

# Job configuration
#SBATCH --job-name=distributed_training
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00

# Resources allocation
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-gpu=4
#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# Load environment modules
# ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio  mpi4py
ml --force purge

# Job info
echo "DEBUG: TIME: $(date)"
sysN="$(uname -n | cut -f2- -d.)"
sysN="${sysN%%[0-9]*}"
echo "Running on system: $sysN"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
if [ "$DEBUG" = true ] ; then
  echo "DEBUG: NCCL_DEBUG=INFO" 
  export NCCL_DEBUG=INFO
fi
echo

# Setup env for distributed ML
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
# Default to 0 when SLURM_CPUS_PER_GPU is unset, to avoid a test error
if [ "${SLURM_CPUS_PER_GPU:-0}" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_GPU
fi

# Env variables check
if [ -z "$DIST_MODE" ]; then 
  >&2 echo "WARNING: env variable DIST_MODE is not set. Allowed values are 'horovod', 'ddp' or 'deepspeed'. Using 'ddp'."
  DIST_MODE='ddp'
fi
if [ -z "$RUN_NAME" ]; then 
  >&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment."
  RUN_NAME=$DIST_MODE
fi
if [ -z "$TRAINING_CMD" ]; then 
  >&2 echo "WARNING: env variable TRAINING_CMD is not set. It's the python command to execute."
  TRAINING_CMD='$(/usr/bin/which itwinai) exec-pipeline +pipe_key=training_pipeline strategy=ddp'
  >&2 echo "setting TRAINING_CMD=$TRAINING_CMD"
fi

# Get GPUs info per node
# srun --cpu-bind=none --ntasks-per-node=1 bash -c 'echo -e "NODE hostname: $(hostname)\n$(nvidia-smi)\n\n"'

# Launch training
if [ "$DIST_MODE" == "ddp" ] ; then
  echo "DDP training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    singularity run --nv itwinai_torch.sif /bin/bash -c \
    "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD"
elif [ "$DIST_MODE" == "deepspeed" ] ; then
  echo "DEEPSPEED training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    singularity run --nv itwinai_torch.sif /bin/bash -c \
    "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD"

  # MASTER_ADDR=$(scontrol show hostnames "\$SLURM_JOB_NODELIST" | head -n 1)i
  # export MASTER_ADDR
  # export MASTER_PORT=29500 

  # srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
  #   --mpi=pmi2 singularity run --nv  \
  #   --env MASTER_ADDR=$MASTER_ADDR,MASTER_PORT=$MASTER_PORT \
  #   itwinai_torch.sif /bin/bash -c "$TRAINING_CMD"

elif [ "$DIST_MODE" == "horovod" ] ; then
  echo "Horovod is not currently supported in conjunction with containers"
  exit 2

  # echo "HOROVOD training: $TRAINING_CMD"
  # srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
  #   --mpi=pmix singularity run --nv itwinai_torch.sif \
  #   /bin/bash -c "$TRAINING_CMD"
else
  >&2 echo "ERROR: unrecognized \$DIST_MODE env variable"
  exit 1
fi
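
The rendezvous options passed to torchrun in slurm.sh can be easier to follow when written out as plain logic: the first host in the job's node list acts as the rendezvous endpoint, and only the node with ID 0 sets `is_host=1`. The sketch below mirrors that logic in Python. The function name and the example hostnames are hypothetical, and the `i` suffix simply reproduces what the script appends to the head node name (presumably a site-specific hostname convention).

```python
def torchrun_rendezvous_args(node_id, hostnames, job_id, gpus_per_node,
                             port=29500, host_suffix="i"):
    """Return the torchrun options that slurm.sh builds on each node."""
    # The first node of the allocation hosts the c10d rendezvous.
    head = hostnames[0] + host_suffix
    # Same outcome as the shell test on SLURM_NODEID: node 0 is the host.
    is_host = 1 if node_id == 0 else 0
    return [
        f"--nnodes={len(hostnames)}",
        f"--nproc_per_node={gpus_per_node}",
        f"--rdzv_id={job_id}",
        f"--rdzv_conf=is_host={is_host}",
        "--rdzv_backend=c10d",
        f"--rdzv_endpoint={head}:{port}",
    ]


# Hypothetical two-node job with four GPUs per node.
nodes = ["node01", "node02"]
for node_id in range(len(nodes)):
    print(node_id, " ".join(torchrun_rendezvous_args(node_id, nodes, 12345, 4)))
```

Every node receives the same endpoint and job ID; only the `is_host` flag differs between nodes.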

6.2.3. runall.sh

#!/bin/bash

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

# Clear SLURM logs (*.out and *.err files)
rm -rf logs_slurm
mkdir logs_slurm
rm -rf logs_torchrun

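# Number of nodes, used in this script only to label jobs and log files.
# Assumption: keep it in sync with --nodes in slurm.sh.
N=2

# DDP itwinai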
DIST_MODE="ddp"
RUN_NAME="ddp-itwinai"
TRAINING_CMD='/usr/local/bin/itwinai exec-pipeline strategy=ddp'
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

# DeepSpeed itwinai
DIST_MODE="deepspeed"
RUN_NAME="deepspeed-itwinai"
TRAINING_CMD='/usr/local/bin/itwinai exec-pipeline strategy=deepspeed'
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

# # Horovod itwinai
# DIST_MODE="horovod"
# RUN_NAME="horovod-itwinai"
# TRAINING_CMD='/usr/local/bin/itwinai exec-pipeline strategy=horovod'
# sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD" \
#     --job-name="$RUN_NAME-n$N" \
#     --output="logs_slurm/job-$RUN_NAME-n$N.out" \
#     --error="logs_slurm/job-$RUN_NAME-n$N.err" \
#     slurm.sh

6.3. Pipeline configuration

6.3.1. config.yaml

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

# General config
dataset_root: .tmp/
num_classes: 10
batch_size: 64
num_workers_dataloader: 4
pin_memory: False
lr: 0.001
momentum: 0.9
fp16_allreduce: False
use_adasum: False
gradient_predivide_factor: 1.0
epochs: 2
strategy: ddp
test_data_path: mnist-sample-data
inference_model_mlflow_uri: mnist-pre-trained.pth
predictions_dir: mnist-predictions
predictions_file: predictions.csv
class_labels: null

# Workflows configuration
training_pipeline:
  _target_: itwinai.pipeline.Pipeline
  steps:
    dataloading_step:
      _target_: dataloader.MNISTDataModuleTorch
      save_path: ${dataset_root}

    training_step:
      _target_: itwinai.torch.trainer.TorchTrainer
      config:
        batch_size: ${batch_size}
        num_workers: ${num_workers_dataloader}
        pin_memory: ${pin_memory}
        lr: ${lr}
        momentum: ${momentum}
        fp16_allreduce: ${fp16_allreduce}
        use_adasum: ${use_adasum}
        gradient_predivide_factor: ${gradient_predivide_factor}

      model:
        _target_: model.Net
      epochs: ${epochs}
      metrics:
        accuracy:
          _target_: torchmetrics.classification.MulticlassAccuracy
          num_classes: ${num_classes}
        precision:
          _target_: torchmetrics.classification.MulticlassPrecision
          num_classes: ${num_classes}
        recall:
          _target_: torchmetrics.classification.MulticlassRecall
          num_classes: ${num_classes}
      logger:
        _target_: itwinai.loggers.LoggersCollection
        loggers:
          - _target_: itwinai.loggers.ConsoleLogger
            log_freq: 10000
          - _target_: itwinai.loggers.MLFlowLogger
            experiment_name: MNIST classifier
            log_freq: batch
      strategy: ${strategy}
      # checkpoint_every: 1
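
The `_target_` keys in config.yaml appear to follow the convention popularized by Hydra, where a dotted path names the class to build and the sibling keys become its constructor arguments. The sketch below illustrates that idea with the standard library only. It is a simplified stand-in, not Hydra's or itwinai's actual implementation, and it does not handle `${...}` interpolation.

```python
import importlib


def instantiate(node):
    """Recursively build objects from dicts that carry a `_target_` key."""
    if isinstance(node, dict):
        # Build nested objects first, so that they can be passed as arguments.
        kwargs = {k: instantiate(v) for k, v in node.items() if k != "_target_"}
        if "_target_" in node:
            module_name, _, attr = node["_target_"].rpartition(".")
            cls = getattr(importlib.import_module(module_name), attr)
            return cls(**kwargs)
        return kwargs
    if isinstance(node, list):
        return [instantiate(item) for item in node]
    return node


# Standard-library classes stand in for the pipeline steps.
cfg = {
    "ratio": {"_target_": "fractions.Fraction", "numerator": 1, "denominator": 3},
    "delays": [{"_target_": "datetime.timedelta", "days": 2}],
}
objects = instantiate(cfg)
print(objects["ratio"], objects["delays"][0])
```

With this reading, `training_step` resolves to a `TorchTrainer` whose `model`, `metrics` and `logger` arguments are themselves built from nested `_target_` entries.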

6.4. Python files

6.4.1. model.py

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------


import torch.nn.functional as F
from torch import nn


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Normalize over the class dimension (dim=1), not over the batch
        return F.log_softmax(x, dim=1)
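
The value 320 used in `fc1` and in `x.view(-1, 320)` follows from the layer sizes when the input is a 28x28 MNIST image. The short calculation below reproduces it with the standard output-size formula for convolution and pooling layers; the helper name `out_size` is hypothetical.

```python
def out_size(size, kernel, stride=1, padding=0):
    """Output side length of a convolution or pooling layer (floor mode)."""
    return (size + 2 * padding - kernel) // stride + 1


side = 28                                # MNIST images are 28x28
side = out_size(side, kernel=5)          # conv1: 28 -> 24
side = out_size(side, kernel=2, stride=2)  # max_pool2d(2): 24 -> 12
side = out_size(side, kernel=5)          # conv2: 12 -> 8
side = out_size(side, kernel=2, stride=2)  # max_pool2d(2): 8 -> 4

channels = 20                            # output channels of conv2
flat_features = channels * side * side
print(flat_features)  # 320
```

If the input resolution or the kernel sizes change, the same calculation gives the new input size for `fc1`.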

6.4.2. dataloader.py

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

"""Dataloader for Torch-based MNIST use case."""

import os
import shutil
from typing import Any, Callable, Optional, Tuple

from PIL import Image
from torch.utils.data import Dataset
from torchvision import datasets, transforms

from itwinai.components import DataGetter, monitor_exec


class MNISTDataModuleTorch(DataGetter):
    """Download MNIST dataset for torch."""

    def __init__(
        self,
        save_path: str = ".tmp/",
    ) -> None:
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.save_path = save_path

    @monitor_exec
    def execute(self) -> Tuple[Dataset, Dataset, None]:
        # Returns (train, validation, test); no test dataset is provided here
        train_dataset = datasets.MNIST(
            self.save_path,
            train=True,
            download=True,
            transform=transforms.Compose(
                [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
            ),
        )
        validation_dataset = datasets.MNIST(
            self.save_path,
            train=False,
            download=True,
            transform=transforms.Compose(
                [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
            ),
        )
        print("Train and validation datasets loaded.")
        return train_dataset, validation_dataset, None


class InferenceMNIST(Dataset):
    """Loads a set of MNIST images from a folder of JPG files."""

    def __init__(
        self, root: str, transform: Optional[Callable] = None, supported_format: str = ".jpg"
    ) -> None:
        self.root = root
        self.transform = transform
        self.supported_format = supported_format
        self.data = dict()
        self._load()

    def _load(self):
        for img_file in os.listdir(self.root):
            if not img_file.lower().endswith(self.supported_format):
                continue
            filename = os.path.basename(img_file)
            img = Image.open(os.path.join(self.root, img_file))
            self.data[filename] = img

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int) -> Tuple[Any, Any]:
        """
        Args:
            index (int): Index

        Returns:
            tuple: (image_identifier, image) where image_identifier
                is the unique identifier for the image (e.g., filename).
        """
        img_id, img = list(self.data.items())[index]

        if self.transform is not None:
            img = self.transform(img)

        return img_id, img

    @staticmethod
    def generate_jpg_sample(root: str, max_items: int = 100):
        """Generate a sample dataset of JPG images starting from
            LeCun's test dataset.

        Args:
            root (str): sample path on disk
            max_items (int, optional): max number of images to
                generate. Defaults to 100.
        """
        if os.path.exists(root):
            shutil.rmtree(root)
        os.makedirs(root)

        test_data = datasets.MNIST(root=".tmp", train=False, download=True)
        for idx, (img, _) in enumerate(test_data):
            if idx >= max_items:
                break
            savepath = os.path.join(root, f"digit_{idx}.jpg")
            img.save(savepath)


class MNISTPredictLoader(DataGetter):
    def __init__(self, test_data_path: str) -> None:
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.test_data_path = test_data_path

    @monitor_exec
    def execute(self) -> Dataset:
        return InferenceMNIST(
            root=self.test_data_path,
            transform=transforms.Compose(
                [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
            ),
        )
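
The constants passed to `transforms.Normalize` in dataloader.py are the mean (0.1307) and standard deviation (0.3081) commonly used for MNIST pixel values scaled to [0, 1]. The sketch below shows the per-pixel arithmetic that this normalization performs, without requiring torch; the function names are hypothetical.

```python
MNIST_MEAN = 0.1307
MNIST_STD = 0.3081


def normalize(pixel, mean=MNIST_MEAN, std=MNIST_STD):
    """Map a pixel in [0, 1] to a standardized value: (pixel - mean) / std."""
    return (pixel - mean) / std


def denormalize(value, mean=MNIST_MEAN, std=MNIST_STD):
    """Invert `normalize`, for example before saving or plotting an image."""
    return value * std + mean


# A black pixel (0.0) and a white pixel (1.0) after normalization.
print(round(normalize(0.0), 4), round(normalize(1.0), 4))
```

Training, validation and inference data all use the same constants, so a model trained with this pipeline sees inputs on the same scale at prediction time.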