itwinai PyTorch Modules

distributed.py

itwinai.torch.distributed.distributed_resources_available() bool[source]

Check if the current execution environment has (enough) GPUs available to allow for distributed ML.

Returns:

env can support distributed ML.

Return type:

bool

class itwinai.torch.distributed.TorchDistributedStrategy[source]

Bases: DistributedStrategy

Abstract class to define the distributed backend methods for PyTorch models.

is_distributed: bool = True

Allows to discriminate distributed strategies from non-distributed. Defaults to True.

is_initialized: bool = False

Set to True when the current strategy is initialized. Defaults to False.

property is_main_worker: bool

Checks if local worker has global rank equal to zero.

Returns:

True if main worker.

Return type:

bool

abstract init() None[source]

Initializes the chosen distributed backend

abstract distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) Tuple[Module, Optimizer, _LRScheduler | None][source]

Setup model, optimizer and scheduler for distributed.

abstract global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

abstract local_world_size() int[source]

Returns the number of local workers available on a node (local world size). Usually it is equal to the number of available GPUs.

Returns:

local world size.

Return type:

int

abstract global_rank() int[source]

Returns the global rank of the current process. Rank ranges from 0 to world_size.

Returns:

global rank.

Return type:

int

abstract local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

device() str[source]

Device used by local worker.

Returns:

torch device in the form β€˜cuda:N’.

Return type:

str

set_device()[source]

Set local device.

create_dataloader(dataset: Dataset[T_co], batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')[source]

Create a distributed DataLoader by using DistributedSampler as random sampler.

Parameters:
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.

  • batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the example below.

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • or (multiprocessing_context (str) – multiprocessing.context.BaseContext, optional): If None, the default multiprocessing context of your operating system will be used. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default value depends on the set value for num_workers. If value of num_workers=0 default is None. Otherwise, if value of num_workers > 0 default is 2).

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This allows to maintain the workers Dataset instances alive. (default: False)

  • pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.

Warning

If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices on more details related to multiprocessing in PyTorch.

Warning

len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.

However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch can not detect such cases in general.

See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.

abstract clean_up() None[source]

Cleans up resources allocated by distributed strategy.

abstract allgather_obj(obj: Any) List[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – object to gather from all workers.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

abstract gather_obj(obj: Any, dst_rank: int = 0) List[Any][source]

Gathers any object from the whole group in a list (to all workers).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the objects list are gathered.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

abstract gather(tensor: Tensor, dst_rank: int = 0)[source]

Gathers any object from the whole group in a list (to all workers).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the objects list are gathered.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

class itwinai.torch.distributed.TorchDDPStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]

Bases: TorchDistributedStrategy

PyTorch DistributedDataParallel distributed strategy class.

Parameters:

backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.

backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed communication backend.

init() None[source]

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy which is already initialized.

distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None, **kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Setup model, optimizer and scheduler for distributed.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Destroys the current process group.

allgather_obj(obj: Any) List[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) List[Any] | None[source]

Gathers any object from the whole group in a list (to all workers).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the objects list are gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0)[source]

Gathers any object from the whole group in a list (to all workers).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the objects list are gathered.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

class itwinai.torch.distributed.DeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]

Bases: TorchDistributedStrategy

DeepSpeed distributed strategy class.

Parameters:
  • backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.

  • config (Union[dict, Path, str]) – DeepSpeed config. Either a dictionary or a path to a JSON file.

backend: Literal['nccl', 'gloo', 'mpi']

Torch distributed communication backend.

init() None[source]

Initializes the distributed process group and the distributed package.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, model_parameters: Any | None = None, **init_kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Setup model, optimizer and scheduler for distributed.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Destroys the current process group.

allgather_obj(obj: Any) List[Any][source]

All-gathers any object from the whole group in a list (to all workers).

Parameters:

obj (Any) – Object to gather from all workers.

Returns:

List of gathered objects.

Return type:

List[Any]

gather_obj(obj: Any, dst_rank: int = 0) List[Any] | None[source]

Gathers any object from the whole group in a list (to all workers).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the objects list are gathered.

