Kedro DuckDB

Kedro, Meet the Duck
python
Published

November 15, 2022

Exploring DuckDB and how we can use it with Kedro

References

This post extends the notebook from: https://colab.research.google.com/drive/1eg_TJpPQr2tyYKWjISJlX8IEAi8Qln3U?usp=sharing

Practical SQL for Data Analysis

What you can do together with Pandas

!pip install --quiet duckdb
import pandas as pd
import numpy as np

import sqlite3
import duckdb
import time

Preparation

Download the data and set up the Pandas DataFrames. We read the data into Pandas DataFrames using DuckDB’s built-in Parquet reader.

!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/orders.parquet
%%time
lineitem = duckdb.query("SELECT * FROM 'lineitemsf1.snappy.parquet'").to_df()
orders = duckdb.query("SELECT * FROM 'orders.parquet'").to_df()
CPU times: user 7.62 s, sys: 5.43 s, total: 13 s
Wall time: 14.1 s
%%time
_ = pd.read_parquet("lineitemsf1.snappy.parquet")
_ = pd.read_parquet("orders.parquet")
CPU times: user 6.29 s, sys: 1.5 s, total: 7.78 s
Wall time: 5.81 s
con = duckdb.connect()
con.execute('PRAGMA threads=2')

def timeit(fun, name):
    # time a single run of fun and return a [name, elapsed seconds] pair
    start_time = time.monotonic()
    fun()
    return [name, time.monotonic() - start_time]

def plot_results(results, title):
  # despite the name, this simply prints the timings as a small table
  df = pd.DataFrame.from_dict({
      'name': [x[0] for x in results],
      'time': [x[1] for x in results]
  })
  print(title)
  print(df)

Ungrouped Aggregates

This performs a simple set of ungrouped aggregates (sum, min, max, avg) over a column without any filters or other complex operations.

ungrouped_aggregate = '''
    SELECT SUM(l_extendedprice), MIN(l_extendedprice), MAX(l_extendedprice), AVG(l_extendedprice) FROM lineitem
'''

def duckdb_ungrouped_aggregate(d_con):
    print(d_con.query(ungrouped_aggregate).to_df())

def duckdb_ungrouped_aggregate_1t():
    duckdb_ungrouped_aggregate(duckdb)

def duckdb_ungrouped_aggregate_2t():
    duckdb_ungrouped_aggregate(con)

def pandas_ungrouped_aggregate():
  # ungrouped aggregate over the whole column, matching the SQL above
  print(lineitem.agg(
    Sum=('l_extendedprice', 'sum'),
    Min=('l_extendedprice', 'min'),
    Max=('l_extendedprice', 'max'),
    Avg=('l_extendedprice', 'mean')
  ))

ua_results = []
ua_results.append(timeit(duckdb_ungrouped_aggregate_1t, 'DuckDB (1T)'))
ua_results.append(timeit(duckdb_ungrouped_aggregate_2t, 'DuckDB (2T)'))
ua_results.append(timeit(pandas_ungrouped_aggregate, 'Pandas'))
plot_results(ua_results, 'Ungrouped Aggregate')
   sum(l_extendedprice)  min(l_extendedprice)  max(l_extendedprice)  \
0          2.295773e+11                 901.0              104949.5   

   avg(l_extendedprice)  
0          38255.138485  
   sum(l_extendedprice)  min(l_extendedprice)  max(l_extendedprice)  \
0          2.295773e+11                 901.0              104949.5   

   avg(l_extendedprice)  
0          38255.138485  
     l_extendedprice
Sum     2.295773e+11
Min     9.010000e+02
Max     1.049495e+05
Avg     3.825514e+04
Ungrouped Aggregate
          name      time
0  DuckDB (1T)  0.052544
1  DuckDB (2T)  0.066239
2       Pandas  0.801278

Grouped Aggregates

This performs the same set of aggregates, but this time grouped by two other columns (l_returnflag and l_linestatus).

