7. Tutorial on Kubeflow and TorchTrainer class
7.1. Distributed Machine Learning on Kubernetes with Kubeflow
Author(s): Matteo Bunino (CERN)
This tutorial shows how to run distributed machine learning (ML) on Kubernetes using
Kubeflow’s training operator for PyTorch and itwinai’s TorchTrainer. It targets cases where an HPC
cluster managed by SLURM is unavailable, and it applies to both on-premises and cloud-based Kubernetes
deployments, including those with GPU nodes for scalable, high-performance training workloads.
We will only use kubectl and pod manifests to launch jobs, so the only prerequisite is
access to a Kubernetes cluster with a few nodes. The Python SDK is beyond this guide’s scope;
see Kubeflow’s getting started tutorial for more details.
7.1.1. Installing Kubeflow’s Training Operator
First, install the training operator. The Python SDK is not needed for this tutorial.
Example for v1.8.1:
kubectl apply --server-side -k "github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.8.1"
Check that the training operator is running:
kubectl get pods -n kubeflow
Output should include:
NAME                                 READY   STATUS    RESTARTS   AGE
training-operator-6f4d5d95f8-spfgx   1/1     Running   0          11d
Before proceeding, familiarize yourself with how PyTorchJob works. In brief:
- The PyTorchJob sets environment variables for torchrun.
- The Python script should be invoked using torchrun in the pod manifest. The torchrun CLI makes sure that the correct number of worker processes (i.e., replicas of your Python process) is spawned in each pod. Example:
    containers:
      - name: pytorch
        image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.7
        command:
          - "torchrun"
          - "/app/main.py"
- Set the number of processes per node using nprocPerNode (spelled as in cpu.yaml). It maps to torchrun’s --nproc-per-node.
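To see what each spawned process receives, the rank information can be read from the environment inside the training script. The following is a minimal sketch: DistEnv and read_dist_env are hypothetical helper names (not part of itwinai or Kubeflow), and it assumes the RANK, LOCAL_RANK and WORLD_SIZE variables that torchrun exports to its worker processes.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class DistEnv:
    """Rank information of one worker process (hypothetical helper)."""

    rank: int  # global rank across all pods
    local_rank: int  # rank within the current pod
    world_size: int  # total number of worker processes

    @property
    def is_global_main(self) -> bool:
        """True only for the single process with global rank 0."""
        return self.rank == 0


def read_dist_env(environ: Mapping[str, str] = os.environ) -> DistEnv:
    """Read the rank variables, falling back to a single-process setup
    when the script is not launched through torchrun."""
    return DistEnv(
        rank=int(environ.get("RANK", "0")),
        local_rank=int(environ.get("LOCAL_RANK", "0")),
        world_size=int(environ.get("WORLD_SIZE", "1")),
    )


if __name__ == "__main__":
    print(read_dist_env())
```

Printing this at the top of a training script is a quick way to confirm that the expected number of processes was started in each pod.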
7.1.1.1. Creating a PyTorchJob
To submit a job, use:
kubectl create -n kubeflow -f job-manifest.yaml
When a PyTorchJob is created, the Worker pods wait for the Master pod to be created first. To inspect both Master and Worker pods, use:
# Inspect some pods
kubectl describe pod torchrun-cpu-worker-0 -n kubeflow
kubectl describe pod torchrun-cpu-master-0 -n kubeflow
# Get the logs from the pods
kubectl logs torchrun-cpu-master-0 -n kubeflow
kubectl logs torchrun-cpu-worker-0 -n kubeflow
Delete all PyTorchJobs:
kubectl delete --all pytorchjobs -n kubeflow
To remove the training operator:
kubectl delete deployment training-operator -n kubeflow
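When a job has several workers, typing one kubectl command per pod becomes tedious. The sketch below only builds the command lines and does not run them. pod_names and logs_commands are hypothetical helpers, and the naming pattern (job name followed by -master-0 or -worker-N) is an assumption inferred from the pod names used above.

```python
import shlex
from typing import List


def pod_names(job_name: str, n_workers: int) -> List[str]:
    """Pod names of a PyTorchJob with one Master and n_workers Workers.

    Assumption: pods are named <job>-master-0 and <job>-worker-<index>,
    as in the torchrun-cpu example of this tutorial.
    """
    workers = [f"{job_name}-worker-{i}" for i in range(n_workers)]
    return [f"{job_name}-master-0"] + workers


def logs_commands(
    job_name: str, n_workers: int, namespace: str = "kubeflow"
) -> List[List[str]]:
    """Build one 'kubectl logs' command (as an argument list) per pod."""
    return [
        ["kubectl", "logs", pod, "-n", namespace]
        for pod in pod_names(job_name, n_workers)
    ]


if __name__ == "__main__":
    # Print the commands so they can be reviewed before being executed
    for cmd in logs_commands("torchrun-cpu", n_workers=1):
        print(shlex.join(cmd))
```

The argument lists can be passed to subprocess.run as they are, once the pod names have been checked against the output of kubectl get pods.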
7.1.2. Distributed Training on CPU
To get started with distributed ML using Kubeflow and itwinai, a GPU cluster is not required.
The PyTorchJob manifest for CPU-based training is defined in cpu.yaml. First, build and
push a Docker container using the provided Dockerfile, then update the manifest with
your container’s image name.
The manifest sets nprocPerNode: "2", which spawns two worker processes per pod.
Adjust this value to change the level of parallelism; it corresponds to the
--nproc-per-node flag of torchrun.
There are two levels of parallelism:
- Pod-level parallelism: controlled by the number of replicas in the PyTorchJob.
- Process-level parallelism: controlled by nprocPerNode, which spawns multiple subprocesses per pod.
Setting nprocPerNode greater than 1 combines both levels. Each pod is expected to run on a different node
and to spawn as many processes as there are hardware accelerators (such as GPUs) on that node.
The total number of distributed workers is:
nprocPerNode * TOTAL_PODS.
Alternatively, setting nprocPerNode: "1" uses only pod replicas to control parallelism,
with one pod per distributed ML worker. However, this may be less efficient (e.g., when
using persistent storage).
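The arithmetic behind the two levels of parallelism can be written down as a short sketch. The function names are hypothetical, and global_rank additionally assumes that ranks are assigned contiguously pod by pod, which should be checked against the actual job logs.

```python
def world_size(nproc_per_node: int, total_pods: int) -> int:
    """Total number of distributed workers: processes per pod times pods."""
    if nproc_per_node < 1 or total_pods < 1:
        raise ValueError("both arguments must be positive")
    return nproc_per_node * total_pods


def global_rank(pod_index: int, local_rank: int, nproc_per_node: int) -> int:
    """Global rank of a process.

    Assumption: pod 0 holds ranks 0..nproc_per_node-1, pod 1 the next
    block, and so on.
    """
    if not 0 <= local_rank < nproc_per_node:
        raise ValueError("local_rank must be in [0, nproc_per_node)")
    return pod_index * nproc_per_node + local_rank


if __name__ == "__main__":
    # cpu.yaml: one Master pod plus one Worker pod, two processes each
    print(world_size(nproc_per_node=2, total_pods=2))
    # Same number of workers using pod replicas only
    print(world_size(nproc_per_node=1, total_pods=4))
```

Both configurations give four distributed workers; they differ only in how many pods the scheduler has to place.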
7.1.3. Distributed Training on GPU
Note
This part has not been extensively tested and is still under development.
To access GPU nodes, add the following resource limit to the container spec in the job manifest:
resources:
  limits:
    nvidia.com/gpu: 1
Example:
...
containers:
  - name: pytorch
    image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.7
    command:
      - "torchrun"
      - "/app/main.py"
    resources:
      limits:
        nvidia.com/gpu: 1
To allocate a full node to a pod, set the number of requested GPUs
equal to the number of GPUs available on the node, and adjust the number of replicas and
nprocPerNode accordingly.
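As an illustration of the full-node allocation rule, the sketch below derives the manifest values from a desired total number of GPUs. GpuJobPlan and plan_full_node_job are hypothetical names, and the sketch assumes homogeneous nodes, one process per GPU and a single Master pod. Like the rest of this GPU section, it is untested on a real cluster.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GpuJobPlan:
    """Values to copy into the PyTorchJob manifest (hypothetical helper)."""

    gpus_per_pod: int  # value for the nvidia.com/gpu limit
    nproc_per_node: str  # quoted in the manifest, as in cpu.yaml
    master_replicas: int
    worker_replicas: int


def plan_full_node_job(total_gpus: int, gpus_per_node: int) -> GpuJobPlan:
    """Allocate whole nodes: one pod per node, one process per GPU."""
    if gpus_per_node < 1 or total_gpus < gpus_per_node:
        raise ValueError("need at least one full node")
    if total_gpus % gpus_per_node != 0:
        raise ValueError("total_gpus must be a multiple of gpus_per_node")
    n_pods = total_gpus // gpus_per_node
    return GpuJobPlan(
        gpus_per_pod=gpus_per_node,
        nproc_per_node=str(gpus_per_node),
        master_replicas=1,
        worker_replicas=n_pods - 1,  # the Master pod also trains
    )


if __name__ == "__main__":
    print(plan_full_node_job(total_gpus=8, gpus_per_node=4))
```

For eight GPUs on four-GPU nodes, this yields one Master and one Worker pod, each requesting four GPUs and spawning four processes.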
7.2. train-cpu.py
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
"""Adapted from: https://github.com/pytorch/examples/blob/main/mnist/main.py"""
import argparse
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
from torch.utils.data import Subset
from torchvision import datasets, transforms
from itwinai.loggers import MLFlowLogger
from itwinai.torch.config import TrainingConfiguration
from itwinai.torch.trainer import TorchTrainer


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x


def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--subset",
        type=int,
        default=1000,
        help="maximum size for both training and validation datasets (default: 1000)",
    )
    parser.add_argument(
        "--epochs", type=int, default=14, help="number of epochs to train (default: 14)"
    )
    parser.add_argument(
        "--strategy", type=str, default="ddp", help="distributed strategy (default: ddp)"
    )
    parser.add_argument("--lr", type=float, default=1.0, help="learning rate (default: 1.0)")
    parser.add_argument("--seed", type=int, default=1, help="random seed (default: 1)")
    parser.add_argument(
        "--ckpt-interval",
        type=int,
        default=10,
        help="checkpoint interval, passed to the trainer as checkpoint_every (default: 10)",
    )
    parser.add_argument(
        "--force-dist",
        action="store_true",
        help="Force distributed ML on CPU-only envs (default is False)",
    )
    args = parser.parse_args()

    # Force distributed ML on CPUs
    os.environ["ITWINAI_FORCE_DIST"] = "1" if args.force_dist else "0"

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    # LOCAL_RANK is unset (or empty) when the script is not launched by torchrun:
    # in that case the only process is the main worker
    is_main_worker = os.environ.get("LOCAL_RANK", "0") in ("", "0")

    if is_main_worker:
        # Dataset creation: download the dataset only in the main worker of each pod
        train_dataset = datasets.MNIST(
            "data",
            train=True,
            download=not os.path.exists("DATASET_READY"),
            transform=transform,
        )
        validation_dataset = datasets.MNIST(
            "data",
            train=False,
            download=not os.path.exists("DATASET_READY"),
            transform=transform,
        )
        # Create the sentinel file that tells the other workers the data is ready
        with open("DATASET_READY", "w"):
            pass
    else:
        import time

        while not os.path.exists("DATASET_READY"):
            # Wait for the dataset to be downloaded
            time.sleep(1)

        # Dataset creation
        train_dataset = datasets.MNIST("data", train=True, download=False, transform=transform)
        validation_dataset = datasets.MNIST(
            "data", train=False, download=False, transform=transform
        )

    # Subset dataset: --subset is a maximum, so never index past the end
    train_dataset = Subset(train_dataset, range(min(args.subset, len(train_dataset))))
    validation_dataset = Subset(
        validation_dataset, range(min(args.subset, len(validation_dataset)))
    )

    # Neural network to train
    model = Net()

    training_config = TrainingConfiguration(
        batch_size=args.batch_size,
        optim_lr=args.lr,
        optimizer="adadelta",
        loss="cross_entropy",
        dist_backend="gloo",
        num_workers_dataloader=0,
    )

    logger = MLFlowLogger(experiment_name="mnist-tutorial", log_freq=10)

    metrics = {
        "accuracy": torchmetrics.Accuracy(task="multiclass", num_classes=10),
        "precision": torchmetrics.Precision(task="multiclass", num_classes=10),
    }

    trainer = TorchTrainer(
        config=training_config,
        model=model,
        metrics=metrics,
        logger=logger,
        strategy=args.strategy,
        epochs=args.epochs,
        random_seed=args.seed,
        checkpoint_every=args.ckpt_interval,
    )

    # Launch training
    _, _, _, trained_model = trainer.execute(train_dataset, validation_dataset, None)


if __name__ == "__main__":
    main()
7.3. cpu.yaml
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: torchrun-cpu
spec:
  # This property assumes that each pod runs on a separate node,
  # and is propagated to torchrun as its --nproc-per-node argument
  nprocPerNode: "2"
  pytorchReplicaSpecs:
    Master:
      # Usually only one Master pod is used
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.10
              command:
                - "torchrun"
                - "/app/train-cpu.py"
                - "--force-dist"
              resources:
                # Requests help to implicitly make sure that each pod is running
                # in a separate node.
                requests:
                  cpu: 1500m
                limits:
                  cpu: 1500m
                  memory: 2500Mi
    Worker:
      # The number of worker pods
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.10
              command:
                - "torchrun"
                - "/app/train-cpu.py"
                - "--force-dist"
              resources:
                requests:
                  cpu: 1500m
                limits:
                  cpu: 1500m
                  memory: 2500Mi
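Because the Master and Worker sections of the manifest are nearly identical, it can be convenient to generate the manifest instead of editing it by hand. The sketch below uses only the standard library and emits JSON, which kubectl accepts in place of YAML. replica_spec and build_pytorchjob are hypothetical helpers, and the field names and resource values simply mirror cpu.yaml above.

```python
import json
from typing import Any, Dict, List


def replica_spec(
    image: str,
    command: List[str],
    replicas: int,
    cpu: str = "1500m",
    memory: str = "2500Mi",
) -> Dict[str, Any]:
    """One entry of pytorchReplicaSpecs, mirroring cpu.yaml."""
    container = {
        "name": "pytorch",
        "image": image,
        "command": command,
        "resources": {
            "requests": {"cpu": cpu},
            "limits": {"cpu": cpu, "memory": memory},
        },
    }
    return {
        "replicas": replicas,
        "restartPolicy": "OnFailure",
        "template": {"spec": {"containers": [container]}},
    }


def build_pytorchjob(
    name: str, image: str, command: List[str], nproc_per_node: int, n_workers: int
) -> Dict[str, Any]:
    """Assemble a PyTorchJob manifest with one Master and n_workers Workers."""
    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "PyTorchJob",
        "metadata": {"name": name},
        "spec": {
            # Kept as a string, as in cpu.yaml
            "nprocPerNode": str(nproc_per_node),
            "pytorchReplicaSpecs": {
                "Master": replica_spec(image, command, replicas=1),
                "Worker": replica_spec(image, command, replicas=n_workers),
            },
        },
    }


if __name__ == "__main__":
    job = build_pytorchjob(
        name="torchrun-cpu",
        image="registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.10",
        command=["torchrun", "/app/train-cpu.py", "--force-dist"],
        nproc_per_node=2,
        n_workers=1,
    )
    print(json.dumps(job, indent=2))
```

Redirecting the output to a file such as job-manifest.json allows it to be submitted with the same kubectl create command used for the YAML manifest.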
7.4. Dockerfile
# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
FROM python:3.11-slim-bullseye
WORKDIR /app
RUN apt-get update && apt-get install -y \
    git \
    && apt-get clean -y && rm -rf /var/lib/apt/lists/*
COPY pyproject.toml pyproject.toml
COPY src src
RUN pip install --no-cache-dir --upgrade pip setuptools wheel \
    && pip install --no-cache-dir ".[torch]" --extra-index-url https://download.pytorch.org/whl/cpu
COPY tutorials/distributed-ml/torch-k8s/train-cpu.py train-cpu.py
LABEL org.opencontainers.image.authors="Matteo Bunino - matteo.bunino@cern.ch"
LABEL org.opencontainers.image.url="https://github.com/interTwin-eu/itwinai"
LABEL org.opencontainers.image.documentation="https://itwinai.readthedocs.io/"
LABEL org.opencontainers.image.source="https://github.com/interTwin-eu/itwinai"
LABEL org.opencontainers.image.vendor="CERN - European Organization for Nuclear Research"