3. TensorFlow scaling test

3.1. Benchmarking tutorial using JUBE

Benchmarking of itwinai can also be performed with the JUBE Benchmarking Environment from JSC. The JUBE benchmarking tool is already set up in the environment files provided under env-files.

3.1.1. Source the environment

Load the required modules and activate your Python environment (adapt the path to wherever your environment is located), for example:

ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake  cuDNN/8.9.5.29-CUDA-12
source envAI_hdfml/bin/activate

3.1.2. Run benchmark

The benchmarks are defined in the general_jobsys.xml file, where the configurations can be specified through parameters such as the number of nodes. The benchmark can be launched with the command:

jube run general_jobsys.xml

3.1.3. Monitor status of benchmark run

To check on the run and continue any steps that were waiting for the submitted jobs to finish, use:

jube continue bench_run --id last

3.1.4. Check results of the benchmark run

The results can be viewed with:

jube result -a bench_run --id last

This creates a result-csv.dat file in the results folder.

The scaling and efficiency plots can be generated with the bench_plot.ipynb notebook, which takes the result-csv.dat file as input.

3.2. train.py

"""
 Show how to use TensorFlow MultiWorkerMirroredStrategy with itwinai
 to train on the ImageNet dataset
 with SLURM:
 >>> sbatch tfmirrored_slurm.sh

 """
import argparse
import os
import sys
from timeit import default_timer as timer

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model

from itwinai.tensorflow.distributed import get_strategy


def parse_args(argv=None):
    """Parse the command-line options of the training script.

    Args:
        argv (list[str], optional): Arguments to parse. Defaults to None,
            in which case argparse falls back to ``sys.argv[1:]``. Passing
            an explicit list allows programmatic use and testing.

    Returns:
        argparse.Namespace: Parsed options ``strategy``, ``data_dir``,
        ``batch_size`` and ``epochs``.
    """
    parser = argparse.ArgumentParser(description='TensorFlow ImageNet')

    parser.add_argument(
        "--strategy", "-s", type=str,
        choices=['mirrored'],
        default='mirrored',
        help="Distribution strategy to use."
    )
    parser.add_argument(
        "--data_dir", type=str,
        default='./',
        help="Directory containing the 'imagenet-1K-tfrecords' folder."
    )
    parser.add_argument(
        "--batch_size", type=int,
        default=128,
        help="Per-replica batch size; the training script scales it by "
             "the number of replicas."
    )
    parser.add_argument(
        "--epochs", type=int,
        default=3,
        help="Number of training epochs."
    )

    return parser.parse_args(argv)


def deserialization_fn(serialized_fn):
    """Decode one serialized ImageNet ``tf.train.Example`` record.

    Args:
        serialized_fn (tf.Tensor): Scalar string tensor holding one serialized
            example with an ``image/encoded`` (JPEG bytes) feature and an
            ``image/class/label`` (int64) feature.

    Returns:
        tuple: ``(image, label)`` where ``image`` is the decoded 3-channel
        image resized to 224x224 (float output of ``tf.image.resize``, not
        normalised) and ``label`` is an int64 class index.
    """
    parsed_example = tf.io.parse_single_example(
        serialized_fn,
        features={
            'image/encoded': tf.io.FixedLenFeature([], tf.string),
            'image/class/label': tf.io.FixedLenFeature([], tf.int64),
        }
    )
    image = tf.image.decode_jpeg(parsed_example['image/encoded'], channels=3)
    image = tf.image.resize(image, (224, 224))
    # The feature is already parsed as int64, so no cast is needed.
    # Shift to 0-based indices for sparse_categorical_crossentropy; this
    # assumes the records store 1-based labels (1..1000) — TODO confirm
    # against the script that produced the TFRecords.
    label = parsed_example['image/class/label'] - 1
    return image, label


