Source code for itwinai.cli

"""
Command line interface for out Python application.
You can call commands from the command line.
Example

>>> $ itwinai --help

"""

# NOTE: import libs in the command"s function, not here.
# Otherwise this will slow the whole CLI.

from typing import Optional, List
from typing_extensions import Annotated
from pathlib import Path
import typer


app = typer.Typer(pretty_exceptions_enable=False)


[docs] @app.command() def scalability_report( pattern: Annotated[str, typer.Option( help="Python pattern matching names of CSVs in sub-folders." )], plot_title: Annotated[Optional[str], typer.Option( help=("Plot name.") )] = None, skip_id: Annotated[Optional[int], typer.Option( help=("Skip epoch ID.") )] = None, archive: Annotated[Optional[str], typer.Option( help=("Archive name to backup the data, without extension.") )] = None, ): """ Generate scalability report merging all CSVs containing epoch time records in sub-folders. Example: >>> itwinai scalability-report --pattern="^epoch.+\\.csv$" --skip-id 0 \\ >>> --plot-title "Some title" --archive archive_name """ # TODO: add max depth and path different from CWD import os import re import glob import shutil import pandas as pd import matplotlib import matplotlib.pyplot as plt import numpy as np regex = re.compile(r'{}'.format(pattern)) combined_df = pd.DataFrame() csv_files = [] for root, _, files in os.walk(os.getcwd()): for file in files: if regex.match(file): fpath = os.path.join(root, file) csv_files.append(fpath) df = pd.read_csv(fpath) if skip_id is not None: df = df.drop(df[df.epoch_id == skip_id].index) combined_df = pd.concat([combined_df, df]) print("Merged CSV:") print(combined_df) avg_times = ( combined_df .drop(columns='epoch_id') .groupby(['name', 'nodes']) .mean() .reset_index() ) print("\nAvg over name and nodes:") print(avg_times.rename(columns=dict(time='avg(time)'))) # fig, (sp_up_ax, eff_ax) = plt.subplots(1, 2, figsize=(12, 4)) fig, sp_up_ax = plt.subplots(1, 1, figsize=(6, 4)) if plot_title is not None: fig.suptitle(plot_title) sp_up_ax.set_yscale("log") sp_up_ax.set_xscale("log") markers = iter("ov^s*dXpD.+12348") series_names = sorted(set(avg_times.name.values)) for name in series_names: df = avg_times[avg_times.name == name].drop(columns='name') # Debug # compute_time = [3791., 1884., 1011., 598.] # nodes = [1, 2, 4, 8] # d = {'nodes': nodes, 'time': compute_time} # df = pd.DataFrame(data=d) df["NGPUs"] = df["nodes"]*4 # speedup df["Speedup - ideal"] = df["nodes"].astype(float) df["Speedup"] = df["time"].iloc[0] / df["time"] df["Nworkers"] = 1 # efficiency df["Threadscaled Sim. Time / s"] = df["time"] * \ df["nodes"] * df["Nworkers"] df["Efficiency"] = df["Threadscaled Sim. Time / s"].iloc[0] / \ df["Threadscaled Sim. Time / s"] sp_up_ax.plot( df["NGPUs"].values, df["Speedup"].values, marker=next(markers), lw=1.0, label=name, alpha=0.7) sp_up_ax.plot(df["NGPUs"].values, df["Speedup - ideal"].values, ls='dashed', lw=1.0, c='k', label="ideal") sp_up_ax.legend(ncol=1) sp_up_ax.set_xticks(df["NGPUs"].values) sp_up_ax.get_xaxis().set_major_formatter( matplotlib.ticker.ScalarFormatter()) sp_up_ax.set_ylabel('Speedup') sp_up_ax.set_xlabel('NGPUs (4 per node)') sp_up_ax.grid() # Sort legend handles, labels = sp_up_ax.get_legend_handles_labels() order = np.argsort(labels) plt.legend([handles[idx] for idx in order], [labels[idx] for idx in order]) plot_png = f"scaling_plot_{plot_title}.png" plt.tight_layout() plt.savefig(plot_png, bbox_inches='tight', format='png', dpi=300) print("Saved scaling plot to: ", plot_png) if archive is not None: if '/' in archive: raise ValueError("Archive name must NOT contain a path. " f"Received: '{archive}'") if '.' in archive: raise ValueError("Archive name must NOT contain an extension. " f"Received: '{archive}'") if os.path.isdir(archive): raise ValueError(f"Folder '{archive}' already exists. " "Change archive name.") os.makedirs(archive) for csvfile in csv_files: shutil.copyfile(csvfile, os.path.join(archive, os.path.basename(csvfile))) shutil.copyfile(plot_png, os.path.join(archive, plot_png)) avg_times.to_csv(os.path.join(archive, "avg_times.csv"), index=False) print("Archived AVG epoch times CSV") # Copy SLURM logs: *.err *.out files if os.path.exists('logs_slurm'): print("Archived SLURM logs") shutil.copytree('logs_slurm', os.path.join(archive, 'logs_slurm')) # Copy other SLURM logs for ext in ['*.out', '*.err']: for file in glob.glob(ext): shutil.copyfile(file, os.path.join(archive, file)) # Create archive archive_name = shutil.make_archive( base_name=archive, # archive file name format='gztar', # root_dir='.', base_dir=archive # folder path inside archive ) shutil.rmtree(archive) print("Archived logs and plot at: ", archive_name)
[docs] @app.command() def exec_pipeline( config: Annotated[Path, typer.Option( help="Path to the configuration file of the pipeline to execute." )], pipe_key: Annotated[str, typer.Option( help=("Key in the configuration file identifying " "the pipeline object to execute.") )] = "pipeline", steps: Annotated[Optional[str], typer.Option( help=("Run only some steps of the pipeline. Accepted values are " "indices, python slices (e.g., 0:3 or 2:10:100), and " "string names of steps.") )] = None, print_config: Annotated[bool, typer.Option( help=("Print config to be executed after overrides.") )] = False, overrides_list: Annotated[ Optional[List[str]], typer.Option( "--override", "-o", help=( "Nested key to dynamically override elements in the " "configuration file with the " "corresponding new value, joined by '='. It is also possible " "to index elements in lists using their list index. " "Example: [...] " "-o pipeline.init_args.trainer.init_args.lr=0.001 " "-o pipeline.my_list.2.batch_size=64 " ) ) ] = None ): """Execute a pipeline from configuration file. Allows dynamic override of fields. """ # Add working directory to python path so that the interpreter is able # to find the local python files imported from the pipeline file import os import sys import re from .utils import str_to_slice sys.path.append(os.path.dirname(config)) sys.path.append(os.getcwd()) # Parse and execute pipeline from itwinai.parser import ConfigParser overrides_list = overrides_list if overrides_list is not None else [] overrides = { k: v for k, v in map(lambda x: (x.split('=')[0], x.split('=')[1]), overrides_list) } parser = ConfigParser(config=config, override_keys=overrides) if print_config: import json print() print("#="*15 + " Used configuration " + "#="*15) print(json.dumps(parser.config, indent=2)) print("#="*50) print() pipeline = parser.parse_pipeline(pipeline_nested_key=pipe_key) if steps: if not re.match(r"\d+(:\d+)?(:\d+)?", steps): print(f"Looking for step name '{steps}'") else: steps = str_to_slice(steps) pipeline = pipeline[steps] pipeline.execute() # Cleanup PYTHONPATH sys.path.pop() sys.path.pop()
[docs] @app.command() def mlflow_ui( path: str = typer.Option("ml-logs/", help="Path to logs storage."), ): """ Visualize Mlflow logs. """ import subprocess subprocess.run(f"mlflow ui --backend-store-uri {path}".split())
[docs] @app.command() def mlflow_server( path: str = typer.Option("ml-logs/", help="Path to logs storage."), port: int = typer.Option( 5000, help="Port on which the server is listening."), ): """ Spawn Mlflow server. """ import subprocess subprocess.run( f"mlflow server --backend-store-uri {path} --port {port}".split())
[docs] @app.command() def kill_mlflow_server( port: int = typer.Option( 5000, help="Port on which the server is listening."), ): """ Kill Mlflow server. """ import subprocess subprocess.run( f"kill -9 $(lsof -t -i:{port})", shell=True, check=True, stderr=subprocess.DEVNULL )
if __name__ == "__main__": app()