import kedro
kedro.__version__
'0.17.4'
noklam
July 2, 2021
Today I am going to explain some kedro
internals to understand how kedro manages your datasets. If you are used to writing imperative Python code, you may find that writing nodes
and pipelines
is a little awkward. They may seem less intuitive, but they also enable some interesting features.
This article assumes you have a basic understanding of kedro
. I will focus on CachedDataSet
and the auto-release dataset feature of the kedro pipeline, which is useful for reducing your memory footprint without running into the infamous Out of Memory (OOM) issue.
To start with, we register the default iris dataset in a DataCatalog. Normally this would be defined in a YAML catalog file, but to make things easier to follow I'll keep everything compact in this notebook.
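As a reference, the catalog setup looks roughly like this; a minimal sketch, assuming a plain (non-cached) CSVDataSet and that the iris CSV lives at data/01_raw/iris.csv, the same path used later in this post:

```python
from kedro.io import DataCatalog
from kedro.extras.datasets.pandas import CSVDataSet

# Register the iris CSV under the name "iris"
data_catalog = DataCatalog({"iris": CSVDataSet("data/01_raw/iris.csv")})
```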
Next, we have a pipeline that follows this execution order: A -> B -> C
from kedro.pipeline import Pipeline, node
import pandas as pd
def A(df):
    print('Loading the Iris Dataset')
    return 'Step1'

def B(dummy):
    return 'Step2'

def C(dummy):
    return 'Step3'

pipeline = Pipeline([node(A, "iris", "A"),
                     node(B, "A", "B"),
                     node(C, "B", "C"),
                     ])
To zoom in on the pipeline, we can use a Hook
to print out the catalog after every node run.
from kedro.framework.hooks import hook_impl
from kedro.framework.hooks import get_hook_manager
from kedro.io import CachedDataSet  # needed by apply_dict below
from pprint import pprint
def apply_dict(d):
    # Summarise which catalog entries currently hold data in memory
    new_dict = {}
    for k, v in d.items():
        if isinstance(v, CachedDataSet):
            if v._cache.exists():
                print(v._cache._data)
                new_dict[k] = 'In Memory'
            else:
                new_dict[k] = 'Cache Deleted'
        elif v.exists():
            new_dict[k] = 'In Memory'
    return new_dict
class DebugHook:
    """A hook class that prints the state of the data catalog after every node run,
    so we can see which datasets still hold data in memory.
    """

    @hook_impl
    def after_node_run(self, node, catalog):
        # print the catalog state after each node run
        print(f"Finish node {node.name}")
        pprint(f"Print Catalog {apply_dict(catalog._data_sets)}")
        print("*****************************")
hook_manager = get_hook_manager()
debug_hook = hook_manager.register(DebugHook());
This hook will print out the datasets that exist in the data catalog. It is a bit tricky because kedro
does not actually delete a released dataset; it marks the underlying data as an _EMPTY
object instead.
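You can see this behaviour in isolation with a plain MemoryDataSet; this is a minimal sketch of my own, not from the original post:

```python
from kedro.io import MemoryDataSet

ds = MemoryDataSet()
ds.save("some data")
print(ds.exists())  # True - the data is held in memory

ds.release()
print(ds.exists())  # False - the underlying data has been emptied,
                    # but the dataset object itself is still in place
```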
from kedro.runner import SequentialRunner

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
runner.run(pipeline, data_catalog);
Loading the Iris Dataset
Finish node A([iris]) -> [A]
"Print Catalog {'iris': 'In Memory'}"
*****************************
Finish node B([A]) -> [B]
"Print Catalog {'iris': 'In Memory', 'A': 'In Memory'}"
*****************************
Finish node C([B]) -> [C]
"Print Catalog {'iris': 'In Memory', 'B': 'In Memory'}"
*****************************
Let’s have a look at what happens when a SequentialRunner
runs a pipeline.
It is interesting to note that kedro
takes a similar approach to Python
itself: it uses reference counting
to control the dataset life cycle. If you are interested, I have another post that dives into Python Memory Management. This is the relevant part of SequentialRunner._run:
# decrement load counts and release any data sets we've finished with
for data_set in node.inputs:
    load_counts[data_set] -= 1
    if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
        catalog.release(data_set)
for data_set in node.outputs:
    if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
        catalog.release(data_set)
What does release
do? It removes the underlying data if that data is stored in memory.
```python
# In CSVDataSet
# https://github.com/quantumblacklabs/kedro/blob/master/kedro/extras/datasets/pandas/csv_dataset.py#L176-L178
def _release(self) -> None:
    super()._release()
    self._invalidate_cache()
```
First, let's test whether releasing a CachedDataSet works as expected.
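The original notebook cell for this check is not shown here, but a minimal sketch of the idea looks like this (the exact check at the end is my assumption):

```python
from kedro.io import DataCatalog, CachedDataSet
from kedro.extras.datasets.pandas import CSVDataSet

data_catalog = DataCatalog({"iris": CachedDataSet(CSVDataSet('data/01_raw/iris.csv'))})

# Loading populates the CachedDataSet's in-memory cache
df = data_catalog.load("iris")
df.head()

# Releasing the dataset should empty the cache again
data_catalog.release("iris")
not data_catalog._data_sets["iris"]._cache.exists()
```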
| | sepal_length | sepal_width | petal_length | petal_width | species |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |
True
This is the expected behavior: the cache is released. However, this does not seem to happen when I run the pipeline.
data_catalog = DataCatalog({"iris": CachedDataSet(CSVDataSet('data/01_raw/iris.csv'))})
runner.run(pipeline, data_catalog)
Loading the Iris Dataset
Finish node A([iris]) -> [A]
sepal_length sepal_width petal_length petal_width species
0 5.1 3.5 1.4 0.2 setosa
1 4.9 3.0 1.4 0.2 setosa
2 4.7 3.2 1.3 0.2 setosa
3 4.6 3.1 1.5 0.2 setosa
4 5.0 3.6 1.4 0.2 setosa
.. ... ... ... ... ...
145 6.7 3.0 5.2 2.3 virginica
146 6.3 2.5 5.0 1.9 virginica
147 6.5 3.0 5.2 2.0 virginica
148 6.2 3.4 5.4 2.3 virginica
149 5.9 3.0 5.1 1.8 virginica
[150 rows x 5 columns]
"Print Catalog {'iris': 'In Memory'}"
*****************************
Finish node B([A]) -> [B]
sepal_length sepal_width petal_length petal_width species
0 5.1 3.5 1.4 0.2 setosa
1 4.9 3.0 1.4 0.2 setosa
2 4.7 3.2 1.3 0.2 setosa
3 4.6 3.1 1.5 0.2 setosa
4 5.0 3.6 1.4 0.2 setosa
.. ... ... ... ... ...
145 6.7 3.0 5.2 2.3 virginica
146 6.3 2.5 5.0 1.9 virginica
147 6.5 3.0 5.2 2.0 virginica
148 6.2 3.4 5.4 2.3 virginica
149 5.9 3.0 5.1 1.8 virginica
[150 rows x 5 columns]
"Print Catalog {'iris': 'In Memory', 'A': 'In Memory'}"
*****************************
Finish node C([B]) -> [C]
sepal_length sepal_width petal_length petal_width species
0 5.1 3.5 1.4 0.2 setosa
1 4.9 3.0 1.4 0.2 setosa
2 4.7 3.2 1.3 0.2 setosa
3 4.6 3.1 1.5 0.2 setosa
4 5.0 3.6 1.4 0.2 setosa
.. ... ... ... ... ...
145 6.7 3.0 5.2 2.3 virginica
146 6.3 2.5 5.0 1.9 virginica
147 6.5 3.0 5.2 2.0 virginica
148 6.2 3.4 5.4 2.3 virginica
149 5.9 3.0 5.1 1.8 virginica
[150 rows x 5 columns]
"Print Catalog {'iris': 'In Memory', 'B': 'In Memory'}"
*****************************
{'C': 'Step3'}
The dataset is persisted throughout the entire pipeline. Why? We can monkey patch SequentialRunner._run
to see why this is happening.
from collections import Counter
from itertools import chain
from kedro.runner.runner import AbstractRunner, run_node
def _run(
    self, pipeline, catalog, run_id=None
) -> None:
    """The method implementing sequential pipeline running.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: The ``DataCatalog`` from which to fetch data.
        run_id: The id of the run.

    Raises:
        Exception: in case of any downstream node failure.
    """
    nodes = pipeline.nodes
    done_nodes = set()

    load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

    for exec_index, node in enumerate(nodes):
        try:
            run_node(node, catalog, self._is_async, run_id)
            done_nodes.add(node)
        except Exception:
            self._suggest_resume_scenario(pipeline, done_nodes)
            raise

        # print load counts for every node run
        pprint(f"{load_counts}")
        print("pipeline input: ", pipeline.inputs())
        print("pipeline output: ", pipeline.outputs())

        # decrement load counts and release any data sets we've finished with
        for data_set in node.inputs:
            load_counts[data_set] -= 1
            if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                catalog.release(data_set)
        for data_set in node.outputs:
            if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                catalog.release(data_set)

        self._logger.info(
            "Completed %d out of %d tasks", exec_index + 1, len(nodes)
        )

SequentialRunner._run = _run
Now we re-run the pipeline. Let's reset the hook so that it prints only the relevant information.
class PrintHook:
    @hook_impl
    def after_node_run(self, node, catalog):
        # only print which node has finished
        print(f"Finish node {node.name}")
        print("*****************************")

hook_manager.set_blocked(debug_hook);  # I tried hook_manager.unregister(), but it was not working.
print_hook = hook_manager.register(PrintHook())
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
runner.run(pipeline, data_catalog);
Loading the Iris Dataset
Finish node A([iris]) -> [A]
*****************************
"Counter({'iris': 1, 'A': 1, 'B': 1})"
pipeline input: {'iris'}
pipeline output: {'C'}
Finish node B([A]) -> [B]
*****************************
"Counter({'A': 1, 'B': 1, 'iris': 0})"
pipeline input: {'iris'}
pipeline output: {'C'}
Finish node C([B]) -> [C]
*****************************
"Counter({'B': 1, 'iris': 0, 'A': 0})"
pipeline input: {'iris'}
pipeline output: {'C'}
So the reason the iris data is kept is that it is always in pipeline.inputs()
, which I think is not what we want.
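If you do want that memory back once the run has finished, one simple workaround (my own suggestion, not something from the kedro docs) is to release the pipeline inputs manually after the run:

```python
runner.run(pipeline, data_catalog)

# Free the in-memory cache of any free-standing pipeline inputs ourselves
for name in pipeline.inputs():
    data_catalog.release(name)

# The CachedDataSet's cached copy of "iris" is now emptied;
# the CSV file on disk is of course untouched.
```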