Comments (15)
I've been working on this as well, but I'm not sure this is the answer. I think your use case is better solved by the implementation of page skipping via column indexes (#14314) or by adopting the Lance file format (#12389).
I'm personally going with Lance with scalar indexes instead of partitions. It works remarkably well.
from polars.
Thanks for your input.
Thanks, I'm not familiar with the Lance file format, I'll give it a look (even if it seems still a lot less popular than Parquet).
Regarding column indexes, they do not seem to be very well supported yet: as far as I could find, pyarrow supports writing them but does not use them while reading (per the documentation of `write_page_index`). In any case I'll investigate whether it's already usable.
Either way, I think something like the proposed approach would still be viable, even if alternatives to it exist.
If you find a way to speed up parquet I'm all for it - it's what led me to lance. I hit the issue that compression got very bad when the row groups got too small (or the file got split across too many partitions).
Maybe I don't understand your proposal - what is the core idea? Matching the partitions to row group sizes?
> If you find a way to speed up parquet I'm all for it - it's what led me to lance. I hit the issue that compression got very bad when the row groups got too small (or the file got split across too many partitions).

The too-many-row-groups problem is really evident in how pyarrow saves the data with hive partitions. If the partitions are not sorted, the way hive partitioning is implemented can only be described as "bad".

> Maybe I don't understand your proposal - what is the core idea? Matching the partitions to row group sizes?

It's an alternative to hive partitioning, which in my use case is not really great: I would expect it to be optimized for loading a single partition given the partitioning key, but it does not seem that good at it. Again, in my use case.
> I'm personally going with Lance with scalar indexes instead of partitions. It works remarkably well.
I'm probably not doing something correctly, but translating the same example to use Lance performs a lot worse than either Parquet example (saving the data is fast, though):
```python
import shutil

import polars as pl


def run_lance(num_partition):
    import lance

    # make_data and timectx are the helpers from the parquet example
    print(f"Saving data with {num_partition} partitions")
    df = make_data(num_partition)
    keys = df["key"].unique()
    assert len(keys) == num_partition

    shutil.rmtree("write_to_lance", ignore_errors=True)
    with timectx("save using write_to_lance"):
        ds = lance.write_dataset(df, "write_to_lance")

    keys = keys.sample(fraction=1)  # randomize access
    assert len(keys) == num_partition

    with timectx("load single key reopening dataset"):
        for key in keys:
            pl.from_arrow(lance.dataset("write_to_lance").scanner(filter=f"key == {key}").to_table())

    with timectx("load single key reusing dataset - read dataset once"):
        write_to_lance = lance.dataset("write_to_lance")
        for key in keys:
            pl.from_arrow(write_to_lance.scanner(filter=f"key == {key}").to_table())

    shutil.rmtree("write_to_lance_index", ignore_errors=True)
    with timectx("save using write_to_lance with index"):
        ds = lance.write_dataset(df, "write_to_lance_index")
        ds.create_scalar_index("key", "BTREE")

    with timectx("load single key reopening dataset with index"):
        for key in keys:
            pl.from_arrow(lance.dataset("write_to_lance_index").scanner(filter=f"key == {key}").to_table())

    with timectx("load single key reusing dataset with index - read dataset once"):
        write_to_lance_index = lance.dataset("write_to_lance_index")
        for key in keys:
            pl.from_arrow(write_to_lance_index.scanner(filter=f"key == {key}").to_table())
```
Tried only on Windows:
```
# Saving data with 100 partitions
# save using write_to_lance 587.8781999927014 ms
# load single key reopening dataset 29757.00619997224 ms
# load single key reusing dataset - read dataset once 30374.47699997574 ms
# save using write_to_lance with index 585.647500003688 ms
# load single key reopening dataset with index 21465.08240001276 ms
# load single key reusing dataset with index - read dataset once 21361.00340000121 ms
# Saving data with 250 partitions
# save using write_to_lance 521.4053000090644 ms
# load single key reopening dataset 64323.835899995174 ms
# load single key reusing dataset - read dataset once 62762.13719998486 ms
# save using write_to_lance with index 688.1173999863677 ms
# load single key reopening dataset with index 43631.29079999635 ms
# load single key reusing dataset with index - read dataset once 42433.75660001766 ms
# Saving data with 500 partitions
# save using write_to_lance 480.91260000364855 ms
# load single key reopening dataset 100031.00830002222 ms
# load single key reusing dataset - read dataset once 99191.57840003027 ms
# save using write_to_lance with index 651.4785999897867 ms
# load single key reopening dataset with index 64974.15620001266 ms
# load single key reusing dataset with index - read dataset once 64076.031400007196 ms
```
I'll check. But before I do, I'd like to understand your proposed partitioning scheme. How would you describe it? I read the code but the core idea is not clear to me... To have one row group per value in the partition column?
> To have one row group per value in the partition column?
Yes, that's all there is to it regarding how the data is saved.
The more complex part is probably how to save the mapping from key value to row-group index; in the example that is glossed over by simplifying it to a dict.
As mentioned in the considerations, such data could be saved in the Parquet metadata in some standard format.
> I'm personally going with Lance with scalar indexes instead of partitions. It works remarkably well.
> I'm probably not doing something correctly
It seems that Lance in this use case is very sensitive to the order of the saved data. Adding `df = df.sort('key')` before saving results in almost two orders of magnitude improvement compared to the unordered case.
Yes, and you can add an index to make it even faster.
If you know the rows you need, Lance's `take` is very fast as well.
I'm getting very different benchmark results on a real-world dataset: 200 M rows, 10 columns, 100 partitions (`index`), zstd level 3 compressed:
```
# Manual read of partitions
In [59]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pl.read_parquet(f"/tmp/xxx/xxx={k}/data.parquet")
    ...:
12.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Manual read of partitions with PyArrow
In [60]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pq.read_table(f"/tmp/xxx/xxx={k}/data.parquet")
    ...:
2.24 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Read row groups with PyArrow
In [61]: %%timeit -n1 -r1
    ...: for i, k in enumerate(index):
    ...:     pl.from_arrow(r.read_row_group(i))
    ...:
8.17 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# Polars hive partitions
In [62]: scan = pl.scan_parquet("/tmp/xxx/*/*")

In [63]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     scan.filter(pl.col.xxx == k).collect()
    ...:
13.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# PyArrow hive partitions
In [94]: %%timeit -n1 -r1
    ...: for k in index:
    ...:     pq.read_table("/tmp/xxx", filters=[("xxx", "==", k)])
    ...:
3.12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```
Interesting, thanks for the test! Is the dataset public by any chance?
No :(
I'll try to check a synthetic one with the same characteristics. Were the keys ordered when the data was originally partitioned?
One problem in your benchmark is that the number of rows (and hence the file size) is way too small, and the column types are not representative of real-world data. Also, a uniform random distribution is very rare in real-world data.
As a consequence of the small number of rows, the overhead of repeatedly calling `.scan_parquet()` is very large. If I change the hive test to

```python
scan = pl.scan_parquet("write_to_dataset/**/*.parquet")
for key in keys:
    scan.filter(key=key).collect()
```

it improves ~3x in performance, making it faster than the row-group one. Although when I increase the number of rows, the row-group one is faster again.
Regarding what to keep inside the loop over keys in the example: it depends on what one wants to benchmark. The `ParquetFile` initialization also has a very significant overhead.
My example was more representative of the use case where something creates a partitioned file, then work on the single partitions is done independently, and where sharing a `scan_parquet` or `pyarrow.dataset` is not feasible (because, for example, work is being done in a batch-processing or similar system).
In any case, thanks for the input. I'll try different types of data to see if I also see different times.