DAG workflows

Author(s): Matteo Bunino (CERN)

In the first two tutorials we saw how to define simple sequential workflows by means of the Pipeline object, which feeds the outputs of each component as inputs to the next one. In this tutorial we show how to create more complex workflows with non-sequential data flows, where components are arranged as a directed acyclic graph (DAG). Under the DAG assumption, the outputs of each block can potentially be fed as input to any other component downstream of it, granting great flexibility to the experimenter. The trade-off for this flexibility is a change in the way we define configuration files: from now on, a configuration file can only set the parameters used by the training script, not the structure of the workflow, which was previously defined through the Pipeline.
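To make the contrast concrete, the snippet below is a minimal sketch of sequential chaining in plain Python. It is not itwinai's Pipeline implementation: `run_sequential` and the toy functions `get_data`, `split`, `train` and `predict` are hypothetical stand-ins. It shows the main limitation of a chain: every step only sees the outputs of the step right before it, so `train` has to forward `test_split` unchanged just so that `predict` can use it.

```python
from typing import Any, Callable, List, Sequence, Tuple


def run_sequential(steps: Sequence[Callable[..., Any]], *inputs: Any) -> Tuple[Any, ...]:
    """Run steps in order, feeding each step only the outputs of the previous one."""
    outputs: Tuple[Any, ...] = inputs
    for step in steps:
        result = step(*outputs)
        # Normalize a single return value to a 1-tuple so it can be unpacked.
        outputs = result if isinstance(result, tuple) else (result,)
    return outputs


# Hypothetical toy components (plain functions, not the tutorial's classes).
def get_data() -> List[int]:
    return list(range(10))


def split(dataset: List[int]) -> Tuple[List[int], List[int]]:
    return dataset[:7], dataset[7:]


def train(train_split: List[int], test_split: List[int]) -> Tuple[List[int], int]:
    # In a chain, `train` must pass `test_split` through, otherwise
    # the next step could never see it.
    model = max(train_split)  # stand-in for a trained model
    return test_split, model


def predict(test_split: List[int], model: int) -> List[int]:
    return [x * model for x in test_split]


print(run_sequential([get_data, split, train, predict]))  # prints ([42, 48, 54],)
```

In a DAG, by contrast, each component is called explicitly with whichever upstream outputs it needs, so no pass-through arguments are required.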

Here you can find a graphical representation of the DAG workflow implemented below:

[Figure: DAG workflow (dag_wf)]
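The same graph can also be written down as data. The sketch below maps each node to the set of nodes whose outputs it consumes, with edges read off the code cell below (the node names are illustrative labels, not itwinai identifiers). It then uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+) to compute one valid execution order and to check the acyclicity assumption. The execution log below follows one such valid order; in general a DAG admits several.

```python
from graphlib import CycleError, TopologicalSorter  # Python 3.9+

# Each node maps to its predecessors, i.e. the nodes whose outputs it consumes.
dag = {
    "getter": set(),
    "splitter": {"getter"},
    "trainer1": {"splitter"},
    "trainer2": {"splitter"},
    "saver": {"trainer1"},
    "predictor": {"splitter", "trainer1", "trainer2"},  # test split + both models
}


def is_dag(graph: dict) -> bool:
    """Return True if the graph admits a topological order (i.e. has no cycle)."""
    try:
        list(TopologicalSorter(graph).static_order())
    except CycleError:
        return False
    return True


# One valid order in which the components could be executed.
order = list(TopologicalSorter(dag).static_order())
print(order)

print(is_dag(dag))  # prints True
# Making the getter depend on the predictor closes a loop, so this is no longer a DAG.
print(is_dag(dict(dag, getter={"predictor"})))  # prints False
```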

[1]:
from typing import Any
from itwinai.components import Predictor, monitor_exec

from basic_components import (
    MyDataGetter, MyDatasetSplitter, MyTrainer, MySaver
)
[2]:
class MyEnsemblePredictor(Predictor):
    @monitor_exec
    def execute(self, dataset, model_ensemble) -> Any:
        """
        Do some predictions with the model ensemble on the dataset...
        """
        return dataset

# Parameters
DATA_SIZE = 123
TRAIN_PROP = 0.5
VALIDATION_PROP = 0.2
LR_MODEL1 = 1e-3
LR_MODEL2 = 0.1
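`MyEnsemblePredictor` above is a placeholder that returns the dataset unchanged. One common way to combine an ensemble is to average the individual predictions for each sample. The function below is a hypothetical plain-Python sketch of that idea, assuming each model is a callable that maps one sample to a number; it is not part of itwinai or `basic_components`, and `double` and `shift` are toy models standing in for `trained_model1` and `trained_model2`.

```python
from statistics import fmean
from typing import Callable, List, Sequence

Model = Callable[[float], float]


def ensemble_predict(dataset: Sequence[float], model_ensemble: Sequence[Model]) -> List[float]:
    """Average the predictions of every model in the ensemble, sample by sample."""
    if not model_ensemble:
        raise ValueError("model_ensemble must contain at least one model")
    return [fmean(model(x) for model in model_ensemble) for x in dataset]


# Toy "models" (hypothetical stand-ins for trained models).
def double(x: float) -> float:
    return 2 * x


def shift(x: float) -> float:
    return x + 1


print(ensemble_predict([1.0, 2.0, 3.0], [double, shift]))  # prints [2.0, 3.5, 5.0]
```

Averaging is only one choice; majority voting or weighted averaging would fit the same `execute(dataset, model_ensemble)` signature.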
[3]:
# Define workflow components
getter = MyDataGetter(data_size=DATA_SIZE)
splitter = MyDatasetSplitter(
    train_proportion=TRAIN_PROP,
    validation_proportion=VALIDATION_PROP,
    test_proportion=1 - TRAIN_PROP - VALIDATION_PROP
)
trainer1 = MyTrainer(lr=LR_MODEL1)
trainer2 = MyTrainer(lr=LR_MODEL2)
saver = MySaver()
predictor = MyEnsemblePredictor(model=None)

# Define ML workflow
dataset = getter.execute()
train_spl, val_spl, test_spl = splitter.execute(dataset)
_, _, _, trained_model1 = trainer1.execute(train_spl, val_spl, test_spl)
_, _, _, trained_model2 = trainer2.execute(train_spl, val_spl, test_spl)
_ = saver.execute(trained_model1)
predictions = predictor.execute(test_spl, [trained_model1, trained_model2])
print()
print("Predictions: " + str(predictions))
###########################################
# Starting execution of 'MyDataGetter'... #
###########################################
#####################################
# 'MyDataGetter' executed in 0.000s #
#####################################
################################################
# Starting execution of 'MyDatasetSplitter'... #
################################################
##########################################
# 'MyDatasetSplitter' executed in 0.000s #
##########################################
########################################
# Starting execution of 'MyTrainer'... #
########################################
##################################
# 'MyTrainer' executed in 0.000s #
##################################
########################################
# Starting execution of 'MyTrainer'... #
########################################
##################################
# 'MyTrainer' executed in 0.000s #
##################################
######################################
# Starting execution of 'MySaver'... #
######################################
################################
# 'MySaver' executed in 0.000s #
################################
##################################################
# Starting execution of 'MyEnsemblePredictor'... #
##################################################
############################################
# 'MyEnsemblePredictor' executed in 0.000s #
############################################

Predictions: [85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122]