from kedro.pipeline import pipeline, node
from kedro.pipeline.node import Node


def foo():
    return "bar"
Kedro Nodes and Pipelines
Kedro introduces the concepts of Nodes and Pipelines. A basic understanding of these concepts is assumed. However, if you’re unfamiliar, you can refer to the Nodes and Pipelines documentation for more details.
In essence, a Kedro Node acts as a thin wrapper around a Python function, specifying its inputs and outputs. On the other hand, a Pipeline is essentially a collection of Nodes that are strung together. When a pipeline is executed, Kedro resolves the dependencies between nodes to determine the correct order of execution.
While Kedro is primarily designed for data and machine learning applications, it can be used to execute any sequence of tasks, and it can run independent tasks in parallel if needed.
node_a = node(func=foo, inputs=None, outputs="output_a")
first_pipeline = pipeline([])
node_a, first_pipeline
(Node(foo, None, 'output_a', None), Pipeline([]))
pipeline is a factory function that expects a list of Node objects and produces a Pipeline object. In this example, we have an empty Pipeline. Below is another valid example:
pipeline([node_a])
Pipeline([
Node(foo, None, 'output_a', None)
])
Node Uniqueness
The pipeline in Kedro automatically validates Node instances. Specifically, nodes cannot produce the same output (though they can share the same input), and there cannot be duplicate nodes within the pipeline. This validation is crucial to ensure that the pipeline forms an executable Directed Acyclic Graph (DAG), allowing for proper execution and preventing any cyclic dependencies.
pipeline([node_a, node_a])
ValueError: Pipeline nodes must have unique names. The following node names appear more than once:
Free nodes:
- foo(None) -> [output_a]
You can name your nodes using the last argument of 'node()'.
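The same validation also rejects two distinct nodes that write to the same output. A small sketch (bar is a made-up function, and the exact error message may vary between Kedro versions):
def bar():
    return "baz"


# Two different nodes both claiming to produce "output_a": Kedro raises an error,
# because each output can only be produced by one node within a pipeline.
pipeline([node(foo, None, "output_a", name="n1"), node(bar, None, "output_a", name="n2")])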
On the other hand, Node instances are considered equal if they have the same inputs, outputs and function (and node name, if provided; it is an optional argument).
node_b = node(foo, inputs=None, outputs="output_a")
node_b == node_a
True
Internally, the comparison uses the name attribute, which is a combination of the namespace, function name, inputs and outputs. This is not important to most Kedro users and is only used by Kedro internally.
node_a.name
'foo(None) -> [output_a]'
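If you do provide an explicit name, that name is used instead of the generated one. A small sketch (the expected value is shown as a comment, not captured output):
node(foo, None, "output_a", name="make_output_a").name
# expected: 'make_output_a'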
Node.__str__??
Signature: Node.__str__(self)
Docstring: Return str(self).
Source:
def __str__(self):
    def _set_to_str(xset):
        return f"[{','.join(xset)}]"

    out_str = _set_to_str(self.outputs) if self._outputs else "None"
    in_str = _set_to_str(self.inputs) if self._inputs else "None"

    prefix = self._name + ": " if self._name else ""
    return prefix + f"{self._func_name}({in_str}) -> {out_str}"
File: ~/miniconda3/envs/blog/lib/python3.10/site-packages/kedro/pipeline/node.py
Type: function
Pipeline Arithmetic
The closest analogy to Pipeline is the Python set. They share similar characteristics:
- The elements cannot be repeated.
- Pipelines can be added or subtracted.
pipeline([node_a]) + pipeline([node_a])
Pipeline([
Node(foo, None, 'output_a', None)
])
a = node(foo, None, "a")
b = node(foo, None, "b")
c = node(foo, None, "c")
d = node(foo, None, "d")

original_set = set(["a", "b", "c"])
original_pipeline = pipeline([a, b, c])
pipeline([a]) + pipeline([b])
Pipeline([
Node(foo, None, 'a', None),
Node(foo, None, 'b', None)
])
pipeline([a, b]) - pipeline([b])
Pipeline([
Node(foo, None, 'a', None)
])
pipeline([a, b]) - pipeline([a])
Pipeline([
Node(foo, None, 'b', None)
])
| set(["b","c","d"]) original_set
{'a', 'b', 'c', 'd'}
pipeline([a, b, c]) | pipeline([b, c, d])  # union: all nodes from either pipeline
Pipeline([
Node(foo, None, 'a', None),
Node(foo, None, 'b', None),
Node(foo, None, 'c', None),
Node(foo, None, 'd', None)
])
& set(["b","c","d"]) original_set
{'b', 'c'}
& pipeline([b,c,d]) # only nodes that exist in both pipelines pipeline([a,b,c])
Pipeline([
Node(foo, None, 'b', None),
Node(foo, None, 'c', None)
])
Pipeline arithmetic is most useful for pipeline registration, i.e. in pipeline_registry.py. For example, you can combine your development pipeline and inference pipeline in different ways (see the registry sketch after the examples below).
def fake_node(name):
    return node(foo, inputs=None, outputs=name, name=name)


# For simplification, let's assume each pipeline is just one single node.
spark_pipeline = pipeline([fake_node("spark")])
feature_engineering = pipeline([fake_node("feature_engineering")])
model_training = pipeline([fake_node("model_pipeline")])
inference = pipeline([fake_node("inference")])
With these four base pipelines, you can combine them in different ways. For example, you may want an e2e (end-to-end) pipeline that adds them all together.
e2e = spark_pipeline + feature_engineering + model_training + inference
You can also have a local pipeline that skips only the spark pipeline.
local = e2e - spark_pipeline
local
Pipeline([
Node(foo, None, 'feature_engineering', 'feature_engineering'),
Node(foo, None, 'inference', 'inference'),
Node(foo, None, 'model_pipeline', 'model_pipeline')
])
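In a project's pipeline_registry.py this typically ends up looking something like the sketch below. The registered names and the exact combinations here are illustrative assumptions, not taken from a real project:
def register_pipelines() -> dict:
    # Hypothetical registry built from the four base pipelines defined above.
    e2e = spark_pipeline + feature_engineering + model_training + inference
    return {
        "__default__": e2e,
        "e2e": e2e,
        "local": e2e - spark_pipeline,  # everything except the spark pipeline
        "inference": inference,
    }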
Advanced Pipeline Slicing
Kedro provides an interactive visualisation (Kedro-Viz) that you can play around with. For this post I am going to stick with the demo project and explain how you can slice and compose a Pipeline.
%load_ext kedro.ipython
%cd /Users/Nok_Lam_Chan/dev/kedro-viz/demo-project
%reload_kedro /Users/Nok_Lam_Chan/dev/kedro-viz/demo-project
INFO Kedro project modular-spaceflights __init__.py:134
INFO Defined global variable 'context', 'session', 'catalog' and __init__.py:135 'pipelines'
By using reload_kedro inside a notebook, you can access the project's pipelines object. Let's say I want to filter the pipeline down to the highlighted part (click on the "Create Derived Features" node):
To do this with the Pipeline API, you need two options: from_nodes (which selects a node and everything downstream of it) and to_nodes (which selects a node and everything upstream of it).
pipelines.keys()
dict_keys(['__default__', 'Data ingestion', 'Modelling stage', 'Feature engineering', 'Reporting stage', 'Pre-modelling'])
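In the snippets below, full_pipeline refers to the project's complete pipeline. Assuming the default registration combines everything, it can be obtained from the registry like this:
full_pipeline = pipelines["__default__"]  # assumption: the registered default is the full pipeline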
full_pipeline
Pipeline([ Node(apply_types_to_companies, 'companies', 'ingestion.int_typed_companies', 'apply_types_to_companies'), Node(apply_types_to_reviews, ['reviews', 'params:ingestion.typing.reviews.columns_as_floats'], 'ingestion.int_typed_reviews', 'apply_types_to_reviews'), Node(apply_types_to_shuttles, 'shuttles', 'ingestion.int_typed_shuttles@pandas1', 'apply_types_to_shuttles'), Node(aggregate_company_data, 'ingestion.int_typed_companies', 'ingestion.prm_agg_companies', 'company_agg'), Node(combine_shuttle_level_information, {'shuttles': 'ingestion.int_typed_shuttles@pandas2', 'reviews': 'ingestion.int_typed_reviews', 'companies': 'ingestion.prm_agg_companies'}, ['prm_shuttle_company_reviews', 'prm_spine_table'], 'combine_step'), Node(create_derived_features, ['prm_spine_table', 'prm_shuttle_company_reviews', 'params:feature_engineering.feature.derived'], 'feature_engineering.feat_derived_features', 'create_derived_features'), Node(create_feature_importance, 'prm_spine_table', 'feature_importance_output', None), Node(create_static_features, ['prm_shuttle_company_reviews', 'params:feature_engineering.feature.static'], 'feature_engineering.feat_static_features', None), Node(<lambda>, 'prm_spine_table', 'ingestion.prm_spine_table_clone', None), Node(create_matplotlib_chart, 'prm_shuttle_company_reviews', 'reporting.confusion_matrix', None), ... ])
= "feature_engineering.create_derived_features" # make s|apipeline
node_name filter(from_nodes=[node_name], to_nodes=[node_name]) full_pipeline.
Pipeline([ Node(create_derived_features, ['prm_spine_table', 'prm_shuttle_company_reviews', 'params:feature_engineering.feature.derived'], 'feature_engineering.feat_derived_features', 'create_derived_features') ])
This only selects one node, because by default the filter method applies all the given conditions together as an and (intersection). So we need to apply the filters separately and take the union of the results.
full_pipeline.filter(from_nodes=[node_name]) | full_pipeline.filter(to_nodes=[node_name])
Pipeline([ Node(apply_types_to_companies, 'companies', 'ingestion.int_typed_companies', 'apply_types_to_companies'), Node(apply_types_to_reviews, ['reviews', 'params:ingestion.typing.reviews.columns_as_floats'], 'ingestion.int_typed_reviews', 'apply_types_to_reviews'), Node(apply_types_to_shuttles, 'shuttles', 'ingestion.int_typed_shuttles@pandas1', 'apply_types_to_shuttles'), Node(aggregate_company_data, 'ingestion.int_typed_companies', 'ingestion.prm_agg_companies', 'company_agg'), Node(combine_shuttle_level_information, {'shuttles': 'ingestion.int_typed_shuttles@pandas2', 'reviews': 'ingestion.int_typed_reviews', 'companies': 'ingestion.prm_agg_companies'}, ['prm_shuttle_company_reviews', 'prm_spine_table'], 'combine_step'), Node(create_derived_features, ['prm_spine_table', 'prm_shuttle_company_reviews', 'params:feature_engineering.feature.derived'], 'feature_engineering.feat_derived_features', 'create_derived_features'), Node(joiner, ['prm_spine_table', 'feature_engineering.feat_static_features', 'feature_engineering.feat_derived_features'], 'model_input_table', None), Node(split_data, ['model_input_table', 'params:split_options'], ['X_train', 'X_test', 'y_train', 'y_test'], None), Node(train_model, ['X_train', 'y_train', 'params:train_evaluation.model_options.linear_regression'], ['train_evaluation.linear_regression.regressor', 'train_evaluation.linear_regression.experiment_params'], None), Node(train_model, ['X_train', 'y_train', 'params:train_evaluation.model_options.random_forest'], ['train_evaluation.random_forest.regressor', 'train_evaluation.random_forest.experiment_params'], None), ... ])
Now we get the correct filtered pipeline as expected.
More notes
Pipeline.filter could support an or operator
While the current filter supports many options, there may be some value in a wrapper around the Pipeline API to support things like or. Today this is only possible if you use the Python API directly, not the CLI (as in the example above). Maybe something similar to the graph operators in dbt; a Python sketch of such a helper follows the CLI examples below.
e.g.
kedro run --select "my_model+" # select my_model and all children
kedro run --select "+my_model" # select my_model and all parents
kedro run --select "+my_model+" # select my_model, and all of its parents and children
Selecting or slicing multiple pipelines with kedro run
Since the Pipeline API supports arithmetic, it would be quite straightforward to support things like kedro run --pipeline a+b or kedro run --pipeline a-b. Let's have a look at what options are available in the CLI.
!kedro run --help
Usage: kedro run [OPTIONS]
Run the pipeline.
Options:
--from-inputs TEXT A list of dataset names which should be used as a
starting point.
--to-outputs TEXT A list of dataset names which should be used as
an end point.
--from-nodes TEXT A list of node names which should be used as a
starting point.
--to-nodes TEXT A list of node names which should be used as an
end point.
-n, --nodes TEXT Run only nodes with specified names.
-r, --runner TEXT Specify a runner that you want to run the
pipeline with. Available runners:
'SequentialRunner', 'ParallelRunner' and
'ThreadRunner'.
--async Load and save node inputs and outputs
asynchronously with threads. If not specified,
load and save datasets synchronously.
-e, --env TEXT Kedro configuration environment name. Defaults to
`local`.
-t, --tags TEXT Construct the pipeline using only nodes which
have this tag attached. Option can be used
multiple times, what results in a pipeline
constructed from nodes having any of those tags.
-lv, --load-versions TEXT Specify a particular dataset version (timestamp)
for loading.
-p, --pipeline TEXT Name of the registered pipeline to run. If not
set, the '__default__' pipeline is run.
-ns, --namespace TEXT Name of the node namespace to run.
-c, --config FILE Specify a YAML configuration file to load the run
command arguments from. If command line arguments
are provided, they will override the loaded ones.
--conf-source PATH Path of a directory where project configuration
is stored.
--params TEXT Specify extra parameters that you want to pass to
the context initialiser. Items must be separated
by comma, keys - by colon or equals sign,
example: param1=value1,param2=value2. Each
parameter is split by the first comma, so
parameter values are allowed to contain colons,
parameter keys are not. To pass a nested
dictionary as parameter, separate keys by '.',
example: param_group.param1:value1.
-h, --help Show this message and exit.
This is what happens when you do kedro run -p training -t model_a; it is a two-step filtering:
1. The -p pipeline name selects a key from the pipelines dictionary, i.e. pipelines[pipeline_name]. Note that this means you can only select ONE pipeline at a time.
2. The selected pipeline is then further filtered with Pipeline.filter, as sketched below.
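A minimal sketch of those two steps in Python (simplified; the real logic lives inside Kedro's session and CLI code, and "training"/"model_a" come from the hypothetical command above):
selected = pipelines["training"]              # step 1: look up the registered pipeline by name
filtered = selected.filter(tags=["model_a"])  # step 2: narrow it down with Pipeline.filter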
from kedro.pipeline.pipeline import Pipeline
Pipeline.filter??
Signature:
Pipeline.filter(
    self,
    tags: 'Iterable[str] | None' = None,
    from_nodes: 'Iterable[str] | None' = None,
    to_nodes: 'Iterable[str] | None' = None,
    node_names: 'Iterable[str] | None' = None,
    from_inputs: 'Iterable[str] | None' = None,
    to_outputs: 'Iterable[str] | None' = None,
    node_namespace: 'str | None' = None,
) -> 'Pipeline'
Source:
def filter(  # noqa: PLR0913
    self,
    tags: Iterable[str] | None = None,
    from_nodes: Iterable[str] | None = None,
    to_nodes: Iterable[str] | None = None,
    node_names: Iterable[str] | None = None,
    from_inputs: Iterable[str] | None = None,
    to_outputs: Iterable[str] | None = None,
    node_namespace: str | None = None,
) -> Pipeline:
    """Creates a new ``Pipeline`` object with the nodes that meet all of the
    specified filtering conditions.

    The new pipeline object is the intersection of pipelines that meet each
    filtering condition. This is distinct from chaining multiple filters together.

    Args:
        tags: A list of node tags which should be used to lookup
            the nodes of the new ``Pipeline``.
        from_nodes: A list of node names which should be used as a
            starting point of the new ``Pipeline``.
        to_nodes: A list of node names which should be used as an
            end point of the new ``Pipeline``.
        node_names: A list of node names which should be selected for the
            new ``Pipeline``.
        from_inputs: A list of inputs which should be used as a starting point
            of the new ``Pipeline``
        to_outputs: A list of outputs which should be the final outputs of
            the new ``Pipeline``.
        node_namespace: One node namespace which should be used to select
            nodes in the new ``Pipeline``.

    Returns:
        A new ``Pipeline`` object with nodes that meet all of the specified
        filtering conditions.

    Raises:
        ValueError: The filtered ``Pipeline`` has no nodes.

    Example:
    ::

        >>> pipeline = Pipeline(
        >>>     [
        >>>         node(func, "A", "B", name="node1"),
        >>>         node(func, "B", "C", name="node2"),
        >>>         node(func, "C", "D", name="node3"),
        >>>     ]
        >>> )
        >>> pipeline.filter(node_names=["node1", "node3"], from_inputs=["A"])
        >>> # Gives a new pipeline object containing node1 and node3.
    """
    # Use [node_namespace] so only_nodes_with_namespace can follow the same
    # *filter_args pattern as the other filtering methods, which all take iterables.
    node_namespace_iterable = [node_namespace] if node_namespace else None

    filter_methods = {
        self.only_nodes_with_tags: tags,
        self.from_nodes: from_nodes,
        self.to_nodes: to_nodes,
        self.only_nodes: node_names,
        self.from_inputs: from_inputs,
        self.to_outputs: to_outputs,
        self.only_nodes_with_namespace: node_namespace_iterable,
    }
    subset_pipelines = {
        filter_method(*filter_args)  # type: ignore
        for filter_method, filter_args in filter_methods.items()
        if filter_args
    }
    # Intersect all the pipelines subsets. We apply each filter to the original
    # pipeline object (self) rather than incrementally chaining filter methods
    # together. Hence the order of filtering does not affect the outcome, and the
    # resultant pipeline is unambiguously defined.
    # If this were not the case then, for example,
    # pipeline.filter(node_names=["node1", "node3"], from_inputs=["A"])
    # would give different outcomes depending on the order of filter methods:
    # only_nodes and then from_inputs would give node1, while from_inputs and then
    # only_nodes would give node1 and node3.
    filtered_pipeline = Pipeline(self.nodes)
    for subset_pipeline in subset_pipelines:
        filtered_pipeline &= subset_pipeline

    if not filtered_pipeline.nodes:
        raise ValueError(
            "Pipeline contains no nodes after applying all provided filters"
        )
    return filtered_pipeline
File: ~/dev/kedro/kedro/pipeline/pipeline.py
Type: function
This means that, if you have tags applied across multiple pipelines, you cannot filter by that tag across all of them, unless you apply the filter on a larger pipeline that contains all the relevant nodes. What if we could support something like kedro run -p feature+training -t model_a?
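Until something like that exists in the CLI, the equivalent can already be expressed with the Python API. A sketch, assuming feature and training are registered pipeline names and model_a is a tag used in both:
combined = pipelines["feature"] + pipelines["training"]  # merge the two registered pipelines
model_a_pipeline = combined.filter(tags=["model_a"])     # then filter by tag across both of them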