itwinai PyTorch Modulesο
distributed.pyο
- itwinai.torch.distributed.distributed_resources_available() bool[source]ο
Check if the current execution environment has (enough) GPUs available to allow for distributed ML.
- Returns:
env can support distributed ML.
- Return type:
bool
- class itwinai.torch.distributed.TorchDistributedStrategy[source]ο
Bases:
DistributedStrategyAbstract class to define the distributed backend methods for PyTorch models.
- is_distributed: bool = Trueο
Allows to discriminate distributed strategies from non-distributed. Defaults to True.
- is_initialized: bool = Falseο
Set to True when the current strategy is initialized. Defaults to False.
- property is_main_worker: boolο
Checks if local worker has global rank equal to zero.
- Returns:
True if main worker.
- Return type:
bool
- abstract distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None) Tuple[Module, Optimizer, _LRScheduler | None][source]ο
Setup model, optimizer and scheduler for distributed.
- abstract global_world_size() int[source]ο
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- abstract local_world_size() int[source]ο
Returns the number of local workers available on a node (local world size). Usually it is equal to the number of available GPUs.
- Returns:
local world size.
- Return type:
int
- abstract global_rank() int[source]ο
Returns the global rank of the current process. Rank ranges from 0 to world_size.
- Returns:
global rank.
- Return type:
int
- abstract local_rank() int[source]ο
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- device() str[source]ο
Device used by local worker.
- Returns:
torch device in the form βcuda:Nβ.
- Return type:
str
- create_dataloader(dataset: Dataset[T_co], batch_size: int | None = 1, shuffle: bool | None = None, sampler: Sampler | Iterable | None = None, batch_sampler: Sampler[List] | Iterable[List] | None = None, num_workers: int = 0, collate_fn: Callable[[List[T]], Any] | None = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Callable[[int], None] | None = None, multiprocessing_context=None, generator=None, *, prefetch_factor: int | None = None, persistent_workers: bool = False, pin_memory_device: str = '')[source]ο
Create a distributed DataLoader by using
DistributedSampleras random sampler.- Parameters:
dataset (Dataset) β dataset from which to load the data.
batch_size (int, optional) β how many samples per batch to load (default:
1).shuffle (bool, optional) β set to
Trueto have the data reshuffled at every epoch (default:False).sampler (Sampler or Iterable, optional) β defines the strategy to draw samples from the dataset. Can be any
Iterablewith__len__implemented. If specified,shufflemust not be specified.batch_sampler (Sampler or Iterable, optional) β like
sampler, but returns a batch of indices at a time. Mutually exclusive withbatch_size,shuffle,sampler, anddrop_last.num_workers (int, optional) β how many subprocesses to use for data loading.
0means that the data will be loaded in the main process. (default:0)collate_fn (Callable, optional) β merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
pin_memory (bool, optional) β If
True, the data loader will copy Tensors into device/CUDA pinned memory before returning them. If your data elements are a custom type, or yourcollate_fnreturns a batch that is a custom type, see the example below.drop_last (bool, optional) β set to
Trueto drop the last incomplete batch, if the dataset size is not divisible by the batch size. IfFalseand the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default:False)timeout (numeric, optional) β if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default:
0)worker_init_fn (Callable, optional) β If not
None, this will be called on each worker subprocess with the worker id (an int in[0, num_workers - 1]) as input, after seeding and before data loading. (default:None)or (multiprocessing_context (str) β multiprocessing.context.BaseContext, optional): If
None, the default multiprocessing context of your operating system will be used. (default:None)generator (torch.Generator, optional) β If not
None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generatebase_seedfor workers. (default:None)prefetch_factor (int, optional, keyword-only arg) β Number of batches loaded in advance by each worker.
2means there will be a total of 2 * num_workers batches prefetched across all workers. (default value depends on the set value for num_workers. If value of num_workers=0 default isNone. Otherwise, if value ofnum_workers > 0default is2).persistent_workers (bool, optional) β If
True, the data loader will not shut down the worker processes after a dataset has been consumed once. This allows to maintain the workers Dataset instances alive. (default:False)pin_memory_device (str, optional) β the device to
pin_memoryto ifpin_memoryisTrue.
Warning
If the
spawnstart method is used,worker_init_fncannot be an unpicklable object, e.g., a lambda function. See Multiprocessing best practices on more details related to multiprocessing in PyTorch.Warning
len(dataloader)heuristic is based on the length of the sampler used. Whendatasetis anIterableDataset, it instead returns an estimate based onlen(dataset) / batch_size, with proper rounding depending ondrop_last, regardless of multi-process loading configurations. This represents the best guess PyTorch can make because PyTorch trusts userdatasetcode in correctly handling multi-process loading to avoid duplicate data.However, if sharding results in multiple workers having incomplete last batches, this estimate can still be inaccurate, because (1) an otherwise complete batch can be broken into multiple ones and (2) more than one batch worth of samples can be dropped when
drop_lastis set. Unfortunately, PyTorch can not detect such cases in general.See Dataset Types for more details on these two types of datasets and how
IterableDatasetinteracts with Multi-process data loading.Warning
See Reproducibility, and My data loader workers return identical random numbers, and Randomness in multi-process data loading notes for random seed related questions.
- abstract allgather_obj(obj: Any) List[Any][source]ο
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) β object to gather from all workers.
- Returns:
list of objects gathered from all workers.
- Return type:
List[Any]
- abstract gather_obj(obj: Any, dst_rank: int = 0) List[Any][source]ο
Gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) β object to gather from all workers.
dst_rank (int) β rank of the worker on which the objects list are gathered.
- Returns:
list of objects gathered from all workers.
- Return type:
List[Any]
- class itwinai.torch.distributed.TorchDDPStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]ο
Bases:
TorchDistributedStrategyPyTorch
DistributedDataParalleldistributed strategy class.- Parameters:
backend (Literal['nccl', 'gloo', 'mpi']) β Name of the distributed communication backend to employ.
- backend: Literal['nccl', 'gloo', 'mpi']ο
Torch distributed communication backend.
- init() None[source]ο
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError β when there are not (enough) GPUs available.
DistributedStrategyError β when trying to initialize a strategy which is already initialized.
- distributed(model: Module, optimizer: Optimizer, lr_scheduler: _LRScheduler | None = None, **kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]ο
Setup model, optimizer and scheduler for distributed.
- global_world_size() int[source]ο
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() int[source]ο
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() int[source]ο
Returns the global rank of the current process, where rank ranges from 0 to world_size.
- Returns:
global rank.
- Return type:
int
- local_rank() int[source]ο
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) List[Any][source]ο
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) β Object to gather from all workers.
- Returns:
List of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) List[Any] | None[source]ο
Gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) β object to gather from all workers.
dst_rank (int) β rank of the worker on which the objects list are gathered.
- Returns:
list of objects gathered from all workers or
Noneon non-destination ranks.- Return type:
Optional[List[Any]]
- class itwinai.torch.distributed.DeepSpeedStrategy(backend: Literal['nccl', 'gloo', 'mpi'])[source]ο
Bases:
TorchDistributedStrategyDeepSpeed distributed strategy class.
- Parameters:
backend (Literal['nccl', 'gloo', 'mpi']) β Name of the distributed communication backend to employ.
config (Union[dict, Path, str]) β DeepSpeed config. Either a dictionary or a path to a JSON file.
- backend: Literal['nccl', 'gloo', 'mpi']ο
Torch distributed communication backend.
- init() None[source]ο
Initializes the distributed process group and the distributed package.
- Raises:
RuntimeError β when there are not (enough) GPUs available.
DistributedStrategyError β when trying to initialize a strategy already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, model_parameters: Any | None = None, **init_kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]ο
Setup model, optimizer and scheduler for distributed.
- global_world_size() int[source]ο
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() int[source]ο
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() int[source]ο
Returns the global rank of the current process, where rank ranges from 0 to world_size.
- Returns:
global rank.
- Return type:
int
- local_rank() int[source]ο
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- allgather_obj(obj: Any) List[Any][source]ο
All-gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) β Object to gather from all workers.
- Returns:
List of gathered objects.
- Return type:
List[Any]
- gather_obj(obj: Any, dst_rank: int = 0) List[Any] | None[source]ο
Gathers any object from the whole group in a list (to all workers).
- Parameters:
obj (Any) β object to gather from all workers.
dst_rank (int) β rank of the worker on which the objects list are gathered.
- Returns:
list of objects gathered from all workers or
Noneon non-destination ranks.- Return type:
Optional[List[Any]]
- class itwinai.torch.distributed.HorovodStrategy[source]ο
Bases:
TorchDistributedStrategyHorovod distributed strategy class.
- init() None[source]ο
Initializes the Horovod distributed backend.
- Raises:
RuntimeError β when there are not (enough) GPUs available.
DistributedStrategyError β when trying to initialize a strategy already initialized.
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **optim_kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]ο
Setup model, optimizer and scheduler for distributed.
- global_world_size() int[source]ο
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() int[source]ο
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() int[source]ο
Returns the global rank of the current process, where rank ranges from 0 to world_size.
- Returns:
global rank.
- Return type:
int
- local_rank() int[source]ο
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
- class itwinai.torch.distributed.NonDistributedStrategy[source]ο
Bases:
TorchDistributedStrategyDummy class for non-distributed environments.
- is_distributed: bool = Falseο
This strategy is not distributed. Defaults to False.
- init() None[source]ο
If CUDA is available set CUDA device, and do nothing more.
- Raises:
DistributedStrategyError β when trying to initialize a strategy already initialized.
- device() str[source]ο
Device used by local worker.
- Returns:
cpu device if CUDA is not available.
- Return type:
str
- distributed(model: Module, optimizer: Optimizer | None = None, lr_scheduler: _LRScheduler | None = None, **kwargs) Tuple[Module, Optimizer, _LRScheduler | None][source]ο
Do nothing and return model, optimizer and scheduler.
- global_world_size() int[source]ο
Returns the total number of processes (global world size).
- Returns:
global world size.
- Return type:
int
- local_world_size() int[source]ο
Returns the local number of workers available per node, which is usually the number of GPUs available.
- Returns:
local world size.
- Return type:
int
- global_rank() int[source]ο
Returns the global rank of the current process, where rank ranges from 0 to world_size.
- Returns:
global rank.
- Return type:
int
- local_rank() int[source]ο
Returns the local rank of the current process.
- Returns:
local rank.
- Return type:
int
inference.pyο
- class itwinai.torch.inference.TorchModelLoader(model_uri: str)[source]ο
Bases:
ModelLoaderLoads a torch model from somewhere.
- Parameters:
model_uri (str) β Can be a path on local filesystem or an mlflow βlocatorβ in the form: βmlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATHβ
- class itwinai.torch.inference.TorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]ο
Bases:
PredictorApplies a pre-trained torch model to unseen data.
- test_dataset: Datasetο
Dataseton which to make predictions (ML inference).
- test_dataloader: DataLoader = Noneο
DataLoaderfor test dataset.
- model: Module = Noneο
Pre-trained PyTorch model used to make predictions.
- execute(test_dataset: Dataset, model: Module = None) Dict[str, Any][source]ο
Applies a torch model to a dataset for inference.
- Parameters:
test_dataset (Dataset[str, Any]) β each item in this dataset is a couple (item_unique_id, item)
model (nn.Module, optional) β torch model. Overrides the existing model, if given. Defaults to None.
- Returns:
- maps each item ID to the corresponding predicted
value(s).
- Return type:
Dict[str, Any]
- class itwinai.torch.inference.MulticlassTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]ο
Bases:
TorchPredictorApplies a pre-trained torch model to unseen data for multiclass classification.
- class itwinai.torch.inference.MultilabelTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, threshold: float = 0.5, name: str | None = None)[source]ο
Bases:
TorchPredictorApplies a pre-trained torch model to unseen data for multilabel classification, applying a threshold on the output of the neural network.
- threshold: float = 0.5ο
Threshold to transform probabilities into class predictions. Defaults to 0.5.
- class itwinai.torch.inference.RegressionTorchPredictor(model: Module | ModelLoader, test_dataloader_class: str = 'torch.utils.data.DataLoader', test_dataloader_kwargs: Dict | None = None, name: str | None = None)[source]ο
Bases:
TorchPredictorApplies a pre-trained torch model to unseen data for regression, leaving untouched the output of the neural network.
mlflow.pyο
- itwinai.torch.mlflow.init_lightning_mlflow(pl_config: Dict, default_experiment_name: str = 'Default', tmp_dir: str = '.tmp', **autolog_kwargs) None[source]ο
Initialize mlflow for pytorch lightning, also setting up auto-logging (mlflow.pytorch.autolog(β¦)). Creates a new mlflow run and attaches it to the mlflow auto-logger.
- Parameters:
pl_config (Dict) β pytorch lightning configuration loaded in memory.
default_experiment_name (str, optional) β used as experiment name if it is not given in the lightning conf. Defaults to βDefaultβ.
tmp_dir (str) β where to temporarily store some artifacts.
autolog_kwargs (kwargs) β args for mlflow.pytorch.autolog(β¦).
trainer.pyο
Provides training logic for PyTorch models via Trainer classes.
- class itwinai.torch.trainer.TorchTrainer(config: Dict, epochs: int, model: Module | None = None, strategy: Literal['ddp', 'deepspeed', 'horovod'] = 'ddp', validation_every: int | None = 1, test_every: int | None = None, random_seed: int | None = None, logger: Logger | None = None, log_all_workers: bool = False, metrics: Dict[str, Callable] | None = None, checkpoints_location: str = 'checkpoints', checkpoint_every: int | None = None, name: str | None = None)[source]ο
-
Trainer class for torch training algorithms.
- Parameters:
config (Dict) β training configuration containing hyperparameters.
epochs (int) β number of training epochs.
model (Optional[nn.Module], optional) β model to train. Defaults to None.
strategy (Literal['ddp', 'deepspeed', 'horovod'], optional) β distributed strategy. Defaults to βddpβ.
validation_every (Optional[int], optional) β run a validation epoch every
validation_everyepochs. Disabled if None. Defaults to 1.test_every (Optional[int], optional) β run a test epoch every
test_everyepochs. Disabled if None. Defaults to None.random_seed (Optional[int], optional) β set random seed for reproducibility. If None, the seed is not set. Defaults to None.
logger (Optional[Logger], optional) β logger for ML tracking. Defaults to None.
log_all_workers (bool, optional) β if True, the
logmethod is called on all workers in the distributed context. Defaults to False.metrics (Optional[Dict[str, Metric]], optional) β map of torchmetrics metrics. Defaults to None.
checkpoints_location (str) β path to checkpoints directory. Defaults to βcheckpointsβ.
checkpoint_every (Optional[int]) β save a checkpoint every
checkpoint_everyepochs. Disabled if None. Defaults to None.name (Optional[str], optional) β trainer custom name. Defaults to None.
- train_dataloader: DataLoader = Noneο
PyTorch
DataLoaderfor training dataset.
- validation_dataloader: DataLoader = Noneο
PyTorch
DataLoaderfor validation dataset.
- test_dataloader: DataLoader = Noneο
PyTorch
DataLoaderfor test dataset.
- loss: Module = Noneο
Loss criterion.
- optimizer: Optimizer = Noneο
Optimizer.
- lr_scheduler: Module = Noneο
Learning rate scheduler.
- torch_rng: Generator = Noneο
PyTorch random number generator (PRNG).
- train_glob_step: int = 0ο
Total number training batches used so far, across all epochs.
- validation_glob_step: int = 0ο
Total number validation batches used so far, across all epochs.
- test_glob_step: int = 0ο
Total number test batches used so far, across all epochs.
- model: Module = Noneο
PyTorch model to train.
- metrics: Dict[str, Callable]ο
Dictionary of
torchmetricsmetrics, indexed by user-defined names.
- property strategy: TorchDistributedStrategyο
Strategy currently in use.
- property device: strο
Current device from distributed strategy.
- create_model_loss_optimizer() None[source]ο
Instantiate a torch model, loss, optimizer, and LR scheduler using the configuration provided in the Trainer constructor. Generally a user-define method.
- create_dataloaders(train_dataset: Dataset, validation_dataset: Dataset | None = None, test_dataset: Dataset | None = None) None[source]ο
Create train, validation and test dataloaders using the configuration provided in the Trainer constructor. Generally a user-define method.
- Parameters:
train_dataset (Dataset) β training dataset object.
validation_dataset (Optional[Dataset]) β validation dataset object. Default None.
test_dataset (Optional[Dataset]) β test dataset object. Default None.
- execute(train_dataset: Dataset, validation_dataset: Dataset, test_dataset: Dataset) Tuple[Dataset, Dataset, Dataset, Any][source]ο
Prepares distributed environment and data structures for the actual training.
- Parameters:
train_dataset (Dataset) β training dataset.
validation_dataset (Dataset) β validation dataset.
test_dataset (Dataset) β test dataset.
- Returns:
training dataset, validation dataset, test dataset, trained model.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- set_epoch(epoch: int) None[source]ο
Set current epoch at the beginning of training.
- Parameters:
epoch (int) β epoch number, from 0 to
epochs-1.
- log(item: Any | List[Any], identifier: str | List[str], kind: str = 'metric', step: int | None = None, batch_idx: int | None = None, **kwargs) None[source]ο
Log
itemwithidentifiername ofkindtype atsteptime step.- Parameters:
item (Union[Any, List[Any]]) β element to be logged (e.g., metric).
identifier (Union[str, List[str]]) β unique identifier for the element to log(e.g., name of a metric).
kind (str, optional) β type of the item to be logged. Must be one among the list of self.supported_types. Defaults to βmetricβ.
step (Optional[int], optional) β logging step. Defaults to None.
batch_idx (Optional[int], optional) β DataLoader batch counter (i.e., batch idx), if available. Defaults to None.
- save_checkpoint(name: str, epoch: int, loss: Tensor | None = None) None[source]ο
Save training checkpoint.
- Parameters:
name (str) β name of the checkpoint.
epoch (int) β current training epoch.
loss (Optional[torch.Tensor]) β current loss (if available).
- load_checkpoint(name: str) None[source]ο
Load state from a checkpoint.
- Parameters:
name (str) β name of the checkpoint to load, assuming it is under
self.checkpoints_locationlocation.
- train()[source]ο
Trains a machine learning model. Main training loop/logic.
- Parameters:
train_dataset (Dataset) β training dataset.
validation_dataset (Dataset) β validation dataset.
test_dataset (Dataset) β test dataset.
- Returns:
training dataset, validation dataset, test dataset, trained model.
- Return type:
Tuple[Dataset, Dataset, Dataset, Any]
- compute_metrics(true: Tensor, pred: Tensor, logger_step: int, batch_idx: int | None, stage: str = 'train') Dict[str, Any][source]ο
Compute and log metrics.
- Parameters:
metrics (Dict[str, Metric]) β metrics dict. Can be
self.train_metricsorself.validation_metrics.true (Batch) β true values.
pred (Batch) β predicted values.
logger_step (int) β global step to pass to the logger.
stage (str) β βtrainβ, βvalidationββ¦
- Returns:
metric values.
- Return type:
Dict[str, Any]
- training_step(batch: Tensor, batch_idx: int) Tuple[Module, Dict[str, Any]][source]ο
Perform a single optimization step using a batch sampled from the training dataset.
- Parameters:
batch (Batch) β batch sampled by a dataloader.
batch_idx (int) β batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure of
self.metrics.- Return type:
Tuple[Loss, Dict[str, Any]]
- validation_step(batch: Tensor, batch_idx: int) Tuple[Module, Dict[str, Any]][source]ο
Perform a single optimization step using a batch sampled from the validation dataset.
- Parameters:
batch (Batch) β batch sampled by a dataloader.
batch_idx (int) β batch index in the dataloader.
- Returns:
batch loss and dictionary of metric values with the same structure of
self.metrics.- Return type:
Tuple[Loss, Dict[str, Any]]
- train_epoch() Module[source]ο
Perform a complete sweep over the training dataset, completing an epoch of training.
- Returns:
average training loss for the current epoch.
- Return type:
Loss
- class itwinai.torch.trainer.TorchLightningTrainer(config: Dict | str, mlflow_saved_model: str = 'my_model')[source]ο
Bases:
TrainerGeneric trainer for torch Lightning workflows.
- Parameters:
config (Union[Dict, str]) β Lightning configuration which can be the path to a file or a Python dictionary.
mlflow_saved_model (str, optional) β name of the model created in MLFlow. Defaults to βmy_modelβ.
- itwinai.torch.trainer.preproc_dataloader(dataloader: DataLoader, gwsize, grank)[source]ο
Makes a Dataloader distributed.
- itwinai.torch.trainer.distributed(func)[source]ο
The decorated function must have a standard signature. Its first arguments must be: model, train_dataloader, validation_dataloader, device (in this order).
Additional args or kwargs are allowed consistently with the signature of the decorated function.
type.pyο
Custom types definition.
- itwinai.torch.type.Lossο
Torch Loss function.
- itwinai.torch.type.LrSchedulerο
Torch learning rate scheduler
- itwinai.torch.type.Batchο
Torch data batch sampled by a
DataLoader.
- itwinai.torch.type.Metricο
Torch metric function provided by
torchmetricslibrary.alias of
Callable
reproducibility.pyο
This module provides the tools to support reproducible execution of torch scripts.
- itwinai.torch.reproducibility.set_seed(rnd_seed: int | None, deterministic_cudnn: bool = True) Generator[source]ο
Set torch random seed and return a PRNG object.
- Parameters:
rnd_seed (Optional[int]) β random seed. If None, the seed is not set.
deterministic_cudnn (bool) β if True, sets
torch.backends.cudnn.benchmark = False, which may affect performances.
- Returns:
PRNG object.
- Return type:
torch.Generator
config.pyο
Default configuration
- class itwinai.torch.config.Configuration(**extra_data: Any)[source]ο
Bases:
BaseModelBase configuration class.
- model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}ο
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}ο
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {}ο
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class itwinai.torch.config.TrainingConfiguration(*, batch_size: int = 32, shuffle_train: bool = False, shuffle_validation: bool = False, shuffle_test: bool = False, pin_memory: bool = False, num_workers: int = 4, lr: float = 0.001, momentum: float = 0.9, fp16_allreduce: bool = False, use_adasum: bool = False, gradient_predivide_factor: float = 1.0, **extra_data: Any)[source]ο
Bases:
ConfigurationDefault configuration object for training. Override and/or create new configurations using the constructor.
Example:
>>> cfg = TrainingConfiguration(batch_size=2, param_a=42) >>> print(cfg.batch_size) # returns 17 (overrides default) >>> print(cfg.param_a) # returns 42 (new value) >>> print(cfg.pin_memory) # returns the default value >>> >>> from rich import print >>> print(cfg) # pretty-print of configuration
- batch_size: intο
Batch size. In a distributed environment it is usually the per-worker batch size. Defaults to 32.
- shuffle_train: boolο
Whether to shuffle train dataset when creating a torch
DataLoader. Defaults to False.
- shuffle_validation: boolο
Whether to shuffle validation dataset when creating a torch
DataLoader. Defaults to False.
- shuffle_test: boolο
Whether to shuffle test dataset when creating a torch
DataLoader. Defaults to False.
- pin_memory: boolο
Whether to pin GPU memory. Property of torch
DataLoader. Defaults to False.
- num_workers: intο
Number of parallel workers used by torch
DataLoader. Defaults to 4.
- lr: floatο
Learning rate used by the optimizer. Defaults to 1e-3.
- momentum: floatο
Momentum used by some optimizers (e.g., SGD). Defaults to 0.9.
- fp16_allreduce: boolο
uses float16 operations in the allreduce distributed gradients aggregation. Better performances at lower precision. Defaults to False.
- Type:
Parameter of Horovodβs
DistributedOptimizer
- use_adasum: boolο
use Adasum optimization. Defaults to False.
- Type:
Parameter of Horovodβs
DistributedOptimizer
- gradient_predivide_factor: floatο
scale gradients before adding them up. Defaults to 1.0.
- Type:
Parameter of Horovodβs
DistributedOptimizer
- model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}ο
A dictionary of computed field names and their corresponding ComputedFieldInfo objects.
- model_config: ClassVar[ConfigDict] = {'extra': 'allow'}ο
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'batch_size': FieldInfo(annotation=int, required=False, default=32), 'fp16_allreduce': FieldInfo(annotation=bool, required=False, default=False), 'gradient_predivide_factor': FieldInfo(annotation=float, required=False, default=1.0), 'lr': FieldInfo(annotation=float, required=False, default=0.001), 'momentum': FieldInfo(annotation=float, required=False, default=0.9), 'num_workers': FieldInfo(annotation=int, required=False, default=4), 'pin_memory': FieldInfo(annotation=bool, required=False, default=False), 'shuffle_test': FieldInfo(annotation=bool, required=False, default=False), 'shuffle_train': FieldInfo(annotation=bool, required=False, default=False), 'shuffle_validation': FieldInfo(annotation=bool, required=False, default=False), 'use_adasum': FieldInfo(annotation=bool, required=False, default=False)}ο
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.