# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# - Anna Lappe <anna.elisa.lappe@cern.ch> - CERN
# -------------------------------------------------------------------------------------
"""
``itwinai`` wrappers for well-known ML loggers.
A logger allows to save objects of different kinds:
.. list-table:: Logger kinds
:widths: 25 25
:header-rows: 1
* - Object ``kind``
- Description
* - ``metric``
- Number, usually representing a ML metric of interest (e.g., loss,
accuracy).
* - ``torch``
- PyTorch object (e.g., tensor).
* - ``artifact``
- File on the local filesystem to be stored by the logger.
* - ``figure``
- Matplotlib of Plotly figure
* - ``image``
- PIL image or numpy array storing an image.
* - ``param``
- | Hyper-parameter (e.g., learning rate, batch size, number of layers)
| as a primitive Python type.
* - ``text``
- Running text (string).
* - ``dict``
- Python dictionary.
* - ``model``
- ML model. At the moment only :class:`torch.nn.Module` is supported.
* - ``best_model``
- Best ML model. At the moment only :class:`torch.nn.Module` is
supported.
* - ``dataset``
- Dataset object (e.g., objects of type :class:`mlflow.data.Dataset`).
* - ``watch``
- | WandB ``watch``: Hook into the torch model to collect gradients and
| the topology. `More info`_.
* - ``flops_pb``
- Flops per batch, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``flops_pb``
- Flops per batch, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``flops_pe``
- Flops per epoch, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``system``
- System metrics, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``carbon``
- Carbon footprint information, used
by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``execution_time``
- Execution time, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``prov_documents``
- Provenance documents, used by :class:`~itwinai.loggers.Prov4MLLogger`.
.. _More info:
https://docs.wandb.ai/ref/python/watch
"""
import os
import pickle
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import mlflow
import pandas as pd
import prov4ml
import wandb
from typing_extensions import override
BASE_EXP_NAME: str = "default_experiment"
[docs]
class LogMixin(ABC):
[docs]
@abstractmethod
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
"""Log ``item`` with ``identifier`` name of ``kind`` type at ``step``
time step.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be one
among the list of self.supported_kinds. Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
"""
[docs]
class Logger(LogMixin):
"""Base class for logger
Args:
savedir (Union[Path, str], optional): filesystem location where logs are stored.
Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
how often should the logger fulfill calls to the `log()`
method:
- When set to 'epoch', the logger logs only if ``batch_idx``
is not passed to the ``log`` method.
- When an integer
is given, the logger logs if ``batch_idx`` is a multiple of
``log_freq``.
- When set to ``'batch'``, the logger logs always.
Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Location on filesystem where to store data.
savedir: Path
#: Supported logging 'kind's.
supported_kinds: Tuple[str]
#: Current worker global rank
worker_rank: int
_log_freq: Union[int, Literal["epoch", "batch"]]
def __init__(
self,
savedir: Union[Path, str] = "mllogs",
log_freq: Union[int, Literal["epoch", "batch"]] = "epoch",
log_on_workers: Union[int, List[int]] = 0,
experiment_id: Optional[str] = None,
run_id: Optional[Union[int, str]] = None,
) -> None:
self.savedir = Path(savedir)
self.log_freq = log_freq
self.log_on_workers = log_on_workers
self._experiment_id = experiment_id
self._run_id = run_id
@property
def experiment_id(self) -> Optional[str]:
"""Return the experiment name."""
return self._experiment_id
@property
def run_id(self) -> Optional[Union[int, str]]:
"""Return the experiment version."""
return self._run_id
@property
def log_freq(self) -> Union[int, Literal["epoch", "batch"]]:
"""Get ``log_feq``, namely how often should the logger
fulfill or ignore calls to the `log()` method."""
return self._log_freq
@log_freq.setter
def log_freq(self, val: Union[int, Literal["epoch", "batch"]]):
"""Sanitize log_freq value."""
if val in ["epoch", "batch"] or (isinstance(val, int) and val > 0):
self._log_freq = val
else:
raise ValueError(
"Wrong value for 'log_freq'. Supported values are: "
f"['epoch', 'batch'] or int > 0. Received: {val}"
)
[docs]
@contextmanager
def start_logging(self, rank: Optional[int] = None):
"""Start logging context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
Example:
>>> with my_logger.start_logging():
>>> my_logger.log(123, 'value', kind='metric', step=0)
"""
try:
self.create_logger_context(rank=rank)
yield
finally:
self.destroy_logger_context()
[docs]
@abstractmethod
def create_logger_context(self, rank: Optional[int] = None) -> Any:
"""Initializes the logger context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
[docs]
@abstractmethod
def destroy_logger_context(self) -> None:
"""Destroy logger."""
[docs]
@abstractmethod
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
[docs]
def serialize(self, obj: Any, identifier: str) -> str:
"""Serializes object to disk and returns its path.
Args:
obj (Any): item to save.
identifier (str): identifier of the item to log (expected to be a
path under ``self.savedir``).
Returns:
str: local path of the serialized object to be logged.
"""
itm_path = self.savedir / identifier
with open(itm_path, "wb") as itm_file:
pickle.dump(obj, itm_file)
[docs]
def should_log(self, batch_idx: Optional[int] = None) -> bool:
"""Determines whether the logger should fulfill or ignore calls to the
`log()` method, depending on the ``log_freq`` property:
- When ``log_freq`` is set to 'epoch', the logger logs only if
``batch_idx`` is not passed to the ``log`` method.
- When ``log_freq`` is an integer
is given, the logger logs if ``batch_idx`` is a multiple of
``log_freq``.
- When ``log_freq`` is set to ``'batch'``, the logger logs always.
It also takes into account whether logging on the current worker
rank is allowed by ``self.log_on_workers``.
Args:
batch_idx (Optional[int]): the dataloader batch idx, if available.
Defaults to None.
Returns:
bool: True if the logger should log, False otherwise.
"""
# Check worker's global rank
worker_ok = (
self.worker_rank is None
or (
isinstance(self.log_on_workers, int)
and (self.log_on_workers == -1 or self.log_on_workers == self.worker_rank)
)
or (
isinstance(self.log_on_workers, list)
and self.worker_rank in self.log_on_workers
)
)
if not worker_ok:
return False
# Check batch ID
if batch_idx is not None:
if isinstance(self.log_freq, int):
if batch_idx % self.log_freq == 0:
return True
return False
if self.log_freq == "batch":
return True
return False
return True
class _EmptyLogger(Logger):
"""Dummy logger which can be used as a placeholder when a real logger is
not available. All methods do nothing.
"""
def __init__(
self,
savedir: Union[Path, str] = "mllogs",
log_freq: int | Literal["epoch"] | Literal["batch"] = "epoch",
log_on_workers: int | List[int] = 0,
) -> None:
super().__init__(savedir, log_freq, log_on_workers)
def create_logger_context(self, rank: Optional[int] = None):
pass
def destroy_logger_context(self):
pass
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
pass
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
pass
[docs]
class ConsoleLogger(Logger):
"""Simplified logger.
Args:
savedir (Union[Path, str], optional): where to store artifacts.
Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = ("torch", "artifact", "metric")
def __init__(
self,
savedir: Union[Path, str] = "mllogs",
log_freq: Union[int, Literal["epoch", "batch"]] = "epoch",
log_on_workers: Union[int, List[int]] = 0,
) -> None:
cl_savedir = Path(savedir) / "simple-logger"
super().__init__(savedir=cl_savedir, log_freq=log_freq, log_on_workers=log_on_workers)
[docs]
def create_logger_context(self, rank: Optional[int] = None):
"""Initializes the logger context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
if self.savedir.is_dir():
numeric_dirs = [
int(exp_dir.name)
for exp_dir in self.savedir.iterdir()
if exp_dir.is_dir() and exp_dir.name.isdigit()
]
self._experiment_id = max(numeric_dirs) + 1
else:
self._experiment_id = 0
self.run_path = self.savedir / str(self.experiment_id)
self.run_path.mkdir(exist_ok=True, parents=True)
[docs]
def destroy_logger_context(self):
"""Destroy logger. Do nothing."""
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters. Do nothing.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
# Save hyperparams
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
"""Print metrics to stdout and save artifacts to the filesystem.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == "artifact":
import shutil
artifact_dir = self.run_path / "artifacts" / identifier
artifact_dir.mkdir(exist_ok=True, parents=True)
item_path = Path(item)
if item_path.is_file():
target_path = artifact_dir / identifier
shutil.copyfile(item, target_path)
elif item_path.is_dir():
numeric_dirs = [
int(exp_dir.name)
for exp_dir in artifact_dir.iterdir()
if exp_dir.is_dir() and exp_dir.name.isdigit()
]
child_id = max(numeric_dirs) + 1
target_path = artifact_dir / f"{self._experiment_id}.{child_id}"
shutil.copytree(item, target_path, dirs_exist_ok=True)
else:
print(f"INFO: The ConsoleLogger expects an artifact to be either a path \
or a directory. Received instead an item of type {type(item)}. \
The item will be ignored and not logged.")
elif kind == "torch":
import torch
target_path = self.run_path / identifier
torch.save(item, target_path)
print(f"INFO: ConsoleLogger saved to {target_path}...")
elif kind == "metric":
print(f"ConsoleLogger: {identifier} = {item}")
[docs]
class MLFlowLogger(Logger):
"""Abstraction around MLFlow logger.
Args:
savedir (Union[Path, str], optional): path on local filesystem where logs are
stored. Defaults to 'mllogs'.
experiment_name (str, optional): experiment name. Defaults to
``itwinai.loggers.BASE_EXP_NAME``.
tracking_uri (Optional[str], optional): MLFLow tracking URI.
Overrides ``savedir`` if given. Defaults to None.
run_description (Optional[str], optional): run description.
Defaults to None.
run_name (Optional[str], optional): run name.
Defaults to None.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
"metric",
"figure",
"image",
"artifact",
"torch",
"dict",
"param",
"text",
"model",
"dataset",
)
#: Current MLFLow experiment's run.
active_run: mlflow.ActiveRun
def __init__(
self,
savedir: Union[Path, str] = "mllogs",
experiment_name: str = BASE_EXP_NAME,
tracking_uri: Optional[str] = None,
run_description: Optional[str] = None,
run_name: Optional[str] = None,
log_freq: Union[int, Literal["epoch", "batch"]] = "epoch",
log_on_workers: Union[int, List[int]] = 0,
):
mfl_savedir = Path(savedir) / "mlflow"
super().__init__(savedir=mfl_savedir, log_freq=log_freq, log_on_workers=log_on_workers)
self.tracking_uri = tracking_uri
self.run_description = run_description
self.run_name = run_name
self.experiment_name = experiment_name
self.tracking_uri = (
self.tracking_uri
or os.environ.get("MLFLOW_TRACKING_URI")
or Path(self.savedir).resolve().as_uri()
)
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> mlflow.ActiveRun:
"""Initializes the logger context. Start MLFLow run.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
Returns:
mlflow.ActiveRun: active MLFlow run.
"""
self.worker_rank = rank
if not self.should_log():
return
active_run = mlflow.active_run()
if active_run:
print("Detected an active MLFlow run. Attaching to it...")
self.active_run = active_run
else:
mlflow.set_tracking_uri(self.tracking_uri)
mlflow.set_experiment(experiment_name=self.experiment_name)
self.active_run: mlflow.ActiveRun = mlflow.start_run(
description=self.run_description, run_name=self.run_name
)
self._run_id = self.active_run.info.run_id
self._experiment_id = self.active_run.info.experiment_id
return self.active_run
[docs]
def destroy_logger_context(self):
"""Destroy logger. End current MLFlow run."""
if not self.should_log():
return
mlflow.end_run()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters as MLFlow parameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
for param_name, val in params.items():
self.log(item=val, identifier=param_name, step=0, kind="param")
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
"""Log with MLFlow.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == "metric":
mlflow.log_metric(key=identifier, value=item, step=step)
elif kind == "artifact":
if not isinstance(item, str):
# Save the object locally and then log it
name = os.path.basename(identifier)
save_path = self.savedir / ".trash" / str(name)
save_path.mkdir(os.path.dirname(save_path), exist_ok=True)
item = self.serialize(item, save_path)
mlflow.log_artifact(local_path=item, artifact_path=identifier)
elif kind == "model":
import torch
if isinstance(item, torch.nn.Module):
mlflow.pytorch.log_model(item, identifier)
else:
print("WARNING: unrecognized model type")
elif kind == "dataset":
# Log mlflow dataset
# https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.log_input
# It may be needed to convert item into a mlflow dataset, e.g.:
# https://mlflow.org/docs/latest/python_api/mlflow.data.html#mlflow.data.from_pandas
# ATM delegated to the user
if isinstance(item, mlflow.data.Dataset):
mlflow.log_input(item)
else:
print("WARNING: unrecognized dataset type. " "Must be an MLFlow dataset")
elif kind == "torch":
import torch
# Save the object locally and then log it
name = os.path.basename(identifier)
save_path = self.savedir / ".trash" / str(name)
save_path.mkdir(os.path.dirname(save_path), exist_ok=True)
torch.save(item, save_path)
# Log into mlflow
mlflow.log_artifact(local_path=save_path, artifact_path=identifier)
elif kind == "dict":
mlflow.log_dict(dictionary=item, artifact_file=identifier)
elif kind == "figure":
mlflow.log_figure(
artifact_file=identifier,
figure=item,
save_kwargs=kwargs.get("save_kwargs"),
)
elif kind == "image":
mlflow.log_image(artifact_file=identifier, image=item)
elif kind == "param":
mlflow.log_param(key=identifier, value=item)
elif kind == "text":
mlflow.log_text(artifact_file=identifier, text=item)
[docs]
class WandBLogger(Logger):
"""Abstraction around WandB logger.
Args:
savedir (Union[Path, str], optional): location on local filesystem where logs
are stored. Defaults to 'mllogs'.
project_name (str, optional): experiment name. Defaults to
``itwinai.loggers.BASE_EXP_NAME``.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
offline_mode (str, optional): Use this option if working on compute
node without internet access. Saves logs locally.
Defaults to 'False'.
"""
# TODO: add support for artifacts logging
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
"watch",
"metric",
"figure",
"image",
"torch",
"dict",
"param",
"text",
)
def __init__(
self,
savedir: Union[Path, str] = "mllogs",
project_name: str = BASE_EXP_NAME,
log_freq: Union[int, Literal["epoch", "batch"]] = "epoch",
log_on_workers: Union[int, List[int]] = 0,
offline_mode: bool = False,
) -> None:
wbl_savedir = Path(savedir) / "wandb"
super().__init__(savedir=wbl_savedir, log_freq=log_freq, log_on_workers=log_on_workers)
self.project_name = project_name
self.offline_mode = offline_mode
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> None:
"""Initializes the logger context. Init WandB run.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
(self.savedir / "wandb").mkdir(
exist_ok=True,
parents=True,
)
self.active_run = wandb.init(
dir=self.savedir.resolve(),
project=self.project_name,
mode="offline" if self.offline_mode else "online",
)
[docs]
def destroy_logger_context(self):
"""Destroy logger."""
if not self.should_log():
return
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
wandb.config.update(params)
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
"""Log with WandB. Wrapper of https://docs.wandb.ai/ref/python/log
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): ignored by ``WandBLogger``.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == "watch":
wandb.watch(item)
elif kind in self.supported_kinds:
# wandb.log({identifier: item}, step=step, commit=True)
# Let WandB use its preferred step
wandb.log({identifier: item}, commit=True)
[docs]
class TensorBoardLogger(Logger):
"""Abstraction around TensorBoard logger, both for PyTorch and
TensorFlow.
Args:
savedir (Union[Path, str], optional): location on local filesystem where logs
are stored. Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
framework (Literal['tensorflow', 'pytorch'], optional):
whether to log PyTorch or TensorFlow ML data.
Defaults to 'pytorch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
Raises:
ValueError: when ``framework`` is not recognized.
"""
# TODO: decouple the logger into TorchTBLogger and TFTBLogger
# and add the missing logging types supported by each.
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = ("metric", "image", "text", "figure", "torch")
def __init__(
self,
savedir: Union[Path, str] = "mllogs",
log_freq: Union[int, Literal["epoch", "batch"]] = "epoch",
framework: Literal["tensorflow", "pytorch"] = "pytorch",
log_on_workers: Union[int, List[int]] = 0,
) -> None:
tbl_savedir = Path(savedir) / "tensorboard"
super().__init__(savedir=tbl_savedir, log_freq=log_freq, log_on_workers=log_on_workers)
self.framework = framework
if framework.lower() == "tensorflow":
import tensorflow as tf
self.tf = tf
self.writer = tf.summary.create_file_writer(tbl_savedir.resolve().as_posix())
elif framework.lower() == "pytorch":
from torch.utils.tensorboard import SummaryWriter
self.writer = SummaryWriter(tbl_savedir.resolve().as_posix())
else:
raise ValueError("Framework must be either 'tensorflow' or 'pytorch'")
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> None:
"""Initializes the logger context. Init Tensorboard run.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
if self.framework == "tensorflow":
self.writer.set_as_default()
[docs]
def destroy_logger_context(self):
"""Destroy logger. Close SummaryWriter."""
if not self.should_log():
return
self.writer.close()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
if self.framework == "tensorflow":
from tensorboard.plugins.hparams import api as hp
hparams = {hp.HParam(k): v for k, v in params.items()}
with self.writer.as_default():
hp.hparams(hparams)
elif self.framework == "pytorch":
self.writer.add_hparams(params, {})
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
"""Log with Tensorboard.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if self.framework == "tensorflow":
with self.writer.as_default():
if kind == "metric":
self.tf.summary.scalar(identifier, item, step=step)
elif kind == "image":
self.tf.summary.image(identifier, item, step=step)
elif kind == "text":
self.tf.summary.text(identifier, item, step=step)
elif kind == "figure":
self.tf.summary.figure(identifier, item, step=step)
elif self.framework == "pytorch":
if kind == "metric":
self.writer.add_scalar(identifier, item, global_step=step)
elif kind == "image":
self.writer.add_image(identifier, item, global_step=step)
elif kind == "text":
self.writer.add_text(identifier, item, global_step=step)
elif kind == "figure":
self.writer.add_figure(identifier, item, global_step=step)
elif kind == "torch":
self.writer.add_graph(item)
[docs]
class LoggersCollection(Logger):
"""Wrapper of a set of loggers, allowing to use them simultaneously.
Args:
loggers (List[Logger]): list of itwinai loggers.
"""
#: Supported kinds are delegated to the loggers in the collection.
supported_kinds: Tuple[str]
def __init__(self, loggers: List[Logger]) -> None:
super().__init__(savedir=Path("/tmp/mllogs_LoggersCollection"), log_freq=1)
self.loggers = loggers
[docs]
def should_log(self, batch_idx: int = None) -> bool:
"""Transparent method which delegates the `Logger.should_log``
to individual loggers. Always returns True.
Args:
batch_idx (int, optional): dataloader batch index.
Defaults to None.
Returns:
bool: always True.
"""
return True
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs,
) -> None:
"""Log on all loggers.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
for logger in self.loggers:
logger.log(
item=item,
identifier=identifier,
kind=kind,
step=step,
batch_idx=batch_idx,
**kwargs,
)
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> Any:
"""Initializes all loggers.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
for logger in self.loggers:
logger.create_logger_context(rank=rank)
[docs]
def destroy_logger_context(self):
"""Destroy all loggers."""
for logger in self.loggers:
logger.destroy_logger_context()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters for all loggers.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
for logger in self.loggers:
logger.save_hyperparameters(params=params)
[docs]
class Prov4MLLogger(Logger):
"""
Abstraction around Prov4ML logger.
Args:
prov_user_namespace (str, optional): location to where provenance
files will be uploaded. Defaults to "www.example.org".
experiment_name (str, optional): experiment name.
Defaults to "experiment_name".
provenance_save_dir (Union[Path, str], optional): path where to store provenance
files and logs. Defaults to "prov".
save_after_n_logs (Optional[int], optional): how often to save
logs to disk from main memory. Defaults to 100.
create_graph (Optional[bool], optional): whether to create a
provenance graph. Defaults to True.
create_svg (Optional[bool], optional): whether to create an SVG
representation of the provenance graph. Defaults to True.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
"metric",
"flops_pb",
"flops_pe",
"system",
"carbon",
"execution_time",
"model",
"best_model",
"torch",
)
def __init__(
self,
prov_user_namespace: str = "www.example.org",
experiment_name: str = "experiment_name",
provenance_save_dir: Union[Path, str] = "mllogs/prov_logs",
save_after_n_logs: Optional[int] = 100,
create_graph: Optional[bool] = True,
create_svg: Optional[bool] = True,
log_freq: Union[int, Literal["epoch", "batch"]] = "epoch",
log_on_workers: Union[int, List[int]] = 0,
) -> None:
super().__init__(
savedir=provenance_save_dir,
log_freq=log_freq,
log_on_workers=log_on_workers,
)
self.prov_user_namespace = prov_user_namespace
self.experiment_name = experiment_name
self.provenance_save_dir = provenance_save_dir
self.save_after_n_logs = save_after_n_logs
self.create_graph = create_graph
self.create_svg = create_svg
[docs]
@override
def create_logger_context(self, rank: Optional[int] = None):
"""Initializes the logger context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
prov4ml.start_run(
prov_user_namespace=self.prov_user_namespace,
experiment_name=self.experiment_name,
provenance_save_dir=self.provenance_save_dir,
save_after_n_logs=self.save_after_n_logs,
# This class will control which workers can log
collect_all_processes=True,
rank=rank,
)
[docs]
@override
def destroy_logger_context(self):
"""
Destroys the logger context.
"""
if not self.should_log():
return
prov4ml.end_run(create_graph=self.create_graph, create_svg=self.create_svg)
[docs]
@override
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
if not self.should_log():
return
# Save hyperparams
for param_name, val in params.items():
prov4ml.log_param(param_name, val)
[docs]
@override
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = "metric",
step: Optional[int] = None,
batch_idx: Optional[int] = None,
context: Optional[str] = "training",
**kwargs,
) -> None:
"""Logs with Prov4ML.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == "metric":
prov4ml.log_metric(key=identifier, value=item, context=context, step=step)
elif kind == "flops_pb":
model, batch = item
prov4ml.log_flops_per_batch(
identifier, model=model, batch=batch, context=context, step=step
)
elif kind == "flops_pe":
model, dataset = item
prov4ml.log_flops_per_epoch(
identifier, model=model, dataset=dataset, context=context, step=step
)
elif kind == "system":
prov4ml.log_system_metrics(context=context, step=step)
elif kind == "carbon":
prov4ml.log_carbon_metrics(context=context, step=step)
elif kind == "execution_time":
prov4ml.log_current_execution_time(label=identifier, context=context, step=step)
elif kind == "model":
prov4ml.save_model_version(
model=item, model_name=identifier, context=context, step=step
)
elif kind == "best_model":
prov4ml.log_model(
model=item,
model_name=identifier,
log_model_info=True,
log_as_artifact=True,
)
elif kind == "torch":
from torch.utils.data import DataLoader
if isinstance(item, DataLoader):
prov4ml.log_dataset(dataset=item, label=identifier)
else:
prov4ml.log_param(key=identifier, value=item)
elif kind == "prov_documents":
prov_docs = prov4ml.log_provenance_documents(create_graph=True, create_svg=True)
# Upload to MLFlow
if mlflow.active_run() is not None:
for f in prov_docs:
if f:
mlflow.log_artifact(f)
[docs]
class EpochTimeTracker:
"""Tracker for epoch execution time during training."""
def __init__(
self, strategy_name: str, save_path: Path | str, num_nodes: int
) -> None:
if isinstance(save_path, str):
save_path = Path(save_path)
self.save_path: Path = save_path
self.strategy_name = strategy_name
self.num_nodes = num_nodes
self.data = {"epoch_id": [], "time": []}
[docs]
def add_epoch_time(self, epoch_idx: int, time: float) -> None:
"""Add epoch time to data."""
self.data["epoch_id"].append(epoch_idx)
self.data["time"].append(time)
[docs]
def save(self) -> None:
"""Save data to a new CSV file."""
df = pd.DataFrame(self.data)
df["name"] = self.strategy_name
df["nodes"] = self.num_nodes
self.save_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(self.save_path, index=False)
print(f"Saving EpochTimeTracking data to '{self.save_path.resolve()}'.")