MNIST

This section covers the MNIST use case, which utilizes the torch-lightning framework for training and evaluation. The following files are integral to this use case:

Torch Lightning

Training

# Download dataset and exit: only run first step in the pipeline (index=0)
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps 0

# Run the whole training pipeline
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline

View training logs on MLFLow server (if activated from the configuration):

mlflow ui --backend-store-uri mllogs/mlflow/

dataloader.py

The dataloader.py script is responsible for loading the MNIST dataset and preparing it for training.

from typing import Optional
import lightning as L

from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms

from itwinai.components import DataGetter, monitor_exec


class LightningMNISTDownloader(DataGetter):
    def __init__(
        self,
        data_path: str,
        name: Optional[str] = None
    ) -> None:
        super().__init__(name)
        self.save_parameters(**self.locals2params(locals()))
        self.data_path = data_path
        self._downloader = MNISTDataModule(
            data_path=self.data_path, download=True,
            # Mock other args...
            batch_size=1, train_prop=.5,
        )

    @monitor_exec
    def execute(self) -> None:
        # Simulate dataset creation to force data download
        self._downloader.setup(stage='fit')
        self._downloader.setup(stage='test')
        self._downloader.setup(stage='predict')


class MNISTDataModule(L.LightningDataModule):
    def __init__(
        self,
        data_path: str,
        batch_size: int,
        train_prop: float,
        download: bool = True
    ) -> None:
        super().__init__()
        self.data_path = data_path
        self.download = download
        self.batch_size = batch_size
        self.train_prop = train_prop
        self.transform = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,)),
            ]
        )

    def setup(self, stage=None):
        if stage == "fit":
            mnist_full = MNIST(
                self.data_path, train=True,
                download=self.download,
                transform=self.transform
            )
            n_train_samples = int(self.train_prop * len(mnist_full))
            n_val_samples = len(mnist_full) - n_train_samples
            self.mnist_train, self.mnist_val = random_split(
                mnist_full, [n_train_samples, n_val_samples]
            )

        if stage == "test":
            self.mnist_test = MNIST(
                self.data_path, train=False,
                download=self.download,
                transform=self.transform
            )

        if stage == "predict":
            self.mnist_predict = MNIST(
                self.data_path, train=False,
                download=self.download,
                transform=self.transform
            )

    def train_dataloader(self):
        return DataLoader(
            self.mnist_train, batch_size=self.batch_size, num_workers=4)

    def val_dataloader(self):
        return DataLoader(
            self.mnist_val, batch_size=self.batch_size, num_workers=4)

    def test_dataloader(self):
        return DataLoader(
            self.mnist_test, batch_size=self.batch_size, num_workers=4)

    def predict_dataloader(self):
        return DataLoader(
            self.mnist_predict, batch_size=self.batch_size, num_workers=4)

config.yaml

This YAML file defines the pipeline configuration for the MNIST use case. It includes settings for the model, training, and evaluation.

# General config
dataset_root: .tmp/

training_pipeline:
  class_path: itwinai.pipeline.Pipeline
  init_args:
    steps:
      - class_path: dataloader.LightningMNISTDownloader
        init_args:
          data_path: ${dataset_root}

      - class_path: itwinai.torch.trainer.TorchLightningTrainer #trainer.LightningMNISTTrainer
        init_args:
          # Pytorch lightning config for training
          config:
            seed_everything: 4231162351
            trainer:
              accelerator: auto
              accumulate_grad_batches: 1
              barebones: false
              benchmark: null
              callbacks:
                - class_path: lightning.pytorch.callbacks.early_stopping.EarlyStopping
                  init_args:
                    monitor: val_loss
                    patience: 2
                - class_path: lightning.pytorch.callbacks.lr_monitor.LearningRateMonitor
                  init_args:
                    logging_interval: step
                - class_path: lightning.pytorch.callbacks.ModelCheckpoint
                  init_args:
                    dirpath: checkpoints
                    filename: best-checkpoint
                    mode: min
                    monitor: val_loss
                    save_top_k: 1
                    verbose: true
              check_val_every_n_epoch: 1
              default_root_dir: null
              detect_anomaly: false
              deterministic: null
              devices: auto
              enable_checkpointing: null
              enable_model_summary: null
              enable_progress_bar: null
              fast_dev_run: false
              gradient_clip_algorithm: null
              gradient_clip_val: null
              inference_mode: true
              limit_predict_batches: null
              limit_test_batches: null
              limit_train_batches: null
              limit_val_batches: null
              log_every_n_steps: null
              logger: null
              max_epochs: 5
              max_steps: -1
              max_time: null
              min_epochs: null
              min_steps: null
              num_sanity_val_steps: null
              overfit_batches: 0.0
              plugins: null
              profiler: null
              reload_dataloaders_every_n_epochs: 0
              strategy: auto
              sync_batchnorm: false
              use_distributed_sampler: true
              val_check_interval: null

            # Lightning Model configuration
            model:
              class_path: itwinai.torch.models.mnist.MNISTModel
              init_args:
                hidden_size: 64

            # Lightning data module configuration
            data:
              class_path: dataloader.MNISTDataModule
              init_args:
                batch_size: 32
                data_path: ${dataset_root}
                download: false
                train_prop: 0.8

            # Torch Optimizer configuration
            optimizer:
              class_path: torch.optim.AdamW
              init_args:
                lr: 0.001

            # Torch LR scheduler configuration
            lr_scheduler:
              class_path: torch.optim.lr_scheduler.ExponentialLR
              init_args:
                gamma: 0.1