grouped_aggregate = '''
SELECT l_returnflag,
       l_linestatus,
       SUM(l_extendedprice),
       MIN(l_extendedprice),
       MAX(l_extendedprice),
       AVG(l_extendedprice)
FROM lineitem
GROUP BY l_returnflag,
         l_linestatus
'''

def duckdb_grouped_aggregate(d_con):
    print(d_con.query(grouped_aggregate).to_df())

def duckdb_grouped_aggregate_1t():
    duckdb_grouped_aggregate(duckdb)

def duckdb_grouped_aggregate_2t():
    duckdb_grouped_aggregate(con)

def pandas_grouped_aggregate():
    print(lineitem.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

results = []
results.append(timeit(duckdb_grouped_aggregate_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate, 'Pandas'))
plot_results(results, 'Grouped Aggregate')
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.149352e+11                 901.0   
1            R            F          5.656804e+10                 904.0   
2            A            F          5.658655e+10                 904.0   
3            N            F          1.487505e+09                 920.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38248.015609  
1              104899.5          38250.854626  
2              104949.5          38273.129735  
3              104049.5          38284.467761  
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.149352e+11                 901.0   
1            R            F          5.656804e+10                 904.0   
2            A            F          5.658655e+10                 904.0   
3            N            F          1.487505e+09                 920.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38248.015609  
1              104899.5          38250.854626  
2              104949.5          38273.129735  
3              104049.5          38284.467761  
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
A            F             5.658655e+10  904.0  104949.5  38273.129735
N            F             1.487505e+09  920.0  104049.5  38284.467761
             O             1.149352e+11  901.0  104749.5  38248.015609
R            F             5.656804e+10  904.0  104899.5  38250.854626
Grouped Aggregate
          name      time
0  DuckDB (1T)  0.115463
1  DuckDB (2T)  0.222520
2       Pandas  0.708696

Grouped Aggregate with a Filter

This benchmark performs a grouped aggregate with a filter over the shipdate column.

As Pandas does not perform any projection pushdown, we include a version where we manually perform the projection pushdown by selecting only the columns we actually need before running the filter and aggregate.

This optimization is performed automatically in DuckDB by the query optimizer.

def duckdb_grouped_aggregate_filter(d_con):
    print(d_con.query('''
SELECT l_returnflag,
       l_linestatus,
       SUM(l_extendedprice),
       MIN(l_extendedprice),
       MAX(l_extendedprice),
       AVG(l_extendedprice)
FROM lineitem
WHERE
    l_shipdate <= DATE '1998-09-02'
GROUP BY l_returnflag,
         l_linestatus
''').to_df())

def duckdb_grouped_aggregate_filter_1t():
    duckdb_grouped_aggregate_filter(duckdb)

def duckdb_grouped_aggregate_filter_2t():
    duckdb_grouped_aggregate_filter(con)

def pandas_grouped_aggregate_filter():
  # <= matches the SQL predicate above
  filtered_df = lineitem[lineitem['l_shipdate'] <= "1998-09-02"]
  print(filtered_df.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def pandas_grouped_aggregate_filter_projection_pushdown():
  pushed_down_df = lineitem[['l_shipdate', 'l_returnflag', 'l_linestatus', 'l_extendedprice']]
  filtered_df = pushed_down_df[pushed_down_df['l_shipdate'] <= "1998-09-02"]
  print(filtered_df.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

results = []
results.append(timeit(duckdb_grouped_aggregate_filter_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_filter_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate_filter, 'Pandas'))
results.append(timeit(pandas_grouped_aggregate_filter_projection_pushdown, 'Pandas (manual pushdown)'))
plot_results(results, 'Grouped Aggregate + Filter')
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.117017e+11                 901.0   
1            A            F          5.658655e+10                 904.0   
2            R            F          5.656804e+10                 904.0   
3            N            F          1.487505e+09                 920.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38249.117989  
1              104949.5          38273.129735  
2              104899.5          38250.854626  
3              104049.5          38284.467761  
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.117017e+11                 901.0   
1            A            F          5.658655e+10                 904.0   
2            R            F          5.656804e+10                 904.0   
3            N            F          1.487505e+09                 920.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38249.117989  
1              104949.5          38273.129735  
2              104899.5          38250.854626  
3              104049.5          38284.467761  
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
A            F             5.658655e+10  904.0  104949.5  38273.129735
N            F             1.487505e+09  920.0  104049.5  38284.467761
             O             1.117017e+11  901.0  104749.5  38249.117989
R            F             5.656804e+10  904.0  104899.5  38250.854626
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
A            F             5.658655e+10  904.0  104949.5  38273.129735
N            F             1.487505e+09  920.0  104049.5  38284.467761
             O             1.117017e+11  901.0  104749.5  38249.117989
R            F             5.656804e+10  904.0  104899.5  38250.854626
Grouped Aggregate + Filter
                       name      time
0               DuckDB (1T)  0.281653
1               DuckDB (2T)  0.356302
2                    Pandas  2.889015
3  Pandas (manual pushdown)  1.625353

Grouped Aggregate with Join and Filter

In this benchmark we expand on the previous benchmark by including a join and a filter on the joined-on table.

Note that the naive version in Pandas is extremely slow, as it performs a full join of the entire tables, including all the columns that are not used and all the rows that will be filtered out. For that reason, we include a separate benchmark in which we manually optimize the Pandas code by pushing down the projections and the filters.

These optimizations are performed automatically in DuckDB by the query optimizer.


# projection & filter on lineitem table
lineitem_projected = lineitem[
  ['l_shipdate',
   'l_orderkey',
   'l_linestatus',
   'l_returnflag',
   'l_extendedprice']
]
lineitem_filtered = lineitem_projected[
  lineitem_projected['l_shipdate'] <= "1998-09-02"]  # <= matches the SQL predicate used below
# projection and filter on orders table
orders_projected = orders[
  ['o_orderkey',
   'o_orderstatus']
]
orders_filtered = orders_projected[
  orders_projected['o_orderstatus'] == 'O']
# perform the join
merged = lineitem_filtered.merge(
  orders_filtered,
  left_on='l_orderkey',
  right_on='o_orderkey')
# perform the aggregate
result = merged.groupby(
  ['l_returnflag', 'l_linestatus']
).agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)
result
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
N            O             1.081147e+11  901.0  104749.5  38250.450307
def duckdb_grouped_aggregate_filter_join(d_con):
    print(d_con.query('''
SELECT l_returnflag,
       l_linestatus,
       sum(l_extendedprice),
       min(l_extendedprice),
       max(l_extendedprice),
       avg(l_extendedprice)
FROM lineitem lineitem
JOIN orders orders ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderstatus='O'
GROUP BY l_returnflag,
         l_linestatus
''').to_df())

def duckdb_grouped_aggregate_filter_join_1t():
    duckdb_grouped_aggregate_filter_join(duckdb)

def duckdb_grouped_aggregate_filter_join_2t():
    duckdb_grouped_aggregate_filter_join(con)

def pandas_grouped_aggregate_filter_join():
    merged = lineitem.merge(orders, left_on='l_orderkey', right_on='o_orderkey')
    # <= matches the SQL predicate
    filtered_a = merged[merged['l_shipdate'] <= "1998-09-02"]
    filtered_b = filtered_a[filtered_a['o_orderstatus'] == 'O']
    result = filtered_b.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
    print(result)

def pandas_grouped_aggregate_filter_join_manual_pushdown():
    lineitem_projected = lineitem[['l_shipdate', 'l_orderkey', 'l_linestatus', 'l_returnflag', 'l_extendedprice']]
    lineitem_filtered = lineitem_projected[lineitem_projected['l_shipdate'] <= "1998-09-02"]
    orders_projected = orders[['o_orderkey', 'o_orderstatus']]
    orders_filtered = orders_projected[orders_projected['o_orderstatus'] == 'O']
    merged = lineitem_filtered.merge(orders_filtered, left_on='l_orderkey', right_on='o_orderkey')
    result = merged.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
    print(result)

results = []
results.append(timeit(duckdb_grouped_aggregate_filter_join_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_filter_join_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate_filter_join, 'Pandas'))
results.append(timeit(pandas_grouped_aggregate_filter_join_manual_pushdown, 'Pandas (manual pushdown)'))
plot_results(results, 'Grouped Aggregate Join')
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.081147e+11                 901.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38250.450307  
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.081147e+11                 901.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38250.450307  
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
N            O             1.081147e+11  901.0  104749.5  38250.450307
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
N            O             1.081147e+11  901.0  104749.5  38250.450307
Grouped Aggregate Join
                       name       time
0               DuckDB (1T)   0.218088
1               DuckDB (2T)   0.376592
2                    Pandas  11.403579
3  Pandas (manual pushdown)   2.765103

Appendix A: There and back again: Transferring data from Pandas to a SQL engine and back

As Appendix A relies on an external PostgreSQL database server, the code cannot be executed in Colab. The source code can be found here: https://gist.github.com/hannesmuehleisen/a95a39a1eda63aeb0ca13fd82d1ba49c

Appendix B: PandasSQL

Note: we cannot run this on the original dataset, as Colab runs out of memory and crashes. Instead, for this benchmark we add a SAMPLE clause to reduce the dataset to 10% of its original size. (pandasql copies the DataFrame into an in-memory SQLite database before running each query, which is where most of its overhead comes from.)

!pip install --quiet pandasql
import pandasql as psql
pysqldf = lambda q: psql.sqldf(q, globals())
lineitem_sample = duckdb.query("SELECT * FROM 'lineitemsf1.snappy.parquet' USING SAMPLE 10%").to_df()
ungrouped_aggregate = '''
    SELECT SUM(l_extendedprice), MIN(l_extendedprice), MAX(l_extendedprice), AVG(l_extendedprice) FROM lineitem_sample
'''

def duckdb_ungrouped_aggregate(d_con):
    print(d_con.query(ungrouped_aggregate).to_df())

def duckdb_ungrouped_aggregate_1t():
    duckdb_ungrouped_aggregate(duckdb)

def duckdb_ungrouped_aggregate_2t():
    duckdb_ungrouped_aggregate(con)

def pandas_ungrouped_aggregate():
    print(lineitem_sample.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def ungrouped_aggregate_pandasql():
  print(pysqldf(ungrouped_aggregate))

ua_results = []
ua_results.append(timeit(duckdb_ungrouped_aggregate_1t, 'DuckDB (1T)'))
ua_results.append(timeit(duckdb_ungrouped_aggregate_2t, 'DuckDB (2T)'))
ua_results.append(timeit(pandas_ungrouped_aggregate, 'Pandas'))
ua_results.append(timeit(ungrouped_aggregate_pandasql, 'PandaSQL'))
plot_results(ua_results, 'Ungrouped Aggregate')
   sum(l_extendedprice)  min(l_extendedprice)  max(l_extendedprice)  \
0          2.318151e+10                 907.0              104899.5   

   avg(l_extendedprice)  
0          38240.198955  
   sum(l_extendedprice)  min(l_extendedprice)  max(l_extendedprice)  \
0          2.318151e+10                 907.0              104899.5   

   avg(l_extendedprice)  
0          38240.198955  
     l_extendedprice
Sum     2.318151e+10
Min     9.070000e+02
Max     1.048995e+05
Avg     3.824020e+04
   SUM(l_extendedprice)  MIN(l_extendedprice)  MAX(l_extendedprice)  \
0          2.318151e+10                 907.0              104899.5   

   AVG(l_extendedprice)  
0          38240.198955  
Ungrouped Aggregate
          name      time
0  DuckDB (1T)  0.039731
1  DuckDB (2T)  0.033024
2       Pandas  0.012675
3     PandaSQL  9.181672

Appendix C: Directly querying Parquet files

In the benchmarks above, we fully read the Parquet files into Pandas. However, DuckDB can also run queries directly on top of Parquet files. In this appendix, we compare the performance of this approach with loading the file into Python first.

You can even use the wildcard syntax to run queries on multiple Parquet files in the same folder and create a unified single-table view over them (as long as they have the same schema).

# need to install pyarrow for pandas parquet reading
!pip install pyarrow
# set up the view
parquet_con = duckdb.connect()
parquet_con.execute("CREATE VIEW lineitem_parquet AS SELECT * FROM 'lineitemsf1.snappy.parquet'")
parquet_con.execute("CREATE VIEW orders_parquet AS SELECT * FROM 'orders.parquet'")
<duckdb.DuckDBPyConnection>

Ungrouped Aggregate

ungrouped_aggregate = '''
    SELECT SUM(l_extendedprice), MIN(l_extendedprice), MAX(l_extendedprice), AVG(l_extendedprice) FROM lineitem_parquet
'''

def duckdb_parquet_query(d_con):
    print(d_con.query(ungrouped_aggregate).to_df())

def duckdb_ungrouped_parquet_1t():
  parquet_con.execute('PRAGMA threads=1')
  duckdb_parquet_query(parquet_con)

def duckdb_ungrouped_parquet_2t():
  parquet_con.execute('PRAGMA threads=2')
  duckdb_parquet_query(parquet_con)

def pandas_ungrouped_aggregate():
    print(lineitem.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def pandas_ungrouped_aggregate_parquet_load():
  lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet')
  print(lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

def pandas_ungrouped_aggregate_parquet_load_pushdown():
  lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet', columns=['l_extendedprice'])
  print(lineitem_pandas_parquet.agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean')))

ua_results = []
ua_results.append(timeit(duckdb_ungrouped_parquet_1t, 'DuckDB (1 Thread)'))
ua_results.append(timeit(duckdb_ungrouped_parquet_2t, 'DuckDB (2 Threads)'))
ua_results.append(timeit(pandas_ungrouped_aggregate, 'Pandas'))
ua_results.append(timeit(pandas_ungrouped_aggregate_parquet_load, 'Pandas + Parquet Load'))
ua_results.append(timeit(pandas_ungrouped_aggregate_parquet_load_pushdown, 'Pandas + Parquet Load (Pushdown)'))
plot_results(ua_results, 'Ungrouped Aggregate (Parquet)')
   sum(l_extendedprice)  min(l_extendedprice)  max(l_extendedprice)  \
0          2.295773e+11                 901.0              104949.5   

   avg(l_extendedprice)  
0          38255.138485  
   sum(l_extendedprice)  min(l_extendedprice)  max(l_extendedprice)  \
0          2.295773e+11                 901.0              104949.5   

   avg(l_extendedprice)  
0          38255.138485  
     l_extendedprice
Sum     2.295773e+11
Min     9.010000e+02
Max     1.049495e+05
Avg     3.825514e+04
     l_extendedprice
Sum     2.295773e+11
Min     9.010000e+02
Max     1.049495e+05
Avg     3.825514e+04
     l_extendedprice
Sum     2.295773e+11
Min     9.010000e+02
Max     1.049495e+05
Avg     3.825514e+04
Ungrouped Aggregate (Parquet)
                               name      time
0                 DuckDB (1 Thread)  0.173902
1                DuckDB (2 Threads)  0.086305
2                            Pandas  0.050655
3             Pandas + Parquet Load  6.311870
4  Pandas + Parquet Load (Pushdown)  0.151299

Grouped Aggregate with Join and Filter

def duckdb_grouped_aggregate_filter_join_pq(d_con):
    print(d_con.query('''
SELECT l_returnflag,
       l_linestatus,
       sum(l_extendedprice),
       min(l_extendedprice),
       max(l_extendedprice),
       avg(l_extendedprice)
FROM lineitem_parquet lineitem
JOIN orders_parquet orders ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderstatus='O'
GROUP BY l_returnflag,
         l_linestatus
''').to_df())

def duckdb_grouped_aggregate_filter_join_pq_1t():
  parquet_con.execute('PRAGMA threads=1')
  duckdb_grouped_aggregate_filter_join_pq(parquet_con)

def duckdb_grouped_aggregate_filter_join_pq_2t():
  parquet_con.execute('PRAGMA threads=2')
  duckdb_grouped_aggregate_filter_join_pq(parquet_con)

def pandas_grouped_aggregate_filter_join_pq():
  lineitem_pandas_parquet = pd.read_parquet('lineitemsf1.snappy.parquet')
  orders_pandas_parquet = pd.read_parquet('orders.parquet')
  # merge with the freshly loaded orders frame; <= matches the SQL predicate
  merged = lineitem_pandas_parquet.merge(orders_pandas_parquet, left_on='l_orderkey', right_on='o_orderkey')
  filtered_a = merged[merged['l_shipdate'] <= "1998-09-02"]
  filtered_b = filtered_a[filtered_a['o_orderstatus'] == 'O']
  result = filtered_b.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
  print(result)

def pandas_grouped_aggregate_filter_join_manual_pushdown_pq():
  lineitem_projected = pd.read_parquet('lineitemsf1.snappy.parquet', columns=['l_shipdate', 'l_orderkey', 'l_linestatus', 'l_returnflag', 'l_extendedprice'])
  orders_projected = pd.read_parquet('orders.parquet', columns=['o_orderkey', 'o_orderstatus'])
  lineitem_filtered = lineitem_projected[lineitem_projected['l_shipdate'] <= "1998-09-02"]
  orders_filtered = orders_projected[orders_projected['o_orderstatus'] == 'O']
  merged = lineitem_filtered.merge(orders_filtered, left_on='l_orderkey', right_on='o_orderkey')
  result = merged.groupby(['l_returnflag', 'l_linestatus']).agg(Sum=('l_extendedprice', 'sum'), Min=('l_extendedprice', 'min'), Max=('l_extendedprice', 'max'), Avg=('l_extendedprice', 'mean'))
  print(result)

results = []
results.append(timeit(duckdb_grouped_aggregate_filter_join_pq_1t, 'DuckDB (1T)'))
results.append(timeit(duckdb_grouped_aggregate_filter_join_pq_2t, 'DuckDB (2T)'))
results.append(timeit(pandas_grouped_aggregate_filter_join_pq, 'Pandas'))
results.append(timeit(pandas_grouped_aggregate_filter_join_manual_pushdown_pq, 'Pandas (manual pushdown)'))
plot_results(results, 'Grouped Aggregate Join (Parquet)')
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.081147e+11                 901.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38250.450307  
  l_returnflag l_linestatus  sum(l_extendedprice)  min(l_extendedprice)  \
0            N            O          1.081147e+11                 901.0   

   max(l_extendedprice)  avg(l_extendedprice)  
0              104749.5          38250.450307  
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
N            O             1.081147e+11  901.0  104749.5  38250.450307
                                    Sum    Min       Max           Avg
l_returnflag l_linestatus                                             
N            O             1.081147e+11  901.0  104749.5  38250.450307
Grouped Aggregate Join (Parquet)
                       name       time
0               DuckDB (1T)   0.828549
1               DuckDB (2T)   0.508537
2                    Pandas  13.954761
3  Pandas (manual pushdown)   2.337109
Part II: Transform Pandas to DuckDB Query

Part III: Create the DuckDB DataSet

Part IV: Create the DuckDB Transformer