itwinai.torch

config

Default configuration

class itwinai.torch.config.Configuration(**extra_data: Any)

Bases: BaseModel

Base configuration class.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class itwinai.torch.config.TrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2', 'bceloss'] = 'cross_entropy', optimizer: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_betas: Tuple[float, float] = (0.9, 0.999), optim_weight_decay: float = 0.0, lr_scheduler: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_step_size: int | Iterable[int] = 10, lr_scheduler_gamma: float = 0.95, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, dist_backend: Literal['nccl', 'gloo', 'mpi'] = 'nccl', **extra_data: Any)

Bases: Configuration

Default configuration object for training. Override and/or create new configurations using the constructor.

Example:

>>> cfg = TrainingConfiguration(batch_size=17, param_a=42)
>>> print(cfg.batch_size)      # prints 17 (overrides the default)
>>> print(cfg.param_a)         # prints 42 (new value)
>>> print(cfg.pin_gpu_memory)  # prints the default value (False)
>>>
>>> from rich import print
>>> print(cfg)             # pretty-print of configuration

Warning

Where possible, don’t reinvent parameters that already exist in the training configuration; reuse their names to avoid inconsistencies. For instance, the training configuration defines the learning rate as optim_lr. If you redefine it as lr by doing TrainingConfiguration(lr=0.005), the configuration will contain both optim_lr (created by default) and lr (created by you). This may cause confusion and can silently break the logic in your code.
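The pitfall can be reproduced without itwinai installed. The following is a minimal sketch built on a hypothetical stand-in class, MiniTrainingConfig, which only mimics the 'extra': 'allow' behaviour described above; it is not the library's implementation.

```python
class MiniTrainingConfig:
    """Hypothetical stand-in: one known default plus arbitrary extra fields."""

    def __init__(self, **overrides):
        # Default value mirrored from the documented TrainingConfiguration signature.
        self.optim_lr = 0.001
        # Like 'extra': 'allow', unknown keys are accepted and stored silently.
        for key, value in overrides.items():
            setattr(self, key, value)


cfg = MiniTrainingConfig(lr=0.005)         # meant to change the learning rate
good = MiniTrainingConfig(optim_lr=0.005)  # reuses the existing parameter name

print(cfg.optim_lr, cfg.lr)  # two different learning rates now coexist
print(good.optim_lr)
```

Code that reads optim_lr from cfg would still see the default value, which is the silent breakage the warning describes.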

batch_size: int

Batch size. In a distributed environment it is usually the per-worker batch size. Defaults to 32.
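As a worked example of the per-worker interpretation: assuming every worker processes its own batch at each step (an assumption about the data-parallel setup, not something stated above), the effective global batch size is the per-worker batch size multiplied by the global world size. The helper name below is hypothetical.

```python
def effective_batch_size(per_worker_batch_size: int, world_size: int) -> int:
    """Number of samples consumed per step across all workers (assumed data-parallel)."""
    return per_worker_batch_size * world_size


# Default batch_size of 32 on 8 workers.
print(effective_batch_size(32, 8))
```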

shuffle_train: bool

Whether to shuffle train dataset when creating a torch DataLoader. Defaults to False.

shuffle_validation: bool

Whether to shuffle validation dataset when creating a torch DataLoader. Defaults to False.

shuffle_test: bool

Whether to shuffle test dataset when creating a torch DataLoader. Defaults to False.

pin_gpu_memory: bool

Whether to pin GPU memory. Property of torch DataLoader. Defaults to False.

num_workers_dataloader: int

Number of parallel workers used by torch DataLoader. Defaults to 4.

loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2', 'bceloss']

Loss function. Defaults to ‘cross_entropy’.

optimizer: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd']

Name of the optimizer to use. Defaults to ‘adam’.

optim_lr: float

Learning rate used by the optimizer. Defaults to 1e-3.

optim_momentum: float

Momentum used by some optimizers (e.g., SGD). Defaults to 0.9.

optim_betas: Tuple[float, float]

Betas of Adam optimizer (if used). Defaults to (0.9, 0.999).

optim_weight_decay: float

Weight decay parameter for the optimizer. Defaults to 0.

lr_scheduler: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None

Learning rate scheduler algorithm. Defaults to None (not used).

lr_scheduler_step_size: int | Iterable[int]

Learning rate scheduler step size, if needed by the scheduler. Defaults to 10 (epochs).
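A minimal sketch of how the step size interacts with the decay factor lr_scheduler_gamma, assuming the 'step' scheduler follows the rule of torch.optim.lr_scheduler.StepLR (the learning rate is multiplied by gamma once every step_size epochs). The mapping from 'step' to StepLR is an assumption; the default values come from the signature above.

```python
def step_lr(base_lr: float, gamma: float, step_size: int, epoch: int) -> float:
    """Learning rate at a given epoch under a StepLR-style decay rule."""
    return base_lr * gamma ** (epoch // step_size)


# Defaults: optim_lr=0.001, lr_scheduler_gamma=0.95, lr_scheduler_step_size=10.
for epoch in (0, 9, 10, 25):
    print(epoch, step_lr(0.001, 0.95, 10, epoch))
```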

lr_scheduler_gamma: float

Learning rate scheduler gamma, if needed by the scheduler. Defaults to 0.95.

fp16_allreduce: bool

Parameter of Horovod’s DistributedOptimizer: uses float16 operations in the allreduce distributed gradients aggregation. Better performance at lower precision. Defaults to False.

use_adasum: bool

Parameter of Horovod’s DistributedOptimizer: use Adasum optimization. Defaults to False.

gradient_predivide_factor: float

Parameter of Horovod’s DistributedOptimizer: scale gradients before adding them up. Defaults to 1.0.
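The effect of pre-dividing can be sketched in plain Python. This assumes Horovod's documented behaviour for averaged gradients: each gradient is scaled by 1 / gradient_predivide_factor before the sum and the result by gradient_predivide_factor / size after it. The function below is a hypothetical single-process simulation, not Horovod code.

```python
def allreduce_average(grads, predivide_factor=1.0):
    """Simulate an averaging allreduce over one scalar gradient per worker."""
    size = len(grads)
    # Scale before the sum to keep intermediate values small.
    pre_scaled = [g / predivide_factor for g in grads]
    total = sum(pre_scaled)
    # Scale after the sum so that the overall result is still the average.
    return total * predivide_factor / size


grads = [0.5, 1.5, 2.0, 4.0]
print(allreduce_average(grads, predivide_factor=1.0))
print(allreduce_average(grads, predivide_factor=4.0))
```

Both calls yield the same average; the factor only changes where the division happens, which can matter for numerical range when fp16_allreduce is enabled.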

dist_backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed backend. Defaults to nccl.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

distributed

itwinai.torch.distributed.distributed_resources_available() -> bool

Check if the current execution environment has (enough) GPUs available to allow for distributed ML.

Returns:

True if the environment can support distributed ML.

Return type:

bool

itwinai.torch.distributed.check_initialized(method: Callable) -> Callable

Decorator for strategy methods to check whether the strategy was correctly initialized before calling the method.
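A decorator of this kind can be sketched as follows. This is an illustrative reimplementation under assumptions, not the library's code: the exception class is a local stand-in named after the UninitializedStrategyError mentioned elsewhere in this reference, and DemoStrategy is hypothetical.

```python
import functools


class UninitializedStrategyError(Exception):
    """Local stand-in for the library's exception of the same name."""


def check_initialized(method):
    """Refuse to run a strategy method until the strategy has been initialized."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if not self.is_initialized:
            raise UninitializedStrategyError(
                f"{method.__name__}() called before init()"
            )
        return method(self, *args, **kwargs)

    return wrapper


class DemoStrategy:
    """Hypothetical strategy exposing the is_initialized flag."""

    is_initialized = False

    def init(self):
        self.is_initialized = True

    @check_initialized
    def global_rank(self):
        return 0


strategy = DemoStrategy()
try:
    strategy.global_rank()
except UninitializedStrategyError as err:
    print(f"error: {err}")
strategy.init()
print(strategy.global_rank())
```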

itwinai.torch.distributed.initialize_ray() -> None

This method is used by the RayDDPStrategy and RayDeepSpeedStrategy to initialize the Ray backend if it is not already initialized. This is meant to be called before submitting a function to Ray (as a trial in tuning, or as a worker in distributed ML).

Raises:
  • RuntimeError – when no Ray cluster is detected.

  • EnvironmentError – If required environment variables HEAD_NODE_PORT or HEAD_NODE_IP are not set. These should be set in the Slurm script that launches the Ray cluster.
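The environment check can be sketched as below. The helper read_head_node_address is hypothetical, and combining the two variables into an "ip:port" string is an assumption made for illustration; only the variable names and the EnvironmentError come from the description above.

```python
import os


def read_head_node_address(env=None):
    """Return 'ip:port' of the Ray head node, or raise if a variable is missing."""
    env = os.environ if env is None else env
    missing = [name for name in ("HEAD_NODE_IP", "HEAD_NODE_PORT") if name not in env]
    if missing:
        raise EnvironmentError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return f"{env['HEAD_NODE_IP']}:{env['HEAD_NODE_PORT']}"


# A dictionary is passed here so that the example does not depend on the real environment.
print(read_head_node_address({"HEAD_NODE_IP": "10.0.0.1", "HEAD_NODE_PORT": "6379"}))
```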

class itwinai.torch.distributed.TorchDistributedStrategy

Bases: ABC

Abstract class to define the distributed backend methods for PyTorch models.

is_distributed: bool = True

Distinguishes distributed strategies from non-distributed ones. Defaults to True.

is_initialized: bool = False

Set to True when the current strategy is initialized. Defaults to False.

name: str

property is_main_worker: bool

Checks if local worker has global rank equal to zero.

Returns:

True if main worker.

Return type:

bool

abstract init() -> None

Initializes the chosen distributed backend.

abstract distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) -> Tuple[Module, Optimizer, _LRScheduler | None]

Sets up the model, optimizer and scheduler for distributed training.

abstract global_world_size() -> int

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

abstract local_world_size() -> int

Returns the number of local workers available on a node (local world size). Usually it is equal to the number of available GPUs.

Returns:

local world size.

Return type:

int

abstract global_rank() -> int

Returns the global rank of the current process. Rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int
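To make the relation between the rank and world-size methods concrete, here is a small sketch that enumerates workers on a simulated cluster. It assumes the common layout in which workers are numbered node by node (global_rank = node_index * local_world_size + local_rank); launchers may number workers differently, and the function name is hypothetical.

```python
def enumerate_workers(num_nodes: int, local_world_size: int):
    """List rank information for every worker of a simulated cluster."""
    workers = []
    for node in range(num_nodes):
        for local_rank in range(local_world_size):
            global_rank = node * local_world_size + local_rank
            workers.append(
                {
                    "node": node,
                    "local_rank": local_rank,
                    "global_rank": global_rank,
                    # Same criterion as is_main_worker: global rank equal to zero.
                    "is_main_worker": global_rank == 0,
                }
            )
    return workers


workers = enumerate_workers(num_nodes=2, local_world_size=4)
for worker in workers:
    print(worker)
```

With 2 nodes and 4 workers per node the global world size is 8, so global ranks run from 0 to 7 and exactly one worker is the main worker.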

abstract local_rank() -> int

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

abstract barrier() -> None

Forces all the workers to wait for each other.

device() -> str

Device used by local worker.

Returns:

torch device in the form ‘device:N’ (e.g., ‘cuda:0’, ‘cpu’).

Return type:

str

set_device()

Set local device.

create_dataloader(dataset: Dataset, batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[_T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')

Create a distributed DataLoader by using DistributedSampler as random sampler.

Parameters:
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning section of the PyTorch DataLoader documentation.

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default value depends on the set value for num_workers. If value of num_workers=0 default is None. Otherwise, if value of num_workers > 0 default is 2).

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers’ Dataset instances alive. (default: False)

  • pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.

Raises:
  • UninitializedStrategyError – when this method is called for a strategy which had not been initialized.

  • RuntimeError – when a user-provided sampler, if given, is not of type DistributedSampler.

Warning

If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices for more details related to multiprocessing in PyTorch.

Warning

len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch can not detect such cases in general.

See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.
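To illustrate what a DistributedSampler does for create_dataloader, the sketch below reproduces in plain Python how torch.utils.data.DistributedSampler assigns dataset indices to each worker when shuffle=False and drop_last=False. It is a simplified model written for illustration (it assumes the dataset has at least as many samples as there are workers), and the function name is hypothetical.

```python
import math


def shard_indices(dataset_len: int, num_replicas: int, rank: int):
    """Indices seen by one worker, mimicking an unshuffled DistributedSampler."""
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    # Pad with samples from the start so that every worker gets the same count.
    indices += indices[: total_size - len(indices)]
    # Each worker takes every num_replicas-th index, starting at its own rank.
    return indices[rank:total_size:num_replicas]


for rank in range(3):
    print(rank, shard_indices(dataset_len=10, num_replicas=3, rank=rank))
```

With 10 samples and 3 workers, each worker receives 4 indices, and the first two samples are repeated to fill the last positions. This is why per-worker dataloader lengths can differ from len(dataset) / batch_size computed on the whole dataset.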

abstract clean_up() -> None

Cleans up resources allocated by distributed strategy.

abstract allgather_obj(obj: Any) -> List[Any]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – object to gather from all workers.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

abstract gather_obj(obj: Any, dst_rank: int = 0) -> List[Any]

Gathers any object from the whole group in a list on the destination worker (dst_rank).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

abstract gather(tensor: Tensor, dst_rank: int = 0) -> List | None

Gathers a tensor from the whole group in a list on the destination worker (dst_rank).

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers if main worker, otherwise None.

Return type:

Optional[List[Any]]
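The difference between the all-gather and gather methods can be simulated in a single process. In this sketch, per_rank_objects[i] stands for the object held by the worker with rank i, and each function returns what every rank would receive; the function names are hypothetical and no real communication takes place.

```python
def simulate_allgather(per_rank_objects):
    """Every rank receives the full list of objects."""
    gathered = list(per_rank_objects)
    return [list(gathered) for _ in per_rank_objects]


def simulate_gather(per_rank_objects, dst_rank=0):
    """Only dst_rank receives the full list; the other ranks receive None."""
    gathered = list(per_rank_objects)
    return [
        list(gathered) if rank == dst_rank else None
        for rank in range(len(per_rank_objects))
    ]


losses = [0.9, 0.7, 0.8]  # one validation loss per worker
print(simulate_allgather(losses))
print(simulate_gather(losses, dst_rank=0))
```

Code that consumes the result of a gather therefore has to handle None on all workers except the destination one.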

abstract broadcast_obj(obj: Any, src_rank: int) -> Any

Broadcasts an object to all workers.

Parameters:
  • obj (Any) – object to broadcast to all workers.

  • src_rank (int) – rank of the worker that broadcasts the object.

Returns:

broadcasted object.

Return type:

Any

class itwinai.torch.distributed.TorchDDPStrategy(backend: Literal['nccl', 'gloo', 'mpi'])

Bases: TorchDistributedStrategy

PyTorch DistributedDataParallel distributed strategy class.

Parameters:

backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.

backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed communication backend.

init() -> None

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy which is already initialized.

distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None, find_unused_parameters: bool = False, **kwargs) -> Tuple[Module, Optimizer, _LRScheduler | None]

Sets up the model, optimizer and scheduler for distributed training.

barrier() -> None

Forces all the workers to wait for each other.

global_world_size() -> int

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() -> int

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

Raises:

RuntimeError – when the local world size cannot be retrieved.

global_rank() -> int

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() -> int

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() -> None

Destroys the current process group.

allgather_obj(obj: Any) -> List[Any]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) -> List | None

Gathers any object from the whole group in a list on the destination worker (dst_rank).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

List | None

gather(tensor: Tensor, dst_rank: int = 0) -> List | None

Gathers a tensor from the whole group in a list on the destination worker (dst_rank).

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers if main worker, otherwise None.

Return type:

Optional[List[Any]]

broadcast_obj(obj: Any, src_rank: int) -> Any

Broadcasts an object to all workers. The object must be picklable.

Parameters:
  • obj (Any) – object to broadcast to all workers.

  • src_rank (int) – rank of the worker that broadcasts the object.

Returns:

broadcasted object.

Return type:

Any
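Whether an object qualifies for broadcasting can be checked up front. The sketch below assumes that objects are serialized with Python's standard pickle module, which the picklability requirement suggests but this reference does not state explicitly; is_picklable is a hypothetical helper.

```python
import pickle


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with the standard pickle module."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(is_picklable({"best_lr": 0.001, "epoch": 3}))  # plain data
print(is_picklable(lambda x: x))                     # lambdas cannot be pickled
```

Plain containers of numbers and strings are safe choices; lambdas, open file handles and similar objects are not.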

class itwinai.torch.distributed.DeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])

Bases: TorchDistributedStrategy

DeepSpeed distributed strategy class.

Parameters:
  • backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.

  • config (Union[dict, Path, str]) – DeepSpeed config. Either a dictionary or a path to a JSON file.

backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed communication backend.

init() -> None

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, model_parameters: Any | None = None, **init_kwargs) -> Tuple[Module, Optimizer, _LRScheduler | None]

Sets up the model, optimizer and scheduler for distributed training.

barrier() -> None

Forces all the workers to wait for each other.

global_world_size() -> int

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() -> int

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

Raises:

RuntimeError – when the local world size cannot be retrieved.

global_rank() -> int

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() -> int

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() -> None

Destroys the current process group.

allgather_obj(obj: Any) -> List[Any]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) -> List[Any] | None

Gathers any object from the whole group in a list on the destination worker (dst_rank).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0) -> List[Tensor] | None

Gathers a tensor from the whole group in a list on the destination worker (dst_rank).

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[torch.Tensor]]

broadcast_obj(obj: Any, src_rank: int) -> Any

Broadcasts an object to all workers. The object must be picklable.

Parameters:
  • obj (Any) – object to broadcast to all workers.

  • src_rank (int) – rank of the worker that broadcasts the object.

Returns:

broadcasted object.

Return type:

Any

class itwinai.torch.distributed.HorovodStrategy

Bases: TorchDistributedStrategy

Horovod distributed strategy class.

init() -> None

Initializes the Horovod distributed backend.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **optim_kwargs) -> Tuple[Module, Optimizer, _LRScheduler | None]

Sets up the model, optimizer and scheduler for distributed training.

barrier() -> None

Forces all the workers to wait for each other.

global_world_size() -> int

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() -> int

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() -> int

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() -> int

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() -> None

Shuts Horovod down.

allgather_obj(obj: Any) -> list[Any]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) -> list[Any] | None

Gathers any object from the whole group in a list on the destination worker (dst_rank). Under the hood it relies on allgather, as gather is not supported by Horovod.

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0) -> List[Tensor] | None

Gathers a tensor from the whole group in a list on the destination worker (dst_rank). Under the hood it relies on allgather, as gather is not supported by Horovod.

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[torch.Tensor]]

broadcast_obj(obj: Any, src_rank: int) -> Any

Broadcasts an object to all workers. The object must be picklable.

Parameters:
  • obj (Any) – object to broadcast to all workers.

  • src_rank (int) – rank of the worker that broadcasts the object.

Returns:

broadcasted object.

Return type:

Any

class itwinai.torch.distributed.NonDistributedStrategy

Bases: TorchDistributedStrategy

Dummy class for non-distributed environments.

is_distributed: bool = False

This strategy is not distributed. Defaults to False.

init() -> None

If CUDA is available set CUDA device, and do nothing more.

Raises:

DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **kwargs) -> Tuple[Module, Optimizer, _LRScheduler | None]

Do nothing and return model, optimizer and scheduler.

barrier() -> None

Forces all the workers to wait for each other.

global_world_size() -> int

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() -> int

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() -> int

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() -> int

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() -> None

Do nothing.

allgather_obj(obj: Any) -> list[Any]

Wraps obj into a List object.

Parameters:

obj (Any) – object in a worker.

Returns:

input object wrapped in a list.

Return type:

list[Any]

gather_obj(obj: Any, dst_rank: int = 0) -> list[Any]

Wraps obj into a List object.

Parameters:
  • obj (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

input object wrapped in a list.

Return type:

list[Any]

gather(tensor: Tensor, dst_rank: int = 0)

Wraps tensor into a List object.

Parameters:
  • tensor (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

input object wrapped in a list.

Return type:

list[Any]

broadcast_obj(obj: Any, src_rank: int) -> Any

Broadcasts an object to all workers.

Parameters:
  • obj (Any) – object to broadcast to all workers.

  • src_rank (int) – rank of the worker that broadcasts the object.

Returns:

broadcasted object.

Return type:

Any

class itwinai.torch.distributed.RayTorchDistributedStrategy

Bases: TorchDistributedStrategy

Base class for all Ray distributed strategies.

class itwinai.torch.distributed.RayDDPStrategy

Bases: TorchDDPStrategy, RayTorchDistributedStrategy

A distributed data-parallel (DDP) strategy using Ray Train for PyTorch training.

init() -> None

Initializes Ray trial/worker.

Raises:

RuntimeError – when the Ray cluster is not detected.

global_world_size() -> int

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() -> int

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

Raises:

RuntimeError – when the local world size cannot be retrieved.

global_rank() -> int

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() -> int

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) -> Tuple[Module, Optimizer, _LRScheduler | None]

Sets up the model, optimizer and scheduler for distributed training.

class itwinai.torch.distributed.RayDeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])

Bases: DeepSpeedStrategy, RayTorchDistributedStrategy

A distributed strategy using Ray and DeepSpeed for PyTorch training.

Parameters:

backend (Literal["nccl", "gloo", "mpi"]) – The backend for distributed communication.

init() -> None

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there is not a Ray cluster running.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

gan

Provides training logic for PyTorch models via Trainer classes.

class itwinai.torch.gan.GANTrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: str = 'bceloss', optimizer: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_betas: Tuple[float, float] = (0.9, 0.999), optim_weight_decay: float = 0.0, lr_scheduler: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_step_size: int | Iterable[int] = 10, lr_scheduler_gamma: float = 0.95, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, dist_backend: Literal['nccl', 'gloo', 'mpi'] = 'nccl', optimizer_generator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_generator_lr: float = 0.001, optim_generator_momentum: float = 0.9, optim_generator_betas: Tuple[float, float] = (0.5, 0.999), optim_generator_weight_decay: float = 0.0, lr_scheduler_generator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_generator_step_size: int | Iterable[int] = 10, lr_scheduler_generator_gamma: float = 0.95, optimizer_discriminator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_discriminator_lr: float = 0.001, optim_discriminator_momentum: float = 0.9, optim_discriminator_betas: Tuple[float, float] = (0.5, 0.999), optim_discriminator_weight_decay: float = 0.0, lr_scheduler_discriminator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_discriminator_step_size: int | Iterable[int] = 10, lr_scheduler_discriminator_gamma: float = 0.95, z_dim: int = 100, **extra_data: Any)

Bases: TrainingConfiguration

Configuration object for training a GAN. Extends the base TrainingConfiguration.

optimizer_generator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd']

Name of the optimizer to use for the generator. Defaults to ‘adam’.

optim_generator_lr: float

Learning rate used by the optimizer for the generator. Defaults to 1e-3.

optim_generator_momentum: float

Momentum used by some optimizers (e.g., SGD) for the generator. Defaults to 0.9.

optim_generator_betas: Tuple[float, float]

Betas of Adam optimizer (if used) for the generator. Defaults to (0.5, 0.999).

optim_generator_weight_decay: float

Weight decay parameter for the optimizer for the generator. Defaults to 0.

lr_scheduler_generator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None

Learning rate scheduler algorithm for the generator optimizer. Defaults to None (not used).

lr_scheduler_generator_step_size: int | Iterable[int]

Learning rate scheduler step size, if needed by the scheduler. Defaults to 10 (epochs).

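lr_scheduler_generator_gamma: float

Learning rate scheduler gamma for the generator optimizer, if needed by the scheduler. Defaults to 0.95.

optimizer_discriminator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd']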

Name of the optimizer to use for the discriminator. Defaults to ‘adam’.

optim_discriminator_lr: float

Learning rate used by the optimizer for the discriminator. Defaults to 1e-3.

optim_discriminator_momentum: float

Momentum used by some optimizers (e.g., SGD) for the discriminator. Defaults to 0.9.

optim_discriminator_betas: Tuple[float, float]

Betas of Adam optimizer (if used) for the discriminator. Defaults to (0.5, 0.999).

optim_discriminator_weight_decay: float

Weight decay parameter for the optimizer for the discriminator. Defaults to 0.

lr_scheduler_discriminator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None

Learning rate scheduler algorithm for the discriminator optimizer. Defaults to None (not used).

lr_scheduler_discriminator_step_size: int | Iterable[int]

Learning rate scheduler step size, if needed by the scheduler. Defaults to 10 (epochs).

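lr_scheduler_discriminator_gamma: float

Learning rate scheduler gamma for the discriminator optimizer, if needed by the scheduler. Defaults to 0.95.

loss: str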

Classification criterion to be used for generator and discriminator losses. Defaults to “bceloss”.

z_dim: int

Generator input size (random noise size). Defaults to 100.
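As an illustration of what z_dim controls, the sketch below draws one noise vector of length z_dim per sample in a batch. Drawing from a standard normal distribution is an assumption (a common choice for GANs, not something stated here), and plain Python lists are used so the example runs without torch; sample_noise is a hypothetical helper.

```python
import random


def sample_noise(batch_size: int, z_dim: int = 100, seed=None):
    """Return batch_size noise vectors of length z_dim (standard normal entries)."""
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(z_dim)] for _ in range(batch_size)]


noise = sample_noise(batch_size=32, z_dim=100, seed=0)
print(len(noise), len(noise[0]))  # batch dimension, then noise dimension
```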

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class itwinai.torch.gan.GANTrainer(config: Dict | GANTrainingConfiguration, epochs: int, discriminator: Module, generator: Module, strategy: Literal['ddp', 'deepspeed'] = 'ddp', test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Metric] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None, profiling_wait_epochs: int = 1, profiling_warmup_epochs: int = 2, ray_scaling_config: ScalingConfig | None = None, ray_tune_config: TuneConfig | None = None, ray_run_config: RunConfig | None = None, ray_search_space: Dict[str, Any] | None = None, ray_torch_config: TorchConfig | None = None, ray_data_config: DataConfig | None = None, from_checkpoint: str | Path | None = None, **kwargs)

Bases: TorchTrainer

Trainer class for GAN models using PyTorch.

Parameters:
  • config (Dict | GANTrainingConfiguration) – training configuration containing hyperparameters.

  • epochs (int) – number of training epochs.

  • discriminator (nn.Module) – pytorch discriminator model to train GAN.

  • generator (nn.Module) – pytorch generator model to train GAN.

  • strategy (Literal['ddp', 'deepspeed'], optional) – distributed strategy. Defaults to ‘ddp’.

  • test_every (int | None, optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.

  • random_seed (int | None, optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.

  • logger (Logger | None, optional) – logger for ML tracking. Defaults to None.

  • metrics (Dict[str, Metric] | None, optional) – map of metric names to torchmetrics metrics. Defaults to None.

  • checkpoints_location (str) – path to checkpoints directory. Defaults to “checkpoints”.

  • checkpoint_every (int | None) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.

  • name (str | None, optional) – trainer custom name. Defaults to None.

  • profiling_wait_epochs (int) – how many epochs to wait before starting the profiler.

  • profiling_warmup_epochs (int) – length of the profiler warmup phase in terms of number of epochs.

  • ray_scaling_config (ScalingConfig, optional) – scaling config for Ray Trainer. Defaults to None.

  • ray_tune_config (TuneConfig, optional) – tune config for Ray Tuner. Defaults to None.

  • ray_run_config (RunConfig, optional) – run config for Ray Trainer. Defaults to None.

  • ray_search_space (Dict[str, Any], optional) – search space for Ray Tuner. Defaults to None.

  • ray_torch_config (TorchConfig, optional) – torch configuration for Ray’s TorchTrainer. Defaults to None.

  • ray_data_config (DataConfig, optional) – dataset configuration for Ray. Defaults to None.

  • from_checkpoint (str | Path, optional) – path to checkpoint directory. Defaults to None.

loss: Callable | None = None

Classification loss criterion used in the generator and discriminator losses.

optimizer_generator: Optimizer | None = None

Optimizer for the generator.

optimizer_discriminator: Optimizer | None = None

Optimizer for the discriminator.

lr_scheduler_generator: LRScheduler | None = None

Learning rate scheduler for the optimizer of the generator.

lr_scheduler_discriminator: LRScheduler | None = None

Learning rate scheduler for the optimizer of the discriminator.

discriminator: Module | None = None

PyTorch discriminator to train.

generator: Module | None = None

PyTorch generator to train.

create_model_loss_optimizer() None[source]

Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.

save_checkpoint(name: str, best_validation_metric: Tensor | None = None, checkpoints_root: str | Path | None = None, force: bool = False) str | None[source]

Save training checkpoint.

Parameters:
  • name (str) – name of the checkpoint directory.

  • best_validation_metric (torch.Tensor | None) – best validation loss throughout training so far (if available).

  • checkpoints_root (str | None) – path for root checkpoints dir. If None, uses self.checkpoints_location as base.

  • force (bool) – force checkpointing now.

Returns:

path to the checkpoint file or None when the checkpoint is not created.

train_epoch()[source]

Perform a complete sweep over the training dataset, completing an epoch of training.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average training loss for the current epoch.

Return type:

torch.Tensor

train_step(real_images: Tensor, batch_idx: int) Tuple[Tensor, Tensor, Tensor][source]

Train step for GAN.

Parameters:
  • real_images (torch.Tensor) – real images.

  • batch_idx (int) – batch index.

Returns:

loss of the discriminator, loss of the generator, and accuracy of the discriminator.

Return type:

Tuple[torch.Tensor, torch.Tensor, torch.Tensor]

validation_epoch(fid_features: int = 2048) Tensor[source]

Validation epoch for GAN.

Parameters:
  • fid_features (int, optional) – number of features for the InceptionV3 model. Defaults to 2048.

Returns:

FID score that is returned by the FID metric.

Return type:

torch.Tensor

validation_step(real_images: Tensor, batch_idx: int, fid: FrechetInceptionDistance) Tuple[Tensor, Tensor][source]

Validation step for GAN.

Parameters:
  • real_images (torch.Tensor) – real images.

  • batch_idx (int) – batch index.

  • fid (FrechetInceptionDistance) – FID metric.

Returns:

accuracy of the generator and accuracy of the discriminator.

Return type:

Tuple[torch.Tensor, torch.Tensor]

save_fake_generator_images()[source]

Plot and save fake images from the generator.

inference

class itwinai.torch.inference.TorchModelLoader(model_uri: str, model_class: Module | None = None)[source]

Bases: ModelLoader

Loads a torch model from somewhere.

Parameters:

model_uri (str) – Can be a path on the local filesystem or an mlflow ‘locator’ in the form: ‘mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH’
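To illustrate the locator format, here is a minimal sketch that splits such a string into its three parts. The names `MLflowLocator` and `parse_mlflow_locator` are hypothetical and not part of itwinai, and the sketch assumes that neither the run ID nor the artifact path contains a `+` character.

```python
from typing import NamedTuple


class MLflowLocator(NamedTuple):
    """Parts of a 'mlflow+TRACKING_URI+RUN_ID+ARTIFACT_PATH' locator."""

    tracking_uri: str
    run_id: str
    artifact_path: str


def parse_mlflow_locator(model_uri: str) -> MLflowLocator:
    """Hypothetical helper: split an MLflow locator into its three parts."""
    prefix = "mlflow+"
    if not model_uri.startswith(prefix):
        raise ValueError(f"Not an MLflow locator: {model_uri!r}")
    # Split from the right so that a '+' inside the tracking URI is tolerated.
    parts = model_uri[len(prefix):].rsplit("+", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Malformed MLflow locator: {model_uri!r}")
    return MLflowLocator(*parts)


locator = parse_mlflow_locator(
    "mlflow+http://localhost:5000+abc123+model/weights.pth"
)
print(locator.tracking_uri)   # http://localhost:5000
print(locator.run_id)         # abc123
print(locator.artifact_path)  # model/weights.pth
```

A plain filesystem path does not start with `mlflow+`, so a caller could use the `ValueError` to fall back to loading from disk.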

class itwinai.torch.inference.TorchPredictor(config: Dict | TrainingConfiguration, model: Module | ModelLoader, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', logger: Logger | None = None, checkpoints_location: str = 'checkpoints', name: str | None = None)[source]

Bases: TorchTrainer, Predictor

Applies a pre-trained torch model to unseen data.

inference_dataloader: DataLoader = None

PyTorch DataLoader for inference dataset.

model: Module = None

Pre-trained PyTorch model used to make predictions.

torch_rng: Generator = None

PyTorch random number generator (PRNG).

logger: Logger = None

itwinai logger (itwinai.Logger).

distribute_model() None[source]

Distribute the torch model with the chosen strategy.

create_dataloaders(inference_dataset: Dataset) DataLoader[source]

Create inference dataloader.

Parameters:

inference_dataset (Dataset) – inference dataset object.

Returns:

Instance of DataLoader for the given inference dataset.

Return type:

DataLoader

predict() Dict[str, Any][source]

Predicts or runs inference on a trained ML model.

Returns:

maps each item ID to the corresponding predicted values.

Return type:

Dict[str, Any]

execute(inference_dataset: Dataset, model: Module | None = None) Dict[str, Any][source]

Applies a torch model to a dataset for inference.

Parameters:
  • inference_dataset (Dataset[str, Any]) – each item in this dataset is a couple (item_unique_id, item)

  • model (nn.Module, optional) – torch model. Overrides the existing model, if given. Defaults to None.

Returns:

maps each item ID to the corresponding predicted value(s).

Return type:

Dict[str, Any]

log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) None[source]

Log item with identifier name of kind type at step time step.

Parameters:
  • item (Union[Any, List[Any]]) – element to be logged (e.g., metric).

  • identifier (Union[str, List[str]]) – unique identifier for the element to log (e.g., name of a metric).

  • kind (str) – Type of the item to be logged. Must be one among the list of self.supported_types. Defaults to ‘metric’.

  • step (int | None) – logging step. Defaults to None.

  • batch_idx (int | None) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.MulticlassTorchPredictor(config: Dict | TrainingConfiguration, model: Module | ModelLoader, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', logger: Logger | None = None, checkpoints_location: str = 'checkpoints', name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for multiclass classification.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.MultilabelTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, threshold: float = 0.5, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for multilabel classification, applying a threshold on the output of the neural network.

threshold: float = 0.5

Threshold to transform probabilities into class predictions. Defaults to 0.5.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.RegressionTorchPredictor(config: Dict | TrainingConfiguration, model: Module | ModelLoader, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', logger: Logger | None = None, checkpoints_location: str = 'checkpoints', name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for regression, leaving untouched the output of the neural network.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

loggers

class itwinai.torch.loggers.ItwinaiLogger(itwinai_logger: Logger, log_model: Literal['all'] | bool = False, skip_finalize: bool = False)[source]

Bases: Logger

Adapter between PyTorch Lightning logger and itwinai logger.

This adapter forwards logging calls from PyTorch Lightning to the itwinai Logger instance, using the itwinai Logger’s log method. It supports the Lightning logging of metrics, hyperparameters, and checkpoints. Additionally, any function call can be forwarded to the itwinai logger instance through the experiment property of this adapter.

property name: str | None

Return the experiment name.

property version: int | str | None

Return the experiment version.

property save_dir: str | None

Return the directory where the logs are stored.

property experiment: Logger

Lightning Logger function. Initializes and returns the itwinai Logger context for experiment tracking.

Returns:

The itwinai logger instance.

Return type:

Logger

finalize(status: str) None[source]

Lightning Logger function. Logs any remaining checkpoints and closes the logger context.

Parameters:
  • status (str) – Describes the status of the training (e.g., ‘completed’, ‘failed’). The status is not needed for this function, but it is part of the parent class's (LightningLogger) finalize function signature and therefore must be propagated here.

log_metrics(metrics: Dict[str, float], step: int | None = None) None[source]

Lightning Logger function. Logs the given metrics and is usually called by the Lightning Trainer.

Parameters:
  • metrics (Dict[str, float]) – Dictionary of metrics to log.

  • step (Optional[int], optional) – Training step associated with the metrics. Defaults to None.
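The forwarding described above can be sketched as a small adapter. This is only an illustration of the pattern, not the itwinai implementation: `RecordingLogger` and `LoggerAdapter` are hypothetical stand-ins, and the assumption is that each metric results in one `log` call with `kind="metric"`.

```python
from typing import Any, Dict, List, Optional, Tuple


class RecordingLogger:
    """Stand-in for an itwinai-style logger that exposes a `log` method."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, Any, str, Optional[int]]] = []

    def log(
        self,
        item: Any,
        identifier: str,
        kind: str = "metric",
        step: Optional[int] = None,
    ) -> None:
        # Store the call so that it can be inspected afterwards.
        self.records.append((identifier, item, kind, step))


class LoggerAdapter:
    """Hypothetical adapter: forwards Lightning-style calls to `log`."""

    def __init__(self, wrapped: RecordingLogger) -> None:
        self._wrapped = wrapped

    @property
    def experiment(self) -> RecordingLogger:
        # Gives callers direct access to the wrapped logger.
        return self._wrapped

    def log_metrics(
        self, metrics: Dict[str, float], step: Optional[int] = None
    ) -> None:
        # One `log` call per metric, tagged with kind="metric".
        for name, value in metrics.items():
            self._wrapped.log(item=value, identifier=name, kind="metric", step=step)


adapter = LoggerAdapter(RecordingLogger())
adapter.log_metrics({"val_loss": 0.25, "val_acc": 0.9}, step=3)
print(adapter.experiment.records)
```

Anything the adapter does not wrap explicitly stays reachable through `adapter.experiment`, which mirrors the role of the experiment property.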

log_hyperparams(params: Dict[str, Any] | Namespace) None[source]

Lightning Logger function. Logs hyperparameters for the experiment.

Parameters:

params (Union[Dict[str, Any], Namespace]) – Hyperparameters dictionary or object.

after_save_checkpoint(checkpoint_callback: ModelCheckpoint) None[source]

Lightning Logger function. Handles checkpoint saving to the logger after the ModelCheckpoint Callback of the Lightning Trainer is called. The checkpoints are logged as artifacts.

Parameters:

checkpoint_callback (ModelCheckpoint) – Callback instance to manage checkpointing.

models.mnist

class itwinai.torch.models.mnist.MNISTModel(hidden_size: int = 64)[source]

Bases: LightningModule

Simple PL model for MNIST. Adapted from https://lightning.ai/docs/pytorch/stable/notebooks/lightning_examples/mnist-hello-world.html

forward(x)[source]

Same as torch.nn.Module.forward().

Parameters:
  • *args – Whatever you decide to pass into the forward method.

  • **kwargs – Keyword arguments are also possible.

Returns:

Your model’s output

training_step(batch, batch_idx)[source]

Here you compute and return the training loss and some additional metrics for e.g. the progress bar or logger.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

  • Tensor - The loss tensor

  • dict - A dictionary which can include any keys, but must include the key 'loss' in the case of automatic optimization.

  • None - In automatic optimization, this will skip to the next batch (but is not supported for multi-GPU, TPU, or DeepSpeed). For manual optimization, this has no special meaning, as returning the loss is not required.

In this step you’d normally do the forward pass and calculate the loss for a batch. You can also do fancier things like multiple forward passes or something model specific.

Example:

def training_step(self, batch, batch_idx):
    x, y, z = batch
    out = self.encoder(x)
    loss = self.loss(out, x)
    return loss

To use multiple optimizers, you can switch to ‘manual optimization’ and control their stepping:

def __init__(self):
    super().__init__()
    self.automatic_optimization = False


# Multiple optimizers (e.g.: GANs)
def training_step(self, batch, batch_idx):
    opt1, opt2 = self.optimizers()

    # do training_step with encoder
    ...
    opt1.step()
    # do training_step with decoder
    ...
    opt2.step()

Note

When accumulate_grad_batches > 1, the loss returned here will be automatically normalized by accumulate_grad_batches internally.

validation_step(batch, batch_idx)[source]

Operates on a single batch of data from the validation set. In this step you might generate examples or calculate anything of interest like accuracy.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

  • Tensor - The loss tensor

  • dict - A dictionary. Can include any keys, but must include the key 'loss'.

  • None - Skip to the next batch.

# if you have one val dataloader:
def validation_step(self, batch, batch_idx): ...


# if you have multiple val dataloaders:
def validation_step(self, batch, batch_idx, dataloader_idx=0): ...

Examples:

# CASE 1: A single validation dataset
def validation_step(self, batch, batch_idx):
    x, y = batch

    # implement your own
    out = self(x)
    loss = self.loss(out, y)

    # log 6 example images
    # or generated text... or whatever
    sample_imgs = x[:6]
    grid = torchvision.utils.make_grid(sample_imgs)
    self.logger.experiment.add_image('example_images', grid, 0)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    val_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs!
    self.log_dict({'val_loss': loss, 'val_acc': val_acc})

If you pass in multiple val dataloaders, validation_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

# CASE 2: multiple validation dataloaders
def validation_step(self, batch, batch_idx, dataloader_idx=0):
    # dataloader_idx tells you which dataset this is.
    x, y = batch

    # implement your own
    out = self(x)

    if dataloader_idx == 0:
        loss = self.loss0(out, y)
    else:
        loss = self.loss1(out, y)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs separately for each dataloader
    self.log_dict({f"val_loss_{dataloader_idx}": loss, f"val_acc_{dataloader_idx}": acc})

Note

If you don’t need to validate you don’t need to implement this method.

Note

When the validation_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of validation, the model goes back to training mode and gradients are enabled.

test_step(batch, batch_idx)[source]

Operates on a single batch of data from the test set. In this step you’d normally generate examples or calculate anything of interest such as accuracy.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

  • Tensor - The loss tensor

  • dict - A dictionary. Can include any keys, but must include the key 'loss'.

  • None - Skip to the next batch.

# if you have one test dataloader:
def test_step(self, batch, batch_idx): ...


# if you have multiple test dataloaders:
def test_step(self, batch, batch_idx, dataloader_idx=0): ...

Examples:

# CASE 1: A single test dataset
def test_step(self, batch, batch_idx):
    x, y = batch

    # implement your own
    out = self(x)
    loss = self.loss(out, y)

    # log 6 example images
    # or generated text... or whatever
    sample_imgs = x[:6]
    grid = torchvision.utils.make_grid(sample_imgs)
    self.logger.experiment.add_image('example_images', grid, 0)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    test_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs!
    self.log_dict({'test_loss': loss, 'test_acc': test_acc})

If you pass in multiple test dataloaders, test_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

# CASE 2: multiple test dataloaders
def test_step(self, batch, batch_idx, dataloader_idx=0):
    # dataloader_idx tells you which dataset this is.
    x, y = batch

    # implement your own
    out = self(x)

    if dataloader_idx == 0:
        loss = self.loss0(out, y)
    else:
        loss = self.loss1(out, y)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs separately for each dataloader
    self.log_dict({f"test_loss_{dataloader_idx}": loss, f"test_acc_{dataloader_idx}": acc})

Note

If you don’t need to test you don’t need to implement this method.

Note

When the test_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of the test epoch, the model goes back to training mode and gradients are enabled.

predict_step(batch, batch_idx, dataloader_idx=0)[source]

Step function called during predict(). By default, it calls forward(). Override to add any processing logic.

The predict_step() is used to scale inference on multi-devices.

To prevent an OOM error, it is possible to use BasePredictionWriter callback to write the predictions to disk or database after each batch or on epoch end.

The BasePredictionWriter should be used while using a spawn based accelerator. This happens for Trainer(strategy="ddp_spawn") or training on 8 TPU cores with Trainer(accelerator="tpu", devices=8) as predictions won’t be returned.

Parameters:
  • batch – The output of your data iterable, normally a DataLoader.

  • batch_idx – The index of this batch.

  • dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)

Returns:

Predicted output (optional).

Example

class MyModel(LightningModule):

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        return self(batch)

dm = ...
model = MyModel()
trainer = Trainer(accelerator="gpu", devices=2)
predictions = trainer.predict(model, dm)

monitoring.monitoring

itwinai.torch.monitoring.monitoring.profile_gpu_utilization(stop_flag: ValueProxy, local_rank: int, global_rank: int, logger: Logger, run_id: str | None = None, parent_run_id: str | None = None, probing_interval: int = 2, warmup_time: int = 5) None[source]

Logs the GPU utilization across all available GPUs on a single node. It is meant to be called by multiprocessing’s Process and expects variables to be shared using a multiprocessing.Manager object. Logs utilization into log_dict until stop_flag.value is set to True.

Parameters:
  • stop_flag (ValueProxy) – A shared flag to stop the profiling process.

  • local_rank (int) – Local rank of the GPU being profiled.

  • global_rank (int) – Global rank of the process.

  • logger (Logger) – Logger instance to log GPU utilization data.

  • run_id (str | None) – ID of the MLflow run for logging.

  • parent_run_id (str | None) – ID of the parent MLflow run for logging.

  • probing_interval (int) – Interval in seconds between each probing of GPU utilization.

  • warmup_time (int) – Time in seconds to wait before starting the profiling.

itwinai.torch.monitoring.monitoring.measure_gpu_utilization(method: Callable) Callable[source]

Decorator for measuring GPU utilization and storing it to a .csv file.

monitoring.backend

class itwinai.torch.monitoring.backend.GPUBackend[source]

Bases: ABC

abstract property man_lib: ModuleType | None

The library used for GPU management.

abstract property man_type: Literal['nvidia', 'amd'] | None

The type of GPU management library used.

abstract get_handle_by_uuid(gpu_uuid: str) object[source]

Get the device handle for a specific GPU UUID.

abstract get_handle_by_id(gpu_id: int) object[source]

Get the device handle for a specific GPU index (ID).

abstract get_gpu_utilization(handle) float[source]

Get the GPU utilization (%) for a given handle.

abstract get_gpu_power_usage(handle) float[source]

Get the GPU power usage (W) for a given handle.

abstract get_visible_gpu_ids() list[int][source]

Get a list of visible GPU IDs.

class itwinai.torch.monitoring.backend.NvidiaBackend[source]

Bases: GPUBackend

property man_lib: ModuleType | None

The library used for GPU management.

property man_type: Literal['nvidia', 'amd'] | None

The type of GPU management library used.

get_handle_by_uuid(gpu_uuid: str) object[source]

Get the device handle for a specific GPU UUID.

get_handle_by_id(gpu_id: int) object[source]

Get the device handle for a specific GPU index (ID).

get_gpu_utilization(handle) float[source]

Get the GPU utilization (%) for a given handle.

get_gpu_power_usage(handle) float[source]

Get the GPU power usage (W) for a given handle.

get_visible_gpu_ids() list[int][source]

Get a list of visible GPU IDs.

class itwinai.torch.monitoring.backend.AMDBackend[source]

Bases: GPUBackend

property man_lib: ModuleType | None

The library used for GPU management.

property man_type: Literal['nvidia', 'amd'] | None

The type of GPU management library used.

get_handle_by_uuid(gpu_uuid: str) object[source]

Get the device handle for a specific GPU UUID.

get_handle_by_id(gpu_id: int) object[source]

Get the device handle for a specific GPU index (ID).

get_gpu_utilization(handle) float[source]

Get the GPU utilization (%) for a given handle.

get_gpu_power_usage(handle) float[source]

Get the GPU power usage (W) for a given handle.

get_visible_gpu_ids() list[int][source]

Get a list of visible GPU IDs.

itwinai.torch.monitoring.backend.detect_gpu_backend() GPUBackend[source]

Detects the available GPU backend and returns an instance of the corresponding class.
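One plausible way to perform such a detection is to check which GPU management library can be imported. The sketch below is an assumption about the general approach, not the itwinai implementation: the class names are hypothetical, and the default module names `pynvml` and `amdsmi` are assumed to be the relevant management libraries.

```python
import importlib.util
from abc import ABC, abstractmethod
from typing import Optional


class Backend(ABC):
    """Minimal stand-in for a GPU backend interface."""

    @property
    @abstractmethod
    def man_type(self) -> Optional[str]:
        """Type of GPU management library in use."""


class NvidiaLike(Backend):
    @property
    def man_type(self) -> Optional[str]:
        return "nvidia"


class AmdLike(Backend):
    @property
    def man_type(self) -> Optional[str]:
        return "amd"


class NoGpu(Backend):
    @property
    def man_type(self) -> Optional[str]:
        return None


def detect_backend(
    nvidia_module: str = "pynvml", amd_module: str = "amdsmi"
) -> Backend:
    """Pick a backend based on which management module is importable."""
    # find_spec returns None when a top-level module cannot be found.
    if importlib.util.find_spec(nvidia_module) is not None:
        return NvidiaLike()
    if importlib.util.find_spec(amd_module) is not None:
        return AmdLike()
    return NoGpu()


print(detect_backend().man_type)
```

Importability alone does not prove that a GPU is present, so a real detection routine would likely also try to initialize the library.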

mlflow

itwinai.torch.mlflow.init_lightning_mlflow(pl_config: Dict, default_experiment_name: str = 'Default', tmp_dir: str = '.tmp', **autolog_kwargs) None[source]

Initialize mlflow for pytorch lightning, also setting up auto-logging (mlflow.pytorch.autolog(…)). Creates a new mlflow run and attaches it to the mlflow auto-logger.

Parameters:
  • pl_config (Dict) – pytorch lightning configuration loaded in memory.

  • default_experiment_name (str, optional) – used as experiment name if it is not given in the lightning conf. Defaults to ‘Default’.

  • tmp_dir (str) – where to temporarily store some artifacts.

  • autolog_kwargs (kwargs) – args for mlflow.pytorch.autolog(…).

itwinai.torch.mlflow.teardown_lightning_mlflow() None[source]

End active mlflow run, if any.

itwinai.torch.mlflow.get_epoch_time_runs_by_parent(mlflow_client: MlflowClient, experiment_id: str, run: Run) List[Run][source]

Get all epoch time runs associated with a given run. This function assumes that the data is in the main worker run of each train run, which is either:
  • The main worker run in each trial run of a given tuner run (if Ray was used)

  • The main worker run of the given training run (if Ray was not used)

Parameters:
  • mlflow_client (mlflow.tracking.MlflowClient) – MLFlow client to use.

  • experiment_id (str) – The ID of the experiment to search in.

  • run (mlflow.entities.Run) – The run from which to collect epoch runs.

Returns:

A list of runs that contain epoch time data associated with the given run.

Return type:

List[Run]

itwinai.torch.mlflow.get_profiling_avg_by_parent(mlflow_client: MlflowClient, experiment_id: str, run: Run) List[DataFrame][source]

Get all worker profiling averages associated with a given run. This function assumes that the worker profiling averages are either:
  • Nested under the trial runs of a tuner run (if Ray was used)

  • Nested under the training run (if Ray was not used)

Parameters:
  • mlflow_client (mlflow.tracking.MlflowClient) – MLFlow client to use.

  • experiment_id (str) – The ID of the experiment to search in.

  • run (mlflow.entities.Run) – The run from which to collect worker profiling averages.

Returns:

A list of DataFrames containing the worker profiling averages associated with the given run. Each DataFrame corresponds to a worker run.

Return type:

List[pd.DataFrame]

itwinai.torch.mlflow.get_gpu_runs_by_parent(mlflow_client: MlflowClient, experiment_id: str, run: Run) List[Run][source]

Get all GPU worker runs associated with a given run. This function assumes that the GPU worker runs are either:
  • Nested under the trial runs of a tuner run (if Ray was used)

  • Nested under the training run (if Ray was not used)

Parameters:
  • mlflow_client (mlflow.tracking.MlflowClient) – MLFlow client to use.

  • experiment_id (str) – The ID of the experiment to search in.

  • run (mlflow.entities.Run) – The run from which to collect GPU worker runs.

Returns:

A list of runs that are GPU workers associated with the given run.

Return type:

List[Run]

itwinai.torch.mlflow.get_metric_names(run: Run) List[str][source]

Get the names of all metrics logged in a run.

itwinai.torch.mlflow.get_params(run: Run) Dict[str, str][source]

Get the parameters logged in a run.

itwinai.torch.mlflow.get_run_metrics_as_df(mlflow_client: MlflowClient, run: Run, metric_names: List[str] | None = None)[source]

Collect metrics logged in a run and return them as a tidy DataFrame.

Parameters:
  • mlflow_client (mlflow.MlflowClient) – MLFlow client to use.

  • run (mlflow.entities.Run) – The run from which to collect metrics.

  • metric_names (List[str] | None) – If provided, only these metrics will be collected. If None, all metrics will be collected.

Returns:

A DataFrame containing the metrics, with columns:
  • metric_name: the name of the metric

  • sample_idx: the step index of the metric

  • timestamp: the timestamp of the metric

  • value: the value of the metric

  • all parameters logged in the run

Return type:

pd.DataFrame

itwinai.torch.mlflow.get_runs_by_name(mlflow_client: MlflowClient, experiment_id: str, run_names: List[str] | None = None) List[Run][source]

Get all runs in an experiment by their names.

Parameters:
  • mlflow_client (mlflow.tracking.MlflowClient) – MLFlow client to use.

  • experiment_id (str) – The ID of the experiment to search in.

  • run_names (List[str] | None) – The names of the runs to retrieve. If None, all runs in the experiment will be retrieved.

Returns:

A list of runs that match the given names.

Return type:

List[Run]

profiling.profiler

itwinai.torch.profiling.profiler.profile_torch_trainer(method: Callable) Callable[source]

Decorator for execute method for components. Profiles function calls and stores the result for future analysis (e.g. computation vs. other plots).

profiling.py_spy_aggregation

class itwinai.torch.profiling.py_spy_aggregation.StackFrame(*, name: str, path: str, line: str, num_samples: int, proportion: float | None = None, library_function_name: str = 'Not Found', library_function_path: str = 'Not Found', library_function_line: str = 'Not Found')[source]

Bases: BaseModel

Represents a single stack frame in a call stack.

name: str
path: str
line: str
num_samples: int
proportion: float | None
library_function_name: str
library_function_path: str
library_function_line: str
get_aggregation_key() Tuple[str, str, str, str, str, str][source]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

itwinai.torch.profiling.py_spy_aggregation.add_library_information(stack_traces: List[List[StackFrame]], library_name: str = 'itwinai') None[source]

Iterates through each stack trace in the given list, finds the lowest call with the given library name, if any, and adds it to each StackFrame in the trace.

This function changes the given list in-place.
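The in-place annotation can be sketched as follows. `Frame` is a simplified, hypothetical stand-in for StackFrame, and two assumptions are made: frames in a trace are ordered from the outermost to the innermost call (so the "lowest" call is the last match), and a frame belongs to the library when the library name appears in its path.

```python
from dataclasses import dataclass
from typing import List

NOT_FOUND = "Not Found"


@dataclass
class Frame:
    """Simplified stand-in for StackFrame."""

    name: str
    path: str
    line: str
    library_function_name: str = NOT_FOUND
    library_function_path: str = NOT_FOUND
    library_function_line: str = NOT_FOUND


def annotate_with_library_call(traces: List[List[Frame]], library_name: str) -> None:
    """Tag every frame with the deepest call located inside `library_name`."""
    for trace in traces:
        # Assumption: outermost call first, so the last match is the lowest.
        matches = [frame for frame in trace if library_name in frame.path]
        if not matches:
            continue  # Frames keep their "Not Found" defaults.
        lowest = matches[-1]
        for frame in trace:
            frame.library_function_name = lowest.name
            frame.library_function_path = lowest.path
            frame.library_function_line = lowest.line


traces = [
    [
        Frame("main", "run.py", "3"),
        Frame("execute", "itwinai/torch/trainer.py", "120"),
        Frame("train_epoch", "itwinai/torch/trainer.py", "300"),
        Frame("forward", "torch/nn/modules/linear.py", "116"),
    ],
    [Frame("helper", "other/pkg.py", "7")],
]
annotate_with_library_call(traces, library_name="itwinai")
print(traces[0][3].library_function_name)  # train_epoch
print(traces[1][0].library_function_name)  # Not Found
```

Because the function returns None and mutates its argument, callers keep working with the same list they passed in.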

itwinai.torch.profiling.py_spy_aggregation.get_aggregated_paths(stack_frames: List[StackFrame]) List[StackFrame][source]

Aggregate stack frames with identical keys by summing their sample counts.

Two stack frames are considered identical if their get_aggregation_key() values match. Returns a new list of stack frames, each representing a unique key with the corresponding total number of samples.
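As a rough illustration of this aggregation, the sketch below groups frames by key and sums their sample counts. `Frame` is a hypothetical, reduced stand-in for StackFrame whose key uses only three fields, whereas the real key has six.

```python
from collections import defaultdict
from dataclasses import dataclass, replace
from typing import Dict, List, Tuple

Key = Tuple[str, str, str]


@dataclass(frozen=True)
class Frame:
    """Simplified stand-in for StackFrame (subset of its fields)."""

    name: str
    path: str
    line: str
    num_samples: int

    def get_aggregation_key(self) -> Key:
        return (self.name, self.path, self.line)


def aggregate(frames: List[Frame]) -> List[Frame]:
    """Merge frames sharing a key, summing their sample counts."""
    totals: Dict[Key, int] = defaultdict(int)
    first_seen: Dict[Key, Frame] = {}
    for frame in frames:
        key = frame.get_aggregation_key()
        totals[key] += frame.num_samples
        first_seen.setdefault(key, frame)
    # Build new frames so that the input list is left untouched.
    return [
        replace(first_seen[key], num_samples=total)
        for key, total in totals.items()
    ]


frames = [
    Frame("step", "train.py", "10", 3),
    Frame("load", "data.py", "5", 2),
    Frame("step", "train.py", "10", 4),
]
merged = aggregate(frames)
print([(f.name, f.num_samples) for f in merged])  # [('step', 7), ('load', 2)]
```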

itwinai.torch.profiling.py_spy_aggregation.parse_trace_line_to_stack_frame(trace_line: str, num_samples: int) StackFrame | None[source]

Parses a single trace line, which contains a function name, file name and line number, to a StackFrame object.

Raises:
  • ValueError – If the given trace line does not conform to the expected structure of "function_name (path/to/function:line_number)".
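A minimal sketch of this kind of parsing, assuming the trace line has the form `function_name (path/to/file:line_number)`, could use a regular expression. `ParsedFrame` and `parse_trace_line` are hypothetical names, and the line number is kept as a string to match the `line: str` field of StackFrame.

```python
import re
from typing import NamedTuple

# Function name, then "(path:line)" at the end of the string.
TRACE_LINE = re.compile(r"^(?P<name>.+?) \((?P<path>.+):(?P<line>\d+)\)$")


class ParsedFrame(NamedTuple):
    name: str
    path: str
    line: str


def parse_trace_line(trace_line: str) -> ParsedFrame:
    """Split one trace line into function name, path and line number."""
    match = TRACE_LINE.match(trace_line.strip())
    if match is None:
        raise ValueError(f"Unexpected trace line format: {trace_line!r}")
    return ParsedFrame(match["name"], match["path"], match["line"])


frame = parse_trace_line("forward (torch/nn/modules/linear.py:116)")
print(frame.name)  # forward
print(frame.path)  # torch/nn/modules/linear.py
print(frame.line)  # 116
```

The greedy path group means that only the last colon before the closing parenthesis separates the path from the line number, so paths that themselves contain colons still parse.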

itwinai.torch.profiling.py_spy_aggregation.convert_stack_trace_to_list(line: str) List[StackFrame][source]

Converts a single line of the raw profiling output, which contains a stack trace and a number of samples, to a list of StackFrame objects with the data for each line of the stack trace.

Note

The number of samples is the last number in the string, separated from the rest with a space. There can be multiple spaces, however, so we have to extract the last element.

Raises:
  • ValueError – If any of the samples are not numeric.

  • ValueError – If any of the lines in the stack trace are not formatted as expected.
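The note about the trailing sample count can be illustrated with a short sketch. It assumes that frames within a line are separated by semicolons, which is how py-spy's raw format is commonly laid out; the function name `split_stack_line` is hypothetical and the frames are returned as raw strings rather than StackFrame objects.

```python
from typing import List, Tuple


def split_stack_line(line: str) -> Tuple[List[str], int]:
    """Separate the stack trace from the sample count at the end of a line."""
    # Frame text contains spaces, so only the last space is a safe separator.
    stack_part, _, count_text = line.strip().rpartition(" ")
    if not count_text.isdigit():
        raise ValueError(f"Sample count is not numeric: {count_text!r}")
    # Drop any extra padding between the trace and the count.
    frames = [part.strip() for part in stack_part.strip().split(";") if part.strip()]
    if not frames:
        raise ValueError(f"No stack frames found in line: {line!r}")
    return frames, int(count_text)


frames, num_samples = split_stack_line(
    "main (run.py:3);train (trainer.py:42)   17"
)
print(frames)       # ['main (run.py:3)', 'train (trainer.py:42)']
print(num_samples)  # 17
```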

itwinai.torch.profiling.py_spy_aggregation.parse_py_spy_lines(file_lines: List[str]) List[List[StackFrame]][source]

Parse the lines of a py-spy profiling output and turn them into a list of call stacks, each of which is a list of StackFrame objects.

Raises:

ValueError – If converting the stack trace to a list fails.

itwinai.torch.profiling.py_spy_aggregation.parse_num_rows(num_rows: str) int | None[source]

Parses the number of rows from a string. Makes sure it is either “all” or greater than zero. If it is “all” then it returns None.

Raises:
  • ValueError – If num_rows is not numeric or “all”.

  • ValueError – If num_rows is parsed to a number smaller than one.
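The rules above can be sketched in a few lines. This is an illustration under a hypothetical name (`parse_rows`), not the itwinai source, and it assumes that “all” must match exactly.

```python
from typing import Optional


def parse_rows(num_rows: str) -> Optional[int]:
    """Return None for "all", otherwise a positive integer."""
    if num_rows == "all":
        return None
    try:
        parsed = int(num_rows)
    except ValueError as exc:
        raise ValueError(
            f"num_rows must be 'all' or an integer, got {num_rows!r}"
        ) from exc
    if parsed < 1:
        raise ValueError(f"num_rows must be at least 1, got {parsed}")
    return parsed


print(parse_rows("all"))  # None
print(parse_rows("25"))   # 25
```

Returning None for “all” lets the result be used directly as a slice bound, since `rows[:None]` keeps every row.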

itwinai.torch.profiling.py_spy_aggregation.read_stack_traces(path: Path) List[List[StackFrame]][source]

Reads stack traces from a path. The path can either point to a file or a directory. If it points to a directory, it will read all files in this directory and combine them.

Raises:

ValueError – If parsing the lines from one of the files fails.
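The file-or-directory handling described above can be sketched as follows. This is a minimal illustration, not the library's code: the helper read_lines is hypothetical, it returns raw lines rather than StackFrame objects, and the file names in the demo are made up.

```python
import tempfile
from pathlib import Path
from typing import List


def read_lines(path: Path) -> List[str]:
    """Collect lines from a single file, or from every file in a directory."""
    if path.is_dir():
        # Sorting makes the combined order deterministic.
        files = sorted(p for p in path.iterdir() if p.is_file())
    else:
        files = [path]
    lines: List[str] = []
    for file in files:
        lines.extend(file.read_text().splitlines())
    return lines


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "rank_0.txt").write_text("main (train.py:10) 3\n")
    (root / "rank_1.txt").write_text("main (train.py:10) 4\n")
    combined = read_lines(root)
    single = read_lines(root / "rank_0.txt")
```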

reproducibility

This module provides the tools to support reproducible execution of torch scripts.

itwinai.torch.reproducibility.seed_worker(worker_id)[source]

Seed DataLoader worker.

itwinai.torch.reproducibility.set_seed(rnd_seed: int | None, deterministic_cudnn: bool = True) Generator[source]

Set torch random seed and return a PRNG object.

Parameters:
  • rnd_seed (Optional[int]) – random seed. If None, the seed is not set.

  • deterministic_cudnn (bool) – if True, sets torch.backends.cudnn.benchmark = False, which may affect performance.

Returns:

PRNG object.

Return type:

torch.Generator

trainer

Provides training logic for PyTorch models via Trainer classes.

class itwinai.torch.trainer.TorchTrainer(config: Dict | TrainingConfiguration, epochs: int, model: Module | None = None, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Metric] | None = None, checkpoints_location: str | Path = 'checkpoints', checkpoint_every: int | None = None, disable_tqdm: bool = False, name: str | None = None, profiling_wait_epochs: int = 0, profiling_warmup_epochs: int = 0, measure_gpu_data: bool = False, enable_torch_profiling: bool = False, store_torch_profiling_traces: bool = False, measure_epoch_time: bool = False, ray_scaling_config: ScalingConfig | None = None, ray_tune_config: TuneConfig | None = None, ray_run_config: RunConfig | None = None, ray_search_space: Dict[str, Any] | None = None, ray_torch_config: TorchConfig | None = None, ray_data_config: DataConfig | None = None, from_checkpoint: Path | str | None = None, initial_best_validation_metric: str = 'inf', run_name: str | None = None, time_ray: bool = False)[source]

Bases: Trainer, LogMixin

Trainer class for torch training algorithms.

Parameters:
  • config (Dict | TrainingConfiguration) – training configuration containing hyperparameters.

  • epochs (int) – number of training epochs.

  • model (nn.Module | None, optional) – PyTorch model to train or a string identifier. Defaults to None.

  • strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) – distributed strategy. Defaults to 'ddp'.

  • test_every (int | None, optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.

  • random_seed (int | None, optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.

  • logger (Logger | None, optional) – logger for ML tracking. Defaults to None.

  • metrics (Dict[str, Callable] | None, optional) – map of torchmetrics metrics. Defaults to None.

  • checkpoints_location (str) – path to checkpoints directory. Defaults to "checkpoints".

  • checkpoint_every (int | None) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.

  • disable_tqdm (bool) – whether to disable tqdm progress bar(s).

  • name (str | None, optional) – trainer custom name. Defaults to None.

  • profiling_wait_epochs (int) – how many epochs to wait before starting the profiler.

  • profiling_warmup_epochs (int) – length of the profiler warmup phase in terms of number of epochs.

  • measure_gpu_data (bool) – enable the collection of data on average GPU utilization and total energy consumption throughout training. Defaults to False.

  • enable_torch_profiling (bool) – enable the profiling of computation. It uses the torch profiler and it may slow down training. Defaults to False.

  • measure_epoch_time (bool) – enable the measurement of epoch duration (in seconds). Defaults to False.

  • ray_scaling_config (ScalingConfig, optional) – scaling config for Ray Trainer. Defaults to None.

  • ray_tune_config (TuneConfig, optional) – tune config for Ray Tuner. Defaults to None.

  • ray_run_config (ray.tune.RunConfig, optional) – run config for Ray Tuner. Distributed training with Ray but without HPO will still be wrapped into a Ray Tuner, to keep everything homogeneous. Defaults to None.

  • ray_search_space (Dict[str, Any], optional) – search space for Ray Tuner. Defaults to None.

  • ray_torch_config (TorchConfig, optional) – torch configuration for Ray’s TorchTrainer. Defaults to None.

  • ray_data_config (DataConfig, optional) – dataset configuration for Ray. Defaults to None.

  • from_checkpoint (str | Path, optional) – path to checkpoint directory. Defaults to None.

  • initial_best_validation_metric (str) – initial value for the best validation metric. Usually the validation metric is a loss to be minimized and this value exceeds the highest possible loss value, so that it will be overwritten when the first validation loss is computed. Example values are "inf" and "-inf", depending on whether the best validation metric should be minimized or maximized. Defaults to "inf".

  • run_name (str, optional) – name used to identify a specific run when collecting metrics on the trainer (e.g. GPU utilization). Defaults to None.

  • time_ray (bool) – whether to time and log the execution of Ray functions. Defaults to False.
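The role of initial_best_validation_metric can be illustrated with a small sketch. It assumes the string is converted with float() and compared against each new validation value; the helper track_best and its minimize flag are hypothetical and do not reflect the trainer's actual internals.

```python
from typing import Iterable


def track_best(values: Iterable[float], initial: str, minimize: bool = True) -> float:
    """Keep the best value seen so far, starting from a sentinel."""
    # float("inf") and float("-inf") are valid Python float literals.
    best = float(initial)
    for value in values:
        improved = value < best if minimize else value > best
        if improved:
            best = value
    return best


# A loss to be minimized: any first value is smaller than +inf.
best_loss = track_best([0.9, 0.7, 0.8], "inf")

# A score to be maximized: any first value is larger than -inf.
best_accuracy = track_best([0.6, 0.8, 0.75], "-inf", minimize=False)
```

Starting from an infinite sentinel guarantees that the first computed validation value replaces the initial one, whichever direction is being optimized.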

train_dataloader: DataLoader | None = None

PyTorch DataLoader for training dataset.

validation_dataloader: DataLoader | None = None

PyTorch DataLoader for validation dataset.

test_dataloader: DataLoader | None = None

PyTorch DataLoader for test dataset.

loss: Callable | None = None

Loss criterion.

optimizer: Optimizer | None = None

Optimizer.

lr_scheduler: LRScheduler | None = None

Learning rate scheduler.

torch_rng: Generator | None = None

PyTorch random number generator (PRNG).

train_glob_step: int = 0

Total number of training batches used so far, across all epochs.

validation_glob_step: int = 0

Total number of validation batches used so far, across all epochs.

test_glob_step: int = 0

Total number of test batches used so far, across all epochs.

mlflow_tune_run_id: str | None = None
mlflow_train_run_id: str | None = None
mlflow_worker_run_id: str | None = None
model: Module | None = None

PyTorch model to train.

logger: Logger | None = None

itwinai logger (itwinai.Logger).

metrics: Dict[str, Callable]

Dictionary of torchmetrics metrics, indexed by user-defined names.

profiler: Any | None

PyTorch Profiler for computation ratio profiling.

measure_gpu_data: bool = False

Toggle for GPU utilization monitoring

enable_torch_profiling: bool = False

Toggle for computation fraction profiling

store_torch_profiling_traces: bool = False

Store PyTorch Profiling traces

measure_epoch_time: bool = False

Toggle for epoch time tracking

time_ray: bool = False

Toggle for Ray time logging

run_name: str

Run ID

property strategy: TorchDistributedStrategy

Strategy currently in use.

property device: str

Current device from distributed strategy.

get_default_distributed_kwargs() Dict[source]

Gives the default kwargs for the trainer’s strategy’s distributed() method.

create_model_loss_optimizer() None[source]

Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.

Raises:

ValueError – If self.model is None.

save_checkpoint(name: str, best_validation_metric: Tensor | None = None, checkpoints_root: Path | str | None = None, force: bool = False) str | None[source]

Save training checkpoint.

Parameters:
  • name (str) – name of the checkpoint directory.

  • best_validation_metric (torch.Tensor | None) – best validation metric throughout training so far (if available). Usually this is the validation loss.

  • checkpoints_root (str | None) – path for root checkpoints dir. If None, uses self.checkpoints_location as base.

  • force (bool) – force checkpointing now.

Returns:

path to the checkpoint file or None when the checkpoint is not created.

load_checkpoint() None[source]

Reload training state from checkpoint.

create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) None[source]

Create train, validation and test dataloaders using the configuration provided in the Trainer constructor. Generally a user-defined method.

Parameters:
  • train_dataset (Dataset) – training dataset object.

  • validation_dataset (Dataset | None) – validation dataset object. Default None.

  • test_dataset (Dataset | None) – test dataset object. Default None.

execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) Tuple[Dataset, Dataset, Dataset, Any][source]

Prepares distributed environment and data structures for the actual training.

Parameters:
  • train_dataset (Dataset) – training dataset.

  • validation_dataset (Dataset | None, optional) – validation dataset. Defaults to None.

  • test_dataset (Dataset | None, optional) – test dataset. Defaults to None.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[Dataset, Dataset, Dataset, Any]

set_epoch() None[source]

Set current epoch at the beginning of training.

log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) None[source]

Log item with identifier name of kind type at step time step.

Parameters:
  • item (Any | List[Any]) – element to be logged (e.g., metric).

  • identifier (str | List[str]) – unique identifier for the element to log (e.g., name of a metric).

  • kind (str, optional) – type of the item to be logged. Must be one among the list of self.supported_types. Defaults to 'metric'.

  • step (int | None, optional) – logging step. Defaults to None.

  • batch_idx (int | None, optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.

ray_report(metrics: Dict[str, float], checkpoint_file: Path | str | None = None, checkpoint_dir: Path | str | None = None, checkpoint_data: Any | None = None) None[source]

Report a dictionary of metrics and optionally a checkpoint to Ray, only when using Ray distributed strategies. The checkpoint could be in the form of a Python object (passed as checkpoint_data), the path to a single file (passed as checkpoint_file), or the path to an existing checkpoint directory (passed as checkpoint_dir).

Parameters:
  • metrics (Dict[str, float]) – metrics to be reported.

  • checkpoint_file (str | Path | None, optional) – path to the checkpoint file. Defaults to None.

  • checkpoint_dir (str | Path | None, optional) – path to the checkpoint directory. Defaults to None.

  • checkpoint_data (Any | None, optional) – object to serialize as a checkpoint. Defaults to None.

compute_metrics(true: Tensor, pred: Tensor, logger_step: int, batch_idx: int | None, stage: str = 'train') Dict[str, Any][source]

Compute and log metrics.

Parameters:
  • true (torch.Tensor) – true values.

  • pred (torch.Tensor) – predicted values.

  • logger_step (int) – global step to pass to the logger.

  • stage (str) – 'train', 'validation', …

Returns:

metric values.

Return type:

Dict[str, Any]
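The idea of evaluating a dictionary of named metric callables can be sketched in plain Python. This is an illustration under stated assumptions, not the trainer's implementation: it uses lists instead of tensors, skips logging, and the stage-prefixed key naming and the (pred, true) argument order are assumptions.

```python
from typing import Callable, Dict, List


def accuracy(pred: List[int], true: List[int]) -> float:
    """Fraction of predictions that match the true labels."""
    return sum(p == t for p, t in zip(pred, true)) / len(true)


def mean_absolute_error(pred: List[int], true: List[int]) -> float:
    """Average absolute difference between predictions and true values."""
    return sum(abs(p - t) for p, t in zip(pred, true)) / len(true)


# User-defined names mapped to metric callables, as in the metrics argument.
metrics: Dict[str, Callable[[List[int], List[int]], float]] = {
    "accuracy": accuracy,
    "mae": mean_absolute_error,
}


def compute_metrics_sketch(
    true: List[int], pred: List[int], stage: str = "train"
) -> Dict[str, float]:
    # Assumed naming scheme: prefix each metric name with the stage.
    return {f"{stage}_{name}": fn(pred, true) for name, fn in metrics.items()}


values = compute_metrics_sketch(true=[1, 0, 1, 1], pred=[1, 0, 0, 1], stage="validation")
```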

train() None[source]

Trains a machine learning model. Main training loop/logic. Takes no arguments and returns None.

train_epoch() Tensor[source]

Perform a complete sweep over the training dataset, completing an epoch of training.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average training loss for the current epoch.

Return type:

torch.Tensor

train_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single optimization step using a batch sampled from the training dataset.

Parameters:
  • batch (torch.Tensor) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values (same structure as self.metrics).

Return type:

Tuple[torch.Tensor, Dict[str, Any]]

validation_epoch() Tensor | None[source]

Perform a complete sweep over the validation dataset, completing an epoch of validation.

Returns:

average validation loss for the current epoch if self.validation_dataloader is not None

Return type:

torch.Tensor | None

validation_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single validation step using a batch sampled from the validation dataset.

Parameters:
  • batch (torch.Tensor) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values (same structure as self.metrics).

Return type:

Tuple[torch.Tensor, Dict[str, Any]]

test_epoch() Tensor[source]

Perform a complete sweep over the test dataset, completing an epoch of testing.

Returns:

average test loss for the current epoch.

Return type:

torch.Tensor

test_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single prediction step using a batch sampled from the test dataset.

Parameters:
  • batch (torch.Tensor) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values (same structure as self.metrics).

Return type:

Tuple[torch.Tensor, Dict[str, Any]]

class itwinai.torch.trainer.TorchLightningTrainer(config: Dict | str, mlflow_saved_model: str = 'my_model')[source]

Bases: Trainer

Generic trainer for torch Lightning workflows.

Parameters:
  • config (Dict | str) – Lightning configuration which can be the path to a file or a Python dictionary.

  • mlflow_saved_model (str, optional) – name of the model created in MLflow. Defaults to 'my_model'.

execute() Any[source]

Trains a machine learning model.

Parameters:
  • train_dataset (MLDataset) – training dataset.

  • validation_dataset (MLDataset) – validation dataset.

  • test_dataset (MLDataset) – test dataset.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[MLDataset, MLDataset, MLDataset]

tuning

Logic to parse configuration and transform it into Ray objects.

itwinai.torch.tuning.search_space(config: Dict | None) Dict[source]

type

Custom types definition.

itwinai.torch.type.Batch

Torch data batch sampled by a DataLoader.

itwinai.torch.type.Metric

Torch metric function provided by torchmetrics library.

alias of Callable

exception itwinai.torch.type.UninitializedStrategyError[source]

Bases: Exception

Error raised when a strategy has not been initialized.

exception itwinai.torch.type.DistributedStrategyError[source]

Bases: Exception

Error raised when a strategy has already been initialized.
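How these two exceptions might guard a strategy's lifecycle can be sketched as follows. This is a hypothetical illustration: the exceptions are redefined locally to keep the block self-contained, and ToyStrategy with its init() and global_rank() methods is an assumption, not the itwinai strategy API.

```python
class UninitializedStrategyError(Exception):
    """Local stand-in: the strategy has not been initialized."""


class DistributedStrategyError(Exception):
    """Local stand-in: the strategy has already been initialized."""


class ToyStrategy:
    """Hypothetical strategy that enforces a single initialization."""

    def __init__(self) -> None:
        self.is_initialized = False

    def init(self) -> None:
        if self.is_initialized:
            raise DistributedStrategyError("Strategy was already initialized")
        self.is_initialized = True

    def global_rank(self) -> int:
        if not self.is_initialized:
            raise UninitializedStrategyError("Call init() before global_rank()")
        # A single-process toy always reports rank 0.
        return 0


strategy = ToyStrategy()
try:
    strategy.global_rank()
    used_before_init = False
except UninitializedStrategyError:
    used_before_init = True

strategy.init()
rank = strategy.global_rank()
```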