
Comments (15)

mkleinbort avatar mkleinbort commented on June 27, 2024

I've been working on this as well, but I'm not sure this is the answer. I think your use case is better solved by the implementation of page skipping via column indexes (#14314) or by adopting the lance file format (#12389).

I'm personally going with lance with scalar indexes instead of partitions. It works remarkably well.

from polars.

CaselIT avatar CaselIT commented on June 27, 2024

Thanks for your input.

I'm not familiar with the lance file format, I'll give it a look, thanks (even if it seems a lot less popular than parquet).

Regarding column indexes, they don't seem to be very well supported yet: as far as I could find, pyarrow seems to support writing them but does not use them when reading (per the documentation of write_page_index). In any case I'll investigate whether it's already possible to use them.

In any case I think something like the proposal would still be viable, even if there are alternatives to it.


mkleinbort avatar mkleinbort commented on June 27, 2024

If you find a way to speed up parquet I'm all for it - it's what led me to lance. I hit the issue that compression got very bad when the row groups got too small (or the file got split across too many partitions).

Maybe I don't understand your proposal - what is the core idea? Matching the partitions to row group sizes?


CaselIT avatar CaselIT commented on June 27, 2024

If you find a way to speed up parquet I'm all for it - it's what led me to lance. I hit the issue that compression got very bad when the row groups got too small (or the file got split across too many partitions).

The too-many-row-groups problem is really evident in how pyarrow saves data in hive partitions. If the partitions are not sorted, the way hive partitioning is implemented can only be described as "bad".

Maybe I don't understand your proposal - what is the core idea? Matching the partitions to row group sizes?

It's an alternative to hive partitioning, which in my use case is not really great: I would expect it to be optimized for loading a single partition given a partitioning key, but it does not seem that good at it. Again, in my use case.


CaselIT avatar CaselIT commented on June 27, 2024

I'm personally going with Lance with scalar indexed instead of partitions. It works remarkably well.

I'm probably not doing something correctly, but it seems that translating the same example to use lance performs a lot worse compared to either parquet example (saving the data is fast, though):

def run_lance(num_partition):
    # note: `make_data` and `timectx` are helpers defined earlier in the thread
    import shutil

    import lance
    import polars as pl

    print(f"Saving data with {num_partition} partitions")
    df = make_data(num_partition)

    keys = df["key"].unique()
    assert len(keys) == num_partition

    shutil.rmtree("write_to_lance", ignore_errors=True)
    with timectx("save using write_to_lance"):
        ds = lance.write_dataset(df, "write_to_lance")

    keys = keys.sample(fraction=1)  # randomize access
    assert len(keys) == num_partition
    with timectx("load single key reopening dataset"):
        for key in keys:
            pl.from_arrow(lance.dataset("write_to_lance").scanner(filter=f"key == {key}").to_table())

    with timectx("load single key reusing dataset - read dataset once"):
        write_to_lance = lance.dataset("write_to_lance")
        for key in keys:
            pl.from_arrow(write_to_lance.scanner(filter=f"key == {key}").to_table())

    shutil.rmtree("write_to_lance_index", ignore_errors=True)
    with timectx("save using write_to_lance with index"):
        ds = lance.write_dataset(df, "write_to_lance_index")
        ds.create_scalar_index("key", "BTREE")

    with timectx("load single key reopening dataset with index"):
        for key in keys:
            pl.from_arrow(lance.dataset("write_to_lance_index").scanner(filter=f"key == {key}").to_table())

    with timectx("load single key reusing dataset with index - read dataset once"):
        write_to_lance_index = lance.dataset("write_to_lance_index")
        for key in keys:
            pl.from_arrow(write_to_lance_index.scanner(filter=f"key == {key}").to_table())

Tried only on Windows:

# Saving data with 100 partitions
#    save using write_to_lance 587.8781999927014 ms
#    load single key reopening dataset 29757.00619997224 ms
#    load single key reusing dataset - read dataset once 30374.47699997574 ms
#    save using write_to_lance with index 585.647500003688 ms
#    load single key reopening dataset with index 21465.08240001276 ms
#    load single key reusing dataset with index - read dataset once 21361.00340000121 ms
# Saving data with 250 partitions
#    save using write_to_lance 521.4053000090644 ms
#    load single key reopening dataset 64323.835899995174 ms
#    load single key reusing dataset - read dataset once 62762.13719998486 ms
#    save using write_to_lance with index 688.1173999863677 ms
#    load single key reopening dataset with index 43631.29079999635 ms
#    load single key reusing dataset with index - read dataset once 42433.75660001766 ms
# Saving data with 500 partitions
#    save using write_to_lance 480.91260000364855 ms
#    load single key reopening dataset 100031.00830002222 ms
#    load single key reusing dataset - read dataset once 99191.57840003027 ms
#    save using write_to_lance with index 651.4785999897867 ms
#    load single key reopening dataset with index 64974.15620001266 ms
#    load single key reusing dataset with index - read dataset once 64076.031400007196 ms


mkleinbort avatar mkleinbort commented on June 27, 2024

I'll check. But before I do, I'd like to understand your proposed partitioning scheme. How would you describe it? I read the code but it's not clear to me what the core idea is... To have one row group per value in the partition column?


CaselIT avatar CaselIT commented on June 27, 2024

To have one row group per value in the partition column?

Yes, that's all there is to it regarding how the data is saved.

A more complex part is how to save the mapping from key value to row group index; the example side-steps this by simplifying it to a dict.
As mentioned in the considerations, such data could be saved in the parquet metadata in some standard format.


CaselIT avatar CaselIT commented on June 27, 2024

I'm personally going with Lance with scalar indexed instead of partitions. It works remarkably well.

I'm probably not doing something correctly

Seems like lance in this use case is very sensitive to the order of the saved data. Adding df = df.sort('key') before saving results in almost two orders of magnitude improvement compared to the unordered case.


mkleinbort-ic avatar mkleinbort-ic commented on June 27, 2024

Yes, and you can add an index to make it even faster.

If you know the rows you need, lance's take is very fast as well.


jonashaag avatar jonashaag commented on June 27, 2024

I'm getting very different benchmark results on a real world dataset. 200 M rows, 10 columns, 100 partitions (index), zstd level 3 compressed:

# Manual read of partitions
In [59]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pl.read_parquet(f"/tmp/xxx/xxx={k}/data.parquet")
    ...:
12.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Manual read of partitions with PyArrow
In [60]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pq.read_table(f"/tmp/xxx/xxx={k}/data.parquet")
    ...:
2.24 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Read row groups with PyArrow
In [61]: %%timeit -n1 -r1
    ...: for i, k in enumerate(index):
    ...:     pl.from_arrow(r.read_row_group(i))
    ...:
8.17 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Polars hive partitions
In [62]: scan = pl.scan_parquet("/tmp/xxx/*/*")
In [63]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     scan.filter(pl.col.xxx == k).collect()
    ...:
13.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# PyArrow hive partitions
In [94]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pq.read_table("/tmp/xxx", filters=[("xxx", "==", k)])
    ...:
3.12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


CaselIT avatar CaselIT commented on June 27, 2024

Interesting, thanks for the test! Is the dataset public by any chance?


jonashaag avatar jonashaag commented on June 27, 2024

No :(


CaselIT avatar CaselIT commented on June 27, 2024

I'll try to check a synthetic one with the same characteristics. Were the keys ordered when it was originally partitioned?


jonashaag avatar jonashaag commented on June 27, 2024

One problem with your benchmark is that the number of rows (and the file size) is way too small, and the column types are not representative of real-world data. Also, uniformly random distributions are very rare in real-world data.

As a consequence of the small number of rows the overhead of repeatedly calling .scan_parquet() is very large. If I change the Hive test to

        scan = pl.scan_parquet("write_to_dataset/**/*.parquet")
        for key in keys:
            scan.filter(key=key).collect()

it improves ~3x in performance, making it faster than the row-group approach, although when I increase the number of rows, the row-group approach is faster again.


CaselIT avatar CaselIT commented on June 27, 2024

Regarding what to keep inside the loop over keys in the example, it depends on what one wants to benchmark. The ParquetFile initialization also has a very significant overhead.

My example was more representative of the use case where something creates a partitioned file, then work on the single partitions is done independently, and where sharing a scan_parquet or pyarrow.dataset is not feasible (for example because the work is being done in a batch-processing or similar system).

In any case thanks for the input. I'll try different types of data to see if I also see different times.

