"""
``itwinai`` wrappers for well-known ML loggers.
A logger allows to save objects of different kinds:
.. list-table:: Logger kinds
:widths: 25 25
:header-rows: 1
* - Object ``kind``
- Description
* - ``metric``
- Number, usually representing a ML metric of interest (e.g., loss,
accuracy).
* - ``torch``
- PyTorch object (e.g., tensor).
* - ``artifact``
- File on the local filesystem to be stored by the logger.
* - ``figure``
- Matplotlib of Plotly figure
* - ``image``
- PIL image or numpy array storing an image.
* - ``param``
- | Hyper-parameter (e.g., learning rate, batch size, number of layers)
| as a primitive Python type.
* - ``text``
- Running text (string).
* - ``dict``
- Python dictionary.
* - ``model``
- ML model. At the moment only :class:`torch.nn.Module` is supported.
* - ``best_model``
- Best ML model. At the moment only :class:`torch.nn.Module` is
supported.
* - ``dataset``
- Dataset object (e.g., objects of type :class:`mlflow.data.Dataset`).
* - ``watch``
- | WandB ``watch``: Hook into the torch model to collect gradients and
| the topology. `More info`_.
* - ``flops_pb``
- Flops per batch, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``flops_pb``
- Flops per batch, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``flops_pe``
- Flops per epoch, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``system``
- System metrics, used by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``carbon``
- Carbon footprint information, used
by :class:`~itwinai.loggers.Prov4MLLogger`.
* - ``execution_time``
- Execution time, used by :class:`~itwinai.loggers.Prov4MLLogger`.
.. _More info:
https://docs.wandb.ai/ref/python/watch
"""
import csv
import os
import pathlib
import pickle
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import mlflow
import prov4ml
import wandb
from typing_extensions import override
BASE_EXP_NAME: str = 'default_experiment'
[docs]
class LogMixin(ABC):
[docs]
@abstractmethod
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log ``item`` with ``identifier`` name of ``kind`` type at ``step``
time step.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be one
among the list of self.supported_kinds. Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
"""
[docs]
class Logger(LogMixin):
"""Base class for logger
Args:
savedir (str, optional): filesystem location where logs are stored.
Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
how often should the logger fulfill calls to the `log()`
method:
- When set to 'epoch', the logger logs only if ``batch_idx``
is not passed to the ``log`` method.
- When an integer
is given, the logger logs if ``batch_idx`` is a multiple of
``log_freq``.
- When set to ``'batch'``, the logger logs always.
Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Location on filesystem where to store data.
savedir: str = None
#: Supported logging 'kind's.
supported_kinds: Tuple[str]
#: Current worker global rank
worker_rank: int
_log_freq: Union[int, Literal['epoch', 'batch']]
def __init__(
self,
savedir: str = 'mllogs',
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
log_on_workers: Union[int, List[int]] = 0
) -> None:
self.savedir = savedir
self.log_freq = log_freq
self.log_on_workers = log_on_workers
@property
def log_freq(self) -> Union[int, Literal['epoch', 'batch']]:
"""Get ``log_feq``, namely how often should the logger
fulfill or ignore calls to the `log()` method."""
return self._log_freq
@log_freq.setter
def log_freq(self, val: Union[int, Literal['epoch', 'batch']]):
"""Sanitize log_freq value."""
if val in ['epoch', 'batch'] or (isinstance(val, int) and val > 0):
self._log_freq = val
else:
raise ValueError(
"Wrong value for 'log_freq'. Supported values are: "
f"['epoch', 'batch'] or int > 0. Received: {val}"
)
[docs]
@contextmanager
def start_logging(self, rank: Optional[int] = None):
"""Start logging context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
Example:
>>> with my_logger.start_logging():
>>> my_logger.log(123, 'value', kind='metric', step=0)
"""
try:
self.create_logger_context(rank=rank)
yield
finally:
self.destroy_logger_context()
[docs]
@abstractmethod
def create_logger_context(self, rank: Optional[int] = None) -> Any:
"""
Initializes the logger context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
[docs]
@abstractmethod
def destroy_logger_context(self) -> None:
"""Destroy logger."""
[docs]
@abstractmethod
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
[docs]
def serialize(self, obj: Any, identifier: str) -> str:
"""Serializes object to disk and returns its path.
Args:
obj (Any): item to save.
identifier (str): identifier of the item to log (expected to be a
path under ``self.savedir``).
Returns:
str: local path of the serialized object to be logged.
"""
itm_path = os.path.join(self.savedir, identifier)
with open(itm_path, 'wb') as itm_file:
pickle.dump(obj, itm_file)
[docs]
def should_log(
self,
batch_idx: Optional[int] = None
) -> bool:
"""Determines whether the logger should fulfill or ignore calls to the
`log()` method, depending on the ``log_freq`` property:
- When ``log_freq`` is set to 'epoch', the logger logs only if
``batch_idx`` is not passed to the ``log`` method.
- When ``log_freq`` is an integer
is given, the logger logs if ``batch_idx`` is a multiple of
``log_freq``.
- When ``log_freq`` is set to ``'batch'``, the logger logs always.
It also takes into account whether logging on the current worker
rank is allowed by ``self.log_on_workers``.
Args:
batch_idx (Optional[int]): the dataloader batch idx, if available.
Defaults to None.
Returns:
bool: True if the logger should log, False otherwise.
"""
# Check worker's global rank
worker_ok = (
self.worker_rank is None or
(isinstance(self.log_on_workers, int) and (
self.log_on_workers == -1 or
self.log_on_workers == self.worker_rank
)
)
or
(isinstance(self.log_on_workers, list)
and self.worker_rank in self.log_on_workers)
)
if not worker_ok:
return False
# Check batch ID
if batch_idx is not None:
if isinstance(self.log_freq, int):
if batch_idx % self.log_freq == 0:
return True
return False
if self.log_freq == 'batch':
return True
return False
return True
class _EmptyLogger(Logger):
"""Dummy logger which can be used as a placeholder when a real logger is
not available. All methods do nothing.
"""
def __init__(
self,
savedir: str = 'mllogs',
log_freq: int | Literal['epoch'] | Literal['batch'] = 'epoch',
log_on_workers: int | List[int] = 0
) -> None:
super().__init__(savedir, log_freq, log_on_workers)
def create_logger_context(self, rank: Optional[int] = None):
pass
def destroy_logger_context(self):
pass
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
pass
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
pass
[docs]
class ConsoleLogger(Logger):
"""Simplified logger.
Args:
savedir (str, optional): where to store artifacts.
Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = ('torch', 'artifact', 'metric')
def __init__(
self,
savedir: str = 'mllogs',
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
log_on_workers: Union[int, List[int]] = 0
) -> None:
savedir = os.path.join(savedir, 'simple-logger')
super().__init__(
savedir=savedir,
log_freq=log_freq,
log_on_workers=log_on_workers
)
[docs]
def create_logger_context(self, rank: Optional[int] = None):
"""
Initializes the logger context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
os.makedirs(self.savedir, exist_ok=True)
run_dirs = sorted([int(dir) for dir in os.listdir(self.savedir)])
if len(run_dirs) == 0:
self.run_id = 0
else:
self.run_id = int(run_dirs[-1]) + 1
self.run_path = os.path.join(self.savedir, str(self.run_id))
os.makedirs(self.run_path)
[docs]
def destroy_logger_context(self):
"""Destroy logger. Do nothing."""
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters. Do nothing.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
# Save hyperparams
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Print metrics to stdout and save artifacts to the filesystem.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == 'artifact':
if isinstance(item, str) and os.path.isfile(item):
import shutil
identifier = os.path.join(
self.run_path,
identifier
)
if len(os.path.dirname(identifier)) > 0:
os.makedirs(os.path.dirname(identifier), exist_ok=True)
print(f"ConsoleLogger: Serializing to {identifier}...")
shutil.copyfile(item, identifier)
else:
identifier = os.path.join(
os.path.basename(self.run_path),
identifier
)
print(f"ConsoleLogger: Serializing to {identifier}...")
self.serialize(item, identifier)
elif kind == 'torch':
identifier = os.path.join(self.run_path, identifier)
print(f"ConsoleLogger: Saving to {identifier}...")
import torch
torch.save(item, identifier)
else:
print(f"ConsoleLogger: {identifier} = {item}")
[docs]
class MLFlowLogger(Logger):
"""Abstraction around MLFlow logger.
Args:
savedir (str, optional): path on local filesystem where logs are
stored. Defaults to 'mllogs'.
experiment_name (str, optional): experiment name. Defaults to
``itwinai.loggers.BASE_EXP_NAME``.
tracking_uri (Optional[str], optional): MLFLow tracking URI.
Overrides ``savedir`` if given. Defaults to None.
run_description (Optional[str], optional): run description.
Defaults to None.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
'metric', 'figure', 'image', 'artifact', 'torch', 'dict', 'param',
'text', 'model', 'dataset')
#: Current MLFLow experiment's run.
active_run: mlflow.ActiveRun
def __init__(
self,
savedir: str = 'mllogs',
experiment_name: str = BASE_EXP_NAME,
tracking_uri: Optional[str] = None,
run_description: Optional[str] = None,
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
log_on_workers: Union[int, List[int]] = 0
):
savedir = os.path.join(savedir, 'mlflow')
super().__init__(
savedir=savedir,
log_freq=log_freq,
log_on_workers=log_on_workers
)
self.experiment_name = experiment_name
self.tracking_uri = tracking_uri
self.run_description = run_description
if self.tracking_uri is None:
# Default MLFLow tracking URI
saved_abs_path = os.path.abspath(self.savedir)
self.tracking_uri = pathlib.Path(saved_abs_path).as_uri()
# TODO: for pytorch lightning:
# mlflow.pytorch.autolog()
[docs]
def create_logger_context(
self,
rank: Optional[int] = None
) -> mlflow.ActiveRun:
"""
Initializes the logger context. Start MLFLow run.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
Returns:
mlflow.ActiveRun: active MLFlow run.
"""
self.worker_rank = rank
if not self.should_log():
return
active_run = mlflow.active_run()
if active_run:
print("Detected an active MLFlow run. Attaching to it...")
self.active_run = active_run
else:
mlflow.set_tracking_uri(self.tracking_uri)
mlflow.set_experiment(experiment_name=self.experiment_name)
self.active_run: mlflow.ActiveRun = mlflow.start_run(
description=self.run_description
)
return self.active_run
[docs]
def destroy_logger_context(self):
"""Destroy logger. End current MLFlow run."""
if not self.should_log():
return
mlflow.end_run()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters as MLFlow parameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
for param_name, val in params.items():
self.log(item=val, identifier=param_name, step=0, kind='param')
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log with MLFlow.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == 'metric':
# if isinstance(item, list) and isinstance(identifier, list):
mlflow.log_metric(
key=identifier,
value=item,
step=step
)
if kind == 'artifact':
if not isinstance(item, str):
# Save the object locally and then log it
name = os.path.basename(identifier)
save_path = os.path.join(self.savedir, '.trash', name)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
item = self.serialize(item, save_path)
mlflow.log_artifact(
local_path=item,
artifact_path=identifier
)
if kind == 'model':
import torch
if isinstance(item, torch.nn.Module):
mlflow.pytorch.log_model(item, identifier)
else:
print("WARNING: unrecognized model type")
if kind == 'dataset':
# Log mlflow dataset
# https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.log_input
# It may be needed to convert item into a mlflow dataset, e.g.:
# https://mlflow.org/docs/latest/python_api/mlflow.data.html#mlflow.data.from_pandas
# ATM delegated to the user
if isinstance(item, mlflow.data.Dataset):
mlflow.log_input(item)
else:
print("WARNING: unrecognized dataset type. "
"Must be an MLFlow dataset")
if kind == 'torch':
import torch
# Save the object locally and then log it
name = os.path.basename(identifier)
save_path = os.path.join(self.savedir, '.trash', name)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(item, save_path)
# Log into mlflow
mlflow.log_artifact(
local_path=save_path,
artifact_path=identifier
)
if kind == 'dict':
mlflow.log_dict(
dictionary=item,
artifact_file=identifier
)
if kind == 'figure':
mlflow.log_figure(
artifact_file=identifier,
figure=item,
save_kwargs=kwargs.get('save_kwargs')
)
if kind == 'image':
mlflow.log_image(
artifact_file=identifier,
image=item
)
if kind == 'param':
mlflow.log_param(
key=identifier,
value=item
)
if kind == 'text':
mlflow.log_text(
artifact_file=identifier,
text=item
)
[docs]
class WandBLogger(Logger):
"""Abstraction around WandB logger.
Args:
savedir (str, optional): location on local filesystem where logs
are stored. Defaults to 'mllogs'.
project_name (str, optional): experiment name. Defaults to
``itwinai.loggers.BASE_EXP_NAME``.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
# TODO: add support for artifacts logging
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
'watch', 'metric', 'figure', 'image', 'torch', 'dict',
'param', 'text')
def __init__(
self,
savedir: str = 'mllogs',
project_name: str = BASE_EXP_NAME,
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
log_on_workers: Union[int, List[int]] = 0
) -> None:
savedir = os.path.join(savedir, 'wandb')
super().__init__(
savedir=savedir,
log_freq=log_freq,
log_on_workers=log_on_workers
)
self.project_name = project_name
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> None:
"""
Initializes the logger context. Init WandB run.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
os.makedirs(os.path.join(self.savedir, 'wandb'), exist_ok=True)
self.active_run = wandb.init(
dir=os.path.abspath(self.savedir),
project=self.project_name
)
[docs]
def destroy_logger_context(self):
"""Destroy logger."""
if not self.should_log():
return
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
wandb.config.update(params)
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log with WandB. Wrapper of https://docs.wandb.ai/ref/python/log
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): ignored by ``WandBLogger``.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == 'watch':
wandb.watch(item)
elif kind in self.supported_kinds:
# wandb.log({identifier: item}, step=step, commit=True)
# Let WandB use its preferred step
wandb.log({identifier: item}, commit=True)
[docs]
class TensorBoardLogger(Logger):
"""Abstraction around TensorBoard logger, both for PyTorch and
TensorFlow.
Args:
savedir (str, optional): location on local filesystem where logs
are stored. Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
framework (Literal['tensorflow', 'pytorch'], optional):
whether to log PyTorch or TensorFlow ML data.
Defaults to 'pytorch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
Raises:
ValueError: when ``framework`` is not recognized.
"""
# TODO: decouple the logger into TorchTBLogger and TFTBLogger
# and add the missing logging types supported by each.
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
'metric', 'image', 'text', 'figure', 'torch')
def __init__(
self,
savedir: str = 'mllogs',
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
framework: Literal['tensorflow', 'pytorch'] = 'pytorch',
log_on_workers: Union[int, List[int]] = 0
) -> None:
savedir = os.path.join(savedir, 'tensorboard')
super().__init__(
savedir=savedir,
log_freq=log_freq,
log_on_workers=log_on_workers
)
self.framework = framework
if framework.lower() == 'tensorflow':
import tensorflow as tf
self.tf = tf
self.writer = tf.summary.create_file_writer(savedir)
elif framework.lower() == 'pytorch':
from torch.utils.tensorboard import SummaryWriter
self.writer = SummaryWriter(savedir)
else:
raise ValueError(
"Framework must be either 'tensorflow' or 'pytorch'")
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> None:
"""
Initializes the logger context. Init Tensorboard run.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
if self.framework == 'tensorflow':
self.writer.set_as_default()
[docs]
def destroy_logger_context(self):
"""Destroy logger. Close SummaryWriter."""
if not self.should_log():
return
self.writer.close()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if not self.should_log():
return
if self.framework == 'tensorflow':
from tensorboard.plugins.hparams import api as hp
hparams = {hp.HParam(k): v for k, v in params.items()}
with self.writer.as_default():
hp.hparams(hparams)
elif self.framework == 'pytorch':
self.writer.add_hparams(params, {})
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log with Tensorboard.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if self.framework == 'tensorflow':
with self.writer.as_default():
if kind == 'metric':
self.tf.summary.scalar(identifier, item, step=step)
elif kind == 'image':
self.tf.summary.image(identifier, item, step=step)
elif kind == 'text':
self.tf.summary.text(identifier, item, step=step)
elif kind == 'figure':
self.tf.summary.figure(identifier, item, step=step)
elif self.framework == 'pytorch':
if kind == 'metric':
self.writer.add_scalar(identifier, item, global_step=step)
elif kind == 'image':
self.writer.add_image(identifier, item, global_step=step)
elif kind == 'text':
self.writer.add_text(identifier, item, global_step=step)
elif kind == 'figure':
self.writer.add_figure(identifier, item, global_step=step)
elif kind == 'torch':
self.writer.add_graph(item)
[docs]
class LoggersCollection(Logger):
"""Wrapper of a set of loggers, allowing to use them simultaneously.
Args:
loggers (List[Logger]): list of itwinai loggers.
"""
#: Supported kinds are delegated to the loggers in the collection.
supported_kinds: Tuple[str]
def __init__(
self,
loggers: List[Logger]
) -> None:
super().__init__(savedir='/tmp/mllogs_LoggersCollection', log_freq=1)
self.loggers = loggers
[docs]
def should_log(self, batch_idx: int = None) -> bool:
"""Transparent method which delegates the `Logger.should_log``
to individual loggers. Always returns True.
Args:
batch_idx (int, optional): dataloader batch index.
Defaults to None.
Returns:
bool: always True.
"""
return True
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log on all loggers.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
for logger in self.loggers:
logger.log(
item=item,
identifier=identifier,
kind=kind,
step=step,
batch_idx=batch_idx,
**kwargs
)
[docs]
def create_logger_context(self, rank: Optional[int] = None) -> Any:
"""
Initializes all loggers.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
for logger in self.loggers:
logger.create_logger_context(rank=rank)
[docs]
def destroy_logger_context(self):
"""Destroy all loggers."""
for logger in self.loggers:
logger.destroy_logger_context()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters for all loggers.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
for logger in self.loggers:
logger.save_hyperparameters(params=params)
[docs]
class Prov4MLLogger(Logger):
"""
Abstraction around Prov4ML logger.
Args:
prov_user_namespace (str, optional): location to where provenance
files will be uploaded. Defaults to "www.example.org".
experiment_name (str, optional): experiment name.
Defaults to "experiment_name".
provenance_save_dir (str, optional): path where to store provenance
files and logs. Defaults to "prov".
save_after_n_logs (Optional[int], optional): how often to save
logs to disk from main memory. Defaults to 100.
create_graph (Optional[bool], optional): whether to create a
provenance graph. Defaults to True.
create_svg (Optional[bool], optional): whether to create an SVG
representation of the provenance graph. Defaults to True.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
log_on_workers (Optional[Union[int, List[int]]]): if -1, log on all
workers; if int log on worker with rank equal to log_on_workers;
if List[int], log on workers which rank is in the list.
Defaults to 0 (the global rank of the main worker).
"""
#: Supported kinds in the ``log`` method
supported_kinds: Tuple[str] = (
'metric', 'flops_pb', 'flops_pe', 'system', 'carbon',
'execution_time', 'model', 'best_model',
'torch')
def __init__(
self,
prov_user_namespace="www.example.org",
experiment_name="experiment_name",
provenance_save_dir="mllogs/prov_logs",
save_after_n_logs: Optional[int] = 100,
create_graph: Optional[bool] = True,
create_svg: Optional[bool] = True,
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
log_on_workers: Union[int, List[int]] = 0
) -> None:
super().__init__(
savedir=provenance_save_dir,
log_freq=log_freq,
log_on_workers=log_on_workers
)
self.name = experiment_name
self.version = None
self.prov_user_namespace = prov_user_namespace
self.provenance_save_dir = provenance_save_dir
self.save_after_n_logs = save_after_n_logs
self.create_graph = create_graph
self.create_svg = create_svg
[docs]
@override
def create_logger_context(self, rank: Optional[int] = None):
"""
Initializes the logger context.
Args:
rank (Optional[int]): global rank of current process,
used in distributed environments. Defaults to None.
"""
self.worker_rank = rank
if not self.should_log():
return
prov4ml.start_run(
prov_user_namespace=self.prov_user_namespace,
experiment_name=self.name,
provenance_save_dir=self.provenance_save_dir,
save_after_n_logs=self.save_after_n_logs,
# This class will control which workers can log
collect_all_processes=True,
rank=rank
)
[docs]
@override
def destroy_logger_context(self):
"""
Destroys the logger context.
"""
if not self.should_log():
return
prov4ml.end_run(
create_graph=self.create_graph,
create_svg=self.create_svg)
[docs]
@override
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
if not self.should_log():
return
# Save hyperparams
for param_name, val in params.items():
prov4ml.log_param(param_name, val)
[docs]
@override
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
context: Optional[str] = 'training',
**kwargs
) -> None:
"""Logs with Prov4ML.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_kinds``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == "metric":
prov4ml.log_metric(key=identifier, value=item,
context=context, step=step)
elif kind == "flops_pb":
model, batch = item
prov4ml.log_flops_per_batch(
identifier, model=model,
batch=batch, context=context, step=step)
elif kind == "flops_pe":
model, dataset = item
prov4ml.log_flops_per_epoch(
identifier, model=model,
dataset=dataset, context=context, step=step)
elif kind == "system":
prov4ml.log_system_metrics(context=context, step=step)
elif kind == "carbon":
prov4ml.log_carbon_metrics(context=context, step=step)
elif kind == "execution_time":
prov4ml.log_current_execution_time(
label=identifier, context=context, step=step)
elif kind == 'model':
prov4ml.save_model_version(
model=item, model_name=identifier, context=context, step=step)
elif kind == 'best_model':
prov4ml.log_model(model=item, model_name=identifier,
log_model_info=True, log_as_artifact=True)
elif kind == 'torch':
from torch.utils.data import DataLoader
if isinstance(item, DataLoader):
prov4ml.log_dataset(dataset=item, label=identifier)
else:
prov4ml.log_param(key=identifier, value=item)
[docs]
class EpochTimeTracker:
"""Profiler for epoch execution time used to support scaling tests.
It uses CSV files to store, for each epoch, the ``name`` of the
experiment, the number of compute ``nodes`` used, the ``epoch_id``,
and the execution ``time`` in seconds.
Args:
series_name (str): name of the experiment/job.
csv_file (str): path to CSV file to store experiments times.
"""
def __init__(self, series_name: str, csv_file: str) -> None:
self.series_name = series_name
self._data = []
self.csv_file = csv_file
with open(csv_file, 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
[docs]
def add_epoch_time(self, epoch_idx: int, time: float) -> None:
"""Add row to the current experiment's CSV file in append mode.
Args:
epoch_idx (int): epoch order idx.
time (float): epoch execution time (seconds).
"""
n_nodes = os.environ.get('SLURM_NNODES', -1)
fields = (self.series_name, n_nodes, epoch_idx, time)
self._data.append(fields)
with open(self.csv_file, 'a') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(fields)
[docs]
def save(self, csv_file: Optional[str] = None) -> None:
"""Save data to a new CSV file.
Args:
csv_file (Optional[str], optional): path to the CSV file.
If not given, uses the one given in the constructor.
Defaults to None.
"""
if not csv_file:
csv_file = self.csv_file
with open(csv_file, 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
csvwriter.writerows(self._data)