!pip install --quiet duckdb
Exploring DuckDB and how we can use it with Kedro.
References:
- Extends the notebook from https://colab.research.google.com/drive/1eg_TJpPQr2tyYKWjISJlX8IEAi8Qln3U?usp=sharing
- Practical SQL for Data Analysis: what you can do together with Pandas
import pandas as pd
import numpy as np
import sqlite3
import duckdb
import time
Preparation
Download the data and set up the Pandas data frames. We read the data into a Pandas DataFrame using DuckDB’s built-in Parquet reader.
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/orders.parquet
```python
%%time
lineitem = duckdb.query("SELECT * FROM 'lineitemsf1.snappy.parquet'").to_df()
orders = duckdb.query("SELECT * FROM 'orders.parquet'").to_df()
```
CPU times: user 7.62 s, sys: 5.43 s, total: 13 s
Wall time: 14.1 s
```python
%%time
_ = pd.read_parquet("lineitemsf1.snappy.parquet")
_ = pd.read_parquet("orders.parquet")
```
CPU times: user 6.29 s, sys: 1.5 s, total: 7.78 s
Wall time: 5.81 s
```python
con = duckdb.connect()
con.execute('PRAGMA threads=2')
```
```python
def timeit(fun, name):
    import time
    start_time = time.monotonic()
    fun()
    return [name, time.monotonic() - start_time]
```
```python
def plot_results(results, title):
    df = pd.DataFrame.from_dict({
        'name': [x[0] for x in results],
        'time': [x[1] for x in results]
    })
    print(title)
    print(df)
```
Ungrouped Aggregates
This performs a simple set of ungrouped aggregates (sum, min, max, avg) over a column without any filters or other complex operations.
```python
ungrouped_aggregate = '''
SELECT SUM(l_extendedprice), MIN(l_extendedprice), MAX(l_extendedprice), AVG(l_extendedprice) FROM lineitem
'''

def duckdb_ungrouped_aggregate(d_con):
    print(d_con.query(ungrouped_aggregate).to_df())

def duckdb_ungrouped_aggregate_1t():
    duckdb_ungrouped_aggregate(duckdb)

def duckdb_ungrouped_aggregate_2t():
    duckdb_ungrouped_aggregate(con)
```
```python
def pandas_ungrouped_aggregate():
    # Note: this variant still groups by (l_returnflag, l_linestatus);
    # the truly ungrouped aggregate is the commented-out line at the end.
    result = lineitem.groupby(
        ['l_returnflag', 'l_linestatus']
    ).agg(
        Sum=('l_extendedprice', 'sum'),
        Min=('l_extendedprice', 'min'),
        Max=('l_extendedprice', 'max'),
        Avg=('l_extendedprice', 'mean')
    )
    print(result)
    # print(lineitem.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))
```
```python
ua_results = []
ua_results.append(timeit(duckdb_ungrouped_aggregate_1t, 'DuckDB (1T)'))
ua_results.append(timeit(duckdb_ungrouped_aggregate_2t, 'DuckDB (2T)'))
ua_results.append(timeit(pandas_ungrouped_aggregate, 'Pandas'))
plot_results(ua_results, 'Ungrouped Aggregate')
```
sum(l_extendedprice) min(l_extendedprice) max(l_extendedprice) \
0 2.295773e+11 901.0 104949.5
avg(l_extendedprice)
0 38255.138485
sum(l_extendedprice) min(l_extendedprice) max(l_extendedprice) \
0 2.295773e+11 901.0 104949.5
avg(l_extendedprice)
0 38255.138485
Sum Min Max Avg
l_returnflag l_linestatus
A F 5.658655e+10 904.0 104949.5 38273.129735
N F 1.487505e+09 920.0 104049.5 38284.467761
O 1.149352e+11 901.0 104749.5 38248.015609
R F 5.656804e+10 904.0 104899.5 38250.854626
Ungrouped Aggregate
name time
0 DuckDB (1T) 0.052544
1 DuckDB (2T) 0.066239
2 Pandas 0.801278
Grouped Aggregates
This performs the same set of aggregates, but this time grouped by two other columns (l_returnflag and l_linestatus).
```python
grouped_aggregate = '''
SELECT l_returnflag,
       l_linestatus,
       SUM(l_extendedprice),
       MIN(l_extendedprice),
       MAX(l_extendedprice),
       AVG(l_extendedprice)
FROM lineitem
GROUP BY l_returnflag,
         l_linestatus
'''

def duckdb_grouped_aggregate(d_con):
    print(d_con.query(grouped_aggregate).to_df())

def duckdb_grouped_aggregate_1t():
    duckdb_grouped_aggregate(duckdb)

def duckdb_grouped_aggregate_2t():
    duckdb_grouped_aggregate(con)

def pandas_grouped_aggregate():
    print(lineitem.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))
```
```python
results = []
results.append(timeit(duckdb_grouped_aggregate_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate, 'Pandas'))
plot_results(results, 'Grouped Aggregate')
```
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.149352e+11 901.0
1 R F 5.656804e+10 904.0
2 A F 5.658655e+10 904.0
3 N F 1.487505e+09 920.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38248.015609
1 104899.5 38250.854626
2 104949.5 38273.129735
3 104049.5 38284.467761
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.149352e+11 901.0
1 R F 5.656804e+10 904.0
2 A F 5.658655e+10 904.0
3 N F 1.487505e+09 920.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38248.015609
1 104899.5 38250.854626
2 104949.5 38273.129735
3 104049.5 38284.467761
Sum Min Max Avg
l_returnflag l_linestatus
A F 5.658655e+10 904.0 104949.5 38273.129735
N F 1.487505e+09 920.0 104049.5 38284.467761
O 1.149352e+11 901.0 104749.5 38248.015609
R F 5.656804e+10 904.0 104899.5 38250.854626
Grouped Aggregate
name time
0 DuckDB (1T) 0.115463
1 DuckDB (2T) 0.222520
2 Pandas 0.708696
Grouped Aggregate with a Filter
This benchmark performs a grouped aggregate with a filter on the l_shipdate column.
Because Pandas does not perform any projection pushdown, we include a version where we push the projection down manually, selecting only the columns we actually need before running the filter and aggregate.
DuckDB's query optimizer performs this optimization automatically.
```python
def duckdb_grouped_aggregate_filter(d_con):
    print(d_con.query('''
    SELECT l_returnflag,
           l_linestatus,
           SUM(l_extendedprice),
           MIN(l_extendedprice),
           MAX(l_extendedprice),
           AVG(l_extendedprice)
    FROM lineitem
    WHERE
        l_shipdate <= DATE '1998-09-02'
    GROUP BY l_returnflag,
             l_linestatus
    ''').to_df())

def duckdb_grouped_aggregate_filter_1t():
    duckdb_grouped_aggregate_filter(duckdb)

def duckdb_grouped_aggregate_filter_2t():
    duckdb_grouped_aggregate_filter(con)

def pandas_grouped_aggregate_filter():
    # Note: Pandas filters with '<' while the SQL above uses '<=', which
    # likely accounts for the small difference in the N/O group totals.
    filtered_df = lineitem[lineitem['l_shipdate'] < "1998-09-02"]
    print(filtered_df.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def pandas_grouped_aggregate_filter_projection_pushdown():
    pushed_down_df = lineitem[['l_shipdate', 'l_returnflag', 'l_linestatus', 'l_extendedprice']]
    filtered_df = pushed_down_df[pushed_down_df['l_shipdate'] < "1998-09-02"]
    print(filtered_df.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))
```
```python
results = []
results.append(timeit(duckdb_grouped_aggregate_filter_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_filter_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate_filter, 'Pandas'))
results.append(timeit(pandas_grouped_aggregate_filter_projection_pushdown, 'Pandas (manual pushdown)'))
plot_results(results, 'Grouped Aggregate + Filter')
```
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.117017e+11 901.0
1 A F 5.658655e+10 904.0
2 R F 5.656804e+10 904.0
3 N F 1.487505e+09 920.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38249.117989
1 104949.5 38273.129735
2 104899.5 38250.854626
3 104049.5 38284.467761
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.117017e+11 901.0
1 A F 5.658655e+10 904.0
2 R F 5.656804e+10 904.0
3 N F 1.487505e+09 920.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38249.117989
1 104949.5 38273.129735
2 104899.5 38250.854626
3 104049.5 38284.467761
Sum Min Max Avg
l_returnflag l_linestatus
A F 5.658655e+10 904.0 104949.5 38273.129735
N F 1.487505e+09 920.0 104049.5 38284.467761
O 1.116318e+11 901.0 104749.5 38249.322811
R F 5.656804e+10 904.0 104899.5 38250.854626
Sum Min Max Avg
l_returnflag l_linestatus
A F 5.658655e+10 904.0 104949.5 38273.129735
N F 1.487505e+09 920.0 104049.5 38284.467761
O 1.116318e+11 901.0 104749.5 38249.322811
R F 5.656804e+10 904.0 104899.5 38250.854626
Grouped Aggregate + Filter
name time
0 DuckDB (1T) 0.281653
1 DuckDB (2T) 0.356302
2 Pandas 2.889015
3 Pandas (manual pushdown) 1.625353
Grouped Aggregate with Join and Filter
This benchmark expands on the previous one by adding a join with the orders table and a filter on that joined table.
Note that the naive Pandas version is extremely slow: it performs a full join of the entire table, including all the columns that are never used and all the rows that will later be filtered out. For that reason we include a separate benchmark in which we manually optimize the Pandas code by pushing down the projections and filters.
DuckDB's query optimizer performs these optimizations automatically.
```python
# projection & filter on lineitem table
lineitem_projected = lineitem[
    ['l_shipdate',
     'l_orderkey',
     'l_linestatus',
     'l_returnflag',
     'l_extendedprice']
]
lineitem_filtered = lineitem_projected[
    lineitem_projected['l_shipdate'] < "1998-09-02"]
# projection and filter on orders table
orders_projected = orders[
    ['o_orderkey',
     'o_orderstatus']
]
orders_filtered = orders_projected[
    orders_projected['o_orderstatus'] == 'O']
# perform the join
merged = lineitem_filtered.merge(
    orders_filtered,
    left_on='l_orderkey',
    right_on='o_orderkey')
# perform the aggregate
result = merged.groupby(
    ['l_returnflag', 'l_linestatus']
).agg(
    Sum=('l_extendedprice', 'sum'),
    Min=('l_extendedprice', 'min'),
    Max=('l_extendedprice', 'max'),
    Avg=('l_extendedprice', 'mean')
)
result
```
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus
N            O             1.080448e+11  901.0  104749.5  38250.662806
```python
def duckdb_grouped_aggregate_filter_join(d_con):
    print(d_con.query('''
    SELECT l_returnflag,
           l_linestatus,
           sum(l_extendedprice),
           min(l_extendedprice),
           max(l_extendedprice),
           avg(l_extendedprice)
    FROM lineitem lineitem
    JOIN orders orders ON (l_orderkey=o_orderkey)
    WHERE l_shipdate <= DATE '1998-09-02'
      AND o_orderstatus='O'
    GROUP BY l_returnflag,
             l_linestatus
    ''').to_df())

def duckdb_grouped_aggregate_filter_join_1t():
    duckdb_grouped_aggregate_filter_join(duckdb)

def duckdb_grouped_aggregate_filter_join_2t():
    duckdb_grouped_aggregate_filter_join(con)
```
```python
def pandas_grouped_aggregate_filter_join():
    merged = lineitem.merge(orders, left_on='l_orderkey', right_on='o_orderkey')
    filtered_a = merged[merged['l_shipdate'] < "1998-09-02"]
    filtered_b = filtered_a[filtered_a['o_orderstatus'] == 'O']
    result = filtered_b.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
    print(result)

def pandas_grouped_aggregate_filter_join_manual_pushdown():
    lineitem_projected = lineitem[['l_shipdate', 'l_orderkey', 'l_linestatus', 'l_returnflag', 'l_extendedprice']]
    lineitem_filtered = lineitem_projected[lineitem_projected['l_shipdate'] < "1998-09-02"]
    orders_projected = orders[['o_orderkey', 'o_orderstatus']]
    orders_filtered = orders_projected[orders_projected['o_orderstatus'] == 'O']
    merged = lineitem_filtered.merge(orders_filtered, left_on='l_orderkey', right_on='o_orderkey')
    result = merged.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
    print(result)
```
```python
results = []
results.append(timeit(duckdb_grouped_aggregate_filter_join_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_filter_join_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate_filter_join, 'Pandas'))
results.append(timeit(pandas_grouped_aggregate_filter_join_manual_pushdown, 'Pandas (manual pushdown)'))
plot_results(results, 'Grouped Aggregate Join')
```
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.081147e+11 901.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38250.450307
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.081147e+11 901.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38250.450307
Sum Min Max Avg
l_returnflag l_linestatus
N O 1.080448e+11 901.0 104749.5 38250.662806
Sum Min Max Avg
l_returnflag l_linestatus
N O 1.080448e+11 901.0 104749.5 38250.662806
Grouped Aggregate Join
name time
0 DuckDB (1T) 0.218088
1 DuckDB (2T) 0.376592
2 Pandas 11.403579
3 Pandas (manual pushdown) 2.765103
Appendix A: There and back again: Transferring data from Pandas to a SQL engine and back
Because Appendix A relies on an external PostgreSQL database server, its code cannot be executed in Colab. The source code can be found here: https://gist.github.com/hannesmuehleisen/a95a39a1eda63aeb0ca13fd82d1ba49c
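The round trip itself (copy a DataFrame into a SQL engine, query it there, pull the result back) can still be sketched locally with the standard-library sqlite3 driver standing in for PostgreSQL. This is not the gist's code, just the shape of it:

```python
import sqlite3

import pandas as pd

# A tiny stand-in DataFrame -- invented values, for illustration only.
demo_df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# "There": copy the DataFrame into the SQL engine as a table.
sqlite_con = sqlite3.connect(":memory:")
demo_df.to_sql("t", sqlite_con, index=False)

# "Back again": run SQL inside the engine and materialize the result in Pandas.
roundtrip = pd.read_sql("SELECT a, b FROM t WHERE a > 1", sqlite_con)
print(roundtrip)  # the rows with a = 2 and a = 3
```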
Appendix B: PandasSQL
Note: we cannot run this on the original dataset, as Colab will run out of memory and crash. Instead, for this benchmark we add a SAMPLE clause to reduce the data set to 10% of its original size.
!pip install --quiet pandasql
```python
import pandasql as psql

pysqldf = lambda q: psql.sqldf(q, globals())

lineitem_sample = duckdb.query("SELECT * FROM 'lineitemsf1.snappy.parquet' USING SAMPLE 10%").to_df()

ungrouped_aggregate = '''
SELECT SUM(l_extendedprice), MIN(l_extendedprice), MAX(l_extendedprice), AVG(l_extendedprice) FROM lineitem_sample
'''

def duckdb_ungrouped_aggregate(d_con):
    print(d_con.query(ungrouped_aggregate).to_df())

def duckdb_ungrouped_aggregate_1t():
    duckdb_ungrouped_aggregate(duckdb)

def duckdb_ungrouped_aggregate_2t():
    duckdb_ungrouped_aggregate(con)

def pandas_ungrouped_aggregate():
    print(lineitem_sample.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def ungrouped_aggregate_pandasql():
    print(pysqldf(ungrouped_aggregate))
```
```python
ua_results = []
ua_results.append(timeit(duckdb_ungrouped_aggregate_1t, 'DuckDB (1T)'))
ua_results.append(timeit(duckdb_ungrouped_aggregate_2t, 'DuckDB (2T)'))
ua_results.append(timeit(pandas_ungrouped_aggregate, 'Pandas'))
ua_results.append(timeit(ungrouped_aggregate_pandasql, 'PandaSQL'))
plot_results(ua_results, 'Ungrouped Aggregate')
```
sum(l_extendedprice) min(l_extendedprice) max(l_extendedprice) \
0 2.318151e+10 907.0 104899.5
avg(l_extendedprice)
0 38240.198955
sum(l_extendedprice) min(l_extendedprice) max(l_extendedprice) \
0 2.318151e+10 907.0 104899.5
avg(l_extendedprice)
0 38240.198955
l_extendedprice
Sum 2.318151e+10
Min 9.070000e+02
Max 1.048995e+05
Avg 3.824020e+04
SUM(l_extendedprice) MIN(l_extendedprice) MAX(l_extendedprice) \
0 2.318151e+10 907.0 104899.5
AVG(l_extendedprice)
0 38240.198955
Ungrouped Aggregate
name time
0 DuckDB (1T) 0.039731
1 DuckDB (2T) 0.033024
2 Pandas 0.012675
3 PandaSQL 9.181672
Appendix C: Directly querying Parquet files
In the benchmarks above, we fully read the parquet files into Pandas. However, DuckDB also has the capability of directly running queries on top of Parquet files. In this appendix, we show the performance of this compared to loading the file into Python first.
You can even use the wildcard syntax to run queries on multiple Parquet files in the same folder and create a unified single-table view over them (as long as they have the same schema).
# need to install pyarrow for pandas parquet reading
!pip install pyarrow
Requirement already satisfied: pyarrow in /Users/Nok_Lam_Chan/miniconda3/envs/duckdb/lib/python3.9/site-packages (10.0.0)
Requirement already satisfied: numpy>=1.16.6 in /Users/Nok_Lam_Chan/miniconda3/envs/duckdb/lib/python3.9/site-packages (from pyarrow) (1.23.4)
```python
# set up the views
parquet_con = duckdb.connect()
parquet_con.execute("CREATE VIEW lineitem_parquet AS SELECT * FROM 'lineitemsf1.snappy.parquet'")
parquet_con.execute("CREATE VIEW orders_parquet AS SELECT * FROM 'orders.parquet'")
```
<duckdb.DuckDBPyConnection>
Ungrouped Aggregate
```python
ungrouped_aggregate = '''
SELECT SUM(l_extendedprice), MIN(l_extendedprice), MAX(l_extendedprice), AVG(l_extendedprice) FROM lineitem_parquet
'''

def duckdb_parquet_query(d_con):
    print(d_con.query(ungrouped_aggregate).to_df())

def duckdb_ungrouped_parquet_1t():
    parquet_con.execute('PRAGMA threads=1')
    duckdb_ungrouped_aggregate(parquet_con)

def duckdb_ungrouped_parquet_2t():
    parquet_con.execute('PRAGMA threads=2')
    duckdb_ungrouped_aggregate(parquet_con)

def pandas_ungrouped_aggregate():
    print(lineitem.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def pandas_ungrouped_aggregate_parquet_load():
    lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet')
    print(lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def pandas_ungrouped_aggregate_parquet_load_pushdown():
    lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet', columns=['l_extendedprice'])
    print(lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))
```
```python
ua_results = []
ua_results.append(timeit(duckdb_ungrouped_parquet_1t, 'DuckDB (1 Thread)'))
ua_results.append(timeit(duckdb_ungrouped_parquet_2t, 'DuckDB (2 Threads)'))
ua_results.append(timeit(pandas_ungrouped_aggregate, 'Pandas'))
ua_results.append(timeit(pandas_ungrouped_aggregate_parquet_load, 'Pandas + Parquet Load'))
ua_results.append(timeit(pandas_ungrouped_aggregate_parquet_load_pushdown, 'Pandas + Parquet Load (Pushdown)'))
plot_results(ua_results, 'Ungrouped Aggregate (Parquet)')
```
sum(l_extendedprice) min(l_extendedprice) max(l_extendedprice) \
0 2.295773e+11 901.0 104949.5
avg(l_extendedprice)
0 38255.138485
sum(l_extendedprice) min(l_extendedprice) max(l_extendedprice) \
0 2.295773e+11 901.0 104949.5
avg(l_extendedprice)
0 38255.138485
l_extendedprice
Sum 2.295773e+11
Min 9.010000e+02
Max 1.049495e+05
Avg 3.825514e+04
l_extendedprice
Sum 2.295773e+11
Min 9.010000e+02
Max 1.049495e+05
Avg 3.825514e+04
l_extendedprice
Sum 2.295773e+11
Min 9.010000e+02
Max 1.049495e+05
Avg 3.825514e+04
Ungrouped Aggregate (Parquet)
name time
0 DuckDB (1 Thread) 0.173902
1 DuckDB (2 Threads) 0.086305
2 Pandas 0.050655
3 Pandas + Parquet Load 6.311870
4 Pandas + Parquet Load (Pushdown) 0.151299
Grouped Aggregate with Join and Filter
```python
def duckdb_grouped_aggregate_filter_join_pq(d_con):
    print(d_con.query('''
    SELECT l_returnflag,
           l_linestatus,
           sum(l_extendedprice),
           min(l_extendedprice),
           max(l_extendedprice),
           avg(l_extendedprice)
    FROM lineitem_parquet lineitem
    JOIN orders_parquet orders ON (l_orderkey=o_orderkey)
    WHERE l_shipdate <= DATE '1998-09-02'
      AND o_orderstatus='O'
    GROUP BY l_returnflag,
             l_linestatus
    ''').to_df())

def duckdb_grouped_aggregate_filter_join_pq_1t():
    parquet_con.execute('PRAGMA threads=1')
    duckdb_grouped_aggregate_filter_join_pq(parquet_con)

def duckdb_grouped_aggregate_filter_join_pq_2t():
    parquet_con.execute('PRAGMA threads=2')
    duckdb_grouped_aggregate_filter_join_pq(parquet_con)
```
```python
def pandas_grouped_aggregate_filter_join_pq():
    lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet')
    orders_pandas_parquet = pd.read_parquet('orders.parquet')
    # join against the freshly loaded orders frame (not the earlier global)
    merged = lineitem_pandas_parquet.merge(orders_pandas_parquet, left_on='l_orderkey', right_on='o_orderkey')
    filtered_a = merged[merged['l_shipdate'] < "1998-09-02"]
    filtered_b = filtered_a[filtered_a['o_orderstatus'] == 'O']
    result = filtered_b.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
    print(result)

def pandas_grouped_aggregate_filter_join_manual_pushdown_pq():
    lineitem_projected = pd.read_parquet('lineitemsf1.snappy.parquet', columns=['l_shipdate', 'l_orderkey', 'l_linestatus', 'l_returnflag', 'l_extendedprice'])
    orders_projected = pd.read_parquet('orders.parquet', columns=['o_orderkey', 'o_orderstatus'])
    lineitem_filtered = lineitem_projected[lineitem_projected['l_shipdate'] < "1998-09-02"]
    orders_filtered = orders_projected[orders_projected['o_orderstatus'] == 'O']
    merged = lineitem_filtered.merge(orders_filtered, left_on='l_orderkey', right_on='o_orderkey')
    result = merged.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
    print(result)
```
```python
results = []
results.append(timeit(duckdb_grouped_aggregate_filter_join_pq_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_filter_join_pq_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate_filter_join_pq, 'Pandas'))
results.append(timeit(pandas_grouped_aggregate_filter_join_manual_pushdown_pq, 'Pandas (manual pushdown)'))
plot_results(results, 'Grouped Aggregate Join (Parquet)')
```
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.081147e+11 901.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38250.450307
l_returnflag l_linestatus sum(l_extendedprice) min(l_extendedprice) \
0 N O 1.081147e+11 901.0
max(l_extendedprice) avg(l_extendedprice)
0 104749.5 38250.450307
Sum Min Max Avg
l_returnflag l_linestatus
N O 1.080448e+11 901.0 104749.5 38250.662806
Sum Min Max Avg
l_returnflag l_linestatus
N O 1.080448e+11 901.0 104749.5 38250.662806
Grouped Aggregate Join (Parquet)
name time
0 DuckDB (1T) 0.828549
1 DuckDB (2T) 0.508537
2 Pandas 13.954761
3 Pandas (manual pushdown) 2.337109
## Part II - Transform Pandas to DuckDB Query
## Part III - Create the DuckDB DataSet
## Part IV - Create the DuckDB Transformer