1. Introduction to distributed training with TensorFlow

1.1. Tutorial: distributed strategies for TensorFlow

In this tutorial we show how to use TensorFlow's MultiWorkerMirroredStrategy. Note that the environment has been tested on the HDFML system at JSC; on other systems, the module versions might need to be changed accordingly. Other strategies will be added here.

First, from the root of this repository, build the environment containing TensorFlow. You can try with:

# Creates a Python venv called envAItf_hdfml
make tf-gpu-jsc

If you want to distribute the code in train.py, run from a terminal:

sbatch tfmirrored_slurm.sh

1.2. train.py

"""
Show how to use TensorFlow MultiWorkerMirroredStrategy on itwinai.

with SLURM:
>>> sbatch tfmirrored_slurm.sh

"""
from typing import Any
import argparse
import tensorflow as tf
from tensorflow import keras
from itwinai.tensorflow.distributed import get_strategy


def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse the process arguments.

    Recognized options:
        --strategy / -s: name of the distributed strategy; 'mirrored' is
            the only accepted choice and also the default.
        --batch_size / -bs: integer batch size, 64 when omitted.
        --shuffle_dataloader / --no-shuffle_dataloader: optional boolean
            switch; the attribute is None when neither form is passed.

    Returns:
        The namespace obtained from parsing the command line.
    """
    # (flags, keyword options) for each argument, in declaration order
    option_specs = (
        (("--strategy", "-s"),
         dict(type=str, choices=['mirrored'], default='mirrored')),
        (("--batch_size", "-bs"),
         dict(type=int, default=64)),
        (("--shuffle_dataloader",),
         dict(action=argparse.BooleanOptionalAction)),
    )

    cli = argparse.ArgumentParser()
    for flags, spec in option_specs:
        cli.add_argument(*flags, **spec)

    return cli.parse_args()


def tf_rnd_dataset(args):
    """Build batched MNIST train and test datasets.

    Despite its name, this helper does not generate random data: it loads
    MNIST through ``tf.keras.datasets.mnist.load_data`` and wraps the
    arrays into ``tf.data.Dataset`` objects.

    Args:
        args: parsed command-line namespace. ``args.batch_size`` is the
            batch size applied to both datasets. When
            ``args.shuffle_dataloader`` is present and truthy, the training
            samples are shuffled before batching; the test set is never
            shuffled.

    Returns:
        A ``(train_dataset, test_dataset)`` tuple of batched datasets.
    """
    # NOTE(review): this path has no leading "/", so Keras presumably
    # resolves it relative to its cache directory instead of the shared
    # /p/scratch file system -- confirm the intended location.
    (x_train, y_train), (x_test, y_test) = \
        tf.keras.datasets.mnist.load_data(
            path='p/scratch/intertwin/datasets/.keras/datasets/mnist.npz')

    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    # Honor the --shuffle_dataloader CLI flag. Shuffle individual samples
    # (i.e. before batching) with a buffer covering the whole training set.
    if getattr(args, "shuffle_dataloader", False):
        train_dataset = train_dataset.shuffle(buffer_size=len(x_train))
    train_dataset = train_dataset.batch(args.batch_size)

    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    test_dataset = test_dataset.batch(args.batch_size)

    return train_dataset, test_dataset


def trainer_entrypoint_fn(
        foo: Any, args: argparse.Namespace, strategy
) -> int:
    """Dummy training function, similar to custom code developed
    by some use case.

    Loads the datasets, distributes them with the given strategy, builds
    and compiles a small dense classifier inside ``strategy.scope()``,
    trains it and prints the evaluation loss and accuracy.

    Args:
        foo: placeholder argument; it is not used by this function.
        args: parsed command-line namespace, forwarded to
            ``tf_rnd_dataset``.
        strategy: distribution strategy object providing
            ``experimental_distribute_dataset`` and ``scope``.

    Returns:
        The constant 123 (dummy return value).
    """
    # dataset to be trained
    train_dataset, test_dataset = tf_rnd_dataset(args)

    # fit() and evaluate() below request a fixed number of steps. A single
    # pass over a finite dataset can hold fewer batches than requested, in
    # which case Keras stops early with an "input ran out of data" warning.
    # Repeating the datasets guarantees enough batches for every step.
    train_dataset = train_dataset.repeat()
    test_dataset = test_dataset.repeat()

    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(
        train_dataset
    )
    dist_test = strategy.experimental_distribute_dataset(
        test_dataset
    )

    # define and compile model within strategy.scope()
    with strategy.scope():
        # Local model
        model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10)
        ])

        # The last layer has no activation, hence from_logits=True
        model.compile(
            loss=keras.losses.SparseCategoricalCrossentropy(
                from_logits=True),
            optimizer=keras.optimizers.RMSprop(),
            metrics=['accuracy']
        )

    model.fit(dist_train,
              epochs=5,
              steps_per_epoch=2000)

    # evaluate() returns [loss, accuracy] given the compiled metrics
    test_scores = model.evaluate(dist_test, verbose=0, steps=500)

    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])

    return 123


if __name__ == "__main__":
    # Script entry point: parse the CLI, set up the distributed strategy
    # and run the dummy training function.
    args = parse_args()

    # Only the 'mirrored' strategy is wired up; reject anything else early
    if args.strategy != 'mirrored':
        raise NotImplementedError(
            f"Strategy {args.strategy} is not recognized/implemented.")

    # The mirrored strategy is only started when TensorFlow sees a GPU
    if not tf.config.list_physical_devices('GPU'):
        raise RuntimeError('Resources unavailable')

    # get_strategy() result is unpacked as (strategy, num_replicas);
    # only the strategy object is used below
    strategy, num_replicas = get_strategy()

    # Launch distributed training
    trainer_entrypoint_fn("foobar", args, strategy)

1.3. tfmirrored_slurm.sh

#!/bin/bash

# SLURM batch script that launches train.py on 2 nodes (1 task per node,
# 4 GPUs per node) through srun.

# general configuration of the job
#SBATCH --job-name=TFTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:15:00

# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=32
#SBATCH --gpus-per-node=4
#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# trace every command in the job error log; drop proxy settings
set -x
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY

# set modules
ml --force purge
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12

# set env - change to location of your environment
source itwinai/envAItf_hdfml/bin/activate

# Use the legacy Keras 2 implementation: the latest TensorFlow (2.16)
# installs Keras 3.3, which returns an error for multi-node execution
export TF_USE_LEGACY_KERAS=1

# sleep a sec
sleep 1

# command to launch; defined before the job info so that the
# "EXECUTE" debug line below prints it instead of an empty string
COMMAND="train.py"

EXEC="$COMMAND "

# job info
echo "DEBUG: TIME: $(date)"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
# printed before the override below, i.e. the value inherited by the job
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
echo "DEBUG: SLURM_NODELIST: $SLURM_NODELIST"
echo

# set comm
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
# default to 0 so the numeric test does not fail when the variable is unset
if [ "${SLURM_CPUS_PER_TASK:-0}" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
fi

# $EXEC is intentionally unquoted so that it is split into script + args
srun python -u $EXEC