Comments (30)
You could get a backtrace in gdb. 👀
@kszlim that's a good find. We should always drop mutexes before we spawn work with rayon as we don't want to hold the mutex when pushing other tasks on the work scheduler. This snippet in the window functions was indeed holding a rwlock when parallelizing work with rayon. Will issue a PR.
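The pattern in question, sketched in Python purely for illustration (hypothetical names; the real code is Rust holding a RwLock across a rayon fork):

import threading
from concurrent.futures import ThreadPoolExecutor

cache_lock = threading.Lock()
cache = {"groups": [1, 2, 3]}

def window_eval(pool: ThreadPoolExecutor) -> int:
    with cache_lock:
        # Copy what we need while holding the lock...
        groups = list(cache["groups"])
    # ...and release it BEFORE fanning out work. If the pool's workers also
    # needed cache_lock while we block on their results below, holding the
    # lock across the fan-out could deadlock the pool, which is the
    # rayon-while-holding-a-RwLock situation described above.
    futures = [pool.submit(sum, groups) for _ in range(4)]
    return sum(f.result() for f in futures)

with ThreadPoolExecutor(max_workers=2) as pool:
    print(window_eval(pool))  # 24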
I'll try to make something more reproducible later if required.
A minimal working example reproducing the problem would be much appreciated.
Having a hard time creating something; will keep trying, but I've found that it seemingly is caused by:
expr.map_batches(lambda x: x.to_frame().unnest(q.name).to_numpy(use_pyarrow=True))
I.e., the default use_pyarrow=True parameter is the cause; if I change it to False, the deadlocks seem to stop. It's hard for me to know for sure, as it seems to take minutes or longer to reproduce.
EDIT: Seems like that was a red herring, I still get deadlocks. Might it be related to the level of parallelism? (I'm using collect_all on many LazyFrames.)
Hmm, I'm actually not sure what the issue is. I'm still experiencing a deadlock; it happens so intermittently.
I managed to capture the verbose output for when it's deadlocking, any ideas?
parquet file must be read, statistics not sufficient for predicate.
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
Ran into another instance of it. Turns out it doesn't matter what combination of map_batches/map_elements and use_pyarrow=True/use_pyarrow=False is used; it can deadlock in any case.
parquet file must be read, statistics not sufficient for predicate.
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
group_by keys are sorted; running sorted key fast path
dataframe filtered
dataframe filtered
group_by keys are sorted; running sorted key fast path
FOUND SORTED KEY: running default HASH AGGREGATION
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
group_by keys are sorted; running sorted key fast path
ASOF join dataframes finished
group_by keys are sorted; running sorted key fast path
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
group_by keys are sorted; running sorted key fast path
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
dataframe filtered
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run STREAMING HASH AGGREGATION
RUN STREAMING PIPELINE
[df -> generic-group_by -> ordered_sink]
finish streaming aggregation with local in-memory table
@ritchie46 anything pop out to you? I'm also using a lot of window functions in this query if that's relevant.
Do you use map_elements/map_batches? Then I think we can deadlock because of the Python GIL.
I removed the map_* and it still deadlocks.
Here's the obfuscated query plan:
SORT BY [col("crenulation"), col("semipurposiveness")]
AGGREGATE
[] BY [col("crenulation").alias("crenulation"), col("semipurposiveness").alias("semipurposiveness")] FROM
SELECT [col("semipurposiveness"), col("crenulation")] FROM
DROP_NULLS by: [phase]
WITH_COLUMNS:
[.when(col("raob").is_between([col("mastopexy"), col("glomus")])).then(String(ungarment)).otherwise(null.strict_cast(String)).alias("crenulation")]
SELECT [col("semipurposiveness"), col("raob"), col("mastopexy"), col("glomus")] FROM
ASOF JOIN:
LEFT PLAN ON: [col("raob")]
SORT BY [col("semipurposiveness"), col("raob")]
WITH_COLUMNS:
[col("semipurposiveness").over([col("semipurposiveness")]), col("raob").over([col("semipurposiveness")])]
Parquet SCAN 20 files: first file: s3://my-bucket/redacted.parquet
PROJECT 2/215 COLUMNS
SELECTION: col("semipurposiveness").is_in([Series])
RIGHT PLAN ON: [col("mastopexy")]
SORT BY [col("mastopexy")]
WITH_COLUMNS:
[.when([(col("__POLARS_CSER_12832905341579707231")) < (col("mastopexy"))]).then(col("mastopexy")).otherwise(col("__POLARS_CSER_12832905341579707231")).alias("glomus"), [(col("glomus")) + (0.dt.duration([0, 0, 0, 10, 0, 0, 0]).dt.total_nanoseconds())].alias("__POLARS_CSER_12832905341579707231")]
SORT BY [col("mastopexy"), col("glomus")]
SELECT [col("semipurposiveness"), col("mastopexy"), col("glomus")] FROM
AGGREGATE
[col("raob").min().alias("mastopexy"), col("raob").max().alias("glomus")] BY [col("semipurposiveness"), col("Deltaville")] FROM
FILTER [(col("antiwoman")) | (col("autobus"))].over([col("semipurposiveness")]) FROM
WITH_COLUMNS:
[[(col("antiwoman")) | (col("autobus"))].rle_id().over([col("semipurposiveness")]).alias("Deltaville")]
WITH_COLUMNS:
[col("semipurposiveness").over([col("semipurposiveness")]), col("raob").over([col("semipurposiveness")]), [(col("Farnese").alias("nonpersuadable").list.get([2]).alias("dabblingness").abs()) > (3.0)].over([col("semipurposiveness")]).alias("antiwoman"), [([([(col("Farnese").alias("nonpersuadable").list.get([2])) - (col("Farnese").alias("nonpersuadable").list.get([2]).alias("dabblingness").shift([1]))]) / ([([(col("raob").strict_cast(Float64)) * (1.0000e-9)]) - ([(col("raob").strict_cast(Float64)) * (1.0000e-9)].alias("semibachelor").shift([1]))])].alias("dabblingness").abs()) > (4.0)].over([col("semipurposiveness")]).alias("autobus")]
Parquet SCAN 20 files: first file: s3://my-bucket/redacted.parquet
PROJECT 3/89 COLUMNS
SELECTION: col("semipurposiveness").is_in([Series])
END ASOF JOIN
I've tried compiling main at commit 92902e6 and it still deadlocks, though I got a deadlock after this logging:
run PARTITIONED HASH AGGREGATION
dataframe filtered
FOUND SORTED KEY: running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run PARTITIONED HASH AGGREGATION
dataframe filtered
estimated unique values: 62345
estimated unique count: 62345 exceeded the boundary: 1000, running default HASH AGGREGATION
ASOF join dataframes finished
estimated unique values: 1
run PARTITIONED HASH AGGREGATION
Not sure if this is helpful, but it seems to help a little if I turn down the concurrency budget, though that slows down my query quite dramatically.
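For reference, roughly how I turned it down (assuming the POLARS_CONCURRENCY_BUDGET environment variable is the right knob and is read when the async runtime starts):

import os

# Assumption: POLARS_CONCURRENCY_BUDGET caps how many IO/download tasks the
# async runtime runs concurrently; set it before polars spawns its runtime.
os.environ["POLARS_CONCURRENCY_BUDGET"] = "4"

import polars as pl  # imported after setting the env var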
I've tried running it under valgrind's helgrind and am unable to reproduce; it probably modifies the runtime characteristics too much.
Haven't tried https://github.com/tokio-rs/loom or https://github.com/awslabs/shuttle; I'm curious whether they should be integrated into testing for polars?
@ritchie46 I locally compiled polars and replaced all usages of OnceLock/RwLock/Mutex with tracing-mutex and it didn't panic when running, so I'm guessing the deadlocking either has something to do with the GIL (though there are no map_* methods being called) or perhaps with thread starvation or async deadlocking.
Can you try generating a flamegraph? We can maybe see where the threads stall using it - let it deadlock for a while before killing the PID from another terminal.
> Can you try generating a flamegraph? We can maybe see where the threads stall using it - let it deadlock for a while before killing the PID from another terminal.

I did try to (I tried to generate a flamegraph by attaching to the deadlocked process), but since it wasn't making any progress it didn't seem to log anything in perf.
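(Something like perf record -g -p <PID> -- sleep 30 followed by perf script; since every thread was parked and not running, there was essentially nothing to sample.)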
Alright, in that case I think we can try to use gdb. If you can, run this:
gdb -p <PID> -ex 'thread apply all bt' -ex detach -ex quit > backtrace.txt
If I encounter this again I'll capture one. After upgrading to 0.20.20 I have yet to encounter a deadlock; perhaps it's been fixed by #15626 or something.
Let's keep this issue open for a while; I'll close it once I'm sure it's been fixed?
@nameexhaustion @ritchie46 looks like it's not fixed, had to run a 30s query over 500 times for it to deadlock, here's the backtrace.
Nice. I'm guessing a timeout for now, given I can see 4 occurrences of tokio::runtime::park::Inner::park, and that you had run the query several times in succession.
Thread 162 (Thread 0x7f80111ff700 (LWP 10692)):
#0 0x00007f824e6c92a9 in syscall () from /lib64/libc.so.6
#1 0x00007f8245c90a43 in tokio::runtime::park::Inner::park () from /path/to/venv/lib/python3.11/site-packages/polars/polars.abi3.so
#2 0x00007f824499da6c in polars_io::pl_async::RuntimeManager::block_on_potential_spawn::{{closure}} () from /path/to/venv/lib/python3.11/site-packages/polars/polars.abi3.so
#3 0x00007f824499a780 in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} () from /path/to/venv/lib/python3.11/site-packages/polars/polars.abi3.so
Would you be able to give this a try - #15643? I have re-added the connection timeout, so I'm hoping that instead of hanging it should display an error.
I gave that a try and it didn't work, still deadlocked/hung.
Is there anything else I can do to help diagnose this, @nameexhaustion?
Does this look like it might be the problem?
#0 0x00007f4eb53162a9 in syscall () from /lib64/libc.so.6
#1 0x00007f4eabf21e82 in std::sys::pal::unix::futex::futex_wait () at library/std/src/sys/pal/unix/futex.rs:62
#2 std::sys::sync::condvar::futex::Condvar::wait_optional_timeout () at library/std/src/sys/sync/condvar/futex.rs:49
#3 std::sys::sync::condvar::futex::Condvar::wait () at library/std/src/sys/sync/condvar/futex.rs:33
#4 0x00007f4eabda9e2e in std::sync::condvar::Condvar::wait (self=0x7f4aab614c50, guard=...) at /rustc/c9f8f3438a8134a413aa5d4903e0196e44e37bbc/library/std/src/sync/condvar.rs:189
#5 tokio::runtime::park::Inner::park (self=0x7f4aab614c40) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/park.rs:116
#6 0x00007f4ea8ca2a2a in tokio::runtime::park::CachedParkThread::block_on (self=0x7f4ca4dfb65f, f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/park.rs:285
#7 0x00007f4ea8c86338 in tokio::runtime::context::blocking::BlockingRegionGuard::block_on (self=0x7f4ca4dfb6a0, f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/context/blocking.rs:66
#8 tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}} (blocking=0x7f4ca4dfb6a0) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/multi_thread/mod.rs:87
#9 tokio::runtime::context::runtime::enter_runtime (handle=<optimized out>, allow_block_in_place=<optimized out>, f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/context/runtime.rs:65
#10 0x00007f4ea8c23bb6 in tokio::runtime::scheduler::multi_thread::MultiThread::block_on (handle=0x7f4ead912aa0 <_ZN9polars_io8pl_async7RUNTIME17h03c7e01a78cac90cE.llvm.12616630128375555060+48>, self=<optimized out>, future=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/multi_thread/mod.rs:86
#11 tokio::runtime::runtime::Runtime::block_on (self=<optimized out>, future=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/runtime.rs:350
#12 0x00007f4ea8c63ee3 in polars_io::pl_async::RuntimeManager::block_on_potential_spawn::{{closure}} () at crates/polars-io/src/pl_async.rs:246
#13 tokio::runtime::scheduler::multi_thread::worker::block_in_place (f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/multi_thread/worker.rs:440
#14 0x00007f4ea8d46a6a in tokio::runtime::scheduler::block_in_place::block_in_place (f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/runtime/scheduler/block_in_place.rs:20
#15 tokio::task::blocking::block_in_place (f=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/task/blocking.rs:78
#16 polars_io::pl_async::RuntimeManager::block_on_potential_spawn (self=0xfffffffffffffe00, future=<error reading variable: access outside bounds of object referenced via synthetic pointer>) at crates/polars-io/src/pl_async.rs:246
#17 polars_lazy::physical_plan::executors::scan::parquet::ParquetExec::read (self=0x7f4aab603c00) at crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs:363
#18 0x00007f4ea8cd62db in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} () at crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs:393
#19 polars_lazy::physical_plan::state::ExecutionState::record (self=<optimized out>, func=..., name=...) at crates/polars-lazy/src/physical_plan/state.rs:120
#20 0x00007f4ea8d47910 in <polars_lazy::physical_plan::executors::scan::parquet::ParquetExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab603c00, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs:393
#21 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4c51070190, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#22 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4bbf26f5a0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#23 0x00007f4ea8d0d688 in <polars_lazy::physical_plan::executors::filter::FilterExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab614bd0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/filter.rs:74
#24 0x00007f4ea8c90c90 in <polars_lazy::physical_plan::executors::group_by_partitioned::PartitionGroupByExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab68c780, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs:354
#25 0x00007f4ea8ce1fa7 in polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple::execute_impl (self=0x7f4aab63f260, state=0x408, columns=...) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:17
#26 0x00007f4ea8ce213e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab63f260, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:42
#27 0x00007f4ea8cea4e9 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab6773c0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/sort.rs:48
#28 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4ab2a723b0, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#29 0x00007f4ea8cea4e9 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab677300, state=0x7f4ca4dfce70) at crates/polars-lazy/src/physical_plan/executors/sort.rs:48
#30 0x00007f4ea8aea828 in <polars_lazy::physical_plan::executors::join::JoinExec as polars_lazy::physical_plan::executors::executor::Executor>::execute::{{closure}} () at crates/polars-lazy/src/physical_plan/executors/join.rs:64
#31 rayon_core::join::join::call::{{closure}} () at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:102
#32 rayon_core::join::join_context::call_b::{{closure}} (migrated=<optimized out>) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:129
#33 rayon_core::job::StackJob<L,F,R>::run_inline (self=..., stolen=<optimized out>) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/job.rs:102
#34 0x00007f4ea8bcd7f1 in rayon_core::join::join_context::{{closure}} (worker_thread=0x7f4ca4dfea80, injected=false) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:159
#35 0x00007f4ea8bd92e1 in rayon_core::registry::in_worker (op=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/registry.rs:951
#36 0x00007f4ea8bd66e4 in rayon_core::join::join_context (oper_a=..., oper_b=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:132
#37 rayon_core::join::join (oper_a=..., oper_b=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/join/mod.rs:105
#38 rayon_core::thread_pool::ThreadPool::join::{{closure}} () at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:280
#39 rayon_core::thread_pool::ThreadPool::install::{{closure}} () at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:147
#40 rayon_core::registry::Registry::in_worker (self=<optimized out>, op=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/registry.rs:502
#41 0x00007f4ea8ce7e4b in rayon_core::thread_pool::ThreadPool::install (self=0xfffffffffffffe00, op=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:147
#42 rayon_core::thread_pool::ThreadPool::join (self=0xfffffffffffffe00, oper_a=..., oper_b=...) at /path/to/rust_deps/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rayon-core-1.12.1/src/thread_pool/mod.rs:280
#43 <polars_lazy::physical_plan::executors::join::JoinExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab6311c0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/join.rs:57
#44 0x00007f4ea8ce1fa7 in polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple::execute_impl (self=0x7f4cde83b3c0, state=0x408, columns=...) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:17
#45 0x00007f4ea8ce213e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4cde83b3c0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:42
#46 0x00007f4ea8d0e086 in <polars_lazy::physical_plan::executors::stack::StackExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4c6fa713c0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/stack.rs:78
#47 0x00007f4ea8d1ce07 in <polars_lazy::physical_plan::executors::udf::UdfExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab670600, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/udf.rs:17
#48 0x00007f4ea8ce1fa7 in polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple::execute_impl (self=0x7f4bfec3d780, state=0x408, columns=...) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:17
#49 0x00007f4ea8ce213e in <polars_lazy::physical_plan::executors::projection_simple::ProjectionSimple as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4bfec3d780, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/projection_simple.rs:42
#50 0x00007f4ea8c90c90 in <polars_lazy::physical_plan::executors::group_by_partitioned::PartitionGroupByExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab68c5a0, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/group_by_partitioned.rs:354
#51 0x00007f4ea8cea4e9 in <polars_lazy::physical_plan::executors::sort::SortExec as polars_lazy::physical_plan::executors::executor::Executor>::execute (self=0x7f4aab677240, state=0x7f4ca4dfe2a0) at crates/polars-lazy/src/physical_plan/executors/sort.rs:48
> Does this look like it might be the problem?

I think the backtrace is telling us we are stuck on an async future, but we need to try and confirm this.
I have updated #15643 to set the timeout to 3 minutes instead of 7, and also set the non-connect timeout - please give it another try.
Please also try using this function instead of directly calling collect_all. It needs to be given a heartbeat_lf (just use a pl.scan_parquet(...) of the same files):

collect_all_with_timeout_and_retry:
import asyncio
import os
import warnings
from time import perf_counter

import polars as pl


def collect_all_with_timeout_and_retry(
    lfs: list[pl.LazyFrame],
    heartbeat_lf: pl.LazyFrame,
    *,
    timeout_secs=30 + int(os.getenv("POLARS_ASYNC_TIMEOUT", "3")) * 60,
    heartbeat_interval_secs=10,
) -> list[pl.DataFrame]:
    print(
        f"collect_all_with_timeout_and_retry {timeout_secs = } {heartbeat_interval_secs = }"
    )

    is_complete = asyncio.Event()

    async def collect_loop():
        # Retry the collect, cancelling any attempt that exceeds timeout_secs.
        excs = []
        for i in range(1, 4):
            if i == 3:
                raise Exception(f"collect_loop failed after {i} tries ({excs = })")
            task = pl.collect_all_async(lfs)
            try:
                out = await asyncio.wait_for(task, timeout=timeout_secs)
            except Exception as e:
                excs.append(e)
                continue
            break
        if i > 1:
            raise Exception(
                f"collect_loop finished, but required {i} tries ({excs = })"
            )
        return out

    async def heartbeat_loop():
        # Periodically fetch 1 row of 1 column; error out if it ever takes
        # longer than heartbeat_interval_secs.
        while not is_complete.is_set():
            start = perf_counter()
            task = heartbeat_lf.select(pl.first()).head(1).collect_async()
            try:
                await asyncio.wait_for(task, timeout=heartbeat_interval_secs)
            except Exception as e:
                raise Exception(f"heartbeat_loop timed out {e = }")
            elapsed = perf_counter() - start
            sleep = max(0, heartbeat_interval_secs - elapsed)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # Wake up on whichever finishes first: completion or the sleep.
                await next(
                    asyncio.as_completed([is_complete.wait(), asyncio.sleep(sleep)])
                )

    async def f():
        task = asyncio.create_task(heartbeat_loop())
        out = await collect_loop()
        is_complete.set()
        await task
        return out

    return asyncio.run(f())
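For example (a hypothetical invocation; substitute your real queries):

# Made-up queries; heartbeat_lf just re-scans the same files.
lfs = [
    pl.scan_parquet("s3://my-bucket/redacted.parquet").select(pl.first()),
    pl.scan_parquet("s3://my-bucket/redacted.parquet").select(pl.len()),
]
heartbeat_lf = pl.scan_parquet("s3://my-bucket/redacted.parquet")

dfs = collect_all_with_timeout_and_retry(lfs, heartbeat_lf)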
Is it possible that this:
https://github.com/pola-rs/polars/blob/main/crates/polars-lazy/src/physical_plan/expressions/window.rs#L466
needs to be changed to:
// Try to get cached grouptuples
let (mut groups, _, cache_key) = if state.cache_window() {
    let mut cache_key = String::with_capacity(32 * group_by_columns.len());
    write!(&mut cache_key, "{}", state.branch_idx).unwrap();
    for s in &group_by_columns {
        cache_key.push_str(s.name());
    }

    let mut gt_map = state.group_tuples.write().unwrap();
    // we run sequential and partitioned
    // and every partition run the cache should be empty so we expect a max of 1.
    debug_assert!(gt_map.len() <= 1);
    if let Some(gt) = gt_map.get_mut(&cache_key) {
        if df.height() > 0 {
            assert!(!gt.is_empty());
        };

        // We take now, but it is important that we set this before we return!
        // a next windows function may get this cached key and get an empty if this
        // does not happen
        (std::mem::take(gt), true, cache_key)
    } else {
        drop(gt_map); // added a drop of the guard here
        (create_groups()?, false, cache_key)
    }
Specifically, dropping the guard in the else case, so the write lock isn't held while create_groups() runs. Just shooting in the dark.
It seems to prevent me from deadlocking (though I'm not 100% sure whether it's working or I'm just getting lucky). I am running a lot of window expressions, so it's possible it's a very tough deadlock to trigger that most people wouldn't encounter.
@nameexhaustion I couldn't get your collect_all_with_timeout_and_retry script working; I'll dig into it more if this proves not to be the solution.
Okay nvm that didn't work.
@nameexhaustion gave your change a try; it didn't seem to time out after deadlocking for 5m+. I wasn't sure about the Python code you gave me; it didn't work, and I tried to tweak it but wasn't sure what I was looking for.
The Python function was to:

- If your collect_all took more than 3 1/2 minutes, cancel it and try again. After the 2nd try, print whether or not it succeeded and then exit.
- During your collect_all, periodically download 1 row of 1 column every 10 seconds; if this didn't finish within 10 seconds we would also print an error and exit.

Did you manage to get any output from it at all? You may have to ensure you don't have any map_elements / anything that would lock the GIL.
> The Python function was to:
>
> - If your collect_all took more than 3 1/2 minutes, cancel it and try again. After the 2nd try, print whether or not it succeeded and then exit.
> - During your collect_all, periodically download 1 row of 1 column every 10 seconds; if this didn't finish within 10 seconds we would also print an error and exit.
>
> Did you manage to get any output from it at all? You may have to ensure you don't have any map_elements / anything that would lock the GIL.
When running with just a collect_all using your fork, it would just deadlock the same as before. I'll try to run your collect_all_with_timeout_and_retry sometime.