startscript

The startscript is a shell script to initiate the training process. It sets up the environment and starts the training using the train.py script.

#!/bin/bash

# general configuration of the job
#SBATCH --job-name=PrototypeTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00

# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --gpus-per-node=4

#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# load modules
ml --force purge
ml Stages/2023 StdEnv/2023 NVHPC/23.1 OpenMPI/4.1.4 cuDNN/8.6.0.163-CUDA-11.7 Python/3.10.4 HDF5 libaio/0.3.112 GCC/11.3.0

# shellcheck source=/dev/null
source ~/.bashrc

# ON LOGIN NODE download datasets:
# ../../../.venv-pytorch/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps dataloading_step
source ../../../.venv-pytorch/bin/activate
srun itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline 

utils.py

The utils.py script includes utility functions and classes that are used across the MNIST use case.

"""
Utilities for itwinai package.
"""
import os
import yaml

from collections.abc import MutableMapping
from typing import Dict
from omegaconf import OmegaConf
from omegaconf.dictconfig import DictConfig


def load_yaml(path: str) -> Dict:
    """Load YAML file as dict.

    Args:
        path (str): path to YAML file.

    Raises:
        exc: yaml.YAMLError for loading/parsing errors.

    Returns:
        Dict: nested dict representation of parsed YAML file.
    """
    with open(path, "r", encoding="utf-8") as yaml_file:
        try:
            loaded_config = yaml.safe_load(yaml_file)
        except yaml.YAMLError as exc:
            print(exc)
            raise exc
    return loaded_config


def load_yaml_with_deps_from_file(path: str) -> DictConfig:
    """
    Load YAML file with OmegaConf and merge it with its dependencies
    specified in the `conf-dependencies` field.
    Assume that the dependencies live in the same folder of the
    YAML file which is importing them.

    Args:
        path (str): path to YAML file.

    Raises:
        exc: yaml.YAMLError for loading/parsing errors.

    Returns:
        DictConfig: nested representation of parsed YAML file.
    """
    yaml_conf = load_yaml(path)
    use_case_dir = os.path.dirname(path)
    deps = []
    if yaml_conf.get("conf-dependencies"):
        for dependency in yaml_conf["conf-dependencies"]:
            deps.append(load_yaml(os.path.join(use_case_dir, dependency)))

    return OmegaConf.merge(yaml_conf, *deps)


def load_yaml_with_deps_from_dict(dict_conf, use_case_dir) -> DictConfig:
    deps = []

    if dict_conf.get("conf-dependencies"):
        for dependency in dict_conf["conf-dependencies"]:
            deps.append(load_yaml(os.path.join(use_case_dir, dependency)))

    return OmegaConf.merge(dict_conf, *deps)


def dynamically_import_class(name: str):
    """
    Dynamically import class by module path.
    Adapted from https://stackoverflow.com/a/547867

    Args:
        name (str): path to the class (e.g., mypackage.mymodule.MyClass)

    Returns:
        __class__: class object.
    """
    module, class_name = name.rsplit(".", 1)
    mod = __import__(module, fromlist=[class_name])
    klass = getattr(mod, class_name)
    return klass


