%load_ext autoreload
%autoreload 2
from kedro.io import AbstractVersionedDataset
import rich.pretty
rich.pretty.install()
import pandas as pd
%pip install pyiceberg
Note: you may need to restart the kernel to use updated packages.
!curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 45.4M 100 45.4M 0 0 6605k 0 0:00:07 0:00:07 --:--:-- 9.8M
from pyiceberg.catalog.sql import SqlCatalog
import pyiceberg
from pyiceberg.table import Table
from kedro.io.core import AbstractDataset, DatasetError
import pyarrow as pa
class PyIcebergDataset(AbstractDataset):
    def __init__(
        self, table_name, namespace="default", table_type="pandas", *args, **kwargs
    ):
        self._table: None | Table = None
        self._table_name = table_name
        self._namespace = namespace
        self.table_type = table_type

        # Use a local SQLite-backed catalog with a file warehouse for experimentation
        warehouse_path = "/tmp/warehouse"
        catalog = SqlCatalog(
            namespace,
            **{
                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                "warehouse": f"file://{warehouse_path}",
            },
        )
        self.catalog = catalog

        # TODO: If the table cannot be loaded, assume it is not created yet and do not raise an error
        try:
            self._table = self.catalog.load_table(
                f"{self._namespace}.{self._table_name}"
            )
        except Exception:
            pass

    def save(self, data, version=None):
        arrow_table = pa.Table.from_pandas(data)
        if not self._table:
            # Create the namespace and table first
            self.catalog.create_namespace_if_not_exists(self._namespace)
            self._table = self.catalog.create_table(
                f"{self._namespace}.{self._table_name}", schema=arrow_table.schema
            )
        if self.table_type == "pandas":
            self._table.overwrite(arrow_table)
        else:
            raise NotImplementedError

    def _describe(self):
        ...

    def load(self, snapshot_id=None):
        if not self._table:
            raise DatasetError(
                "Iceberg table does not exist yet; use the `save` method to create the table first."
            )
        if self.table_type == "pandas":
            data = self._table.scan(snapshot_id=snapshot_id).to_pandas()
            return data

    # Shortcuts to the Iceberg table's `inspect` module (and a few direct Table methods)
    def entries(self):
        return self._table.inspect.entries()

    def partitions(self):
        return self._table.inspect.partitions()

    def snapshots(self):
        return self._table.snapshots()

    def manifests(self):
        return self._table.inspect.manifests()

    def history(self):
        return self._table.history()

    def files(self):
        return self._table.inspect.files()

    def schema(self):
        return self._table.schema()

    def latest_version(self):
        return self._table.last_sequence_number
First create a table
dataset = PyIcebergDataset("taxi_dataset")
df = pd.read_parquet("/tmp/yellow_tripdata_2023-01.parquet")
# Downcast timestamp for compatibility https://github.com/apache/iceberg-python/issues/1045#issuecomment-2445205707
import os
os.environ['PYICEBERG_DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE'] = 'true'
dataset.save(df)
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
/Users/Nok_Lam_Chan/miniconda3/envs/kedro/lib/python3.10/site-packages/pyiceberg/table/__init__.py:651: UserWarning: Delete operation did not match any records
warnings.warn("Delete operation did not match any records")
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
dataset._table.last_sequence_number
1
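As a quick sanity check, the SQLite-backed catalog can also be queried directly for what it now tracks; a small sketch reusing the catalog attached to the dataset above:
# The catalog should now contain the "default" namespace and the taxi_dataset table
print(dataset.catalog.list_namespaces())
print(dataset.catalog.list_tables("default"))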
Load Data
data = dataset.load()
data.head()
VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2 | 2023-01-01 00:32:10 | 2023-01-01 00:40:36 | 1.0 | 0.97 | 1.0 | N | 161 | 141 | 2 | 9.3 | 1.00 | 0.5 | 0.00 | 0.0 | 1.0 | 14.30 | 2.5 | 0.00 |
1 | 2 | 2023-01-01 00:55:08 | 2023-01-01 01:01:27 | 1.0 | 1.10 | 1.0 | N | 43 | 237 | 1 | 7.9 | 1.00 | 0.5 | 4.00 | 0.0 | 1.0 | 16.90 | 2.5 | 0.00 |
2 | 2 | 2023-01-01 00:25:04 | 2023-01-01 00:37:49 | 1.0 | 2.51 | 1.0 | N | 48 | 238 | 1 | 14.9 | 1.00 | 0.5 | 15.00 | 0.0 | 1.0 | 34.90 | 2.5 | 0.00 |
3 | 1 | 2023-01-01 00:03:48 | 2023-01-01 00:13:25 | 0.0 | 1.90 | 1.0 | N | 138 | 7 | 1 | 12.1 | 7.25 | 0.5 | 0.00 | 0.0 | 1.0 | 20.85 | 0.0 | 1.25 |
4 | 2 | 2023-01-01 00:10:29 | 2023-01-01 00:21:19 | 1.0 | 1.43 | 1.0 | N | 107 | 79 | 1 | 11.4 | 1.00 | 0.5 | 3.28 | 0.0 | 1.0 | 19.68 | 2.5 | 0.00 |
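Before overwriting, a quick check that the round trip preserved the data; a small sketch comparing the loaded frame with the original dataframe:
# The loaded table should contain exactly the rows that were saved
assert len(data) == len(df)
print(data.shape)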
Overwrite data
dataset.save(data)
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.
dataset._table.last_sequence_number
3
Interestingly, the sequence number increases by 2 instead of 1. After some investigation, I couldn't find any documentation that explains this in detail, but there are some hints in the docstring of the `overwrite` method.
Shorthand for overwriting the table with a PyArrow table.
An overwrite may produce zero or more snapshots based on the operation:
- DELETE: In case existing Parquet files can be dropped completely.
- REPLACE: In case existing Parquet files need to be rewritten.
- APPEND: In case new data is being inserted into the table.
In this case, it seems to perform two operations, `DELETE` and `APPEND`, thus two snapshots. We can inspect the snapshot history.
Inspecting Snapshots
dataset._table.snapshots()
[
    Snapshot(
        snapshot_id=8697304666575148681,
        parent_snapshot_id=None,
        sequence_number=1,
        timestamp_ms=1731930320934,
        manifest_list='file:///tmp/warehouse/default.db/taxi_dataset/metadata/snap-8697304666575148681-0-1f0a39a1-5d17-459d-8031-ab8fb52b5e5d.avro',
        summary=Summary(operation=Operation.APPEND),
        schema_id=0
    ),
    Snapshot(
        snapshot_id=3459262658777849345,
        parent_snapshot_id=8697304666575148681,
        sequence_number=2,
        timestamp_ms=1731930437601,
        manifest_list='file:///tmp/warehouse/default.db/taxi_dataset/metadata/snap-3459262658777849345-0-112bd0f2-7a28-48b2-8e81-607215460f8c.avro',
        summary=Summary(operation=Operation.DELETE),
        schema_id=0
    ),
    Snapshot(
        snapshot_id=8535010037908857370,
        parent_snapshot_id=3459262658777849345,
        sequence_number=3,
        timestamp_ms=1731930438271,
        manifest_list='file:///tmp/warehouse/default.db/taxi_dataset/metadata/snap-8535010037908857370-0-2bca8b47-b166-4ab8-99ee-81178d28f1d2.avro',
        summary=Summary(operation=Operation.APPEND),
        schema_id=0
    )
]
You can see that there are consecutive snapshots like this:
summary=Summary(operation=Operation.DELETE),
summary=Summary(operation=Operation.APPEND),
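If you only want to see which operation each snapshot performed, a compact way to list them (a small sketch over the same table object) is:
# Summarise each snapshot as (sequence_number, operation, snapshot_id);
# this should show the APPEND, DELETE, APPEND sequence from the output above
for snap in dataset._table.snapshots():
    print(snap.sequence_number, snap.summary.operation, snap.snapshot_id)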
Let’s grab the snapshot_id and load the data.
Load Iceberg table with snapshot_id
_ = dataset.load(snapshot_id=3459262658777849345)
TypeError: PyIcebergDataset.load() got an unexpected keyword argument 'snapshot_id'
dataset.load??
'__main__.PyIcebergDataset' is a subclass of AbstractDataset and it must implement the '_describe' method following the signature of AbstractDataset's '_describe'.
Signature: dataset.load(snapshot_id=None) -> None
Docstring:
Loads data by delegation to the provided load method.
Returns:
Data returned by the provided load method.
Raises:
DatasetError: When underlying load method raises error.
Source:
def load(self, snapshot_id=None):
    if not self._table:
        raise DatasetError(
            "Iceberg table does not exist yet; use the `save` method to create the table first."
        )
    if self.table_type == "pandas":
        data = self._table.scan(snapshot_id=snapshot_id).to_pandas()
        return data
File: /var/folders/qq/wpp9vnws3sqcv6v3td68qcpm0000gp/T/ipykernel_46934/2093655639.py
Type: method
This is weird; I suspect it is due to how Kedro wraps the `load` and `save` methods. In general, datasets take `_load_args` and `_save_args`, but that is not convenient when you are developing interactively and just want to pass arguments directly (a sketch of that approach follows the snapshot checks below). For now I bypass this with the internal table.
_ = dataset._table.scan(snapshot_id=3459262658777849345).to_pandas()
_
VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee |
---|
It looks like an empty table after `DELETE`; let's check the other snapshot.
_ = dataset._table.scan(snapshot_id=8535010037908857370).to_pandas()
_.head()
VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge | airport_fee | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2 | 2023-01-01 00:32:10 | 2023-01-01 00:40:36 | 1.0 | 0.97 | 1.0 | N | 161 | 141 | 2 | 9.3 | 1.00 | 0.5 | 0.00 | 0.0 | 1.0 | 14.30 | 2.5 | 0.00 |
1 | 2 | 2023-01-01 00:55:08 | 2023-01-01 01:01:27 | 1.0 | 1.10 | 1.0 | N | 43 | 237 | 1 | 7.9 | 1.00 | 0.5 | 4.00 | 0.0 | 1.0 | 16.90 | 2.5 | 0.00 |
2 | 2 | 2023-01-01 00:25:04 | 2023-01-01 00:37:49 | 1.0 | 2.51 | 1.0 | N | 48 | 238 | 1 | 14.9 | 1.00 | 0.5 | 15.00 | 0.0 | 1.0 | 34.90 | 2.5 | 0.00 |
3 | 1 | 2023-01-01 00:03:48 | 2023-01-01 00:13:25 | 0.0 | 1.90 | 1.0 | N | 138 | 7 | 1 | 12.1 | 7.25 | 0.5 | 0.00 | 0.0 | 1.0 | 20.85 | 0.0 | 1.25 |
4 | 2 | 2023-01-01 00:10:29 | 2023-01-01 00:21:19 | 1.0 | 1.43 | 1.0 | N | 107 | 79 | 1 | 11.4 | 1.00 | 0.5 | 3.28 | 0.0 | 1.0 | 19.68 | 2.5 | 0.00 |
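Coming back to the wrapped-signature issue: a more Kedro-conventional workaround is to accept `load_args` in the constructor and forward them inside `load`, so nothing extra has to be passed at call time. A rough sketch; the subclass name and `load_args` parameter are my own additions, not part of the dataset above:
class PyIcebergSnapshotDataset(PyIcebergDataset):
    """Sketch: pin a snapshot via load_args instead of a load() keyword argument."""

    def __init__(self, table_name, namespace="default", table_type="pandas", load_args=None):
        super().__init__(table_name, namespace=namespace, table_type=table_type)
        # Hypothetical: stash load-time options, e.g. {"snapshot_id": 3459262658777849345}
        self._load_args = load_args or {}

    def load(self):
        if not self._table:
            raise DatasetError("Iceberg table does not exist yet.")
        # Forward the stored arguments to the scan instead of taking them as
        # keyword arguments, which sidesteps the wrapped-signature problem above
        return self._table.scan(**self._load_args).to_pandas()
Called as PyIcebergSnapshotDataset("taxi_dataset", load_args={"snapshot_id": 3459262658777849345}).load(), this should return the same empty frame as the direct scan above.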
Questions
- What does it mean when we say "if we can use Iceberg to map a single version number to code, parameters, and I/O data within Kedro and how it aligns with Kedro's workflow"? Versioning code & parameters sounds more like versioning artifacts.
- How to efficiently version data? `overwrite` is a complete rewrite. For a SQL engine this is implemented by the engine itself, using APIs like `append` and `replace`; with pandas/polars it is unclear whether that is possible (maybe it is with something like `ibis`).
- Incremental pipelines (and incremental data); see the `append` sketch below.
- Versioning non-table types, i.e. parameters and code(?). Iceberg supports only three formats out of the box: Apache Parquet, Apache ORC, and Apache Avro. Parquet is the first-class citizen and the only format that people use in practice.
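On the incremental-pipeline question: pyiceberg does expose `Table.append`, which adds rows as a new snapshot instead of rewriting the table, so an append-style save mode looks feasible. A minimal sketch; the batch split here is made up purely for illustration:
# Append a (made-up) batch of new rows as a single APPEND snapshot,
# instead of the DELETE + APPEND pair produced by overwrite()
new_rows = pa.Table.from_pandas(df.head(1000))
dataset._table.append(new_rows)
print(dataset._table.last_sequence_number)  # should advance by one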