from collections import Counter
from itertools import chain
from typing import Any, Dict, Iterable, List, Optional, Set

from kedro.framework.hooks.manager import _NullPluginManager
from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node
from kedro.runner import SequentialRunner
from kedro.runner.runner import AbstractRunner, run_node
from pluggy import PluginManager


class DebugRunner(SequentialRunner):
    """A ``SequentialRunner`` variant that can keep extra datasets alive.

    Datasets listed in ``dataset_names`` are not released after their last
    consumer runs, and are included in the dictionary returned by ``run`` so
    they can be inspected for debugging.
    """

    def run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        dataset_names: Optional[List[str]] = None,
        hook_manager: Optional[PluginManager] = None,
        session_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run the ``Pipeline`` using the datasets provided by ``catalog``
        and save results back to the same objects.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            dataset_names: Extra datasets to keep loadable and return in
                addition to the pipeline's free outputs.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        Raises:
            ValueError: Raised when ``Pipeline`` inputs cannot be satisfied.

        Returns:
            All pipeline outputs plus the datasets requested via
            ``dataset_names``, keyed by dataset name.
        """
        if not dataset_names:
            dataset_names = []
        hook_manager = hook_manager or _NullPluginManager()
        catalog = catalog.shallow_copy()

        unsatisfied = pipeline.inputs() - set(catalog.list())
        if unsatisfied:
            raise ValueError(
                f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
            )

        # Return everything regardless of whether it's in the catalog.
        free_outputs = pipeline.outputs()
        unregistered_ds = pipeline.data_sets() - set(catalog.list())
        for ds_name in unregistered_ds:
            catalog.add(ds_name, self.create_default_data_set(ds_name))

        if self._is_async:
            self._logger.info(
                "Asynchronous mode is enabled for loading and saving data"
            )
        self._run(pipeline, catalog, dataset_names, hook_manager, session_id)

        self._logger.info("Pipeline execution completed successfully.")

        free_outputs = free_outputs | set(dataset_names)  # Union
        return {ds_name: catalog.load(ds_name) for ds_name in free_outputs}

    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        dataset_names: List[str],
        hook_manager: PluginManager,
        session_id: Optional[str] = None,
    ) -> None:
        """The method implementing sequential pipeline running.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            dataset_names: Datasets that must NOT be released after use.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        Raises:
            Exception: in case of any downstream node failure.
        """
        nodes = pipeline.nodes
        done_nodes = set()

        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                run_node(node, catalog, hook_manager, self._is_async, session_id)
                done_nodes.add(node)
            except Exception:
                self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                raise

            # Decrement load counts and release any datasets we've finished
            # with — unless they were explicitly requested via dataset_names,
            # in which case keep them loadable for the caller.
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    if data_set not in dataset_names:
                        catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    if data_set not in dataset_names:
                        catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
:::
# `DebugRunner` has to be used in a different way, since `session.run` doesn't support additional arguments; instead we take a lower-level approach and construct the `Runner`, `Pipeline`, and `DataCatalog` ourselves.
# Testing Kedro Project: https://github.com/noklam/kedro_gallery/tree/master/kedro-debug-runner-demo
%load_ext kedro.ipython
%reload_kedro ~/dev/kedro_gallery/kedro-debug-runner-demo
The kedro.ipython extension is already loaded. To reload it, use:
%reload_ext kedro.ipython
[10/06/22 14:45:20] INFO Updated path to Kedro project: __init__.py:54
/Users/Nok_Lam_Chan/dev/kedro_galler
y/kedro-debug-runner-demo
[10/06/22 14:45:22] INFO Kedro project __init__.py:77
kedro_debug_runner_demo
INFO Defined global variable 'context', __init__.py:78
'session', 'catalog' and 'pipelines'
INFO Updated path to Kedro project: __init__.py:54
/Users/Nok_Lam_Chan/dev/kedro_galler
y/kedro-debug-runner-demo
[10/06/22 14:45:24] INFO Kedro project __init__.py:77
kedro_debug_runner_demo
INFO Defined global variable 'context', __init__.py:78
'session', 'catalog' and 'pipelines'
INFO Loading data from data_catalog.py:343
'example_iris_data'
(CSVDataSet)...
INFO Loading data from 'parameters' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: split: node.py:327
split_data([example_iris_data,parameter
s]) -> [X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'X_test' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'y_train' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'y_test' data_catalog.py:382
(MemoryDataSet)...
INFO Loading data from 'X_train' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'X_test' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'y_train' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: make_predictions: node.py:327
make_predictions([X_train,X_test,y_trai
n]) -> [y_pred]
INFO Saving data to 'y_pred' data_catalog.py:382
(MemoryDataSet)...
INFO Loading data from 'y_pred' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'y_test' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: report_accuracy: node.py:327
report_accuracy([y_pred,y_test]) ->
None
INFO Model has accuracy of 0.933 on test nodes.py:74
data.
[10/06/22 14:45:27] INFO Loading data from data_catalog.py:343
'example_iris_data'
(CSVDataSet)...
INFO Loading data from 'parameters' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: split: node.py:327
split_data([example_iris_data,parameter
s]) -> [X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'X_test' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'y_train' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'y_test' data_catalog.py:382
(MemoryDataSet)...
INFO Loading data from 'X_train' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'X_test' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'y_train' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: make_predictions: node.py:327
make_predictions([X_train,X_test,y_trai
n]) -> [y_pred]
INFO Saving data to 'y_pred' data_catalog.py:382
(MemoryDataSet)...
INFO Loading data from 'y_pred' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'y_test' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: report_accuracy: node.py:327
report_accuracy([y_pred,y_test]) ->
None
INFO Model has accuracy of 0.933 on test nodes.py:74
data.
INFO Loading data from data_catalog.py:343
'example_iris_data'
(CSVDataSet)...
[10/06/22 14:46:01] INFO Loading data from data_catalog.py:343
'example_iris_data'
(CSVDataSet)...
INFO Loading data from 'parameters' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: split: node.py:327
split_data([example_iris_data,parameter
s]) -> [X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'X_test' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'y_train' data_catalog.py:382
(MemoryDataSet)...
INFO Saving data to 'y_test' data_catalog.py:382
(MemoryDataSet)...
INFO Loading data from 'X_train' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'X_test' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'y_train' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: make_predictions: node.py:327
make_predictions([X_train,X_test,y_trai
n]) -> [y_pred]
INFO Saving data to 'y_pred' data_catalog.py:382
(MemoryDataSet)...
INFO Loading data from 'y_pred' data_catalog.py:343
(MemoryDataSet)...
INFO Loading data from 'y_test' data_catalog.py:343
(MemoryDataSet)...
INFO Running node: report_accuracy: node.py:327
report_accuracy([y_pred,y_test]) ->
None
INFO Model has accuracy of 0.933 on test nodes.py:74
data.
INFO Loading data from 'X_train' data_catalog.py:343
(MemoryDataSet)...
class GreedySequentialRunner(SequentialRunner):
    """A ``SequentialRunner`` variant that returns *all* pipeline outputs.

    Unlike the base runner, which only returns outputs the catalog cannot
    process, this runner loads and returns every free output of the
    pipeline, whether or not it is registered in the catalog.
    """

    def run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager = None,
        session_id: str = None,
    ) -> Dict[str, Any]:
        """Run the ``Pipeline`` using the datasets provided by ``catalog``
        and save results back to the same objects.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        Raises:
            ValueError: Raised when ``Pipeline`` inputs cannot be satisfied.

        Returns:
            Every free output of the pipeline, keyed by dataset name.
        """
        hook_manager = hook_manager or _NullPluginManager()
        catalog = catalog.shallow_copy()

        unsatisfied = pipeline.inputs() - set(catalog.list())
        if unsatisfied:
            raise ValueError(
                f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
            )

        # Return everything regardless of whether it's in the catalog.
        free_outputs = pipeline.outputs()
        unregistered_ds = pipeline.data_sets() - set(catalog.list())
        for ds_name in unregistered_ds:
            catalog.add(ds_name, self.create_default_data_set(ds_name))

        if self._is_async:
            self._logger.info(
                "Asynchronous mode is enabled for loading and saving data"
            )
        self._run(pipeline, catalog, hook_manager, session_id)

        self._logger.info("Pipeline execution completed successfully.")

        return {ds_name: catalog.load(ds_name) for ds_name in free_outputs}