Advanced Kedro Series - Digging into Dataset Memory Management and CachedDataSet

A Kedro pipeline offers some nice features, like automatically releasing data from memory once it is no longer needed. How is this possible? Let’s dive deep into the code.
python
kedro
Author

noklam

Published

July 2, 2021

Today I am going to explain some Kedro internals to understand how Kedro manages your datasets. If you always write imperative Python code, you may find that writing nodes and pipelines is a little awkward. They may seem less intuitive; however, they also enable some interesting features.

This article assumes you have a basic understanding of Kedro. I will focus on CachedDataSet and the auto-release dataset feature of Kedro pipelines. It is useful for reducing your memory footprint without encountering the infamous Out of Memory (OOM) issue.

To start with, we use the default iris dataset. Normally we would define the data catalog in a YAML file, but to make things easier, I’ll keep everything compact in a notebook.

import kedro
kedro.__version__
'0.17.4'
from kedro.io import DataCatalog, MemoryDataSet, CachedDataSet
from kedro.extras.datasets.pandas import CSVDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner

# Prepare a data catalog
data_catalog = DataCatalog({"iris": CSVDataSet('data/01_raw/iris.csv')})

Next, we have a pipeline that follows this execution order: A -> B -> C

from kedro.pipeline import Pipeline, node
import pandas as pd


def A(df):
    print('Loading the Iris Dataset')
    return 'Step1'


def B(dummy):
    return 'Step2'


def C(dummy):
    return 'Step3'


pipeline = Pipeline([node(A, "iris", "A"),
                     node(B, "A", "B"),
                     node(C, "B", "C"),
                    ])

To zoom in on the pipeline, we can use a Hook to print out the catalog after every node’s run.

from kedro.framework.hooks import hook_impl
from kedro.framework.hooks import get_hook_manager
from pprint import pprint

def apply_dict(d):
    """Summarize which datasets in the catalog still hold data."""
    new_dict = {}
    for k, v in d.items():
        if isinstance(v, CachedDataSet):
            # For a CachedDataSet, inspect its in-memory cache directly
            if v._cache.exists():
                print(v._cache._data)
                new_dict[k] = 'In Memory'
            else:
                new_dict[k] = 'Cache Deleted'
        elif v.exists():
            new_dict[k] = 'In Memory'
    return new_dict


class DebugHook:
    """A hook class that prints the state of the data catalog after every
    node run, so we can see which datasets still hold data in memory.
    """
    @hook_impl
    def after_node_run(self, node, catalog):
        # adding extra behaviour to a single node
        print(f"Finish node {node.name}")
        pprint(f"Print Catalog {apply_dict(catalog._data_sets)}")
        print("*****************************")

hook_manager = get_hook_manager()
debug_hook = hook_manager.register(DebugHook());

This hook will print out the datasets that still exist in the data catalog. It is a bit tricky because Kedro does not actually delete a dataset; it marks the underlying data as an _EMPTY object instead.
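To make this concrete, here is a minimal sketch (not part of the original pipeline) using a plain MemoryDataSet: after release(), exists() returns False because the underlying data has been swapped for the _EMPTY sentinel, while the dataset entry itself stays in the catalog.

from kedro.io import MemoryDataSet

ds = MemoryDataSet(data=[1, 2, 3])
print(ds.exists())   # True - the data is held in memory

ds.release()         # the underlying data is replaced by _EMPTY
print(ds.exists())   # False - the data is gone, but the dataset object remains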

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
runner.run(pipeline, data_catalog);
Loading the Iris Dataset
Finish node A([iris]) -> [A]
"Print Catalog {'iris': 'In Memory'}"
*****************************
Finish node B([A]) -> [B]
"Print Catalog {'iris': 'In Memory', 'A': 'In Memory'}"
*****************************
Finish node C([B]) -> [C]
"Print Catalog {'iris': 'In Memory', 'B': 'In Memory'}"
*****************************

Let’s have a look at what happened when a SequentialRunner runs a pipeline.

It is interesting to note that Kedro takes a similar approach to Python: it uses reference counting to control the dataset life cycle. If you are interested, I have another post that dives into Python Memory Management.

            # decrement load counts and release any data sets we've finished with
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)
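As a rough illustration of that counting logic (a minimal sketch, not the actual runner code), here is how the load counts start out for our A -> B -> C pipeline and how a node run decrements them:

from collections import Counter
from itertools import chain

# Inputs of each node in the A -> B -> C pipeline above
node_inputs = [["iris"], ["A"], ["B"]]

load_counts = Counter(chain.from_iterable(node_inputs))
print(load_counts)  # Counter({'iris': 1, 'A': 1, 'B': 1})

# After node A runs, its input "iris" drops to 0, but it is only
# released if it is NOT a free pipeline input
load_counts["iris"] -= 1
print(load_counts["iris"])  # 0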

CachedDataSet

What does release do? It removes the underlying data if that data is stored in memory.

# In CSVDataSet
# https://github.com/quantumblacklabs/kedro/blob/master/kedro/extras/datasets/pandas/csv_dataset.py#L176-L178
def _release(self) -> None:
    super()._release()
    self._invalidate_cache()

# In CachedDataSet
def _release(self) -> None:
    self._cache.release()
    self._dataset.release()

# In MemoryDataSet
def _release(self) -> None:
    self._data = _EMPTY

First, we can test if it works as expected.

d = CachedDataSet(CSVDataSet('data/01_raw/iris.csv'))
d.load()
d._cache._data.head()
   sepal_length  sepal_width  petal_length  petal_width species
0           5.1          3.5           1.4          0.2  setosa
1           4.9          3.0           1.4          0.2  setosa
2           4.7          3.2           1.3          0.2  setosa
3           4.6          3.1           1.5          0.2  setosa
4           5.0          3.6           1.4          0.2  setosa
d.exists()
True
d.release()
d._cache.exists()
False

This is the expected behavior: the cache should be released. However, this does not seem to be the case when I run the pipeline.

data_catalog = DataCatalog({"iris": CachedDataSet(CSVDataSet('data/01_raw/iris.csv'))})
runner.run(pipeline, data_catalog)
Loading the Iris Dataset
Finish node A([iris]) -> [A]
     sepal_length  sepal_width  petal_length  petal_width    species
0             5.1          3.5           1.4          0.2     setosa
1             4.9          3.0           1.4          0.2     setosa
2             4.7          3.2           1.3          0.2     setosa
3             4.6          3.1           1.5          0.2     setosa
4             5.0          3.6           1.4          0.2     setosa
..            ...          ...           ...          ...        ...
145           6.7          3.0           5.2          2.3  virginica
146           6.3          2.5           5.0          1.9  virginica
147           6.5          3.0           5.2          2.0  virginica
148           6.2          3.4           5.4          2.3  virginica
149           5.9          3.0           5.1          1.8  virginica

[150 rows x 5 columns]
"Print Catalog {'iris': 'In Memory'}"
*****************************
Finish node B([A]) -> [B]
     sepal_length  sepal_width  petal_length  petal_width    species
0             5.1          3.5           1.4          0.2     setosa
1             4.9          3.0           1.4          0.2     setosa
2             4.7          3.2           1.3          0.2     setosa
3             4.6          3.1           1.5          0.2     setosa
4             5.0          3.6           1.4          0.2     setosa
..            ...          ...           ...          ...        ...
145           6.7          3.0           5.2          2.3  virginica
146           6.3          2.5           5.0          1.9  virginica
147           6.5          3.0           5.2          2.0  virginica
148           6.2          3.4           5.4          2.3  virginica
149           5.9          3.0           5.1          1.8  virginica

[150 rows x 5 columns]
"Print Catalog {'iris': 'In Memory', 'A': 'In Memory'}"
*****************************
Finish node C([B]) -> [C]
     sepal_length  sepal_width  petal_length  petal_width    species
0             5.1          3.5           1.4          0.2     setosa
1             4.9          3.0           1.4          0.2     setosa
2             4.7          3.2           1.3          0.2     setosa
3             4.6          3.1           1.5          0.2     setosa
4             5.0          3.6           1.4          0.2     setosa
..            ...          ...           ...          ...        ...
145           6.7          3.0           5.2          2.3  virginica
146           6.3          2.5           5.0          1.9  virginica
147           6.5          3.0           5.2          2.0  virginica
148           6.2          3.4           5.4          2.3  virginica
149           5.9          3.0           5.1          1.8  virginica

[150 rows x 5 columns]
"Print Catalog {'iris': 'In Memory', 'B': 'In Memory'}"
*****************************
{'C': 'Step3'}

The dataset is persisted throughout the entire pipeline. Why? We can monkey patch the SequentialRunner to find out why this is happening.

A potential bug or undesired behavior?

from collections import Counter
from itertools import chain
from kedro.runner.runner import AbstractRunner, run_node

def _run(
    self, pipeline, catalog, run_id = None
) -> None:
    """The method implementing sequential pipeline running.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: The ``DataCatalog`` from which to fetch data.
        run_id: The id of the run.

    Raises:
        Exception: in case of any downstream node failure.
    """
    nodes = pipeline.nodes
    done_nodes = set()

    load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

    for exec_index, node in enumerate(nodes):
        try:
            run_node(node, catalog, self._is_async, run_id)
            done_nodes.add(node)
        except Exception:
            self._suggest_resume_scenario(pipeline, done_nodes)
            raise

        # print load counts for every node run
        pprint(f"{load_counts}")
        print("pipeline input: ", pipeline.inputs())
        print("pipeline output: ", pipeline.outputs())

        # decrement load counts and release any data sets we've finished with
        for data_set in node.inputs:
            load_counts[data_set] -= 1
            if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                catalog.release(data_set)
        for data_set in node.outputs:
            if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                catalog.release(data_set)

        self._logger.info(
            "Completed %d out of %d tasks", exec_index + 1, len(nodes)
        )

SequentialRunner._run = _run

Now we re-run the pipeline. First, let’s replace the hook with one that only prints the relevant information.

class PrintHook:
    @hook_impl
    def after_node_run(self, node, catalog):
        # adding extra behaviour to a single node
        print(f"Finish node {node.name}")
        print("*****************************")


hook_manager.set_blocked(debug_hook);  # I tried hook_manager.unregister(), but it is not working.
print_hook = hook_manager.register(PrintHook())
# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
runner.run(pipeline, data_catalog);
Loading the Iris Dataset
Finish node A([iris]) -> [A]
*****************************
"Counter({'iris': 1, 'A': 1, 'B': 1})"
pipeline input:  {'iris'}
pipeline output:  {'C'}
Finish node B([A]) -> [B]
*****************************
"Counter({'A': 1, 'B': 1, 'iris': 0})"
pipeline input:  {'iris'}
pipeline output:  {'C'}
Finish node C([B]) -> [C]
*****************************
"Counter({'B': 1, 'iris': 0, 'A': 0})"
pipeline input:  {'iris'}
pipeline output:  {'C'}

Conclusion

So the reason the iris data is kept is that it is always in pipeline.inputs(), which I think is not what we want.
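A quick check against the pipeline defined above confirms this: iris is the only free input, so the guard data_set not in pipeline.inputs() always skips it and catalog.release('iris') is never called.

print(pipeline.inputs())   # {'iris'}
print(pipeline.outputs())  # {'C'}

# Because "iris" is a free pipeline input, the runner never releases it,
# so the CachedDataSet keeps its in-memory copy for the whole run.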