4. GAN tutorial with PyTorch
4.1. Tutorial on itwinai TorchTrainer adapted for the distributed GAN using MNIST dataset
Author(s): Henry Mutegeki (CERN), Matteo Bunino (CERN), Jarl Sondre Sæther (CERN)
The code is adapted from
this example.
A simple non-distributed GAN model is provided in simpleGAN.py as a baseline,
but this tutorial focuses mainly on train.py,
which implements the distributed GAN use case.
4.1.1. Setup
First, from the root of this repository, build the environment containing PyTorch and DeepSpeed. Refer to the itwinai installation steps.
Then navigate to the project working directory:
cd tutorials/distributed-ml/torch-tutorial-GAN
4.1.2. Distributed training on a single node (interactive)
If you want to use SLURM in interactive mode, do the following:
# Allocate resources
$ salloc --partition=batch --nodes=1 --account=intertwin --gres=gpu:4 --time=1:59:00
job ID is XXXX
# Get a shell in the compute node (if using SLURM)
$ srun --jobid XXXX --overlap --pty /bin/bash
# Now you are inside the compute node
# On JSC, you may need to load some modules
ml --force purge
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py
# ...before activating the Python environment (adapt this to your env name/path)
source ../../../envAI_hdfml/bin/activate
To launch the training with torch DDP use:
torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py --strategy ddp
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 torchrun --standalone --nnodes=1 --nproc-per-node=gpu train.py --strategy ddp
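torchrun starts one worker process per GPU and tells each worker its place in the job through the RANK, LOCAL_RANK and WORLD_SIZE environment variables. train.py does not read them explicitly, but they are useful when debugging a launch. The following is a minimal sketch; the helper name read_dist_env and the single-process fallback values are assumptions made for illustration and are not part of itwinai.

```python
import os


def read_dist_env(environ=None):
    """Return (rank, local_rank, world_size) from torchrun-style variables.

    Hypothetical helper: falls back to a single-process layout when the
    variables are absent, e.g. when the script is started with plain `python`.
    """
    env = os.environ if environ is None else environ
    rank = int(env.get("RANK", 0))
    local_rank = int(env.get("LOCAL_RANK", 0))
    world_size = int(env.get("WORLD_SIZE", 1))
    return rank, local_rank, world_size


if __name__ == "__main__":
    rank, local_rank, world_size = read_dist_env()
    print(f"global rank {rank}, local rank {local_rank}, world size {world_size}")
```

Printing these values at the top of a script is a quick way to check that the expected number of workers was actually started.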
To launch the training with Microsoft DeepSpeed use:
deepspeed train.py --strategy deepspeed
# Optional -- from a SLURM login node:
srun --jobid XXXX --ntasks-per-node=1 deepspeed train.py --strategy deepspeed
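4.1.3. Distributed training with SLURM (batch mode)
Before running distributed training with SLURM, create the logs_slurm folder in your
current working directory to store SLURM logs and outputs.
Both distributed strategies are submitted through the same slurm.sh job script; the DIST_MODE variable selects the strategy.
The commands below use $N in the job and log names without defining it; set it beforehand (for example to the number of nodes), otherwise it expands to an empty string.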
If you want to distribute the code in train.py with torch DDP, run from terminal:
export DIST_MODE="ddp"
export RUN_NAME="ddp-itwinai"
export TRAINING_CMD="train.py --strategy ddp"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
If you want to distribute the code in train.py with DeepSpeed, run from terminal:
export DIST_MODE="deepspeed"
export RUN_NAME="deepspeed-itwinai"
export TRAINING_CMD="train.py --strategy deepspeed"
export PYTHON_VENV="../../../envAI_hdfml"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
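The two submissions above differ only in the strategy name, so the command line can be generated programmatically. The following is a minimal sketch under stated assumptions: build_sbatch_command is a hypothetical helper, not part of itwinai, and n_nodes is used only to label the job and log files, exactly as $N is above. It builds the argument list without submitting anything.

```python
import shlex


def build_sbatch_command(dist_mode, n_nodes, python_venv="../../../envAI_hdfml",
                         script="slurm.sh", log_dir="logs_slurm"):
    """Assemble the sbatch argument list for one strategy (hypothetical helper)."""
    run_name = f"{dist_mode}-itwinai"
    training_cmd = f"train.py --strategy {dist_mode}"
    # Same variables that the shell snippets pass through --export
    exports = ",".join([
        "ALL",
        f"DIST_MODE={dist_mode}",
        f"RUN_NAME={run_name}",
        f"TRAINING_CMD={training_cmd}",
        f"PYTHON_VENV={python_venv}",
    ])
    tag = f"{run_name}-n{n_nodes}"
    return [
        "sbatch",
        f"--export={exports}",
        f"--job-name={tag}",
        f"--output={log_dir}/job-{tag}.out",
        f"--error={log_dir}/job-{tag}.err",
        script,
    ]


if __name__ == "__main__":
    for mode in ("ddp", "deepspeed"):
        # Print a shell-quoted command line; pass the list to subprocess.run to submit
        print(shlex.join(build_sbatch_command(mode, n_nodes=2)))
```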
4.1.4. Analyze the logs
Analyze the logs with MLflow:
itwinai mlflow-ui --path mllogs/mlflow
4.1.5. Distributed GAN Documentation
This guide explains in detail how a simple Generative Adversarial Network (GAN) has been adapted to
operate in a distributed environment using the GANTrainer. This adaptation enables more efficient training on
larger datasets by leveraging distributed computing resources.
4.1.6. Overview
A Generative Adversarial Network consists of two key components:
Generator (G): Generates new data instances.
Discriminator (D): Evaluates them for authenticity, aiming to distinguish real instances from the fake ones generated by the Generator.
The training process involves iterative adjustments where the Generator tries to produce data indistinguishable from actual data, and the Discriminator improves its ability to detect fakes.
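For reference, this adversarial game is commonly written as the minimax objective of the original GAN formulation. The symbols below are notation introduced here rather than names taken from the code: x is a real image, z is a latent noise vector, and p_data and p_z are their distributions.

```latex
\min_{G} \max_{D} \; V(D, G) =
  \mathbb{E}_{x \sim p_{\text{data}}}\big[\log D(x)\big]
  + \mathbb{E}_{z \sim p_{z}}\big[\log\big(1 - D(G(z))\big)\big]
```

In train.py the Discriminator terms are computed with nn.BCELoss against the labels 1 (real) and 0 (fake). The Generator is updated with the common non-saturating variant: it minimizes the binary cross-entropy of D(G(z)) against the label 1 instead of minimizing log(1 - D(G(z))) directly.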
4.1.7. Steps to make a distributed GAN model
The code for all steps is in train.py (section 4.2.1).
4.1.7.1. Step 1: Define Model Architecture
Both the Generator and Discriminator are defined using PyTorch's nn.Module. The specific architecture for both
includes convolutional layers that are well-suited for processing image data.
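The layer hyperparameters in train.py determine the image size the two networks work with. The following sketch traces the spatial size through each network using the standard output-size formulas for Conv2d and ConvTranspose2d (dilation 1, no output padding). The helper names are illustrative; the (kernel, stride, padding) triples are copied from the Generator and Discriminator definitions.

```python
def conv_transpose_out(size, kernel, stride, padding):
    """Output size of ConvTranspose2d with dilation=1 and output_padding=0."""
    return (size - 1) * stride - 2 * padding + kernel


def conv_out(size, kernel, stride, padding):
    """Output size of Conv2d with dilation=1."""
    return (size + 2 * padding - kernel) // stride + 1


# (kernel, stride, padding) for the five layers of each network in train.py
GENERATOR_LAYERS = [(4, 1, 0), (4, 2, 1), (4, 2, 1), (4, 2, 1), (4, 2, 1)]
DISCRIMINATOR_LAYERS = [(4, 2, 1), (4, 2, 1), (4, 2, 1), (4, 2, 1), (4, 1, 0)]


def trace(size, layers, out_size_fn):
    """Return the spatial size before the first layer and after every layer."""
    sizes = [size]
    for kernel, stride, padding in layers:
        size = out_size_fn(size, kernel, stride, padding)
        sizes.append(size)
    return sizes


generator_sizes = trace(1, GENERATOR_LAYERS, conv_transpose_out)
discriminator_sizes = trace(64, DISCRIMINATOR_LAYERS, conv_out)
print(generator_sizes)      # [1, 4, 8, 16, 32, 64]
print(discriminator_sizes)  # [64, 32, 16, 8, 4, 1]
```

The Generator turns a 1x1 latent input into a 64x64 image and the Discriminator reduces a 64x64 image to a single score, which is why the MNIST images are resized to 64 pixels in the data transform.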
4.1.7.2. Step 2: Implement Distributed Training
The GANTrainer class extends the itwinai TorchTrainer class and handles the initialization of the models,
the optimizers, and the distributed training strategy for the GAN. The GANTrainer definition in train.py (section 4.2.1)
shows how it extends TorchTrainer and initializes its parameters.
This is needed because a GAN comprises two neural network models, whereas TorchTrainer expects and handles a single
model. GANTrainer also creates a separate optimizer for each of the two GAN models, the Generator and the
Discriminator.
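The core of this adaptation is that each (model, optimizer) pair is wrapped by the strategy separately, with extra keyword arguments only for DeepSpeed. The following plain-Python sketch mirrors that control flow from create_model_loss_optimizer. EchoStrategy is a stand-in written for this example, not an itwinai class, and the helper names are hypothetical; only the three-element return value of distributed and the train_micro_batch_size_per_gpu setting follow train.py.

```python
def build_distribute_kwargs(strategy_name, batch_size):
    """Extra arguments for the strategy; DeepSpeed needs an explicit batch size."""
    if strategy_name == "deepspeed":
        return {"config_params": {"train_micro_batch_size_per_gpu": batch_size}}
    return {}


class EchoStrategy:
    """Stand-in strategy: returns the model and optimizer unchanged."""

    def distributed(self, model, optimizer, **kwargs):
        # Real strategies would wrap the model here (e.g. for DDP or DeepSpeed)
        return model, optimizer, None


def distribute_pairs(strategy, pairs, **kwargs):
    """Wrap every (model, optimizer) pair independently."""
    wrapped = {}
    for name, (model, optimizer) in pairs.items():
        model, optimizer, _ = strategy.distributed(model, optimizer, **kwargs)
        wrapped[name] = (model, optimizer)
    return wrapped


# Placeholder strings stand in for nn.Module and optimizer objects
pairs = {
    "discriminator": ("netD", "optimizerD"),
    "generator": ("netG", "optimizerG"),
}
kwargs = build_distribute_kwargs("deepspeed", batch_size=128)
wrapped = distribute_pairs(EchoStrategy(), pairs, **kwargs)
print(kwargs)
print(wrapped)
```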
4.1.7.3. Step 3: Training and Validation Logic
The training alternates between updating the Discriminator using real and generated images and training the Generator to fool the Discriminator. Validation evaluates the performance of the Generator in deceiving the Discriminator.
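The quantities computed in each step can be illustrated without PyTorch. The sketch below is a plain-Python stand-in for nn.BCELoss applied to the Discriminator outputs; the function names and the sample scores are made up for illustration. Discriminator accuracy is taken here as the average of the fraction of real images scored above 0.5 and the fraction of fake images scored below 0.5.

```python
import math


def bce(probs, target):
    """Mean binary cross-entropy of predicted probabilities against one label."""
    eps = 1e-12  # keep log() finite for probabilities of exactly 0 or 1
    total = 0.0
    for p in probs:
        p = min(max(p, eps), 1.0 - eps)
        total += -(target * math.log(p) + (1 - target) * math.log(1.0 - p))
    return total / len(probs)


def gan_step_metrics(d_real, d_fake):
    """Losses and discriminator accuracy from D's scores on real and fake batches."""
    # Discriminator: real images should score 1, generated images 0
    loss_d = (bce(d_real, 1) + bce(d_fake, 0)) / 2
    # Generator: wants its images to be scored as real (label 1)
    loss_g = bce(d_fake, 1)
    acc_real = sum(p > 0.5 for p in d_real) / len(d_real)
    acc_fake = sum(p < 0.5 for p in d_fake) / len(d_fake)
    return loss_d, loss_g, (acc_real + acc_fake) / 2


loss_d, loss_g, acc_d = gan_step_metrics(d_real=[0.9, 0.8, 0.4], d_fake=[0.1, 0.3, 0.6])
print(f"loss_d={loss_d:.4f} loss_g={loss_g:.4f} acc_d={acc_d:.4f}")
```

With these sample scores the Discriminator is mostly right, so its loss is low while the Generator loss is high; during training the two losses pull against each other.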
4.1.7.4. Step 4: Visualization and Monitoring
Training progress is monitored through visualizations of loss metrics and image samples generated periodically.
4.1.8. Takeaways
This README describes the steps taken to adapt a GAN for distributed training, with the aim of improving efficiency and scalability when training on large datasets. From this use case we learn the following:
The itwinai TorchTrainer can be adapted to use cases it does not cover out of the box, such as a GAN with two models.
Training models in a distributed environment may require some customization of the training architecture, but it can bring substantial performance improvements.
Distributed training for GANs requires a large dataset to reduce the chances of overfitting to the smaller data splits created during the training phase.
Always ensure that the models, data, and results of a given process are on the same device during training and validation in a distributed environment.
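The point about smaller data splits can be made concrete with a little arithmetic. The sketch below assumes the dataset is sharded evenly across workers, as a distributed sampler typically does; whether itwinai shards in exactly this way is an assumption, and per_worker_split is a hypothetical helper. The numbers used are the 60,000 MNIST training images and the default batch size of 128 from train.py.

```python
import math


def per_worker_split(dataset_size, world_size, batch_size):
    """Samples and batches each worker sees per epoch under even sharding."""
    samples = math.ceil(dataset_size / world_size)
    batches = math.ceil(samples / batch_size)
    return samples, batches


if __name__ == "__main__":
    for world_size in (1, 4, 8):
        samples, batches = per_worker_split(60_000, world_size, 128)
        print(f"{world_size} worker(s): {samples} samples, {batches} batches per epoch")
```

With 8 workers (2 nodes with 4 GPUs each, as requested in slurm.sh) every worker sees 7,500 images per epoch instead of 60,000, so each model replica performs far fewer optimizer steps per epoch.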
4.2. Python files
4.2.1. train.py
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Henry Mutegeki
#
# Credit:
# - Henry Mutegeki <henry.mutegeki@cern.ch> - CERN
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# - Jarl Sondre Sæther <jarl.sondre.saether@cern.ch> - CERN
# --------------------------------------------------------------------------------------
import argparse
import os
from typing import Dict, Literal, Optional, Union
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from itwinai.loggers import Logger, MLFlowLogger
from itwinai.torch.config import TrainingConfiguration
from itwinai.torch.distributed import DeepSpeedStrategy
from itwinai.torch.trainer import TorchTrainer
from itwinai.torch.type import Metric
class Generator(nn.Module):
    def __init__(self, z_dim, g_hidden, image_channel):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            # input layer
            nn.ConvTranspose2d(z_dim, g_hidden * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(g_hidden * 8),
            nn.ReLU(True),
            # 1st hidden layer
            nn.ConvTranspose2d(g_hidden * 8, g_hidden * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(g_hidden * 4),
            nn.ReLU(True),
            # 2nd hidden layer
            nn.ConvTranspose2d(g_hidden * 4, g_hidden * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(g_hidden * 2),
            nn.ReLU(True),
            # 3rd hidden layer
            nn.ConvTranspose2d(g_hidden * 2, g_hidden, 4, 2, 1, bias=False),
            nn.BatchNorm2d(g_hidden),
            nn.ReLU(True),
            # output layer
            nn.ConvTranspose2d(g_hidden, image_channel, 4, 2, 1, bias=False),
            nn.Tanh(),
        )

    def forward(self, input):
        return self.main(input)
class Discriminator(nn.Module):
    def __init__(self, d_hidden, image_channel):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            # 1st layer
            nn.Conv2d(image_channel, d_hidden, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # 2nd layer
            nn.Conv2d(d_hidden, d_hidden * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(d_hidden * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # 3rd layer
            nn.Conv2d(d_hidden * 2, d_hidden * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(d_hidden * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # 4th layer
            nn.Conv2d(d_hidden * 4, d_hidden * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(d_hidden * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # output layer
            nn.Conv2d(d_hidden * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, input):
        return self.main(input).view(-1, 1).squeeze(1)
class GANTrainer(TorchTrainer):
    """Trainer class for GAN model using pytorch.

    Args:
        config (Union[Dict, TrainingConfiguration]): training configuration
            containing hyperparameters.
        epochs (int): number of training epochs.
        discriminator (nn.Module): pytorch discriminator model to train GAN.
        generator (nn.Module): pytorch generator model to train GAN.
        strategy (Literal['ddp', 'deepspeed', 'horovod'], optional):
            distributed strategy. Defaults to 'ddp'.
        validation_every (Optional[int], optional): run a validation epoch
            every ``validation_every`` epochs. Disabled if None. Defaults to 1.
        test_every (Optional[int], optional): run a test epoch
            every ``test_every`` epochs. Disabled if None. Defaults to None.
        random_seed (Optional[int], optional): set random seed for
            reproducibility. If None, the seed is not set. Defaults to None.
        logger (Optional[Logger], optional): logger for ML tracking.
            Defaults to None.
        metrics (Optional[Dict[str, Metric]], optional): map of torch
            metrics. Defaults to None.
        checkpoints_location (str): path to checkpoints directory.
            Defaults to "checkpoints".
        checkpoint_every (Optional[int]): save a checkpoint every
            ``checkpoint_every`` epochs. Disabled if None. Defaults to None.
        name (Optional[str], optional): trainer custom name. Defaults to None.
    """

    def __init__(
        self,
        config: Union[Dict, TrainingConfiguration],
        epochs: int,
        discriminator: nn.Module,
        generator: nn.Module,
        strategy: Literal["ddp", "deepspeed"] = "ddp",
        validation_every: Optional[int] = 1,
        test_every: Optional[int] = None,
        random_seed: Optional[int] = None,
        logger: Optional[Logger] = None,
        metrics: Optional[Dict[str, Metric]] = None,
        checkpoints_location: str = "checkpoints",
        checkpoint_every: Optional[int] = None,
        name: Optional[str] = None,
        **kwargs,
    ) -> None:
        # The parent trainer expects a single model, so none is passed here;
        # the two GAN models are stored on the instance below.
        super().__init__(
            config=config,
            epochs=epochs,
            model=None,
            strategy=strategy,
            validation_every=validation_every,
            test_every=test_every,
            random_seed=random_seed,
            logger=logger,
            metrics=metrics,
            checkpoints_location=checkpoints_location,
            checkpoint_every=checkpoint_every,
            name=name,
            **kwargs,
        )
        self.save_parameters(**self.locals2params(locals()))
        self.discriminator = discriminator
        self.generator = generator
    def create_model_loss_optimizer(self) -> None:
        self.optimizerD = optim.Adam(
            self.discriminator.parameters(), lr=self.config.lr, betas=(0.5, 0.999)
        )
        self.optimizerG = optim.Adam(
            self.generator.parameters(), lr=self.config.lr, betas=(0.5, 0.999)
        )
        self.criterion = nn.BCELoss()

        # https://stackoverflow.com/a/67437077
        self.discriminator = torch.nn.SyncBatchNorm.convert_sync_batchnorm(self.discriminator)
        self.generator = torch.nn.SyncBatchNorm.convert_sync_batchnorm(self.generator)

        # First, define strategy-wise optional configurations
        if isinstance(self.strategy, DeepSpeedStrategy):
            # Batch size definition is not optional for DeepSpeedStrategy!
            distribute_kwargs = dict(
                config_params=dict(train_micro_batch_size_per_gpu=self.config.batch_size)
            )
        else:
            distribute_kwargs = {}

        # Distribute discriminator and its optimizer
        self.discriminator, self.optimizerD, _ = self.strategy.distributed(
            self.discriminator, self.optimizerD, **distribute_kwargs
        )
        # Distribute generator and its optimizer
        self.generator, self.optimizerG, _ = self.strategy.distributed(
            self.generator, self.optimizerG, **distribute_kwargs
        )
        self.discriminator.to(self.device)
        self.generator.to(self.device)
    def train_epoch(self, epoch: int):
        self.discriminator.train()
        self.generator.train()
        gen_train_losses = []
        disc_train_losses = []
        disc_train_accuracy = []
        for batch_idx, (real_images, _) in enumerate(self.train_dataloader):
            lossG, lossD, accuracy_disc = self.train_step(real_images, batch_idx)
            gen_train_losses.append(lossG)
            disc_train_losses.append(lossD)
            disc_train_accuracy.append(accuracy_disc)
            self.train_glob_step += 1

        # Aggregate and log losses and accuracy
        avg_disc_accuracy = torch.mean(torch.stack(disc_train_accuracy))
        self.log(
            item=avg_disc_accuracy.item(),
            identifier="disc_train_accuracy_per_epoch",
            kind="metric",
            step=epoch,
        )
        avg_gen_loss = torch.mean(torch.stack(gen_train_losses))
        self.log(
            item=avg_gen_loss.item(),
            identifier="gen_train_loss_per_epoch",
            kind="metric",
            step=epoch,
        )
        avg_disc_loss = torch.mean(torch.stack(disc_train_losses))
        self.log(
            item=avg_disc_loss.item(),
            identifier="disc_train_loss_per_epoch",
            kind="metric",
            step=epoch,
        )
        self.save_fake_generator_images(epoch)
    def validation_epoch(self, epoch: int):
        gen_validation_losses = []
        gen_validation_accuracy = []
        disc_validation_losses = []
        disc_validation_accuracy = []
        self.discriminator.eval()
        self.generator.eval()
        for batch_idx, (real_images, _) in enumerate(self.validation_dataloader):
            loss_gen, accuracy_gen, loss_disc, accuracy_disc = self.validation_step(
                real_images, batch_idx
            )
            gen_validation_losses.append(loss_gen)
            gen_validation_accuracy.append(accuracy_gen)
            disc_validation_losses.append(loss_disc)
            disc_validation_accuracy.append(accuracy_disc)
            self.validation_glob_step += 1

        # Aggregate and log metrics
        disc_validation_loss = torch.mean(torch.stack(disc_validation_losses))
        self.log(
            item=disc_validation_loss.item(),
            identifier="disc_valid_loss_per_epoch",
            kind="metric",
            step=epoch,
        )
        disc_validation_accuracy = torch.mean(torch.stack(disc_validation_accuracy))
        self.log(
            item=disc_validation_accuracy.item(),
            identifier="disc_valid_accuracy_epoch",
            kind="metric",
            step=epoch,
        )
        gen_validation_loss = torch.mean(torch.stack(gen_validation_losses))
        self.log(
            item=gen_validation_loss.item(),
            identifier="gen_valid_loss_per_epoch",
            kind="metric",
            step=epoch,
        )
        gen_validation_accuracy = torch.mean(torch.stack(gen_validation_accuracy))
        self.log(
            item=gen_validation_accuracy.item(),
            identifier="gen_valid_accuracy_epoch",
            kind="metric",
            step=epoch,
        )
        return gen_validation_loss
    def train_step(self, real_images, batch_idx):
        real_images = real_images.to(self.device)
        batch_size = real_images.size(0)
        real_labels = torch.ones((batch_size,), dtype=torch.float, device=self.device)
        fake_labels = torch.zeros((batch_size,), dtype=torch.float, device=self.device)

        # Train Discriminator with real images
        output_real = self.discriminator(real_images)
        lossD_real = self.criterion(output_real, real_labels)

        # Generate fake images and train Discriminator
        noise = torch.randn(batch_size, self.config.z_dim, 1, 1, device=self.device)
        fake_images = self.generator(noise)
        output_fake = self.discriminator(fake_images.detach())
        lossD_fake = self.criterion(output_fake, fake_labels)
        lossD = (lossD_real + lossD_fake) / 2
        self.optimizerD.zero_grad()
        lossD.backward()
        self.optimizerD.step()

        # Discriminator accuracy: fraction of real images scored above 0.5 and
        # of fake images scored below 0.5, averaged so that it stays in [0, 1]
        accuracy_disc = (
            (output_real > 0.5).float().mean() + (output_fake < 0.5).float().mean()
        ) / 2

        # Train Generator
        output_fake = self.discriminator(fake_images)
        lossG = self.criterion(output_fake, real_labels)
        self.optimizerG.zero_grad()
        lossG.backward()
        self.optimizerG.step()

        self.log(
            item=accuracy_disc,
            identifier="disc_train_accuracy_per_batch",
            kind="metric",
            step=self.train_glob_step,
            batch_idx=batch_idx,
        )
        self.log(
            item=lossG,
            identifier="gen_train_loss_per_batch",
            kind="metric",
            step=self.train_glob_step,
            batch_idx=batch_idx,
        )
        self.log(
            item=lossD,
            identifier="disc_train_loss_per_batch",
            kind="metric",
            step=self.train_glob_step,
            batch_idx=batch_idx,
        )
        return lossG, lossD, accuracy_disc
    def validation_step(self, real_images, batch_idx):
        real_images = real_images.to(self.device)
        batch_size = real_images.size(0)
        real_labels = torch.ones((batch_size,), dtype=torch.float, device=self.device)
        fake_labels = torch.zeros((batch_size,), dtype=torch.float, device=self.device)

        # Validate with real images
        output_real = self.discriminator(real_images)
        loss_real = self.criterion(output_real, real_labels)

        # Generate and validate fake images
        noise = torch.randn(batch_size, self.config.z_dim, 1, 1, device=self.device)
        with torch.no_grad():
            fake_images = self.generator(noise)
        output_fake = self.discriminator(fake_images.detach())
        loss_fake = self.criterion(output_fake, fake_labels)

        # Generator's attempt to fool the discriminator
        loss_gen = self.criterion(output_fake, real_labels)
        accuracy_gen = ((output_fake > 0.5).float() == real_labels).float().mean()

        # Calculate total discriminator loss and accuracy; the accuracy is the
        # average of the real and fake halves so that it stays in [0, 1]
        loss_disc = (loss_real + loss_fake) / 2
        accuracy_disc = (
            (output_real > 0.5).float().mean() + (output_fake < 0.5).float().mean()
        ) / 2

        self.log(
            item=loss_gen.item(),
            identifier="gen_valid_loss_per_batch",
            kind="metric",
            step=self.validation_glob_step,
            batch_idx=batch_idx,
        )
        self.log(
            item=accuracy_gen.item(),
            identifier="gen_valid_accuracy_per_batch",
            kind="metric",
            step=self.validation_glob_step,
            batch_idx=batch_idx,
        )
        self.log(
            item=loss_disc.item(),
            identifier="disc_valid_loss_per_batch",
            kind="metric",
            step=self.validation_glob_step,
            batch_idx=batch_idx,
        )
        self.log(
            item=accuracy_disc.item(),
            identifier="disc_valid_accuracy_per_batch",
            kind="metric",
            step=self.validation_glob_step,
            batch_idx=batch_idx,
        )
        return loss_gen, accuracy_gen, loss_disc, accuracy_disc
    def save_checkpoint(self, name, epoch, loss=None):
        """Save training checkpoint with both optimizers."""
        # exist_ok avoids a race when several processes create the directory
        os.makedirs(self.checkpoints_location, exist_ok=True)
        checkpoint_path = os.path.join(self.checkpoints_location, f"{name}")
        checkpoint = {
            "epoch": epoch,
            "loss": loss.item() if loss is not None else None,
            "discriminator_state_dict": self.discriminator.state_dict(),
            "generator_state_dict": self.generator.state_dict(),
            "optimizerD_state_dict": self.optimizerD.state_dict(),
            "optimizerG_state_dict": self.optimizerG.state_dict(),
            "lr_scheduler": (self.lr_scheduler.state_dict() if self.lr_scheduler else None),
        }
        torch.save(checkpoint, checkpoint_path)
        print(f"Checkpoint saved to {checkpoint_path}")

    def load_checkpoint(self, checkpoint_path):
        """Load models and optimizers from checkpoint."""
        checkpoint = torch.load(checkpoint_path)
        self.discriminator.load_state_dict(checkpoint["discriminator_state_dict"])
        self.generator.load_state_dict(checkpoint["generator_state_dict"])
        self.optimizerD.load_state_dict(checkpoint["optimizerD_state_dict"])
        self.optimizerG.load_state_dict(checkpoint["optimizerG_state_dict"])
        if "lr_scheduler" in checkpoint and checkpoint["lr_scheduler"] is not None:
            self.lr_scheduler.load_state_dict(checkpoint["lr_scheduler"])
        print(f"Checkpoint loaded from {checkpoint_path}")
    def save_fake_generator_images(self, epoch):
        self.generator.eval()
        noise = torch.randn(64, self.config.z_dim, 1, 1, device=self.device)
        # No gradients are needed to render sample images
        with torch.no_grad():
            fake_images = self.generator(noise)
        fake_images_grid = torchvision.utils.make_grid(fake_images, normalize=True)
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.set_axis_off()
        ax.set_title(f"Fake images for epoch {epoch}")
        # make_grid returns (C, H, W); imshow expects (H, W, C)
        ax.imshow(np.transpose(fake_images_grid.cpu().numpy(), (1, 2, 0)))
        self.log(
            item=fig,
            identifier=f"fake_images_epoch_{epoch}.png",
            kind="figure",
            step=epoch,
        )
def main():
    parser = argparse.ArgumentParser(description="PyTorch MNIST GAN Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=128,
        help="input batch size for training (default: 128)",
    )
    parser.add_argument(
        "--epochs", type=int, default=15, help="number of epochs to train (default: 15)"
    )
    parser.add_argument(
        "--strategy", type=str, default="ddp", help="distributed strategy (default=ddp)"
    )
    parser.add_argument(
        "--lr", type=float, default=0.001, help="learning rate (default: 0.001)"
    )
    parser.add_argument("--seed", type=int, default=1, help="random seed (default: 1)")
    parser.add_argument(
        "--ckpt-interval",
        type=int,
        default=2,
        help="how many batches to wait before logging training status",
    )
    args = parser.parse_args()
    torch.manual_seed(args.seed)

    # Dataset
    transform = transforms.Compose(
        [
            transforms.Resize(64),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,)),
        ]
    )
    train_dataset = datasets.MNIST("../data", train=True, download=True, transform=transform)
    validation_dataset = datasets.MNIST("../data", train=False, transform=transform)

    def weights_init(m):
        classname = m.__class__.__name__
        if classname.find("Conv") != -1:
            m.weight.data.normal_(0.0, 0.02)
        elif classname.find("BatchNorm") != -1:
            m.weight.data.normal_(1.0, 0.02)
            m.bias.data.fill_(0)

    # Models
    netG = Generator(z_dim=100, g_hidden=64, image_channel=1)
    netG.apply(weights_init)
    netD = Discriminator(d_hidden=64, image_channel=1)
    netD.apply(weights_init)

    # Training configuration
    training_config = TrainingConfiguration(
        batch_size=args.batch_size, lr=args.lr, epochs=args.epochs, z_dim=100
    )

    # Logger
    logger = MLFlowLogger(experiment_name="Distributed GAN MNIST", log_freq=10)

    # Trainer
    trainer = GANTrainer(
        config=training_config,
        discriminator=netD,
        generator=netG,
        strategy=args.strategy,
        epochs=args.epochs,
        random_seed=args.seed,
        logger=logger,
    )

    # Launch training
    _, _, _, trained_model = trainer.execute(train_dataset, validation_dataset, None)


if __name__ == "__main__":
    main()
4.2.2. simpleGAN.py
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Henry Mutegeki
#
# Credit:
# - Henry Mutegeki <henry.mutegeki@cern.ch> - CERN
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
DATA_PATH = "../data"
BATCH_SIZE = 128
IMAGE_CHANNEL = 1
Z_DIM = 100
G_HIDDEN = 64
X_DIM = 64
D_HIDDEN = 64
EPOCH_NUM = 10
REAL_LABEL = 1
FAKE_LABEL = 0
lr = 2e-4
seed = 1
USE_CUDA = False
CUDA = torch.cuda.is_available() and USE_CUDA
device = torch.device("cuda:0" if CUDA else "cpu")
print("PyTorch version: {}".format(torch.__version__))
if CUDA:
    print("CUDA version: {}\n".format(torch.version.cuda))
    torch.cuda.manual_seed(seed)
cudnn.benchmark = True

# Data preprocessing
dataset = dset.MNIST(
    root=DATA_PATH,
    download=False,
    transform=transforms.Compose(
        [transforms.Resize(X_DIM), transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))]
    ),
)

# Dataloader
dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2
)
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1:
        m.weight.data.normal_(0.0, 0.02)
    elif classname.find("BatchNorm") != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)


class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.main = nn.Sequential(
            # input layer
            nn.ConvTranspose2d(Z_DIM, G_HIDDEN * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(G_HIDDEN * 8),
            nn.ReLU(True),
            # 1st hidden layer
            nn.ConvTranspose2d(G_HIDDEN * 8, G_HIDDEN * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(G_HIDDEN * 4),
            nn.ReLU(True),
            # 2nd hidden layer
            nn.ConvTranspose2d(G_HIDDEN * 4, G_HIDDEN * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(G_HIDDEN * 2),
            nn.ReLU(True),
            # 3rd hidden layer
            nn.ConvTranspose2d(G_HIDDEN * 2, G_HIDDEN, 4, 2, 1, bias=False),
            nn.BatchNorm2d(G_HIDDEN),
            nn.ReLU(True),
            # output layer
            nn.ConvTranspose2d(G_HIDDEN, IMAGE_CHANNEL, 4, 2, 1, bias=False),
            nn.Tanh(),
        )

    def forward(self, input):
        return self.main(input)


class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.main = nn.Sequential(
            # 1st layer
            nn.Conv2d(IMAGE_CHANNEL, D_HIDDEN, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # 2nd layer
            nn.Conv2d(D_HIDDEN, D_HIDDEN * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(D_HIDDEN * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # 3rd layer
            nn.Conv2d(D_HIDDEN * 2, D_HIDDEN * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(D_HIDDEN * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # 4th layer
            nn.Conv2d(D_HIDDEN * 4, D_HIDDEN * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(D_HIDDEN * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # output layer
            nn.Conv2d(D_HIDDEN * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, input):
        return self.main(input).view(-1, 1).squeeze(1)
# Create the generator
netG = Generator().to(device)
netG.apply(weights_init)
print(netG)
# Create the discriminator
netD = Discriminator().to(device)
netD.apply(weights_init)
print(netD)
# Initialize BCELoss function
criterion = nn.BCELoss()
# Create batch of latent vectors to visualize the progression of the generator
viz_noise = torch.randn(BATCH_SIZE, Z_DIM, 1, 1, device=device)
# Setup Adam optimizers for both G and D
optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(0.5, 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=lr, betas=(0.5, 0.999))
# Training Loop
def train_GAN_model(EPOCH_NUM, netD, netG, optimizerG, optimizerD, dataloader, criterion):
    img_list = []
    G_losses = []
    D_losses = []
    iters = 0
    for epoch in range(EPOCH_NUM):
        for i, data in enumerate(dataloader, 0):
            # (1) Update the discriminator with real data
            netD.zero_grad()
            # Format batch
            real_cpu = data[0].to(device)
            b_size = real_cpu.size(0)
            label = torch.full((b_size,), REAL_LABEL, dtype=torch.float, device=device)
            # Forward pass real batch through D
            output = netD(real_cpu).view(-1)
            # Calculate loss on all-real batch
            errD_real = criterion(output, label)
            # Calculate gradients for D in backward pass
            errD_real.backward()
            D_x = output.mean().item()

            # (2) Update the discriminator with fake data
            # Generate batch of latent vectors
            noise = torch.randn(b_size, Z_DIM, 1, 1, device=device)
            # Generate fake image batch with G
            fake = netG(noise)
            label.fill_(FAKE_LABEL)
            # Classify all fake batch with D
            output = netD(fake.detach()).view(-1)
            # Calculate D's loss on the all-fake batch
            errD_fake = criterion(output, label)
            # Calculate the gradients for this batch, accumulated (summed)
            # with previous gradients
            errD_fake.backward()
            D_G_z1 = output.mean().item()
            # Compute error of D as sum over the fake and the real batches
            errD = errD_real + errD_fake
            # Update D
            optimizerD.step()

            # (3) Update the generator with fake data
            netG.zero_grad()
            label.fill_(REAL_LABEL)  # fake labels are real for generator cost
            # Since we just updated D, perform another forward pass of
            # all-fake batch through D
            output = netD(fake).view(-1)
            # Calculate G's loss based on this output
            errG = criterion(output, label)
            # Calculate gradients for G
            errG.backward()
            D_G_z2 = output.mean().item()
            # Update G
            optimizerG.step()

            # Output training stats
            if i % 50 == 0:
                print(
                    (
                        "[ % d/%d][%d/%d]\tLoss_D: % .4f\tLoss_G:"
                        " % .4f\tD(x): % .4f\tD(G(z)): % .4f / %.4f"
                    )
                    % (
                        epoch,
                        EPOCH_NUM,
                        i,
                        len(dataloader),
                        errD.item(),
                        errG.item(),
                        D_x,
                        D_G_z1,
                        D_G_z2,
                    )
                )

            # Save Losses for plotting later
            G_losses.append(errG.item())
            D_losses.append(errD.item())

            # Check how the generator is doing
            if (iters % 500 == 0) or ((epoch == EPOCH_NUM - 1) and (i == len(dataloader) - 1)):
                with torch.no_grad():
                    fake = netG(viz_noise).detach().cpu()
                img_list.append(vutils.make_grid(fake, padding=2, normalize=True))
            iters += 1

    plt.figure(figsize=(10, 5))
    plt.title("Generator and Discriminator Loss During Training")
    plt.plot(G_losses, label="G")
    plt.plot(D_losses, label="D")
    plt.xlabel("iterations")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()
    # plt.savefig('simpleGANlearning_curve.png')

    # Grab a batch of real images from the dataloader
    real_batch = next(iter(dataloader))

    # Plot the real images
    plt.figure(figsize=(15, 15))
    plt.subplot(1, 2, 1)
    plt.axis("off")
    plt.title("Real Images")
    plt.imshow(
        np.transpose(
            vutils.make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu(),
            (1, 2, 0),
        )
    )
    # plt.savefig('simpleGANreal_image.png')

    # Plot the fake images from the last epoch
    plt.subplot(1, 2, 2)
    plt.axis("off")
    plt.title("Fake Images")
    plt.imshow(np.transpose(img_list[-1], (1, 2, 0)))
    plt.show()
    # plt.savefig('simpleGANfake_image.png')


train_GAN_model(EPOCH_NUM, netD, netG, optimizerG, optimizerD, dataloader, criterion)
4.3. Shell scripts
4.3.1. slurm.sh
#!/bin/bash
# SLURM jobscript for JSC systems
# Job configuration
#SBATCH --job-name=distributed_training
#SBATCH --account=intertwin
#SBATCH --mail-user=
#SBATCH --mail-type=ALL
#SBATCH --output=job.out
#SBATCH --error=job.err
#SBATCH --time=00:30:00
# Resources allocation
#SBATCH --partition=batch
#SBATCH --nodes=2
#SBATCH --gpus-per-node=4
#SBATCH --cpus-per-gpu=4
#SBATCH --exclusive
# gres options have to be disabled for deepv
#SBATCH --gres=gpu:4
# Load environment modules
ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py
# Job info
echo "DEBUG: TIME: $(date)"
sysN="$(uname -n | cut -f2- -d.)"
sysN="${sysN%%[0-9]*}"
echo "Running on system: $sysN"
echo "DEBUG: EXECUTE: $EXEC"
echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR"
echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID"
echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"
echo "DEBUG: SLURM_NNODES: $SLURM_NNODES"
echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS"
echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE"
echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST"
echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME"
echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES"
if [ "$DEBUG" = true ] ; then
  echo "DEBUG: NCCL_DEBUG=INFO"
  export NCCL_DEBUG=INFO
fi
echo
# Setup env for distributed ML
export CUDA_VISIBLE_DEVICES="0,1,2,3"
export OMP_NUM_THREADS=1
if [ "$SLURM_CPUS_PER_GPU" -gt 0 ] ; then
  export OMP_NUM_THREADS=$SLURM_CPUS_PER_GPU
fi
# Env variables check
if [ -z "$DIST_MODE" ]; then
  >&2 echo "ERROR: env variable DIST_MODE is not set. Allowed values are 'horovod', 'ddp' or 'deepspeed'"
  exit 1
fi
if [ -z "$RUN_NAME" ]; then
  >&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment."
  RUN_NAME=$DIST_MODE
fi
if [ -z "$TRAINING_CMD" ]; then
  >&2 echo "ERROR: env variable TRAINING_CMD is not set. It's the python command to execute."
  exit 1
fi
if [ -z "$PYTHON_VENV" ]; then
  >&2 echo "WARNING: env variable PYTHON_VENV is not set. It's the path to a python virtual environment."
else
  # Activate Python virtual env
  source $PYTHON_VENV/bin/activate
fi
# Get GPUs info per node
srun --cpu-bind=none --ntasks-per-node=1 bash -c 'echo -e "NODE hostname: $(hostname)\n$(nvidia-smi)\n\n"'
# Launch training
if [ "$DIST_MODE" == "ddp" ] ; then
  echo "DDP training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=1 \
    bash -c "torchrun \
    --log_dir='logs_torchrun' \
    --nnodes=$SLURM_NNODES \
    --nproc_per_node=$SLURM_GPUS_PER_NODE \
    --rdzv_id=$SLURM_JOB_ID \
    --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \
    --rdzv_backend=c10d \
    --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \
    $TRAINING_CMD"
elif [ "$DIST_MODE" == "deepspeed" ] ; then
  echo "DEEPSPEED training: $TRAINING_CMD"
  # The variable must be expanded here, so the dollar sign is not escaped
  MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)i
  export MASTER_ADDR
  export MASTER_PORT=29500
  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    python -u $TRAINING_CMD
  # # Run with deepspeed launcher: set --ntasks-per-node=1
  # # https://www.deepspeed.ai/getting-started/#multi-node-environment-variables
  # export NCCL_IB_DISABLE=1
  # export NCCL_SOCKET_IFNAME=eth0
  # nodelist=$(scontrol show hostname $SLURM_NODELIST)
  # echo "$nodelist" | sed -e 's/$/ slots=4/' > .hostfile
  # # Requires passwordless SSH access among compute node
  # srun --cpu-bind=none deepspeed --hostfile=.hostfile $TRAINING_CMD --deepspeed
  # rm .hostfile
elif [ "$DIST_MODE" == "horovod" ] ; then
  echo "HOROVOD training: $TRAINING_CMD"
  srun --cpu-bind=none --ntasks-per-node=$SLURM_GPUS_PER_NODE --cpus-per-task=$SLURM_CPUS_PER_GPU \
    python -u $TRAINING_CMD
else
  >&2 echo "ERROR: unrecognized \$DIST_MODE env variable"
  exit 1
fi
4.3.2. runall.sh
#!/bin/bash
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
# Python virtual environment
PYTHON_VENV="../../../envAI_hdfml"
# Clear SLURM logs (*.out and *.err files)
rm -rf logs_slurm
mkdir logs_slurm
rm -rf logs_torchrun
# DDP itwinai
DIST_MODE="ddp"
RUN_NAME="ddp-itwinai"
TRAINING_CMD="train.py --strategy ddp"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh
# DeepSpeed itwinai
DIST_MODE="deepspeed"
RUN_NAME="deepspeed-itwinai"
TRAINING_CMD="train.py --strategy deepspeed"
sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",TRAINING_CMD="$TRAINING_CMD",PYTHON_VENV="$PYTHON_VENV" \
--job-name="$RUN_NAME-n$N" \
--output="logs_slurm/job-$RUN_NAME-n$N.out" \
--error="logs_slurm/job-$RUN_NAME-n$N.err" \
slurm.sh