def tf_records_loader(files_path, shuffle=False):
    """Build a dataset of decoded ``(image, label)`` pairs from TFRecord files.

    Args:
        files_path (Sequence[str]): Paths of the TFRecord files to read
            (e.g. the sorted output of ``tf.io.gfile.glob``).
        shuffle (bool, optional): If True, shuffle the order in which the
            files are read. Only the file order is shuffled; records inside
            a file keep their order. Defaults to False.

    Returns:
        tf.data.Dataset: Dataset whose elements are produced by
        ``deserialization_fn``.

    Raises:
        ValueError: If ``files_path`` is empty (e.g. a glob matched nothing).
    """
    if len(files_path) == 0:
        # Fail early with a clear message; an empty list would otherwise be
        # turned into a non-string tensor (and shuffle(0)), which likely
        # surfaces later as an opaque tf.data error.
        raise ValueError("tf_records_loader: no TFRecord files were given")

    datasets = tf.data.Dataset.from_tensor_slices(files_path)
    if shuffle:
        # Buffer covers all file names, i.e. a full shuffle of file order.
        datasets = datasets.shuffle(len(files_path))
    # Read the records of every file, one file after another.
    datasets = datasets.flat_map(tf.data.TFRecordDataset)
    datasets = datasets.map(
        deserialization_fn, num_parallel_calls=tf.data.AUTOTUNE)
    return datasets


def _build_model(input_shape, num_classes):
    """Build and compile a ResNet50-based classifier (randomly initialised).

    ``main`` calls this inside ``strategy.scope()`` so that the model's
    variables are created under the distribution strategy.

    Args:
        input_shape (tuple): Shape of one input image, e.g. (224, 224, 3).
        num_classes (int): Number of output classes.

    Returns:
        Model: Compiled Keras model (Adam, sparse categorical cross-entropy,
        accuracy metric).
    """
    base_model = keras.applications.ResNet50(
        weights=None,
        input_shape=input_shape,
        include_top=False,
    )

    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(1024, activation='relu')(x)
    predictions = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=predictions)
    model.compile(loss=keras.losses.sparse_categorical_crossentropy,
                  optimizer=keras.optimizers.Adam(),
                  metrics=['accuracy'])
    return model


def main():
    """Train and evaluate ResNet50 on ImageNet TFRecords and print timings.

    Builds the model under the selected distribution strategy and trains for
    ``--epochs`` epochs of ``steps_per_epoch`` steps each. It prints
    ``TIMER:`` lines with the total and average training time, then evaluates
    on the validation shards.

    Raises:
        NotImplementedError: If ``--strategy`` is not supported.
        FileNotFoundError: If no train/validation TFRecord shards are found.
    """
    args = parse_args()

    input_shape = (224, 224, 3)
    num_classes = 1000
    steps_per_epoch = 2000
    eval_steps = 100

    if args.strategy != 'mirrored':
        raise NotImplementedError(
            f"Strategy {args.strategy} is not recognized/implemented.")

    # Call get_strategy() exactly once: it presumably builds a new strategy
    # on every call, and creating a second (multi-worker) strategy is at best
    # wasteful. Element [1] is presumably the replica/worker count — confirm
    # against itwinai.tensorflow.distributed.get_strategy.
    strategy_info = get_strategy()
    strategy = strategy_info[0]
    num_replicas = strategy_info[1]

    with strategy.scope():
        model = _build_model(input_shape, num_classes)

    # Global batch = per-replica batch size * number of replicas/workers.
    batch_size = args.batch_size * num_replicas

    # os.path.join works whether or not --data_dir ends with a slash.
    dir_imagenet = os.path.join(args.data_dir, 'imagenet-1K-tfrecords')
    train_shard_suffix = 'train-*-of-01024'
    test_shard_suffix = 'validation-*-of-00128'

    train_set_path = sorted(
        tf.io.gfile.glob(os.path.join(dir_imagenet, train_shard_suffix)))
    test_set_path = sorted(
        tf.io.gfile.glob(os.path.join(dir_imagenet, test_shard_suffix)))
    for split, paths, pattern in (
            ('train', train_set_path, train_shard_suffix),
            ('validation', test_set_path, test_shard_suffix)):
        if not paths:
            raise FileNotFoundError(
                f"No {split} shards matching '{pattern}' in {dir_imagenet}")

    train_dataset = tf_records_loader(train_set_path, shuffle=True)
    test_dataset = tf_records_loader(test_set_path)

    # Repeat the training data so that epochs * steps_per_epoch batches are
    # always available. Without it, a finite dataset can be exhausted before
    # training ends, especially with a large global batch. Keras then only
    # warns and stops early, which would invalidate the timings.
    train_dataset = train_dataset.repeat().batch(
        batch_size).prefetch(tf.data.AUTOTUNE)
    test_dataset = test_dataset.batch(
        batch_size).prefetch(tf.data.AUTOTUNE)

    # distribute datasets among mirrored replicas
    dist_train = strategy.experimental_distribute_dataset(train_dataset)
    dist_test = strategy.experimental_distribute_dataset(test_dataset)

    # TODO: add callbacks to evaluate per epoch time
    start = timer()
    # verbose=2: one summary line per epoch. Keras only accepts 0, 1, 2 or
    # 'auto'; 2 is the recommended value for non-interactive (batch-job) logs.
    model.fit(dist_train, epochs=args.epochs,
              steps_per_epoch=steps_per_epoch, verbose=2)
    # Read the clock once so the total and average use the same interval.
    elapsed = timer() - start

    print('TIMER: total epoch time:', elapsed, ' s')
    if args.epochs > 0:
        print('TIMER: average epoch time:', elapsed / args.epochs, ' s')

    test_scores = model.evaluate(dist_test, steps=eval_steps, verbose=2)

    print('Test loss:', test_scores[0])
    print('Test accuracy:', test_scores[1])


