(WIP) - Integrating Iceberg with Kedro for data versioning

python
kedro
Published

November 18, 2024

%load_ext autoreload
%autoreload 2
from kedro.io import AbstractVersionedDataset
import rich.pretty
rich.pretty.install()
import pandas as pd
%pip install pyiceberg
Note: you may need to restart the kernel to use updated packages.
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 45.4M  100 45.4M    0     0  6605k      0  0:00:07  0:00:07 --:--:--  9.8M
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.table import Table
from kedro.io.core import AbstractDataset, DatasetError
import pyarrow as pa


class PyIcebergDataset(AbstractDataset):
    def __init__(
        self, table_name, namespace="default", table_type="pandas", *args, **kwargs
    ):
        self._table: None | Table = None
        self._table_name = table_name
        self._namespace = namespace
        self.table_type = table_type

        warehouse_path = "/tmp/warehouse"
        catalog = SqlCatalog(
            namespace,
            **{
                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                "warehouse": f"file://{warehouse_path}",
            },
        )

        self.catalog = catalog

        # If the table cannot be loaded, assume it does not exist yet and
        # defer creation to `save` instead of raising an error.
        try:
            self._table = self.catalog.load_table(
                f"{self._namespace}.{self._table_name}"
            )
        except NoSuchTableError:
            pass

    def save(self, data, version=None):
        arrow_table = pa.Table.from_pandas(data)
        if not self._table:
            # Create the table first
            self.catalog.create_namespace_if_not_exists(self._namespace)
            self._table = self.catalog.create_table(
                f"{self._namespace}.{self._table_name}", schema=arrow_table.schema
            )

        if self.table_type == "pandas":
            self._table.overwrite(arrow_table)
        else:
            raise NotImplementedError

    def _describe(self):
        return {"table_name": self._table_name, "namespace": self._namespace}

    def load(self, snapshot_id=None):
        if not self._table:
            raise DatasetError(
                "Iceberg table does not exist yet; use the `save` method to create it first."
            )

        if self.table_type == "pandas":
            data = self._table.scan(snapshot_id=snapshot_id).to_pandas()
        return data

    # Shortcuts to the Iceberg table's metadata and `inspect` module
    def entries(self):
        return self._table.inspect.entries()

    def partitions(self):
        return self._table.inspect.partitions()

    def snapshots(self):
        return self._table.snapshots()

    def manifests(self):
        return self._table.inspect.manifests()

    def history(self):
        return self._table.history()

    def files(self):
        return self._table.inspect.files()

    def schema(self):
        return self._table.schema()

    def latest_version(self):
        return self._table.last_sequence_number
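
Because PyIcebergDataset subclasses AbstractDataset, it can be registered in a Kedro DataCatalog like any other dataset. A minimal sketch, constructing the catalog programmatically (the entry name "taxi_dataset" is illustrative; a catalog.yml entry pointing at this class would work the same way):

from kedro.io import DataCatalog

# Register the dataset under an illustrative entry name; load/save calls on
# the catalog delegate to the methods defined above.
catalog = DataCatalog({"taxi_dataset": PyIcebergDataset("taxi_dataset")})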

First create a table

dataset = PyIcebergDataset("taxi_dataset")
df = pd.read_parquet("/tmp/yellow_tripdata_2023-01.parquet")
# Downcast 'ns' timestamps to 'us' for compatibility https://github.com/apache/iceberg-python/issues/1045#issuecomment-2445205707
import os
os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'
dataset.save(df)
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
/Users/Nok_Lam_Chan/miniconda3/envs/kedro/lib/python3.10/site-packages/pyiceberg/table/__init__.py:651: UserWarning: Delete operation did not match any records
  warnings.warn("Delete operation did not match any records")
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
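
As an alternative to the environment variable, the 'ns' columns could be downcast explicitly before saving. A minimal sketch, assuming pandas 2.x:

# Convert all datetime64[ns] columns to microsecond precision so the Arrow
# schema matches the timestamp precision Iceberg supports.
ns_cols = df.select_dtypes(include=["datetime64[ns]"]).columns
df[ns_cols] = df[ns_cols].astype("datetime64[us]")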
dataset._table.last_sequence_number

1

Load Data

data = dataset.load()
data.head()

VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee
0 2 2023-01-01 00:32:10 2023-01-01 00:40:36 1.0 0.97 1.0 N 161 141 2 9.3 1.00 0.5 0.00 0.0 1.0 14.30 2.5 0.00
1 2 2023-01-01 00:55:08 2023-01-01 01:01:27 1.0 1.10 1.0 N 43 237 1 7.9 1.00 0.5 4.00 0.0 1.0 16.90 2.5 0.00
2 2 2023-01-01 00:25:04 2023-01-01 00:37:49 1.0 2.51 1.0 N 48 238 1 14.9 1.00 0.5 15.00 0.0 1.0 34.90 2.5 0.00
3 1 2023-01-01 00:03:48 2023-01-01 00:13:25 0.0 1.90 1.0 N 138 7 1 12.1 7.25 0.5 0.00 0.0 1.0 20.85 0.0 1.25
4 2 2023-01-01 00:10:29 2023-01-01 00:21:19 1.0 1.43 1.0 N 107 79 1 11.4 1.00 0.5 3.28 0.0 1.0 19.68 2.5 0.00

Overwrite data

dataset.save(data)
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
dataset._table.last_sequence_number

3

Interestingly, the sequence number increased by 2 instead of 1. After some investigation, I couldn’t find any documentation that explains this in detail, but there are some hints in the docstring of the `overwrite` method.

        Shorthand for overwriting the table with a PyArrow table.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - REPLACE: In case existing Parquet files need to be rewritten.
            - APPEND: In case new data is being inserted into the table.

In this case, it seems to perform two operations, DELETE and APPEND, and thus produces two snapshots. We can inspect the snapshot history to confirm this.

Inspecting Snapshots

dataset._table.snapshots()

[
    Snapshot(
        snapshot_id=8697304666575148681,
        parent_snapshot_id=None,
        sequence_number=1,
        timestamp_ms=1731930320934,
        manifest_list='file:///tmp/warehouse/default.db/taxi_dataset/metadata/snap-8697304666575148681-0-1f0a39a1-5d17-459d-8031-ab8fb52b5e5d.avro',
        summary=Summary(operation=Operation.APPEND),
        schema_id=0
    ),
    Snapshot(
        snapshot_id=3459262658777849345,
        parent_snapshot_id=8697304666575148681,
        sequence_number=2,
        timestamp_ms=1731930437601,
        manifest_list='file:///tmp/warehouse/default.db/taxi_dataset/metadata/snap-3459262658777849345-0-112bd0f2-7a28-48b2-8e81-607215460f8c.avro',
        summary=Summary(operation=Operation.DELETE),
        schema_id=0
    ),
    Snapshot(
        snapshot_id=8535010037908857370,
        parent_snapshot_id=3459262658777849345,
        sequence_number=3,
        timestamp_ms=1731930438271,
        manifest_list='file:///tmp/warehouse/default.db/taxi_dataset/metadata/snap-8535010037908857370-0-2bca8b47-b166-4ab8-99ee-81178d28f1d2.avro',
        summary=Summary(operation=Operation.APPEND),
        schema_id=0
    )
]

You can see the pair of consecutive snapshots produced by the overwrite:

summary=Summary(operation=Operation.DELETE),
summary=Summary(operation=Operation.APPEND),
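
To pull this out programmatically rather than eyeballing the repr, a minimal sketch using only the snapshot fields shown above:

# Print the operation recorded in each snapshot's summary; this shows the
# DELETE + APPEND pair produced by overwrite().
for snapshot in dataset._table.snapshots():
    print(snapshot.sequence_number, snapshot.summary.operation)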

Let’s grab the snapshot_id and load the data.

Load Iceberg table with snapshot_id

_ = dataset.load(snapshot_id=3459262658777849345)
TypeError: PyIcebergDataset.load() got an unexpected keyword argument 'snapshot_id'
dataset.load??
'__main__.PyIcebergDataset' is a subclass of AbstractDataset and it must implement the '_describe' method following the signature of AbstractDataset's '_describe'.
Signature: dataset.load(snapshot_id=None) -> None
Docstring:
Loads data by delegation to the provided load method.

Returns:
    Data returned by the provided load method.

Raises:
    DatasetError: When underlying load method raises error.
Source:   
    def load(self, snapshot_id=None):
        if not self._table:
            raise DatasetError(
                "Iceberg Table not exist yet, make sure you use the `save` method to create a table first."
            )

        if self.table_type == "pandas":
            data = self._table.scan(snapshot_id=snapshot_id).to_pandas()
        return data
File:      /var/folders/qq/wpp9vnws3sqcv6v3td68qcpm0000gp/T/ipykernel_46934/2093655639.py
Type:      method

This is weird. I suspect it’s due to how Kedro wraps the `load` and `save` methods. In general, datasets take `_load_args` and `_save_args` from the constructor instead of extra method arguments, but that isn’t convenient when you are developing interactively and just want to pass arguments directly.
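
For comparison, here’s a sketch of the conventional pattern (the `load_args` handling is illustrative, not part of the class above):

# Accept load arguments in the constructor and forward them inside load(),
# so that load() keeps the argument-less signature the Kedro wrapper expects.
class PyIcebergDatasetV2(PyIcebergDataset):
    def __init__(self, table_name, load_args=None, **kwargs):
        super().__init__(table_name, **kwargs)
        self._load_args = load_args or {}  # e.g. {"snapshot_id": ...}

    def load(self):
        return self._table.scan(**self._load_args).to_pandas()

For now, I bypass the wrapper with the table’s internal `scan` method instead: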

_ = dataset._table.scan(snapshot_id=3459262658777849345).to_pandas()
_

VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee

It looks like an empty table after the DELETE. Let’s check the other snapshot.

_ = dataset._table.scan(snapshot_id=8535010037908857370).to_pandas()
_.head()

VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee
0 2 2023-01-01 00:32:10 2023-01-01 00:40:36 1.0 0.97 1.0 N 161 141 2 9.3 1.00 0.5 0.00 0.0 1.0 14.30 2.5 0.00
1 2 2023-01-01 00:55:08 2023-01-01 01:01:27 1.0 1.10 1.0 N 43 237 1 7.9 1.00 0.5 4.00 0.0 1.0 16.90 2.5 0.00
2 2 2023-01-01 00:25:04 2023-01-01 00:37:49 1.0 2.51 1.0 N 48 238 1 14.9 1.00 0.5 15.00 0.0 1.0 34.90 2.5 0.00
3 1 2023-01-01 00:03:48 2023-01-01 00:13:25 0.0 1.90 1.0 N 138 7 1 12.1 7.25 0.5 0.00 0.0 1.0 20.85 0.0 1.25
4 2 2023-01-01 00:10:29 2023-01-01 00:21:19 1.0 1.43 1.0 N 107 79 1 11.4 1.00 0.5 3.28 0.0 1.0 19.68 2.5 0.00

Questions

  • What does it mean when we say “if we can use Iceberg to map a single version number to code, parameters, and I/O data within Kedro and how it aligns with Kedro’s workflow”? Versioning code & parameters sounds more like versioning artifacts.
  • How do we version data efficiently? `overwrite` is a complete rewrite. With a SQL engine this is handled by the engine through APIs like append and replace; with pandas/polars it is unclear whether that is possible (maybe via something like ibis). See the append sketch after this list.
  • Incremental pipelines (and incremental data).
  • Versioning non-tabular types, i.e. parameters and code(?). Iceberg supports only three formats out of the box: Apache Parquet, Apache ORC, and Apache Avro. Parquet is the first-class citizen and the only format people use in practice.
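
On the second question, pyiceberg’s `Table.append` at least avoids the full rewrite that `overwrite` performs. A minimal sketch (the `mode` parameter is hypothetical, not part of the dataset above):

# A hypothetical save supporting append: append produces a single APPEND
# snapshot instead of the DELETE + APPEND pair from overwrite.
def save(self, data, mode="overwrite"):
    arrow_table = pa.Table.from_pandas(data)
    if mode == "append":
        self._table.append(arrow_table)
    else:
        self._table.overwrite(arrow_table)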