# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Jarl Sondre Sæther
#
# Credit:
# - Jarl Sondre Sæther <jarl.sondre.saether@cern.ch> - CERN
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# - Linus Eickhoff <linus.maximilian.eickhoff@cern.ch> - CERN
# --------------------------------------------------------------------------------------
import logging
from pathlib import Path
from typing import List
from mlflow.tracking import MlflowClient
from itwinai.scalability_report.data import (
read_epoch_time_from_mlflow,
read_gpu_metrics_from_mlflow,
read_profiling_data_from_mlflow,
)
from itwinai.scalability_report.plot import (
absolute_avg_epoch_time_plot,
computation_fraction_bar_plot,
computation_vs_other_bar_plot,
gpu_bar_plot,
relative_epoch_time_speedup_plot,
)
from itwinai.scalability_report.utils import (
calculate_epoch_statistics,
calculate_gpu_statistics,
get_computation_fraction_data,
get_computation_vs_other_data,
)
from itwinai.utils import deprecated
cli_logger = logging.getLogger("cli_logger")
[docs]
def epoch_time_report(
plot_dir: Path | str,
mlflow_client: MlflowClient,
experiment_name: str,
run_names: List[str] | None = None,
plot_file_suffix: str = ".png",
) -> str | None:
"""Generates reports and plots for epoch training times across distributed training
strategies, including a log-log plot of absolute average epoch times against the
number of GPUs and a log-log plot of relative speedup as more GPUs are added.
Args:
plot_dir (Path | str): Path to the directory where the generated plots will
be saved.
mlflow_client (MlflowClient): MLflow client to interact with the MLflow tracking
server.
experiment_name (str): Name of the MLflow experiment to retrieve epoch time data
from.
run_names (List[str] | None): List of specific run names to filter the epoch
time data. If None, all runs in the experiment will be considered.
plot_file_suffix (str): Suffix for the plot file names. Defaults to ".png".
Returns:
str | None: A string representation of the epoch time statistics table, or None if
no data was found.
"""
epoch_time_df = read_epoch_time_from_mlflow(
mlflow_client=mlflow_client,
experiment_name=experiment_name,
run_names=run_names,
)
if epoch_time_df is None:
return None
cli_logger.info("\nAnalyzing Epoch Time Data...")
epoch_time_expected_columns = {
"strategy",
"global_world_size",
"sample_idx",
"metric_name",
"value",
}
avg_epoch_time_df = calculate_epoch_statistics(
epoch_time_df=epoch_time_df,
expected_columns=epoch_time_expected_columns,
)
# Print the resulting table
formatters = {"avg_epoch_time": "{:.2f} s".format}
epoch_time_table = avg_epoch_time_df.to_string(index=False, formatters=formatters)
# Create and save the figures
absolute_fig, _ = absolute_avg_epoch_time_plot(avg_epoch_time_df=avg_epoch_time_df)
relative_fig, _ = relative_epoch_time_speedup_plot(avg_epoch_time_df=avg_epoch_time_df)
if isinstance(plot_dir, str):
plot_dir = Path(plot_dir).resolve()
absolute_avg_time_plot_path = plot_dir / ("absolute_epoch_time" + plot_file_suffix)
relative_speedup_plot_path = plot_dir / ("relative_epoch_time_speedup" + plot_file_suffix)
absolute_fig.savefig(absolute_avg_time_plot_path)
relative_fig.savefig(relative_speedup_plot_path)
cli_logger.info(
f"Saved absolute-average-time plot at '{absolute_avg_time_plot_path.resolve()}'."
)
cli_logger.info(
f"Saved relative-average-time plot at '{relative_speedup_plot_path.resolve()}'."
)
return epoch_time_table
[docs]
def gpu_data_report(
plot_dir: Path | str,
mlflow_client: MlflowClient,
experiment_name: str,
run_names: List[str] | None = None,
plot_file_suffix: str = ".png",
ray_footnote: str | None = None,
) -> str | None:
"""Generates reports and plots for GPU energy consumption and utilization across
distributed training strategies. Includes bar plots for energy consumption and GPU
utilization by strategy and number of GPUs.
Args:
plot_dir (Path | str): Path to the directory where the generated plots will
be saved.
mlflow_client (MlflowClient): MLflow client to interact with the MLflow tracking
experiment_name (str): Name of the MLflow experiment to retrieve GPU data from.
run_names (List[str] | None): List of specific run names to filter the GPU data.
If None, all runs in the experiment will be considered.
plot_file_suffix (str): Suffix for the plot file names. Defaults to ".png".
ray_footnote (str | None): Optional footnote for energy plots containing ray
strategies. Defaults to None.
Returns:
str | None: A string representation of the GPU data statistics table, or None if
no data is available.
"""
if isinstance(plot_dir, str):
plot_dir = Path(plot_dir)
gpu_data_expected_columns = {
"metric_name",
"sample_idx",
"global_world_size",
"strategy",
"probing_interval",
}
gpu_data_df = read_gpu_metrics_from_mlflow(
mlflow_client=mlflow_client, experiment_name=experiment_name, run_names=run_names
)
if gpu_data_df is None:
return None
cli_logger.info("\nAnalyzing GPU Data...")
gpu_data_statistics_df = calculate_gpu_statistics(
gpu_data_df=gpu_data_df, expected_columns=gpu_data_expected_columns
)
formatters = {
"total_energy_wh": "{:.2f} Wh".format,
"utilization": "{:.2f} %".format,
}
gpu_data_table = gpu_data_statistics_df.to_string(index=False, formatters=formatters)
energy_plot_path = plot_dir / ("gpu_energy_plot" + plot_file_suffix)
utilization_plot_path = plot_dir / ("utilization_plot" + plot_file_suffix)
energy_fig, _ = gpu_bar_plot(
data_df=gpu_data_statistics_df,
plot_title="Energy Consumption by Framework and Number of GPUs",
y_label="Energy Consumption (Wh)",
main_column="total_energy_wh",
ray_footnote=ray_footnote,
)
utilization_fig, _ = gpu_bar_plot(
data_df=gpu_data_statistics_df,
plot_title="GPU Utilization by Framework and Number of GPUs",
y_label="GPU Utilization (%)",
main_column="utilization",
)
energy_fig.savefig(energy_plot_path)
utilization_fig.savefig(utilization_plot_path)
cli_logger.info(f"Saved GPU energy plot at '{energy_plot_path.resolve()}'.")
cli_logger.info(f"Saved utilization plot at '{utilization_plot_path.resolve()}'.")
return gpu_data_table
[docs]
@deprecated("Please use `computation_data_report` instead.")
def communication_data_report(
plot_dir: Path | str,
mlflow_client: MlflowClient,
experiment_name: str,
run_names: List[str] | None,
plot_file_suffix: str = ".png",
) -> str | None:
"""Generates reports and plots for communication and computation fractions across
distributed training strategies. Includes a bar plot showing the fraction of time
spent on computation vs communication for each strategy and GPU count.
Args:
plot_dir (Path | str): Path to the directory where the generated plot will
be saved.
mlflow_client (MlflowClient): MLflow client to interact with the MLflow tracking
server.
experiment_name (str): Name of the MLflow experiment to retrieve data from.
run_names (List[str]): List of specific run names to filter the data.
If None, all runs in the experiment will be considered.
plot_file_suffix (str): Suffix for the plot file names. Defaults to ".png".
"""
if isinstance(plot_dir, str):
plot_dir = Path(plot_dir).resolve()
communication_data_expected_columns = {
"num_gpus",
"strategy",
"global_rank",
"name",
"self_cuda_time_total",
}
communication_data_df = read_profiling_data_from_mlflow(
mlflow_client,
experiment_name,
run_names,
expected_columns=communication_data_expected_columns,
)
if communication_data_df is None:
return None
cli_logger.info("\nAnalyzing Communication Data...")
computation_fraction_df = get_computation_fraction_data(communication_data_df)
formatters = {"computation_fraction": lambda x: f"{x * 100:.2f} %"}
communication_data_table = computation_fraction_df.to_string(
index=False, formatters=formatters
)
computation_fraction_plot_path = plot_dir / (
"computation_vs_communication_plot" + plot_file_suffix
)
computation_fraction_fig, _ = computation_fraction_bar_plot(computation_fraction_df)
computation_fraction_fig.savefig(computation_fraction_plot_path)
cli_logger.info(
f"Saved computation fraction plot at '{computation_fraction_plot_path.resolve()}'."
)
return communication_data_table
[docs]
def computation_data_report(
plot_dir: Path | str,
mlflow_client: MlflowClient,
experiment_name: str,
run_names: List[str] | None = None,
plot_file_suffix: str = ".png",
) -> str | None:
"""Generates reports and plots for computation and other fractions across
distributed training strategies. Includes a bar plot showing the fraction of time
spent on computation vs other for each strategy and GPU count.
Args:
plot_dir (Path | str): Path to the directory where the generated plot will
be saved.
mlflow_client (MlflowClient): MLflow client to interact with the MLflow tracking
server.
experiment_name (str): Name of the MLflow experiment to retrieve data from.
run_names (List[str] | None): List of specific run names to filter the data.
If None, all runs in the experiment will be considered.
plot_file_suffix (str): Suffix for the plot file names. Defaults to ".png".
Returns:
str | None: A string representation of the computation data statistics table,
or None if no data is available.
"""
if isinstance(plot_dir, str):
plot_dir = Path(plot_dir)
computation_data_expected_columns = {
"strategy",
"num_gpus",
"global_rank",
"name",
"self_cuda_time_total",
}
computation_data_df = read_profiling_data_from_mlflow(
mlflow_client,
experiment_name,
run_names,
expected_columns=computation_data_expected_columns,
)
if computation_data_df is None:
return None
cli_logger.info("\nAnalyzing Computation Data...")
computation_fraction_df = get_computation_vs_other_data(computation_data_df)
formatters = {"computation_fraction": lambda x: f"{x * 100:.2f} %"}
computation_data_table = computation_fraction_df.to_string(
index=False, formatters=formatters
)
computation_fraction_plot_path = plot_dir / (
"computation_vs_other_plot" + plot_file_suffix
)
computation_fraction_fig, _ = computation_vs_other_bar_plot(computation_fraction_df)
computation_fraction_fig.savefig(computation_fraction_plot_path)
cli_logger.info(
f"Saved computation fraction plot at '{computation_fraction_plot_path.resolve()}'."
)
return computation_data_table