{ "cells": [ { "cell_type": "markdown", "id": "e5064a94", "metadata": {}, "source": [ "# Simple Workflow and Pipeline Components\n", "\n", "**Author(s)**: Matteo Bunino (CERN), Anna Elisa Lappe (CERN)" ] }, { "cell_type": "markdown", "id": "588e0f6a", "metadata": {}, "source": [ "The simplest workflow that you can write is a sequential pipeline of steps,\n", "where the outputs of one component are fed as inputs to the following component,\n", "employing a scikit-learn-like Pipeline.\n", "\n", "itwinai defines each step as a \"component\". Components are implemented by extending\n", "the ``itwinai.components.BaseComponent`` class. Each component implements\n", "the `execute(...)` method, which provides a unified interface for interaction.\n", "\n", "The aim of itwinai components is to provide reusable machine learning best\n", "practices.\n", "To this end, some common operations are already encoded in abstract\n", "components. Some examples are:\n", "- ``DataGetter``: has no input and returns a dataset, collected from somewhere\n", "(e.g., downloaded).\n", "- ``DataSplitter``: splits an input dataset into train, validation, and test sets.\n", "- ``DataPreproc``: performs preprocessing on train, validation, and test\n", "datasets.\n", "- ``Trainer``: trains an ML model and returns the trained model.\n", "- ``Saver``: saves an ML artifact (e.g., dataset, model) to disk.\n", "\n", "In this tutorial, you will see how to create new components and how they\n", "are assembled into sequential pipelines." 
] }, { "cell_type": "code", "execution_count": null, "id": "c30284ef", "metadata": {}, "outputs": [], "source": [ "from typing import List, Optional, Tuple\n", "\n", "from itwinai.components import DataGetter, DataSplitter, Trainer, monitor_exec\n", "from itwinai.pipeline import Pipeline" ] }, { "cell_type": "markdown", "id": "1a6f5a79", "metadata": {}, "source": [ "## Creating dummy components" ] }, { "cell_type": "code", "execution_count": 2, "id": "26b7d5a2", "metadata": {}, "outputs": [], "source": [ "\n", "class MyDataGetter(DataGetter):\n", "    def __init__(self, data_size: int, name: Optional[str] = None) -> None:\n", "        super().__init__(name)\n", "        self.data_size = data_size\n", "        self.save_parameters(data_size=data_size)\n", "\n", "    @monitor_exec\n", "    def execute(self) -> List[int]:\n", "        \"\"\"Return a list dataset.\n", "\n", "        Returns:\n", "            List[int]: dataset\n", "        \"\"\"\n", "        return list(range(self.data_size))\n", "\n", "\n", "class MyDatasetSplitter(DataSplitter):\n", "    @monitor_exec\n", "    def execute(\n", "        self,\n", "        dataset: List[int]\n", "    ) -> Tuple[List[int], List[int], List[int]]:\n", "        \"\"\"Splits a list dataset into train, validation and test datasets.\n", "\n", "        Args:\n", "            dataset (List[int]): input list dataset.\n", "\n", "        Returns:\n", "            Tuple[List[int], List[int], List[int]]: train, validation, and\n", "            test datasets respectively.\n", "        \"\"\"\n", "        train_n = int(len(dataset)*self.train_proportion)\n", "        valid_n = int(len(dataset)*self.validation_proportion)\n", "        train_set = dataset[:train_n]\n", "        valid_set = dataset[train_n:train_n+valid_n]\n", "        test_set = dataset[train_n+valid_n:]\n", "        return train_set, valid_set, test_set\n", "\n", "\n", "class MyTrainer(Trainer):\n", "    def __init__(self, lr: float = 1e-3, name: Optional[str] = None) -> None:\n", "        super().__init__(name)\n", "        self.save_parameters(name=name, lr=lr)\n", "\n", "    @monitor_exec\n", "    def execute(\n", "        self,\n", "        train_set: List[int],\n", "        valid_set: List[int],\n", "        test_set: List[int]\n", "    ) -> Tuple[List[int], List[int], List[int], str]:\n", "        \"\"\"Dummy ML trainer mocking an ML training algorithm.\n", "\n", "        Args:\n", "            train_set (List[int]): training dataset.\n", "            valid_set (List[int]): validation dataset.\n", "            test_set (List[int]): test dataset.\n", "\n", "        Returns:\n", "            Tuple[List[int], List[int], List[int], str]: train, validation,\n", "            test datasets, and trained model.\n", "        \"\"\"\n", "        return train_set, valid_set, test_set, \"my_trained_model\"" ] }, { "cell_type": "markdown", "id": "1271e285", "metadata": {}, "source": [ "## Running the pipeline\n", "\n", "Here you can find a graphical representation of the pipeline implemented below:\n", "\n", "\n", "![dag_wf](sample_pipeline_1.jpg)" ] }, { "cell_type": "code", "execution_count": 4, "id": "826fe8c7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "<__main__.MyDataGetter object at 0x7f77c6fc17b0>\n", "MyTrainer\n", "0.5\n", "#######################################\n", "# Starting execution of 'Pipeline'... #\n", "#######################################\n", "###########################################\n", "# Starting execution of 'MyDataGetter'... #\n", "###########################################\n", "#####################################\n", "# 'MyDataGetter' executed in 0.000s #\n", "#####################################\n", "################################################\n", "# Starting execution of 'MyDatasetSplitter'... #\n", "################################################\n", "##########################################\n", "# 'MyDatasetSplitter' executed in 0.000s #\n", "##########################################\n", "########################################\n", "# Starting execution of 'MyTrainer'... 
#\n", "########################################\n", "##################################\n", "# 'MyTrainer' executed in 0.000s #\n", "##################################\n", "#################################\n", "# 'Pipeline' executed in 0.006s #\n", "#################################\n", "Trained model: my_trained_model\n", "<__main__.MyDataGetter object at 0x7f77c7f51120>\n", "MyTrainer\n", "0.5\n", "#######################################\n", "# Starting execution of 'Pipeline'... #\n", "#######################################\n", "###########################################\n", "# Starting execution of 'MyDataGetter'... #\n", "###########################################\n", "#####################################\n", "# 'MyDataGetter' executed in 0.000s #\n", "#####################################\n", "################################################\n", "# Starting execution of 'MyDatasetSplitter'... #\n", "################################################\n", "##########################################\n", "# 'MyDatasetSplitter' executed in 0.000s #\n", "##########################################\n", "########################################\n", "# Starting execution of 'MyTrainer'... 
#\n", "########################################\n", "##################################\n", "# 'MyTrainer' executed in 0.000s #\n", "##################################\n", "#################################\n", "# 'Pipeline' executed in 0.001s #\n", "#################################\n", "Trained model: my_trained_model\n" ] } ], "source": [ "# Assemble them in a scikit-learn-like pipeline\n", "pipeline = Pipeline([\n", "    MyDataGetter(data_size=100),\n", "    MyDatasetSplitter(\n", "        train_proportion=0.5,\n", "        validation_proportion=0.25,\n", "        test_proportion=0.25\n", "    ),\n", "    MyTrainer()\n", "])\n", "\n", "# Inspect steps\n", "print(pipeline[0])\n", "print(pipeline[2].name)\n", "print(pipeline[1].train_proportion)\n", "\n", "# Run pipeline\n", "_, _, _, trained_model = pipeline.execute()\n", "print(\"Trained model: \", trained_model)\n", "\n", "# You can also create a Pipeline from a dict of components, which\n", "# simplifies their retrieval by name\n", "pipeline = Pipeline({\n", "    \"datagetter\": MyDataGetter(data_size=100),\n", "    \"splitter\": MyDatasetSplitter(\n", "        train_proportion=0.5,\n", "        validation_proportion=0.25,\n", "        test_proportion=0.25\n", "    ),\n", "    \"trainer\": MyTrainer()\n", "})\n", "\n", "# Inspect steps\n", "print(pipeline[\"datagetter\"])\n", "print(pipeline[\"trainer\"].name)\n", "print(pipeline[\"splitter\"].train_proportion)\n", "\n", "# Run pipeline\n", "_, _, _, trained_model = pipeline.execute()\n", "print(\"Trained model: \", trained_model)" ] }, { "cell_type": "markdown", "id": "8cea5fe1", "metadata": {}, "source": [ "## The Adapter Component\n", "\n", "We have now seen how to create new components and assemble them into a Pipeline for simplified workflow execution. The Pipeline executes the components in the order in which they are given, assuming that the outputs of each component fit the inputs of the following component. This is not always the case, so you can use the `Adapter` component to compensate for mismatches. 
This component allows users to define a policy to rearrange intermediate results between two components. The example below reuses the components from the previous pipeline and adds an `Adapter` and a `Saver` component to save the ML model produced by the `Trainer`." ] }, { "cell_type": "code", "execution_count": null, "id": "8ef96bfa", "metadata": {}, "outputs": [], "source": [ "from itwinai.components import Adapter\n", "from itwinai.pipeline import Pipeline\n", "\n", "# The components are now imported from an external module\n", "from basic_components import MyDataGetter, MyDatasetSplitter, MySaver, MyTrainer\n", "\n", "# In this pipeline, MyTrainer produces 4 elements as output: train,\n", "# validation, test datasets, and trained model. The Adapter selects the\n", "# trained model only, and forwards it to the saver, which expects a single\n", "# item as input.\n", "pipeline = Pipeline([\n", "    MyDataGetter(data_size=100),\n", "    MyDatasetSplitter(\n", "        train_proportion=0.5,\n", "        validation_proportion=0.25,\n", "        test_proportion=0.25\n", "    ),\n", "    MyTrainer(),\n", "    Adapter(policy=[f\"{Adapter.INPUT_PREFIX}-1\"]),\n", "    MySaver()\n", "])\n", "\n", "# Run pipeline\n", "trained_model = pipeline.execute()\n", "print(\"Trained model: \", trained_model)" ] }, { "cell_type": "markdown", "id": "243f273e", "metadata": {}, "source": [ "## Running the pipeline\n", "\n", "Here you can find a graphical representation of the pipeline implemented above.\n", "\n", "![pipeline](sample_pipeline_2.jpg)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.0" } }, "nbformat": 4, "nbformat_minor": 5 }