3. TensorFlow scaling test
3.1. Benchmarking tutorial using JUBE
Benchmarking of itwinai can also be performed with the JUBE Benchmarking Environment from the Jülich Supercomputing Centre (JSC).
The JUBE benchmarking tool is already set up in the environment files provided under env-files.
3.1.1. Source the environment
Load the required modules and activate your environment (adjust the path to where your environment is located), for example:
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12
source envAI_hdfml/bin/activate
3.1.2. Run benchmark
The benchmarks are defined in the file general_jobsys.xml.
Configurations, such as the number of nodes, can be specified there as parameters.
The benchmark can be launched with the command:
jube run general_jobsys.xml
3.1.3. Monitor the status of the benchmark run
To check on the run, and to continue its remaining steps once the submitted jobs have finished, use:
jube continue bench_run --id last
3.1.4. Check the results of the benchmark run
The results can be viewed with:
jube result -a bench_run --id last
This creates the file result-csv.dat in the results folder.
The scaling and efficiency plots can be generated with the notebook bench_plot.ipynb,
which takes result-csv.dat as input.
3.2. train.py
"""
Show how to use TensorFlow MultiWorkerMirroredStrategy on itwinai.
for an Imagenet dataset
with SLURM:
>>> sbatch tfmirrored_slurm.sh
"""
import argparse
import os
import sys
from timeit import default_timer as timer

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model

from itwinai.tensorflow.distributed import get_strategy
def parse_args():
    """Build the benchmark's command-line interface and parse ``sys.argv``.

    Options:
        --strategy / -s: distribution strategy; only 'mirrored' is accepted.
        --data_dir: directory containing ``imagenet-1K-tfrecords``.
        --batch_size: batch size before scaling by the worker count.
        --epochs: number of training epochs.

    Returns:
        argparse.Namespace: The parsed options.
    """
    cli = argparse.ArgumentParser(description='TensorFlow ImageNet')
    cli.add_argument(
        "--strategy", "-s",
        type=str, choices=['mirrored'], default='mirrored',
    )
    # Remaining options share the same shape: (flag, type, default).
    simple_options = (
        ("--data_dir", str, './'),
        ("--batch_size", int, 128),
        ("--epochs", int, 3),
    )
    for flag, value_type, fallback in simple_options:
        cli.add_argument(flag, type=value_type, default=fallback)
    return cli.parse_args()
def deserialization_fn(serialized_fn):
    """Decode one serialized ImageNet ``tf.train.Example`` record.

    Args:
        serialized_fn (tf.Tensor): Scalar string tensor holding one
            serialized example, as produced by ``tf.data.TFRecordDataset``.

    Returns:
        tuple[tf.Tensor, tf.Tensor]: ``(image, label)``. ``image`` is the
        3-channel JPEG from ``image/encoded``, decoded and resized to
        224x224 with ``tf.image.resize`` (which returns float32). ``label``
        is the int64 ``image/class/label`` minus one.
    """
    parsed_example = tf.io.parse_single_example(
        serialized_fn,
        features={
            'image/encoded': tf.io.FixedLenFeature([], tf.string),
            'image/class/label': tf.io.FixedLenFeature([], tf.int64),
        },
    )
    image = tf.image.decode_jpeg(parsed_example['image/encoded'], channels=3)
    image = tf.image.resize(image, (224, 224))
    # The feature is already parsed as int64, so no cast is needed.
    # NOTE(review): the shift presumably maps 1-based labels stored in the
    # records to the 0-based ids expected by sparse categorical
    # crossentropy -- confirm against the dataset conversion script.
    label = parsed_example['image/class/label'] - 1
    return image, label
def tf_records_loader(files_path, shuffle=False, seed=None):
    """Build a tf.data pipeline that reads and decodes TFRecord shards.

    Args:
        files_path (Sequence[str]): Paths of the TFRecord files to read.
        shuffle (bool, optional): If True, shuffle the order of the *files*
            (not individual records) before reading them. Defaults to False.
        seed (int, optional): Seed for the file shuffle. Passing the same
            value on every worker should give every worker the same file
            order. Defaults to None, which keeps TensorFlow's default
            (unseeded) behaviour.

    Returns:
        tf.data.Dataset: Dataset of ``(image, label)`` pairs produced by
        ``deserialization_fn``.

    Raises:
        ValueError: If ``files_path`` is empty.
    """
    if len(files_path) == 0:
        # An empty list would otherwise fail later inside tf.data with a
        # much less descriptive error (zero shuffle buffer / non-string
        # filename tensor).
        raise ValueError("tf_records_loader: no TFRecord files were given")
    datasets = tf.data.Dataset.from_tensor_slices(files_path)
    if shuffle:
        # A buffer as large as the file list yields a full permutation.
        datasets = datasets.shuffle(len(files_path), seed=seed)
    # Read the files one after another and concatenate their records.
    datasets = datasets.flat_map(tf.data.TFRecordDataset)
    return datasets.map(
        deserialization_fn, num_parallel_calls=tf.data.AUTOTUNE)
