itwinai.torch
config
Default configuration
- class itwinai.torch.config.Configuration(**extra_data: Any)
Bases: BaseModel
Base configuration class.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}
Pydantic model configuration; should be a dictionary conforming to pydantic.config.ConfigDict. Here, {'extra': 'allow'} permits fields beyond the declared ones.
- class itwinai.torch.config.TrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2', 'bceloss'] = 'cross_entropy', optimizer: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_betas: Tuple[float, float] = (0.9, 0.999), optim_weight_decay: float = 0.0, lr_scheduler: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_step_size: int | Iterable[int] = 10, lr_scheduler_gamma: float = 0.95, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, dist_backend: Literal['nccl', 'gloo', 'mpi'] = 'nccl', **extra_data: Any)
Bases: Configuration
Default configuration object for training. Override defaults and/or add new parameters through the constructor.
Example:
>>> cfg = TrainingConfiguration(batch_size=17, param_a=42)
>>> print(cfg.batch_size)  # returns 17 (overrides default)
>>> print(cfg.param_a)  # returns 42 (new value)
>>> print(cfg.pin_gpu_memory)  # returns the default value
>>>
>>> from rich import print
>>> print(cfg)  # pretty-print of configuration
Warning
Avoid redefining parameters that already exist in the training configuration; reuse the existing parameter names instead to avoid inconsistencies. For instance, the training configuration defines the learning rate as optim_lr: if you redefine it as lr by calling TrainingConfiguration(lr=0.005), the configuration will contain both optim_lr (created by default) and lr (created by you). This may cause confusion and can silently break the logic in your code.
- batch_size: int
Batch size. In a distributed environment it is usually the per-worker batch size. Defaults to 32.
- shuffle_train: bool
Whether to shuffle the training dataset when creating a torch DataLoader. Defaults to False.
- shuffle_validation: bool
Whether to shuffle the validation dataset when creating a torch DataLoader. Defaults to False.
- shuffle_test: bool
Whether to shuffle the test dataset when creating a torch DataLoader. Defaults to False.
- pin_gpu_memory: bool
Whether the torch DataLoader copies batches into pinned (page-locked) host memory, which speeds up transfers to the GPU (the pin_memory option of the DataLoader). Defaults to False.
- num_workers_dataloader: int
Number of parallel workers used by the torch DataLoader. Defaults to 4.
- loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2', 'bceloss']
Loss function. Defaults to "cross_entropy".
- optimizer: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd']
Name of the optimizer to use. Defaults to "adam".
- optim_lr: float
Learning rate used by the optimizer. Defaults to 1e-3.
- optim_momentum: float
Momentum used by some optimizers (e.g., SGD). Defaults to 0.9.
- optim_betas: Tuple[float, float]
Betas of the Adam optimizer (if used). Defaults to (0.9, 0.999).
- optim_weight_decay: float
Weight decay parameter for the optimizer. Defaults to 0.
- lr_scheduler: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None
Learning rate scheduler algorithm. Defaults to None (not used).
- lr_scheduler_step_size: int | Iterable[int]
Learning rate scheduler step size, if needed by the scheduler. Defaults to 10 (epochs).
- lr_scheduler_gamma: float
Learning rate scheduler gamma (multiplicative decay factor), if needed by the scheduler. Defaults to 0.95.
- fp16_allreduce: bool
Parameter of Horovod's DistributedOptimizer. Uses float16 operations in the allreduce that aggregates distributed gradients, trading precision for better performance. Defaults to False.
- use_adasum: bool
Parameter of Horovod's DistributedOptimizer. Uses Adasum to combine gradients. Defaults to False.
- gradient_predivide_factor: float
Parameter of Horovod's DistributedOptimizer. Scales gradients before adding them up. Defaults to 1.0.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}
Pydantic model configuration; should be a dictionary conforming to pydantic.config.ConfigDict. Here, {'extra': 'allow'} permits fields beyond the declared ones.
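The lr_scheduler, lr_scheduler_step_size and lr_scheduler_gamma fields interact as sketched below. This assumes that "step" behaves like PyTorch's StepLR (decay every step_size epochs) and that an iterable step size is used as MultiStepLR milestones; scheduled_lr is a hypothetical helper that only reproduces the closed-form decay rule, not itwinai's scheduler construction.

```python
from bisect import bisect_right
from typing import Iterable, Union


def scheduled_lr(
    epoch: int,
    optim_lr: float = 1e-3,
    step_size: Union[int, Iterable[int]] = 10,
    gamma: float = 0.95,
) -> float:
    """Closed-form learning rate at a given epoch (hypothetical helper).

    An int step_size follows the StepLR rule (decay every step_size epochs);
    an iterable is treated as MultiStepLR milestones (decay at each listed epoch).
    """
    if isinstance(step_size, int):
        num_decays = epoch // step_size
    else:
        milestones = sorted(step_size)
        # Count the milestones already reached at this epoch.
        num_decays = bisect_right(milestones, epoch)
    return optim_lr * gamma ** num_decays
```

With the defaults (optim_lr=1e-3, step size 10, gamma 0.95), epochs 0 to 9 use 1e-3 and epochs 10 to 19 use 9.5e-4.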
distributed
- itwinai.torch.distributed.distributed_resources_available() → bool
Checks whether the current execution environment has enough GPUs available for distributed ML.
- Returns:
True if the environment can support distributed ML.
- Return type:
bool
- itwinai.torch.distributed.check_initialized(method: Callable) → Callable
Decorator for strategy methods that checks whether the strategy was correctly initialized before calling the method.
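A minimal sketch of how a decorator like check_initialized can guard strategy methods. It is not itwinai's source: UninitializedStrategyError is defined locally as a stand-in for the error of the same name mentioned in this reference, and ToyStrategy is a hypothetical class that only exists to exercise the decorator.

```python
import functools
from typing import Any, Callable


class UninitializedStrategyError(Exception):
    """Local stand-in for the error raised on uninitialized strategies."""


def check_initialized(method: Callable) -> Callable:
    """Refuse to run a strategy method until the strategy is initialized."""

    @functools.wraps(method)  # keep the wrapped method's name and docstring
    def wrapper(self, *args: Any, **kwargs: Any) -> Any:
        if not getattr(self, "is_initialized", False):
            raise UninitializedStrategyError(
                f"{type(self).__name__} has not been initialized. "
                "Call init() before using this method."
            )
        return method(self, *args, **kwargs)

    return wrapper


class ToyStrategy:
    """Hypothetical strategy used only to exercise the decorator."""

    is_initialized: bool = False

    def init(self) -> None:
        self.is_initialized = True

    @check_initialized
    def global_rank(self) -> int:
        return 0
```

Calling ToyStrategy().global_rank() before init() raises UninitializedStrategyError; after init() it returns 0.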
- itwinai.torch.distributed.initialize_ray() → None
Used by RayDDPStrategy and RayDeepSpeedStrategy to initialize the Ray backend if it is not already initialized. It is meant to be called before submitting a function to Ray (as a trial in tuning, or as a worker in distributed ML).
- Raises:
RuntimeError – when no Ray cluster is detected.
EnvironmentError – if the required environment variables HEAD_NODE_PORT or HEAD_NODE_IP are not set. These should be set in the SLURM script that launches the Ray cluster.
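The environment check described above can be sketched as follows. ray_head_address is a hypothetical helper, not part of itwinai; it only shows how HEAD_NODE_IP and HEAD_NODE_PORT could be validated before connecting to the cluster.

```python
import os
from typing import Mapping, Optional, Tuple


def ray_head_address(env: Optional[Mapping[str, str]] = None) -> Tuple[str, int]:
    """Read the Ray head node address exported by the SLURM launch script."""
    env = os.environ if env is None else env
    # Collect every missing (or empty) variable so the error lists them all.
    missing = [k for k in ("HEAD_NODE_IP", "HEAD_NODE_PORT") if not env.get(k)]
    if missing:
        raise EnvironmentError(
            f"Missing environment variable(s): {', '.join(missing)}. "
            "Export them in the SLURM script that starts the Ray cluster."
        )
    return env["HEAD_NODE_IP"], int(env["HEAD_NODE_PORT"])
```

The resulting pair could then be formatted as an "ip:port" address for ray.init(address=...); how initialize_ray actually connects is not shown here.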
- class itwinai.torch.distributed.TorchDistributedStrategy
Bases: ABC
Abstract class defining the distributed backend methods for PyTorch models.
- is_distributed: bool = True
Distinguishes distributed strategies from non-distributed ones. Defaults to True.
- is_initialized: bool = False
Set to True when the current strategy is initialized. Defaults to False.
- name: str
- property is_main_worker: bool
Checks whether the local worker has global rank equal to zero.
- Returns:
True if main worker.
- Return type:
bool
- abstract distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) → Tuple[Module, Optimizer, _LRScheduler | None]
Sets up the model, optimizer, and LR scheduler for distributed training.
- abstract global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- abstract local_world_size() → int
Returns the number of local workers available on a node (local world size). Usually it is equal to the number of available GPUs.
- Returns:
local world size.
- Return type:
int
- abstract global_rank() → int
Returns the global rank of the current process. Rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- abstract local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- device() → str
Device used by the local worker.
- Returns:
torch device string in the form "device:N" (e.g., "cuda:0"), or "cpu".
- Return type:
str
- create_dataloader(dataset: Dataset, batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[_T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')
Creates a distributed DataLoader using DistributedSampler as the sampler.
- Parameters:
dataset (Dataset) – dataset from which to load the data.
batch_size (int, optional) – how many samples per batch to load (default: 1).
shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).
sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.
batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.
num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)
collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
pin_memory (bool, optional) – if True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning notes in the PyTorch DataLoader documentation.
drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)
timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)
worker_init_fn (Callable, optional) – if not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)
multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – if None, the default multiprocessing context of your operating system will be used. (default: None)
generator (torch.Generator, optional) – if not None, this RNG will be used by RandomSampler to generate random indexes and by multiprocessing to generate base_seed for workers. (default: None)
prefetch_factor (int, optional, keyword-only arg) – number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (The default depends on num_workers: if num_workers=0 the default is None, otherwise it is 2.)
persistent_workers (bool, optional) – if True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)
pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.
- Raises:
UninitializedStrategyError – when this method is called for a strategy which has not been initialized.
RuntimeError – when a user-provided sampler, if given, is not of type DistributedSampler.
Warning
If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices for more details related to multiprocessing in PyTorch.
Warning
The len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.
However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch cannot detect such cases in general.
See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.
Warning
See Reproducibility, My data loader workers return identical random numbers, and Randomness in multi-process data loading notes for random seed related questions.
- abstract allgather_obj(obj: Any) → List[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – object to gather from all workers.
- Returns:
list of objects gathered from all workers.
- Return type:
List[Any]
- abstract gather_obj(obj: Any, dst_rank: int = 0) → List[Any]
Gathers any object from the whole group in a list on the destination worker.
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers.
- Return type:
List[Any]
- abstract gather(tensor: Tensor, dst_rank: int = 0) → List | None
Gathers a tensor from the whole group in a list on the destination worker.
- Parameters:
tensor (torch.Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers on the destination rank, otherwise None.
- Return type:
Optional[List[torch.Tensor]]
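create_dataloader relies on DistributedSampler, which uses the global rank and the global world size to give each worker its own shard of the dataset. The pure-Python sketch below mirrors that sharding for shuffle=False (PyTorch additionally permutes the indices first when shuffle=True) and shows how a global rank is commonly derived from a node index and a local rank. Both helpers are hypothetical, and the node-major layout assumes every node runs the same number of workers.

```python
import math
from typing import List


def global_rank_of(node_rank: int, local_rank: int, local_world_size: int) -> int:
    """Global rank under a node-major layout with local_world_size workers per node."""
    return node_rank * local_world_size + local_rank


def shard_indices(
    dataset_len: int, rank: int, world_size: int, drop_last: bool = False
) -> List[int]:
    """Indices assigned to one rank, following DistributedSampler with shuffle=False."""
    indices = list(range(dataset_len))
    if drop_last:
        # Drop the tail so that every rank receives the same number of samples.
        total_size = (dataset_len // world_size) * world_size
        indices = indices[:total_size]
    else:
        # Pad by repeating indices from the start until the split is even.
        total_size = math.ceil(dataset_len / world_size) * world_size
        padding = total_size - dataset_len
        indices += (indices * math.ceil(padding / dataset_len))[:padding]
    # Rank r takes indices r, r + world_size, r + 2 * world_size, ...
    return indices[rank:total_size:world_size]
```

For example, the worker with local rank 2 on the second node (node_rank=1) of a job with 4 GPUs per node has global rank 6. With 10 samples and 4 workers, rank 2 receives indices [2, 6, 0]: index 0 is repeated as padding.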
- class itwinai.torch.distributed.TorchDDPStrategy(backend: Literal['nccl', 'gloo', 'mpi'])
Bases: TorchDistributedStrategy
PyTorch DistributedDataParallel distributed strategy class.
- Parameters:
backend (Literal['nccl', 'gloo', 'mpi']) – name of the distributed communication backend to employ.
- backend: Literal['nccl', 'gloo', 'mpi']
Torch distributed communication backend.
- init() → None
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError – when there are not enough GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None, find_unused_parameters: bool = False, **kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Sets up the model, optimizer, and LR scheduler for distributed training.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- Raises:
RuntimeError – when the local world size cannot be retrieved.
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → List[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – object to gather from all workers.
- Returns:
list of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → List | None
Gathers any object from the whole group in a list on the destination worker.
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers, or None on non-destination ranks.
- Return type:
List | None
- gather(tensor: Tensor, dst_rank: int = 0) → List | None
Gathers a tensor from the whole group in a list on the destination worker.
- Parameters:
tensor (torch.Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers, or None on non-destination ranks.
- Return type:
Optional[List[torch.Tensor]]
- class itwinai.torch.distributed.DeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])
Bases: TorchDistributedStrategy
DeepSpeed distributed strategy class.
- Parameters:
backend (Literal['nccl', 'gloo', 'mpi']) – name of the distributed communication backend to employ.
config (Union[dict, Path, str]) – DeepSpeed config. Either a dictionary or a path to a JSON file.
- backend: Literal['nccl', 'gloo', 'mpi']
Torch distributed communication backend.
- init() → None
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError – when there are not enough GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, model_parameters: Any | None = None, **init_kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Sets up the model, optimizer, and LR scheduler for distributed training.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- Raises:
RuntimeError – when the local world size cannot be retrieved.
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → List[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – object to gather from all workers.
- Returns:
list of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → List[Any] | None
Gathers any object from the whole group in a list on the destination worker.
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers, or None on non-destination ranks.
- Return type:
Optional[List[Any]]
- gather(tensor: Tensor, dst_rank: int = 0) → List[Tensor] | None
Gathers a tensor from the whole group in a list on the destination worker.
- Parameters:
tensor (torch.Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers, or None on non-destination ranks.
- Return type:
Optional[List[torch.Tensor]]
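DeepSpeedStrategy accepts its DeepSpeed config either as a dictionary or as a path to a JSON file. The sketch below builds a minimal dictionary using keys from the DeepSpeed configuration schema and keeps the global batch size consistent: DeepSpeed expects train_batch_size to equal train_micro_batch_size_per_gpu × gradient_accumulation_steps × the number of GPUs. The helpers make_deepspeed_config and save_deepspeed_config are hypothetical, and the extra keys a real job needs (optimizer, ZeRO stage, and so on) are left open.

```python
import json
from pathlib import Path
from typing import Any, Dict


def make_deepspeed_config(
    micro_batch_per_gpu: int, grad_accum_steps: int, world_size: int
) -> Dict[str, Any]:
    """Build a minimal DeepSpeed config with a consistent global batch size."""
    return {
        # Global batch = micro batch per GPU * accumulation steps * number of GPUs.
        "train_batch_size": micro_batch_per_gpu * grad_accum_steps * world_size,
        "train_micro_batch_size_per_gpu": micro_batch_per_gpu,
        "gradient_accumulation_steps": grad_accum_steps,
        "fp16": {"enabled": False},
    }


def save_deepspeed_config(config: Dict[str, Any], path: Path) -> Path:
    """Write the config to a JSON file, the other input form the strategy accepts."""
    path.write_text(json.dumps(config, indent=2))
    return path
```

For instance, 8 GPUs with a per-GPU micro batch of 32 and 2 accumulation steps give a train_batch_size of 512. In this sketch, the per-worker batch_size of TrainingConfiguration plays the role of the per-GPU micro batch.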
- class itwinai.torch.distributed.HorovodStrategy
Bases: TorchDistributedStrategy
Horovod distributed strategy class.
- init() → None
Initializes the Horovod distributed backend.
- Raises:
RuntimeError – when there are not enough GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **optim_kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Sets up the model, optimizer, and LR scheduler for distributed training.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → list[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – object to gather from all workers.
- Returns:
list of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → list[Any] | None
Gathers any object from the whole group in a list on the destination worker. Under the hood it relies on allgather, as gather is not supported by Horovod.
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers, or None on non-destination ranks.
- Return type:
Optional[List[Any]]
- gather(tensor: Tensor, dst_rank: int = 0) → List[Tensor] | None
Gathers a tensor from the whole group in a list on the destination worker. Under the hood it relies on allgather, as gather is not supported by Horovod.
- Parameters:
tensor (torch.Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers, or None on non-destination ranks.
- Return type:
Optional[List[torch.Tensor]]
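HorovodStrategy's gather_obj and gather emulate a gather on top of allgather. The sketch below shows that pattern with an injected allgather callable and a single-process simulation of several ranks; gather_via_allgather and simulate are hypothetical helpers. In a real Horovod job the callable would be a collective such as hvd.allgather_object, which this sketch does not call.

```python
from typing import Any, Callable, List, Optional


def gather_via_allgather(
    obj: Any,
    rank: int,
    allgather: Callable[[Any], List[Any]],
    dst_rank: int = 0,
) -> Optional[List[Any]]:
    """Emulate gather: every rank receives all objects, only dst_rank keeps them."""
    gathered = allgather(obj)  # collective call: must run on every rank
    return gathered if rank == dst_rank else None


def simulate(contributions: List[Any], dst_rank: int = 0) -> List[Optional[List[Any]]]:
    """Run the pattern for every rank of a simulated world in a single process."""

    def fake_allgather(_obj: Any) -> List[Any]:
        # Stand-in collective: returns the contributions of all ranks in rank order.
        return list(contributions)

    return [
        gather_via_allgather(obj, rank, fake_allgather, dst_rank)
        for rank, obj in enumerate(contributions)
    ]
```

The trade-off is bandwidth: every rank receives all objects even though only the destination keeps them, so the emulation moves more data than a true gather would.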
- class itwinai.torch.distributed.NonDistributedStrategy
Bases: TorchDistributedStrategy
Dummy class for non-distributed environments.
- is_distributed: bool = False
This strategy is not distributed. Defaults to False.
- init() → None
If CUDA is available, sets the CUDA device; does nothing else.
- Raises:
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Does nothing and returns the model, optimizer, and scheduler unchanged.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → list[Any]
Wraps obj into a list.
- Parameters:
obj (Any) – object in a worker.
- Returns:
input object wrapped in a list.
- Return type:
list[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → list[Any]
Wraps obj into a list.
- Parameters:
obj (Any) – object in a worker.
dst_rank (int) – ignored.
- Returns:
input object wrapped in a list.
- Return type:
list[Any]
- class itwinai.torch.distributed.RayTorchDistributedStrategy
Bases: TorchDistributedStrategy
Base class for all Ray distributed strategies.
- class itwinai.torch.distributed.RayDDPStrategy
Bases: TorchDDPStrategy, RayTorchDistributedStrategy
A distributed data-parallel (DDP) strategy using Ray Train for PyTorch training.
- init() → None
Initializes the Ray trial/worker.
- Raises:
RuntimeError – when the Ray cluster is not detected.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- Raises:
RuntimeError – when the local world size cannot be retrieved.
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- class itwinai.torch.distributed.RayDeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])
Bases: DeepSpeedStrategy, RayTorchDistributedStrategy
A distributed strategy using Ray and DeepSpeed for PyTorch training.
- Parameters:
backend (Literal["nccl", "gloo", "mpi"]) – the backend for distributed communication.
- init() → None
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError – when no Ray cluster is running.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
gan
Provides training logic for PyTorch models via Trainer classes.
- class itwinai.torch.gan.GANTrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: str = 'bceloss', optimizer: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_betas: Tuple[float, float] = (0.9, 0.999), optim_weight_decay: float = 0.0, lr_scheduler: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_step_size: int | Iterable[int] = 10, lr_scheduler_gamma: float = 0.95, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, dist_backend: Literal['nccl', 'gloo', 'mpi'] = 'nccl', optimizer_generator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_generator_lr: float = 0.001, optim_generator_momentum: float = 0.9, optim_generator_betas: Tuple[float, float] = (0.5, 0.999), optim_generator_weight_decay: float = 0.0, lr_scheduler_generator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_generator_step_size: int | Iterable[int] = 10, lr_scheduler_generator_gamma: float = 0.95, optimizer_discriminator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd'] = 'adam', optim_discriminator_lr: float = 0.001, optim_discriminator_momentum: float = 0.9, optim_discriminator_betas: Tuple[float, float] = (0.5, 0.999), optim_discriminator_weight_decay: float = 0.0, lr_scheduler_discriminator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None = None, lr_scheduler_discriminator_step_size: int | Iterable[int] = 10, lr_scheduler_discriminator_gamma: float = 0.95, z_dim: int = 100, **extra_data: Any)
Bases: TrainingConfiguration
Configuration object for training a GAN. Extends the base TrainingConfiguration.
- optimizer_generator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd']
Name of the optimizer to use for the generator. Defaults to "adam".
- optim_generator_lr: float
Learning rate used by the optimizer for the generator. Defaults to 1e-3.
- optim_generator_momentum: float
Momentum used by some optimizers (e.g., SGD) for the generator. Defaults to 0.9.
- optim_generator_betas: Tuple[float, float]
Betas of the Adam optimizer (if used) for the generator. Defaults to (0.5, 0.999).
- optim_generator_weight_decay: float
Weight decay parameter for the optimizer for the generator. Defaults to 0.
- lr_scheduler_generator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None
Learning rate scheduler algorithm for the generator optimizer. Defaults to None (not used).
- lr_scheduler_generator_step_size: int | Iterable[int]
Learning rate scheduler step size for the generator optimizer, if needed by the scheduler. Defaults to 10 (epochs).
- lr_scheduler_generator_gamma: float
Learning rate scheduler gamma (multiplicative decay factor) for the generator optimizer, if needed by the scheduler. Defaults to 0.95.
- optimizer_discriminator: Literal['adadelta', 'adam', 'adamw', 'rmsprop', 'sgd']
Name of the optimizer to use for the discriminator. Defaults to "adam".
- optim_discriminator_lr: float
Learning rate used by the optimizer for the discriminator. Defaults to 1e-3.
- optim_discriminator_momentum: float
Momentum used by some optimizers (e.g., SGD) for the discriminator. Defaults to 0.9.
- optim_discriminator_betas: Tuple[float, float]
Betas of the Adam optimizer (if used) for the discriminator. Defaults to (0.5, 0.999).
- optim_discriminator_weight_decay: float
Weight decay parameter for the optimizer for the discriminator. Defaults to 0.
- lr_scheduler_discriminator: Literal['step', 'multistep', 'constant', 'linear', 'exponential', 'polynomial'] | None
Learning rate scheduler algorithm for the discriminator optimizer. Defaults to None (not used).
- lr_scheduler_discriminator_step_size: int | Iterable[int]
Learning rate scheduler step size for the discriminator optimizer, if needed by the scheduler. Defaults to 10 (epochs).
- lr_scheduler_discriminator_gamma: float
Learning rate scheduler gamma (multiplicative decay factor) for the discriminator optimizer, if needed by the scheduler. Defaults to 0.95.
- loss: str
Classification criterion used for the generator and discriminator losses. Defaults to "bceloss".
- z_dim: int
Generator input size (random noise size). Defaults to 100.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}
Pydantic model configuration; should be a dictionary conforming to pydantic.config.ConfigDict. Here, {'extra': 'allow'} permits fields beyond the declared ones.
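The optim_generator_betas and optim_discriminator_betas defaults of (0.5, 0.999) are the two exponential decay rates of Adam's moment estimates. The scalar sketch below follows the standard Adam update (without weight decay) to show where each beta enters; adam_steps is a hypothetical helper, not the PyTorch optimizer the trainer would actually build. A smaller beta1 such as 0.5 shortens the memory of the first moment, so updates react faster when gradients change direction, a setting commonly used for GAN training.

```python
import math
from typing import List, Tuple


def adam_steps(
    grads: List[float],
    lr: float = 1e-3,
    betas: Tuple[float, float] = (0.5, 0.999),
    eps: float = 1e-8,
    param: float = 0.0,
) -> float:
    """Apply Adam to a single scalar parameter for a sequence of gradients."""
    beta1, beta2 = betas
    m = v = 0.0
    for t, g in enumerate(grads, start=1):
        m = beta1 * m + (1 - beta1) * g      # first moment: EMA of gradients
        v = beta2 * v + (1 - beta2) * g * g  # second moment: EMA of squared gradients
        m_hat = m / (1 - beta1 ** t)         # bias-corrected first moment
        v_hat = v / (1 - beta2 ** t)         # bias-corrected second moment
        param -= lr * m_hat / (math.sqrt(v_hat) + eps)
    return param
```

On the first step the bias correction makes the update approximately lr × sign(gradient) regardless of the betas; the betas only shape the later steps.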
- class itwinai.torch.gan.GANTrainer(config: Dict | GANTrainingConfiguration, epochs: int, discriminator: Module, generator: Module, strategy: Literal['ddp', 'deepspeed'] = 'ddp', test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Metric] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None, profiling_wait_epochs: int = 1, profiling_warmup_epochs: int = 2, ray_scaling_config: ScalingConfig | None = None, ray_tune_config: TuneConfig | None = None, ray_run_config: RunConfig | None = None, ray_search_space: Dict[str, Any] | None = None, ray_torch_config: TorchConfig | None = None, ray_data_config: DataConfig | None = None, from_checkpoint: str | Path | None = None, **kwargs)
Bases: TorchTrainer
Trainer class for GAN models using PyTorch.
- Parameters:
config (Dict | GANTrainingConfiguration) – training configuration containing hyperparameters.
epochs (int) – number of training epochs.
discriminator (nn.Module) – PyTorch discriminator model to train.
generator (nn.Module) – PyTorch generator model to train.
strategy (Literal['ddp', 'deepspeed'], optional) – distributed strategy. Defaults to "ddp".
test_every (int | None, optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.
random_seed (int | None, optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.
logger (Logger | None, optional) – logger for ML tracking. Defaults to None.
metrics (Dict[str, Metric] | None, optional) – map of torch metrics. Defaults to None.
checkpoints_location (str) – path to checkpoints directory. Defaults to "checkpoints".
checkpoint_every (int | None) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.
name (str | None, optional) – trainer custom name. Defaults to None.
profiling_wait_epochs (int) – how many epochs to wait before starting the profiler. Defaults to 1.
profiling_warmup_epochs (int) – length of the profiler warmup phase in terms of number of epochs. Defaults to 2.
ray_scaling_config (ScalingConfig, optional) – scaling config for Ray Trainer. Defaults to None.
ray_tune_config (TuneConfig, optional) – tune config for Ray Tuner. Defaults to None.
ray_run_config (RunConfig, optional) – run config for Ray Trainer. Defaults to None.
ray_search_space (Dict[str, Any], optional) – search space for Ray Tuner. Defaults to None.
ray_torch_config (TorchConfig, optional) – torch configuration for Ray's TorchTrainer. Defaults to None.
ray_data_config (DataConfig, optional) – dataset configuration for Ray. Defaults to None.
from_checkpoint (str | Path, optional) – path to checkpoint directory. Defaults to None.
- loss: Callable | None = None
Classification loss criterion used in the generator and discriminator losses.
- optimizer_generator: Optimizer | None = None
Optimizer for the generator.
- optimizer_discriminator: Optimizer | None = None
Optimizer for the discriminator.
- lr_scheduler_generator: LRScheduler | None = None
Learning rate scheduler for the optimizer of the generator.
- lr_scheduler_discriminator: LRScheduler | None = None
Learning rate scheduler for the optimizer of the discriminator.
- discriminator: Module | None = None
PyTorch discriminator to train.
- generator: Module | None = None
PyTorch generator to train.
- create_model_loss_optimizer() → None
Instantiates a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.
- save_checkpoint(name: str, best_validation_metric: Tensor | None = None, checkpoints_root: str | Path | None = None, force: bool = False) → str | None
Saves a training checkpoint.
- Parameters:
name (str) – name of the checkpoint directory.
best_validation_metric (torch.Tensor | None) – best validation loss throughout training so far (if available).
checkpoints_root (str | None) – path of the root checkpoints directory. If None, uses self.checkpoints_location as base.
force (bool) – force checkpointing now.
- Returns:
path to the checkpoint file, or None when the checkpoint is not created.
- train_epoch()
Perform a complete sweep over the training dataset, completing an epoch of training.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average training loss for the current epoch.
- Return type:
torch.Tensor
- train_step(real_images: Tensor, batch_idx: int) → Tuple[Tensor, Tensor, Tensor]
Training step for GAN.
- Parameters:
real_images (torch.Tensor) – real images.
batch_idx (int) – batch index.
- Returns:
loss of the discriminator, loss of the generator, and accuracy of the discriminator.
- Return type:
Tuple[torch.Tensor, torch.Tensor, torch.Tensor]
- validation_epoch(fid_features: int = 2048) → Tensor
Validation epoch for GAN.
- Parameters:
fid_features (int, optional) – number of features for the InceptionV3 model. Defaults to 2048.
- Returns:
FID score computed by the FID metric.
- Return type:
torch.Tensor
- validation_step(real_images: Tensor, batch_idx: int, fid: FrechetInceptionDistance) → Tuple[Tensor, Tensor]
Validation step for GAN.
- Parameters:
real_images (torch.Tensor) – real images.
batch_idx (int) – batch index.
fid (FrechetInceptionDistance) – FID metric.
- Returns:
accuracy of the generator and accuracy of the discriminator.
- Return type:
Tuple[torch.Tensor, torch.Tensor]
inference
- class itwinai.torch.inference.TorchModelLoader(model_uri: str, model_class: Module | None = None)
Bases: ModelLoader
Loads a torch model from a local path or from MLflow.
- Parameters:
model_uri (str) – Can be a path on the local filesystem or an MLflow "locator" in the form "mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH".
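A locator string can be split into its three parts as sketched below. This is a hypothetical helper (parse_mlflow_locator and MlflowLocator are not part of itwinai). It assumes "+" never appears inside the tracking URI, run ID, or artifact path, and TorchModelLoader's own parsing may differ.

```python
from typing import NamedTuple


class MlflowLocator(NamedTuple):
    """Hypothetical container for the parts of an mlflow model locator."""
    tracking_uri: str
    run_id: str
    artifact_path: str


def parse_mlflow_locator(model_uri: str) -> MlflowLocator:
    """Split 'mlflow+TRACKING_URI+RUN_ID+ARTIFACT_PATH' into its parts.

    Assumption: '+' does not occur inside any of the three fields.
    """
    prefix, sep, rest = model_uri.partition("+")
    if prefix != "mlflow" or not sep:
        raise ValueError(f"Not an mlflow locator: {model_uri!r}")
    parts = rest.split("+")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected TRACKING_URI+RUN_ID+ARTIFACT_PATH, got {rest!r}")
    return MlflowLocator(*parts)


loc = parse_mlflow_locator("mlflow+http://localhost:5000+0123abcd+model")
print(loc.tracking_uri, loc.run_id, loc.artifact_path)
```

A plain local path such as "checkpoints/model.pt" does not start with "mlflow+", so the sketch rejects it. A loader would then treat it as a filesystem path.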
- class itwinai.torch.inference.TorchPredictor(config: Dict | TrainingConfiguration, model: Module | ModelLoader, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', logger: Logger | None = None, checkpoints_location: str = 'checkpoints', name: str | None = None)
Bases: TorchTrainer, Predictor
Applies a pre-trained torch model to unseen data.
- inference_dataloader: DataLoader = None
PyTorch DataLoader for the inference dataset.
- model: Module = None
Pre-trained PyTorch model used to make predictions.
- torch_rng: Generator = None
PyTorch random number generator (PRNG).
- create_dataloaders(inference_dataset: Dataset) → DataLoader
Create inference dataloader.
- Parameters:
inference_dataset (Dataset) – inference dataset object.
- Returns:
Instance of DataLoader for the given inference dataset.
- Return type:
DataLoader
- predict() → Dict[str, Any]
Predicts or runs inference on a trained ML model.
- Returns:
maps each item ID to the corresponding predicted values.
- Return type:
Dict[str, Any]
- execute(inference_dataset: Dataset, model: Module | None = None) → Dict[str, Any]
Applies a torch model to a dataset for inference.
- Parameters:
inference_dataset (Dataset[str, Any]) – each item in this dataset is a pair (item_unique_id, item).
model (nn.Module, optional) – torch model. Overrides the existing model, if given. Defaults to None.
- Returns:
maps each item ID to the corresponding predicted value(s).
- Return type:
Dict[str, Any]
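The contract of execute can be shown without torch: a dataset of (item_unique_id, item) pairs goes in, and a dictionary of predictions keyed by ID comes out. predict_by_id and the lambda standing in for the model are hypothetical. The real predictor works through an inference DataLoader and a torch model.

```python
from typing import Any, Callable, Dict, Iterable, Tuple


def predict_by_id(
    dataset: Iterable[Tuple[str, Any]],
    model: Callable[[Any], Any],
) -> Dict[str, Any]:
    """Apply `model` to every item and key each prediction by the item's ID."""
    predictions: Dict[str, Any] = {}
    for item_id, item in dataset:
        predictions[item_id] = model(item)
    return predictions


# Toy dataset of (item_unique_id, item) pairs and a stand-in "model".
toy_dataset = [("img_0", 1.0), ("img_1", 2.5)]
print(predict_by_id(toy_dataset, lambda x: 2 * x))
```

Keying results by a unique ID, rather than by position, keeps predictions traceable to their inputs even if the data is shuffled or split across workers.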
- log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) → None
Log the given item, named identifier, of type kind, at time step step.
- Parameters:
item (Union[Any, List[Any]]) – element to be logged (e.g., metric).
identifier (Union[str, List[str]]) – unique identifier for the element to log (e.g., name of a metric).
kind (str) – Type of the item to be logged. Must be one of self.supported_types. Defaults to "metric".
step (int | None) – logging step. Defaults to None.
batch_idx (int | None) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.
- class itwinai.torch.inference.MulticlassTorchPredictor(config: Dict | TrainingConfiguration, model: Module | ModelLoader, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', logger: Logger | None = None, checkpoints_location: str = 'checkpoints', name: str | None = None)
Bases: TorchPredictor
Applies a pre-trained torch model to unseen data for multiclass classification.
- class itwinai.torch.inference.MultilabelTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, threshold: float = 0.5, name: str | None = None)
Bases: TorchPredictor
Applies a pre-trained torch model to unseen data for multilabel classification, applying a threshold on the output of the neural network.
- threshold: float = 0.5
Threshold to transform probabilities into class predictions. Defaults to 0.5.
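In multilabel classification, thresholding decides each label independently, so a sample can end up with zero, one, or several positive labels. A multiclass predictor typically picks a single class per sample instead (for example, via argmax). The sketch below is hypothetical (multilabel_predictions is not an itwinai function) and assumes a strict "greater than" comparison, which the predictor may not use.

```python
from typing import List


def multilabel_predictions(
    probabilities: List[List[float]], threshold: float = 0.5
) -> List[List[int]]:
    """Map each per-class probability to 1 if it exceeds threshold, else 0."""
    return [[int(p > threshold) for p in row] for row in probabilities]


probs = [[0.9, 0.2, 0.5], [0.1, 0.7, 0.6]]
print(multilabel_predictions(probs))  # [[1, 0, 0], [0, 1, 1]]
```

Under this assumption, a probability exactly equal to the threshold (0.5 above) counts as negative.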
- class itwinai.torch.inference.RegressionTorchPredictor(config: Dict | TrainingConfiguration, model: Module | ModelLoader, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', logger: Logger | None = None, checkpoints_location: str = 'checkpoints', name: str | None = None)
Bases: TorchPredictor
Applies a pre-trained torch model to unseen data for regression, leaving the output of the neural network untouched.
loggers
- class itwinai.torch.loggers.ItwinaiLogger(itwinai_logger: Logger, log_model: Literal['all'] | bool = False, skip_finalize: bool = False)
Bases: Logger
Adapter between the PyTorch Lightning logger and the itwinai logger.
This adapter forwards logging calls from PyTorch Lightning to the itwinai Logger instance, using the itwinai Logger's log method. It supports Lightning logging of metrics, hyperparameters, and checkpoints. Additionally, any function call can be forwarded to the itwinai logger instance through the experiment property of this adapter.
- property name: str | None
Return the experiment name.
- property version: int | str | None
Return the experiment version.
- property save_dir: str | None
Return the directory where the logs are stored.
- property experiment: Logger
Lightning Logger function. Initializes and returns the itwinai Logger context for experiment tracking.
- Returns:
The itwinai logger instance.
- Return type:
Logger
- finalize(status: str) → None
Lightning Logger function. Logs any remaining checkpoints and closes the logger context.
- Parameters:
status (str) – Describes the status of the training (e.g., "completed", "failed"). This function does not need the status, but the argument is part of the parent class's (LightningLogger) finalize signature and therefore must be propagated here.
- log_metrics(metrics: Dict[str, float], step: int | None = None) → None
Lightning Logger function. Logs the given metrics and is usually called by the Lightning Trainer.
- Parameters:
metrics (Dict[str, float]) – Dictionary of metrics to log.
step (Optional[int], optional) – Training step associated with the metrics. Defaults to None.
- log_hyperparams(params: Dict[str, Any] | Namespace) → None
Lightning Logger function. Logs hyperparameters for the experiment.
- Parameters:
params (Union[Dict[str, Any], Namespace]) – Hyperparameters dictionary or object.
- after_save_checkpoint(checkpoint_callback: ModelCheckpoint) → None
Lightning Logger function. Handles checkpoint saving to the logger after the ModelCheckpoint callback of the Lightning Trainer is called. The checkpoints are logged as artifacts.
- Parameters:
checkpoint_callback (ModelCheckpoint) – Callback instance to manage checkpointing.
models.mnist
- class itwinai.torch.models.mnist.MNISTModel(hidden_size: int = 64)
Bases: LightningModule
Simple PL model for MNIST. Adapted from https://lightning.ai/docs/pytorch/stable/notebooks/lightning_examples/mnist-hello-world.html
- forward(x)
Same as torch.nn.Module.forward().
- Parameters:
*args – Whatever you decide to pass into the forward method.
**kwargs – Keyword arguments are also possible.
- Returns:
Your model's output
- training_step(batch, batch_idx)
Here you compute and return the training loss and some additional metrics for e.g. the progress bar or logger.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Tensor - The loss tensor
dict - A dictionary which can include any keys, but must include the key 'loss' in the case of automatic optimization.
None - In automatic optimization, this will skip to the next batch (but is not supported for multi-GPU, TPU, or DeepSpeed). For manual optimization, this has no special meaning, as returning the loss is not required.
In this step you'd normally do the forward pass and calculate the loss for a batch. You can also do fancier things like multiple forward passes or something model specific.
Example:
    def training_step(self, batch, batch_idx):
        x, y, z = batch
        out = self.encoder(x)
        loss = self.loss(out, x)
        return loss
To use multiple optimizers, you can switch to "manual optimization" and control their stepping:
    def __init__(self):
        super().__init__()
        self.automatic_optimization = False

    # Multiple optimizers (e.g.: GANs)
    def training_step(self, batch, batch_idx):
        opt1, opt2 = self.optimizers()
        # do training_step with encoder
        ...
        opt1.step()
        # do training_step with decoder
        ...
        opt2.step()
Note
When accumulate_grad_batches > 1, the loss returned here will be automatically normalized by accumulate_grad_batches internally.
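Dividing by accumulate_grad_batches makes accumulation match training on one larger batch. Summing the gradients of loss / k over k micro-batches gives the gradient of the mean loss over all of them. The toy arithmetic below checks this on a one-parameter linear model with squared error. It is a standalone illustration, not Lightning code, and it assumes equal-sized micro-batches with a mean-reduced loss.

```python
import math

# Toy 1-D linear model y_hat = w * x with squared-error loss.
def grad(w: float, x: float, y: float) -> float:
    """Derivative with respect to w of (w * x - y) ** 2."""
    return 2 * x * (w * x - y)

w = 0.5
micro_batches = [(1.0, 2.0), (2.0, 1.0), (3.0, 3.0), (4.0, 5.0)]  # (x, y) pairs
k = len(micro_batches)  # plays the role of accumulate_grad_batches

# Accumulate gradients of loss / k, one micro-batch at a time.
accumulated = 0.0
for x, y in micro_batches:
    accumulated += grad(w, x, y) / k

# Gradient of the mean loss over the same samples, computed in one go.
full_batch = sum(grad(w, x, y) for x, y in micro_batches) / k

print(accumulated, full_batch)  # -9.0 -9.0
assert math.isclose(accumulated, full_batch)
```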
- validation_step(batch, batch_idx)
Operates on a single batch of data from the validation set. In this step you might generate examples or calculate anything of interest like accuracy.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Tensor - The loss tensor
dict - A dictionary. Can include any keys, but must include the key 'loss'.
None - Skip to the next batch.
    # if you have one val dataloader:
    def validation_step(self, batch, batch_idx): ...

    # if you have multiple val dataloaders:
    def validation_step(self, batch, batch_idx, dataloader_idx=0): ...
Examples:
    # CASE 1: A single validation dataset
    def validation_step(self, batch, batch_idx):
        x, y = batch
        # implement your own
        out = self(x)
        loss = self.loss(out, y)
        # log 6 example images
        # or generated text... or whatever
        sample_imgs = x[:6]
        grid = torchvision.utils.make_grid(sample_imgs)
        self.logger.experiment.add_image('example_images', grid, 0)
        # calculate acc
        labels_hat = torch.argmax(out, dim=1)
        val_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)
        # log the outputs!
        self.log_dict({'val_loss': loss, 'val_acc': val_acc})
If you pass in multiple val dataloaders, validation_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.
    # CASE 2: multiple validation dataloaders
    def validation_step(self, batch, batch_idx, dataloader_idx=0):
        # dataloader_idx tells you which dataset this is.
        x, y = batch
        # implement your own
        out = self(x)
        if dataloader_idx == 0:
            loss = self.loss0(out, y)
        else:
            loss = self.loss1(out, y)
        # calculate acc
        labels_hat = torch.argmax(out, dim=1)
        acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)
        # log the outputs separately for each dataloader
        self.log_dict({f"val_loss_{dataloader_idx}": loss, f"val_acc_{dataloader_idx}": acc})
Note
If you don't need to validate, you don't need to implement this method.
Note
When validation_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of validation, the model goes back to training mode and gradients are enabled.
- test_step(batch, batch_idx)
Operates on a single batch of data from the test set. In this step you'd normally generate examples or calculate anything of interest such as accuracy.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Tensor - The loss tensor
dict - A dictionary. Can include any keys, but must include the key 'loss'.
None - Skip to the next batch.
    # if you have one test dataloader:
    def test_step(self, batch, batch_idx): ...

    # if you have multiple test dataloaders:
    def test_step(self, batch, batch_idx, dataloader_idx=0): ...
Examples:
    # CASE 1: A single test dataset
    def test_step(self, batch, batch_idx):
        x, y = batch
        # implement your own
        out = self(x)
        loss = self.loss(out, y)
        # log 6 example images
        # or generated text... or whatever
        sample_imgs = x[:6]
        grid = torchvision.utils.make_grid(sample_imgs)
        self.logger.experiment.add_image('example_images', grid, 0)
        # calculate acc
        labels_hat = torch.argmax(out, dim=1)
        test_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)
        # log the outputs!
        self.log_dict({'test_loss': loss, 'test_acc': test_acc})
If you pass in multiple test dataloaders, test_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.
    # CASE 2: multiple test dataloaders
    def test_step(self, batch, batch_idx, dataloader_idx=0):
        # dataloader_idx tells you which dataset this is.
        x, y = batch
        # implement your own
        out = self(x)
        if dataloader_idx == 0:
            loss = self.loss0(out, y)
        else:
            loss = self.loss1(out, y)
        # calculate acc
        labels_hat = torch.argmax(out, dim=1)
        acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)
        # log the outputs separately for each dataloader
        self.log_dict({f"test_loss_{dataloader_idx}": loss, f"test_acc_{dataloader_idx}": acc})
Note
If you don't need to test, you don't need to implement this method.
Note
When test_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of the test epoch, the model goes back to training mode and gradients are enabled.
- predict_step(batch, batch_idx, dataloader_idx=0)
Step function called during predict(). By default, it calls forward(). Override to add any processing logic.
The predict_step() is used to scale inference on multi-devices.
To prevent an OOM error, it is possible to use the BasePredictionWriter callback to write the predictions to disk or database after each batch or on epoch end.
The BasePredictionWriter should be used while using a spawn-based accelerator. This happens for Trainer(strategy="ddp_spawn") or training on 8 TPU cores with Trainer(accelerator="tpu", devices=8), as predictions won't be returned.
- Parameters:
batch – The output of your data iterable, normally a DataLoader.
batch_idx – The index of this batch.
dataloader_idx – The index of the dataloader that produced this batch. (only if multiple dataloaders used)
- Returns:
Predicted output (optional).
Example
    class MyModel(LightningModule):
        def predict_step(self, batch, batch_idx, dataloader_idx=0):
            return self(batch)

    dm = ...
    model = MyModel()
    trainer = Trainer(accelerator="gpu", devices=2)
    predictions = trainer.predict(model, dm)
monitoring.monitoring
- itwinai.torch.monitoring.monitoring.profile_gpu_utilization(stop_flag: ValueProxy, local_rank: int, global_rank: int, logger: Logger, run_id: str | None = None, parent_run_id: str | None = None, probing_interval: int = 2, warmup_time: int = 5) → None
Logs the GPU utilization across all available GPUs on a single node. It is meant to be called by a multiprocessing Process and expects variables to be shared using a multiprocessing.Manager object. Logs utilization into log_dict until stop_flag.value is set to True.
- Parameters:
stop_flag (ValueProxy) – A shared flag to stop the profiling process.
local_rank (int) – Local rank of the GPU being profiled.
global_rank (int) – Global rank of the process.
logger (Logger) – Logger instance to log GPU utilization data.
run_id (str | None) – ID of the MLflow run for logging.
parent_run_id (str | None) – ID of the parent MLflow run for logging.
probing_interval (int) – Interval in seconds between each probing of GPU utilization.
warmup_time (int) – Time in seconds to wait before starting the profiling.
monitoring.backend
- class itwinai.torch.monitoring.backend.GPUBackend
Bases: ABC
- abstract property man_lib: ModuleType | None
The library used for GPU management.
- abstract property man_type: Literal['nvidia', 'amd'] | None
The type of GPU management library used.
- abstract get_handle_by_uuid(gpu_uuid: str) → object
Get the device handle for a specific GPU UUID.
- abstract get_handle_by_id(gpu_id: int) → object
Get the device handle for a specific GPU index (ID).
- abstract get_gpu_utilization(handle) → float
Get the GPU utilization (%) for a given handle.
- class itwinai.torch.monitoring.backend.NvidiaBackend
Bases: GPUBackend
- property man_lib: ModuleType | None
The library used for GPU management.
- property man_type: Literal['nvidia', 'amd'] | None
The type of GPU management library used.
- class itwinai.torch.monitoring.backend.AMDBackend
Bases: GPUBackend
- property man_lib: ModuleType | None
The library used for GPU management.
- property man_type: Literal['nvidia', 'amd'] | None
The type of GPU management library used.
- itwinai.torch.monitoring.backend.detect_gpu_backend() → GPUBackend
Detects the available GPU backend and returns an instance of the corresponding class.
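The abstract interface above is small enough to implement with a fake backend, which can help when exercising utilization-logging code on machines without GPUs. In the sketch below, GPUBackendLike is a local stand-in that mirrors the documented members (it does not import itwinai). FakeBackend and its "GPU-<index>" UUID format are hypothetical.

```python
from abc import ABC, abstractmethod
from types import ModuleType
from typing import Dict, Literal, Optional


class GPUBackendLike(ABC):
    """Local stand-in mirroring the abstract GPUBackend interface documented above."""

    @property
    @abstractmethod
    def man_lib(self) -> Optional[ModuleType]: ...

    @property
    @abstractmethod
    def man_type(self) -> Optional[Literal["nvidia", "amd"]]: ...

    @abstractmethod
    def get_handle_by_uuid(self, gpu_uuid: str) -> object: ...

    @abstractmethod
    def get_handle_by_id(self, gpu_id: int) -> object: ...

    @abstractmethod
    def get_gpu_utilization(self, handle) -> float: ...


class FakeBackend(GPUBackendLike):
    """Hypothetical backend that reports canned utilization values."""

    def __init__(self, utilization: Dict[int, float]):
        self._utilization = utilization

    @property
    def man_lib(self) -> Optional[ModuleType]:
        return None  # no real management library behind this fake

    @property
    def man_type(self) -> Optional[Literal["nvidia", "amd"]]:
        return None

    def get_handle_by_uuid(self, gpu_uuid: str) -> object:
        # Assumes UUIDs of the form "GPU-<index>", purely for this sketch.
        return self.get_handle_by_id(int(gpu_uuid.split("-")[-1]))

    def get_handle_by_id(self, gpu_id: int) -> object:
        if gpu_id not in self._utilization:
            raise IndexError(f"No GPU with index {gpu_id}")
        return gpu_id  # the index itself serves as the handle

    def get_gpu_utilization(self, handle) -> float:
        return self._utilization[handle]


backend = FakeBackend({0: 87.5, 1: 12.0})
print(backend.get_gpu_utilization(backend.get_handle_by_id(0)))  # 87.5
```

Because every member is declared with abstractmethod, a subclass that forgets one of them fails at instantiation time rather than at the first profiling call.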
mlflow
- itwinai.torch.mlflow.init_lightning_mlflow(pl_config: Dict, default_experiment_name: str = 'Default', tmp_dir: str = '.tmp', **autolog_kwargs) → None
Initialize MLflow for PyTorch Lightning, also setting up auto-logging (mlflow.pytorch.autolog(...)). Creates a new MLflow run and attaches it to the MLflow auto-logger.
- Parameters:
pl_config (Dict) – PyTorch Lightning configuration loaded in memory.
default_experiment_name (str, optional) – used as experiment name if it is not given in the Lightning configuration. Defaults to "Default".
tmp_dir (str) – where to temporarily store some artifacts.
autolog_kwargs (kwargs) – args for mlflow.pytorch.autolog(...).
- itwinai.torch.mlflow.get_epoch_time_runs_by_parent(mlflow_client: MlflowClient, experiment_id: str, run: Run) → List[Run]
Get all epoch time runs associated with a given run. This function assumes that the data is in the main worker run of each training run, which is either the main worker run in each trial run of a given tuner run (if Ray was used) or the main worker run of the given training run (if Ray was not used).
- Parameters:
mlflow_client (mlflow.tracking.MlflowClient) – MLflow client to use.
experiment_id (str) – The ID of the experiment to search in.
run (mlflow.entities.Run) – The run from which to collect epoch runs.
- Returns:
A list of runs that contain epoch time data associated with the given run.
- Return type:
List[Run]
- itwinai.torch.mlflow.get_profiling_avg_by_parent(mlflow_client: MlflowClient, experiment_id: str, run: Run) → List[DataFrame]
Get all worker profiling averages associated with a given run. This function assumes that the worker profiling averages are either nested under the trial runs of a tuner run (if Ray was used) or nested under the training run (if Ray was not used).
- Parameters:
mlflow_client (mlflow.tracking.MlflowClient) – MLflow client to use.
experiment_id (str) – The ID of the experiment to search in.
run (mlflow.entities.Run) – The run from which to collect worker profiling averages.
- Returns:
A list of DataFrames containing the worker profiling averages associated with the given run. Each DataFrame corresponds to a worker run.
- Return type:
List[pd.DataFrame]
- itwinai.torch.mlflow.get_gpu_runs_by_parent(mlflow_client: MlflowClient, experiment_id: str, run: Run) → List[Run]
Get all GPU worker runs associated with a given run. This function assumes that the GPU worker runs are either nested under the trial runs of a tuner run (if Ray was used) or nested under the training run (if Ray was not used).
- Parameters:
mlflow_client (mlflow.tracking.MlflowClient) – MLflow client to use.
experiment_id (str) – The ID of the experiment to search in.
run (mlflow.entities.Run) – The run from which to collect GPU worker runs.
- Returns:
A list of runs that are GPU workers associated with the given run.
- Return type:
List[Run]
- itwinai.torch.mlflow.get_metric_names(run: Run) → List[str]
Get the names of all metrics logged in a run.
- itwinai.torch.mlflow.get_params(run: Run) → Dict[str, str]
Get the parameters logged in a run.
- itwinai.torch.mlflow.get_run_metrics_as_df(mlflow_client: MlflowClient, run: Run, metric_names: List[str] | None = None)
Collect metrics logged in a run and return them as a tidy DataFrame.
- Parameters:
mlflow_client (mlflow.MlflowClient) – MLflow client to use.
run (mlflow.entities.Run) – The run from which to collect metrics.
metric_names (List[str] | None) – If provided, only these metrics will be collected. If None, all metrics will be collected.
- Returns:
A DataFrame containing the metrics, with columns:
metric_name: the name of the metric
sample_idx: the step index of the metric
timestamp: the timestamp of the metric
value: the value of the metric
all parameters logged in the run
- Return type:
pd.DataFrame
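A "tidy" layout means one row per logged value, with the run's parameters repeated on every row so that metrics from different runs can be concatenated and grouped. The sketch below builds such rows from plain (step, timestamp, value) tuples. In practice these could come from MlflowClient.get_metric_history, whose Metric objects expose step, timestamp and value. to_tidy_rows is a hypothetical helper, not the itwinai implementation.

```python
from typing import Any, Dict, List, Tuple

# (step, timestamp_ms, value) triples per metric name; a stand-in for metric histories.
MetricHistory = Dict[str, List[Tuple[int, int, float]]]


def to_tidy_rows(history: MetricHistory, params: Dict[str, str]) -> List[Dict[str, Any]]:
    """One row per logged value, with the run's parameters repeated on every row."""
    rows: List[Dict[str, Any]] = []
    for metric_name, points in history.items():
        for step, timestamp, value in points:
            rows.append({
                "metric_name": metric_name,
                "sample_idx": step,
                "timestamp": timestamp,
                "value": value,
                **params,
            })
    return rows


rows = to_tidy_rows(
    {"gpu_util": [(0, 1000, 80.0), (1, 3000, 92.5)]},
    {"strategy": "ddp"},
)
print(rows[1])
```

Passing the list to pd.DataFrame(rows) yields a DataFrame with the column layout listed above.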
- itwinai.torch.mlflow.get_runs_by_name(mlflow_client: MlflowClient, experiment_id: str, run_names: List[str] | None = None) → List[Run]
Get all runs in an experiment by their names.
- Parameters:
mlflow_client (mlflow.tracking.MlflowClient) – MLflow client to use.
experiment_id (str) – The ID of the experiment to search in.
run_names (List[str] | None) – The names of the runs to retrieve. If None, all runs in the experiment will be retrieved.
- Returns:
A list of runs that match the given names.
- Return type:
List[Run]
profiling.profiler
profiling.py_spy_aggregation
- class itwinai.torch.profiling.py_spy_aggregation.StackFrame(*, name: str, path: str, line: str, num_samples: int, proportion: float | None = None, library_function_name: str = 'Not Found', library_function_path: str = 'Not Found', library_function_line: str = 'Not Found')
Bases: BaseModel
Represents a single stack frame in a call stack.
- name: str
- path: str
- line: str
- num_samples: int
- proportion: float | None
- library_function_name: str
- library_function_path: str
- library_function_line: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- itwinai.torch.profiling.py_spy_aggregation.add_library_information(stack_traces: List[List[StackFrame]], library_name: str = 'itwinai') → None
Iterates through each stack trace in the given list, finds the lowest call with the given library name (if any), and adds its information to each stack frame in the trace.
This function changes the given list in place.
- itwinai.torch.profiling.py_spy_aggregation.get_aggregated_paths(stack_frames: List[StackFrame]) → List[StackFrame]
Aggregate stack frames with identical keys by summing their sample counts.
Two stack frames are considered identical if their get_aggregation_key() values match. Returns a new list of stack frames, each representing a unique key with the corresponding total number of samples.
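The aggregation step is a group-by-key sum over sample counts. In the sketch below, Frame is a trimmed-down stand-in for StackFrame, and it assumes the key is (name, path, line). The real get_aggregation_key() may use different fields. Frames are copied so that the input list is left untouched, matching the "returns a new list" behavior described above.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Tuple


@dataclass
class Frame:
    """Trimmed-down stand-in for StackFrame (only the fields used here)."""
    name: str
    path: str
    line: str
    num_samples: int

    def aggregation_key(self) -> Tuple[str, str, str]:
        # Assumption: frames are "identical" when name, path and line all match.
        return (self.name, self.path, self.line)


def aggregate(frames: List[Frame]) -> List[Frame]:
    """Sum num_samples over frames sharing a key, keeping first-seen order."""
    totals: Dict[Tuple[str, str, str], Frame] = {}
    for frame in frames:
        key = frame.aggregation_key()
        if key in totals:
            totals[key].num_samples += frame.num_samples
        else:
            totals[key] = replace(frame)  # copy, so the input frames are not mutated
    return list(totals.values())


frames = [
    Frame("train", "trainer.py", "42", 5),
    Frame("load", "data.py", "8", 2),
    Frame("train", "trainer.py", "42", 3),
]
print([(f.name, f.num_samples) for f in aggregate(frames)])
```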
- itwinai.torch.profiling.py_spy_aggregation.parse_trace_line_to_stack_frame(trace_line: str, num_samples: int) → StackFrame | None
Parses a single trace line, which contains a function name, file name and line number, into a StackFrame object.
- Raises:
ValueError – If the given trace line does not conform to the expected structure of "function_name (path/to/function – line_number)".
- itwinai.torch.profiling.py_spy_aggregation.convert_stack_trace_to_list(line: str) → List[StackFrame]
Converts a single line of the raw profiling output, which contains a stack trace and a number of samples, into a list of StackFrame objects with the data for each line of the stack trace.
Note
The number of samples is the last number in the string, separated from the rest by a space. There can be multiple spaces, however, so the last element has to be extracted explicitly.
- Raises:
ValueError – If any of the samples are not numeric.
ValueError – If any of the lines in the stack trace are not formatted as expected.
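Splitting off the sample count can be done robustly with str.rsplit and no separator. This splits on the last run of whitespace, so repeated spaces before the count do no harm. The sketch assumes the collapsed-stack layout of py-spy's raw output, with frames joined by ";" and the count last. split_raw_line is a hypothetical helper that stops short of building StackFrame objects.

```python
from typing import List, Tuple


def split_raw_line(line: str) -> Tuple[List[str], int]:
    """Split one collapsed-stack line into its frames and its sample count.

    rsplit(maxsplit=1) splits on the *last* run of whitespace, so any number
    of spaces between the stack and the count is tolerated.
    """
    parts = line.rsplit(maxsplit=1)
    if len(parts) != 2 or not parts[1].isdigit():
        raise ValueError(f"Sample count is missing or not numeric in {line!r}")
    stack, count = parts
    return stack.split(";"), int(count)


print(split_raw_line("main (run.py:5);train (trainer.py:42)  17"))
```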
- itwinai.torch.profiling.py_spy_aggregation.parse_py_spy_lines(file_lines: List[str]) → List[List[StackFrame]]
Parse the lines of a py-spy profiling output and turn them into a list of call stacks, each of which is a list of StackFrame objects.
- Raises:
ValueError – If converting the stack trace to a list fails.
- itwinai.torch.profiling.py_spy_aggregation.parse_num_rows(num_rows: str) → int | None
Parses the number of rows from a string, making sure it is either "all" or greater than zero. If it is "all", it returns None.
- Raises:
ValueError – If num_rows is neither numeric nor "all".
ValueError – If num_rows is parsed to a number smaller than one.
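The documented contract fits in a few lines: "all" maps to None, any other value must be a positive integer, and both failure modes raise ValueError. parse_num_rows_sketch is a hypothetical re-implementation for illustration, not the library's code. Note that str.isdigit rejects negative numbers outright, so "-3" fails the numeric check rather than the "smaller than one" check.

```python
from typing import Optional


def parse_num_rows_sketch(num_rows: str) -> Optional[int]:
    """Return None for "all", otherwise a strictly positive integer."""
    if num_rows == "all":
        return None
    if not num_rows.isdigit():
        raise ValueError(f"num_rows must be 'all' or a positive integer, got {num_rows!r}")
    value = int(num_rows)
    if value < 1:
        raise ValueError(f"num_rows must be at least 1, got {value}")
    return value


print(parse_num_rows_sketch("all"), parse_num_rows_sketch("25"))  # None 25
```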
- itwinai.torch.profiling.py_spy_aggregation.read_stack_traces(path: Path) → List[List[StackFrame]]
Reads stack traces from a path. The path can point either to a file or to a directory. If it points to a directory, all files in that directory are read and combined.
- Raises:
ValueError – If parsing the lines from one of the files fails.
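Handling "a file or a directory" usually comes down to one branch on Path.is_dir(). The sketch below only collects raw, non-empty lines, such as one profiling output per rank. read_profile_lines is hypothetical, and it assumes sorted file order and that every regular file in the directory is a profile. The real function then parses the collected lines into StackFrame lists.

```python
import tempfile
from pathlib import Path
from typing import List


def read_profile_lines(path: Path) -> List[str]:
    """Return the non-empty lines of a file, or of every file in a directory."""
    if path.is_dir():
        # Sort for a deterministic order; only regular files are read.
        files = sorted(p for p in path.iterdir() if p.is_file())
    else:
        files = [path]
    lines: List[str] = []
    for file in files:
        lines.extend(line for line in file.read_text().splitlines() if line.strip())
    return lines


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "rank0.txt").write_text("main (run.py:5);train (trainer.py:42) 17\n")
    (root / "rank1.txt").write_text("main (run.py:5);load (data.py:8) 3\n\n")
    combined = read_profile_lines(root)
    single = read_profile_lines(root / "rank1.txt")

print(len(combined), len(single))  # 2 1
```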
reproducibility
This module provides tools to support reproducible execution of torch scripts.
- itwinai.torch.reproducibility.set_seed(rnd_seed: int | None, deterministic_cudnn: bool = True) → Generator
Set the torch random seed and return a PRNG object.
- Parameters:
rnd_seed (Optional[int]) – random seed. If None, the seed is not set.
deterministic_cudnn (bool) – if True, sets torch.backends.cudnn.benchmark = False, which may affect performance.
- Returns:
PRNG object.
- Return type:
torch.Generator
trainer
Provides training logic for PyTorch models via Trainer classes.
- class itwinai.torch.trainer.TorchTrainer(config: Dict | TrainingConfiguration, epochs: int, model: Module | None = None, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Metric] | None = None, checkpoints_location: str | Path = 'checkpoints', checkpoint_every: int | None = None, disable_tqdm: bool = False, name: str | None = None, profiling_wait_epochs: int = 0, profiling_warmup_epochs: int = 0, measure_gpu_data: bool = False, enable_torch_profiling: bool = False, store_torch_profiling_traces: bool = False, measure_epoch_time: bool = False, ray_scaling_config: ScalingConfig | None = None, ray_tune_config: TuneConfig | None = None, ray_run_config: RunConfig | None = None, ray_search_space: Dict[str, Any] | None = None, ray_torch_config: TorchConfig | None = None, ray_data_config: DataConfig | None = None, from_checkpoint: Path | str | None = None, initial_best_validation_metric: str = 'inf', run_name: str | None = None, time_ray: bool = False)
Trainer class for torch training algorithms.
- Parameters:
config (Dict | TrainingConfiguration) – training configuration containing hyperparameters.
epochs (int) – number of training epochs.
model (nn.Module | None, optional) – PyTorch model to train or a string identifier. Defaults to None.
strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) – distributed strategy. Defaults to "ddp".
test_every (int | None, optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.
random_seed (int | None, optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.
logger (Logger | None, optional) – logger for ML tracking. Defaults to None.
metrics (Dict[str, Callable] | None, optional) – map of torchmetrics metrics. Defaults to None.
checkpoints_location (str) – path to checkpoints directory. Defaults to "checkpoints".
checkpoint_every (int | None) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.
disable_tqdm (bool) – whether to disable tqdm progress bar(s).
name (str | None, optional) – trainer custom name. Defaults to None.
profiling_wait_epochs (int) – how many epochs to wait before starting the profiler.
profiling_warmup_epochs (int) – length of the profiler warmup phase in terms of number of epochs.
measure_gpu_data (bool) – enable the collection of data on average GPU utilization and total energy consumption throughout training. Defaults to False.
enable_torch_profiling (bool) – enable the profiling of computation. It uses the torch profiler and may slow down training. Defaults to False.
measure_epoch_time (bool) – enable the measurement of epoch duration (in seconds). Defaults to False.
ray_scaling_config (ScalingConfig, optional) – scaling config for Ray Trainer. Defaults to None.
ray_tune_config (TuneConfig, optional) – tune config for Ray Tuner. Defaults to None.
ray_run_config (ray.tune.RunConfig, optional) – run config for Ray Tuner. Distributed training with Ray but without HPO will still be wrapped into a Ray Tuner, to keep everything homogeneous. Defaults to None.
ray_search_space (Dict[str, Any], optional) – search space for Ray Tuner. Defaults to None.
ray_torch_config (TorchConfig, optional) – torch configuration for Ray's TorchTrainer. Defaults to None.
ray_data_config (DataConfig, optional) – dataset configuration for Ray. Defaults to None.
from_checkpoint (str | Path, optional) – path to checkpoint directory. Defaults to None.
initial_best_validation_metric (str) – initial value for the best validation metric. Usually the validation metric is a loss to be minimized, and this value exceeds the highest possible loss value, so that it is overwritten when the first validation loss is computed. Example values are "inf" and "-inf", depending on whether the best validation metric should be minimized or maximized. Defaults to "inf".
run_name (str, optional) – name used to identify a specific run when collecting metrics on the trainer (e.g. GPU utilization). Defaults to None.
time_ray (bool) – whether to time and log the execution of Ray functions. Defaults to False.
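The role of initial_best_validation_metric is easiest to see in a best-so-far loop. float("inf") and float("-inf") parse the documented string values, so the first real validation value always counts as an improvement. improving_epochs and its minimize flag are hypothetical, and TorchTrainer may decide the comparison direction differently.

```python
from typing import Iterable, List


def improving_epochs(
    val_metrics: Iterable[float],
    initial_best_validation_metric: str = "inf",
    minimize: bool = True,
) -> List[int]:
    """Return the epochs whose validation metric beat the best value seen so far."""
    best = float(initial_best_validation_metric)  # "inf" -> +infinity, "-inf" -> -infinity
    improved: List[int] = []
    for epoch, value in enumerate(val_metrics):
        is_better = value < best if minimize else value > best
        if is_better:
            best = value
            improved.append(epoch)
    return improved


print(improving_epochs([0.9, 0.7, 0.8, 0.5]))  # [0, 1, 3]
print(improving_epochs([0.6, 0.8, 0.7], "-inf", minimize=False))  # [0, 1]
```

Epochs returned here are the natural moments to save a "best" checkpoint, passing the new best value as best_validation_metric.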
- train_dataloader: DataLoader | None = Noneο
PyTorch
DataLoaderfor training dataset.
- validation_dataloader: DataLoader | None = Noneο
PyTorch
DataLoaderfor validation dataset.
- test_dataloader: DataLoader | None = Noneο
PyTorch
DataLoaderfor test dataset.
- loss: Callable | None = Noneο
Loss criterion.
- optimizer: Optimizer | None = Noneο
Optimizer.
- lr_scheduler: LRScheduler | None = Noneο
Learning rate scheduler.
- torch_rng: Generator | None = Noneο
PyTorch random number generator (PRNG).
- train_glob_step: int = 0ο
Total number training batches used so far, across all epochs.
- validation_glob_step: int = 0ο
Total number validation batches used so far, across all epochs.
- test_glob_step: int = 0ο
Total number test batches used so far, across all epochs.
- mlflow_tune_run_id: str | None = None
- mlflow_train_run_id: str | None = None
- mlflow_worker_run_id: str | None = None
- model: Module | None = None
PyTorch model to train.
- metrics: Dict[str, Callable]
Dictionary of torchmetrics metrics, indexed by user-defined names.
- profiler: Any | None
PyTorch Profiler for computation ratio profiling.
- measure_gpu_data: bool = False
Toggle for GPU utilization monitoring.
- enable_torch_profiling: bool = False
Toggle for computation fraction profiling.
- store_torch_profiling_traces: bool = False
Store PyTorch profiling traces.
- measure_epoch_time: bool = False
Toggle for epoch time tracking.
- time_ray: bool = False
Toggle for Ray time logging.
- run_name: str
Run ID.
- property strategy: TorchDistributedStrategy
Strategy currently in use.
- property device: str
Current device from distributed strategy.
- get_default_distributed_kwargs() → Dict
Gives the default kwargs for the trainer's strategy's distributed() method.
- create_model_loss_optimizer() → None
Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.
- Raises:
ValueError – If self.model is None.
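A user-defined override might look roughly like the sketch below. It reads the documented ValueError as meaning the model must already exist when the method runs; that reading is an assumption. SketchTrainer, LOSSES and OPTIMIZERS are hypothetical, and a SimpleNamespace with TrainingConfiguration field names stands in for the real configuration object.

```python
from types import SimpleNamespace

import torch
from torch import nn

# Hypothetical lookup tables from configuration strings to PyTorch classes.
# Only optimizers that accept `betas` are listed, since the sketch passes them.
LOSSES = {"cross_entropy": nn.CrossEntropyLoss, "mse": nn.MSELoss, "l1": nn.L1Loss}
OPTIMIZERS = {"adam": torch.optim.Adam, "adamw": torch.optim.AdamW}


class SketchTrainer:
    """Minimal stand-in for a Trainer subclass (not the itwinai class)."""

    def __init__(self, config, model=None):
        self.config = config
        self.model = model
        self.loss = None
        self.optimizer = None
        self.lr_scheduler = None

    def create_model_loss_optimizer(self) -> None:
        if self.model is None:
            raise ValueError("self.model is None: set a model before calling this method")
        cfg = self.config
        self.loss = LOSSES[cfg.loss]()
        self.optimizer = OPTIMIZERS[cfg.optimizer](
            self.model.parameters(),
            lr=cfg.optim_lr,
            betas=cfg.optim_betas,
            weight_decay=cfg.optim_weight_decay,
        )
        if cfg.lr_scheduler == "step":
            # Multiply the LR by gamma every step_size scheduler steps (usually epochs).
            self.lr_scheduler = torch.optim.lr_scheduler.StepLR(
                self.optimizer,
                step_size=cfg.lr_scheduler_step_size,
                gamma=cfg.lr_scheduler_gamma,
            )


# Field names follow TrainingConfiguration; SimpleNamespace stands in for it.
cfg = SimpleNamespace(
    loss="cross_entropy", optimizer="adam", optim_lr=1e-3,
    optim_betas=(0.9, 0.999), optim_weight_decay=0.0,
    lr_scheduler="step", lr_scheduler_step_size=10, lr_scheduler_gamma=0.95,
)
trainer = SketchTrainer(cfg, model=nn.Linear(10, 3))
trainer.create_model_loss_optimizer()
```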
- save_checkpoint(name: str, best_validation_metric: Tensor | None = None, checkpoints_root: Path | str | None = None, force: bool = False) → str | None
Save a training checkpoint.
- Parameters:
name (str) – name of the checkpoint directory.
best_validation_metric (torch.Tensor | None) – best validation metric throughout training so far (if available). Usually this is the validation loss.
checkpoints_root (Path | str | None) – path of the root checkpoints directory. If None, uses self.checkpoints_location as base.
force (bool) – force checkpointing now.
- Returns:
path to the checkpoint file, or None when the checkpoint is not created.
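The documented fallback for checkpoints_root can be sketched with pathlib as follows. resolve_checkpoint_dir is a hypothetical helper that only computes the target directory; it does not reproduce how itwinai decides whether to write a checkpoint. Note that save_checkpoint itself is documented to return a str, so a caller would convert with str(path) if mirroring that.

```python
from pathlib import Path
from typing import Union


def resolve_checkpoint_dir(
    name: str,
    checkpoints_root: Union[Path, str, None],
    checkpoints_location: Union[Path, str],
) -> Path:
    """Hypothetical helper: where a checkpoint called `name` would be written."""
    # Documented fallback: use self.checkpoints_location when no root is given.
    root = Path(checkpoints_location) if checkpoints_root is None else Path(checkpoints_root)
    return root / name


print(resolve_checkpoint_dir("epoch_3", None, "checkpoints"))
print(resolve_checkpoint_dir("best_model", "/scratch/ckpts", "checkpoints"))
```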
- create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) → None
Create train, validation and test dataloaders using the configuration provided in the Trainer constructor. Generally a user-defined method.
- Parameters:
train_dataset (Dataset) – training dataset object.
validation_dataset (Dataset | None) – validation dataset object. Defaults to None.
test_dataset (Dataset | None) – test dataset object. Defaults to None.
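A non-distributed sketch of what a user-defined create_dataloaders might build from the TrainingConfiguration fields (batch_size, shuffle_*, num_workers_dataloader, pin_gpu_memory). make_dataloaders is hypothetical; a distributed trainer would additionally need a sampler such as torch.utils.data.distributed.DistributedSampler so that each worker sees a different shard, which this sketch deliberately omits.

```python
from types import SimpleNamespace
from typing import Optional, Tuple

import torch
from torch.utils.data import DataLoader, Dataset, TensorDataset


def make_dataloaders(
    config,
    train_dataset: Dataset,
    validation_dataset: Optional[Dataset] = None,
    test_dataset: Optional[Dataset] = None,
) -> Tuple[DataLoader, Optional[DataLoader], Optional[DataLoader]]:
    """Hypothetical, non-distributed sketch of create_dataloaders."""

    def build(dataset: Optional[Dataset], shuffle: bool) -> Optional[DataLoader]:
        if dataset is None:
            return None  # validation and test datasets are optional
        return DataLoader(
            dataset,
            batch_size=config.batch_size,
            shuffle=shuffle,
            num_workers=config.num_workers_dataloader,
            pin_memory=config.pin_gpu_memory,
        )

    return (
        build(train_dataset, config.shuffle_train),
        build(validation_dataset, config.shuffle_validation),
        build(test_dataset, config.shuffle_test),
    )


# num_workers_dataloader=0 keeps the example single-process.
cfg = SimpleNamespace(
    batch_size=32, shuffle_train=True, shuffle_validation=False, shuffle_test=False,
    num_workers_dataloader=0, pin_gpu_memory=False,
)
train_ds = TensorDataset(torch.randn(100, 10), torch.randint(0, 3, (100,)))
val_ds = TensorDataset(torch.randn(40, 10), torch.randint(0, 3, (40,)))
train_dl, val_dl, test_dl = make_dataloaders(cfg, train_ds, val_ds)
print(len(train_dl), len(val_dl), test_dl)  # 4 2 None
```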
- execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) → Tuple[Dataset, Dataset, Dataset, Any]
Prepares distributed environment and data structures for the actual training.
- Parameters:
train_dataset (Dataset) – training dataset.
validation_dataset (Dataset | None, optional) – validation dataset. Defaults to None.
test_dataset (Dataset | None, optional) – test dataset. Defaults to None.
- Returns:
training dataset, validation dataset, test dataset, trained model.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) → None
Log item under the name identifier, as an element of type kind, at time step step.
- Parameters:
item (Any | List[Any]) – element to be logged (e.g., metric).
identifier (str | List[str]) – unique identifier for the element to log (e.g., name of a metric).
kind (str, optional) – type of the item to be logged. Must be one of the types in self.supported_types. Defaults to 'metric'.
step (int | None, optional) – logging step. Defaults to None.
batch_idx (int | None, optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.
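Because item and identifier may each be a single value or a list, a logger has to normalize them somehow. The sketch below assumes list arguments are paired element by element and uses a hypothetical SUPPORTED_TYPES set in place of the real self.supported_types; normalize_log_call is not an itwinai function.

```python
from typing import Any, Dict, List, Optional, Union

# Hypothetical subset; the real list lives in the logger's supported_types.
SUPPORTED_TYPES = {"metric", "artifact"}


def normalize_log_call(
    item: Union[Any, List[Any]],
    identifier: Union[str, List[str]],
    kind: str = "metric",
    step: Optional[int] = None,
    batch_idx: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """Turn one log() call into one record per (item, identifier) pair."""
    if kind not in SUPPORTED_TYPES:
        raise ValueError(f"Unsupported kind {kind!r}; expected one of {sorted(SUPPORTED_TYPES)}")
    # Assumption: list arguments are paired element by element.
    items = item if isinstance(item, list) else [item]
    identifiers = identifier if isinstance(identifier, list) else [identifier]
    if len(items) != len(identifiers):
        raise ValueError("item and identifier lists must have the same length")
    return [
        {"identifier": name, "value": value, "kind": kind, "step": step, "batch_idx": batch_idx}
        for value, name in zip(items, identifiers)
    ]


print(normalize_log_call(0.42, "train_loss", step=100))
print(normalize_log_call([0.42, 0.91], ["train_loss", "train_acc"], step=100, batch_idx=7))
```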
- ray_report(metrics: Dict[str, float], checkpoint_file: Path | str | None = None, checkpoint_dir: Path | str | None = None, checkpoint_data: Any | None = None) → None
Report a dictionary of metrics and, optionally, a checkpoint to Ray, only when using Ray distributed strategies. The checkpoint can be a Python object (passed as checkpoint_data), the path to a single file (passed as checkpoint_file), or the path to an existing checkpoint directory (passed as checkpoint_dir).
- Parameters:
metrics (Dict[str, float]) – metrics to be reported.
checkpoint_file (str | Path | None, optional) – path to the checkpoint file. Defaults to None.
checkpoint_dir (str | Path | None, optional) – path to the checkpoint directory. Defaults to None.
checkpoint_data (Any | None, optional) – object to serialize as a checkpoint. Defaults to None.
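One plausible way to handle the three checkpoint forms is to normalize them to a single directory. The sketch below does that with the standard library only, assuming the three arguments are alternatives; prepare_checkpoint_dir and the file name checkpoint.pkl are hypothetical. In recent Ray versions, such a directory would typically be wrapped with ray.train.Checkpoint.from_directory and passed to ray.train.report along with the metrics; whether itwinai does exactly this is not stated here.

```python
import pickle
import shutil
import tempfile
from pathlib import Path
from typing import Any, Optional, Union


def prepare_checkpoint_dir(
    checkpoint_file: Union[str, Path, None] = None,
    checkpoint_dir: Union[str, Path, None] = None,
    checkpoint_data: Optional[Any] = None,
) -> Optional[Path]:
    """Hypothetical helper: normalize the three checkpoint forms to one directory."""
    provided = [arg is not None for arg in (checkpoint_file, checkpoint_dir, checkpoint_data)]
    if sum(provided) > 1:
        # Assumption: the three forms are alternatives, not meant to be combined.
        raise ValueError("pass at most one of checkpoint_file, checkpoint_dir, checkpoint_data")
    if checkpoint_dir is not None:
        return Path(checkpoint_dir)  # already a directory: use it as-is
    if checkpoint_file is None and checkpoint_data is None:
        return None  # metrics-only report
    tmp_dir = Path(tempfile.mkdtemp(prefix="ckpt_"))
    if checkpoint_file is not None:
        shutil.copy(checkpoint_file, tmp_dir)  # keeps the original file name
    else:
        with open(tmp_dir / "checkpoint.pkl", "wb") as f:  # hypothetical file name
            pickle.dump(checkpoint_data, f)
    return tmp_dir


print(prepare_checkpoint_dir(checkpoint_data={"epoch": 3}))
```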
- compute_metrics(true: Tensor, pred: Tensor, logger_step: int, batch_idx: int | None, stage: str = 'train') → Dict[str, Any]
Compute and log metrics.
- Parameters:
true (torch.Tensor) – true values.
pred (torch.Tensor) – predicted values.
logger_step (int) – global step to pass to the logger.
batch_idx (int | None) – DataLoader batch index, if available.
stage (str) – 'train', 'validation', ... Defaults to 'train'.
- Returns:
metric values.
- Return type:
Dict[str, Any]
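The compute-then-log pattern can be sketched in plain Python as below. A toy accuracy function stands in for a torchmetrics metric, the log callback stands in for the trainer's log() method, and the stage-prefixed naming scheme is an assumption; compute_metrics_sketch is hypothetical. The returned dictionary keeps the same keys as the metrics dictionary.

```python
from typing import Any, Callable, Dict, Optional, Sequence


def accuracy(pred: Sequence[int], true: Sequence[int]) -> float:
    """Toy metric standing in for a torchmetrics metric."""
    return sum(p == t for p, t in zip(pred, true)) / len(true)


def compute_metrics_sketch(
    metrics: Dict[str, Callable[[Sequence, Sequence], Any]],
    true: Sequence,
    pred: Sequence,
    logger_step: int,
    batch_idx: Optional[int],
    stage: str = "train",
    log: Optional[Callable[..., None]] = None,
) -> Dict[str, Any]:
    """Hypothetical version: evaluate every metric, log it, return the values."""
    values: Dict[str, Any] = {}
    for name, metric_fn in metrics.items():
        # torchmetrics convention: predictions first, targets second.
        value = metric_fn(pred, true)
        values[name] = value
        if log is not None:
            # Hypothetical naming scheme: prefix the metric name with the stage.
            log(value, f"{stage}_{name}", step=logger_step, batch_idx=batch_idx)
    return values


logged = []
values = compute_metrics_sketch(
    {"accuracy": accuracy},
    true=[0, 1, 1, 2],
    pred=[0, 1, 2, 2],
    logger_step=7,
    batch_idx=3,
    stage="validation",
    log=lambda value, identifier, step, batch_idx: logged.append((identifier, value, step)),
)
print(values)  # {'accuracy': 0.75}
print(logged)  # [('validation_accuracy', 0.75, 7)]
```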
- train() → None
Trains a machine learning model. Main training loop/logic.
- Parameters:
train_dataset (Dataset) – training dataset.
validation_dataset (Dataset) – validation dataset.
test_dataset (Dataset) – test dataset.
- Returns:
training dataset, validation dataset, test dataset, trained model.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- train_epoch() → Tensor
Perform a complete sweep over the training dataset, completing an epoch of training.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average training loss for the current epoch.
- Return type:
torch.Tensor
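The "average training loss for the current epoch" can be sketched as the mean of the per-batch losses returned by a step function. train_epoch_sketch is hypothetical, and step_fn stands in for train_step with the documented (loss, metrics) return value.

```python
from typing import Any, Callable, Dict, Iterable, Tuple

import torch


def train_epoch_sketch(
    dataloader: Iterable,
    step_fn: Callable[[Any, int], Tuple[torch.Tensor, Dict[str, Any]]],
) -> torch.Tensor:
    """Hypothetical epoch loop: run one step per batch, return the mean loss."""
    batch_losses = []
    for batch_idx, batch in enumerate(dataloader):
        loss, _metrics = step_fn(batch, batch_idx)
        batch_losses.append(loss.detach())  # keep values, drop the autograd graph
    # Unweighted mean over batches (an empty dataloader would raise here).
    return torch.stack(batch_losses).mean()


# Fake step function whose "loss" equals the batch index: mean of 0..3 is 1.5.
fake_step = lambda batch, idx: (torch.tensor(float(idx)), {})
print(train_epoch_sketch([torch.zeros(2)] * 4, fake_step))
```

An unweighted mean gives a smaller last batch the same weight as full ones; a sample-weighted mean would differ slightly when the dataset size is not a multiple of the batch size.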
- train_step(batch: Tensor, batch_idx: int) → Tuple[Tensor, Dict[str, Any]]
Perform a single optimization step using a batch sampled from the training dataset.
- Parameters:
batch (torch.Tensor) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
the batch loss and a dictionary of metric values (same structure as self.metrics).
- Return type:
Tuple[torch.Tensor, Dict[str, Any]]
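A single optimization step in standard PyTorch follows the zero_grad, forward, loss, backward, step sequence. The standalone sketch below assumes the batch is an (inputs, targets) pair and receives model, loss and optimizer as arguments instead of reading them from self; train_step_sketch is hypothetical.

```python
from typing import Any, Callable, Dict, Tuple

import torch
from torch import nn


def train_step_sketch(
    model: nn.Module,
    loss_fn: Callable,
    optimizer: torch.optim.Optimizer,
    metrics: Dict[str, Callable],
    batch: Tuple[torch.Tensor, torch.Tensor],
    batch_idx: int,
) -> Tuple[torch.Tensor, Dict[str, Any]]:
    """Hypothetical standalone version of a single training step."""
    x, y = batch  # assumption: the batch is an (inputs, targets) pair
    model.train()
    optimizer.zero_grad()  # clear gradients from the previous step
    pred = model(x)
    loss = loss_fn(pred, y)
    loss.backward()  # compute gradients
    optimizer.step()  # update the parameters
    # batch_idx is unused here; a real trainer might pass it to the logger.
    # Metrics use the predictions computed before the parameter update.
    metric_values = {name: fn(pred.detach(), y) for name, fn in metrics.items()}
    return loss.detach(), metric_values


torch.manual_seed(0)
model = nn.Linear(4, 3)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
metrics = {"accuracy": lambda pred, y: (pred.argmax(dim=1) == y).float().mean()}
batch = (torch.randn(8, 4), torch.randint(0, 3, (8,)))
loss, values = train_step_sketch(model, nn.CrossEntropyLoss(), optimizer, metrics, batch, batch_idx=0)
print(loss.item(), values["accuracy"].item())
```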
- validation_epoch() → Tensor | None
Perform a complete sweep over the validation dataset, completing an epoch of validation.
- Returns:
average validation loss for the current epoch if self.validation_dataloader is not None; otherwise None.
- Return type:
torch.Tensor | None
- validation_step(batch: Tensor, batch_idx: int) → Tuple[Tensor, Dict[str, Any]]
Perform a single validation step using a batch sampled from the validation dataset.
- Parameters:
batch (torch.Tensor) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
the batch loss and a dictionary of metric values (same structure as self.metrics).
- Return type:
Tuple[torch.Tensor, Dict[str, Any]]
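A validation sweep, as described by validation_epoch and validation_step, usually evaluates the model without updating it: eval mode, no gradient tracking, and no optimizer step. The sketch below assumes (inputs, targets) batches and mirrors the documented None return when there is no validation dataloader; validation_epoch_sketch is hypothetical.

```python
from typing import Callable, Iterable, Optional

import torch
from torch import nn


def validation_epoch_sketch(
    model: nn.Module,
    loss_fn: Callable,
    dataloader: Optional[Iterable],
) -> Optional[torch.Tensor]:
    """Hypothetical validation sweep: no gradients, no optimizer step."""
    if dataloader is None:
        return None  # no validation dataloader, no validation loss
    model.eval()  # switch layers such as dropout to inference behaviour
    batch_losses = []
    with torch.no_grad():
        for x, y in dataloader:  # assumption: (inputs, targets) batches
            batch_losses.append(loss_fn(model(x), y))
    return torch.stack(batch_losses).mean()


torch.manual_seed(0)
model = nn.Linear(4, 3)
loader = [(torch.randn(5, 4), torch.randint(0, 3, (5,))) for _ in range(3)]
print(validation_epoch_sketch(model, nn.CrossEntropyLoss(), loader))
print(validation_epoch_sketch(model, nn.CrossEntropyLoss(), None))  # None
```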
- test_epoch() → Tensor
Perform a complete sweep over the test dataset, completing an epoch of testing.
- Returns:
average test loss for the current epoch.
- Return type:
torch.Tensor
- test_step(batch: Tensor, batch_idx: int) → Tuple[Tensor, Dict[str, Any]]
Perform a single prediction step using a batch sampled from the test dataset.
- Parameters:
batch (torch.Tensor) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
the batch loss and a dictionary of metric values (same structure as self.metrics).
- Return type:
Tuple[torch.Tensor, Dict[str, Any]]
- class itwinai.torch.trainer.TorchLightningTrainer(config: Dict | str, mlflow_saved_model: str = 'my_model')
Bases:
Trainer
Generic trainer for PyTorch Lightning workflows.
- Parameters:
config (Dict | str) – Lightning configuration, which can be the path to a file or a Python dictionary.
mlflow_saved_model (str, optional) – name of the model created in MLflow. Defaults to 'my_model'.
tuning
Logic to parse configuration and transform it into Ray objects.
type
Custom types definition.
- itwinai.torch.type.Batch
Torch data batch sampled by a DataLoader.
- itwinai.torch.type.Metric
Torch metric function provided by the torchmetrics library.
alias of Callable