Comments (6)
(Disclaimer: I'm not a dev)
This looks less like a bug per se and more like a missing filter pushdown.
For normal joins, filtering the joined table is essentially the same as joining filtered tables. For positional joins that is not the case, so when you filter a positional join, DuckDB has to read the entire files from disk, join them, and only then filter. When you filter a single Parquet file, DuckDB instead reads only the relevant parts of the file, based on the metadata/indices stored alongside the data, which is much faster than reading the whole file.
It should be possible to add the logic to push the filter down to the relevant Parquet scan and then read the other Parquet file at the filtered row indices, but that is not trivial, and therefore this is likely a feature request rather than an issue.
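To see why the filter cannot simply be pushed below the join, here is a minimal sketch with toy tables (names made up for the example) against a stock duckdb connection:

```python
import duckdb

con = duckdb.connect()
con.execute("create table l as select * from (values (1), (2), (3)) t(x)")
con.execute("create table r as select * from (values (10), (20), (30)) t(y)")

# Filtering after the positional join keeps the original row alignment:
print(con.execute(
    "select * from (select * from l positional join r) where x > 1"
).fetchall())  # [(2, 20), (3, 30)]

# Filtering before the join changes which rows line up (DuckDB NULL-pads
# the shorter side), so naive pushdown would give a different answer:
print(con.execute(
    "select * from (select * from l where x > 1) positional join r"
).fetchall())  # [(2, 10), (3, 20), (None, 30)]
```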
Yes, that is what I was guessing after some code skimming. It did not seem trivial to me to figure out how to add it, but conceptually at least it must be a "simple" feature omission. I suspect positional joins are not yet used in anger, but this would be a key piece of getting there. I think they are a very important feature direction for DuckDB.
from duckdb.
In order for a positional join to work with filter pushdowns, we essentially need two callbacks that are efficiently implemented for each scan:
- Filter out rows that do not fulfill this criterion, but tell me how many rows were skipped
- Skip N rows, but do not scan them
We could then use the first callback to apply the filter in the first result set, and the second to find the corresponding rows that belong to the subsequent (not yet filtered) rows. Adding these callbacks and efficiently implementing them for all the storage back-ends is not trivial, and they cannot be very efficiently implemented for all scanners (e.g. we cannot efficiently skip to the middle of a row group in Parquet files).
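As a rough illustration, here is a minimal in-memory Python sketch of how those two callbacks could drive a filtered positional join. The scanner class and callback names are hypothetical, not DuckDB's actual scanner interface:

```python
class ListScanner:
    """Toy in-memory scanner exposing the two hypothetical callbacks."""
    def __init__(self, rows):
        self.rows, self.pos = rows, 0

    def next_filtered(self, predicate):
        # Callback 1: yield matching rows along with how many rows were
        # skipped (filtered out) since the previous match.
        skipped = 0
        while self.pos < len(self.rows):
            row = self.rows[self.pos]
            self.pos += 1
            if predicate(row):
                yield row, skipped
                skipped = 0
            else:
                skipped += 1

    def skip(self, n):
        # Callback 2: advance N rows without materializing them.
        self.pos += n

    def read_one(self):
        row = self.rows[self.pos]
        self.pos += 1
        return row


def filtered_positional_join(left, right, predicate):
    for row, skipped in left.next_filtered(predicate):
        right.skip(skipped)           # keep the right side in lockstep
        yield row + right.read_one()  # positions still line up


left = ListScanner([(1,), (2,), (3,), (4,)])
right = ListScanner([(10,), (20,), (30,), (40,)])
print(list(filtered_positional_join(left, right, lambda r: r[0] % 2 == 0)))
# [(2, 20), (4, 40)]
```

The hard part is that `skip` is only cheap when the storage format supports seeking; as noted above, a Parquet reader cannot efficiently jump into the middle of a row group.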
In cases where we are scanning e.g. an Arrow Record Batch Reader, these callbacks do not exist, and hence this will not be possible to support unless these or similar callbacks are also implemented for these back-ends.
All in all, this is a rather large effort, and I would not count on it being done in the near term.
When working with Parquet files, perhaps you could try using file_row_number and joining on that instead.
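For example, a sketch with one Parquet file per side (the filter column is borrowed from the repro in this thread; with globs you would additionally need to match on the file, e.g. via filename=true, since file_row_number restarts per file):

```python
import duckdb

con = duckdb.connect()
# file_row_number=true adds a per-file row index column to the scan; a
# regular join on it emulates the positional join while still letting the
# filter be pushed down into the scan of a.parquet.
con.execute("""
    select count(*) as ct
    from read_parquet('a.parquet', file_row_number=true) a
    join read_parquet('b.parquet', file_row_number=true) b
      using (file_row_number)
    where a.b = 999
""").df()
```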
from duckdb.
I've got some kind of repro, which was interesting because the discrepancy appears to be huge in the case of ordered integers (zonemaps?). So there must be some optimization/pushdown that is missing in that case.
```python
import logging
import os
import shutil
import string
import time

import duckdb
import numpy as np
import pandas as pd
import tqdm
from pyarrow.dataset import dataset

_mydir = os.path.dirname(__file__)
_data_dir = os.path.join(_mydir, 'data_example')
_data_dir_a = os.path.join(_data_dir, 'a')
_data_dir_b = os.path.join(_data_dir, 'b')
_duckdb_filename = os.path.join(_mydir, 'duckdb.db')


def create_data_on_disk(n_partitions=100, partition_size=1_000_000, seed=0, string_col_arity=1000):
    np.random.seed(seed)

    def write(df, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        logging.warning(f'writing {path}')
        df.to_parquet(path, index=False)

    if os.path.exists(_data_dir):
        shutil.rmtree(_data_dir)
    string_col_values = ['AAA'] + [
        ''.join(x)
        for x in [
            np.random.choice(list(string.ascii_letters + string.digits), size=5)
            for _ in range(string_col_arity - 1)
        ]
    ]
    eps = 1e-6
    p = np.array([eps] + [(1 - eps) / (string_col_arity - 1)] * (string_col_arity - 1))
    assert np.abs(p.sum() - 1) < 1e-6
    min_int = 0
    for partition in tqdm.tqdm(range(n_partitions)):
        filename = f'data_{partition:03d}.parquet'
        df = pd.DataFrame(np.random.randn(partition_size, 4), columns=['a', 'b', 'c', 'd'])
        df['a'] = np.random.choice(string_col_values, size=partition_size, p=p)
        max_int = min_int + np.random.randint(10, 20)
        df['b'] = np.random.randint(min_int, max_int, size=partition_size)
        df['d'] = np.random.randint(0, 100_000, size=partition_size)  # NOTE: this one has no ordering
        min_int = max_int
        path = os.path.join(_data_dir_a, filename)
        write(df, path)
        df_ = pd.DataFrame(np.random.randn(partition_size, 3), columns=['e', 'f', 'g'])
        df_['e'] = df['d']  # NOTE: just a simple way of sanity checking
        path = os.path.join(_data_dir_b, filename)
        write(df_, path)


def get_duckdb_con():
    con = duckdb.connect(_duckdb_filename)
    dsa = dataset(_data_dir_a, format='parquet')
    con.register('A1', dsa)
    dsb = dataset(_data_dir_b, format='parquet')
    con.register('B1', dsb)
    con.execute(
        f"""
        create temp view A0 as (select * from parquet_scan('{_data_dir_a}/*'));
        create temp view B0 as (select * from parquet_scan('{_data_dir_b}/*'));
        create temp view E0 as (select * from A0 positional join B0);
        create temp view E1 as (select * from A1 positional join B1);
        """
    )
    return con


def test_against(table, query_flavour):
    """table should be A0, A1, E0, E1"""
    con = get_duckdb_con()
    query = {
        'A': f"select count(*) as ct from {table} where c > 0 and c <= 0.000001",
        'B': f"select count(*) as ct from {table} where a = 'AAA'",
        'C': f"select count(*) as ct from {table} where b = 999",
        'D': f"select count(*) as ct from {table} where d = 999",
    }[query_flavour]
    T = time.time()
    res = con.execute(query).df()
    T = time.time() - T
    return dict(res=res, time=T, query=query)


def test_all():
    """Example results:

      query_flavour                     name      time                                                         query  factor
    0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
    1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
    2             B              raw_dataset  0.484304                select count(*) as ct from A1 where a = 'AAA'       1
    3             B  positional join_dataset  1.915267                select count(*) as ct from E1 where a = 'AAA'       4
    4             C              raw_dataset  0.012928                  select count(*) as ct from A1 where b = 999       1
    5             C  positional join_dataset  1.171541                  select count(*) as ct from E1 where b = 999      91
    6             D              raw_dataset  0.273574                  select count(*) as ct from A1 where d = 999       1
    7             D  positional join_dataset  1.271827                  select count(*) as ct from E1 where d = 999       5
    """
    out = list()
    for query_flavour in tqdm.tqdm(['A', 'B', 'C', 'D']):
        df = None
        for k, name in [
            # these are much slower
            # ('A0', 'raw_parquet_scan'),
            # ('E0', 'positional join_parquet_scan'),
            ('A1', 'raw_dataset'),
            ('E1', 'positional join_dataset'),
        ]:
            res = test_against(k, query_flavour=query_flavour)
            if df is None:
                df = res['res']
            else:
                df_ = res['res']
                assert df.shape[0] == df_.shape[0]
                pd.testing.assert_frame_equal(df, df_)
                # pd.testing.assert_frame_equal(df[['a', 'b', 'c']], df_[['a', 'b', 'c']])
            out.append(dict(query_flavour=query_flavour, name=name, time=res['time'], query=res['query']))
    df = pd.DataFrame(out)
    df['factor'] = (df['time'] / df.groupby('query_flavour')['time'].transform('min')).round(0).astype(int)
    return df
```
Results look like this (also pasted in the code above):
```
In [4]: create_data_on_disk()  # run this once to set up the data
In [5]: test_all()
Out[5]:
  query_flavour                     name      time                                                         query  factor
0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
2             B              raw_dataset  0.484304                select count(*) as ct from A1 where a = 'AAA'       1
3             B  positional join_dataset  1.915267                select count(*) as ct from E1 where a = 'AAA'       4
4             C              raw_dataset  0.012928                  select count(*) as ct from A1 where b = 999       1
5             C  positional join_dataset  1.171541                  select count(*) as ct from E1 where b = 999      91
6             D              raw_dataset  0.273574                  select count(*) as ct from A1 where d = 999       1
7             D  positional join_dataset  1.271827                  select count(*) as ct from E1 where d = 999       5
```
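The factor-91 row for query C fits the zonemap guess above: column b is globally ordered across the files, so the plain scan can prune nearly every row group using the Parquet min/max statistics, while the positional join has to materialize everything before filtering. One way to inspect those statistics, assuming the repro's data_example layout:

```python
import duckdb

con = duckdb.connect()
# Per-row-group min/max statistics for column 'b'; non-overlapping ranges
# are what let the scan of A1 alone skip almost every row group.
print(con.execute("""
    select file_name, row_group_id, stats_min, stats_max
    from parquet_metadata('data_example/a/*.parquet')
    where path_in_schema = 'b'
    limit 5
""").df())
```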
If someone could confirm this and report whether it is sane, maybe we can spin it over to feature requests? It would be good to at least get some advice on how hard this is to implement, if it is a sane repro.
from duckdb.
As mentioned, this is not a bug but more of a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc.). Pushing a filter into a scan while preserving row numbers is not generically possible and would require special per-scanner support, so supporting this efficiently would require us to extend the functionality of individual scanners. It would also not be possible for all scanners (i.e. this would most likely not be supported for arbitrary Arrow data sets).
from duckdb.
> As mentioned, this is not a bug but more of a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc.). Pushing a filter into a scan while preserving row numbers is not generically possible and would require special per-scanner support, so supporting this efficiently would require us to extend the functionality of individual scanners. It would also not be possible for all scanners (i.e. this would most likely not be supported for arbitrary Arrow data sets).
I understand that this is a (missing) feature, but can you clarify here? It doesn't really make sense to me to have positional joins without filter pushdowns; or rather, they have limited use without them. It would mean you need to replicate the ENTIRE set of filter columns in each column partition (or whatever we are calling the things we join), which would obviate the benefits of the positional join to begin with. But I might be missing some key piece of understanding.
Mechanically, I think the filter results in some row numbering (like a slice per file), and that numbering should be broadcast across to the positional join chunks before the positional join is applied (see the sketch below).
I think perhaps what you are saying is that most of the predicate pushdowns and efficiency tricks are highly manual and implementation-specific, especially in the case of the novel positional join, and that it would involve a lot of work to implement this? Or is there something I am missing that makes this operationally impossible, and not merely painful/costly?
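A minimal sketch of that broadcast mechanism on in-memory data (pandas purely for illustration; the whole point of pushdown would be for the engine to produce right.iloc[rows] without materializing the right side first):

```python
import numpy as np
import pandas as pd

left = pd.DataFrame({'b': [5, 999, 7, 999]})
right = pd.DataFrame({'e': [10, 20, 30, 40]})

rows = np.flatnonzero(left['b'].to_numpy() == 999)  # row numbers surviving the filter
joined = pd.concat(
    [left.iloc[rows].reset_index(drop=True),
     right.iloc[rows].reset_index(drop=True)],      # same rows 'broadcast' to the right side
    axis=1,
)
print(joined)  # rows 1 and 3 of both sides, still positionally aligned
```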
from duckdb.