if __name__ == "__main__":
    # main() returns None, so the process exits with status 0 once it finishes.
    sys.exit(main())

# eof

3.3. jube_ddp.sh

#!/bin/bash

# SLURM job-script template for the TensorFlow ImageNet scaling benchmark.
# The #NAME# tokens are placeholders, presumably substituted by JUBE
# (general_jobsys.xml) before submission.

# general configuration of the job
#SBATCH --job-name=JUBE_DDP
#SBATCH --account=#ACC#
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=#TIMELIM#

# configure node and process count on the CM
#SBATCH --partition=#QUEUE#
#SBATCH --nodes=#NODES#
#SBATCH --cpus-per-task=#NW#
#SBATCH --gpus-per-node=#NGPU#
#SBATCH --exclusive

# gres options have to be disabled for deepv
# NOTE(review): this also requests 4 GPUs per node on top of
# --gpus-per-node=#NGPU#; keep both (and CUDA_VISIBLE_DEVICES below) consistent.
#SBATCH --gres=gpu:4

set -x
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY

# set modules
ml --force purge
ml Stages/2024 GCC/12.3.0 OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py CMake  cuDNN/8.9.5.29-CUDA-12

# set env
source /p/project/intertwin/rakesh/repo_push/itwinai/envAItf_hdfml/bin/activate

# Use legacy Keras 2 (tf_keras) instead of Keras 3:
# TF 2.16 installs Keras 3.3 by default, which returns an error
# for multi-node execution
export TF_USE_LEGACY_KERAS=1

# Training command, defined before the job-info block so that the
# "DEBUG: EXECUTE" line below can print it. A bash array keeps each
# argument intact even if a path contains spaces.
dataDir='/p/scratch/intertwin/datasets/imagenet/'
COMMAND="train.py"
EXEC=("$COMMAND" --data_dir "$dataDir")

# sleep a sec
sleep 1

# job info
echo "DEBUG: TIME: $(date)"
echo "DEBUG: EXECUTE: ${EXEC[*]}"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
echo "DEBUG: SLURM_NODELIST: $SLURM_NODELIST"
echo

# set comm
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
# Default to 0 when SLURM_CPUS_PER_TASK is unset, so the numeric test does
# not fail with "integer expression expected".
if [ "${SLURM_CPUS_PER_TASK:-0}" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
fi

srun python -u "${EXEC[@]}"


#eof