"""Abstraction layer for loggers."""
import os
import csv
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union, Literal
import pickle
import pathlib
import wandb
import mlflow
BASE_EXP_NAME: str = 'default_experiment'
[docs]
class LogMixin(metaclass=ABCMeta):
[docs]
@abstractmethod
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log ``item`` with ``identifier`` name of ``kind`` type at ``step``
time step.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be one
among the list of self.supported_types. Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
"""
[docs]
class Logger(LogMixin, metaclass=ABCMeta):
"""Base class for logger
Args:
savedir (str, optional): filesystem location where logs are stored.
Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
how often should the logger fulfill calls to the `log()`
method:
- When set to 'epoch', the logger logs only if ``batch_idx``
is not passed to the ``log`` method.
- When an integer
is given, the logger logs if ``batch_idx`` is a multiple of
``log_freq``.
- When set to ``'batch'``, the logger logs always.
Defaults to 'epoch'.
"""
#: Location on filesystem where to store data.
savedir: str = None
#: Supported logging 'kind's.
supported_types: List[str]
_log_freq: Union[int, Literal['epoch', 'batch']]
def __init__(
self,
savedir: str = 'mllogs',
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch'
) -> None:
self.savedir = savedir
self.log_freq = log_freq
@property
def log_freq(self) -> Union[int, Literal['epoch', 'batch']]:
"""Get ``log_feq``, namely how often should the logger
fulfill or ignore calls to the `log()` method."""
return self._log_freq
@log_freq.setter
def log_freq(self, val: Union[int, Literal['epoch', 'batch']]):
"""Sanitize log_freq value."""
if val in ['epoch', 'batch'] or (isinstance(val, int) and val > 0):
self._log_freq = val
else:
raise ValueError(
"Wrong value for 'log_freq'. Supported values are: "
f"['epoch', 'batch'] or int > 0. Received: {val}"
)
[docs]
@contextmanager
def start_logging(self):
"""Start logging context.
Example:
>>> with my_logger.start_logging():
>>> my_logger.log(123, 'value', kind='metric', step=0)
"""
try:
self.create_logger_context()
yield
finally:
self.destroy_logger_context()
[docs]
@abstractmethod
def create_logger_context(self):
"""Initialize logger."""
[docs]
@abstractmethod
def destroy_logger_context(self):
"""Destroy logger."""
[docs]
@abstractmethod
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
[docs]
def serialize(self, obj: Any, identifier: str) -> str:
"""Serializes object to disk and returns its path.
Args:
obj (Any): item to save.
identifier (str): identifier of the item to log (expected to be a
path under ``self.savedir``).
Returns:
str: local path of the serialized object to be logged.
"""
itm_path = os.path.join(self.savedir, identifier)
with open(itm_path, 'wb') as itm_file:
pickle.dump(obj, itm_file)
[docs]
def should_log(
self,
batch_idx: Optional[int] = None
) -> bool:
"""Determines whether the logger should fulfill or ignore calls to the
`log()` method, depending on the ``log_freq`` property:
- When ``log_freq`` is set to 'epoch', the logger logs only if
``batch_idx`` is not passed to the ``log`` method.
- When ``log_freq`` is an integer
is given, the logger logs if ``batch_idx`` is a multiple of
``log_freq``.
- When ``log_freq`` is set to ``'batch'``, the logger logs always.
Args:
batch_idx (Optional[int]): the dataloader batch idx, if available.
Defaults to None.
Returns:
bool: True if the logger should log, False otherwise.
"""
if batch_idx is not None:
if isinstance(self.log_freq, int):
if batch_idx % self.log_freq == 0:
return True
return False
if self.log_freq == 'batch':
return True
return False
return True
[docs]
class ConsoleLogger(Logger):
"""Simplified logger.
Args:
savedir (str, optional): where to store artifacts.
Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
"""
def __init__(
self,
savedir: str = 'mllogs',
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch'
) -> None:
savedir = os.path.join(savedir, 'simple-logger')
super().__init__(savedir=savedir, log_freq=log_freq)
self.supported_types = ['torch', 'artifact']
[docs]
def create_logger_context(self):
"""Initialize logger."""
os.makedirs(self.savedir, exist_ok=True)
run_dirs = sorted([int(dir) for dir in os.listdir(self.savedir)])
if len(run_dirs) == 0:
self.run_id = 0
else:
self.run_id = int(run_dirs[-1]) + 1
self.run_path = os.path.join(self.savedir, str(self.run_id))
os.makedirs(self.run_path)
[docs]
def destroy_logger_context(self):
"""Destroy logger. Do nothing."""
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters. Do nothing.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Print metrics to stdout and save artifacts to the filesystem.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_types``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == 'artifact':
if isinstance(item, str) and os.path.isfile(item):
import shutil
identifier = os.path.join(
self.run_path,
identifier
)
print(f"ConsoleLogger: Serializing to {identifier}...")
shutil.copyfile(item, identifier)
else:
identifier = os.path.join(
os.path.basename(self.run_path),
identifier
)
print(f"ConsoleLogger: Serializing to {identifier}...")
self.serialize(item, identifier)
elif kind == 'torch':
identifier = os.path.join(self.run_path, identifier)
print(f"ConsoleLogger: Saving to {identifier}...")
import torch
torch.save(item, identifier)
else:
print(f"ConsoleLogger: {identifier} = {item}")
[docs]
class MLFlowLogger(Logger):
"""Abstraction around MLFlow logger.
Args:
savedir (str, optional): path on local filesystem where logs are
stored. Defaults to 'mllogs'.
experiment_name (str, optional): experiment name. Defaults to
``itwinai.loggers.BASE_EXP_NAME``.
tracking_uri (Optional[str], optional): MLFLow tracking URI.
Overrides ``savedir`` if given. Defaults to None.
run_description (Optional[str], optional): run description.
Defaults to None.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
"""
#: Current MLFLow experiment's run.
active_run: mlflow.ActiveRun
def __init__(
self,
savedir: str = 'mllogs',
experiment_name: str = BASE_EXP_NAME,
tracking_uri: Optional[str] = None,
run_description: Optional[str] = None,
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch'
):
savedir = os.path.join(savedir, 'mlflow')
super().__init__(savedir=savedir, log_freq=log_freq)
self.experiment_name = experiment_name
self.tracking_uri = tracking_uri
self.run_description = run_description
if self.tracking_uri is None:
# Default MLFLow tracking URI
saved_abs_path = os.path.abspath(self.savedir)
self.tracking_uri = pathlib.Path(saved_abs_path).as_uri()
# TODO: for pytorch lightning:
# mlflow.pytorch.autolog()
self.supported_types = [
'metric', 'figure', 'image', 'artifact', 'torch', 'dict', 'param',
'text'
]
[docs]
def create_logger_context(self):
"""Initialize logger. Start MLFLow run."""
mlflow.set_tracking_uri(self.tracking_uri)
mlflow.set_experiment(experiment_name=self.experiment_name)
self.active_run: mlflow.ActiveRun = mlflow.start_run(
description=self.run_description
)
[docs]
def destroy_logger_context(self):
"""Destroy logger. End current MLFlow run."""
mlflow.end_run()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters as MLFlow parameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
for param_name, val in params.items():
self.log(item=val, identifier=param_name, step=0, kind='param')
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log with MLFlow.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_types``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == 'metric':
# if isinstance(item, list) and isinstance(identifier, list):
mlflow.log_metric(
key=identifier,
value=item,
step=step
)
if kind == 'artifact':
if not isinstance(item, str):
# Save the object locally and then log it
name = os.path.basename(identifier)
save_path = os.path.join(self.savedir, '.trash', name)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
item = self.serialize(item, save_path)
mlflow.log_artifact(
local_path=item,
artifact_path=identifier
)
if kind == 'torch':
import torch
# Save the object locally and then log it
name = os.path.basename(identifier)
save_path = os.path.join(self.savedir, '.trash', name)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
torch.save(item, save_path)
# Log into mlflow
mlflow.log_artifact(
local_path=save_path,
artifact_path=identifier
)
if kind == 'dict':
mlflow.log_dict(
dictionary=item,
artifact_file=identifier
)
if kind == 'figure':
mlflow.log_figure(
artifact_file=identifier,
figure=item,
save_kwargs=kwargs.get('save_kwargs')
)
if kind == 'image':
mlflow.log_image(
artifact_file=identifier,
image=item
)
if kind == 'param':
mlflow.log_param(
key=identifier,
value=item
)
if kind == 'text':
mlflow.log_text(
artifact_file=identifier,
text=item
)
[docs]
class WanDBLogger(Logger):
"""Abstraction around WandB logger.
Args:
savedir (str, optional): location on local filesystem where logs
are stored. Defaults to 'mllogs'.
project_name (str, optional): experiment name. Defaults to
``itwinai.loggers.BASE_EXP_NAME``.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
"""
def __init__(
self,
savedir: str = 'mllogs',
project_name: str = BASE_EXP_NAME,
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch'
) -> None:
savedir = os.path.join(savedir, 'wandb')
super().__init__(savedir=savedir, log_freq=log_freq)
self.project_name = project_name
self.supported_types = [
'watch', 'metric', 'figure', 'image', 'artifact', 'torch', 'dict',
'param', 'text'
]
[docs]
def create_logger_context(self):
"""Initialize logger. Init WandB run."""
self.active_run = wandb.init(
dir=self.savedir,
project=self.project_name
)
[docs]
def destroy_logger_context(self):
"""Destroy logger."""
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
wandb.config.update(params)
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log with WandB. Wrapper of https://docs.wandb.ai/ref/python/log
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_types``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if kind == 'watch':
wandb.watch(item)
if kind in self.supported_types[1:]:
wandb.log({identifier: item}, step=step, commit=True)
[docs]
class TensorBoardLogger(Logger):
"""Abstraction around TensorBoard logger, both for PyTorch and
TensorFlow.
Args:
savedir (str, optional): location on local filesystem where logs
are stored. Defaults to 'mllogs'.
log_freq (Union[int, Literal['epoch', 'batch']], optional):
determines whether the logger should fulfill or ignore
calls to the `log()` method. See ``Logger.should_log`` method for
more details. Defaults to 'epoch'.
framework (Literal['tensorflow', 'pytorch'], optional):
whether to log PyTorch or TensorFlow ML data.
Defaults to 'pytorch'.
Raises:
ValueError: when ``framework`` is not recognized.
"""
# TODO: decouple the logger into TorchTBLogger and TFTBLogger
# and add the missing logging types supported by each.
def __init__(
self,
savedir: str = 'mllogs',
log_freq: Union[int, Literal['epoch', 'batch']] = 'epoch',
framework: Literal['tensorflow', 'pytorch'] = 'pytorch'
) -> None:
savedir = os.path.join(savedir, 'tensorboard')
super().__init__(savedir=savedir, log_freq=log_freq)
self.framework = framework
if framework.lower() == 'tensorflow':
import tensorflow as tf
self.tf = tf
self.writer = tf.summary.create_file_writer(savedir)
elif framework.lower() == 'pytorch':
from torch.utils.tensorboard import SummaryWriter
self.writer = SummaryWriter(savedir)
else:
raise ValueError(
"Framework must be either 'tensorflow' or 'pytorch'")
self.supported_types = ['metric', 'image',
'text', 'figure', 'torch']
[docs]
def create_logger_context(self):
"""Initialize logger."""
if self.framework == 'tensorflow':
self.writer.set_as_default()
[docs]
def destroy_logger_context(self):
"""Destroy logger. Close SummaryWriter."""
self.writer.close()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
if self.framework == 'tensorflow':
from tensorboard.plugins.hparams import api as hp
hparams = {hp.HParam(k): v for k, v in params.items()}
with self.writer.as_default():
hp.hparams(hparams)
elif self.framework == 'pytorch':
self.writer.add_hparams(params, {})
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log with Tensorboard.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_types``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
if not self.should_log(batch_idx=batch_idx):
return
if self.framework == 'tensorflow':
with self.writer.as_default():
if kind == 'metric':
self.tf.summary.scalar(identifier, item, step=step)
elif kind == 'image':
self.tf.summary.image(identifier, item, step=step)
elif kind == 'text':
self.tf.summary.text(identifier, item, step=step)
elif kind == 'figure':
self.tf.summary.figure(identifier, item, step=step)
elif self.framework == 'pytorch':
if kind == 'metric':
self.writer.add_scalar(identifier, item, global_step=step)
elif kind == 'image':
self.writer.add_image(identifier, item, global_step=step)
elif kind == 'text':
self.writer.add_text(identifier, item, global_step=step)
elif kind == 'figure':
self.writer.add_figure(identifier, item, global_step=step)
elif kind == 'torch':
self.writer.add_graph(item)
[docs]
class LoggersCollection(Logger):
"""Wrapper of a set of loggers, allowing to use them simultaneously.
Args:
loggers (List[Logger]): list of itwinai loggers.
"""
def __init__(
self,
loggers: List[Logger]
) -> None:
super().__init__(savedir='/tmp/mllogs_LoggersCollection', log_freq=1)
self.loggers = loggers
[docs]
def should_log(self, batch_idx: int = None) -> bool:
"""Transparent method which delegates the `Logger.should_log``
to individual loggers. Always returns True.
Args:
batch_idx (int, optional): dataloader batch index.
Defaults to None.
Returns:
bool: always True.
"""
return True
[docs]
def log(
self,
item: Union[Any, List[Any]],
identifier: Union[str, List[str]],
kind: str = 'metric',
step: Optional[int] = None,
batch_idx: Optional[int] = None,
**kwargs
) -> None:
"""Log on all loggers.
Args:
item (Union[Any, List[Any]]): element to be logged (e.g., metric).
identifier (Union[str, List[str]]): unique identifier for the
element to log(e.g., name of a metric).
kind (str, optional): type of the item to be logged. Must be
one among the list of ``self.supported_types``.
Defaults to 'metric'.
step (Optional[int], optional): logging step. Defaults to None.
batch_idx (Optional[int], optional): DataLoader batch counter
(i.e., batch idx), if available. Defaults to None.
kwargs: keyword arguments to pass to the logger.
"""
for logger in self.loggers:
logger.log(
item=item,
identifier=identifier,
kind=kind,
step=step,
batch_idx=batch_idx,
**kwargs
)
[docs]
def create_logger_context(self):
"""Initialize all loggers."""
for logger in self.loggers:
logger.create_logger_context()
[docs]
def destroy_logger_context(self):
"""Destroy all loggers."""
for logger in self.loggers:
logger.destroy_logger_context()
[docs]
def save_hyperparameters(self, params: Dict[str, Any]) -> None:
"""Save hyperparameters for all loggers.
Args:
params (Dict[str, Any]): hyperparameters dictionary.
"""
for logger in self.loggers:
logger.save_hyperparameters(params=params)
[docs]
class EpochTimeTracker:
"""Profiler for epoch execution time used to support scaling tests.
It uses CSV files to store, for each epoch, the ``name`` of the
experiment, the number of compute ``nodes`` used, the ``epoch_id``,
and the execution ``time`` in seconds.
Args:
series_name (str): name of the experiment/job.
csv_file (str): path to CSV file to store experiments times.
"""
def __init__(self, series_name: str, csv_file: str) -> None:
self.series_name = series_name
self._data = []
self.csv_file = csv_file
with open(csv_file, 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
[docs]
def add_epoch_time(self, epoch_idx: int, time: float) -> None:
"""Add row to the current experiment's CSV file in append mode.
Args:
epoch_idx (int): epoch order idx.
time (float): epoch execution time (seconds).
"""
n_nodes = os.environ.get('SLURM_NNODES', -1)
fields = (self.series_name, n_nodes, epoch_idx, time)
self._data.append(fields)
with open(self.csv_file, 'a') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(fields)
[docs]
def save(self, csv_file: Optional[str] = None) -> None:
"""Save data to a new CSV file.
Args:
csv_file (Optional[str], optional): path to the CSV file.
If not given, uses the one given in the constructor.
Defaults to None.
"""
if not csv_file:
csv_file = self.csv_file
with open(csv_file, 'w') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['name', 'nodes', 'epoch_id', 'time'])
csvwriter.writerows(self._data)