# --------------------------------------------------------------------------------------
# Part of the interTwin Project: https://www.intertwin.eu/
#
# Created by: Matteo Bunino
#
# Credit:
# - Matteo Bunino <matteo.bunino@cern.ch> - CERN
# --------------------------------------------------------------------------------------
"""This module provides the functionalities to execute workflows defined in
in form of pipelines.
"""
from __future__ import annotations
from typing import Any, Dict, Iterable, Optional, Tuple, Union
from .components import BaseComponent, monitor_exec
from .utils import SignatureInspector
[docs]
class Pipeline(BaseComponent):
"""Executes a set of components arranged as a pipeline."""
#: Pipeline steps. Can be a list of ``BaseComponent`` or a dictionary
#: allowing the user to name each ``BaseComponent``.
steps: Union[Dict[str, BaseComponent], Iterable[BaseComponent]]
def __init__(
self,
steps: Union[Dict[str, BaseComponent], Iterable[BaseComponent]],
name: Optional[str] = None,
):
super().__init__(name=name)
self.save_parameters(steps=steps, name=name)
self.steps = steps
def __getitem__(self, subscript: Union[str, int, slice]) -> Pipeline:
if isinstance(subscript, slice):
# First, convert to list if is a dict
if isinstance(self.steps, dict):
steps = list(self.steps.items())
else:
steps = self.steps
# Second, perform slicing
s = steps[subscript.start : subscript.stop : subscript.step]
# Third, reconstruct dict, if it is a dict
if isinstance(self.steps, dict):
s = dict(s)
# Fourth, return sliced sub-pipeline, preserving its
# initial structure
sliced = self.__class__(steps=s, name=self.name)
return sliced
else:
return self.steps[subscript]
def __len__(self) -> int:
return len(self.steps)
[docs]
@monitor_exec
def execute(self, *args) -> Any:
""" "Execute components sequentially."""
if isinstance(self.steps, dict):
steps = list(self.steps.values())
else:
steps = self.steps
for step in steps:
step: BaseComponent
args = self._pack_args(args)
self.validate_args(args, step)
args = step.execute(*args)
return args
@staticmethod
def _pack_args(args) -> Tuple:
"""Wraps args in a tuple, if needed."""
args = () if args is None else args
if not isinstance(args, tuple):
args = (args,)
return args
[docs]
@staticmethod
def validate_args(input_args: Tuple, component: BaseComponent):
"""Verify that the number of input args provided to some component
match with the number of the non-default args in the component.
Args:
input_args (Tuple): input args to be fed to the component.
component (BaseComponent): component to be executed.
Raises:
RuntimeError: in case of args mismatch.
"""
inspector = SignatureInspector(component.execute)
if inspector.min_params_num > len(input_args):
raise TypeError(
f"Component '{component.name}' received too few "
f"input arguments: {input_args}. Expected at least "
f"{inspector.min_params_num}, with names: "
f"{inspector.required_params}."
)
if (
inspector.max_params_num != inspector.INFTY
and len(input_args) > inspector.max_params_num
):
raise TypeError(
f"Component '{component.name}' received too many "
f"input arguments: {input_args}. Expected at most "
f"{inspector.max_params_num}."
)