itwinai.torch
config.py
Default configuration
- class itwinai.torch.config.Configuration(**extra_data: Any)
Bases: BaseModel
Base configuration class.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}
Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- class itwinai.torch.config.TrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_gpu_memory: bool = False, num_workers_dataloader: int = 4, loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2'] = 'cross_entropy', optimizer: Literal['adadelta', 'adam', 'rmsprop', 'sgd'] = 'adam', optim_lr: float = 0.001, optim_momentum: float = 0.9, optim_weight_decay: float = 0.0, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, dist_backend: Literal['nccl', 'gloo', 'mpi'] = 'nccl', **extra_data: Any)
Bases: Configuration
Default configuration object for training. Override existing fields or add new ones through the constructor.
Example:
>>> cfg = TrainingConfiguration(batch_size=17, param_a=42)
>>> print(cfg.batch_size)  # prints 17 (overrides default)
>>> print(cfg.param_a)  # prints 42 (new value)
>>> print(cfg.pin_gpu_memory)  # prints the default value
>>>
>>> from rich import print
>>> print(cfg)  # pretty-print of configuration
Warning
Don't reinvent parameters that already exist in the training configuration. Use the existing parameter names to avoid inconsistencies. For instance, the training configuration defines the learning rate as optim_lr, so if you redefine it as lr by doing TrainingConfiguration(lr=0.005), the configuration will contain both optim_lr (created by default) and lr (created by you). This may create confusion and potentially (and silently) break the logic in your code.
- batch_size: int
Batch size. In a distributed environment it is usually the per-worker batch size. Defaults to 32.
- shuffle_train: bool
Whether to shuffle train dataset when creating a torch DataLoader. Defaults to False.
- shuffle_validation: bool
Whether to shuffle validation dataset when creating a torch DataLoader. Defaults to False.
- shuffle_test: bool
Whether to shuffle test dataset when creating a torch DataLoader. Defaults to False.
- pin_gpu_memory: bool
Whether to use pinned (page-locked) host memory, which speeds up transfers to the GPU. Property of torch DataLoader. Defaults to False.
- num_workers_dataloader: int
Number of parallel workers used by torch DataLoader. Defaults to 4.
- loss: Literal['mse', 'nllloss', 'cross_entropy', 'l1', 'l2']
Loss function. Defaults to "cross_entropy".
- optimizer: Literal['adadelta', 'adam', 'rmsprop', 'sgd']
Name of the optimizer to use. Defaults to "adam".
- optim_lr: float
Learning rate used by the optimizer. Defaults to 1e-3.
- optim_momentum: float
Momentum used by some optimizers (e.g., SGD). Defaults to 0.9.
- optim_weight_decay: float
Weight decay parameter for the optimizer. Defaults to 0.
- fp16_allreduce: bool
Parameter of Horovod's DistributedOptimizer: uses float16 operations in the allreduce aggregation of distributed gradients. Better performance at lower precision. Defaults to False.
- use_adasum: bool
Parameter of Horovod's DistributedOptimizer: use Adasum optimization. Defaults to False.
- gradient_predivide_factor: float
Parameter of Horovod's DistributedOptimizer: scale gradients before adding them up. Defaults to 1.0.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}
Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
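The pitfall described in the warning for TrainingConfiguration can be reproduced without itwinai. The sketch below is a plain-Python stand-in, not the library's implementation: SketchConfig is a hypothetical class that accepts arbitrary keyword arguments (mimicking what 'extra': 'allow' permits) and carries two of the documented defaults.

```python
from typing import Any


class SketchConfig:
    """Hypothetical stand-in for a configuration that allows extra fields."""

    # Two defaults copied from the documented TrainingConfiguration signature.
    batch_size: int = 32
    optim_lr: float = 0.001

    def __init__(self, **fields: Any) -> None:
        # Known names override the class-level defaults; unknown names
        # are stored as new attributes instead of being rejected.
        for name, value in fields.items():
            setattr(self, name, value)


cfg = SketchConfig(batch_size=17, lr=0.005)

print(cfg.batch_size)  # overridden default
print(cfg.lr)          # new field created by the caller
print(cfg.optim_lr)    # still the default: code that reads optim_lr ignores lr
```

Any code that reads optim_lr keeps using the default here, which is why reusing the existing field names is the safer choice.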
distributed.py
- itwinai.torch.distributed.distributed_resources_available() → bool
Check if the current execution environment has (enough) GPUs available to allow for distributed ML.
- Returns:
True if the environment can support distributed ML.
- Return type:
bool
- itwinai.torch.distributed.check_initialized(method: Callable) → Callable
Decorator for strategy methods to check whether the strategy was correctly initialized before calling the method.
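A decorator of this kind can be sketched in a few lines. The code below is an illustration under assumptions, not the itwinai source: check_initialized_sketch and ToyStrategy are hypothetical names, and the exception class is a local stand-in for the UninitializedStrategyError mentioned elsewhere in this reference.

```python
import functools
from typing import Any, Callable


class UninitializedStrategyError(Exception):
    """Local stand-in for the error named in this reference."""


def check_initialized_sketch(method: Callable) -> Callable:
    """Refuse to run a strategy method before the strategy is initialized."""

    @functools.wraps(method)
    def wrapper(self, *args: Any, **kwargs: Any) -> Any:
        if not self.is_initialized:
            raise UninitializedStrategyError(
                f"Call init() before {method.__name__}()."
            )
        return method(self, *args, **kwargs)

    return wrapper


class ToyStrategy:
    """Hypothetical strategy exposing the documented is_initialized flag."""

    is_initialized: bool = False

    def init(self) -> None:
        self.is_initialized = True

    @check_initialized_sketch
    def global_rank(self) -> int:
        return 0


strategy = ToyStrategy()
try:
    strategy.global_rank()
    failed_before_init = False
except UninitializedStrategyError:
    failed_before_init = True

strategy.init()
rank_after_init = strategy.global_rank()
```

Using functools.wraps keeps the name and docstring of the wrapped method, so generated documentation still shows the original method.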
- itwinai.torch.distributed.initialize_ray() → None
This function is used by RayDDPStrategy and RayDeepSpeedStrategy to initialize the Ray backend if it is not already initialized.
- Raises:
EnvironmentError – If the required environment variables HEAD_NODE_PORT or HEAD_NODE_IP are not set. These should be set in the SLURM script that launches the Ray cluster.
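The environment check described above might look like the following sketch. It only covers the validation step and does not start Ray; read_head_node_address is a hypothetical helper name, and the address values are made up for the example.

```python
import os
from typing import Mapping, Tuple


def read_head_node_address(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (ip, port) of the Ray head node or raise EnvironmentError."""
    missing = [
        name for name in ("HEAD_NODE_IP", "HEAD_NODE_PORT") if not env.get(name)
    ]
    if missing:
        raise EnvironmentError(
            "Missing environment variable(s): " + ", ".join(missing)
        )
    return env["HEAD_NODE_IP"], env["HEAD_NODE_PORT"]


# Passing a plain dict keeps the example independent of the real environment.
address = read_head_node_address(
    {"HEAD_NODE_IP": "10.0.0.1", "HEAD_NODE_PORT": "6379"}
)

try:
    read_head_node_address({"HEAD_NODE_IP": "10.0.0.1"})
    raised = False
except EnvironmentError:
    raised = True
```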
- class itwinai.torch.distributed.TorchDistributedStrategy
Bases: DistributedStrategy
Abstract class to define the distributed backend methods for PyTorch models.
- is_distributed: bool = True
Distinguishes distributed strategies from non-distributed ones. Defaults to True.
- is_initialized: bool = False
Set to True when the current strategy is initialized. Defaults to False.
- name: str
- property is_main_worker: bool
Checks if local worker has global rank equal to zero.
- Returns:
True if main worker.
- Return type:
bool
- abstract distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) → Tuple[Module, Optimizer, _LRScheduler | None]
Set up the model, optimizer and scheduler for distributed training.
- abstract global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- abstract local_world_size() → int
Returns the number of local workers available on a node (local world size). Usually it is equal to the number of available GPUs.
- Returns:
local world size.
- Return type:
int
- abstract global_rank() → int
Returns the global rank of the current process. Rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- abstract local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- device() → str
Device used by local worker.
- Returns:
torch device in the form "device:N" (e.g., "cuda:0", "cpu").
- Return type:
str
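The relationship between the quantities returned by these methods can be illustrated with plain arithmetic. The sketch below rests on two assumptions that this reference does not state: every node runs the same number of workers, and global ranks are assigned node by node. describe_worker and WorkerInfo are hypothetical names.

```python
from typing import NamedTuple


class WorkerInfo(NamedTuple):
    global_rank: int
    local_rank: int
    is_main_worker: bool
    device: str


def describe_worker(node_index: int, local_rank: int, local_world_size: int) -> WorkerInfo:
    """Derive worker identifiers under the assumptions stated above."""
    global_rank = node_index * local_world_size + local_rank
    return WorkerInfo(
        global_rank=global_rank,
        local_rank=local_rank,
        is_main_worker=global_rank == 0,
        # One GPU per local worker is assumed for the device string.
        device=f"cuda:{local_rank}",
    )


# Two nodes with four workers each: global world size is 2 * 4 = 8.
workers = [describe_worker(node, local, 4) for node in range(2) for local in range(4)]
global_ranks = [w.global_rank for w in workers]
```

With these assumptions the global ranks cover 0 to 7, and exactly one worker (global rank 0) is the main worker.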
- create_dataloader(dataset: Dataset[T_co], batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')
Create a distributed DataLoader by using DistributedSampler as random sampler.
- Parameters:
dataset (Dataset) – dataset from which to load the data.
batch_size (int, optional) – how many samples per batch to load (default: 1).
shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).
sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.
batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.
num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)
collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning notes in the PyTorch DataLoader documentation.
drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)
timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)
worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)
multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)
generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)
prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (The default depends on num_workers: if num_workers=0 the default is None; if num_workers > 0 the default is 2.)
persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)
pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.
- Raises:
UninitializedStrategyError – when this method is called for a strategy which had not been initialized.
RuntimeError – when a user-provided sampler, if given, is not of type DistributedSampler.
Warning
If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices for more details related to multiprocessing in PyTorch.
Warning
The len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.
However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch cannot detect such cases in general.
See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.
Warning
See Reproducibility, and My data loader workers return identical random numbers, and Randomness in multi-process data loading notes for random seed related questions.
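To make the role of DistributedSampler more concrete, the following pure-Python sketch shows how a dataset's indices can be split across workers. It follows the padding-and-striding scheme that torch.utils.data.DistributedSampler uses when shuffle and drop_last are both disabled, as far as that scheme is understood here; shard_indices is a hypothetical helper and is not part of itwinai or PyTorch.

```python
import math
from typing import List


def shard_indices(dataset_len: int, world_size: int, rank: int) -> List[int]:
    """Indices assigned to one worker, without shuffling or drop_last."""
    indices = list(range(dataset_len))
    per_worker = math.ceil(dataset_len / world_size)
    total = per_worker * world_size
    # Pad by repeating indices from the start so every worker gets the same count.
    while len(indices) < total:
        indices += indices[: total - len(indices)]
    # Strided selection: worker r takes positions r, r + world_size, ...
    return indices[rank:total:world_size]


# 10 samples over 4 workers: each worker gets 3 indices, 2 samples are repeated.
shards = [shard_indices(10, 4, rank) for rank in range(4)]
print(shards)  # [[0, 4, 8], [1, 5, 9], [2, 6, 0], [3, 7, 1]]
```

Because each worker sees only its own shard, the batch_size passed to create_dataloader acts as a per-worker batch size.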
- abstract allgather_obj(obj: Any) → List[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – object to gather from all workers.
- Returns:
list of objects gathered from all workers.
- Return type:
List[Any]
- abstract gather_obj(obj: Any, dst_rank: int = 0) → List[Any]
Gathers any object from the whole group in a list (to the worker with rank dst_rank).
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers.
- Return type:
List[Any]
- abstract gather(tensor: Tensor, dst_rank: int = 0) → List | None
Gathers a tensor from the whole group in a list (to the worker with rank dst_rank).
- Parameters:
tensor (Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers on the destination worker (the main worker by default), otherwise None.
- Return type:
Optional[List[Any]]
- class itwinai.torch.distributed.TorchDDPStrategy(backend: Literal['nccl', 'gloo', 'mpi'])
Bases: TorchDistributedStrategy
PyTorch DistributedDataParallel distributed strategy class.
- Parameters:
backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.
- backend: Literal['nccl', 'gloo', 'mpi']
Torch distributed communication backend.
- init() → None
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError – when there are not (enough) GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None, find_unused_parameters: bool = False, **kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Set up the model, optimizer and scheduler for distributed training.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- Raises:
RuntimeError – when the local world size cannot be retrieved.
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → List[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – Object to gather from all workers.
- Returns:
List of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → List[Any] | None
Gathers any object from the whole group in a list (to the worker with rank dst_rank).
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers or None on non-destination ranks.
- Return type:
Optional[List[Any]]
- gather(tensor: Tensor, dst_rank: int = 0) → List | None
Gathers a tensor from the whole group in a list (to the worker with rank dst_rank).
- Parameters:
tensor (Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers on the destination worker (the main worker by default), otherwise None.
- Return type:
Optional[List[Any]]
- class itwinai.torch.distributed.DeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])
Bases: TorchDistributedStrategy
DeepSpeed distributed strategy class.
- Parameters:
backend (Literal['nccl', 'gloo', 'mpi']) – Name of the distributed communication backend to employ.
config (Union[dict, Path, str]) – DeepSpeed config. Either a dictionary or a path to a JSON file.
- backend: Literal['nccl', 'gloo', 'mpi']
Torch distributed communication backend.
- init() → None
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError – when there are not (enough) GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, model_parameters: Any | None = None, **init_kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Set up the model, optimizer and scheduler for distributed training.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- Raises:
RuntimeError – when the local world size cannot be retrieved.
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → List[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – Object to gather from all workers.
- Returns:
List of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → List[Any] | None
Gathers any object from the whole group in a list (to the worker with rank dst_rank).
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers or None on non-destination ranks.
- Return type:
Optional[List[Any]]
- gather(tensor: Tensor, dst_rank: int = 0) → List[Tensor] | None
Gathers a tensor from the whole group in a list (to the worker with rank dst_rank).
- Parameters:
tensor (Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers or None on non-destination ranks.
- Return type:
Optional[List[torch.Tensor]]
- class itwinai.torch.distributed.HorovodStrategy
Bases: TorchDistributedStrategy
Horovod distributed strategy class.
- init() → None
Initializes the Horovod distributed backend.
- Raises:
RuntimeError – when there are not (enough) GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **optim_kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Set up the model, optimizer and scheduler for distributed training.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → list[Any]
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) – Object to gather from all workers.
- Returns:
List of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) → list[Any] | None
Gathers any object from the whole group in a list (to the worker with rank dst_rank). Under the hood it relies on allgather, as gather is not supported by Horovod.
- Parameters:
obj (Any) – object to gather from all workers.
dst_rank (int) – rank of the worker on which the list of objects is gathered.
- Returns:
list of objects gathered from all workers or None on non-destination ranks.
- Return type:
Optional[List[Any]]
- gather(tensor: Tensor, dst_rank: int = 0) → List[Tensor] | None
Gathers a tensor from the whole group in a list (to the worker with rank dst_rank). Under the hood it relies on allgather, as gather is not supported by Horovod.
- Parameters:
tensor (Tensor) – tensor to gather from all workers.
dst_rank (int) – rank of the worker on which the list of tensors is gathered.
- Returns:
list of tensors gathered from all workers or None on non-destination ranks.
- Return type:
Optional[List[torch.Tensor]]
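The idea of building gather on top of allgather can be shown with a single-process simulation. This is only a sketch of the semantics: no Horovod call is involved, a Python list stands in for the set of workers, and both function names are hypothetical.

```python
from typing import Any, List, Optional


def simulated_allgather(per_rank_objects: List[Any]) -> List[List[Any]]:
    """Every rank receives the full list (one result per rank)."""
    return [list(per_rank_objects) for _ in per_rank_objects]


def gather_via_allgather(
    per_rank_objects: List[Any], dst_rank: int = 0
) -> List[Optional[List[Any]]]:
    """Keep the all-gathered list on dst_rank only; other ranks get None."""
    gathered = simulated_allgather(per_rank_objects)
    return [
        result if rank == dst_rank else None
        for rank, result in enumerate(gathered)
    ]


objects = ["a", "b", "c"]  # object held by rank 0, 1 and 2 respectively
results = gather_via_allgather(objects, dst_rank=1)
print(results)  # [None, ['a', 'b', 'c'], None]
```

The trade-off of this approach is that every worker still receives all objects, so it costs more communication than a native gather would.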
- class itwinai.torch.distributed.NonDistributedStrategy
Bases: TorchDistributedStrategy
Dummy class for non-distributed environments.
- is_distributed: bool = False
This strategy is not distributed. Defaults to False.
- init() → None
If CUDA is available, sets the CUDA device and does nothing more.
- Raises:
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **kwargs) → Tuple[Module, Optimizer, _LRScheduler | None]
Do nothing and return model, optimizer and scheduler.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) → list[Any]
Wraps obj into a List object.
- Parameters:
obj (Any) – object in a worker.
- Returns:
input object wrapped in a list.
- Return type:
list[Any]
- class itwinai.torch.distributed.RayDDPStrategy
Bases: TorchDDPStrategy
A distributed data-parallel (DDP) strategy using Ray Train for PyTorch training.
- init() → None
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError – when there are not (enough) GPUs available.
DistributedStrategyError – when trying to initialize a strategy which is already initialized.
- global_world_size() → int
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() → int
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- Raises:
RuntimeError – when the local world size cannot be retrieved.
- global_rank() → int
Returns the global rank of the current process, where rank ranges from 0 to world_size - 1.
- Returns:
global rank.
- Return type:
int
- local_rank() → int
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) → Tuple[Module, Optimizer, _LRScheduler | None]
Set up the model, optimizer and scheduler for distributed training.
- create_dataloader(dataset: Dataset[T_co], batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')
Create a distributed DataLoader by using DistributedSampler as random sampler.
- Parameters:
dataset (Dataset) – dataset from which to load the data.
batch_size (int, optional) – how many samples per batch to load (default: 1).
shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).
sampler (Sampler or Iterable, optional) – defines the strategy to draw samples from the dataset. Can be any Iterable with __len__ implemented. If specified, shuffle must not be specified.
batch_sampler (Sampler or Iterable, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.
num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)
collate_fn (Callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
pin_memory (bool, optional) – If True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning notes in the PyTorch DataLoader documentation.
drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)
timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)
worker_init_fn (Callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)
multiprocessing_context (str or multiprocessing.context.BaseContext, optional) – If None, the default multiprocessing context of your operating system will be used. (default: None)
generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)
prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (The default depends on num_workers: if num_workers=0 the default is None; if num_workers > 0 the default is 2.)
persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)
pin_memory_device (str, optional) – the device to pin_memory to if pin_memory is True.
- Raises:
UninitializedStrategyError – when this method is called for a strategy which had not been initialized.
RuntimeError – when a user-provided sampler, if given, is not of type DistributedSampler.
Warning
If the spawn start method is used, worker_init_fn cannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices for more details related to multiprocessing in PyTorch.
Warning
The len(dataloader) heuristic is based on the length of the sampler used. When dataset is an IterableDataset, it instead returns an estimate based on len(dataset) / batch_size, with proper rounding depending on drop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts user dataset code in correctly handling multi-process loading to avoid duplicate data.
However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when drop_last is set. Unfortunately, PyTorch cannot detect such cases in general.
See Dataset Types for more details on these two types of datasets and how IterableDataset interacts with Multi-process data loading.
Warning
See Reproducibility, and My data loader workers return identical random numbers, and Randomness in multi-process data loading notes for random seed related questions.
- class itwinai.torch.distributed.RayDeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])
Bases: DeepSpeedStrategy
A distributed strategy using Ray and DeepSpeed for PyTorch training.
- Parameters:
backend (Literal["nccl", "gloo", "mpi"]) – The backend for distributed communication.
inference.py
- class itwinai.torch.inference.TorchModelLoader(model_uri: str)
Bases: ModelLoader
Loads a torch model from somewhere.
- Parameters:
model_uri (str) – Can be a path on the local filesystem or an mlflow "locator" in the form: "mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH".
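The locator format can be unpacked with a simple split. The sketch below is one possible reading of the documented form, not the loader's actual parsing code: parse_model_uri and MLflowLocator are hypothetical names, the URI values are made up, and the split assumes that none of the three components contains a "+" character.

```python
from typing import NamedTuple, Union


class MLflowLocator(NamedTuple):
    tracking_uri: str
    run_id: str
    artifact_path: str


def parse_model_uri(model_uri: str) -> Union[MLflowLocator, str]:
    """Return an MLflowLocator for 'mlflow+...' URIs, else the local path."""
    if not model_uri.startswith("mlflow+"):
        return model_uri
    parts = model_uri.split("+")
    if len(parts) != 4:
        raise ValueError(f"Expected 4 '+'-separated parts, got {len(parts)}")
    _, tracking_uri, run_id, artifact_path = parts
    return MLflowLocator(tracking_uri, run_id, artifact_path)


locator = parse_model_uri("mlflow+http://localhost:5000+abc123+model/model.pth")
local = parse_model_uri("checkpoints/model.pth")
```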
- class itwinai.torch.inference.TorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)
Bases: Predictor
Applies a pre-trained torch model to unseen data.
- test_dataset: Dataset
Dataset on which to make predictions (ML inference).
- test_dataloader: DataLoader = None
DataLoader for test dataset.
- model: Module = None
Pre-trained PyTorch model used to make predictions.
- execute(test_dataset: Dataset, model: Module = None) → Dict[str, Any]
Applies a torch model to a dataset for inference.
- Parameters:
test_dataset (Dataset[str, Any]) – each item in this dataset is a pair (item_unique_id, item).
model (nn.Module, optional) – torch model. Overrides the existing model, if given. Defaults to None.
- Returns:
maps each item ID to the corresponding predicted value(s).
- Return type:
Dict[str, Any]
- class itwinai.torch.inference.MulticlassTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)
Bases: TorchPredictor
Applies a pre-trained torch model to unseen data for multiclass classification.
- class itwinai.torch.inference.MultilabelTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, threshold: float = 0.5, name: str | None = None)
Bases: TorchPredictor
Applies a pre-trained torch model to unseen data for multilabel classification, applying a threshold on the output of the neural network.
- threshold: float = 0.5
Threshold to transform probabilities into class predictions. Defaults to 0.5.
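Thresholding turns each label's probability into an independent yes/no decision. The sketch below works on plain Python lists rather than tensors; apply_threshold is a hypothetical helper, the probabilities are made up, and treating a value equal to the threshold as positive is an assumption, since this reference does not say how the boundary case is handled.

```python
from typing import Dict, List


def apply_threshold(probabilities: List[float], threshold: float = 0.5) -> List[int]:
    """Mark a label as predicted (1) when its probability reaches the threshold."""
    return [int(p >= threshold) for p in probabilities]


# Per-item label probabilities, keyed by item ID as in TorchPredictor.execute.
outputs: Dict[str, List[float]] = {
    "item-0": [0.9, 0.2, 0.5],
    "item-1": [0.1, 0.7, 0.4],
}
predictions = {item_id: apply_threshold(probs) for item_id, probs in outputs.items()}
print(predictions)  # {'item-0': [1, 0, 1], 'item-1': [0, 1, 0]}
```

Unlike multiclass prediction, several labels (or none) can be active for the same item.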
- class itwinai.torch.inference.RegressionTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)
Bases: TorchPredictor
Applies a pre-trained torch model to unseen data for regression, leaving untouched the output of the neural network.
loggers.py
- class itwinai.torch.loggers.ItwinaiLogger(itwinai_logger: Logger, log_model: Literal['all'] | bool = False)
Bases: Logger
Adapter between PyTorch Lightning logger and itwinai logger.
This adapter forwards logging calls from PyTorch Lightning to the itwinai Logger instance, using the itwinai Logger's log method. It supports the Lightning logging of metrics, hyperparameters, and checkpoints. Additionally, any function call can be forwarded to the itwinai logger instance through the experiment property of this adapter.
- property name: str | None
Return the experiment name.
- property version: int | str | None
Return the experiment version.
- property save_dir: str | None
Return the directory where the logs are stored.
- property experiment: Logger
Lightning Logger function. Initializes and returns the itwinai Logger context for experiment tracking.
- Returns:
The itwinai logger instance.
- Return type:
Logger
- finalize(status: str) → None
Lightning Logger function. Logs any remaining checkpoints and closes the logger context.
- Parameters:
status (str) – Describes the status of the training (e.g., "completed", "failed"). The status is not needed by this function, but it is part of the finalize signature of the parent class (LightningLogger) and must therefore be propagated here.
- log_metrics(metrics: Dict[str, float], step: int | None = None) → None
Lightning Logger function. Logs the given metrics and is usually called by the Lightning Trainer.
- Parameters:
metrics (Dict[str, float]) – Dictionary of metrics to log.
step (Optional[int], optional) – Training step associated with the metrics. Defaults to None.
- log_hyperparams(params: Dict[str, Any] | Namespace) → None
Lightning Logger function. Logs hyperparameters for the experiment.
- Parameters:
params (Union[Dict[str, Any], Namespace]) – Hyperparameters dictionary or object.
- after_save_checkpoint(checkpoint_callback: ModelCheckpoint) None[source]ο
Lightning Logger function. Handles checkpoint saving to the logger after the ModelCheckpoint Callback of the Lightning Trainer is called. The checkpoints are logged as artifacts.
- Parameters:
checkpoint_callback (ModelCheckpoint) β Callback instance to manage checkpointing.
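The forwarding behaviour described for this adapter can be pictured with a minimal, framework-free sketch. RecordingLogger and ForwardingAdapter are hypothetical stand-ins written for illustration; they are not itwinai or Lightning classes, and the kind name used for hyperparameters is an assumption.

```python
from typing import Any, Dict, List, Optional, Tuple


class RecordingLogger:
    """Hypothetical stand-in for an itwinai-style logger: it only records calls."""

    def __init__(self) -> None:
        self.records: List[Tuple[Any, str, str, Optional[int]]] = []

    def log(self, item: Any, identifier: str, kind: str = "metric",
            step: Optional[int] = None) -> None:
        self.records.append((item, identifier, kind, step))


class ForwardingAdapter:
    """Hypothetical adapter exposing Lightning-style logging methods."""

    def __init__(self, wrapped: RecordingLogger) -> None:
        self._wrapped = wrapped

    @property
    def experiment(self) -> RecordingLogger:
        # Direct access to the wrapped logger for calls the adapter does not cover.
        return self._wrapped

    def log_metrics(self, metrics: Dict[str, float],
                    step: Optional[int] = None) -> None:
        # Forward each metric as one call to the wrapped logger's log method.
        for name, value in metrics.items():
            self._wrapped.log(item=value, identifier=name, kind="metric", step=step)

    def log_hyperparams(self, params: Dict[str, Any]) -> None:
        # Forward the whole dictionary as one item (the "dict" kind is an assumption).
        self._wrapped.log(item=dict(params), identifier="hyperparameters", kind="dict")


adapter = ForwardingAdapter(RecordingLogger())
adapter.log_metrics({"loss": 0.5, "accuracy": 0.9}, step=3)
adapter.log_hyperparams({"optim_lr": 0.001})
```

Every call ends up in the wrapped logger's single log method, which is the design the adapter description suggests; the experiment property remains available for anything the adapter does not wrap.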
mlflow.py
- itwinai.torch.mlflow.init_lightning_mlflow(pl_config: Dict, default_experiment_name: str = 'Default', tmp_dir: str = '.tmp', **autolog_kwargs) → None
Initialize MLflow for PyTorch Lightning, also setting up auto-logging (mlflow.pytorch.autolog(...)). Creates a new MLflow run and attaches it to the MLflow auto-logger.
- Parameters:
pl_config (Dict) – PyTorch Lightning configuration loaded in memory.
default_experiment_name (str, optional) – used as experiment name if it is not given in the Lightning configuration. Defaults to 'Default'.
tmp_dir (str) – where to temporarily store some artifacts.
autolog_kwargs (kwargs) – arguments for mlflow.pytorch.autolog(...).
reproducibility.py
This module provides the tools to support reproducible execution of torch scripts.
- itwinai.torch.reproducibility.set_seed(rnd_seed: int | None, deterministic_cudnn: bool = True) → Generator
Set torch random seed and return a PRNG object.
- Parameters:
rnd_seed (Optional[int]) – random seed. If None, the seed is not set.
deterministic_cudnn (bool) – if True, sets torch.backends.cudnn.benchmark = False, which may affect performance.
- Returns:
PRNG object.
- Return type:
torch.Generator
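The contract described here (seed only when a seed is given, then return the generator object) can be illustrated without torch. The sketch below swaps in Python's standard random module as an analogue of torch.Generator; make_rng and draw are hypothetical helpers, not part of itwinai.

```python
import random
from typing import List, Optional


def make_rng(rnd_seed: Optional[int]) -> random.Random:
    """Hypothetical stdlib analogue: seed only if a seed is given, return the PRNG."""
    rng = random.Random()  # seeded from OS entropy when no seed is supplied
    if rnd_seed is not None:
        rng.seed(rnd_seed)
    return rng


def draw(rng: random.Random, n: int = 3) -> List[float]:
    # Draw n samples from the given generator.
    return [rng.random() for _ in range(n)]


# Two generators built from the same seed produce the same sequence.
first = draw(make_rng(42))
second = draw(make_rng(42))
```

Passing the returned generator explicitly to the code that samples, rather than relying on global state, is what makes a run repeatable.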
type.py
Custom types definition.
- itwinai.torch.type.LrScheduler
Torch learning rate scheduler.
- itwinai.torch.type.Batch
Torch data batch sampled by a DataLoader.
- itwinai.torch.type.Metric
Torch metric function provided by the torchmetrics library.
alias of Callable
trainer.py
Provides training logic for PyTorch models via Trainer classes.
- class itwinai.torch.trainer.TorchTrainer(config: Dict | TrainingConfiguration, epochs: int, model: Module | str | None = None, strategy: Literal['ddp', 'deepspeed', 'horovod'] | None = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, disable_tqdm: bool = False, name: str | None = None, profiling_wait_epochs: int = 1, profiling_warmup_epochs: int = 2)
Trainer class for torch training algorithms.
- Parameters:
config (Union[Dict, TrainingConfiguration]) – training configuration containing hyperparameters.
epochs (int) – number of training epochs.
model (Optional[Union[nn.Module, str]], optional) – PyTorch model to train or a string identifier. Defaults to None.
strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) – distributed strategy. Defaults to 'ddp'.
validation_every (Optional[int], optional) – run a validation epoch every validation_every epochs. Disabled if None. Defaults to 1.
test_every (Optional[int], optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.
random_seed (Optional[int], optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.
logger (Optional[Logger], optional) – logger for ML tracking. Defaults to None.
metrics (Optional[Dict[str, Metric]], optional) – map of torchmetrics metrics. Defaults to None.
checkpoints_location (str) – path to checkpoints directory. Defaults to 'checkpoints'.
checkpoint_every (Optional[int]) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.
disable_tqdm (bool) – whether to disable tqdm progress bar(s).
name (Optional[str], optional) – trainer custom name. Defaults to None.
profiling_wait_epochs (int) – how many epochs to wait before starting the profiler.
profiling_warmup_epochs (int) – length of the profiler warmup phase in terms of number of epochs.
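The "every N epochs, disabled if None" parameters (validation_every, test_every, checkpoint_every) can be read as a simple periodic schedule. The sketch below is one plausible interpretation, not the trainer's confirmed logic: it assumes the action fires when epoch + 1 is a multiple of the interval, with 0-based epochs. runs_at and schedule are hypothetical helpers.

```python
from typing import List, Optional


def runs_at(epoch: int, every: Optional[int]) -> bool:
    """Return True if a periodic action is due after the given 0-based epoch.

    Assumption: the action fires when (epoch + 1) is a multiple of `every`;
    None disables it, as the parameter descriptions state.
    """
    if every is None:
        return False
    return (epoch + 1) % every == 0


def schedule(epochs: int, every: Optional[int]) -> List[int]:
    # List the 0-based epochs at which the action would run.
    return [epoch for epoch in range(epochs) if runs_at(epoch, every)]


validation_epochs = schedule(epochs=6, every=1)    # every epoch
checkpoint_epochs = schedule(epochs=6, every=2)    # every second epoch
test_epochs = schedule(epochs=6, every=None)       # disabled
```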
- train_dataloader: DataLoader = None
PyTorch DataLoader for training dataset.
- validation_dataloader: DataLoader = None
PyTorch DataLoader for validation dataset.
- test_dataloader: DataLoader = None
PyTorch DataLoader for test dataset.
- loss: Callable = None
Loss criterion.
- optimizer: Optimizer = None
Optimizer.
- lr_scheduler: Module = None
Learning rate scheduler.
- torch_rng: Generator = None
PyTorch random number generator (PRNG).
- train_glob_step: int = 0
Total number of training batches used so far, across all epochs.
- validation_glob_step: int = 0
Total number of validation batches used so far, across all epochs.
- test_glob_step: int = 0
Total number of test batches used so far, across all epochs.
- model: Module = None
PyTorch model to train.
- metrics: Dict[str, Callable]
Dictionary of torchmetrics metrics, indexed by user-defined names.
- profiler: Any | None
PyTorch Profiler for communication vs. computation comparison.
- property strategy: TorchDistributedStrategy
Strategy currently in use.
- property device: str
Current device from distributed strategy.
- get_default_distributed_kwargs() → Dict
Gives the default kwargs for the trainer's strategy's distributed() method.
- create_model_loss_optimizer() → None
Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.
- create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) → None
Create train, validation and test dataloaders using the configuration provided in the Trainer constructor. Generally a user-defined method.
- Parameters:
train_dataset (Dataset) – training dataset object.
validation_dataset (Optional[Dataset]) – validation dataset object. Defaults to None.
test_dataset (Optional[Dataset]) – test dataset object. Defaults to None.
- execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) → Tuple[Dataset, Dataset, Dataset, Any]
Prepares distributed environment and data structures for the actual training.
- Parameters:
train_dataset (Dataset) – training dataset.
validation_dataset (Optional[Dataset], optional) – validation dataset. Defaults to None.
test_dataset (Optional[Dataset], optional) – test dataset. Defaults to None.
- Returns:
training dataset, validation dataset, test dataset, trained model.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- set_epoch(epoch: int) → None
Set current epoch at the beginning of training.
- Parameters:
epoch (int) – epoch number, from 0 to epochs - 1.
- log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) → None
Log item with identifier name of kind type at step time step.
- Parameters:
item (Union[Any, List[Any]]) – element to be logged (e.g., metric).
identifier (Union[str, List[str]]) – unique identifier for the element to log (e.g., name of a metric).
kind (str, optional) – type of the item to be logged. Must be one among the list of self.supported_types. Defaults to 'metric'.
step (Optional[int], optional) – logging step. Defaults to None.
batch_idx (Optional[int], optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.
- save_checkpoint(name: str, epoch: int, loss: Tensor | None = None) → None
Save training checkpoint.
- Parameters:
name (str) – name of the checkpoint.
epoch (int) – current training epoch.
loss (Optional[torch.Tensor]) – current loss (if available).
- load_checkpoint(name: str) → None
Load state from a checkpoint.
- Parameters:
name (str) – name of the checkpoint to load, assuming it is under the self.checkpoints_location location.
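The pairing of save_checkpoint and load_checkpoint, where a checkpoint is addressed by name inside the checkpoints directory, can be sketched as a round trip. This is a simplified illustration that stores JSON instead of torch state; the two functions below are hypothetical stand-ins, and which fields the real checkpoint holds is not stated in this reference.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional


def save_checkpoint(location: Path, name: str, epoch: int,
                    loss: Optional[float] = None) -> Path:
    """Hypothetical stand-in: write epoch and loss under location/name."""
    location.mkdir(parents=True, exist_ok=True)
    path = location / name
    path.write_text(json.dumps({"epoch": epoch, "loss": loss}))
    return path


def load_checkpoint(location: Path, name: str) -> Dict[str, Any]:
    """Hypothetical stand-in: read the state back, resolving name under location."""
    return json.loads((location / name).read_text())


with tempfile.TemporaryDirectory() as tmp:
    checkpoints_location = Path(tmp) / "checkpoints"
    save_checkpoint(checkpoints_location, "epoch_3.json", epoch=3, loss=0.25)
    state = load_checkpoint(checkpoints_location, "epoch_3.json")
```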
- compute_metrics(true: Tensor, pred: Tensor, logger_step: int, batch_idx: int | None, stage: str = 'train') → Dict[str, Any]
Compute and log metrics.
- Parameters:
metrics (Dict[str, Metric]) – metrics dict. Can be self.train_metrics or self.validation_metrics.
true (Batch) – true values.
pred (Batch) – predicted values.
logger_step (int) – global step to pass to the logger.
stage (str) – 'train', 'validation'...
- Returns:
metric values.
- Return type:
Dict[str, Any]
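A dictionary of named metric callables, as described for the metrics attribute, can be evaluated in one pass. The sketch below uses plain Python lists and hand-written metric functions in place of tensors and torchmetrics; the (pred, true) argument order and the stage prefix in the returned keys are assumptions made for illustration.

```python
from typing import Any, Callable, Dict, List

Metric = Callable[[List[float], List[float]], float]


def mean_absolute_error(pred: List[float], true: List[float]) -> float:
    return sum(abs(p - t) for p, t in zip(pred, true)) / len(true)


def max_error(pred: List[float], true: List[float]) -> float:
    return max(abs(p - t) for p, t in zip(pred, true))


def compute_metrics(metrics: Dict[str, Metric], true: List[float],
                    pred: List[float], stage: str = "train") -> Dict[str, Any]:
    # Evaluate every metric and key the result by stage and user-defined name.
    return {f"{stage}_{name}": fn(pred, true) for name, fn in metrics.items()}


values = compute_metrics(
    {"mae": mean_absolute_error, "max_err": max_error},
    true=[1.0, 2.0, 4.0],
    pred=[1.0, 3.0, 2.0],
    stage="validation",
)
```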
- train()
Trains a machine learning model. Main training loop/logic.
- Parameters:
train_dataset (Dataset) β training dataset.
validation_dataset (Dataset) β validation dataset.
test_dataset (Dataset) β test dataset.
- Returns:
training dataset, validation dataset, test dataset, trained model.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- train_epoch(epoch: int) → Tensor
Perform a complete sweep over the training dataset, completing an epoch of training.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average training loss for the current epoch.
- Return type:
Loss
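How an epoch sweep can yield an average loss while a global step counter such as train_glob_step keeps growing across epochs is sketched below. Floats stand in for loss tensors, run_epoch is a hypothetical helper, and the unweighted mean over batches is an assumption rather than the trainer's confirmed reduction.

```python
from typing import Iterable, List, Tuple


def run_epoch(batch_losses: Iterable[float], glob_step: int) -> Tuple[float, int]:
    """Return the mean batch loss and the updated global step counter."""
    total = 0.0
    n_batches = 0
    for loss in batch_losses:
        total += loss
        n_batches += 1
        glob_step += 1  # counts batches across all epochs, never reset
    if n_batches == 0:
        raise ValueError("an epoch needs at least one batch")
    return total / n_batches, glob_step


train_glob_step = 0
averages: List[float] = []
for epoch_losses in ([4.0, 2.0], [1.0, 3.0, 2.0]):
    avg, train_glob_step = run_epoch(epoch_losses, train_glob_step)
    averages.append(avg)
```

The per-epoch average restarts with every sweep, while the step counter carries over, which is why it is useful as a logging step.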
- train_step(batch: Tensor, batch_idx: int) → Tuple[Tensor, Dict[str, Any]]
Perform a single optimization step using a batch sampled from the training dataset.
- Parameters:
batch (Batch) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure as self.metrics.
- Return type:
Tuple[Loss, Dict[str, Any]]
- validation_epoch(epoch: int) → Tensor | None
Perform a complete sweep over the validation dataset, completing an epoch of validation.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average validation loss for the current epoch if self.validation_dataloader is not None.
- Return type:
Optional[Loss]
- validation_step(batch: Tensor, batch_idx: int) → Tuple[Tensor, Dict[str, Any]]
Perform a single validation step using a batch sampled from the validation dataset.
- Parameters:
batch (Batch) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure as self.metrics.
- Return type:
Tuple[Loss, Dict[str, Any]]
- test_epoch(epoch: int) → Tensor
Perform a complete sweep over the test dataset, completing an epoch of test.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average test loss for the current epoch.
- Return type:
Loss
- test_step(batch: Tensor, batch_idx: int) → Tuple[Tensor, Dict[str, Any]]
Perform a single prediction step using a batch sampled from the test dataset.
- Parameters:
batch (Batch) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure as self.metrics.
- Return type:
Tuple[Loss, Dict[str, Any]]
- class itwinai.torch.trainer.GANTrainer(config: Dict | TrainingConfiguration, epochs: int, discriminator: Module, generator: Module, strategy: Literal['ddp', 'deepspeed'] = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None, **kwargs)
Bases:
TorchTrainer
Trainer class for GAN models using PyTorch.
- Parameters:
config (Union[Dict, TrainingConfiguration]) – training configuration containing hyperparameters.
epochs (int) – number of training epochs.
discriminator (nn.Module) – PyTorch discriminator model to train GAN.
generator (nn.Module) – PyTorch generator model to train GAN.
strategy (Literal['ddp', 'deepspeed'], optional) – distributed strategy. Defaults to 'ddp'.
validation_every (Optional[int], optional) – run a validation epoch every validation_every epochs. Disabled if None. Defaults to 1.
test_every (Optional[int], optional) – run a test epoch every test_every epochs. Disabled if None. Defaults to None.
random_seed (Optional[int], optional) – set random seed for reproducibility. If None, the seed is not set. Defaults to None.
logger (Optional[Logger], optional) – logger for ML tracking. Defaults to None.
metrics (Optional[Dict[str, Metric]], optional) – map of torchmetrics metrics. Defaults to None.
checkpoints_location (str) – path to checkpoints directory. Defaults to 'checkpoints'.
checkpoint_every (Optional[int]) – save a checkpoint every checkpoint_every epochs. Disabled if None. Defaults to None.
name (Optional[str], optional) – trainer custom name. Defaults to None.
- create_model_loss_optimizer() → None
Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-defined method.
- train_epoch(epoch: int)
Perform a complete sweep over the training dataset, completing an epoch of training.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average training loss for the current epoch.
- Return type:
Loss
- validation_epoch(epoch: int)
Perform a complete sweep over the validation dataset, completing an epoch of validation.
- Parameters:
epoch (int) – current epoch number, from 0 to self.epochs - 1.
- Returns:
average validation loss for the current epoch if self.validation_dataloader is not None.
- Return type:
Optional[Loss]
- train_step(real_images, batch_idx)
Perform a single optimization step using a batch sampled from the training dataset.
- Parameters:
real_images (Batch) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure as self.metrics.
- Return type:
Tuple[Loss, Dict[str, Any]]
- validation_step(real_images, batch_idx)
Perform a single validation step using a batch sampled from the validation dataset.
- Parameters:
real_images (Batch) – batch sampled by a dataloader.
batch_idx (int) – batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure as self.metrics.
- Return type:
Tuple[Loss, Dict[str, Any]]
- class itwinai.torch.trainer.TorchLightningTrainer(config: Dict | str, mlflow_saved_model: str = 'my_model')
Bases:
Trainer
Generic trainer for torch Lightning workflows.
- Parameters:
config (Union[Dict, str]) – Lightning configuration which can be the path to a file or a Python dictionary.
mlflow_saved_model (str, optional) – name of the model created in MLflow. Defaults to 'my_model'.
- itwinai.torch.trainer.distributed(func)
The decorated function must have a standard signature. Its first arguments must be: model, train_dataloader, validation_dataloader, device (in this order).
Additional args or kwargs are allowed consistently with the signature of the decorated function.
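The signature constraint stated above can be made concrete with a small sketch. It illustrates only the required argument order; it does not reproduce what the distributed decorator does internally, which this reference does not describe. requires_standard_signature, train_loop and bad_loop are hypothetical names.

```python
import functools
import inspect
from typing import Any, Callable

EXPECTED_LEADING_ARGS = ("model", "train_dataloader", "validation_dataloader", "device")


def requires_standard_signature(func: Callable) -> Callable:
    """Hypothetical decorator: check the leading parameter names at decoration time."""
    leading = tuple(inspect.signature(func).parameters)[: len(EXPECTED_LEADING_ARGS)]
    if leading != EXPECTED_LEADING_ARGS:
        raise TypeError(
            f"{func.__name__} must start with {EXPECTED_LEADING_ARGS}, got {leading}"
        )

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Extra args and kwargs pass through unchanged.
        return func(*args, **kwargs)

    return wrapper


@requires_standard_signature
def train_loop(model, train_dataloader, validation_dataloader, device, epochs=1):
    return f"training {model} on {device} for {epochs} epoch(s)"


message = train_loop("toy-model", [1, 2], [3], "cpu", epochs=2)

try:
    @requires_standard_signature
    def bad_loop(train_dataloader, model, validation_dataloader, device):
        return None
    rejected = False
except TypeError:
    rejected = True  # wrong argument order is refused
```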
- class itwinai.torch.trainer.RayTorchTrainer(config: Dict, strategy: Literal['ddp', 'deepspeed'] = 'ddp', name: str | None = None, logger: Logger | None = None, random_seed: int = 1234)
Bases:
Trainer
A trainer class for distributed training and hyperparameter optimization using Ray Train/Tune and PyTorch.
- Parameters:
config (Dict) – A dictionary of configuration settings for the trainer.
strategy (Optional[Literal["ddp", "deepspeed"]]) – The distributed training strategy to use. Defaults to 'ddp'.
name (Optional[str]) – Optional name for the trainer instance. Defaults to None.
logger (Optional[Logger]) – Optional logger instance. Defaults to None.
- property device: str
Get the current device from distributed strategy.
- Returns:
Device string (e.g., 'cuda:0').
- Return type:
str
- create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None, batch_size: int = 1, num_workers_dataloader: int = 4, pin_memory: bool = False, shuffle_train: bool | None = False, shuffle_test: bool | None = False, shuffle_validation: bool | None = False, sampler: Sampler | Iterable | None = None, collate_fn: Callable[[List], Any] | None = None) → None
Create data loaders for training, validation, and testing.
- Parameters:
train_dataset (Dataset) – The training dataset.
validation_dataset (Dataset, optional) – The validation dataset. Defaults to None.
test_dataset (Dataset, optional) – The test dataset. Defaults to None.
batch_size (int, optional) – Batch size for data loaders. Defaults to 1.
shuffle_train (bool, optional) – Whether to shuffle the training dataset. Defaults to False.
shuffle_test (bool, optional) – Whether to shuffle the test dataset. Defaults to False.
shuffle_validation (bool, optional) – Whether to shuffle the validation dataset. Defaults to False.
sampler (Union[Sampler, Iterable, None], optional) – Sampler for the datasets. Defaults to None.
collate_fn (Callable[[List], Any], optional) – Function to collate data samples into batches. Defaults to None.
- execute(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) → Tuple[Dataset, Dataset, Dataset, Any]
Execute the training pipeline with the given datasets.
- Parameters:
train_dataset (Dataset) – Training dataset.
validation_dataset (Optional[Dataset], optional) – Validation dataset. Defaults to None.
test_dataset (Optional[Dataset], optional) – Test dataset. Defaults to None.
- Returns:
A tuple containing the datasets and the training result grid.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) → None
Log item with identifier name of kind type at step time step.
- Parameters:
item (Union[Any, List[Any]]) – element to be logged (e.g., metric).
identifier (Union[str, List[str]]) – unique identifier for the element to log (e.g., name of a metric).
kind (str, optional) – type of the item to be logged. Must be one among the list of self.supported_types. Defaults to 'metric'.
step (Optional[int], optional) – logging step. Defaults to None.
batch_idx (Optional[int], optional) – DataLoader batch counter (i.e., batch idx), if available. Defaults to None.