1. Introduction to distributed training with TensorFlow
1.1. Tutorial: distributed strategies for TensorFlow
In this tutorial, we show how to use TensorFlow's MultiWorkerMirroredStrategy.
Note that the environment is tested on the HDFML system at JSC.
For other systems, the module versions might need to be changed accordingly.
Other strategies will be added here in the future.
First, from the root of this repository, build the environment containing TensorFlow. You can try with:
# Creates a Python venv called envAItf_hdfml
make tf-gpu-jsc
If you want to distribute the code in train.py, run from the terminal:
sbatch tfmirrored_slurm.sh
1.2. train.py
"""
Show how to use TensorFlow MultiWorkerMirroredStrategy on itwinai.
with SLURM:
>>> sbatch tfmirrored_slurm.sh
"""
from typing import Any
import argparse
import tensorflow as tf
from tensorflow import keras
from itwinai.tensorflow.distributed import get_strategy
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the training script.

    Returns:
        Namespace with three attributes:
        ``strategy`` (only ``'mirrored'`` is accepted, which is also the
        default), ``batch_size`` (int, 64 by default) and
        ``shuffle_dataloader`` (``True``/``False`` when
        ``--shuffle_dataloader``/``--no-shuffle_dataloader`` is passed,
        ``None`` when neither flag is given).
    """
    # Each entry is (option strings, keyword arguments for add_argument).
    option_table = (
        (
            ("--strategy", "-s"),
            {"type": str, "choices": ['mirrored'], "default": 'mirrored'},
        ),
        (
            ("--batch_size", "-bs"),
            {"type": int, "default": 64},
        ),
        (
            ("--shuffle_dataloader",),
            {"action": argparse.BooleanOptionalAction},
        ),
    )

    cli = argparse.ArgumentParser()
    for option_strings, settings in option_table:
        cli.add_argument(*option_strings, **settings)
    return cli.parse_args()
def tf_rnd_dataset(args):
    """Build batched MNIST train and test datasets.

    Despite the function name, the data is not random: it is the MNIST
    dataset returned by ``tf.keras.datasets.mnist.load_data``.

    Args:
        args: parsed command-line arguments; only ``args.batch_size`` is
            read here (``args.shuffle_dataloader`` is not used).

    Returns:
        Tuple ``(train_dataset, test_dataset)`` of ``tf.data.Dataset``
        objects built from the (images, labels) pairs and batched with
        ``args.batch_size``.
    """
    # The path must be absolute: Keras resolves a relative ``path``
    # against its own cache directory (~/.keras/datasets), so without the
    # leading slash the copy on the scratch file system is not the file
    # being looked up.
    (x_train, y_train), (x_test, y_test) = \
        tf.keras.datasets.mnist.load_data(
            path='/p/scratch/intertwin/datasets/.keras/datasets/mnist.npz')

    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_dataset = train_dataset.batch(args.batch_size)

    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    test_dataset = test_dataset.batch(args.batch_size)

    return train_dataset, test_dataset
def trainer_entrypoint_fn(
    foo: Any, args: argparse.Namespace, strategy
) -> int:
    """Dummy training function, similar to custom code developed
    by some use case.

    Builds the datasets, distributes them with ``strategy``, creates and
    compiles a small dense classifier inside ``strategy.scope()``, trains
    it and prints the test loss and accuracy.

    Args:
        foo: placeholder argument; it is not used by this function.
        args: parsed command-line arguments, forwarded to
            ``tf_rnd_dataset``.
        strategy: distribution strategy object providing
            ``experimental_distribute_dataset`` and ``scope``.

    Returns:
        The constant ``123``.
    """
    # dataset to be trained
    train_dataset, test_dataset = tf_rnd_dataset(args)

    # fit() and evaluate() below request a fixed number of steps
    # (5 epochs x 2000 steps, and 500 steps). A finite dataset can run out
    # of batches before that, which makes Keras interrupt the run early,
    # so both datasets are repeated indefinitely and the step counts alone
    # decide how much data is consumed.
    train_dataset = train_dataset.repeat()
    test_dataset = test_dataset.repeat()

    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(
        train_dataset
    )
    dist_test = strategy.experimental_distribute_dataset(
        test_dataset
    )

    # define and compile model within strategy.scope()
    with strategy.scope():
        # Local model
        model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(10)
        ])

        # The last layer has no activation, hence from_logits=True.
        model.compile(
            loss=keras.losses.SparseCategoricalCrossentropy(
                from_logits=True),
            optimizer=keras.optimizers.RMSprop(),
            metrics=['accuracy']
        )

    model.fit(dist_train,
              epochs=5,
              steps_per_epoch=2000)

    test_scores = model.evaluate(dist_test, verbose=0, steps=500)

    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])

    return 123
if __name__ == "__main__":
    args = parse_args()

    # Reject any strategy other than 'mirrored' before touching devices.
    if args.strategy != 'mirrored':
        raise NotImplementedError(
            f"Strategy {args.strategy} is not recognized/implemented.")

    # Abort when TensorFlow reports no GPU device.
    if not tf.config.list_physical_devices('GPU'):
        raise RuntimeError('Resources unavailable')

    # Instantiate Strategy
    strategy, num_replicas = get_strategy()

    # Launch distributed training
    trainer_entrypoint_fn("foobar", args, strategy)
1.3. tfmirrored_slurm.sh
#!/bin/bash
# SLURM batch script that launches train.py with srun on the allocated
# nodes (one task per node, four GPUs per node).

# general configuration of the job
#SBATCH --job-name=TFTest
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:15:00

# configure node and process count on the CM
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=32
#SBATCH --gpus-per-node=4
#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

# print every command before it is executed
set -x
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY

# set modules
ml --force purge
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12

# set env - change to location of your environment
source itwinai/envAItf_hdfml/bin/activate

# Using legacy (2.16) version of Keras
# Latest version with TF (2.16) installs Keras 3.3
# which returns an error for multi-node execution
export TF_USE_LEGACY_KERAS=1

# sleep a sec
sleep 1

# command to execute; defined before the job info so that the
# "EXECUTE" debug line below prints it instead of an empty string
COMMAND="train.py"
EXEC="$COMMAND "

# job info
echo "DEBUG: TIME: $(date)"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
echo "DEBUG: SLURM_NODELIST: $SLURM_NODELIST"
echo

# set comm
export CUDA_VISIBLE_DEVICES="0,1,2,3"
# default to one OpenMP thread; use the per-task CPU count when SLURM
# provides it (":-0" keeps the numeric test valid if the variable is unset)
export OMP_NUM_THREADS=1
if [ "${SLURM_CPUS_PER_TASK:-0}" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
fi

# $EXEC is intentionally unquoted so it is split into script + arguments
srun python -u $EXEC