def main():
    """Train and evaluate a ResNet50 classifier on ImageNet TFRecords.

    Parses the CLI and creates the distribution strategy once. Builds and
    compiles the model inside the strategy scope. Streams the train and
    validation shards from ``<data_dir>/imagenet-1K-tfrecords``. Trains for
    ``--epochs`` epochs of 2000 steps each, then evaluates on 100
    validation batches. Timings and test metrics are printed to stdout in
    the ``TIMER: ...`` format.

    Raises:
        NotImplementedError: If an unsupported ``--strategy`` is requested.
        FileNotFoundError: If no train or validation shards are found.
    """
    import os  # local import keeps this function self-contained

    args = parse_args()
    input_shape = (224, 224, 3)
    num_classes = 1000

    if args.strategy == 'mirrored':
        # Call get_strategy() only once. Its implementation is not visible
        # here, and a second call could construct a second strategy object.
        # Element [1] is used to scale the batch size; presumably it is the
        # replica/worker count -- confirm against itwinai.
        strategy_info = get_strategy()
        strategy = strategy_info[0]
        num_workers = strategy_info[1]
    else:
        raise NotImplementedError(
            f"Strategy {args.strategy} is not recognized/implemented.")

    with strategy.scope():
        base_model = keras.applications.ResNet50(
            weights=None,
            input_shape=input_shape,
            include_top=False,
        )
        x = base_model.output
        x = GlobalAveragePooling2D()(x)
        x = Dense(1024, activation='relu')(x)
        predictions = Dense(num_classes, activation='softmax')(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss=keras.losses.sparse_categorical_crossentropy,
                      optimizer=keras.optimizers.Adam(),
                      metrics=['accuracy'])

    # scale batch size with number of workers
    batch_size = args.batch_size * num_workers

    # os.path.join also works when --data_dir has no trailing slash.
    dir_imagenet = os.path.join(args.data_dir, 'imagenet-1K-tfrecords')
    train_pattern = os.path.join(dir_imagenet, 'train-*-of-01024')
    test_pattern = os.path.join(dir_imagenet, 'validation-*-of-00128')
    train_set_path = sorted(tf.io.gfile.glob(train_pattern))
    test_set_path = sorted(tf.io.gfile.glob(test_pattern))
    for pattern, found in ((train_pattern, train_set_path),
                           (test_pattern, test_set_path)):
        if not found:
            raise FileNotFoundError(f"No TFRecord shards match {pattern!r}")

    train_dataset = tf_records_loader(train_set_path, shuffle=True)
    test_dataset = tf_records_loader(test_set_path)
    train_dataset = train_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    test_dataset = test_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(train_dataset)
    dist_test = strategy.experimental_distribute_dataset(test_dataset)

    # TODO: add callbacks to evaluate per epoch time
    start = timer()
    # verbose=2 (one line per epoch) is a documented Keras value and is
    # suitable for batch-job log files. The previous 10/5 are not
    # documented values.
    model.fit(dist_train, epochs=args.epochs, steps_per_epoch=2000, verbose=2)
    # Measure once so that the total and the average are consistent.
    elapsed = timer() - start
    print('TIMER: total epoch time:', elapsed, ' s')
    if args.epochs > 0:
        print('TIMER: average epoch time:', elapsed / args.epochs, ' s')

    test_scores = model.evaluate(dist_test, steps=100, verbose=2)
    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])
if __name__ == "__main__":
main()
sys.exit()
# eof
3.3. jube_ddp.sh
#!/bin/bash
# SLURM job template used by JUBE: the #...# placeholders below are
# substituted by JUBE before submission.

# general configuration of the job
#SBATCH --job-name=JUBE_DDP
#SBATCH --account=#ACC#
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=#TIMELIM#

# configure node and process count on the CM
#SBATCH --partition=#QUEUE#
#SBATCH --nodes=#NODES#
#SBATCH --cpus-per-task=#NW#
#SBATCH --gpus-per-node=#NGPU#
#SBATCH --exclusive

# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4

set -x
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY

# set modules
ml --force purge
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake cuDNN/8.9.5.29-CUDA-12

# set env
# NOTE(review): user-specific, hard-coded virtualenv path; adjust it to
# your own itwinai checkout.
source /p/project/intertwin/rakesh/repo_push/itwinai/envAItf_hdfml/bin/activate

# Use the legacy (tf-keras 2.16) Keras: TF 2.16 installs Keras 3.3 by
# default, which returns an error for multi-node execution.
export TF_USE_LEGACY_KERAS=1

# Command to run. It is defined before the debug output so that the
# EXECUTE line below shows the real command. An array keeps arguments with
# spaces intact.
dataDir='/p/scratch/intertwin/datasets/imagenet/'
COMMAND="train.py"
EXEC=("$COMMAND" --data_dir "$dataDir")

# sleep a sec
sleep 1

# job info
echo "DEBUG: TIME: $(date)"
echo "DEBUG: EXECUTE: ${EXEC[*]}"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
echo "DEBUG: SLURM_NODELIST: $SLURM_NODELIST"
echo

# set comm
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
# Default to 0 so the numeric test does not error when the variable is unset.
if [ "${SLURM_CPUS_PER_TASK:-0}" -gt 0 ]; then
    export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
fi

srun python -u "${EXEC[@]}"
#eof