
Comments (6)

cottrell commented on June 24, 2024

> (Disclaimer: I'm not a dev)
>
> This looks not like a bug per se, but like a missing filter pushdown.
>
> For normal joins, filtering the joined table is essentially the same as joining filtered tables. For positional joins that's not the case, so when you filter a positional join, DuckDB has to read the entire files from disk, join them, and then filter. When you filter single Parquet files, DuckDB instead only reads the relevant parts of each file, based on metadata/indices stored alongside the file, which is much faster than reading the entire file.
>
> It should be possible to add the required logic to push the filter down to the relevant Parquet scan and then read the other Parquet file at the filtered row indices, but it's not trivial and therefore likely a feature request rather than an issue.

Yes, that is what I was guessing after doing some code skimming. It did not seem trivial to me to figure out how to add it, but it must be a "simple" feature omission, at least conceptually. I suspect positional joins are not yet used in anger, but this would be a key piece of getting there. I think they are a very important feature direction for DuckDB.


Mytherin commented on June 24, 2024

In order for a positional join to work with filter pushdowns, we essentially need two callbacks that are efficiently implemented for each scan:

  • Filter out rows that do not fulfill the filter criteria, but tell me how many rows were skipped
  • Skip N rows, but do not scan them

We could then use the first callback to apply the filter in the first result set, and the second to find the corresponding rows that belong to the subsequent (not yet filtered) rows. Adding these callbacks and efficiently implementing them for all the storage back-ends is not trivial, and they cannot be very efficiently implemented for all scanners (e.g. we cannot efficiently skip to the middle of a row group in Parquet files).
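For illustration only, here is a toy sketch of how those two callbacks would compose for a filtered positional join. It uses plain Python over in-memory lists with hypothetical names, not DuckDB internals:

def scan_filtered(rows, predicate):
    # Callback 1 (toy version): yield runs of matching rows together with the
    # number of rows that were skipped immediately before each run.
    skipped, run = 0, []
    for row in rows:
        if predicate(row):
            run.append(row)
        else:
            if run:
                yield skipped, run
                skipped, run = 0, []
            skipped += 1
    if run or skipped:
        yield skipped, run


def filtered_positional_join(left, right, predicate):
    pos = 0
    for skipped, run in scan_filtered(left, predicate):
        pos += skipped  # Callback 2 (toy version): skip N rows on the right without scanning them
        for l, r in zip(run, right[pos:pos + len(run)]):
            yield (*l, *r)
        pos += len(run)


left = [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
right = [(10,), (20,), (30,), (40,)]
print(list(filtered_positional_join(left, right, lambda row: row[0] % 2 == 0)))
# -> [(2, 'b', 20), (4, 'd', 40)]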

In cases where we are scanning e.g. an Arrow Record Batch Reader, these callbacks do not exist, and hence this will not be possible to support unless these or similar callbacks are also implemented for these back-ends.

All in all, this is a rather large effort and I would not count on it being done in the near term.

When working with Parquet files, perhaps you could try using file_row_number and joining on that instead.
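For illustration, a rough sketch of that workaround (untested; it assumes the Parquet reader's file_row_number and filename options and the two-directory layout from the repro later in this thread, where matching files share the same file name):

import duckdb

con = duckdb.connect()
# Reconstruct the positional alignment explicitly by joining on
# (file name, row number within the file); the filter on lhs.b can then be
# pushed into the left-hand Parquet scan as usual.
res = con.execute(
    """
    SELECT count(*) AS ct
    FROM read_parquet('data_example/a/*.parquet', file_row_number = true, filename = true) lhs
    JOIN read_parquet('data_example/b/*.parquet', file_row_number = true, filename = true) rhs
      ON lhs.file_row_number = rhs.file_row_number
     AND regexp_extract(lhs.filename, '[^/]+$') = regexp_extract(rhs.filename, '[^/]+$')
    WHERE lhs.b = 999
    """
).df()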


soerenwolfers commented on June 24, 2024

(Disclaimer: I'm not a dev)

This looks not like a bug per se, but like a missing filter pushdown.

For normal joins, filtering the joined table is essentially the same as joining filtered tables. For positional joins that's not the case, so when you filter a positional join, DuckDB has to read the entire files from disk, join them, and then filter. When you filter single Parquet files, DuckDB instead only reads the relevant parts of each file, based on metadata/indices stored alongside the file, which is much faster than reading the entire file.

It should be possible to add the required logic to push the filter down to the relevant Parquet scan and then read the other Parquet file at the filtered row indices, but it's not trivial and therefore likely a feature request rather than an issue.
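As a minimal illustration of the two query shapes being contrasted (hypothetical paths; a fuller repro follows in the next comment):

import duckdb

con = duckdb.connect()
# Pushdown applies here: only the relevant row groups of a/*.parquet are read.
fast = con.sql("SELECT count(*) FROM 'a/*.parquet' WHERE b = 999")
# No pushdown here: both sides are read in full, joined, and only then filtered.
slow = con.sql("""
    SELECT count(*)
    FROM 'a/*.parquet' POSITIONAL JOIN 'b/*.parquet'
    WHERE b = 999
""")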


cottrell commented on June 24, 2024

I've got some kind of repro, which was interesting because the discrepancy appears to be huge in the case of ordered integers (zone maps?), so there must be some optimization/pushdown that is missing in that case.

import logging
import os
import shutil
import string
import time

import duckdb
import numpy as np
import pandas as pd
import tqdm
from pyarrow.dataset import dataset

_mydir = os.path.dirname(__file__)

_data_dir = os.path.join(_mydir, 'data_example')
_data_dir_a = os.path.join(_data_dir, 'a')
_data_dir_b = os.path.join(_data_dir, 'b')

_duckdb_filename = os.path.join(_mydir, 'duckdb.db')


def create_data_on_disk(n_partitions=100, partition_size=1_000_000, seed=0, string_col_arity=1000):
    np.random.seed(seed)

    def write(df, path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        logging.warning(f'writing {path}')
        df.to_parquet(path, index=False)

    if os.path.exists(_data_dir):
        shutil.rmtree(_data_dir)

    string_col_values = ['AAA'] + [
        ''.join(x)
        for x in [
            np.random.choice(list(string.ascii_letters + string.digits), size=5) for _ in range(string_col_arity - 1)
        ]
    ]
    eps = 1e-6
    p = np.array([eps] + [(1 - eps) / (string_col_arity - 1)] * (string_col_arity - 1))
    assert np.abs(p.sum() - 1) < 1e-6

    min_int = 0
    for partition in tqdm.tqdm(range(n_partitions)):
        filename = f'data_{partition:03d}.parquet'
        df = pd.DataFrame(np.random.randn(partition_size, 4), columns=['a', 'b', 'c', 'd'])
        df['a'] = np.random.choice(string_col_values, size=partition_size, p=p)
        max_int = min_int + np.random.randint(10, 20)
        df['b'] = np.random.randint(min_int, max_int, size=partition_size)
        df['d'] = np.random.randint(0, 100_000, size=partition_size)  # NOTE: this one has no ordering
        min_int = max_int
        path = os.path.join(_data_dir_a, filename)
        write(df, path)
        df_ = pd.DataFrame(np.random.randn(partition_size, 3), columns=['e', 'f', 'g'])
        df_['e'] = df['d']  # NOTE: just a simple way of sanity checking
        path = os.path.join(_data_dir_b, filename)
        write(df_, path)


def get_duckdb_con():
    con = duckdb.connect(_duckdb_filename)
    dsa = dataset(_data_dir_a, format='parquet')
    con.register('A1', dsa)
    dsb = dataset(_data_dir_b, format='parquet')
    con.register('B1', dsb)
    con.execute(
        f"""
    create temp view A0 as (select * from parquet_scan('{_data_dir_a}/*'));
    create temp view B0 as (select * from parquet_scan('{_data_dir_b}/*'));
    create temp view E0 as (select * from A0 positional join B0);
    create temp view E1 as (select * from A1 positional join B1);
    """
    )
    return con


def test_against(table, query_flavour):
    """table should be A0, A1, E0, E1"""
    con = get_duckdb_con()
    query = {
        'A': f"select count(*) as ct from {table} where c > 0 and c <= 0.000001",
        'B': f"select count(*) as ct from {table} where a = 'AAA'",
        'C': f"select count(*) as ct from {table} where b = 999",
        'D': f"select count(*) as ct from {table} where d = 999",
    }[query_flavour]
    T = time.time()
    res = con.execute(query).df()
    T = time.time() - T
    return dict(res=res, time=T, query=query)


def test_all():
    """Example results:

      query_flavour                     name      time                                                        query  factor
    0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
    1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
    2             B              raw_dataset  0.484304                select count(*) as ct from A1 where a = 'AAA'       1
    3             B  positional join_dataset  1.915267                select count(*) as ct from E1 where a = 'AAA'       4
    4             C              raw_dataset  0.012928                  select count(*) as ct from A1 where b = 999       1
    5             C  positional join_dataset  1.171541                  select count(*) as ct from E1 where b = 999      91
    6             D              raw_dataset  0.273574                  select count(*) as ct from A1 where d = 999       1
    7             D  positional join_dataset  1.271827                  select count(*) as ct from E1 where d = 999       5


    """
    out = list()
    for query_flavour in tqdm.tqdm(['A', 'B', 'C', 'D']):
        df = None
        for k, name in [
            # these are much slower
            # ('A0', 'raw_parquet_scan'),
            # ('E0', 'positional join_parquet_scan'),
            ('A1', 'raw_dataset'),
            ('E1', 'positional join_dataset'),
        ]:
            res = test_against(k, query_flavour=query_flavour)
            if df is None:
                df = res['res']
            else:
                df_ = res['res']
                assert df.shape[0] == df_.shape[0]
                pd.testing.assert_frame_equal(df, df_)
                # pd.testing.assert_frame_equal(df[['a', 'b', 'c']], df_[['a', 'b', 'c']])
            out.append(dict(query_flavour=query_flavour, name=name, time=res['time'], query=res['query']))
    df = pd.DataFrame(out)
    df['factor'] = (df['time'] / df.groupby('query_flavour')['time'].transform('min')).round(0).astype(int)
    return df

Results like this (also pasted in the code above):

In [4]: create_data_on_disk()  # run this once to setup
In [5]: test_all()
Out[5]: 
  query_flavour                     name      time                                                        query  factor
0             A              raw_dataset  0.360310  select count(*) as ct from A1 where c > 0 and c <= 0.000001       1
1             A  positional join_dataset  2.707489  select count(*) as ct from E1 where c > 0 and c <= 0.000001       8
2             B              raw_dataset  0.484304                select count(*) as ct from A1 where a = 'AAA'       1
3             B  positional join_dataset  1.915267                select count(*) as ct from E1 where a = 'AAA'       4
4             C              raw_dataset  0.012928                  select count(*) as ct from A1 where b = 999       1
5             C  positional join_dataset  1.171541                  select count(*) as ct from E1 where b = 999      91
6             D              raw_dataset  0.273574                  select count(*) as ct from A1 where d = 999       1
7             D  positional join_dataset  1.271827                  select count(*) as ct from E1 where d = 999       5

If someone could confirm this and report whether it's sane, maybe we can spin it over to feature requests? It would be good to at least get some advice on how hard this is to implement, if it is a sane repro.


Mytherin commented on June 24, 2024

As mentioned this is not a bug and more like a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc). Pushing a filter into a scan while preserving row numbers is not generically possible and would require special per-scanner support. Supporting this efficiently would therefore require us to extend the functionality of individual scanners to allow for this. It would also not be possible for all scanners (i.e. this would most likely not be supported for arbitrary Arrow data sets).
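For reference, a tiny example of those semantics (row i on the left is paired with row i on the right):

import duckdb

# Positional join pairs rows purely by their position in each input.
duckdb.sql("""
    SELECT *
    FROM (VALUES (1), (2), (3)) l(a)
    POSITIONAL JOIN (VALUES ('x'), ('y'), ('z')) r(b)
""").show()
# 1 pairs with 'x', 2 with 'y', 3 with 'z'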


cottrell commented on June 24, 2024

> As mentioned this is not a bug and more like a feature request. Positional joins align values based on position (i.e. row 1 on the left side gets united with row 1 on the right side, etc). Pushing a filter into a scan while preserving row numbers is not generically possible and would require special per-scanner support. Supporting this efficiently would therefore require us to extend the functionality of individual scanners to allow for this. It would also not be possible for all scanners (i.e. this would most likely not be supported for arbitrary Arrow data sets).

I understand that this is a (missing) feature, but can you clarify here? It doesn't really make sense to me to have positional joins without filter pushdowns, or rather they would have limited use: it would mean you need to replicate the ENTIRE set of filter columns in each column partition (or whatever we are calling the things we join), which would obviate the benefits of the positional join to begin with. But I might be missing some key piece of understanding.

Mechanically, I think one has a filter which results in some row numbering (like a slice per file), and that numbering should be broadcast across to the positional join chunks before the positional join is applied.
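In pandas terms (toy in-memory sketch with made-up data, not how DuckDB would implement it), that broadcasting idea might look like this:

import pandas as pd

# Compute the filter mask on the side that holds the filter column, then apply
# the same positional mask to the other side before aligning the two by position.
df_a = pd.DataFrame({'b': [1, 999, 3, 999]})
df_b = pd.DataFrame({'e': [10, 20, 30, 40]})

mask = (df_a['b'] == 999).to_numpy()
joined = pd.concat(
    [df_a[mask].reset_index(drop=True), df_b[mask].reset_index(drop=True)],
    axis=1,
)
print(joined)  # the two surviving rows from each side, still aligned by position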

I think perhaps what you are saying is that most of the predicate pushdowns and efficiency tricks are highly manual and implementation-specific, especially in the case of the novel positional join, and that it would involve a lot of work to implement this? Or is there something I am missing that makes this operationally impossible and not merely painful/costly?

