Source code for itwinai.slurm.configuration

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Jarl Sondre Sæther
#
# Credit:
# - Jarl Sondre Sæther <jarl.sondre.saether@cern.ch> - CERN
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------

from pathlib import Path
from typing import List, Literal

from pydantic import BaseModel, ConfigDict, Field, field_validator

from .constants import DEFAULT_SLURM_SAVE_DIR, SLURM_TEMPLATE
from .utils import remove_indentation_from_multiline_string

DEFAULT_TRAINING_CMD = (
    "{itwinai_launcher} exec-pipeline "
    "--config-name={config_name} "
    "--config-path={config_path} "
    "--strategy={distributed_strategy} "
    "--run-name={run_name} "
    "+pipe_key={pipe_key} "
)


[docs] class SlurmScriptConfiguration(BaseModel): """Configuration object for the SLURM script. It contains all the settings for the SLURM script such as which hardware you are requesting or for how long to run it. As it allows for any ``pre_exec_command`` and ``exec_command``, it should work for any SLURM script. """ model_config = ConfigDict(extra="forbid") #: Optional job name for the SLURM job. Defaults to None (auto-generated later). job_name: str | None = None #: Billing account to charge the job to. Required. account: str #: Partition/queue the job should run on. Required. partition: str #: Wall-clock time limit for the job (``HH:MM:SS``). Defaults to ``00:30:00``. time: str = "00:30:00" #: Path to standard output file. Defaults to None (filled later). std_out: Path | None = None #: Path to standard error file. Defaults to None (filled later). err_out: Path | None = None #: Number of nodes requested. Defaults to 1. num_nodes: int = 1 #: Total number of tasks, on all nodes. Defaults to None (computed dynamically). num_tasks: int | None = None #: Number of tasks per node. Defaults to 1. num_tasks_per_node: int = 1 #: GPUs per node requested. Defaults to 4. gpus_per_node: int = 4 #: CPUs per task requested. Defaults to 16. cpus_per_task: int = 16 #: Memory per node requested. Defaults to "16G". memory: str = "16G" #: Whether to request exclusive node access. Defaults to False. exclusive: bool = False #: Pre-execution command content (shell). Defaults to None (set by builder). #: Typically used to set up the environment before executing the command, #: e.g. "ml Python", "source .venv/bin/activate" etc. #: Usually this should not be set by the user except for advanced use cases, and it #: will be generated by the SLURM script builder based on the configuration. pre_exec_command: str | None = None #: Main execution command content (shell). Defaults to None (set by builder). #: Command to execute, typically an 'srun' command. #: Usually this should not be set by the user except for advanced use cases, and it #: will be generated by the SLURM script builder based on the configuration. exec_command: str | None = None # Builder behavior #: Whether to save the generated SLURM script. Defaults to False. save_script: bool = False #: Whether to submit the generated SLURM script. Defaults to False. submit_job: bool = False #: Directory where the script should be saved. Defaults to "slurm-scripts". save_dir: Path | None = Field( default=Path(DEFAULT_SLURM_SAVE_DIR), description=("Directory to write the generated SLURM script. " f"Defaults to {DEFAULT_SLURM_SAVE_DIR}"), ) #: Path/URL to a pre-exec file to load content from. Ignored if not provided. #: Defaults to None. pre_exec_file: str | None = None #: Path/URL to an exec file to load content from. Ignored if not provided. #: Defaults to None. exec_file: str | None = None
[docs] def exclusive_line(self) -> str: return "#SBATCH --exclusive" if self.exclusive else ""
[docs] def generate_script(self) -> str: """Uses the provided configuration parameters and formats a SLURM script with the requested settings. Returns: str: A string containing the SLURM script. """ if ( self.std_out is None or self.err_out is None or self.job_name is None or self.pre_exec_command is None or self.exec_command is None ): raise ValueError( "SlurmScriptConfiguration has some fields set to None! Make sure to set all" " fields before generating script! Configuration was formatted as follows:\n" f"{repr(self)}" ) # Compute num_tasks if needed self.num_tasks = ( self.num_tasks if self.num_tasks is not None else self.num_nodes * self.num_tasks_per_node ) return SLURM_TEMPLATE.format_map( self.model_dump() | {"exclusive_line": self.exclusive_line()} )
[docs] class MLSlurmBuilderConfig(SlurmScriptConfiguration): """Extends the base SLURM configuration with ML builder-specific options.""" model_config = ConfigDict(extra="forbid") #: Whether to launch jobs via Ray. Defaults to False. use_ray: bool = False #: Optional container path to export. Defaults to None. container_path: Path | None = None #: Distributed strategy to use for training. Required. distributed_strategy: Literal["ddp", "horovod", "deepspeed"] #: Execution mode can be a single job, all strategies, or scaling test (with all #: strategies). Defaults to "single". mode: Literal["single", "runall", "scaling-test"] = "single" #: Optional custom training command template. Can reference any field in this config. #: Defaults to ``{itwinai_launcher} exec-pipeline --config-name={config_name} #: --config-path={config_path} --strategy={distributed_strategy} --run_name={run_name} #: +pipe_key={pipe_key}``. training_cmd: str | None = Field( default=DEFAULT_TRAINING_CMD, description=f"Template for the training command. Defaults to '{DEFAULT_TRAINING_CMD}'", ) #: Python virtual environment to activate. Defaults to None. python_venv: str | None = None #: Hydra config name to pass to exec-pipeline. Defaults to "config". config_name: str = "config" #: Hydra config path to pass to exec-pipeline. Defaults to ".". config_path: str = "." #: Pipeline key to execute. Defaults to "training_pipeline". pipe_key: str = "training_pipeline" #: Node counts to use for scaling tests. Defaults to [1, 2, 4, 8]. scalability_nodes: List[int] = Field( default_factory=lambda: [1, 2, 4, 8], description="List of node counts used when mode='scaling-test'.", ) #: Enable py-spy profiling. Defaults to False. py_spy: bool = False #: Sampling rate for py-spy profiling. Defaults to 10. profiling_sampling_rate: int = 10 #: Run name for tracking. Defaults to "main-run". run_name: str = "main-run"
[docs] @field_validator("scalability_nodes", mode="before") @classmethod def parse_scalability_nodes(cls, value): if isinstance(value, str): try: return [int(n) for n in value.split(",") if n] except ValueError as e: raise ValueError( f"Invalid scalability_nodes '{value}', expected comma-separated ints." ) from e return value
[docs] @field_validator("mode", "distributed_strategy", mode="before") @classmethod def normalize_choices(cls, value): if isinstance(value, str): return value.lower() return value
[docs] def build_training_command(self) -> str: """Render the training command using the configured template and fields.""" context = self.model_dump() if self.python_venv: itwinai_launcher = (Path(self.python_venv) / "bin" / "itwinai").resolve() else: itwinai_launcher = "itwinai" context["itwinai_launcher"] = itwinai_launcher template = self.training_cmd or DEFAULT_TRAINING_CMD try: rendered = template.format(**context) except KeyError as exc: raise ValueError(f"Unknown placeholder in training_cmd template: {exc}") from exc return remove_indentation_from_multiline_string(rendered.strip())