7. Tutorial on Kubeflow and TorchTrainer class

7.1. Distributed Machine Learning on Kubernetes with Kubeflow

Author(s): Matteo Bunino (CERN)

This tutorial shows how to run distributed machine learning (ML) on Kubernetes using Kubeflow’s training operator for PyTorch and itwinai’s TorchTrainer, for cases where an HPC cluster managed by SLURM is unavailable. The approach works for both on-premises and cloud-based Kubernetes deployments, including clusters with GPU nodes for larger training workloads.

We will only use kubectl and pod manifests to launch jobs, requiring minimal setup beyond access to a Kubernetes cluster with a few nodes. The Python SDK is beyond this guide’s scope, but you can explore Kubeflow’s getting started tutorial for more details.

7.1.1. Installing Kubeflow’s Training Operator

First, install the training operator. The Python SDK is not needed for this tutorial.

Example for v1.8.1:

kubectl apply --server-side -k "github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.8.1"

Check that the training operator is running:

kubectl get pods -n kubeflow

Output should include:

NAME                                 READY   STATUS    RESTARTS   AGE
training-operator-6f4d5d95f8-spfgx   1/1     Running   0          11d

Before proceeding, familiarize yourself with how PyTorchJob works. In brief:

  1. The PyTorchJob sets environment variables for torchrun.

  2. The Python script should be invoked using torchrun in the pod manifest. The torchrun CLI will make sure that the correct number of worker processes (i.e., replicas of your Python process) per pod are spawned. Example:

    containers:
    - name: pytorch
      image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.7
      command:
      - "torchrun"
      - "/app/main.py"
    

Set the number of processes per node using the nprocPerNode field of the PyTorchJob spec. It maps to torchrun’s --nproc-per-node.
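To make the role of torchrun more concrete, the following minimal Python sketch reads the variables that torchrun exports to each worker process it spawns (RANK, LOCAL_RANK, WORLD_SIZE, LOCAL_WORLD_SIZE). The DistEnv dataclass and the read_dist_env helper are hypothetical names used only for illustration. In this tutorial the distributed setup is delegated to TorchTrainer, so the sketch is only meant to show what a worker process sees.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class DistEnv:
    """Per-process view of a torchrun launch (hypothetical helper type)."""

    rank: int              # global rank across all pods
    local_rank: int        # rank within the current pod
    world_size: int        # total number of worker processes
    local_world_size: int  # worker processes in the current pod

    @property
    def is_global_main(self) -> bool:
        return self.rank == 0


def read_dist_env(env: Mapping[str, str] = os.environ) -> DistEnv:
    """Read torchrun's variables, falling back to a single-process setup."""
    return DistEnv(
        rank=int(env.get("RANK", "0")),
        local_rank=int(env.get("LOCAL_RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
        local_world_size=int(env.get("LOCAL_WORLD_SIZE", "1")),
    )


if __name__ == "__main__":
    # Under torchrun this prints one line per worker process.
    print(read_dist_env())
```

When the script is started with plain python instead of torchrun, none of the variables are set and the helper falls back to a single-process configuration.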

7.1.1.1. Creating a PyTorchJob

To submit a job, use:

kubectl create -n kubeflow -f job-manifest.yaml

When creating a PyTorchJob, the Worker pods will wait for the Master to be created first. To inspect the Master and Worker pods and read their logs, use:

# Inspect some pods
kubectl describe pod torchrun-cpu-worker-0 -n kubeflow
kubectl describe pod torchrun-cpu-master-0 -n kubeflow

# Get the logs from the pods
kubectl logs torchrun-cpu-master-0 -n kubeflow
kubectl logs torchrun-cpu-worker-0 -n kubeflow
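When a job has several Worker replicas, typing one command per pod becomes tedious. The sketch below builds the kubectl commands for all pods of a job. It assumes the pod naming pattern visible in the commands above (JOB-master-0 and JOB-worker-INDEX); pod_names and kubectl_logs_commands are hypothetical helper names, not part of Kubeflow or itwinai.

```python
from typing import List


def pod_names(job_name: str, worker_replicas: int) -> List[str]:
    """Pod names of a PyTorchJob with one Master and N Workers.

    Assumption: pods are named <job>-master-0 and <job>-worker-<i>.
    """
    if worker_replicas < 0:
        raise ValueError("worker_replicas must be >= 0")
    names = [f"{job_name}-master-0"]
    names += [f"{job_name}-worker-{i}" for i in range(worker_replicas)]
    return names


def kubectl_logs_commands(
    job_name: str, worker_replicas: int, namespace: str = "kubeflow"
) -> List[List[str]]:
    """Build one `kubectl logs` argument list per pod of the job."""
    return [
        ["kubectl", "logs", pod, "-n", namespace]
        for pod in pod_names(job_name, worker_replicas)
    ]


if __name__ == "__main__":
    for cmd in kubectl_logs_commands("torchrun-cpu", worker_replicas=1):
        # Pass each list to subprocess.run(cmd, check=True) to execute it.
        print(" ".join(cmd))
```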

Delete all PyTorchJobs:

kubectl delete --all pytorchjobs -n kubeflow

To remove the training operator:

kubectl delete deployment training-operator -n kubeflow

7.1.2. Distributed Training on CPU

To get started with distributed ML using Kubeflow and itwinai, a GPU cluster is not required. The PyTorchJob manifest for CPU-based training is defined in cpu.yaml. First, build and push a Docker container using the provided Dockerfile, then update the manifest with your container’s image name.

The manifest sets nprocPerNode: "2", which makes torchrun spawn two worker processes per pod. The field corresponds to the --nproc-per-node flag of torchrun; adjust it to change the level of parallelism.

There are two levels of parallelism:

  • Pod-level parallelism: Controlled by the number of replicas in the PyTorchJob.

  • Process-level parallelism: Controlled by nprocPerNode, which sets the number of subprocesses per pod.

With nprocPerNode > 1, both levels are used. Each pod is expected to run on a different node and spawns nprocPerNode processes, typically one per hardware accelerator (such as a GPU) on that node. The total number of distributed workers is nprocPerNode * TOTAL_PODS, where TOTAL_PODS counts all pods of the job (Master and Worker replicas).

Alternatively, setting nprocPerNode: "1" uses only pod replicas to control parallelism, with one pod per distributed ML worker. However, this may be less efficient (e.g., when using persistent storage).
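As a quick check of the arithmetic, the following sketch computes the total number of distributed workers from the per-pod process count and the replica counts. The function name total_workers is hypothetical, and the sketch assumes that the Master pod runs training processes just like the Worker pods, as it does in cpu.yaml where both run the same torchrun command.

```python
def total_workers(nproc_per_node: int, master_replicas: int, worker_replicas: int) -> int:
    """Total worker processes: processes per pod times number of pods."""
    if nproc_per_node < 1:
        raise ValueError("nproc_per_node must be >= 1")
    if master_replicas < 0 or worker_replicas < 0:
        raise ValueError("replica counts must be >= 0")
    total_pods = master_replicas + worker_replicas
    return nproc_per_node * total_pods


if __name__ == "__main__":
    # Values from cpu.yaml: two processes per pod, 1 Master and 1 Worker.
    print(total_workers(nproc_per_node=2, master_replicas=1, worker_replicas=1))  # 4
    # Same number of workers with one process per pod needs four pods.
    print(total_workers(nproc_per_node=1, master_replicas=1, worker_replicas=3))  # 4
```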

7.1.3. Distributed Training on GPU

Note

This part has not been extensively tested and is still under development.

To access GPU nodes, add the following resource limit to the container spec in the job manifest:

resources:
  limits:
    nvidia.com/gpu: 1

Example:

  ...
  containers:
  - name: pytorch
    image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.7
    command:
    - "torchrun"
    - "/app/main.py"
    resources:
      limits:
        nvidia.com/gpu: 1

To allocate a full node to a pod, set the number of requested GPUs equal to the number of GPUs available on the node, and adjust the number of replicas and nprocPerNode accordingly.
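The relation between the GPU limit and the per-pod process count can be sketched as follows, assuming one training process per GPU. The helper full_node_gpu_settings is a hypothetical name, and the returned dictionary only mirrors the manifest fields shown in this tutorial; like the rest of this section, it has not been tested on a GPU cluster.

```python
import json
from typing import Any, Dict


def full_node_gpu_settings(gpus_per_node: int) -> Dict[str, Any]:
    """Manifest values for a pod that claims every GPU of its node.

    Assumption: one training process per GPU, so the process count
    equals the GPU limit.
    """
    if gpus_per_node < 1:
        raise ValueError("gpus_per_node must be >= 1")
    return {
        # The CPU manifest stores this field as a string ("2").
        "nprocPerNode": str(gpus_per_node),
        "resources": {"limits": {"nvidia.com/gpu": gpus_per_node}},
    }


if __name__ == "__main__":
    # Example: nodes with 4 GPUs each (an illustrative value).
    print(json.dumps(full_node_gpu_settings(4), indent=2))
```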

7.2. train-cpu.py

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

"""Adapted from: https://github.com/pytorch/examples/blob/main/mnist/main.py"""

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
from torch.utils.data import Subset
from torchvision import datasets, transforms

from itwinai.loggers import MLFlowLogger
from itwinai.torch.config import TrainingConfiguration
from itwinai.torch.trainer import TorchTrainer


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x


def main():
    # Training settings
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--subset",
        type=int,
        default=1000,
        help="maximum size for both training and validation datasets.",
    )
    parser.add_argument(
        "--epochs", type=int, default=14, help="number of epochs to train (default: 14)"
    )
    parser.add_argument(
        "--strategy", type=str, default="ddp", help="distributed strategy (default: ddp)"
    )
    parser.add_argument("--lr", type=float, default=1.0, help="learning rate (default: 1.0)")
    parser.add_argument("--seed", type=int, default=1, help="random seed (default: 1)")
    parser.add_argument(
        "--ckpt-interval",
        type=int,
        default=10,
        help="checkpoint interval, passed to TorchTrainer as checkpoint_every",
    )
    parser.add_argument(
        "--force-dist",
        action="store_true",
        help="Force distributed ML on CPU-only envs (default is False)",
    )
    args = parser.parse_args()

    # Force distributed ML on CPUs
    os.environ["ITWINAI_FORCE_DIST"] = "1" if args.force_dist else "0"

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
    )

    # Main worker of this pod: LOCAL_RANK is unset (non-distributed run) or "0"
    is_main_worker = (os.environ.get("LOCAL_RANK") or "0") == "0"
    if is_main_worker:
        # Dataset creation: pull dataset only in main worker
        train_dataset = datasets.MNIST(
            "data",
            train=True,
            download=not os.path.exists("DATASET_READY"),
            transform=transform,
        )
        validation_dataset = datasets.MNIST(
            "data",
            train=False,
            download=not os.path.exists("DATASET_READY"),
            transform=transform,
        )
        # Create the marker file and close it right away to avoid leaking the handle
        open("DATASET_READY", "w").close()
    else:
        import time

        while not os.path.exists("DATASET_READY"):
            # Wait for the dataset to be downloaded
            time.sleep(1)

        # Dataset creation
        train_dataset = datasets.MNIST("data", train=True, download=False, transform=transform)
        validation_dataset = datasets.MNIST(
            "data", train=False, download=False, transform=transform
        )

    # Subset dataset
    train_dataset = Subset(train_dataset, range(args.subset))
    validation_dataset = Subset(validation_dataset, range(args.subset))

    # Neural network to train
    model = Net()

    training_config = TrainingConfiguration(
        batch_size=args.batch_size,
        optim_lr=args.lr,
        optimizer="adadelta",
        loss="cross_entropy",
        dist_backend="gloo",
        num_workers_dataloader=0,
    )

    logger = MLFlowLogger(experiment_name="mnist-tutorial", log_freq=10)

    metrics = {
        "accuracy": torchmetrics.Accuracy(task="multiclass", num_classes=10),
        "precision": torchmetrics.Precision(task="multiclass", num_classes=10),
    }

    trainer = TorchTrainer(
        config=training_config,
        model=model,
        metrics=metrics,
        logger=logger,
        strategy=args.strategy,
        epochs=args.epochs,
        random_seed=args.seed,
        checkpoint_every=args.ckpt_interval,
    )

    # Launch training
    _, _, _, trained_model = trainer.execute(train_dataset, validation_dataset, None)


if __name__ == "__main__":
    main()
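
The script above lets only the process with local rank 0 download the dataset, while the other processes in the same pod wait for a DATASET_READY marker file. The pattern can be isolated in a small sketch like the one below. The prepare_once helper is a hypothetical name, and the timeout is an addition that the script above does not have; it only prevents waiting forever if the main worker fails.

```python
import tempfile
import time
from pathlib import Path
from typing import Callable


def prepare_once(
    sentinel: Path,
    is_main: bool,
    prepare: Callable[[], None],
    poll_seconds: float = 1.0,
    timeout_seconds: float = 600.0,
) -> None:
    """Run `prepare` in the main worker only; other workers wait for it."""
    if is_main:
        if not sentinel.exists():
            prepare()
            sentinel.touch()  # signal the other workers
        return
    deadline = time.monotonic() + timeout_seconds
    while not sentinel.exists():
        if time.monotonic() > deadline:
            raise TimeoutError(f"{sentinel} was not created in time")
        time.sleep(poll_seconds)


if __name__ == "__main__":
    # Single-process demonstration: the second call finds the marker
    # file already in place and returns without calling `prepare`.
    with tempfile.TemporaryDirectory() as tmp:
        sentinel = Path(tmp) / "DATASET_READY"
        calls = []
        prepare_once(sentinel, True, lambda: calls.append("download"))
        prepare_once(sentinel, False, lambda: calls.append("download"))
        print(calls)  # ['download']
```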

7.3. cpu.yaml

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: torchrun-cpu
spec:
  # This property assumes that each pod runs on a separate node,
  # and is propagated to torchrun as its --nproc-per-node argument
  nprocPerNode: "2"
  pytorchReplicaSpecs:
    Master:
      # Usually only one Master pod is used 
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.10
              command:
                - "torchrun"
                - "/app/train-cpu.py"
                - "--force-dist"
              resources:
                # Requests help to implicitly make sure that each pod is running
                # in a separate node.
                requests:
                  cpu: 1500m
                limits:
                  cpu: 1500m
                  memory: 2500Mi
    Worker:
      # The number of worker pods 
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.cern.ch/itwinai/dist-ml/itwinai-slim:0.0.10
              command:
                - "torchrun"
                - "/app/train-cpu.py"
                - "--force-dist"
              resources:
                requests:
                  cpu: 1500m
                limits:
                  cpu: 1500m
                  memory: 2500Mi

7.4. Dockerfile

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

FROM python:3.11-slim-bullseye

WORKDIR /app

RUN apt-get update && apt-get install -y \
    git \
    && apt-get clean -y && rm -rf /var/lib/apt/lists/*

COPY pyproject.toml pyproject.toml
COPY src src
RUN pip install --no-cache-dir --upgrade pip setuptools wheel \
    && pip install --no-cache-dir ".[torch]" --extra-index-url https://download.pytorch.org/whl/cpu

COPY tutorials/distributed-ml/torch-k8s/train-cpu.py train-cpu.py

LABEL org.opencontainers.image.authors="Matteo Bunino - matteo.bunino@cern.ch"
LABEL org.opencontainers.image.url="https://github.com/interTwin-eu/itwinai"
LABEL org.opencontainers.image.documentation="https://itwinai.readthedocs.io/"
LABEL org.opencontainers.image.source="https://github.com/interTwin-eu/itwinai"
LABEL org.opencontainers.image.vendor="CERN - European Organization for Nuclear Research"