# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
"""This module provides the base classes to define modular and reproducible ML
workflows. The base component classes provide a template to follow for
extending existing components or creating new ones.
There are two ways of creating workflows: simple and advanced workflows.
Simple workflows can be obtained by creating a sequence of components
wrapped in a Pipeline object, which executes them in cascade, passing the
output of a component as the input of the following one. It is the responsibility
of the user to prevent mismatches among outputs and inputs of component
sequences. This pipeline can be configured
both in terms of parameters and structure, with a configuration file
representing the whole pipeline. This configuration file can be executed
using itwinai CLI without the need for python files.
Example:
>>> from itwinai.components import DataGetter, Saver
>>> from itwinai.pipeline import Pipeline
>>>
>>> my_pipe = Pipeline({"getter": DataGetter(...), "data_saver": Saver(...)})
>>> my_pipe.execute()
>>> my_pipe.to_yaml("training_pipe.yaml")
>>>
>>> # The pipeline can be parsed back to Python with:
>>> from itwinai.parser import PipeParser
>>> my_pipe = PipeParser("training_pipe.yaml")
>>> my_pipe.execute()
>>>
>>> # Run the pipeline from configuration file with dynamic override
>>> itwinai exec-pipeline --config training_pipe.yaml \
>>> --override pipeline.init_args.steps.data_saver.some_param 42
Advanced workflows foresee more complicated connections between the
components, which makes it hard to define a structure beforehand
without the risk of over-constraining the user. Therefore, advanced
workflows are defined by explicitly connecting component outputs
to the inputs of other components, without a wrapper Pipeline object.
In this case, the configuration files enable the user to persist the
parameters passed to the argument parser, enabling reuse through
configuration files, with the possibility of dynamic overrides of parameters.
Example:
>>> from jsonargparse import ArgumentParser, ActionConfigFile
>>>
>>> parser = ArgumentParser(description='PyTorch MNIST Example')
>>> parser.add_argument('--batch-size', type=int, default=64,
>>> help='input batch size for training (default: 64)')
>>> parser.add_argument('--epochs', type=int, default=10,
>>> help='number of epochs to train (default: 10)')
>>> parser.add_argument('--lr', type=float, default=0.01,
>>> help='learning rate (default: 0.01)')
>>> parser.add_argument(
>>> "-c", "--config", action=ActionConfigFile,
>>> required=True,
>>> help="Path to a configuration file in json or yaml format."
>>> )
>>> args = parser.parse_args()
>>>
>>> from itwinai.components import (
>>> DataGetter, Saver, DataSplitter, Trainer
>>> )
>>> getter = DataGetter(...)
>>> splitter = DataSplitter(...)
>>> data_saver = Saver(...)
>>> model_saver = Saver(...)
>>> trainer = Trainer(
>>> batch_size=args.batch_size, lr=args.lr, epochs=args.epochs
>>> )
>>>
>>> # Compose workflow
>>> my_dataset = getter.execute()
>>> train_set, valid_set, test_set = splitter.execute(my_dataset)
>>> data_saver.execute(train_set)
>>> _, _, _, trained_model = trainer.execute(train_set, valid_set, test_set)
>>> model_saver.execute(trained_model)
>>>
>>> # Run the script using a previous configuration with dynamic override
>>> python my_train.py --config training_pipe.yaml --lr 0.002
"""
from __future__ import annotations
import functools
import time
from abc import ABC, abstractmethod
# import logging
# from logging import Logger as PythonLogger
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from .serialization import ModelLoader, Serializable
from .type import MLArtifact, MLDataset, MLModel
def monitor_exec(method: Callable) -> Callable:
    """Decorator for ``BaseComponent``'s methods.

    Prints a banner when the component starts executing and another one
    when it finishes, reporting the execution time. The elapsed time (in
    seconds) is also stored on the component as ``exec_t``.

    Args:
        method (Callable): component method to wrap. It is invoked as
            ``method(self, *args, **kwargs)``.

    Returns:
        Callable: wrapper returning whatever ``method`` returns.
    """

    @functools.wraps(method)
    def wrapper(self: "BaseComponent", *args, **kwargs) -> Any:
        self._printout(f"Starting execution of '{self.name}'...")
        # Use a monotonic, high-resolution clock: wall-clock time
        # (``time.time``) may jump when the system clock is adjusted,
        # producing wrong or even negative durations.
        start_t = time.perf_counter()
        try:
            result = method(self, *args, **kwargs)
        finally:
            # Always give the component a chance to release its resources,
            # even when the wrapped method raises.
            self.cleanup()
        # Reached only when the wrapped method returned normally.
        self.exec_t = time.perf_counter() - start_t
        self._printout(f"'{self.name}' executed in {self.exec_t:.3f}s")
        return result

    return wrapper