def flatten_dict(
    d: MutableMapping, parent_key: str = "", sep: str = "."
) -> MutableMapping:
    """Flatten dictionary

    Args:
        d (MutableMapping): nested dictionary to flatten
        parent_key (str, optional): prefix for all keys. Defaults to ''.
        sep (str, optional): separator for nested key concatenation.
            Defaults to '.'.

    Returns:
        MutableMapping: flattened dictionary with new keys.
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

This section covers the MNIST use case, which utilizes the torch framework for training and evaluation. The following files are integral to this use case:

PyTorch

Training

# Download dataset and exit
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps dataloading_step

# Run the whole training pipeline
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline

View training logs on MLFLow server (if activated from the configuration):

mlflow ui --backend-store-uri mllogs/mlflow/

Inference

  1. Create sample dataset

    from dataloader import InferenceMNIST
    InferenceMNIST.generate_jpg_sample('mnist-sample-data/', 10)
    
  2. Generate a dummy pre-trained neural network

    import torch
    from model import Net
    dummy_nn = Net()
    torch.save(dummy_nn, 'mnist-pre-trained.pth')
    
  3. Run inference command. This will generate a β€œmnist-predictions” folder containing a CSV file with the predictions as rows.

    itwinai exec-pipeline --config config.yaml --pipe-key inference_pipeline
    

Note the same entry point as for training.

Docker image

Build from project root with

# Local
docker buildx build -t itwinai:0.0.1-mnist-torch-0.1 -f use-cases/mnist/torch/Dockerfile .

# Ghcr.io
docker buildx build -t ghcr.io/intertwin-eu/itwinai:0.0.1-mnist-torch-0.1 -f use-cases/mnist/torch/Dockerfile .
docker push ghcr.io/intertwin-eu/itwinai:0.0.1-mnist-torch-0.1

Training with Docker container

docker run -it --rm --name running-inference \
    -v "$PWD":/usr/data ghcr.io/intertwin-eu/itwinai:0.01-mnist-torch-0.1 \
    /bin/bash -c "itwinai exec-pipeline --print-config \
    --config /usr/src/app/config.yaml \
    --pipe-key training_pipeline \
    -o dataset_root=/usr/data/mnist-dataset "

Inference with Docker container

From wherever a sample of MNIST jpg images is available (folder called β€˜mnist-sample-data/’):

β”œβ”€β”€ $PWD
β”‚   β”œβ”€β”€ mnist-sample-data
|   β”‚   β”œβ”€β”€ digit_0.jpg
|   β”‚   β”œβ”€β”€ digit_1.jpg
|   β”‚   β”œβ”€β”€ digit_2.jpg
...
|   β”‚   β”œβ”€β”€ digit_N.jpg
docker run -it --rm --name running-inference \
    -v "$PWD":/usr/data ghcr.io/intertwin-eu/itwinai:0.01-mnist-torch-0.1 \
    /bin/bash -c "itwinai exec-pipeline --print-config \
    --config /usr/src/app/config.yaml \
    --pipe-key inference_pipeline \
    -o test_data_path=/usr/data/mnist-sample-data \
    -o inference_model_mlflow_uri=/usr/src/app/mnist-pre-trained.pth \
    -o predictions_dir=/usr/data/mnist-predictions "

This command will store the results in a folder called β€œmnist-predictions”:

β”œβ”€β”€ $PWD
β”‚   β”œβ”€β”€ mnist-predictions
|   β”‚   β”œβ”€β”€ predictions.csv

dataloader.py

The dataloader.py script is responsible for loading the MNIST dataset and preparing it for training.

"""Dataloader for Torch-based MNIST use case."""

from typing import Optional, Tuple, Callable, Any
import os
import shutil

from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms, datasets

from itwinai.components import DataGetter, monitor_exec


class MNISTDataModuleTorch(DataGetter):
    """Download MNIST dataset for torch."""

    def __init__(self, save_path: str = '.tmp/',) -> None:
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.save_path = save_path

    @monitor_exec
    def execute(self) -> Tuple[Dataset, Dataset]:
        train_dataset = datasets.MNIST(
            self.save_path, train=True, download=True,
            transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ]))
        validation_dataset = datasets.MNIST(
            self.save_path, train=False, download=True,
            transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ]))
        print("Train and validation datasets loaded.")
        return train_dataset, validation_dataset, None


class InferenceMNIST(Dataset):
    """Loads a set of MNIST images from a folder of JPG files."""

    def __init__(
        self,
        root: str,
        transform: Optional[Callable] = None,
        supported_format: str = '.jpg'
    ) -> None:
        self.root = root
        self.transform = transform
        self.supported_format = supported_format
        self.data = dict()
        self._load()

    def _load(self):
        for img_file in os.listdir(self.root):
            if not img_file.lower().endswith(self.supported_format):
                continue
            filename = os.path.basename(img_file)
            img = Image.open(os.path.join(self.root, img_file))
            self.data[filename] = img

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int) -> Tuple[Any, Any]:
        """
        Args:
            index (int): Index

        Returns:
            tuple: (image_identifier, image) where image_identifier
                is the unique identifier for the image (e.g., filename).
        """
        img_id, img = list(self.data.items())[index]

        if self.transform is not None:
            img = self.transform(img)

        return img_id, img

    @staticmethod
    def generate_jpg_sample(
        root: str,
        max_items: int = 100
    ):
        """Generate a sample dataset of JPG images starting from
            LeCun's test dataset.

        Args:
            root (str): sample path on disk
            max_items (int, optional): max number of images to
                generate. Defaults to 100.
        """
        if os.path.exists(root):
            shutil.rmtree(root)
        os.makedirs(root)

        test_data = datasets.MNIST(root='.tmp', train=False, download=True)
        for idx, (img, _) in enumerate(test_data):
            if idx >= max_items:
                break
            savepath = os.path.join(root, f'digit_{idx}.jpg')
            img.save(savepath)


class MNISTPredictLoader(DataGetter):
    def __init__(self, test_data_path: str) -> None:
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.test_data_path = test_data_path

    @monitor_exec
    def execute(self) -> Dataset:
        return InferenceMNIST(
            root=self.test_data_path,
            transform=transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize((0.1307,), (0.3081,))
            ]))

Dockerfile

# FROM python:3.9
FROM nvcr.io/nvidia/pytorch:23.09-py3

WORKDIR /usr/src/app

# Install pytorch (cpuonly)
# Ref:https://pytorch.org/get-started/previous-versions/#linux-and-windows-5
RUN pip install --no-cache-dir torch==1.13.1+cpu torchvision==0.14.1+cpu torchaudio==0.13.1 --extra-index-url https://download.pytorch.org/whl/cpu

# Install itwinai and dependencies
COPY pyproject.toml ./
COPY src ./
RUN  pip install --no-cache-dir .

# Add torch MNIST use case
COPY use-cases/mnist/torch/* ./

create_inference_sample.py

This file defines a pipeline configuration for the MNIST use case inference.

"""Create a simple inference dataset sample and a checkpoint."""

import torch
import os
import argparse

from model import Net
from dataloader import InferenceMNIST


def mnist_torch_inference_files(
    root: str = '.',
    samples_path: str = 'mnist-sample-data/',
    model_name: str = 'mnist-pre-trained.pth'
):
    """Create sample dataset and fake model to test mnist
    inference workflow. Assumes to be run from
    the use case folder.

    Args:
        root (str, optional): where to create the files.
        Defaults to '.'.
    """

    sample = os.path.join(root, samples_path)
    InferenceMNIST.generate_jpg_sample(sample, 10)

    # Fake checkpoint
    dummy_nn = Net()
    mdl_ckpt = os.path.join(root, model_name)
    torch.save(dummy_nn, mdl_ckpt)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--root", type=str, default='.')
    parser.add_argument("--samples-path", type=str,
                        default='mnist-sample-data')
    parser.add_argument("--model-name", type=str,
                        default='mnist-pre-trained.pth')
    args = parser.parse_args()
    mnist_torch_inference_files(**vars(args))

model.py

The model.py script is responsible for loading a simple model.

from torch import nn
import torch.nn.functional as F


class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=0)

config.yaml

This YAML file defines the pipeline configuration for the MNIST use case. It includes settings for the model, training, and evaluation.

# General config
dataset_root: .tmp/
num_classes: 10
batch_size: 64
num_workers_dataloader: 4
pin_memory: False
lr: 0.001
momentum: 0.9
fp16_allreduce: False
use_adasum: False
gradient_predivide_factor: 1.0
epochs: 2
strategy: ddp
test_data_path: mnist-sample-data
inference_model_mlflow_uri: mnist-pre-trained.pth
predictions_dir: mnist-predictions
predictions_file: predictions.csv
class_labels: null
checkpoints_location: checkpoints
checkpoint_every: 1

# Workflows configuration
training_pipeline:
  class_path: itwinai.pipeline.Pipeline
  init_args:
    steps:
      dataloading_step:
        class_path: dataloader.MNISTDataModuleTorch
        init_args:
          save_path: ${dataset_root}

      training_step:
        class_path: itwinai.torch.trainer.TorchTrainer
        init_args:
          config:
            batch_size: ${batch_size}
            num_workers_dataloader: ${num_workers_dataloader}
            pin_gpu_memory: ${pin_memory}
            optimizer: sgd
            optim_lr: ${lr}
            optim_momentum: ${momentum}
            fp16_allreduce: ${fp16_allreduce}
            use_adasum: ${use_adasum}
            gradient_predivide_factor: ${gradient_predivide_factor}

          model:
            class_path: model.Net
          epochs: ${epochs}
          metrics:
            accuracy:
              class_path: torchmetrics.classification.MulticlassAccuracy
              init_args:
                num_classes: ${num_classes}
            precision:
              class_path: torchmetrics.classification.MulticlassPrecision
              init_args:
                num_classes: ${num_classes}
            recall:
              class_path: torchmetrics.classification.MulticlassRecall
              init_args:
                num_classes: ${num_classes}
          logger:
            class_path: itwinai.loggers.LoggersCollection
            init_args:
              loggers:
                - class_path: itwinai.loggers.ConsoleLogger
                  init_args:
                    log_freq: 10000
                - class_path: itwinai.loggers.MLFlowLogger
                  init_args:
                    experiment_name: MNIST classifier
                    log_freq: batch 
          strategy: ${strategy}
          checkpoint_every: ${checkpoint_every}
          checkpoints_location: ${checkpoints_location}


inference_pipeline:
  class_path: itwinai.pipeline.Pipeline
  init_args:
    steps:
      - class_path: dataloader.MNISTPredictLoader
        init_args:
          test_data_path: ${test_data_path}

      - class_path: itwinai.torch.inference.MulticlassTorchPredictor
        init_args: 
          model:
            class_path: itwinai.torch.inference.TorchModelLoader
            init_args:
              model_uri: ${inference_model_mlflow_uri}
          test_dataloader_kwargs:
            batch_size: ${batch_size}
      
      - class_path: saver.TorchMNISTLabelSaver
        init_args:
          save_dir: ${predictions_dir}
          predictions_file: ${predictions_file}
          class_labels: ${class_labels}

startscript.sh

The startscript is a shell script to initiate the training process. It sets up the environment and starts the training using the train.py script.

#!/bin/bash

# general configuration of the job
#SBATCH --job-name=PrototypeTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00

# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --gpus-per-node=4

#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# load modules
ml --force purge
ml Stages/2023 StdEnv/2023 NVHPC/23.1 OpenMPI/4.1.4 cuDNN/8.6.0.163-CUDA-11.7 Python/3.10.4 HDF5 libaio/0.3.112 GCC/11.3.0

# shellcheck source=/dev/null
source ~/.bashrc

# ON LOGIN NODE download datasets:
# ../../../.venv-pytorch/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps dataloading_step
source ../../../.venv-pytorch/bin/activate
srun itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline 

saver.py

…

"""
This module is used during inference to save predicted labels to file.
"""

from typing import Optional, List, Dict
import os
import shutil
import csv

from itwinai.components import Saver, monitor_exec


class TorchMNISTLabelSaver(Saver):
    """Serializes to disk the labels predicted for MNIST dataset."""

    def __init__(
        self,
        save_dir: str = 'mnist_predictions',
        predictions_file: str = 'predictions.csv',
        class_labels: Optional[List] = None
    ) -> None:
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.save_dir = save_dir
        self.predictions_file = predictions_file
        self.class_labels = (
            class_labels if class_labels is not None
            else [f'Digit {i}' for i in range(10)]
        )

    @monitor_exec
    def execute(self, predicted_classes: Dict[str, int],) -> Dict[str, int]:
        """Translate predictions from class idx to class label and save
        them to disk.

        Args:
            predicted_classes (Dict[str, int]): maps unique item ID to
                the predicted class ID.

        Returns:
            Dict[str, int]: predicted classes.
        """
        if os.path.exists(self.save_dir):
            shutil.rmtree(self.save_dir)
        os.makedirs(self.save_dir)

        # Map class idx (int) to class label (str)
        predicted_labels = {
            itm_name: self.class_labels[cls_idx]
            for itm_name, cls_idx in predicted_classes.items()
        }

        # Save to disk
        filepath = os.path.join(self.save_dir, self.predictions_file)
        with open(filepath, 'w') as csv_file:
            writer = csv.writer(csv_file)
            for key, value in predicted_labels.items():
                writer.writerow([key, value])
        return predicted_labels

runall.sh

#!/bin/bash

# Python virtual environment (no conda/micromamba)
PYTHON_VENV="../../../envAI_hdfml"

# Clear SLURM logs (*.out and *.err files)
rm -rf logs_slurm checkpoints* mllogs*
mkdir logs_slurm
rm -rf logs_torchrun

# DDP itwinai
DIST_MODE="ddp"
RUN_NAME="ddp-itwinai"
TRAINING_CMD="$PYTHON_VENV/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline -o strategy=ddp -o checkpoints_location=checkpoints_ddp"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

# DeepSpeed itwinai
DIST_MODE="deepspeed"
RUN_NAME="deepspeed-itwinai"
TRAINING_CMD="$PYTHON_VENV/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline -o strategy=deepspeed -o checkpoints_location=checkpoints_deepspeed"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

# Horovod itwinai
DIST_MODE="horovod"
RUN_NAME="horovod-itwinai"
TRAINING_CMD="$PYTHON_VENV/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline -o strategy=horovod -o checkpoints_location=checkpoints_hvd"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
    --job-name="$RUN_NAME-n$N" \
    --output="logs_slurm/job-$RUN_NAME-n$N.out" \
    --error="logs_slurm/job-$RUN_NAME-n$N.err" \
    slurm.sh

slurm.sh

#!/bin/bash

# SLURM jobscript for JSC systems

# Job configuration
#SBATCH --job-name=distributed_training
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00

# Resources allocation
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-gpu=4
#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# Load environment modules
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py

# Job info
echo "DEBUG: TIME: $(date)"
sysN="$(uname -n | cut -f2- -d.)"
sysN="${sysN%%[0-9]*}"
echo "Running on system: $sysN"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
if [ "$DEBUG" = true ] ; then
  echo "DEBUG: NCCL_DEBUG=INFO" 
  export NCCL_DEBUG=INFO
fi
echo

# Setup env for distributed ML
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
if [ "$SLURM_CPUS_PER_GPU" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_GPU
fi

# Env vairables check
if [ -z "$DIST_MODE" ]; then 
  >&2 echo "ERROR: env variable DIST_MODE is not set. Allowed values are 'horovod', 'ddp' or 'deepspeed'"
  exit 1
fi
if [ -z "$RUN_NAME" ]; then 
  >&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment."
  RUN_NAME=$DIST_MODE
fi
if [ -z "$TRAINING_CMD" ]; then 
  >&2 echo "ERROR: env variable TRAINING_CMD is not set. It's the python command to execute."
  exit 1
fi
if [ -z "$PYTHON_VENV" ]; then 
  >&2 echo "WARNING: env variable PYTHON_VENV is not set. It's the path to a python virtual environment."
else
  # Activate Python virtual env
  source $PYTHON_VENV/bin/activate
fi

# Get GPUs info per node
srun --cpu-bind=none --ntasks-per-node=1 bash -c 'echo -e "NODE hostname: $(hostname)\n$(nvidia-smi)\n\n"'

# Launch training
if [ "$DIST_MODE" == "ddp" ] ; then
  echo "DDP training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    bash -c "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD"
elif [ "$DIST_MODE" == "deepspeed" ] ; then
  echo "DEEPSPEED training: $TRAINING_CMD"
  MASTER_ADDR=$(scontrol show hostnames "\$SLURM_JOB_NODELIST" | head -n 1)i
  export MASTER_ADDR
  export MASTER_PORT=29500 

  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    $TRAINING_CMD

  # # Run with deepspeed launcher: set --ntasks-per-node=1
  # # https://www.deepspeed.ai/getting-started/#multi-node-environment-variables
  # export NCCL_IB_DISABLE=1
  # export NCCL_SOCKET_IFNAME=eth0
  # nodelist=$(scontrol show hostname $SLURM_NODELIST)
  # echo "$nodelist" | sed -e 's/$/ slots=4/' > .hostfile
  # # Requires passwordless SSH access among compute node
  # srun --cpu-bind=none deepspeed --hostfile=.hostfile $TRAINING_CMD --deepspeed
  # rm .hostfile
elif [ "$DIST_MODE" == "horovod" ] ; then
  echo "HOROVOD training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    $TRAINING_CMD
else
  >&2 echo "ERROR: unrecognized \$DIST_MODE env variable"
  exit 1
fi

This section covers the MNIST use case, which utilizes the tensorflow framework for training and evaluation. The following files are integral to this use case:

Tensorflow

dataloader.py

The dataloader.py script is responsible for loading the MNIST dataset and preparing it for training.

from typing import Tuple
import tensorflow.keras as keras
import tensorflow as tf

from itwinai.components import DataGetter, DataProcessor, monitor_exec


class MNISTDataGetter(DataGetter):
    def __init__(self):
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))

    @monitor_exec
    def execute(self) -> Tuple:
        train, test = keras.datasets.mnist.load_data()
        return train, test


class MNISTDataPreproc(DataProcessor):
    def __init__(self, classes: int):
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.classes = classes

    @monitor_exec
    def execute(
        self,
        *datasets,
    ) -> Tuple:
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = (
            tf.data.experimental.AutoShardPolicy.DATA)
        preprocessed = []
        for dataset in datasets:
            x, y = dataset
            y = keras.utils.to_categorical(y, self.classes)
            sliced = tf.data.Dataset.from_tensor_slices((x, y))
            sliced = sliced.with_options(options)
            preprocessed.append(sliced)
        return tuple(preprocessed)

pipeline.yaml

This YAML file defines the pipeline configuration for the MNIST use case. It includes settings for the model, training, and evaluation.

# General config
verbose: auto
micro_batch_size: 17
epochs: 3
checkpoints_path: checkpoints
tb_log_dir: ./logs

# Training pipeline
pipeline:
  class_path: itwinai.pipeline.Pipeline
  init_args:
    steps:
      - class_path: dataloader.MNISTDataGetter

      - class_path: dataloader.MNISTDataPreproc
        init_args:
          classes: 10

      - class_path: itwinai.tensorflow.trainer.TensorflowTrainer
        init_args:
          epochs: ${epochs}
          micro_batch_size: ${micro_batch_size}
          verbose: ${verbose}
          model_compile_config:
            loss:
              class_path: tensorflow.keras.losses.CategoricalCrossentropy
              init_args:
                from_logits: False

            optimizer: 
              class_path: tensorflow.keras.optimizers.Adam
              init_args: 
                  learning_rate: 0.001

          model_config:
            class_path: itwinai.tensorflow.models.mnist.MNIST_Model
            init_args:
              input_shape: [ 28, 28, 1 ]
              output_shape: 10
        
          callbacks:
            - class_path: keras.callbacks.EarlyStopping
              init_args:
                patience: 2
            - class_path: keras.callbacks.ModelCheckpoint
              init_args:
                filepath: ${checkpoints_path}/model.{epoch:02d}-{val_loss:.2f}.keras
            - class_path: keras.callbacks.TensorBoard
              init_args:
                log_dir: ${tb_log_dir}

startscript.sh

The startscript is a shell script to initiate the training pipeline.

#!/bin/bash

# general configuration of the job
#SBATCH --job-name=PrototypeTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00

# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --gpus-per-node=4

#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# load modules
ml --force purge
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python/3.11 HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12

# shellcheck source=/dev/null
source ~/.bashrc

# Using legacy (2.16) version of Keras
# Latest version with TF (2.16) installs Keras 3.3
# which returns an error for multi-node execution
export TF_USE_LEGACY_KERAS=1

# ON LOGIN NODE download datasets:
# ../../../.venv-tf/bin/itwinai exec-pipeline --config pipeline.yaml --pipe-key pipeline --steps 0
source ../../../envAItf_hdfml/bin/activate
srun itwinai exec-pipeline --config pipeline.yaml --pipe-key pipeline -o verbose=2