# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Jarl Sondre Sæther
#
# Credit:
# - Jarl Sondre Sæther <jarl.sondre.saether@cern.ch> - CERN
# - Linus Eickhoff <linus.maximilian.eickhoff@cern.ch> - CERN
# --------------------------------------------------------------------------------------
from typing import Set, Tuple
import pandas as pd
from scipy.constants import hour as SECONDS_IN_HOUR
from itwinai.utils import deprecated
[docs]
def check_contains_columns(df: pd.DataFrame, expected_columns: Set) -> None:
"""Validates that the given DataFrame contains all the expected columns. Raises a
ValueError if any columns are missing, including the file path in the error message
if provided.
"""
if not expected_columns.issubset(df.columns):
missing_columns = expected_columns - set(df.columns)
raise ValueError(
f"Invalid data format for given DataFrame. \nMissing columns:{missing_columns}."
)
[docs]
def check_probing_interval_consistency(gpu_data_df: pd.DataFrame) -> None:
"""Checks that the probing_interval is consistent within each group of strategy
and number of GPUs.
Raises:
ValueError: If the probing intervals are inconsistent for any group.
"""
unique_intervals = gpu_data_df.groupby(["strategy", "global_world_size"])[
"probing_interval"
].nunique()
if (unique_intervals > 1).any():
inconsistent_group = unique_intervals.max()
raise ValueError(
f"probing_interval must have the same value for each strategy and "
f"number of GPUs, but was inconsistent for strategy: {inconsistent_group[0]} "
f"and number of GPUs: {inconsistent_group[1]}."
)
[docs]
def calculate_epoch_statistics(
epoch_time_df: pd.DataFrame, expected_columns: Set
) -> pd.DataFrame:
"""Calculates the average epoch time for each strategy and number of GPUs from the
given DataFrame. The DataFrame is expected to contain the columns 'strategy',
'global_world_size', 'sample_idx', 'metric_name', and 'value'.
The 'metric_name' column should contain the value 'epoch_time_s' for epoch time
measurements.
Args:
epoch_time_df (pd.DataFrame): DataFrame containing epoch time data.
expected_columns (Set): Set of expected columns in the DataFrame.
Returns:
pd.DataFrame: A DataFrame containing the average epoch time for each strategy and
number of GPUs, with the columns ``strategy``, ``global_world_size``,
``sample_idx``, and ``avg_epoch_time``.
Raises:
ValueError: If the given DataFrame does not contain the expected columns.
ValueError: If the probing intervals are inconsistent for any group.
"""
check_contains_columns(df=epoch_time_df, expected_columns=expected_columns)
# do not modify inplace
epoch_time_df = epoch_time_df.copy()
mask = epoch_time_df["metric_name"] == "epoch_time_s"
# Ensure value and global_world_size is numeric
epoch_time_df.loc[mask, "value"] = pd.to_numeric(epoch_time_df.loc[mask, "value"])
epoch_time_df["global_world_size"] = pd.to_numeric(epoch_time_df["global_world_size"])
pivoted = epoch_time_df.pivot_table(
index=["strategy", "global_world_size", "sample_idx"],
columns="metric_name",
values="value",
).reset_index()
# Merge previous columns into the pivoted DataFrame
pivoted = pivoted.merge(
epoch_time_df[["strategy", "global_world_size", "sample_idx"]],
how="left",
on=["strategy", "global_world_size", "sample_idx"],
)
# Aggregate as before
aggregated_df = (
pivoted.groupby(["strategy", "global_world_size"])
.agg(avg_epoch_time=("epoch_time_s", "mean"))
.reset_index()
)
return aggregated_df
[docs]
def calculate_gpu_statistics(gpu_data_df: pd.DataFrame, expected_columns: Set) -> pd.DataFrame:
"""Calculates both the total energy expenditure (in Watt-hours) and the average GPU
utilization for each strategy and number of GPUs. Ensures consistent probing intervals.
Returns:
pd.DataFrame: A DataFrame containing the total energy expenditure and
average GPU utilization for each strategy and number of GPUs, with
the columns ``strategy``, ``global_world_size``, ``total_energy_wh``,
and ``utilization``.
Raises:
ValueError: If the given DataFrame does not contain the expected columns.
ValueError: If the probing intervals are inconsistent for any group.
"""
check_contains_columns(df=gpu_data_df, expected_columns=expected_columns)
check_probing_interval_consistency(gpu_data_df)
mask = gpu_data_df["metric_name"] == "gpu_power_W"
# Ensure value and probing_interval are numeric
gpu_data_df.loc[mask, "value"] = pd.to_numeric(gpu_data_df.loc[mask, "value"])
gpu_data_df.loc[mask, "probing_interval"] = pd.to_numeric(
gpu_data_df.loc[mask, "probing_interval"]
)
# Ensure global_world_size is numeric
gpu_data_df["global_world_size"] = pd.to_numeric(gpu_data_df["global_world_size"])
# Calculate energy in watt hours
gpu_data_df.loc[mask, "energy_wh"] = (
gpu_data_df.loc[mask, "value"]
* gpu_data_df.loc[mask, "probing_interval"]
/ SECONDS_IN_HOUR
)
# shift metrics to columns (assumes samples are the same for each strategy and
# global_world_size), ensured earlier by check_probing_interval_consistency
pivoted = gpu_data_df.pivot_table(
index=["strategy", "global_world_size", "timestamp"],
columns="metric_name",
values="value",
).reset_index()
# Merge previous columns into the pivoted DataFrame
pivoted = pivoted.merge(
gpu_data_df[["strategy", "global_world_size", "timestamp", "energy_wh"]],
how="left",
on=["strategy", "global_world_size", "timestamp"],
)
# Aggregate as before
aggregated_df = (
pivoted.groupby(["strategy", "global_world_size"])
.agg(
total_energy_wh=("energy_wh", "sum"), # Total energy in watt-hours
utilization=("gpu_utilization_percent", "mean"), # Average GPU utilization
)
.reset_index()
)
return aggregated_df
[docs]
@deprecated(
"Communication vs computation is unreliable and comparable between GPU"
" architectures. Please use calculate_comp_time instead."
)
def calculate_comp_and_comm_time(df: pd.DataFrame) -> Tuple[float, float]:
"""Calculates the time spent on computation and communication in seconds from the
given DataFrame, assuming an NCCL backend.
Raises:
ValueError: If the DataFrame is missing the required columns 'name' or
'self_cuda_time_total'.
"""
expected_columns = {"name", "self_cuda_time_total"}
check_contains_columns(df=df, expected_columns=expected_columns)
comm_types = [
"all_reduce",
"broadcast",
"reduce",
"all_gather",
"gather",
"reduce_scatter",
]
nccl_comm_pattern = rf"nccl:(?:{'|'.join(comm_types)})"
cuda_stream_pattern = r"cudaStream(?:WaitEvent|Synchronize)"
# Any operation that is a part of PyTorch's ATen library and autograd is considered a
# computation.
# See torch namespaces: https://docs.pytorch.org/cppdocs/api/library_root.html
aten_comp_pattern = r".*(?:aten|\sat|c10|autograd)::"
comm_df = df[
(df["name"].str.contains(nccl_comm_pattern))
| (df["name"].str.contains(cuda_stream_pattern))
]
comp_df = df[df["name"].str.contains(aten_comp_pattern)]
comp_time = comp_df["self_cuda_time_total"].sum()
comm_time = comm_df["self_cuda_time_total"].sum()
# Converting from microseconds to seconds
comp_time *= 1e-6
comm_time *= 1e-6
return comp_time, comm_time
[docs]
def calculate_comp_time(df: pd.DataFrame) -> float:
"""Calculates the time spent on computation in seconds from the
given DataFrame.
Raises:
ValueError: If the DataFrame is missing the required columns 'name' or
'self_cuda_time_total'.
"""
expected_columns = {"name", "self_cuda_time_total"}
check_contains_columns(df=df, expected_columns=expected_columns)
# Any operation that is a part of PyTorch's ATen library and autograd is considered a
# computation.
# See torch namespaces: https://docs.pytorch.org/cppdocs/api/library_root.html
aten_comp_pattern = r".*(?:aten|\sat|c10|autograd)::"
comp_df = df[df["name"].str.contains(aten_comp_pattern)]
comp_time = comp_df["self_cuda_time_total"].sum()
# Converting from microseconds to seconds
comp_time *= 1e-6
return comp_time
[docs]
@deprecated(
"Communication calculation is unreliable and not comparable between GPU"
" architectures. Please use get_computation_vs_other_data instead."
)
def get_computation_fraction_data(df: pd.DataFrame) -> pd.DataFrame:
"""Calculates the computation fraction for each strategy and GPU configuration,
returning a DataFrame with the results. The computation fraction is defined as the
ratio of computation time to the total time (computation + communication).
"""
# Group by strategy and number of GPUs, calculate computation fraction
def compute_fraction(group):
comp_time, comm_time = calculate_comp_and_comm_time(df=group)
return comp_time / (comp_time + comm_time + 1e-10)
grouped = df.groupby(["strategy", "num_gpus"]).apply(compute_fraction)
# Sort and create cartesian product of unique strategies and GPU counts
unique_num_gpus = sorted(df["num_gpus"].unique(), key=int)
unique_strategies = sorted(df["strategy"].unique())
index = pd.MultiIndex.from_product(
[unique_strategies, unique_num_gpus], names=["strategy", "num_gpus"]
)
# Reindex to fill in missing combinations with NaN
result_df = pd.DataFrame(grouped.reindex(index))
result_df = result_df.reset_index()
result_df.columns = ["strategy", "num_gpus", "computation_fraction"]
return result_df
[docs]
def get_computation_vs_other_data(df: pd.DataFrame) -> pd.DataFrame:
"""Calculates the computation fraction for each strategy and GPU configuration,
returning a DataFrame with the results. The computation fraction is defined as the
ratio of computation time to the total time of profiling.
"""
# Group by strategy and number of GPUs, calculate computation fraction
def compute_fraction(group):
# Theoretically, two identical float32 runtimes could be the same and thus disregarded
# but this is impossible to happen in practice for ml runs.
# Convert from microseconds to seconds
total_time = group["self_cuda_time_total"].sum() * 1e-6
profiler_overhead = group.loc[
group["name"] == "ProfilerStep*", "self_cuda_time_total"
].values
profiler_overhead = profiler_overhead.sum() if len(profiler_overhead) > 0 else 0.0
profiler_overhead *= 1e-6 # Convert from microseconds to seconds
total_time_without_profiler = total_time - profiler_overhead
comp_time = calculate_comp_time(df=group)
return comp_time / (total_time_without_profiler + 1e-10)
grouped = df.groupby(["num_gpus", "strategy"]).apply(compute_fraction)
# Sort and create cartesian product of unique strategies and GPU counts
unique_num_gpus = sorted(df["num_gpus"].unique(), key=int)
unique_strategies = sorted(df["strategy"].unique())
index = pd.MultiIndex.from_product(
[unique_num_gpus, unique_strategies], names=["strategy", "num_gpus"]
)
# Reindex to fill in missing combinations with NaN
result_df = pd.DataFrame(grouped.reindex(index))
result_df = result_df.reset_index()
result_df.columns = ["strategy", "num_gpus", "computation_fraction"]
return result_df