itwinai.torch

config.py

Default configuration

class itwinai.torch.config.Configuration(**extra_data: Any)[source]

Bases: BaseModel

Base configuration class.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class itwinai.torch.config.TrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2'] = 'cross_entropy', optimizer: Literal['adadelta', 'adam', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_weight_decay: float = 0.0, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, dist_backend: Literal['nccl', 'gloo', 'mpi'] = 'nccl', **extra_data: Any)[source]

Bases: Configuration

Default configuration object for training. Override and/or create new configurations using the constructor.

Example:

>>> cfg = TrainingConfiguration(batch_size=17, param_a=42)
>>> print(cfg.batch_size)  # returns 17 (overrides default)
>>> print(cfg.param_a)     # returns 42 (new value)
>>> print(cfg.pin_gpu_memory)  # returns the default value
>>>
>>> from rich import print
>>> print(cfg)             # pretty-print of configuration

Warning

Where possible, do not reinvent parameters that already exist in the training configuration; reuse their names to avoid inconsistencies. For instance, the training configuration defines the learning rate as optim_lr. If you redefine it as lr with TrainingConfiguration(lr=0.005), the configuration will contain both optim_lr (created by default) and lr (created by you). This may cause confusion and can silently break the logic in your code.
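
The pitfall described in this warning can be reproduced without itwinai. The sketch below is a stdlib-only stand-in: the class SketchConfig and its two defaults are hypothetical and only mimic a configuration that accepts arbitrary extra keys; it is not the library's implementation.

```python
from typing import Any


class SketchConfig:
    """Hypothetical stand-in for a configuration that allows extra fields."""

    # Assumed defaults, copied from the documented constructor signature.
    _defaults = {"batch_size": 32, "optim_lr": 0.001}

    def __init__(self, **extra_data: Any) -> None:
        # Known fields keep their defaults unless overridden by name;
        # unknown names are simply stored as new attributes.
        for key, value in {**self._defaults, **extra_data}.items():
            setattr(self, key, value)


cfg = SketchConfig(lr=0.005)  # intended to change the learning rate
print(cfg.optim_lr)  # still the default: the documented field did not change
print(cfg.lr)        # a new, unrelated attribute

fixed = SketchConfig(optim_lr=0.005)  # reuse the existing name instead
print(fixed.optim_lr)
```

Code that reads optim_lr would keep using the default in the first case, which is the silent breakage the warning refers to.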

batch_size: int

Batch size. In a distributed environment it is usually the per-worker batch size. Defaults to 32.
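
When batch_size is the per-worker value, the number of samples contributing to one optimization step grows with the number of workers. A minimal sketch of that arithmetic, assuming plain data parallelism with no gradient accumulation (the helper name is hypothetical):

```python
def global_batch_size(per_worker_batch_size: int, world_size: int) -> int:
    """Samples processed per optimization step across all workers."""
    return per_worker_batch_size * world_size


# Default batch_size of 32 on 8 workers (e.g., 2 nodes with 4 GPUs each).
print(global_batch_size(32, 8))
```

This may matter when tuning optim_lr, since the effective batch size changes with the size of the allocation.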

shuffle_train: bool

Whether to shuffle train dataset when creating a torch DataLoader. Defaults to False.

shuffle_validation: bool

Whether to shuffle validation dataset when creating a torch DataLoader. Defaults to False.

shuffle_test: bool

Whether to shuffle test dataset when creating a torch DataLoader. Defaults to False.

pin_gpu_memory: bool

Whether to pin GPU memory. Property of torch DataLoader. Defaults to False.

num_workers_dataloader: int

Number of parallel workers used by torch DataLoader. Defaults to 4.

loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2']

Loss function. Defaults to 'cross_entropy'.

optimizer: Literal['adadelta', 'adam', 'rmsprop', 'sgd']

Name of the optimizer to use. Defaults to 'adam'.

optim_lr: float

Learning rate used by the optimizer. Defaults to 1e-3.

optim_momentum: float

Momentum used by some optimizers (e.g., SGD). Defaults to 0.9.

optim_weight_decay: float

Weight decay parameter for the optimizer. Defaults to 0.

fp16_allreduce: bool

Parameter of Horovod’s DistributedOptimizer: uses float16 operations in the allreduce aggregation of distributed gradients. Better performance at lower precision. Defaults to False.

use_adasum: bool

Parameter of Horovod’s DistributedOptimizer: uses Adasum optimization. Defaults to False.

gradient_predivide_factor: float

Parameter of Horovod’s DistributedOptimizer: scales gradients before adding them up. Defaults to 1.0.
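
To illustrate what scaling gradients before the sum means, the sketch below assumes the split described in Horovod's documentation for averaging: each contribution is divided by the factor before the sum, and the result is multiplied by factor / size afterwards. The function name and the scalar "gradients" are hypothetical simplifications.

```python
import math
from typing import Sequence


def averaged_gradient(per_worker_grads: Sequence[float],
                      predivide_factor: float = 1.0) -> float:
    """Average scalar gradients, splitting the division around the sum."""
    size = len(per_worker_grads)
    # Part of the averaging is applied to each contribution before the sum...
    prescaled = [g / predivide_factor for g in per_worker_grads]
    summed = sum(prescaled)
    # ...and the remaining part is applied after the sum.
    return summed * predivide_factor / size


grads = [0.5, 1.5, 2.0, 4.0]
print(averaged_gradient(grads, predivide_factor=1.0))
print(averaged_gradient(grads, predivide_factor=4.0))
print(math.isclose(averaged_gradient(grads, 1.0), averaged_gradient(grads, 4.0)))
```

Under this assumption the average is unchanged; only the magnitude of the intermediate sum differs, which can help at reduced precision.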

dist_backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed backend. Defaults to nccl.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

distributed.py

itwinai.torch.distributed.distributed_resources_available() bool[source]

Check if the current execution environment has (enough) GPUs available to allow for distributed ML.

Returns:

True if the environment can support distributed ML.

Return type:

bool

itwinai.torch.distributed.check_initialized(method: Callable) Callable[source]

Decorator for strategy methods to check whether the strategy was correctly initialized before calling the method.
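
A decorator of this kind can be sketched as follows. This is an illustration of the pattern, not the itwinai implementation: the names check_initialized_sketch, SketchUninitializedError, and SketchStrategy are hypothetical, and the only assumption taken from this page is that strategies expose an is_initialized flag.

```python
import functools
from typing import Any, Callable


class SketchUninitializedError(RuntimeError):
    """Hypothetical error type used only in this sketch."""


def check_initialized_sketch(method: Callable) -> Callable:
    """Refuse to run a strategy method before the strategy is initialized."""

    @functools.wraps(method)
    def wrapper(self, *args: Any, **kwargs: Any) -> Any:
        if not self.is_initialized:
            raise SketchUninitializedError(
                f"{type(self).__name__} must be initialized before calling "
                f"{method.__name__}()."
            )
        return method(self, *args, **kwargs)

    return wrapper


class SketchStrategy:
    is_initialized: bool = False

    def init(self) -> None:
        self.is_initialized = True

    @check_initialized_sketch
    def global_rank(self) -> int:
        return 0


strategy = SketchStrategy()
try:
    strategy.global_rank()
except SketchUninitializedError as err:
    print(f"refused: {err}")

strategy.init()
print(strategy.global_rank())
```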

itwinai.torch.distributed.initialize_ray() None[source]

This method is used by the RayDDPStrategy and RayDeepSpeedStrategy to initialize the Ray backend if it is not already initialized.

Raises:

EnvironmentError – If the required environment variables HEAD_NODE_PORT or HEAD_NODE_IP are not set. These should be set in the SLURM script where the Ray cluster is launched.
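
The environment check can be sketched as below. The helper ray_head_address is hypothetical, and combining the two variables into an "ip:port" string is an assumption made for illustration; only the variable names and the EnvironmentError come from this page.

```python
from typing import Mapping


def ray_head_address(env: Mapping[str, str]) -> str:
    """Build a head-node address, failing early if a variable is missing."""
    required = ("HEAD_NODE_IP", "HEAD_NODE_PORT")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise EnvironmentError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return f"{env['HEAD_NODE_IP']}:{env['HEAD_NODE_PORT']}"


# In a real job the mapping would be os.environ, populated by the launch script.
print(ray_head_address({"HEAD_NODE_IP": "10.0.0.1", "HEAD_NODE_PORT": "6379"}))
```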

class itwinai.torch.distributed.TorchDistributedStrategy[source]

Bases: DistributedStrategy

Abstract class to define the distributed backend methods for PyTorch models.

is_distributed: bool = True

Distinguishes distributed strategies from non-distributed ones. Defaults to True.

is_initialized: bool = False

Set to True when the current strategy is initialized. Defaults to False.

name: str

property is_main_worker: bool

Checks if local worker has global rank equal to zero.

Returns:

True if main worker.

Return type:

bool

abstract init() None[source]

Initializes the chosen distributed backend

abstract distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) Tuple[Module, Optimizer, _LRScheduler | None][source]

Set up the model, optimizer, and scheduler for distributed training.

abstract global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

abstract local_world_size() int[source]

Returns the number of local workers available on a node (local world size). Usually it is equal to the number of available GPUs.

Returns:

local world size.

Return type:

int

abstract global_rank() int[source]

Returns the global rank of the current process. Rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

abstract local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int
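
The relation between the four quantities above can be sketched with simple arithmetic. The sketch assumes homogeneous nodes (the same local world size everywhere) and ranks assigned contiguously per node; both are assumptions about the launcher, and the helper name is hypothetical.

```python
def global_rank_from(node_index: int, local_rank: int,
                     local_world_size: int) -> int:
    """Global rank of a worker, given its node and its rank within the node."""
    return node_index * local_world_size + local_rank


num_nodes = 2
local_world_size = 4  # e.g., 4 GPUs per node
global_world_size = num_nodes * local_world_size

ranks = [
    global_rank_from(node, local, local_world_size)
    for node in range(num_nodes)
    for local in range(local_world_size)
]
print(ranks)
print(max(ranks) == global_world_size - 1)  # the largest rank is world_size - 1
```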

device() str[source]

Device used by local worker.

Returns:

torch device in the form 'device:N' (e.g., 'cuda:0', 'cpu').

Return type:

str

set_device()[source]

Set local device.

create_dataloader(dataset: Dataset[T_co], batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')[source]

Create a distributed DataLoader by using DistributedSampler as random sampler.

Parameters:
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning section of the PyTorch DataLoader documentation.

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default value depends on the set value for num_workers. If value of num_workers=0 default is None. Otherwise, if value of num_workers > 0 default is 2).

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)

  • pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.

Raises:
  • UninitializedStrategyError – when this method is called for a strategy which had not been initialized.

  • RuntimeError – when a user-provided sampler, if given, is not of type DistributedSampler.

Warning

If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices for more details on multiprocessing in PyTorch.

Warning

len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch can not detect such cases in general.

See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.
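
To make the role of the distributed sampler concrete, the sketch below models how dataset indices can be split among workers. It is a simplified, stdlib-only model of the padding-and-striding scheme used by torch.utils.data.DistributedSampler, assuming no shuffling and drop_last=False; the function name is hypothetical.

```python
import math
from itertools import cycle, islice
from typing import List


def shard_indices(dataset_len: int, world_size: int, rank: int) -> List[int]:
    """Indices of the samples that the worker with the given rank would see."""
    per_worker = math.ceil(dataset_len / world_size)
    total = per_worker * world_size
    # Pad by repeating indices from the start, so every worker gets
    # the same number of samples.
    padded = list(islice(cycle(range(dataset_len)), total))
    # Each worker takes every world_size-th index, starting at its rank.
    return padded[rank:total:world_size]


for rank in range(4):
    print(rank, shard_indices(10, 4, rank))
```

With 10 samples and 4 workers, each worker receives 3 indices and two samples are repeated as padding.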

abstract clean_up() None[source]

Cleans up resources allocated by distributed strategy.

abstract allgather_obj(obj: Any) List[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – object to gather from all workers.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

abstract gather_obj(obj: Any, dst_rank: int = 0) List[Any][source]

Gathers any object from the whole group in a list on the destination worker.

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

abstract gather(tensor: Tensor, dst_rank: int = 0) List | None[source]

Gathers a tensor from the whole group in a list on the destination worker.

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers on the destination worker (by default the main worker), otherwise None.

Return type:

Optional[List[Any]]
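
The difference between the all-gather and gather collectives can be simulated in a single process. The sketch below is not a distributed implementation: it takes the list of objects held by each rank and returns what each rank would receive, following the return values documented for the concrete strategies on this page (None on non-destination ranks). The function names are hypothetical.

```python
from typing import Any, List, Optional


def simulate_allgather(per_rank_objs: List[Any]) -> List[List[Any]]:
    """Result seen by each rank: every rank receives the full list."""
    return [list(per_rank_objs) for _ in per_rank_objs]


def simulate_gather(per_rank_objs: List[Any],
                    dst_rank: int = 0) -> List[Optional[List[Any]]]:
    """Result seen by each rank: only dst_rank receives the list."""
    return [
        list(per_rank_objs) if rank == dst_rank else None
        for rank in range(len(per_rank_objs))
    ]


objs = ["a", "b", "c"]  # the object held by rank 0, 1, and 2
print(simulate_allgather(objs))
print(simulate_gather(objs, dst_rank=1))
```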

class itwinai.torch.distributed.TorchDDPStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]

Bases: TorchDistributedStrategy

PyTorch DistributedDataParallel distributed strategy class.

Parameters:

backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.

backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed communication backend.

init() None[source]

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy which is already initialized.

distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None, find_unused_parameters: bool = False, **kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Set up the model, optimizer, and scheduler for distributed training.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

Raises:

RuntimeError – when the local world size cannot be retrieved.

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Destroys the current process group.

allgather_obj(obj: Any) List[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) List[Any] | None[source]

Gathers any object from the whole group in a list on the destination worker.

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0) List | None[source]

Gathers a tensor from the whole group in a list on the destination worker.

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers on the destination worker (by default the main worker), otherwise None.

Return type:

Optional[List[Any]]

class itwinai.torch.distributed.DeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]

Bases: TorchDistributedStrategy

DeepSpeed distributed strategy class.

Parameters:
  • backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.

  • config (Union[dict, Path, str]) – DeepSpeed config. Either a dictionary or a path to a JSON file.

backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed communication backend.

init() None[source]

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, model_parameters: Any | None = None, **init_kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Set up the model, optimizer, and scheduler for distributed training.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

Raises:

RuntimeError – when the local world size cannot be retrieved.

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Destroys the current process group.

allgather_obj(obj: Any) List[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) List[Any] | None[source]

Gathers any object from the whole group in a list on the destination worker.

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0) List[Tensor] | None[source]

Gathers a tensor from the whole group in a list on the destination worker.

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[torch.Tensor]]

class itwinai.torch.distributed.HorovodStrategy[source]

Bases: TorchDistributedStrategy

Horovod distributed strategy class.

init() None[source]

Initializes the Horovod distributed backend.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **optim_kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Set up the model, optimizer, and scheduler for distributed training.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Shuts Horovod down.

allgather_obj(obj: Any) list[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) list[Any] | None[source]

Gathers any object from the whole group in a list on the destination worker. Under the hood it relies on allgather, as gather is not supported by Horovod.

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of objects is gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0) List[Tensor] | None[source]

Gathers a tensor from the whole group in a list on the destination worker. Under the hood it relies on allgather, as gather is not supported by Horovod.

Parameters:
  • tensor (Tensor) – tensor to gather from all workers.

  • dst_rank (int) – rank of the worker on which the list of tensors is gathered.

Returns:

list of tensors gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[torch.Tensor]]

class itwinai.torch.distributed.NonDistributedStrategy[source]

Bases: TorchDistributedStrategy

Dummy class for non-distributed environments.

is_distributed: bool = False

This strategy is not distributed. Defaults to False.

init() None[source]

If CUDA is available, sets the CUDA device and does nothing else.

Raises:

DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Do nothing and return model, optimizer and scheduler.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Do nothing.

allgather_obj(obj: Any) list[Any][source]

Wraps obj into a List object.

Parameters:

obj (Any) – object in a worker.

Returns:

input object wrapped in a list.

Return type:

list[Any]

gather_obj(obj: Any, dst_rank: int = 0) list[Any][source]

Wraps obj into a List object.

Parameters:
  • obj (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

input object wrapped in a list.

Return type:

list[Any]

gather(tensor: Tensor, dst_rank: int = 0)[source]

Wraps tensor into a List object.

Parameters:
  • tensor (Tensor) – tensor in a worker.

  • dst_rank (int) – ignored.

Returns:

input tensor wrapped in a list.

Return type:

list[Any]

class itwinai.torch.distributed.RayDDPStrategy[source]

Bases: TorchDDPStrategy

A distributed data-parallel (DDP) strategy using Ray Train for PyTorch training.

init() None[source]

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy which is already initialized.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

Raises:

RuntimeError – when the local world size cannot be retrieved.

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) Tuple[Module, Optimizer, _LRScheduler | None][source]

Set up the model, optimizer, and scheduler for distributed training.

create_dataloader(dataset: Dataset[T_co], batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')[source]

Create a distributed DataLoader by using DistributedSampler as random sampler.

Parameters:
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning section of the PyTorch DataLoader documentation.

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default value depends on the set value for num_workers. If value of num_workers=0 default is None. Otherwise, if value of num_workers > 0 default is 2).

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)

  • pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.

Raises:
  • UninitializedStrategyError – when this method is called for a strategy which had not been initialized.

  • RuntimeError – when a user-provided sampler, if given, is not of type DistributedSampler.

Warning

If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices for more details on multiprocessing in PyTorch.

Warning

len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch can not detect such cases in general.

See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.

class itwinai.torch.distributed.RayDeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]

Bases: DeepSpeedStrategy

A distributed strategy using Ray and DeepSpeed for PyTorch training.

Parameters:

backend (Literal["nccl", "gloo", "mpi"]) – The backend for distributed communication.

inference.py

class itwinai.torch.inference.TorchModelLoader(model_uri: str)[source]

Bases: ModelLoader

Loads a torch model from a local path or from MLflow.

Parameters:

model_uri (str) – Can be a path on the local filesystem or an MLflow 'locator' in the form 'mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH'.
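
One way to tell the two forms of model_uri apart is sketched below. The parsing logic is an assumption made for illustration (itwinai may parse the locator differently), the names MlflowLocator and parse_model_uri are hypothetical, and the sketch assumes that the tracking URI and run ID contain no '+' character.

```python
from typing import NamedTuple, Union


class MlflowLocator(NamedTuple):
    tracking_uri: str
    run_id: str
    artifact_path: str


def parse_model_uri(model_uri: str) -> Union[str, MlflowLocator]:
    """Return a local path unchanged, or split an MLflow locator into parts."""
    if not model_uri.startswith("mlflow+"):
        return model_uri  # treated as a path on the local filesystem
    parts = model_uri.split("+", 3)
    if len(parts) != 4 or not all(parts[1:]):
        raise ValueError(
            "Expected 'mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH', "
            f"got {model_uri!r}"
        )
    _, tracking_uri, run_id, artifact_path = parts
    return MlflowLocator(tracking_uri, run_id, artifact_path)


print(parse_model_uri("checkpoints/best_model.pth"))
print(parse_model_uri("mlflow+http://localhost:5000+abc123+model/best.pth"))
```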

class itwinai.torch.inference.TorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]

Bases: Predictor

Applies a pre-trained torch model to unseen data.

test_dataset: Dataset

Dataset on which to make predictions (ML inference).

test_dataloader: DataLoader = None

DataLoader for test dataset.

model: Module = None

Pre-trained PyTorch model used to make predictions.

execute(test_dataset: Dataset, model: Module = None) Dict[str, Any][source]

Applies a torch model to a dataset for inference.

Parameters:
  • test_dataset (Dataset[str, Any]) – each item in this dataset is a pair (item_unique_id, item).

  • model (nn.Module, optional) – torch model. Overrides the existing model, if given. Defaults to None.

Returns:

maps each item ID to the corresponding predicted value(s).

Return type:

Dict[str, Any]
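
The shape of the input and output described above can be sketched without torch. Here a plain function stands in for the trained model, and the helper predict_by_id is hypothetical; the real method additionally batches the data through a DataLoader and post-processes outputs with transform_predictions.

```python
from typing import Any, Callable, Dict, Iterable, Tuple


def predict_by_id(items: Iterable[Tuple[str, Any]],
                  model: Callable[[Any], Any]) -> Dict[str, Any]:
    """Apply the model to each (item_unique_id, item) pair, keyed by ID."""
    return {item_id: model(item) for item_id, item in items}


def double(x: float) -> float:
    """Stand-in for a pre-trained model."""
    return 2 * x


dataset = [("sample-a", 1.0), ("sample-b", 2.5)]
print(predict_by_id(dataset, double))
```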

abstract transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.MulticlassTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for multiclass classification.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.MultilabelTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, threshold: float = 0.5, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for multilabel classification, applying a threshold on the output of the neural network.

threshold: float = 0.5

Threshold to transform probabilities into class predictions. Defaults to 0.5.
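
Thresholding can be sketched on plain Python lists. The strict comparison (probability > threshold) is an assumption; the library may treat values equal to the threshold differently, and the function name is hypothetical.

```python
from typing import List


def multilabel_predictions(probabilities: List[List[float]],
                           threshold: float = 0.5) -> List[List[int]]:
    """Turn per-label probabilities into 0/1 predictions for each sample."""
    return [[int(p > threshold) for p in row] for row in probabilities]


probs = [
    [0.9, 0.2, 0.5],   # sample 1: only the first label exceeds 0.5
    [0.1, 0.6, 0.51],  # sample 2: the second and third labels exceed 0.5
]
print(multilabel_predictions(probs))
print(multilabel_predictions(probs, threshold=0.8))
```

Unlike multiclass classification, each sample may end up with zero, one, or several positive labels.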

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.RegressionTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for regression, leaving untouched the output of the neural network.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

loggers.py

class itwinai.torch.loggers.ItwinaiLogger(itwinai_logger: Logger, log_model: Literal['all'] | bool = False, skip_finalize: bool = False)[source]

Bases: Logger

Adapter between PyTorch Lightning logger and itwinai logger.

This adapter forwards logging calls from PyTorch Lightning to the itwinai Logger instance, using the itwinai Logger’s log method. It supports the Lightning logging of metrics, hyperparameters, and checkpoints. Additionally, any function call can be forwarded to the itwinai logger instance through the experiment property of this adapter.

property name: str | None

Return the experiment name.

property version: int | str | None

Return the experiment version.

property save_dir: str | None

Return the directory where the logs are stored.

property experiment: Logger

Lightning Logger function. Initializes and returns the itwinai Logger context for experiment tracking.

Returns:

The itwinai logger instance.

Return type:

Logger

finalize(status: str) None[source]

Lightning Logger function. Logs any remaining checkpoints and closes the logger context.

Parameters:
  • status (str) – Describes the status of the training (e.g., ‘completed’, ‘failed’). The status is not needed by this function, but it is part of the finalize signature of the parent class (LightningLogger) and therefore must be propagated here.

log_metrics(metrics: Dict[str, float], step: int | None = None) None[source]

Lightning Logger function. Logs the given metrics and is usually called by the Lightning Trainer.

Parameters:
  • metrics (Dict[str, float]) – Dictionary of metrics to log.

  • step (Optional[int], optional) – Training step associated with the metrics. Defaults to None.

log_hyperparams(params: Dict[str, Any] | Namespace) None[source]

Lightning Logger function. Logs hyperparameters for the experiment.

Parameters:

params (Union[Dict[str, Any], Namespace]) – Hyperparameters dictionary or object.

after_save_checkpoint(checkpoint_callback: ModelCheckpoint) None[source]

Lightning Logger function. Handles checkpoint saving to the logger after the ModelCheckpoint Callback of the Lightning Trainer is called. The checkpoints are logged as artifacts.

Parameters:

checkpoint_callback (ModelCheckpoint) – Callback instance to manage checkpointing.
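
The forwarding behaviour of such an adapter can be sketched with two small stand-in classes. Both `RecordingLogger` and `LoggerAdapter` are hypothetical names that depend on neither Lightning nor itwinai; the sketch only shows the pattern of routing `log_metrics` calls to a wrapped logger's `log` method and exposing the wrapped object through an `experiment` property.

```python
from typing import Any, Dict, List, Optional, Tuple


class RecordingLogger:
    """Hypothetical stand-in for a logger exposing a single log method."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, Any, str, Optional[int]]] = []

    def log(self, item: Any, identifier: str, kind: str = "metric",
            step: Optional[int] = None) -> None:
        self.records.append((identifier, item, kind, step))


class LoggerAdapter:
    """Hypothetical adapter exposing a Lightning-like interface."""

    def __init__(self, wrapped: RecordingLogger) -> None:
        self._wrapped = wrapped

    @property
    def experiment(self) -> RecordingLogger:
        # Direct access to the wrapped logger for any other call.
        return self._wrapped

    def log_metrics(self, metrics: Dict[str, float],
                    step: Optional[int] = None) -> None:
        # Forward each metric to the wrapped logger's log method.
        for name, value in metrics.items():
            self._wrapped.log(item=value, identifier=name, kind="metric", step=step)


adapter = LoggerAdapter(RecordingLogger())
adapter.log_metrics({"loss": 0.25, "accuracy": 0.9}, step=3)
```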

mlflow.py

itwinai.torch.mlflow.init_lightning_mlflow(pl_config: Dict, default_experiment_name: str = 'Default', tmp_dir: str = '.tmp', **autolog_kwargs) None[source]

Initialize mlflow for pytorch lightning, also setting up auto-logging (mlflow.pytorch.autolog(…)). Creates a new mlflow run and attaches it to the mlflow auto-logger.

Parameters:
  • pl_config (Dict) – pytorch lightning configuration loaded in memory.

  • default_experiment_name (str, optional) – used as experiment name if it is not given in the lightning conf. Defaults to ‘Default’.

  • tmp_dir (str) – where to temporarily store some artifacts.

  • autolog_kwargs (kwargs) – args for mlflow.pytorch.autolog(…).

itwinai.torch.mlflow.teardown_lightning_mlflow() None[source]

End active mlflow run, if any.

reproducibility.py

This module provides the tools to support reproducible execution of torch scripts.

itwinai.torch.reproducibility.seed_worker(worker_id)[source]

Seed DataLoader worker.

itwinai.torch.reproducibility.set_seed(rnd_seed: int | None, deterministic_cudnn: bool = True) Generator[source]

Set torch random seed and return a PRNG object.

Parameters:
  • rnd_seed (Optional[int]) – random seed. If None, the seed is not set.

  • deterministic_cudnn (bool) – if True, sets torch.backends.cudnn.benchmark = False, which may affect performance.

Returns:

PRNG object.

Return type:

torch.Generator
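
The idea behind these two helpers can be sketched with the standard library's `random` module standing in for torch. The names `make_rng` and `worker_seed`, and the way a per-worker seed is derived, are assumptions made for illustration and do not reflect the actual implementation.

```python
import random
from typing import Optional


def make_rng(rnd_seed: Optional[int]) -> random.Random:
    """Return a dedicated PRNG, seeded only when a seed is given (hypothetical)."""
    rng = random.Random()
    if rnd_seed is not None:
        rng.seed(rnd_seed)
    return rng


def worker_seed(base_seed: int, worker_id: int) -> int:
    # Assumption: derive a distinct, reproducible 32-bit seed for each worker.
    return (base_seed + worker_id) % 2**32


# Two generators created from the same seed produce the same sequence.
rng_a = make_rng(42)
rng_b = make_rng(42)
draws_a = [rng_a.random() for _ in range(3)]
draws_b = [rng_b.random() for _ in range(3)]
```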

type.py

Custom types definition.

itwinai.torch.type.LrScheduler

Torch learning rate scheduler

itwinai.torch.type.Batch

Torch data batch sampled by a DataLoader.

itwinai.torch.type.Metric

Torch metric function provided by torchmetrics library.

alias of Callable

exception itwinai.torch.type.UninitializedStrategyError[source]

Bases: Exception

Error raised when a strategy has not been initialized.

exception itwinai.torch.type.DistributedStrategyError[source]

Bases: Exception

Error raised when a strategy has already been initialized.

trainer.py

Provides training logic for PyTorch models via Trainer classes.

class itwinai.torch.trainer.TorchTrainer(config: Dict | TrainingConfiguration, epochs: int, model: Module | str | None = None, strategy: Literal['ddp', 'deepspeed', 'horovod'] | None = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, disable_tqdm: bool = False, name: str | None = None, profiling_wait_epochs: int = 1, profiling_warmup_epochs: int = 2)[source]

Bases: Trainer, LogMixin

Trainer class for torch training algorithms.

Parameters:
  • config (Union[Dict, TrainingConfiguration]) – training configuration containing hyperparameters.

  • epochs (int) – number of training epochs.

  • model (Optional[Union[nn.Module, str]], optional) – pytorch model to train or a string identifier. Defaults to None.

  • strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) – distributed strategy. Defaults to ‘ddp’.

  • validation_every (Optional[int], optional) – run a validation epoch every validation_every epochs. Disabled if None. Defaults to 1.

  • test_every (Optional[int], optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.

  • random_seed (Optional[int], optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.

  • logger (Optional[Logger], optional) – logger for ML tracking. Defaults to None.

  • metrics (Optional[Dict[str, Metric]], optional) – map of torchmetrics metrics. Defaults to None.

  • checkpoints_location (str) – path to checkpoints directory. Defaults to “checkpoints”.

  • checkpoint_every (Optional[int]) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.

  • disable_tqdm (bool) – whether to disable tqdm progress bar(s).

  • name (Optional[str], optional) – trainer custom name. Defaults to None.

  • profiling_wait_epochs (int) – how many epochs to wait before starting the profiler.

  • profiling_warmup_epochs (int) – length of the profiler warmup phase in terms of number of epochs.
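
To make the periodic parameters (validation_every, test_every, checkpoint_every) concrete, here is a minimal sketch of how such a schedule could be evaluated. The helper `epochs_to_run` is hypothetical, and the convention that the action fires once `every` epochs have completed is an assumption; the trainer may use a different convention.

```python
from typing import List, Optional


def epochs_to_run(epochs: int, every: Optional[int]) -> List[int]:
    """Return the zero-based epochs at which a periodic action fires."""
    if every is None:
        return []  # Disabled, as documented for a None value.
    # Assumption: the action fires after every `every` completed epochs.
    return [epoch for epoch in range(epochs) if (epoch + 1) % every == 0]


validation_epochs = epochs_to_run(epochs=6, every=1)
checkpoint_epochs = epochs_to_run(epochs=6, every=3)
test_epochs = epochs_to_run(epochs=6, every=None)
```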

train_dataloader: DataLoader = None

PyTorch DataLoader for training dataset.

validation_dataloader: DataLoader = None

PyTorch DataLoader for validation dataset.

test_dataloader: DataLoader = None

PyTorch DataLoader for test dataset.

loss: Callable = None

Loss criterion.

optimizer: Optimizer = None

Optimizer.

lr_scheduler: Module = None

Learning rate scheduler.

torch_rng: Generator = None

PyTorch random number generator (PRNG).

train_glob_step: int = 0

Total number of training batches used so far, across all epochs.

validation_glob_step: int = 0

Total number of validation batches used so far, across all epochs.

test_glob_step: int = 0

Total number of test batches used so far, across all epochs.

model: Module = None

PyTorch model to train.

logger: Logger = None

itwinai Logger for ML tracking.

metrics: Dict[str, Callable]

Dictionary of torchmetrics metrics, indexed by user-defined names.

profiler: Any | None

PyTorch Profiler for communication vs. computation comparison

property strategy: TorchDistributedStrategy

Strategy currently in use.

property device: str

Current device from distributed strategy.

get_default_distributed_kwargs() Dict[source]

Gives the default kwargs for the trainer’s strategy’s distributed() method.

create_model_loss_optimizer() None[source]

Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.

create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) None[source]

Create train, validation and test dataloaders using the configuration provided in the Trainer constructor. Generally a user-defined method.

Parameters:
  • train_dataset (Dataset) – training dataset object.

  • validation_dataset (Optional[Dataset]) – validation dataset object. Default None.

  • test_dataset (Optional[Dataset]) – test dataset object. Default None.

execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) Tuple[Dataset, Dataset, Dataset, Any][source]

Prepares distributed environment and data structures for the actual training.

Parameters:
  • train_dataset (Dataset) – training dataset.

  • validation_dataset (Optional[Dataset], optional) – validation dataset. Defaults to None.

  • test_dataset (Optional[Dataset], optional) – test dataset. Defaults to None.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[Dataset, Dataset, Dataset, Any]

set_epoch(epoch: int) None[source]

Set current epoch at the beginning of training.

Parameters:

epoch (int) – epoch number, from 0 to epochs-1.

log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) None[source]

Log item with identifier name of kind type at step time step.

Parameters:
  • item (Union[Any, List[Any]]) – element to be logged (e.g., metric).

  • identifier (Union[str, List[str]]) – unique identifier for the element to log (e.g., name of a metric).

  • kind (str, optional) – type of the item to be logged. Must be one among the list of self.supported_types. Defaults to ‘metric’.

  • step (Optional[int], optional) – logging step. Defaults to None.

  • batch_idx (Optional[int], optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.
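
Since both item and identifier accept either a single value or a list, a caller-side view of the two call shapes can be sketched as follows. The helper `pair_items` is hypothetical, and the element-by-element pairing of the two lists is an assumption about the intended semantics.

```python
from typing import Any, List, Tuple, Union


def pair_items(item: Union[Any, List[Any]],
               identifier: Union[str, List[str]]) -> List[Tuple[str, Any]]:
    """Normalize the two accepted call shapes into (identifier, item) pairs."""
    if isinstance(identifier, list):
        # Assumption: lists are matched element by element and must align.
        if not isinstance(item, list) or len(item) != len(identifier):
            raise ValueError("item and identifier lists must have the same length")
        return list(zip(identifier, item))
    return [(identifier, item)]


single = pair_items(0.31, "train_loss")
several = pair_items([0.31, 0.88], ["train_loss", "train_accuracy"])
```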

save_checkpoint(name: str, epoch: int, loss: Tensor | None = None) None[source]

Save training checkpoint.

Parameters:
  • name (str) – name of the checkpoint.

  • epoch (int) – current training epoch.

  • loss (Optional[torch.Tensor]) – current loss (if available).

load_checkpoint(name: str) None[source]

Load state from a checkpoint.

Parameters:

name (str) – name of the checkpoint to load, assuming it is under self.checkpoints_location location.
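
The save/load round trip by checkpoint name can be sketched with JSON files in a temporary directory. The file layout, the `.json` extension, and the stored fields are assumptions made for illustration; a real checkpoint would also hold model and optimizer state, and the actual on-disk format is not described here.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional


def save_checkpoint(location: Path, name: str, epoch: int,
                    loss: Optional[float] = None) -> Path:
    """Write a small state dictionary under the checkpoints directory."""
    location.mkdir(parents=True, exist_ok=True)
    path = location / f"{name}.json"
    state: Dict[str, Any] = {"epoch": epoch, "loss": loss}
    path.write_text(json.dumps(state))
    return path


def load_checkpoint(location: Path, name: str) -> Dict[str, Any]:
    """Read back the state saved under the given checkpoint name."""
    return json.loads((location / f"{name}.json").read_text())


with tempfile.TemporaryDirectory() as tmp:
    checkpoints_location = Path(tmp) / "checkpoints"
    save_checkpoint(checkpoints_location, "epoch_3", epoch=3, loss=0.12)
    restored = load_checkpoint(checkpoints_location, "epoch_3")
```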

compute_metrics(true: Tensor, pred: Tensor, logger_step: int, batch_idx: int | None, stage: str = 'train') Dict[str, Any][source]

Compute and log metrics.

Parameters:
  • true (Batch) – true values.

  • pred (Batch) – predicted values.

  • logger_step (int) – global step to pass to the logger.

  • batch_idx (Optional[int]) – DataLoader batch counter (i.e., batch idx), if available.

  • stage (str) – ‘train’, ‘validation’… Defaults to ‘train’.

Returns:

metric values.

Return type:

Dict[str, Any]
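
As a rough sketch of the computation, a dictionary of metric callables can be applied to a pair of prediction and target sequences. Plain Python functions stand in for torchmetrics objects here, and prefixing each result with the stage name is an assumption about naming, not documented behaviour.

```python
from typing import Callable, Dict, List

MetricFn = Callable[[List[int], List[int]], float]


def accuracy(pred: List[int], true: List[int]) -> float:
    return sum(int(p == t) for p, t in zip(pred, true)) / len(true)


def mean_absolute_error(pred: List[int], true: List[int]) -> float:
    return sum(abs(p - t) for p, t in zip(pred, true)) / len(true)


def compute_metrics(metrics: Dict[str, MetricFn], true: List[int],
                    pred: List[int], stage: str = "train") -> Dict[str, float]:
    """Evaluate every metric and key the results by stage and metric name."""
    # Assumption: results are named "<stage>_<metric name>".
    return {f"{stage}_{name}": fn(pred, true) for name, fn in metrics.items()}


values = compute_metrics(
    {"accuracy": accuracy, "mae": mean_absolute_error},
    true=[1, 0, 1, 1],
    pred=[1, 1, 1, 0],
    stage="validation",
)
```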

train()[source]

Trains a machine learning model. Main training loop/logic.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[Dataset, Dataset, Dataset, Any]

train_epoch(epoch: int) Tensor[source]

Perform a complete sweep over the training dataset, completing an epoch of training.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average training loss for the current epoch.

Return type:

Loss

train_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single optimization step using a batch sampled from the training dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]
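
The relationship between a step, an epoch, and the global step counter can be sketched without torch. The functions below are hypothetical stand-ins: the "loss" is simply the mean squared value of each batch, and no optimization takes place. The sketch only shows how per-batch losses are averaged over an epoch while a global counter keeps growing across epochs.

```python
from typing import Dict, List, Tuple


def train_step(batch: List[float], batch_idx: int) -> Tuple[float, Dict[str, float]]:
    """Hypothetical step: the 'loss' is the mean squared value of the batch."""
    loss = sum(x * x for x in batch) / len(batch)
    return loss, {"max_abs": max(abs(x) for x in batch)}


def train_epoch(batches: List[List[float]], glob_step: int) -> Tuple[float, int]:
    """Sweep all batches once; return the average loss and updated global step."""
    losses = []
    for batch_idx, batch in enumerate(batches):
        loss, _ = train_step(batch, batch_idx)
        losses.append(loss)
        glob_step += 1  # Counts batches across all epochs.
    return sum(losses) / len(losses), glob_step


batches = [[1.0, 1.0], [2.0, 2.0]]
glob_step = 0
for epoch in range(2):
    avg_loss, glob_step = train_epoch(batches, glob_step)
```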

validation_epoch(epoch: int) Tensor[source]

Perform a complete sweep over the validation dataset, completing an epoch of validation.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average validation loss for the current epoch if self.validation_dataloader is not None.

Return type:

Optional[Loss]

validation_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single validation step using a batch sampled from the validation dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

test_epoch(epoch: int) Tensor[source]

Perform a complete sweep over the test dataset, completing an epoch of test.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average test loss for the current epoch.

Return type:

Loss

test_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single prediction step using a batch sampled from the test dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

class itwinai.torch.trainer.GANTrainer(config: Dict | TrainingConfiguration, epochs: int, discriminator: Module, generator: Module, strategy: Literal['ddp', 'deepspeed'] = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None, **kwargs)[source]

Bases: TorchTrainer

Trainer class for GAN models using pytorch.

Parameters:
  • config (Union[Dict, TrainingConfiguration]) – training configuration containing hyperparameters.

  • epochs (int) – number of training epochs.

  • discriminator (nn.Module) – pytorch discriminator model to train GAN.

  • generator (nn.Module) – pytorch generator model to train GAN.

  • strategy (Literal['ddp', 'deepspeed'], optional) – distributed strategy. Defaults to ‘ddp’.

  • validation_every (Optional[int], optional) – run a validation epoch every validation_every epochs. Disabled if None. Defaults to 1.

  • test_every (Optional[int], optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.

  • random_seed (Optional[int], optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.

  • logger (Optional[Logger], optional) – logger for ML tracking. Defaults to None.

  • metrics (Optional[Dict[str, Metric]], optional) – map of torchmetrics metrics. Defaults to None.

  • checkpoints_location (str) – path to checkpoints directory. Defaults to “checkpoints”.

  • checkpoint_every (Optional[int]) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.

  • name (Optional[str], optional) – trainer custom name. Defaults to None.

create_model_loss_optimizer() None[source]

Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.

train_epoch(epoch: int)[source]

Perform a complete sweep over the training dataset, completing an epoch of training.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average training loss for the current epoch.

Return type:

Loss

validation_epoch(epoch: int)[source]

Perform a complete sweep over the validation dataset, completing an epoch of validation.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average validation loss for the current epoch if self.validation_dataloader is not None.

Return type:

Optional[Loss]

train_step(real_images, batch_idx)[source]

Perform a single optimization step using a batch sampled from the training dataset.

Parameters:
  • real_images (Batch) – batch of real images sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

validation_step(real_images, batch_idx)[source]

Perform a single validation step using a batch sampled from the validation dataset.

Parameters:
  • real_images (Batch) – batch of real images sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

save_checkpoint(name, epoch, loss=None)[source]

Save training checkpoint with both optimizers.

load_checkpoint(checkpoint_path)[source]

Load models and optimizers from checkpoint.

save_fake_generator_images(epoch)[source]

Plot and save fake images produced by the generator.

Parameters:

epoch (int) – epoch number, from 0 to epochs-1.

class itwinai.torch.trainer.TorchLightningTrainer(config: Dict | str, mlflow_saved_model: str = 'my_model')[source]

Bases: Trainer

Generic trainer for torch Lightning workflows.

Parameters:
  • config (Union[Dict, str]) – Lightning configuration which can be the path to a file or a Python dictionary.

  • mlflow_saved_model (str, optional) – name of the model created in MLFlow. Defaults to ‘my_model’.

execute() Any[source]

Trains a machine learning model.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[MLDataset, MLDataset, MLDataset]

itwinai.torch.trainer.distributed(func)[source]

The decorated function must have a standard signature. Its first arguments must be: model, train_dataloader, validation_dataloader, device (in this order).

Additional args or kwargs are allowed consistently with the signature of the decorated function.
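
The signature requirement can be illustrated with a hypothetical decorator that only checks the leading parameter names. `check_standard_signature` is an invented name and not part of itwinai; the real `distributed` decorator presumably also sets up the distributed environment, which this sketch does not attempt.

```python
import functools
import inspect
from typing import Any, Callable

EXPECTED_FIRST_ARGS = ("model", "train_dataloader", "validation_dataloader", "device")


def check_standard_signature(func: Callable) -> Callable:
    """Hypothetical decorator that only validates the leading parameter names."""
    names = tuple(inspect.signature(func).parameters)[: len(EXPECTED_FIRST_ARGS)]
    if names != EXPECTED_FIRST_ARGS:
        raise TypeError(
            f"{func.__name__} must start with {EXPECTED_FIRST_ARGS}, got {names}"
        )

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Extra args and kwargs pass through unchanged.
        return func(*args, **kwargs)

    return wrapper


@check_standard_signature
def fit(model, train_dataloader, validation_dataloader, device, epochs=1):
    return f"training {model} on {device} for {epochs} epoch(s)"


result = fit("net", [], [], "cpu", epochs=2)
```

A function whose first parameters differ from the expected four is rejected at decoration time, before any training code runs.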

class itwinai.torch.trainer.RayTorchTrainer(config: Dict, strategy: Literal['ddp', 'deepspeed'] = 'ddp', name: str | None = None, logger: Logger | None = None, random_seed: int = 1234)[source]

Bases: Trainer

A trainer class for distributed training and hyperparameter optimization using Ray Train/Tune and PyTorch.

Parameters:
  • config (Dict) – A dictionary of configuration settings for the trainer.

  • strategy (Optional[Literal["ddp", "deepspeed"]]) – The distributed training strategy to use. Defaults to “ddp”.

  • name (Optional[str]) – Optional name for the trainer instance. Defaults to None.

  • logger (Optional[Logger]) – Optional logger instance. Defaults to None.

property device: str

Get the current device from distributed strategy.

Returns:

Device string (e.g., “cuda:0”).

Return type:

str

create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None, batch_size: int = 1, num_workers_dataloader: int = 4, pin_memory: bool = False, shuffle_train: bool | None = False, shuffle_test: bool | None = False, shuffle_validation: bool | None = False, sampler: Sampler | Iterable | None = None, collate_fn: Callable[[List], Any] | None = None) None[source]

Create data loaders for training, validation, and testing.

Parameters:
  • train_dataset (Dataset) – The training dataset.

  • validation_dataset (Dataset, optional) – The validation dataset. Defaults to None.

  • test_dataset (Dataset, optional) – The test dataset. Defaults to None.

  • batch_size (int, optional) – Batch size for data loaders. Defaults to 1.

  • shuffle_train (bool, optional) – Whether to shuffle the training dataset. Defaults to False.

  • shuffle_test (bool, optional) – Whether to shuffle the test dataset. Defaults to False.

  • shuffle_validation (bool, optional) – Whether to shuffle the validation dataset. Defaults to False.

  • sampler (Union[Sampler, Iterable, None], optional) – Sampler for the datasets. Defaults to None.

  • collate_fn (Callable[[List], Any], optional) – Function to collate data samples into batches. Defaults to None.

execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) Tuple[Dataset, Dataset, Dataset, Any][source]

Execute the training pipeline with the given datasets.

Parameters:
  • train_dataset (Dataset) – Training dataset.

  • validation_dataset (Optional[Dataset], optional) – Validation dataset. Defaults to None.

  • test_dataset (Optional[Dataset], optional) – Test dataset. Defaults to None.

Returns:

A tuple containing the datasets and the training result grid.

Return type:

Tuple[Dataset, Dataset, Dataset, Any]

set_epoch(epoch: int) None[source]

checkpoint_and_report(epoch, tuning_metrics, checkpointing_data=None)[source]

initialize_logger(hyperparams: Dict | None, rank)[source]

close_logger()[source]

log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) None[source]

Log item with identifier name of kind type at step time step.

Parameters:
  • item (Union[Any, List[Any]]) – element to be logged (e.g., metric).

  • identifier (Union[str, List[str]]) – unique identifier for the element to log (e.g., name of a metric).

  • kind (str, optional) – type of the item to be logged. Must be one among the list of self.supported_types. Defaults to ‘metric’.

  • step (Optional[int], optional) – logging step. Defaults to None.

  • batch_idx (Optional[int], optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.