class BaseComponent(ABC, Serializable):
    """Base component class.

    Each component provides a simple interface to foster modularity in
    machine learning code. Each component class implements the ``execute``
    method, which receives some input ML artifacts (e.g., datasets),
    performs some operations and returns new artifacts.
    The components are meant to be assembled in complex ML workflows,
    represented as pipelines.

    Args:
        name (Optional[str], optional): unique identifier for a step.
            Defaults to None.
    """

    _name: str = None
    #: Dictionary storing constructor arguments. Needed to serialize the
    #: class to dictionary. Set by ``self.save_parameters()`` method.
    parameters: Dict[Any, Any] = None

    def __init__(
        self,
        name: Optional[str] = None,
        # logs_dir: Optional[str] = None,
        # debug: bool = False,
    ) -> None:
        # Must stay the first statement: ``locals()`` is captured here, so
        # any local defined earlier would end up among the saved parameters.
        self.save_parameters(**self.locals2params(locals()))
        self.name = name

    @property
    def name(self) -> str:
        """Name of current component. Defaults to ``self.__class__.__name__``."""
        if self._name is None:
            # No explicit name was given: fall back to the class name.
            return self.__class__.__name__
        return self._name

    @name.setter
    def name(self, name: str) -> None:
        self._name = name

    @abstractmethod
    @monitor_exec
    def execute(self, *args, **kwargs) -> Any:
        """Execute some operations."""

    # NOTE: disabled logging setup, kept for reference.
    # def setup_console(self):
    #     """Setup Python logging"""
    #     self.log_file = os.path.join(self.logs_dir, self.name + ".log")
    #     f_handler = logging.FileHandler(self.log_file, mode='w')
    #     stdout_h = logging.StreamHandler(sys.stdout)
    #     if self.debug:
    #         log_format = ("%(asctime)s %(levelname)s "
    #                       "[%(filename)s:%(lineno)s - %(funcName)s()]: "
    #                       "%(message)s")
    #     else:
    #         log_format = ("%(levelname)s : %(message)s")
    #     logging.basicConfig(
    #         level=logging.DEBUG if self.debug else logging.INFO,
    #         handlers=[f_handler, stdout_h],
    #         format=log_format,
    #         datefmt="%Y-%m-%d %H:%M:%S"
    #     )
    #     self.console = logging.getLogger(self.name)

    def cleanup(self):
        """Cleanup resources allocated by this component."""

    @staticmethod
    def _printout(msg: str):
        """Print ``msg`` framed above and below by a line of ``#``."""
        banner = f"# {msg} #"
        border = "#" * len(banner)
        for line in (border, banner, border):
            print(line)
class DataGetter(BaseComponent):
    """Component interface for retrieving a dataset."""

    @abstractmethod
    @monitor_exec
    def execute(self) -> MLDataset:
        """Retrieve a dataset.

        Takes no input artifacts.

        Returns:
            MLDataset: retrieved dataset.
        """
class DataProcessor(BaseComponent):
    """Component interface for dataset pre-processing."""

    @abstractmethod
    @monitor_exec
    def execute(
        self,
        train_dataset: MLDataset,
        validation_dataset: MLDataset,
        test_dataset: MLDataset,
    ) -> Tuple[MLDataset, MLDataset, MLDataset]:
        """Pre-process the training, validation and test datasets.

        Args:
            train_dataset (MLDataset): training dataset.
            validation_dataset (MLDataset): validation dataset.
            test_dataset (MLDataset): test dataset.

        Returns:
            Tuple[MLDataset, MLDataset, MLDataset]: preprocessed training
            dataset, validation dataset, test dataset.
        """
