Source code for itwinai.torch.profiling.profiler

# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Jarl Sondre Sæther
#
# Credit:
# - Jarl Sondre Sæther <jarl.sondre.saether@cern.ch> - CERN
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# - Linus Eickhoff <linus.maximilian.eickhoff@cern.ch> - CERN
# --------------------------------------------------------------------------------------

import functools
import logging
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterable, Tuple

import matplotlib
import pandas as pd
from torch.profiler import ProfilerActivity, profile, schedule, tensorboard_trace_handler

from itwinai.constants import PROFILER_TRACES_DIR_NAME, PROFILING_AVG_NAME

if TYPE_CHECKING:
    from itwinai.torch.trainer import TorchTrainer


# Doing this because otherwise I get an error about X11 Forwarding which I believe
# is due to the server trying to pass the image to the client computer
matplotlib.use("Agg")

py_logger = logging.getLogger(__name__)


[docs] def profile_torch_trainer(method: Callable) -> Callable: """Decorator for execute method for components. Profiles function calls and stores the result for future analysis (e.g. computation vs. other plots). """ def gather_profiling_data(key_averages: Iterable) -> pd.DataFrame: profiling_data = [] for event in key_averages: profiling_data.append( { "name": event.key, "node_id": event.node_id, "self_cuda_time_total": event.self_device_time_total, "cuda_time_total": event.device_time_total, "cuda_time_total_str": event.device_time_total_str, "calls": event.count, } ) return pd.DataFrame(profiling_data) def adjust_wait_and_warmup_epochs( training_epochs: int, wait_epochs: int, warmup_epochs: int ) -> Tuple[int, int, int]: """Validates if the given wait and warmup epochs are compatible and if not, adjusts them so they fit. The largest one is iteratively decreased until a compatible value is reached. Returns: int: The resulting number of epochs for doing active profiling int: The resulting number of wait epochs, possibly adjusted int: The resulting number of warmup epochs, possibly adjusted """ active_epochs = training_epochs - wait_epochs - warmup_epochs if active_epochs > 0: return active_epochs, wait_epochs, warmup_epochs # This can probably be done with a simple math expression, but this was # simpler to implement and won't really cause much overhead anyway... while active_epochs <= 0: if wait_epochs > warmup_epochs: wait_epochs -= 1 else: warmup_epochs -= 1 active_epochs = training_epochs - wait_epochs - warmup_epochs if wait_epochs < 0 or warmup_epochs < 0: raise ValueError( f"Unable to adjust wait and warmup epochs to accomodate the" f"given number of training epochs. Was given the following values: " f"Training epochs: {training_epochs}, wait epochs: {wait_epochs}" f", warmup epochs: {warmup_epochs}" ) py_logger.warning( f"Adjusted the given wait and warmup epochs for the profiler - " f"wait epochs: {wait_epochs}, warmup epochs: {warmup_epochs}." ) return active_epochs, wait_epochs, warmup_epochs @functools.wraps(method) def profiled_method(self: "TorchTrainer", *args, **kwargs) -> Any: if not self.enable_torch_profiling: py_logger.info( "Profiling of computation with the PyTorch profiler has been disabled!" ) return method(self, *args, **kwargs) active_epochs, wait_epochs, warmup_epochs = adjust_wait_and_warmup_epochs( training_epochs=self.epochs, wait_epochs=self.profiling_wait_epochs, warmup_epochs=self.profiling_warmup_epochs, ) # Set correct values for the profiling epochs self.profiling_wait_epochs = wait_epochs self.profiling_warmup_epochs = warmup_epochs if self.store_torch_profiling_traces: trace_handler = tensorboard_trace_handler( dir_name=f"{PROFILER_TRACES_DIR_NAME}/{self.run_name}/torch-traces", worker_name=f"worker_{self.strategy.global_rank()}", ) else: py_logger.warning("Profiling computation without storing the traces!") trace_handler = None with profile( activities=[ProfilerActivity.CUDA, ProfilerActivity.CPU], schedule=schedule( wait=wait_epochs, warmup=warmup_epochs, active=active_epochs, ), on_trace_ready=trace_handler, with_modules=True, ) as profiler: self.profiler = profiler result = method(self, *args, **kwargs) strategy = self.strategy strategy_name = strategy.name global_rank = strategy.global_rank() num_gpus_global = strategy.global_world_size() # Extracting and storing the profiling data py_logger.info( "Computing key averages of the profiling data..." ) key_averages = profiler.key_averages() py_logger.info( "gathering profiling averages into a pandas DataFrame..." ) profiling_dataframe = gather_profiling_data(key_averages=key_averages) profiling_dataframe["strategy"] = strategy_name profiling_dataframe["num_gpus"] = num_gpus_global profiling_dataframe["global_rank"] = global_rank # write profiling averages to mlflow logger if self.mlflow_logger is None: py_logger.info("No MLflow logger is set, not logging the profiling data!") return result temp_dir = tempfile.gettempdir() csv_path = Path(temp_dir) / f"{PROFILING_AVG_NAME}.csv" profiling_dataframe.to_csv(csv_path, index=False) self.mlflow_logger.log( item=str(csv_path), identifier=PROFILING_AVG_NAME, kind="artifact", ) py_logger.info( f"Logged profiling averages to MLflow logger at: {csv_path}" ) csv_path.unlink() # Remove after logging return result return profiled_method