itwinai.distributed

class itwinai.distributed.ClusterEnvironment(*, global_rank: int = 0, local_rank: int = 0, global_world_size: int = 1, local_world_size: int = 1)

Bases: BaseModel

Stores information about the distributed environment.

global_rank: int

Global rank of the current worker in a distributed environment. global_rank == 0 identifies the main worker. Defaults to 0.

local_rank: int

Local rank of the current worker (its rank among the workers on the same node) in a distributed environment. Defaults to 0.

global_world_size: int

Total number of workers in a distributed environment. Defaults to 1.

local_world_size: int

Number of workers on the same node in a distributed environment. Defaults to 1.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

itwinai.distributed.ray_cluster_is_running() → bool

Check if a Ray cluster is running.

Returns:

True if a running Ray cluster is detected, False otherwise.

Return type:

bool

itwinai.distributed.detect_distributed_environment() → ClusterEnvironment

Detect a distributed environment by probing known env vars.

Robust across:

- laptop (no SLURM/OMPI/torchrun env): returns the default (rank=0, world=1)
- interactive SLURM allocation without a job step: returns the default
- SLURM batch / srun step: detects via SLURM_PROCID
- OpenMPI: detects via OMPI rank/size
- torchrun / TorchElastic: detects via rank/size plus extra torch markers

Returns:

The detected cluster environment.

Return type:

ClusterEnvironment

itwinai.distributed.builtin_print()

Original builtin print(), saved before it is patched in distributed environments.

itwinai.distributed.distributed_patch_print(is_main: bool) → Callable

Disable print() when not called from the main worker.

Parameters:

is_main (bool) – whether it is called from the main worker.

Returns:

The patched print().

Return type:

Callable
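A common way to implement this kind of patch can be sketched as follows, assuming the returned function is installed by assigning it to `builtins.print`. `patch_print` and `_original_print` are hypothetical names; the `force` keyword mirrors the `print(..., force=True)` escape hatch documented for `suppress_workers_print`.

```python
import builtins
from typing import Callable

# Reference to the unpatched print, kept before any patching (the role builtin_print plays).
_original_print = builtins.print


def patch_print(is_main: bool) -> Callable:
    """Return a print() replacement that is silent on non-main workers unless force=True."""

    def _print(*args, **kwargs):
        force = kwargs.pop("force", False)  # strip our extra keyword before delegating
        if is_main or force:
            _original_print(*args, **kwargs)

    return _print
```

Usage would look like `builtins.print = patch_print(is_main=(global_rank == 0))`. Keeping the original reference lets the patched function delegate to it and lets callers restore the builtin later.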

itwinai.distributed.suppress_workers_print(func: Callable) → Callable

Decorator that suppresses print() calls in workers whose global rank is not 0. To force printing on all workers, use print(..., force=True).
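A minimal sketch of such a decorator, assuming it swaps `builtins.print` only for the duration of the wrapped call. `suppress_print` is a hypothetical name, and `_global_rank()` is a placeholder that reads the `RANK` environment variable; the real decorator may obtain the rank differently.

```python
import builtins
import functools
import os
from typing import Callable


def _global_rank() -> int:
    # Hypothetical rank lookup: reads the RANK env var, 0 if unset.
    return int(os.environ.get("RANK", "0"))


def suppress_print(func: Callable) -> Callable:
    """Silence print() inside func on non-zero ranks; print(..., force=True) still prints."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        original = builtins.print
        is_main = _global_rank() == 0

        def _print(*p_args, force: bool = False, **p_kwargs):
            if is_main or force:
                original(*p_args, **p_kwargs)

        builtins.print = _print
        try:
            return func(*args, **kwargs)
        finally:
            builtins.print = original  # always restore, even if func raises

    return wrapper
```

Because `builtins.print` is process-wide, this sketch also silences other threads while the decorated function runs.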

itwinai.distributed.suppress_workers_output(func)

Decorator that suppresses stdout and stderr in workers whose global rank is not 0.
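A sketch of the same idea for whole streams, assuming redirection through `contextlib.redirect_stdout` and `contextlib.redirect_stderr` to `os.devnull`. `suppress_output` and `_global_rank()` are hypothetical; the rank is again read from `RANK` only as a placeholder.

```python
import contextlib
import functools
import os
from typing import Callable


def _global_rank() -> int:
    # Hypothetical rank lookup: reads the RANK env var, 0 if unset.
    return int(os.environ.get("RANK", "0"))


def suppress_output(func: Callable) -> Callable:
    """Discard stdout and stderr produced inside func on non-zero ranks."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if _global_rank() == 0:
            return func(*args, **kwargs)  # main worker: leave streams untouched
        with open(os.devnull, "w") as devnull:
            with contextlib.redirect_stdout(devnull), contextlib.redirect_stderr(devnull):
                return func(*args, **kwargs)

    return wrapper
```

These context managers only replace `sys.stdout` and `sys.stderr`; output written directly to file descriptors 1 and 2 (for example by C extensions or subprocesses) is not silenced by this sketch.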

itwinai.distributed.get_adaptive_ray_scaling_config() → ScalingConfig

Returns a Ray scaling config for distributed ML training that depends on the resources available in the Ray cluster. The number of workers equals the number of available GPUs; if there are no GPUs, two CPU-only workers are used.
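The worker-count rule can be isolated as a pure function, assuming the input has the shape of the dictionary returned by `ray.cluster_resources()` (keys such as `"CPU"` and `"GPU"`). `adaptive_worker_plan` is a hypothetical helper, not part of itwinai.

```python
from typing import Mapping, Tuple


def adaptive_worker_plan(resources: Mapping[str, float]) -> Tuple[int, bool]:
    """Return (num_workers, use_gpu) following the rule described above.

    `resources` is assumed to look like ray.cluster_resources(),
    e.g. {"CPU": 64.0, "GPU": 4.0}.
    """
    num_gpus = int(resources.get("GPU", 0))  # fractional GPU counts are truncated
    if num_gpus > 0:
        return num_gpus, True  # one worker per available GPU
    return 2, False  # no GPUs: fall back to two CPU-only workers
```

With Ray running, the result could feed `ray.train.ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)`; whether itwinai sets further options such as `resources_per_worker` is not stated here. Because of the `int()` truncation, a cluster reporting 0.5 GPUs takes the CPU branch in this sketch.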