Returns:

list of objects gathered from all workers or None on non-destination ranks.

Return type:

Optional[List[Any]]

gather(tensor: Tensor, dst_rank: int = 0)[source]

Gathers any object from the whole group in a list (to all workers).

Parameters:
  • obj (Any) – object to gather from all workers.

  • dst_rank (int) – rank of the worker on which the objects list are gathered.

Returns:

list of objects gathered from all workers.

Return type:

List[Any]

class itwinai.torch.distributed.HorovodStrategy[source]

Bases: TorchDistributedStrategy

Horovod distributed strategy class.

init() None[source]

Initializes the Horovod distributed backend.

Raises:
  • RuntimeError – when there are not (enough) GPUs available.

  • DistributedStrategyError – when trying to initialize a strategy already initialized.

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **optim_kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Setup model, optimizer and scheduler for distributed.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Shuts Horovod down.

allgather_obj(obj: Any) list[Any][source]

All-gathers scalar objects across all workers to a list with size(#worker), uses horovod communicator

Parameters:

obj (Any) – object in a worker.

Returns:

gathered list with size(#worker).

Return type:

list

gather_obj(obj: Any, dst_rank: int = 0) list[Any][source]

The same as allgather_obj, as gather is not supported by Horovod.

Parameters:
  • obj (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

gathered list with size(#worker).

Return type:

list

gather(tensor: Tensor, dst_rank: int = 0)[source]

The same as allgather_obj, as gather is not supported by Horovod.

Parameters:
  • tensor (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

gathered list with size(#worker).

Return type:

list

class itwinai.torch.distributed.NonDistributedStrategy[source]

Bases: TorchDistributedStrategy

Dummy class for non-distributed environments.

is_distributed: bool = False

This strategy is not distributed. Defaults to False.

init() None[source]

If CUDA is available set CUDA device, and do nothing more.

Raises:

DistributedStrategyError – when trying to initialize a strategy already initialized.

device() str[source]

Device used by local worker.

Returns:

cpu device if CUDA is not available.

Return type:

str

distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]

Do nothing and return model, optimizer and scheduler.

global_world_size() int[source]

Returns the total number of processes (global world size).

Returns:

global world size.

Return type:

int

local_world_size() int[source]

Returns the local number of workers available per node, which is usually the number of GPUs available.

Returns:

local world size.

Return type:

int

global_rank() int[source]

Returns the global rank of the current process, where rank ranges from 0 to world_size.

Returns:

global rank.

Return type:

int

local_rank() int[source]

Returns the local rank of the current process.

Returns:

local rank.

Return type:

int

clean_up() None[source]

Do nothing.

allgather_obj(obj: Any) list[Any][source]

Wraps obj into a List object.

Parameters:

obj (Any) – object in a worker.

Returns:

input object wrapped in a list.

Return type:

list[Any]

gather_obj(obj: Any, dst_rank: int = 0) list[Any][source]

Wraps obj into a List object.

Parameters:
  • obj (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

input object wrapped in a list.

Return type:

list[Any]

gather(tensor: Tensor, dst_rank: int = 0)[source]

Wraps tensor into a List object.

Parameters:
  • tensor (Any) – object in a worker.

  • dst_rank (int) – ignored.

Returns:

input object wrapped in a list.

Return type:

list[Any]

inference.py

class itwinai.torch.inference.TorchModelLoader(model_uri: str)[source]

Bases: ModelLoader

Loads a torch model from somewhere.

Parameters:

model_uri (str) – Can be a path on local filesystem or an mlflow β€˜locator’ in the form: β€˜mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH’

class itwinai.torch.inference.TorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]

Bases: Predictor

Applies a pre-trained torch model to unseen data.

test_dataset: Dataset

Dataset on which to make predictions (ML inference).

test_dataloader: DataLoader = None

DataLoader for test dataset.

model: Module = None

Pre-trained PyTorch model used to make predictions.

execute(test_dataset: Dataset, model: Module = None) Dict[str, Any][source]

Applies a torch model to a dataset for inference.

Parameters:
  • test_dataset (Dataset[str, Any]) – each item in this dataset is a couple (item_unique_id, item)

  • model (nn.Module, optional) – torch model. Overrides the existing model, if given. Defaults to None.

Returns:

maps each item ID to the corresponding predicted

value(s).

Return type:

Dict[str, Any]

abstract transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.MulticlassTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for multiclass classification.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.MultilabelTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, threshold: float = 0.5, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for multilabel classification, applying a threshold on the output of the neural network.

threshold: float = 0.5

Threshold to transform probabilities into class predictions. Defaults to 0.5.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

class itwinai.torch.inference.RegressionTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]

Bases: TorchPredictor

Applies a pre-trained torch model to unseen data for regression, leaving untouched the output of the neural network.

transform_predictions(batch: Tensor) Tensor[source]

Post-process the predictions of the torch model (e.g., apply threshold in case of multi-label classifier).

mlflow.py

itwinai.torch.mlflow.init_lightning_mlflow(pl_config: Dict, default_experiment_name: str = 'Default', tmp_dir: str = '.tmp', **autolog_kwargs) None[source]

Initialize mlflow for pytorch lightning, also setting up auto-logging (mlflow.pytorch.autolog(…)). Creates a new mlflow run and attaches it to the mlflow auto-logger.

Parameters:
  • pl_config (Dict) – pytorch lightning configuration loaded in memory.

  • default_experiment_name (str, optional) – used as experiment name if it is not given in the lightning conf. Defaults to β€˜Default’.

  • tmp_dir (str) – where to temporarily store some artifacts.

  • autolog_kwargs (kwargs) – args for mlflow.pytorch.autolog(…).

itwinai.torch.mlflow.teardown_lightning_mlflow() None[source]

End active mlflow run, if any.

trainer.py

Provides training logic for PyTorch models via Trainer classes.

class itwinai.torch.trainer.TorchTrainer(config: Dict | TrainingConfiguration, epochs: int, model: Module | str | None = None, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None)[source]

Bases: Trainer, LogMixin

Trainer class for torch training algorithms.

Parameters:
  • config (Union[Dict, TrainingConfiguration]) – training configuration containing hyperparameters.

  • epochs (int) – number of training epochs.

  • model (Optional[Union[nn.Module, str]], optional) – pytorch model to train or a string identifier. Defaults to None.

  • strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) – distributed strategy. Defaults to β€˜ddp’.

  • validation_every (Optional[int], optional) – run a validation epoch every validation_every epochs. Disabled if None. Defaults to 1.

  • test_every (Optional[int], optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.

  • random_seed (Optional[int], optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.

  • logger (Optional[Logger], optional) – logger for ML tracking. Defaults to None.

  • metrics (Optional[Dict[str, Metric]], optional) – map of torchmetrics metrics. Defaults to None.

  • checkpoints_location (str) – path to checkpoints directory. Defaults to β€œcheckpoints”.

  • checkpoint_every (Optional[int]) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.

  • name (Optional[str], optional) – trainer custom name. Defaults to None.

train_dataloader: DataLoader = None

PyTorch DataLoader for training dataset.

validation_dataloader: DataLoader = None

PyTorch DataLoader for validation dataset.

test_dataloader: DataLoader = None

PyTorch DataLoader for test dataset.

loss: Callable = None

Loss criterion.

optimizer: Optimizer = None

Optimizer.

lr_scheduler: Module = None

Learning rate scheduler.

torch_rng: Generator = None

PyTorch random number generator (PRNG).

train_glob_step: int = 0

Total number training batches used so far, across all epochs.

validation_glob_step: int = 0

Total number validation batches used so far, across all epochs.

test_glob_step: int = 0

Total number test batches used so far, across all epochs.

model: Module = None

PyTorch model to train.

logger: Logger = None

itwinai itwinai.Logger

metrics: Dict[str, Callable]

Dictionary of torchmetrics metrics, indexed by user-defined names.

property strategy: TorchDistributedStrategy

Strategy currently in use.

property device: str

Current device from distributed strategy.

create_model_loss_optimizer() None[source]

Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-define method.

create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) None[source]

Create train, validation and test dataloaders using the configuration provided in the Trainer constructor. Generally a user-define method.

Parameters:
  • train_dataset (Dataset) – training dataset object.

  • validation_dataset (Optional[Dataset]) – validation dataset object. Default None.

  • test_dataset (Optional[Dataset]) – test dataset object. Default None.

execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) Tuple[Dataset, Dataset, Dataset, Any][source]

Prepares distributed environment and data structures for the actual training.

Parameters:
  • train_dataset (Dataset) – training dataset.

  • validation_dataset (Optional[Dataset], optional) – validation dataset. Defaults to None.

  • test_dataset (Optional[Dataset], optional) – test dataset. Defaults to None.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[Dataset, Dataset, Dataset, Any]

set_epoch(epoch: int) None[source]

Set current epoch at the beginning of training.

Parameters:

epoch (int) – epoch number, from 0 to epochs-1.

log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) None[source]

Log item with identifier name of kind type at step time step.

Parameters:
  • item (Union[Any, List[Any]]) – element to be logged (e.g., metric).

  • identifier (Union[str, List[str]]) – unique identifier for the element to log(e.g., name of a metric).

  • kind (str, optional) – type of the item to be logged. Must be one among the list of self.supported_types. Defaults to β€˜metric’.

  • step (Optional[int], optional) – logging step. Defaults to None.

  • batch_idx (Optional[int], optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.

save_checkpoint(name: str, epoch: int, loss: Tensor | None = None) None[source]

Save training checkpoint.

Parameters:
  • name (str) – name of the checkpoint.

  • epoch (int) – current training epoch.

  • loss (Optional[torch.Tensor]) – current loss (if available).

load_checkpoint(name: str) None[source]

Load state from a checkpoint.

Parameters:

name (str) – name of the checkpoint to load, assuming it is under self.checkpoints_location location.

compute_metrics(true: Tensor, pred: Tensor, logger_step: int, batch_idx: int | None, stage: str = 'train') Dict[str, Any][source]

Compute and log metrics.

Parameters:
  • metrics (Dict[str, Metric]) – metrics dict. Can be self.train_metrics or self.validation_metrics.

  • true (Batch) – true values.

  • pred (Batch) – predicted values.

  • logger_step (int) – global step to pass to the logger.

  • stage (str) – β€˜train’, β€˜validation’…

Returns:

metric values.

Return type:

Dict[str, Any]

train()[source]

Trains a machine learning model. Main training loop/logic.

Parameters:
  • train_dataset (Dataset) – training dataset.

  • validation_dataset (Dataset) – validation dataset.

  • test_dataset (Dataset) – test dataset.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[Dataset, Dataset, Dataset, Any]

train_epoch(epoch: int) Tensor[source]

Perform a complete sweep over the training dataset, completing an epoch of training.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average training loss for the current epoch.

Return type:

Loss

train_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single optimization step using a batch sampled from the training dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

validation_epoch(epoch: int) Tensor[source]

Perform a complete sweep over the validation dataset, completing an epoch of validation.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average validation loss for the current epoch.

Return type:

Loss

validation_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single optimization step using a batch sampled from the validation dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

test_epoch(epoch: int) Tensor[source]

Perform a complete sweep over the test dataset, completing an epoch of test.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average test loss for the current epoch.

Return type:

Loss

test_step(batch: Tensor, batch_idx: int) Tuple[Tensor, Dict[str, Any]][source]

Perform a single predictions step using a batch sampled from the test dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

class itwinai.torch.trainer.GANTrainer(config: Dict | TrainingConfiguration, epochs: int, discriminator: Module, generator: Module, strategy: Literal['ddp', 'deepspeed'] = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None, **kwargs)[source]

Bases: TorchTrainer

Trainer class for GAN models using pytorch.

Parameters:
  • config (Union[Dict, TrainingConfiguration]) – training configuration containing hyperparameters.

  • epochs (int) – number of training epochs.

  • discriminator (nn.Module) – pytorch discriminator model to train GAN.

  • generator (nn.Module) – pytorch generator model to train GAN.

  • strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) – distributed strategy. Defaults to β€˜ddp’.

  • validation_every (Optional[int], optional) – run a validation epoch every validation_every epochs. Disabled if None. Defaults to 1.

  • test_every (Optional[int], optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.

  • random_seed (Optional[int], optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.

  • logger (Optional[Logger], optional) – logger for ML tracking. Defaults to None.

  • metrics (Optional[Dict[str, Metric]], optional) – map of torch metrics metrics. Defaults to None.

  • checkpoints_location (str) – path to checkpoints directory. Defaults to β€œcheckpoints”.

  • checkpoint_every (Optional[int]) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.

  • name (Optional[str], optional) – trainer custom name. Defaults to None.

create_model_loss_optimizer() None[source]

Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-define method.

train_epoch(epoch: int)[source]

Perform a complete sweep over the training dataset, completing an epoch of training.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average training loss for the current epoch.

Return type:

Loss

validation_epoch(epoch: int)[source]

Perform a complete sweep over the validation dataset, completing an epoch of validation.

Parameters:

epoch (int) – current epoch number, from 0 to self.epochs - 1.

Returns:

average validation loss for the current epoch.

Return type:

Loss

train_step(real_images, batch_idx)[source]

Perform a single optimization step using a batch sampled from the training dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

validation_step(real_images, batch_idx)[source]

Perform a single optimization step using a batch sampled from the validation dataset.

Parameters:
  • batch (Batch) – batch sampled by a dataloader.

  • batch_idx (int) – batch index in the dataloader.

Returns:

batch loss and dictionary of metric values with the same structure of self.metrics.

Return type:

Tuple[Loss, Dict[str, Any]]

save_checkpoint(name, epoch, loss=None)[source]

Save training checkpoint with both optimizers.

load_checkpoint(checkpoint_path)[source]

Load models and optimizers from checkpoint.

save_fake_generator_images(epoch)[source]

plot and save fake images from generator

Args:

epoch (int): epoch number, from 0 to epochs-1.

class itwinai.torch.trainer.TorchLightningTrainer(config: Dict | str, mlflow_saved_model: str = 'my_model')[source]

Bases: Trainer

Generic trainer for torch Lightning workflows.

Parameters:
  • config (Union[Dict, str]) – Lightning configuration which can be the path to a file or a Python dictionary.

  • mlflow_saved_model (str, optional) – name of the model created in MLFlow. Defaults to β€˜my_model’.

execute() Any[source]

Trains a machine learning model.

Parameters:
  • train_dataset (MLDataset) – training dataset.

  • validation_dataset (MLDataset) – validation dataset.

  • test_dataset (MLDataset) – test dataset.

Returns:

training dataset, validation dataset, test dataset, trained model.

Return type:

Tuple[MLDataset, MLDataset, MLDataset]

itwinai.torch.trainer.distributed(func)[source]

The decorated function must have a standard signature. Its first arguments must be: model, train_dataloader, validation_dataloader, device (in this order).

Additional args or kwargs are allowed consistently with the signature of the decorated function.

type.py

Custom types definition.

itwinai.torch.type.LrScheduler

Torch learning rate scheduler

itwinai.torch.type.Batch

Torch data batch sampled by a DataLoader.

itwinai.torch.type.Metric

Torch metric function provided by torchmetrics library.

alias of Callable

exception itwinai.torch.type.UninitializedStrategyError[source]

Bases: Exception

Error raised when a strategy has not been initialized.

exception itwinai.torch.type.DistributedStrategyError[source]

Bases: Exception

Error raised when a strategy has already been initialized.

reproducibility.py

This module provides the tools to support reproducible execution of torch scripts.

itwinai.torch.reproducibility.seed_worker(worker_id)[source]

Seed DataLoader worker.

itwinai.torch.reproducibility.set_seed(rnd_seed: int | None, deterministic_cudnn: bool = True) Generator[source]

Set torch random seed and return a PRNG object.

Parameters:
  • rnd_seed (Optional[int]) – random seed. If None, the seed is not set.

  • deterministic_cudnn (bool) – if True, sets torch.backends.cudnn.benchmark = False, which may affect performances.

Returns:

PRNG object.

Return type:

torch.Generator

config.py

Default configuration

class itwinai.torch.config.Configuration(**extra_data: Any)[source]

Bases: BaseModel

Base configuration class.

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

class itwinai.torch.config.TrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: Literal['mse', 'nllloss', 'cross_entropy'] = 'cross_entropy', optimizer: Literal['adadelta', 'adam', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_weight_decay: float = 0.0, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, **extra_data: Any)[source]

Bases: Configuration

Default configuration object for training. Override and/or create new configurations using the constructor.

Example:

>>> cfg = TrainingConfiguration(batch_size=17, param_a=42)
>>> print(cfg.batch_size)  # returns 17 (overrides default)
>>> print(cfg.param_a)     # returns 42 (new value)
>>> print(cfg.pin_memory)  # returns the default value
>>>
>>> from rich import print
>>> print(cfg)             # pretty-print of configuration
batch_size: int

Batch size. In a distributed environment it is usually the per-worker batch size. Defaults to 32.

shuffle_train: bool

Whether to shuffle train dataset when creating a torch DataLoader. Defaults to False.

shuffle_validation: bool

Whether to shuffle validation dataset when creating a torch DataLoader. Defaults to False.

shuffle_test: bool

Whether to shuffle test dataset when creating a torch DataLoader. Defaults to False.

pin_gpu_memory: bool

Whether to pin GPU memory. Property of torch DataLoader. Defaults to False.

num_workers_dataloader: int

Number of parallel workers used by torch DataLoader. Defaults to 4.

loss: Literal['mse', 'nllloss', 'cross_entropy']

Loss function. Defaults to β€˜cross_entropy’

optimizer: Literal['adadelta', 'adam', 'rmsprop', 'sgd']

Name of the optimizer to use. Defaults to β€˜adam’.

optim_lr: float

Learning rate used by the optimizer. Defaults to 1e-3.

optim_momentum: float

Momentum used by some optimizers (e.g., SGD). Defaults to 0.9.

optim_weight_decay: float

Weight decay parameter for the optimizer. Defaults to 0.

fp16_allreduce: bool

uses float16 operations in the allreduce distributed gradients aggregation. Better performances at lower precision. Defaults to False.

Type:

Parameter of Horovod’s DistributedOptimizer

use_adasum: bool

use Adasum optimization. Defaults to False.

Type:

Parameter of Horovod’s DistributedOptimizer

gradient_predivide_factor: float

scale gradients before adding them up. Defaults to 1.0.

Type:

Parameter of Horovod’s DistributedOptimizer

model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'allow'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'batch_size': FieldInfo(annotation=int, required=False, default=32), 'fp16_allreduce': FieldInfo(annotation=bool, required=False, default=False), 'gradient_predivide_factor': FieldInfo(annotation=float, required=False, default=1.0), 'loss': FieldInfo(annotation=Literal['mse', 'nllloss', 'cross_entropy'], required=False, default='cross_entropy'), 'num_workers_dataloader': FieldInfo(annotation=int, required=False, default=4), 'optim_lr': FieldInfo(annotation=float, required=False, default=0.001), 'optim_momentum': FieldInfo(annotation=float, required=False, default=0.9), 'optim_weight_decay': FieldInfo(annotation=float, required=False, default=0.0), 'optimizer': FieldInfo(annotation=Literal['adadelta', 'adam', 'rmsprop', 'sgd'], required=False, default='adam'), 'pin_gpu_memory': FieldInfo(annotation=bool, required=False, default=False), 'shuffle_test': FieldInfo(annotation=bool, required=False, default=False), 'shuffle_train': FieldInfo(annotation=bool, required=False, default=False), 'shuffle_validation': FieldInfo(annotation=bool, required=False, default=False), 'use_adasum': FieldInfo(annotation=bool, required=False, default=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.