MNIST
This section covers the MNIST use case, which uses the PyTorch Lightning framework for training and evaluation. The following files are integral to this use case:
Torch Lightning
Training
# Download dataset and exit: only run first step in the pipeline (index=0)
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps 0
# Run the whole training pipeline
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline
View training logs on the MLflow server (if activated from the configuration):
mlflow ui --backend-store-uri mllogs/mlflow/
dataloader.py
The dataloader.py script is responsible for loading the MNIST dataset and preparing it for training.
from typing import Optional
import lightning as L
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
from itwinai.components import DataGetter, monitor_exec
class LightningMNISTDownloader(DataGetter):
def __init__(
self,
data_path: str,
name: Optional[str] = None
) -> None:
super().__init__(name)
self.save_parameters(**self.locals2params(locals()))
self.data_path = data_path
self._downloader = MNISTDataModule(
data_path=self.data_path, download=True,
# Mock other args...
batch_size=1, train_prop=.5,
)
@monitor_exec
def execute(self) -> None:
# Simulate dataset creation to force data download
self._downloader.setup(stage='fit')
self._downloader.setup(stage='test')
self._downloader.setup(stage='predict')
class MNISTDataModule(L.LightningDataModule):
def __init__(
self,
data_path: str,
batch_size: int,
train_prop: float,
download: bool = True
) -> None:
super().__init__()
self.data_path = data_path
self.download = download
self.batch_size = batch_size
self.train_prop = train_prop
self.transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)),
]
)
def setup(self, stage=None):
if stage == "fit":
mnist_full = MNIST(
self.data_path, train=True,
download=self.download,
transform=self.transform
)
n_train_samples = int(self.train_prop * len(mnist_full))
n_val_samples = len(mnist_full) - n_train_samples
self.mnist_train, self.mnist_val = random_split(
mnist_full, [n_train_samples, n_val_samples]
)
if stage == "test":
self.mnist_test = MNIST(
self.data_path, train=False,
download=self.download,
transform=self.transform
)
if stage == "predict":
self.mnist_predict = MNIST(
self.data_path, train=False,
download=self.download,
transform=self.transform
)
def train_dataloader(self):
return DataLoader(
self.mnist_train, batch_size=self.batch_size, num_workers=4)
def val_dataloader(self):
return DataLoader(
self.mnist_val, batch_size=self.batch_size, num_workers=4)
def test_dataloader(self):
return DataLoader(
self.mnist_test, batch_size=self.batch_size, num_workers=4)
def predict_dataloader(self):
return DataLoader(
self.mnist_predict, batch_size=self.batch_size, num_workers=4)
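The train/validation split in `setup(stage="fit")` comes down to two lines of arithmetic. The following is a minimal sketch of that computation in plain Python; `split_sizes` is a hypothetical helper introduced here for illustration and is not part of the use case.

```python
from typing import Tuple


def split_sizes(n_samples: int, train_prop: float) -> Tuple[int, int]:
    """Mirror the split arithmetic used in MNISTDataModule.setup (hypothetical helper)."""
    n_train = int(train_prop * n_samples)  # int() truncates toward zero
    n_val = n_samples - n_train            # the remainder goes to validation
    return n_train, n_val


# The MNIST training set contains 60,000 images.
print(split_sizes(60_000, 0.8))
```

Because the validation size is computed as the remainder, the two sizes always add up to the dataset length, which is what `random_split` requires when given absolute lengths.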
config.yaml
This YAML file defines the pipeline configuration for the MNIST use case. It includes settings for the model, training, and evaluation.
# General config
dataset_root: .tmp/
training_pipeline:
class_path: itwinai.pipeline.Pipeline
init_args:
steps:
- class_path: dataloader.LightningMNISTDownloader
init_args:
data_path: ${dataset_root}
- class_path: itwinai.torch.trainer.TorchLightningTrainer #trainer.LightningMNISTTrainer
init_args:
# Pytorch lightning config for training
config:
seed_everything: 4231162351
trainer:
accelerator: auto
accumulate_grad_batches: 1
barebones: false
benchmark: null
callbacks:
- class_path: lightning.pytorch.callbacks.early_stopping.EarlyStopping
init_args:
monitor: val_loss
patience: 2
- class_path: lightning.pytorch.callbacks.lr_monitor.LearningRateMonitor
init_args:
logging_interval: step
- class_path: lightning.pytorch.callbacks.ModelCheckpoint
init_args:
dirpath: checkpoints
filename: best-checkpoint
mode: min
monitor: val_loss
save_top_k: 1
verbose: true
check_val_every_n_epoch: 1
default_root_dir: null
detect_anomaly: false
deterministic: null
devices: auto
enable_checkpointing: null
enable_model_summary: null
enable_progress_bar: null
fast_dev_run: false
gradient_clip_algorithm: null
gradient_clip_val: null
inference_mode: true
limit_predict_batches: null
limit_test_batches: null
limit_train_batches: null
limit_val_batches: null
log_every_n_steps: null
logger: null
max_epochs: 5
max_steps: -1
max_time: null
min_epochs: null
min_steps: null
num_sanity_val_steps: null
overfit_batches: 0.0
plugins: null
profiler: null
reload_dataloaders_every_n_epochs: 0
strategy: auto
sync_batchnorm: false
use_distributed_sampler: true
val_check_interval: null
# Lightning Model configuration
model:
class_path: itwinai.torch.models.mnist.MNISTModel
init_args:
hidden_size: 64
# Lightning data module configuration
data:
class_path: dataloader.MNISTDataModule
init_args:
batch_size: 32
data_path: ${dataset_root}
download: false
train_prop: 0.8
# Torch Optimizer configuration
optimizer:
class_path: torch.optim.AdamW
init_args:
lr: 0.001
# Torch LR scheduler configuration
lr_scheduler:
class_path: torch.optim.lr_scheduler.ExponentialLR
init_args:
gamma: 0.1
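To make two of the settings above more concrete, here is a rough sketch of what `patience: 2` and `gamma: 0.1` imply. It is a simplified model written in plain Python, not Lightning's or PyTorch's implementation: it assumes the monitored `val_loss` is checked once per epoch, that any decrease counts as an improvement, and that the scheduler steps once per epoch. Both function names are hypothetical.

```python
from typing import List, Optional


def early_stop_epoch(val_losses: List[float], patience: int = 2) -> Optional[int]:
    """Return the epoch index at which training would stop, or None."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            best = loss  # improvement: remember it and reset the counter
            wait = 0
        else:
            wait += 1    # no improvement at this check
            if wait >= patience:
                return epoch
    return None


def exponential_lr(initial_lr: float, gamma: float, epoch: int) -> float:
    """Learning rate after `epoch` scheduler steps: lr0 * gamma ** epoch."""
    return initial_lr * gamma ** epoch


print(early_stop_epoch([0.50, 0.40, 0.41, 0.42, 0.30]))
print([exponential_lr(0.001, 0.1, e) for e in range(3)])
```

With `gamma: 0.1` the learning rate drops by a factor of ten at every scheduler step, which is fairly aggressive for a run limited to `max_epochs: 5`.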
startscript
The startscript is a SLURM batch script that launches training. It loads the required environment modules, activates the Python virtual environment, and runs the training pipeline with itwinai exec-pipeline.
#!/bin/bash
# general configuration of the job
#SBATCH --job-name=PrototypeTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00
# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --gpus-per-node=4
#SBATCH --exclusive
# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4
# load modules
ml --force purge
ml Stages/2023 StdEnv/2023 NVHPC/23.1 OpenMPI/4.1.4 cuDNN/8.6.0.163-CUDA-11.7 Python/3.10.4 HDF5 libaio/0.3.112 GCC/11.3.0
# shellcheck source=/dev/null
source ~/.bashrc
# ON LOGIN NODE download datasets:
# ../../../.venv-pytorch/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps 0
source ../../../.venv-pytorch/bin/activate
srun itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline
utils.py
The utils.py script includes utility functions and classes that are used across the MNIST use case.
"""
Utilities for itwinai package.
"""
import os
import yaml
from collections.abc import MutableMapping
from typing import Dict
from omegaconf import OmegaConf
from omegaconf.dictconfig import DictConfig
def load_yaml(path: str) -> Dict:
"""Load YAML file as dict.
Args:
path (str): path to YAML file.
Raises:
exc: yaml.YAMLError for loading/parsing errors.
Returns:
Dict: nested dict representation of parsed YAML file.
"""
with open(path, "r", encoding="utf-8") as yaml_file:
try:
loaded_config = yaml.safe_load(yaml_file)
except yaml.YAMLError as exc:
print(exc)
raise exc
return loaded_config
def load_yaml_with_deps_from_file(path: str) -> DictConfig:
"""
Load YAML file with OmegaConf and merge it with its dependencies
specified in the `conf-dependencies` field.
Assume that the dependencies live in the same folder of the
YAML file which is importing them.
Args:
path (str): path to YAML file.
Raises:
exc: yaml.YAMLError for loading/parsing errors.
Returns:
DictConfig: nested representation of parsed YAML file.
"""
yaml_conf = load_yaml(path)
use_case_dir = os.path.dirname(path)
deps = []
if yaml_conf.get("conf-dependencies"):
for dependency in yaml_conf["conf-dependencies"]:
deps.append(load_yaml(os.path.join(use_case_dir, dependency)))
return OmegaConf.merge(yaml_conf, *deps)
def load_yaml_with_deps_from_dict(dict_conf, use_case_dir) -> DictConfig:
deps = []
if dict_conf.get("conf-dependencies"):
for dependency in dict_conf["conf-dependencies"]:
deps.append(load_yaml(os.path.join(use_case_dir, dependency)))
return OmegaConf.merge(dict_conf, *deps)
def dynamically_import_class(name: str):
"""
Dynamically import class by module path.
Adapted from https://stackoverflow.com/a/547867
Args:
name (str): path to the class (e.g., mypackage.mymodule.MyClass)
Returns:
__class__: class object.
"""
module, class_name = name.rsplit(".", 1)
mod = __import__(module, fromlist=[class_name])
klass = getattr(mod, class_name)
return klass
def flatten_dict(
d: MutableMapping, parent_key: str = "", sep: str = "."
) -> MutableMapping:
"""Flatten dictionary
Args:
d (MutableMapping): nested dictionary to flatten
parent_key (str, optional): prefix for all keys. Defaults to ''.
sep (str, optional): separator for nested key concatenation.
Defaults to '.'.
Returns:
MutableMapping: flattened dictionary with new keys.
"""
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, MutableMapping):
items.extend(flatten_dict(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
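As a quick illustration of two of these helpers, the sketch below re-declares compact versions of `flatten_dict` and `dynamically_import_class` so that it runs on its own, then applies them to made-up inputs. The sample dictionary and the choice of `collections.OrderedDict` as the imported class are assumptions for demonstration only.

```python
from collections.abc import MutableMapping


def flatten_dict(d, parent_key="", sep="."):
    """Compact copy of the helper above: flatten nested mappings."""
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


def dynamically_import_class(name):
    """Compact copy of the helper above: import a class from its dotted path."""
    module, class_name = name.rsplit(".", 1)
    mod = __import__(module, fromlist=[class_name])
    return getattr(mod, class_name)


# Hypothetical nested configuration, loosely shaped like the YAML files here.
nested = {"trainer": {"max_epochs": 5, "callbacks": {"patience": 2}}, "seed": 1}
flat = flatten_dict(nested)
print(flat)

# A standard-library class stands in for a real `class_path` entry.
klass = dynamically_import_class("collections.OrderedDict")
print(klass)
```

A flattened dictionary of this kind is convenient wherever only scalar key/value pairs are accepted, for example when recording hyperparameters.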
This section covers the MNIST use case, which uses the PyTorch framework for training and evaluation. The following files are integral to this use case:
PyTorch
Training
# Download dataset and exit
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps dataloading_step
# Run the whole training pipeline
itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline
View training logs on the MLflow server (if activated from the configuration):
mlflow ui --backend-store-uri mllogs/mlflow/
Inference
Create sample dataset
from dataloader import InferenceMNIST
InferenceMNIST.generate_jpg_sample('mnist-sample-data/', 10)
Generate a dummy pre-trained neural network
import torch
from model import Net
dummy_nn = Net()
torch.save(dummy_nn, 'mnist-pre-trained.pth')
Run the inference command. This will generate a "mnist-predictions" folder containing a CSV file with the predictions as rows.
itwinai exec-pipeline --config config.yaml --pipe-key inference_pipeline
Note the same entry point as for training.
Docker image
Build from project root with
# Local
docker buildx build -t itwinai:0.0.1-mnist-torch-0.1 -f use-cases/mnist/torch/Dockerfile .
# Ghcr.io
docker buildx build -t ghcr.io/intertwin-eu/itwinai:0.0.1-mnist-torch-0.1 -f use-cases/mnist/torch/Dockerfile .
docker push ghcr.io/intertwin-eu/itwinai:0.0.1-mnist-torch-0.1
Training with Docker container
docker run -it --rm --name running-inference \
-v "$PWD":/usr/data ghcr.io/intertwin-eu/itwinai:0.0.1-mnist-torch-0.1 \
/bin/bash -c "itwinai exec-pipeline --print-config \
--config /usr/src/app/config.yaml \
--pipe-key training_pipeline \
-o dataset_root=/usr/data/mnist-dataset "
Inference with Docker container
From wherever a sample of MNIST jpg images is available (folder called "mnist-sample-data/"):
├── $PWD
│   ├── mnist-sample-data
│   │   ├── digit_0.jpg
│   │   ├── digit_1.jpg
│   │   ├── digit_2.jpg
...
│   │   ├── digit_N.jpg
docker run -it --rm --name running-inference \
-v "$PWD":/usr/data ghcr.io/intertwin-eu/itwinai:0.0.1-mnist-torch-0.1 \
/bin/bash -c "itwinai exec-pipeline --print-config \
--config /usr/src/app/config.yaml \
--pipe-key inference_pipeline \
-o test_data_path=/usr/data/mnist-sample-data \
-o inference_model_mlflow_uri=/usr/src/app/mnist-pre-trained.pth \
-o predictions_dir=/usr/data/mnist-predictions "
This command will store the results in a folder called "mnist-predictions":
├── $PWD
│   ├── mnist-predictions
│   │   ├── predictions.csv
dataloader.py
The dataloader.py script is responsible for loading the MNIST dataset and preparing it for training.
"""Dataloader for Torch-based MNIST use case."""
from typing import Optional, Tuple, Callable, Any
import os
import shutil
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms, datasets
from itwinai.components import DataGetter, monitor_exec
class MNISTDataModuleTorch(DataGetter):
"""Download MNIST dataset for torch."""
def __init__(self, save_path: str = '.tmp/',) -> None:
super().__init__()
self.save_parameters(**self.locals2params(locals()))
self.save_path = save_path
@monitor_exec
def execute(self) -> Tuple[Dataset, Dataset]:
train_dataset = datasets.MNIST(
self.save_path, train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
validation_dataset = datasets.MNIST(
self.save_path, train=False, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
print("Train and validation datasets loaded.")
return train_dataset, validation_dataset, None
class InferenceMNIST(Dataset):
"""Loads a set of MNIST images from a folder of JPG files."""
def __init__(
self,
root: str,
transform: Optional[Callable] = None,
supported_format: str = '.jpg'
) -> None:
self.root = root
self.transform = transform
self.supported_format = supported_format
self.data = dict()
self._load()
def _load(self):
for img_file in os.listdir(self.root):
if not img_file.lower().endswith(self.supported_format):
continue
filename = os.path.basename(img_file)
img = Image.open(os.path.join(self.root, img_file))
self.data[filename] = img
def __len__(self) -> int:
return len(self.data)
def __getitem__(self, index: int) -> Tuple[Any, Any]:
"""
Args:
index (int): Index
Returns:
tuple: (image_identifier, image) where image_identifier
is the unique identifier for the image (e.g., filename).
"""
img_id, img = list(self.data.items())[index]
if self.transform is not None:
img = self.transform(img)
return img_id, img
@staticmethod
def generate_jpg_sample(
root: str,
max_items: int = 100
):
"""Generate a sample dataset of JPG images starting from
LeCun's test dataset.
Args:
root (str): sample path on disk
max_items (int, optional): max number of images to
generate. Defaults to 100.
"""
if os.path.exists(root):
shutil.rmtree(root)
os.makedirs(root)
test_data = datasets.MNIST(root='.tmp', train=False, download=True)
for idx, (img, _) in enumerate(test_data):
if idx >= max_items:
break
savepath = os.path.join(root, f'digit_{idx}.jpg')
img.save(savepath)
class MNISTPredictLoader(DataGetter):
def __init__(self, test_data_path: str) -> None:
super().__init__()
self.save_parameters(**self.locals2params(locals()))
self.test_data_path = test_data_path
@monitor_exec
def execute(self) -> Dataset:
return InferenceMNIST(
root=self.test_data_path,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
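The same `ToTensor` + `Normalize((0.1307,), (0.3081,))` pair appears in every transform above. As a small worked example, the sketch below reproduces its effect on a single grayscale pixel in plain Python: `ToTensor` rescales 8-bit values to the range [0, 1], and `Normalize` then subtracts the mean and divides by the standard deviation. The function name `normalize_pixel` is hypothetical.

```python
MNIST_MEAN = 0.1307  # mean used in the transforms above
MNIST_STD = 0.3081   # standard deviation used in the transforms above


def normalize_pixel(value: int) -> float:
    """Apply the ToTensor scaling and the Normalize step to one 8-bit pixel."""
    scaled = value / 255.0                    # ToTensor: [0, 255] -> [0.0, 1.0]
    return (scaled - MNIST_MEAN) / MNIST_STD  # Normalize: (x - mean) / std


for pixel in (0, 128, 255):
    print(pixel, round(normalize_pixel(pixel), 4))
```

Black background pixels therefore map to a small negative value and fully white strokes to a value a little below 3.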
Dockerfile
# FROM python:3.9
FROM nvcr.io/nvidia/pytorch:23.09-py3
WORKDIR /usr/src/app
# Install pytorch (cpuonly)
# Ref:https://pytorch.org/get-started/previous-versions/#linux-and-windows-5
RUN pip install --no-cache-dir torch==1.13.1+cpu torchvision==0.14.1+cpu torchaudio==0.13.1 --extra-index-url https://download.pytorch.org/whl/cpu
# Install itwinai and dependencies
COPY pyproject.toml ./
COPY src ./
RUN pip install --no-cache-dir .
# Add torch MNIST use case
COPY use-cases/mnist/torch/* ./
create_inference_sample.py
The create_inference_sample.py script creates a small sample dataset of JPG images and a dummy model checkpoint, which can be used to test the inference workflow.
"""Create a simple inference dataset sample and a checkpoint."""
import torch
import os
import argparse
from model import Net
from dataloader import InferenceMNIST
def mnist_torch_inference_files(
root: str = '.',
samples_path: str = 'mnist-sample-data/',
model_name: str = 'mnist-pre-trained.pth'
):
"""Create sample dataset and fake model to test mnist
inference workflow. Assumes to be run from
the use case folder.
Args:
root (str, optional): where to create the files.
Defaults to '.'.
"""
sample = os.path.join(root, samples_path)
InferenceMNIST.generate_jpg_sample(sample, 10)
# Fake checkpoint
dummy_nn = Net()
mdl_ckpt = os.path.join(root, model_name)
torch.save(dummy_nn, mdl_ckpt)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--root", type=str, default='.')
parser.add_argument("--samples-path", type=str,
default='mnist-sample-data')
parser.add_argument("--model-name", type=str,
default='mnist-pre-trained.pth')
args = parser.parse_args()
mnist_torch_inference_files(**vars(args))
model.py
The model.py script defines a simple convolutional neural network for MNIST classification.
from torch import nn
import torch.nn.functional as F


class Net(nn.Module):
    """Small convolutional network for 28x28 grayscale MNIST digits."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        # Flatten the 20 feature maps of size 4x4 into 320 features per sample
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Normalize over the class dimension (dim=1), not the batch dimension
        return F.log_softmax(x, dim=1)
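The value 320 in `nn.Linear(320, 50)` and `x.view(-1, 320)` follows from the layer sizes. The sketch below works through that arithmetic in plain Python, assuming 28x28 input images, unpadded stride-1 convolutions, and 2x2 max pooling with the default stride equal to the kernel size. The helper names are hypothetical.

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Output side length of a square convolution."""
    return (size + 2 * padding - kernel) // stride + 1


def pool_out(size: int, kernel: int) -> int:
    """Output side length of max pooling with stride equal to the kernel size."""
    return size // kernel


side = 28                               # MNIST images are 28x28
side = pool_out(conv_out(side, 5), 2)   # conv1: 28 -> 24, pool: 24 -> 12
side = pool_out(conv_out(side, 5), 2)   # conv2: 12 -> 8,  pool: 8 -> 4
flat_features = 20 * side * side        # conv2 produces 20 channels
print(flat_features)
```

Changing the input resolution or a kernel size changes this number, so the input size of the first linear layer has to be updated accordingly.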
config.yaml
This YAML file defines the pipeline configuration for the MNIST use case. It includes settings for the model, training, and evaluation.
# General config
dataset_root: .tmp/
num_classes: 10
batch_size: 64
num_workers_dataloader: 4
pin_memory: False
lr: 0.001
momentum: 0.9
fp16_allreduce: False
use_adasum: False
gradient_predivide_factor: 1.0
epochs: 2
strategy: ddp
test_data_path: mnist-sample-data
inference_model_mlflow_uri: mnist-pre-trained.pth
predictions_dir: mnist-predictions
predictions_file: predictions.csv
class_labels: null
checkpoints_location: checkpoints
checkpoint_every: 1
# Workflows configuration
training_pipeline:
class_path: itwinai.pipeline.Pipeline
init_args:
steps:
dataloading_step:
class_path: dataloader.MNISTDataModuleTorch
init_args:
save_path: ${dataset_root}
training_step:
class_path: itwinai.torch.trainer.TorchTrainer
init_args:
config:
batch_size: ${batch_size}
num_workers: ${num_workers_dataloader}
pin_memory: ${pin_memory}
lr: ${lr}
momentum: ${momentum}
fp16_allreduce: ${fp16_allreduce}
use_adasum: ${use_adasum}
gradient_predivide_factor: ${gradient_predivide_factor}
model:
class_path: model.Net
epochs: ${epochs}
metrics:
accuracy:
class_path: torchmetrics.classification.MulticlassAccuracy
init_args:
num_classes: ${num_classes}
precision:
class_path: torchmetrics.classification.MulticlassPrecision
init_args:
num_classes: ${num_classes}
recall:
class_path: torchmetrics.classification.MulticlassRecall
init_args:
num_classes: ${num_classes}
logger:
class_path: itwinai.loggers.LoggersCollection
init_args:
loggers:
- class_path: itwinai.loggers.ConsoleLogger
init_args:
log_freq: 10000
- class_path: itwinai.loggers.MLFlowLogger
init_args:
experiment_name: MNIST classifier
log_freq: batch
strategy: ${strategy}
checkpoint_every: ${checkpoint_every}
checkpoints_location: ${checkpoints_location}
inference_pipeline:
class_path: itwinai.pipeline.Pipeline
init_args:
steps:
- class_path: dataloader.MNISTPredictLoader
init_args:
test_data_path: ${test_data_path}
- class_path: itwinai.torch.inference.MulticlassTorchPredictor
init_args:
model:
class_path: itwinai.torch.inference.TorchModelLoader
init_args:
model_uri: ${inference_model_mlflow_uri}
test_dataloader_kwargs:
batch_size: ${batch_size}
- class_path: saver.TorchMNISTLabelSaver
init_args:
save_dir: ${predictions_dir}
predictions_file: ${predictions_file}
class_labels: ${class_labels}
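The `${...}` placeholders in this file refer to the top-level keys under "General config", and the `-o key=value` options seen in the commands of this section change those keys before the pipeline is built. The sketch below imitates that behavior with a tiny hand-written resolver; it is an illustration only, not the mechanism the configuration library actually uses, and the `resolve` function and the trimmed-down `config` dictionary are assumptions.

```python
import re
from typing import Any, Dict

_PATTERN = re.compile(r"\$\{(\w+)\}")


def resolve(node: Any, root: Dict[str, Any]) -> Any:
    """Replace ${key} placeholders with the matching top-level value."""
    if isinstance(node, dict):
        return {k: resolve(v, root) for k, v in node.items()}
    if isinstance(node, list):
        return [resolve(v, root) for v in node]
    if isinstance(node, str):
        whole = _PATTERN.fullmatch(node)
        if whole:
            return root[whole.group(1)]  # keep the referenced value's type
        return _PATTERN.sub(lambda m: str(root[m.group(1)]), node)
    return node


# Trimmed-down stand-in for the configuration above.
config = {
    "batch_size": 64,
    "strategy": "ddp",
    "checkpoints_location": "checkpoints",
    "training_step": {
        "batch_size": "${batch_size}",
        "strategy": "${strategy}",
        "checkpoints_location": "${checkpoints_location}",
    },
}

# Emulates `-o strategy=horovod -o checkpoints_location=checkpoints_hvd`.
config.update({"strategy": "horovod", "checkpoints_location": "checkpoints_hvd"})

resolved = resolve(config, config)
print(resolved["training_step"])
```

Keeping shared values at the top level means a single override reaches every step that references it.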
startscript.sh
The startscript is a SLURM batch script that launches training. It loads the required environment modules, activates the Python virtual environment, and runs the training pipeline with itwinai exec-pipeline.
#!/bin/bash
# general configuration of the job
#SBATCH --job-name=PrototypeTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00
# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --gpus-per-node=4
#SBATCH --exclusive
# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4
# load modules
ml --force purge
ml Stages/2023 StdEnv/2023 NVHPC/23.1 OpenMPI/4.1.4 cuDNN/8.6.0.163-CUDA-11.7 Python/3.10.4 HDF5 libaio/0.3.112 GCC/11.3.0
# shellcheck source=/dev/null
source ~/.bashrc
# ON LOGIN NODE download datasets:
# ../../../.venv-pytorch/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline --steps dataloading_step
source ../../../.venv-pytorch/bin/activate
srun itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline
saver.py
…
"""
This module is used during inference to save predicted labels to file.
"""
from typing import Optional, List, Dict
import os
import shutil
import csv
from itwinai.components import Saver, monitor_exec
class TorchMNISTLabelSaver(Saver):
"""Serializes to disk the labels predicted for MNIST dataset."""
def __init__(
self,
save_dir: str = 'mnist_predictions',
predictions_file: str = 'predictions.csv',
class_labels: Optional[List] = None
) -> None:
super().__init__()
self.save_parameters(**self.locals2params(locals()))
self.save_dir = save_dir
self.predictions_file = predictions_file
self.class_labels = (
class_labels if class_labels is not None
else [f'Digit {i}' for i in range(10)]
)
@monitor_exec
def execute(self, predicted_classes: Dict[str, int],) -> Dict[str, int]:
"""Translate predictions from class idx to class label and save
them to disk.
Args:
predicted_classes (Dict[str, int]): maps unique item ID to
the predicted class ID.
Returns:
Dict[str, int]: predicted classes.
"""
if os.path.exists(self.save_dir):
shutil.rmtree(self.save_dir)
os.makedirs(self.save_dir)
# Map class idx (int) to class label (str)
predicted_labels = {
itm_name: self.class_labels[cls_idx]
for itm_name, cls_idx in predicted_classes.items()
}
# Save to disk
filepath = os.path.join(self.save_dir, self.predictions_file)
with open(filepath, 'w') as csv_file:
writer = csv.writer(csv_file)
for key, value in predicted_labels.items():
writer.writerow([key, value])
return predicted_labels
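To show the shape of the file that `TorchMNISTLabelSaver` produces, the sketch below repeats its two core steps outside the itwinai component: mapping class indices to labels and writing one CSV row per item. The input dictionary is made up, and a temporary directory is used so that nothing is left on disk.

```python
import csv
import os
import tempfile

class_labels = [f"Digit {i}" for i in range(10)]

# Hypothetical predictions: item identifier -> predicted class index.
predicted_classes = {"digit_0.jpg": 7, "digit_1.jpg": 2}

# Map class index (int) to class label (str), as the saver does.
predicted_labels = {
    name: class_labels[idx] for name, idx in predicted_classes.items()
}

with tempfile.TemporaryDirectory() as save_dir:
    filepath = os.path.join(save_dir, "predictions.csv")
    # newline="" is what the csv module recommends for files it writes.
    with open(filepath, "w", newline="") as csv_file:
        writer = csv.writer(csv_file)
        for name, label in predicted_labels.items():
            writer.writerow([name, label])
    # Read the file back to inspect its rows.
    with open(filepath, newline="") as csv_file:
        rows = list(csv.reader(csv_file))

print(rows)
```

Each row holds the item identifier followed by its human-readable label, with no header row.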
runall.sh
#!/bin/bash
# Python virtual environment (no conda/micromamba)
PYTHON_VENV="../../../envAI_hdfml"
# Clear SLURM logs (*.out and *.err files)
rm -rf logs_slurm checkpoints* mllogs*
mkdir logs_slurm
rm -rf logs_torchrun
# DDP itwinai
DIST_MODE="ddp"
RUN_NAME="ddp-itwinai"
TRAINING_CMD="$PYTHON_VENV/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline -o strategy=ddp -o checkpoints_location=checkpoints_ddp"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
# DeepSpeed itwinai
DIST_MODE="deepspeed"
RUN_NAME="deepspeed-itwinai"
TRAINING_CMD="$PYTHON_VENV/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline -o strategy=deepspeed -o checkpoints_location=checkpoints_deepspeed"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
# Horovod itwinai
DIST_MODE="horovod"
RUN_NAME="horovod-itwinai"
TRAINING_CMD="$PYTHON_VENV/bin/itwinai exec-pipeline --config config.yaml --pipe-key training_pipeline -o strategy=horovod -o checkpoints_location=checkpoints_hvd"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
slurm.sh
#!/bin/bash
# SLURM jobscript for JSC systems
# Job configuration
#SBATCH --job-name=distributed_training
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00
# Resources allocation
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-gpu=4
#SBATCH --exclusive
# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4
# Load environment modules
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py
# Job info
echo "DEBUG: TIME: $(date)"
sysN="$(uname -n | cut -f2- -d.)"
sysN="${sysN%%[0-9]*}"
echo "Running on system: $sysN"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
if [ "$DEBUG" = true ] ; then
echo "DEBUG: NCCL_DEBUG=INFO"
export NCCL_DEBUG=INFO
fi
echo
# Setup env for distributed ML
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
if [ "$SLURM_CPUS_PER_GPU" -gt 0 ] ; then
export OMP_NUM_THREADS=$SLURM_CPUS_PER_GPU
fi
# Env variables check
if [ -z "$DIST_MODE" ]; then
>&2 echo "ERROR: env variable DIST_MODE is not set. Allowed values are 'horovod', 'ddp' or 'deepspeed'"
exit 1
fi
if [ -z "$RUN_NAME" ]; then
>&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment."
RUN_NAME=$DIST_MODE
fi
if [ -z "$TRAINING_CMD" ]; then
>&2 echo "ERROR: env variable TRAINING_CMD is not set. It's the python command to execute."
exit 1
fi
if [ -z "$PYTHON_VENV" ]; then
>&2 echo "WARNING: env variable PYTHON_VENV is not set. It's the path to a python virtual environment."
else
# Activate Python virtual env
source $PYTHON_VENV/bin/activate
fi
# Get GPUs info per node
srun --cpu-bind=none --ntasks-per-node=1 bash -c 'echo -e "NODE hostname: $(hostname)\n$(nvidia-smi)\n\n"'
# Launch training
if [ "$DIST_MODE" == "ddp" ] ; then
echo "DDP training: $TRAINING_CMD"
srun --cpu-bind=none --ntasks-per-node=1 \
bash -c "torchrun \
--log_dir='logs_torchrun' \
--nnodes=$SLURM_NNODES \
--nproc_per_node=$SLURM_GPUS_PER_NODE \
--rdzv_id=$SLURM_JOB_ID \
--rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
--rdzv_backend=c10d \
--rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
$TRAINING_CMD"
elif [ "$DIST_MODE" == "deepspeed" ] ; then
echo "DEEPSPEED training: $TRAINING_CMD"
MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)i
export MASTER_ADDR
export MASTER_PORT=29500
srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
$TRAINING_CMD
# # Run with deepspeed launcher: set --ntasks-per-node=1
# # https://www.deepspeed.ai/getting-started/#multi-node-environment-variables
# export NCCL_IB_DISABLE=1
# export NCCL_SOCKET_IFNAME=eth0
# nodelist=$(scontrol show hostname $SLURM_NODELIST)
# echo "$nodelist" | sed -e 's/$/ slots=4/' > .hostfile
# # Requires passwordless SSH access among compute node
# srun --cpu-bind=none deepspeed --hostfile=.hostfile $TRAINING_CMD --deepspeed
# rm .hostfile
elif [ "$DIST_MODE" == "horovod" ] ; then
echo "HOROVOD training: $TRAINING_CMD"
srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
$TRAINING_CMD
else
>&2 echo "ERROR: unrecognized \$DIST_MODE env variable"
exit 1
fi
This section covers the MNIST use case, which uses the TensorFlow framework for training and evaluation. The following files are integral to this use case:
TensorFlow
dataloader.py
The dataloader.py script is responsible for loading the MNIST dataset and preparing it for training.
from typing import Tuple

import tensorflow.keras as keras
import tensorflow as tf

from itwinai.components import DataGetter, DataProcessor, monitor_exec


class MNISTDataGetter(DataGetter):
    def __init__(self):
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))

    @monitor_exec
    def execute(self) -> Tuple:
        train, test = keras.datasets.mnist.load_data()
        return train, test


class MNISTDataPreproc(DataProcessor):
    def __init__(self, classes: int):
        super().__init__()
        self.save_parameters(**self.locals2params(locals()))
        self.classes = classes

    @monitor_exec
    def execute(
        self,
        *datasets,
    ) -> Tuple:
        options = tf.data.Options()
        options.experimental_distribute.auto_shard_policy = (
            tf.data.experimental.AutoShardPolicy.DATA)
        preprocessed = []
        for dataset in datasets:
            x, y = dataset
            y = keras.utils.to_categorical(y, self.classes)
            sliced = tf.data.Dataset.from_tensor_slices((x, y))
            sliced = sliced.with_options(options)
            preprocessed.append(sliced)
        return tuple(preprocessed)
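The call to `keras.utils.to_categorical(y, self.classes)` turns integer labels into one-hot vectors. A rough plain-Python equivalent, written only to show what that encoding looks like, is sketched below; `one_hot` is a hypothetical stand-in and not the Keras implementation.

```python
from typing import List


def one_hot(labels: List[int], num_classes: int) -> List[List[float]]:
    """Encode each integer label as a vector with a single 1.0 entry."""
    encoded = []
    for label in labels:
        row = [0.0] * num_classes
        row[label] = 1.0  # mark the position of the true class
        encoded.append(row)
    return encoded


print(one_hot([3, 0], num_classes=4))
```

One-hot targets are the label format expected by a categorical cross-entropy loss, as opposed to its sparse variant, which takes the integer labels directly.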
pipeline.yaml
This YAML file defines the pipeline configuration for the MNIST use case. It includes settings for the model, training, and evaluation.
# General config
verbose: auto
micro_batch_size: 17
epochs: 3
checkpoints_path: checkpoints
tb_log_dir: ./logs

# Training pipeline
pipeline:
  class_path: itwinai.pipeline.Pipeline
  init_args:
    steps:
      - class_path: dataloader.MNISTDataGetter
      - class_path: dataloader.MNISTDataPreproc
        init_args:
          classes: 10
      - class_path: itwinai.tensorflow.trainer.TensorflowTrainer
        init_args:
          epochs: ${epochs}
          micro_batch_size: ${micro_batch_size}
          verbose: ${verbose}
          model_compile_config:
            loss:
              class_path: tensorflow.keras.losses.CategoricalCrossentropy
              init_args:
                from_logits: False
            optimizer:
              class_path: tensorflow.keras.optimizers.Adam
              init_args:
                learning_rate: 0.001
          model_config:
            class_path: itwinai.tensorflow.models.mnist.MNIST_Model
            init_args:
              input_shape: [ 28, 28, 1 ]
              output_shape: 10
          callbacks:
            - class_path: keras.callbacks.EarlyStopping
              init_args:
                patience: 2
            - class_path: keras.callbacks.ModelCheckpoint
              init_args:
                filepath: ${checkpoints_path}/model.{epoch:02d}-{val_loss:.2f}.keras
            - class_path: keras.callbacks.TensorBoard
              init_args:
                log_dir: ${tb_log_dir}
startscript.sh
The startscript is a shell script to initiate the training pipeline.
#!/bin/bash
# general configuration of the job
#SBATCH --job-name=PrototypeTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00
# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --gpus-per-node=4
#SBATCH --exclusive
# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4
# load modules
ml --force purge
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python/3.11 HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12
# shellcheck source=/dev/null
source ~/.bashrc
# Using legacy (2.16) version of Keras
# Latest version with TF (2.16) installs Keras 3.3
# which returns an error for multi-node execution
export TF_USE_LEGACY_KERAS=1
# ON LOGIN NODE download datasets:
# ../../../.venv-tf/bin/itwinai exec-pipeline --config pipeline.yaml --pipe-key pipeline --steps 0
source ../../../envAItf_hdfml/bin/activate
srun itwinai exec-pipeline --config pipeline.yaml --pipe-key pipeline -o verbose=2