class DataSplitter(BaseComponent):
    """Splits a dataset into train, validation, and test splits.

    Args:
        train_proportion (Union[int, float]): training set proportion.
        validation_proportion (Union[int, float]): validation set proportion.
        test_proportion (Union[int, float]): test set proportion.
        name (Optional[str], optional): name of the component.
            Defaults to None.

    Raises:
        ValueError: if a proportion is given as a float outside the
            interval [0.0, 1.0].
    """

    _train_proportion: Union[int, float]
    _validation_proportion: Union[int, float]
    _test_proportion: Union[int, float]

    def __init__(
        self,
        train_proportion: Union[int, float],
        validation_proportion: Union[int, float],
        test_proportion: Union[int, float],
        name: Optional[str] = None,
    ) -> None:
        super().__init__(name)
        # ``locals()`` is captured here: do not define new locals above.
        self.save_parameters(**self.locals2params(locals()))
        # The assignments below go through the validating property setters.
        self.train_proportion = train_proportion
        self.validation_proportion = validation_proportion
        self.test_proportion = test_proportion

    @staticmethod
    def _validated(label: str, prop: Union[int, float]) -> Union[int, float]:
        """Return ``prop`` unchanged, rejecting floats outside [0.0, 1.0].

        Only float values are range-checked; any other value is accepted
        as is. ``label`` is the split name used in the error message.
        """
        # Keep the chained comparison negated as a whole, so that NaN
        # (for which every comparison is False) is rejected too.
        if isinstance(prop, float) and not (0.0 <= prop <= 1.0):
            raise ValueError(
                f"{label} proportion should be in the interval [0.0, 1.0] "
                f"if given as float. Received {prop}"
            )
        return prop

    @property
    def train_proportion(self) -> Union[int, float]:
        """Training set proportion."""
        return self._train_proportion

    @train_proportion.setter
    def train_proportion(self, prop: Union[int, float]) -> None:
        self._train_proportion = self._validated("Train", prop)

    @property
    def validation_proportion(self) -> Union[int, float]:
        """Validation set proportion."""
        return self._validation_proportion

    @validation_proportion.setter
    def validation_proportion(self, prop: Union[int, float]) -> None:
        self._validation_proportion = self._validated("Validation", prop)

    @property
    def test_proportion(self) -> Union[int, float]:
        """Test set proportion."""
        return self._test_proportion

    @test_proportion.setter
    def test_proportion(self, prop: Union[int, float]) -> None:
        self._test_proportion = self._validated("Test", prop)

    @abstractmethod
    @monitor_exec
    def execute(self, dataset: MLDataset) -> Tuple[MLDataset, MLDataset, MLDataset]:
        """Split a dataset into train, validation and test splits.

        Args:
            dataset (MLDataset): input dataset.

        Returns:
            Tuple[MLDataset, MLDataset, MLDataset]: tuple of
            train, validation and test splits.
        """
class Trainer(BaseComponent):
    """Component interface for training a machine learning model."""

    @abstractmethod
    @monitor_exec
    def execute(
        self,
        train_dataset: MLDataset,
        validation_dataset: MLDataset,
        test_dataset: MLDataset,
    ) -> Tuple[MLDataset, MLDataset, MLDataset, MLModel]:
        """Train a machine learning model.

        Args:
            train_dataset (MLDataset): training dataset.
            validation_dataset (MLDataset): validation dataset.
            test_dataset (MLDataset): test dataset.

        Returns:
            Tuple[MLDataset, MLDataset, MLDataset, MLModel]: training
            dataset, validation dataset, test dataset, trained model.
        """
class Predictor(BaseComponent):
    """Applies a pre-trained machine learning model to unseen data.

    Args:
        model (Union[MLModel, ModelLoader]): the model itself, or a
            ``ModelLoader`` that is called to obtain it.
        name (Optional[str], optional): name of the component.
            Defaults to None.
    """

    #: Pre-trained ML model used to make predictions.
    model: MLModel

    def __init__(
        self,
        model: Union[MLModel, ModelLoader],
        name: Optional[str] = None,
    ) -> None:
        super().__init__(name=name)
        # ``locals()`` is captured here: do not define new locals above.
        self.save_parameters(**self.locals2params(locals()))
        if isinstance(model, ModelLoader):
            # A loader is called (with no arguments) and its result is
            # stored as the model.
            self.model = model()
        else:
            self.model = model

    @abstractmethod
    @monitor_exec
    def execute(
        self, predict_dataset: MLDataset, model: Optional[MLModel] = None
    ) -> MLDataset:
        """Apply a machine learning model on a dataset of samples.

        Args:
            predict_dataset (MLDataset): dataset for inference.
            model (Optional[MLModel], optional): overrides the internal model,
                if given. Defaults to None.

        Returns:
            MLDataset: predictions with the same cardinality as the
            input dataset.
        """
class Saver(BaseComponent):
    """Component interface for saving an artifact to disk."""

    @abstractmethod
    @monitor_exec
    def execute(self, artifact: MLArtifact) -> MLArtifact:
        """Save an ML artifact to disk.

        Args:
            artifact (MLArtifact): artifact to save.

        Returns:
            MLArtifact: the same input artifact, after saving it.
        """
class Adapter(BaseComponent):
    """Connects components in a sequential pipeline, allowing finer
    control over how intermediate results are propagated among them.

    Args:
        policy (List[Any]): list of the same length as the output of this
            component, describing how to map the input args to the output.
        name (Optional[str], optional): name of the component.
            Defaults to None.

    The adapter allows defining a policy with which inputs are re-arranged
    before being propagated to the next component. A policy item that is a
    string starting with ``INPUT_PREFIX`` is replaced by the input argument
    at the index following the prefix; any other item is copied to the
    output as is.

    Some examples: [policy]: (input) -> (output)
        - ["INPUT_ARG#2", "INPUT_ARG#1", "INPUT_ARG#0"]: (11,22,33) -> (33,22,11)
        - ["INPUT_ARG#0", "INPUT_ARG#2", None]: (11, 22, 33) -> (11, 33, None)
        - []: (11, 22, 33) -> ()
        - [42, "INPUT_ARG#2", "hello"]: (11,22,33,44,55) -> (42, 33, "hello")
        - [None, 33, 3.14]: () -> (None, 33, 3.14)
        - [None, 33, 3.14]: ("double", 44, None, True) -> (None, 33, 3.14)
    """

    #: Adapter policy.
    policy: List[Any]
    INPUT_PREFIX: str = "INPUT_ARG#"

    def __init__(self, policy: List[Any], name: Optional[str] = None) -> None:
        super().__init__(name=name)
        # ``locals()`` is captured here: do not define new locals above.
        self.save_parameters(**self.locals2params(locals()))
        self.name = name
        self.policy = policy

    @monitor_exec
    def execute(self, *args) -> Tuple:
        """Produce an output tuple by arranging input arguments according
        to the policy specified in the constructor.

        Args:
            args (Tuple): input arguments.

        Returns:
            Tuple: input args arranged according to the policy.

        Raises:
            IndexError: if the policy references an input position that is
                not smaller than the number of received args.
        """
        prefix = self.INPUT_PREFIX
        offset = len(prefix)
        n_args = len(args)

        output = []
        for entry in self.policy:
            is_reference = isinstance(entry, str) and entry.startswith(prefix)
            if is_reference:
                position = int(entry[offset:])
                if position < n_args:
                    output.append(args[position])
                    continue
                # Too few inputs: report the largest index referenced by the
                # whole policy, so the message states how many are needed.
                highest = max(
                    int(el[offset:])
                    for el in self.policy
                    if isinstance(el, str) and el.startswith(prefix)
                )
                raise IndexError(
                    f"The args received as input by '{self.name}' "
                    "are not consistent with the given adapter policy "
                    "because input args are too few! "
                    f"Input args are {n_args} but the policy foresees "
                    f"at least {highest + 1} items."
                )
            # Literal policy item: forwarded to the output unchanged.
            output.append(entry)
        return tuple(output)