Giter VIP home page Giter VIP logo

madsim's Introduction

MadSim

Crate Docs CI

Magical Deterministic Simulator for distributed systems.

Deterministic Simulation Testing

MadSim is a Rust async runtime similar to tokio, but with a key feature called deterministic simulation.

The main idea is borrowed from FoundationDB and sled simulation guide. Your code should be able to deterministically executed on top of a simulator. The simulator will amplify randomness, create chaos and inject failures into your system. A lot of hidden bugs may be revealed, which you can then deterministically reproduce them until they are fixed. If your system can survive such chaos, you will have more confidence in deploying your system in the real world.

However, implementing deterministic simulation is difficult. All I/O-related interfaces must be mocked during the simulation, and all uncertainties should be eliminated. This project is created to make that easy.

A part of the implementation of this crate is inspired by tokio-rs/simulation.

See also the blog posts for a detailed writeup:

Usage

Add the following lines to your Cargo.toml:

[dependencies]
madsim = "0.2"

If your project depends on the following crates, replace them by our simulators:

[dependencies]
tokio = { version = "0.2", package = "madsim-tokio" }
tonic = { version = "0.4", package = "madsim-tonic" }
etcd-client = { version = "0.4", package = "madsim-etcd-client" }
rdkafka = { version = "0.3", package = "madsim-rdkafka" }
aws-sdk-s3 = { version = "0.5", package = "madsim-aws-sdk-s3" }

[dev-dependencies]
tonic-build = { version = "0.4", package = "madsim-tonic-build" }

If your dependency graph includes the following crates, replace them by our patched version:

[patch.crates-io]
quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" }
getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "8daf97e" }
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "4538cd6" }
tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "ab251ad" }

When built normally, these crates are identical to the original ones.

To run your code on the simulator, enable the config madsim:

RUSTFLAGS="--cfg madsim" cargo test

Now you have gotten rid of tokio/tonic and you are in the simulation world!

We provide a set of APIs to control the simulator. You can use them to kill a process, disconnect the network, inject failures, etc. Check out the documentation and search for the madsim feature to learn more usages.

Projects

  • MadRaft: The labs of Raft consensus algorithm derived from MIT 6.824 and PingCAP Talent Plan.
  • RisingWave: A distributed SQL database for stream processing that uses MadSim for deterministic testing.

License

Apache License 2.0

madsim's People

Contributors

bsbds avatar bugenzhao avatar dylan-dpc avatar huang-jl avatar hzxa21 avatar imag1ne avatar kveinaxel avatar mrcroxx avatar rex4539 avatar skyzh avatar tennyzhuang avatar wangrunji0408 avatar xxchan avatar yiyuanliu avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

madsim's Issues

Check the result of the function in `check_determinism`

This is just an idea / feature request, it's totally fine to close the issue if it does not spark any interest.

Currently check_determinism verifies that the randomness state after running the input function twice is the same.
If the input function is truly deterministic, it must return the same result.
Here is how I currently check that a function is deterministic:

#[test]
fn ulid_generation_should_be_deterministic() {
    let mut builder_a = madsim::runtime::Builder::from_env();
    builder_a.check = true;
    let mut builder_b = madsim::runtime::Builder::from_env(); // Builder: Clone would be useful
    builder_b.check = true;
    builder_b.seed = builder_a.seed;
    assert_eq!(
        builder_a.run(|| async { ulid::Ulid::new() }),
        builder_b.run(|| async { ulid::Ulid::new() })
    );
}

It would be nice to have less boilerplate, given that the function returns a value implementing Debug and PartialEq

simulate `rand::rngs`

Currently, this module does not exist in the simulation. It should be implemented.

Enhancing Determinism by Forking Processes

Currently, in madsim, running multiple simulations of the same program occurs within the same process. However, real-world programs inevitably use static variables, which can lead to different initial states for each simulation, resulting in non-deterministic outcomes. To resolve this issue, we can first fork a new process and then run the simulation.

Optimize RPC on UCX

Currently a pingpong RPC on UCX/IB costs 20-30us. We expect to optimize it to ~5us.
The main overhead is coroutine switch: Due to the limitation of UCX, all UCX operations must run on a single thread. So a send request have to be sent from user coroutine to the network coroutine, and a received message have to be sent from the network coroutine to user coroutine. A RPC handler can be directly called from the network coroutine, but now it has 2 extra switches.

Improve the simulator

  • Add random delay for task and process
  • Support auto fault-recovery
  • Support duplicated packets

Close unused TCP connection

Issue

When establishing a TCP connection, there are two background tasks: recever_task and sender_task, which constantly recv msg and send msg:

https://github.com/madsys-dev/madsim/blob/5a7a858547cc350408ec9f78261062295c3fcca8/madsim/src/std/net/tcp.rs#L70-L144

When remote close connection, the recever_task will terminate while sender_task will continue. Then the TCP connection will keep permanently, causing massive useless TCP connections binding on the port.

Temporary Solution

async fn setup_connection(
    self: &Arc<Self>,
    addr: SocketAddr,
    peer: Option<SocketAddr>,
    stream: TcpStream,
) -> (SocketAddr, mpsc::Sender<SendMsg>) {
  // ......
  let inner = Arc::downgrade(self);
  task::spawn(async move {
      // NOTE: when recver task finish, the tcp connection is closed by remote.
      // We need to abort sender task to close the connection cleanly.
      match recver_task.await {
          Ok(_) => {}
          Err(_) => warn!("tcp recver task has err"),
      }
      sender_task.abort();
      if let Some(inner) = inner.upgrade() {
          inner.sender.lock().await.remove(&peer);
      }
  });
  // self.tasks
  //     .lock()
  //     .unwrap()
  //     .extend([sender_task, recver_task]);
  (peer, sender)
}

panic if spawn when task drop

In RisingWave deterministic test, got the following panic, which is caused by tokio::spawn when some struct drop during task drop.

thread '<unnamed>' panicked at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/context.rs:27:44:
there is no reactor running, must be called from the context of a Madsim runtime
stack backtrace:
   0: rust_begin_unwind
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/panicking.rs:597:5
   1: core::panicking::panic_fmt
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/panicking.rs:72:14
   2: core::panicking::panic_display
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/panicking.rs:178:5
   3: core::panicking::panic_str
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/panicking.rs:152:5
   4: core::option::expect_failed
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/option.rs:1979:5
   5: core::option::Option<T>::expect
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/option.rs:888:21
   6: madsim::sim::runtime::context::current_task::{{closure}}
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/context.rs:27:44
   7: std::thread::local::LocalKey<T>::try_with
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/thread/local.rs:270:16
   8: std::thread::local::LocalKey<T>::with
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/thread/local.rs:246:9
   9: madsim::sim::runtime::context::current_task
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/context.rs:27:10
  10: madsim::sim::task::Spawner::current
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:577:20
  11: madsim::sim::task::spawn
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:655:5
  12: foyer_storage::storage::AsyncStorageExt::insert_if_not_exists_async
             at /home/mrcroxx/.cargo/git/checkouts/foyer-e7ae575671bf01ff/ce2e222/foyer-storage/src/storage.rs:216:9
  13: <risingwave_storage::hummock::sstable_store::BlockCacheEventListener as risingwave_common::cache::LruCacheEventListener>::on_release
             at ./src/storage/src/hummock/sstable_store.rs:110:9
  14: core::ptr::drop_in_place<risingwave_common::cache::CacheableEntry<(u64,u64),alloc::boxed::Box<risingwave_storage::hummock::sstable::block::Block>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  15: core::ptr::drop_in_place<risingwave_storage::hummock::block_cache::BlockEntry>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  16: core::ptr::drop_in_place<risingwave_storage::hummock::block_cache::BlockHolder>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  17: core::ptr::drop_in_place<risingwave_storage::hummock::sstable::block_iterator::BlockIterator>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  18: core::ptr::drop_in_place<core::option::Option<risingwave_storage::hummock::sstable::block_iterator::BlockIterator>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  19: core::ptr::drop_in_place<risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  20: core::ptr::drop_in_place<[risingwave_storage::hummock::iterator::merge_inner::Node<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator<risingwave_storage::hummock::iterator::Forward>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>,usize>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator,risingwave_storage::hummock::iterator::concat_inner::ConcatIteratorInner<risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>>,()>]>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  21: <alloc::vec::Vec<T,A> as core::ops::drop::Drop>::drop
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/alloc/src/vec/mod.rs:3067:13
  22: core::ptr::drop_in_place<alloc::vec::Vec<risingwave_storage::hummock::iterator::merge_inner::Node<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator<risingwave_storage::hummock::iterator::Forward>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>,usize>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator,risingwave_storage::hummock::iterator::concat_inner::ConcatIteratorInner<risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>>,()>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  23: core::ptr::drop_in_place<alloc::collections::binary_heap::BinaryHeap<risingwave_storage::hummock::iterator::merge_inner::Node<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator<risingwave_storage::hummock::iterator::Forward>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>,usize>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator,risingwave_storage::hummock::iterator::concat_inner::ConcatIteratorInner<risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>>,()>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  24: core::ptr::drop_in_place<risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator<risingwave_storage::hummock::iterator::Forward>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>,usize>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator,risingwave_storage::hummock::iterator::concat_inner::ConcatIteratorInner<risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>>,()>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  25: core::ptr::drop_in_place<risingwave_storage::hummock::iterator::forward_user::UserIterator<risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::iterator::merge_inner::MergeIteratorInner<risingwave_storage::hummock::iterator::HummockIteratorUnion<risingwave_storage::hummock::iterator::Forward,risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator<risingwave_storage::hummock::iterator::Forward>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>,usize>,risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator,risingwave_storage::hummock::iterator::concat_inner::ConcatIteratorInner<risingwave_storage::hummock::sstable::forward_sstable_iterator::SstableIterator>>,()>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  26: core::ptr::drop_in_place<risingwave_storage::hummock::store::local_hummock_storage::HummockStorageIterator>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  27: core::ptr::drop_in_place<risingwave_storage::store::into_stream_inner<risingwave_storage::hummock::store::local_hummock_storage::HummockStorageIterator>::{{closure}}>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:52:1
  28: core::ptr::drop_in_place<core::option::Option<risingwave_storage::store::into_stream_inner<risingwave_storage::hummock::store::local_hummock_storage::HummockStorageIterator>::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  29: core::ptr::drop_in_place<futures_async_stream::try_stream::GenTryStream<risingwave_storage::store::into_stream_inner<risingwave_storage::hummock::store::local_hummock_storage::HummockStorageIterator>::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  30: core::ptr::drop_in_place<risingwave_storage::store_impl::verify::verify_stream<futures_async_stream::try_stream::GenTryStream<risingwave_storage::store::into_stream_inner<risingwave_storage::hummock::store::local_hummock_storage::HummockStorageIterator>::{{closure}}>,futures_async_stream::try_stream::GenTryStream<risingwave_storage::store::into_stream_inner<risingwave_storage::memory::RangeKvStateStoreIter<risingwave_storage::memory::sled::SledRangeKv>>::{{closure}}>>::{{closure}}>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:296:5
  31: core::ptr::drop_in_place<core::option::Option<risingwave_storage::store_impl::verify::verify_stream<futures_async_stream::try_stream::GenTryStream<risingwave_storage::store::into_stream_inner<risingwave_storage::hummock::store::local_hummock_storage::HummockStorageIterator>::{{closure}}>,futures_async_stream::try_stream::GenTryStream<risingwave_storage::store::into_stream_inner<risingwave_storage::memory::RangeKvStateStoreIter<risingwave_storage::memory::sled::SledRangeKv>>::{{closure}}>>::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  32: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<(risingwave_hummock_sdk::key::FullKey<bytes::bytes::Bytes>,bytes::bytes::Bytes),risingwave_storage::error::StorageError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  33: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<(risingwave_hummock_sdk::key::FullKey<bytes::bytes::Bytes>,bytes::bytes::Bytes),risingwave_storage::error::StorageError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  34: core::ptr::drop_in_place<risingwave_storage::monitor::monitored_store::MonitoredStateStoreIter<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<(risingwave_hummock_sdk::key::FullKey<bytes::bytes::Bytes>,bytes::bytes::Bytes),risingwave_storage::error::StorageError>+core::marker::Send>>>::into_stream_inner::{{closure}}>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:386:5
  35: core::ptr::drop_in_place<core::option::Option<risingwave_storage::monitor::monitored_store::MonitoredStateStoreIter<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<(risingwave_hummock_sdk::key::FullKey<bytes::bytes::Bytes>,bytes::bytes::Bytes),risingwave_storage::error::StorageError>+core::marker::Send>>>::into_stream_inner::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  36: core::ptr::drop_in_place<futures_async_stream::try_stream::GenTryStream<risingwave_storage::monitor::monitored_store::MonitoredStateStoreIter<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<(risingwave_hummock_sdk::key::FullKey<bytes::bytes::Bytes>,bytes::bytes::Bytes),risingwave_storage::error::StorageError>+core::marker::Send>>>::into_stream_inner::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  37: core::ptr::drop_in_place<tracing_futures::Instrumented<futures_async_stream::try_stream::GenTryStream<risingwave_storage::monitor::monitored_store::MonitoredStateStoreIter<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<(risingwave_hummock_sdk::key::FullKey<bytes::bytes::Bytes>,bytes::bytes::Bytes),risingwave_storage::error::StorageError>+core::marker::Send>>>::into_stream_inner::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  38: core::ptr::drop_in_place<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:695:5
  39: core::ptr::drop_in_place<core::option::Option<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  40: core::ptr::drop_in_place<futures_async_stream::try_stream::GenTryStream<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  41: core::ptr::drop_in_place<[futures_util::stream::futures_ordered::OrderWrapper<core::result::Result<futures_async_stream::try_stream::GenTryStream<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>,risingwave_storage::error::StorageError>>]>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  42: <alloc::vec::Vec<T,A> as core::ops::drop::Drop>::drop
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/alloc/src/vec/mod.rs:3067:13
  43: core::ptr::drop_in_place<alloc::vec::Vec<futures_util::stream::futures_ordered::OrderWrapper<core::result::Result<futures_async_stream::try_stream::GenTryStream<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>,risingwave_storage::error::StorageError>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  44: core::ptr::drop_in_place<alloc::collections::binary_heap::BinaryHeap<futures_util::stream::futures_ordered::OrderWrapper<core::result::Result<futures_async_stream::try_stream::GenTryStream<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>,risingwave_storage::error::StorageError>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  45: core::ptr::drop_in_place<futures_util::stream::futures_ordered::FuturesOrdered<futures_util::future::try_future::into_future::IntoFuture<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::iter_with_encoded_key_range::{{closure}}::{{closure}}::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  46: core::ptr::drop_in_place<futures_util::stream::try_stream::try_collect::TryCollect<futures_util::stream::futures_ordered::FuturesOrdered<futures_util::future::try_future::into_future::IntoFuture<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::iter_with_encoded_key_range::{{closure}}::{{closure}}::{{closure}}>>,alloc::vec::Vec<futures_async_stream::try_stream::GenTryStream<risingwave_storage::table::batch_table::storage_table::StorageTableInnerIterInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::into_stream::{{closure}}>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  47: core::ptr::drop_in_place<futures_util::future::try_join_all::TryJoinAllKind<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::iter_with_encoded_key_range::{{closure}}::{{closure}}::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  48: core::ptr::drop_in_place<futures_util::future::try_join_all::TryJoinAll<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::iter_with_encoded_key_range::{{closure}}::{{closure}}::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  49: core::ptr::drop_in_place<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::iter_with_encoded_key_range::{{closure}}>
             at ./src/storage/src/table/batch_table/storage_table.rs:469:10
  50: core::ptr::drop_in_place<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::iter_with_pk_bounds<&risingwave_common::row::owned_row::OwnedRow,(core::ops::range::Bound<risingwave_common::row::owned_row::OwnedRow>,core::ops::range::Bound<risingwave_common::row::owned_row::OwnedRow>)>::{{closure}}>
             at ./src/storage/src/table/batch_table/storage_table.rs:604:10
  51: core::ptr::drop_in_place<risingwave_storage::table::batch_table::storage_table::StorageTableInner<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>,risingwave_common::util::value_encoding::EitherSerde>::batch_iter_with_pk_bounds<&risingwave_common::row::owned_row::OwnedRow,(core::ops::range::Bound<risingwave_common::row::owned_row::OwnedRow>,core::ops::range::Bound<risingwave_common::row::owned_row::OwnedRow>)>::{{closure}}>
             at ./src/storage/src/table/batch_table/storage_table.rs:618:14
  52: core::ptr::drop_in_place<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>
             at ./src/batch/src/executor/row_seq_scan.rs:421:18
  53: core::ptr::drop_in_place<core::option::Option<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  54: core::ptr::drop_in_place<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  55: core::ptr::drop_in_place<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  56: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  57: core::ptr::drop_in_place<core::option::Option<core::pin::Pin<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  58: core::ptr::drop_in_place<futures_util::stream::stream::into_future::StreamFuture<core::pin::Pin<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  59: core::ptr::drop_in_place<core::option::Option<futures_util::stream::stream::into_future::StreamFuture<core::pin::Pin<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  60: futures_util::stream::futures_unordered::FuturesUnordered<Fut>::release_task
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/futures_unordered/mod.rs:266:13
  61: futures_util::stream::futures_unordered::FuturesUnordered<Fut>::clear_head_all
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/futures_unordered/mod.rs:573:13
  62: <futures_util::stream::futures_unordered::FuturesUnordered<Fut> as core::ops::drop::Drop>::drop
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/futures_unordered/mod.rs:584:9
  63: core::ptr::drop_in_place<futures_util::stream::futures_unordered::FuturesUnordered<futures_util::stream::stream::into_future::StreamFuture<core::pin::Pin<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  64: core::ptr::drop_in_place<futures_util::stream::select_all::SelectAll<core::pin::Pin<alloc::boxed::Box<futures_async_stream::try_stream::GenTryStream<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::execute_range<risingwave_common::metrics::guarded_metrics::LabelGuardedMetric<prometheus::histogram::Histogram,4_usize>>::{{closure}}>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  65: core::ptr::drop_in_place<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::do_execute::{{closure}}>
             at ./src/batch/src/executor/row_seq_scan.rs:299:50
  66: core::ptr::drop_in_place<core::option::Option<risingwave_batch::executor::row_seq_scan::RowSeqScanExecutor<risingwave_storage::monitor::monitored_store::MonitoredStateStore<alloc::boxed::Box<dyn risingwave_storage::store_impl::boxed_state_store::DynamicDispatchedStateStore>>>::do_execute::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  67: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  68: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  69: core::ptr::drop_in_place<<risingwave_batch::executor::managed::ManagedExecutor as risingwave_batch::executor::Executor>::execute::{{closure}}>
             at ./src/batch/src/executor/managed.rs:82:5
  70: core::ptr::drop_in_place<core::option::Option<<risingwave_batch::executor::managed::ManagedExecutor as risingwave_batch::executor::Executor>::execute::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  71: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  72: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  73: core::ptr::drop_in_place<risingwave_batch::executor::filter::FilterExecutor::do_execute::{{closure}}>
             at ./src/batch/src/executor/filter.rs:53:57
  74: core::ptr::drop_in_place<core::option::Option<risingwave_batch::executor::filter::FilterExecutor::do_execute::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  75: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  76: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  77: core::ptr::drop_in_place<<risingwave_batch::executor::managed::ManagedExecutor as risingwave_batch::executor::Executor>::execute::{{closure}}>
             at ./src/batch/src/executor/managed.rs:82:5
  78: core::ptr::drop_in_place<core::option::Option<<risingwave_batch::executor::managed::ManagedExecutor as risingwave_batch::executor::Executor>::execute::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  79: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  80: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  81: core::ptr::drop_in_place<futures_util::stream::stream::map::Map<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>,risingwave_batch::executor::project::ProjectExecutor::do_execute::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  82: core::ptr::drop_in_place<futures_util::stream::stream::fuse::Fuse<futures_util::stream::stream::map::Map<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>,risingwave_batch::executor::project::ProjectExecutor::do_execute::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  83: core::ptr::drop_in_place<futures_util::stream::stream::buffered::Buffered<futures_util::stream::stream::map::Map<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>,risingwave_batch::executor::project::ProjectExecutor::do_execute::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  84: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  85: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  86: core::ptr::drop_in_place<<risingwave_batch::executor::managed::ManagedExecutor as risingwave_batch::executor::Executor>::execute::{{closure}}>
             at ./src/batch/src/executor/managed.rs:82:5
  87: core::ptr::drop_in_place<core::option::Option<<risingwave_batch::executor::managed::ManagedExecutor as risingwave_batch::executor::Executor>::execute::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  88: core::ptr::drop_in_place<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  89: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<dyn futures_core::stream::Stream+Item = core::result::Result<risingwave_common::array::data_chunk::DataChunk,risingwave_common::error::RwError>+core::marker::Send>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  90: core::ptr::drop_in_place<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::run::{{closure}}>
             at ./src/batch/src/task/task_execution.rs:681:5
  91: core::ptr::drop_in_place<tracing::instrument::Instrumented<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::run::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  92: core::ptr::drop_in_place<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}::{{closure}}::{{closure}}>
             at ./src/batch/src/task/task_execution.rs:474:22
  93: core::ptr::drop_in_place<tokio_metrics::task::Instrumented<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}::{{closure}}::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  94: core::ptr::drop_in_place<core::panic::unwind_safe::AssertUnwindSafe<tokio_metrics::task::Instrumented<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}::{{closure}}::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  95: core::ptr::drop_in_place<futures_util::future::future::catch_unwind::CatchUnwind<core::panic::unwind_safe::AssertUnwindSafe<tokio_metrics::task::Instrumented<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}::{{closure}}::{{closure}}>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  96: core::ptr::drop_in_place<core::option::Option<futures_util::future::future::catch_unwind::CatchUnwind<core::panic::unwind_safe::AssertUnwindSafe<tokio_metrics::task::Instrumented<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}::{{closure}}::{{closure}}>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
  97: core::pin::Pin<P>::set
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/pin.rs:754:9
  98: tokio::task::task_local::_::<impl core::ops::drop::Drop for tokio::task::task_local::TaskLocalFuture<T,F>>::drop::__drop_inner::{{closure}}
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:328:21
  99: tokio::task::task_local::LocalKey<T>::scope_inner
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
 100: tokio::task::task_local::_::<impl core::ops::drop::Drop for tokio::task::task_local::TaskLocalFuture<T,F>>::drop::__drop_inner
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:327:25
 101: tokio::task::task_local::_::<impl core::ops::drop::Drop for tokio::task::task_local::TaskLocalFuture<T,F>>::drop
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/pin-project-lite-0.2.13/src/lib.rs:1300:17
 102: core::ptr::drop_in_place<tokio::task::task_local::TaskLocalFuture<(),futures_util::future::future::catch_unwind::CatchUnwind<core::panic::unwind_safe::AssertUnwindSafe<tokio_metrics::task::Instrumented<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}::{{closure}}::{{closure}}>>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
 103: core::ptr::drop_in_place<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}>
             at ./src/batch/src/task/task_execution.rs:481:73
 104: core::ptr::drop_in_place<madsim::sim::task::Spawner::spawn_inner<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}>::{{closure}}::{{closure}}>
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:629:28
 105: core::mem::manually_drop::ManuallyDrop<T>::drop
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/mem/manually_drop.rs:144:18
 106: <async_task::runnable::Builder<M>::spawn_local::Checked<F> as core::ops::drop::Drop>::drop
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.4.0/src/runnable.rs:442:21
 107: core::ptr::drop_in_place<async_task::runnable::Builder<M>::spawn_local::Checked<madsim::sim::task::Spawner::spawn_inner<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}>::{{closure}}::{{closure}}>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
 108: core::ptr::drop_in_place<alloc::boxed::Box<async_task::runnable::Builder<M>::spawn_local::Checked<madsim::sim::task::Spawner::spawn_inner<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}>::{{closure}}::{{closure}}>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
 109: core::ptr::drop_in_place<core::pin::Pin<alloc::boxed::Box<async_task::runnable::Builder<M>::spawn_local::Checked<madsim::sim::task::Spawner::spawn_inner<risingwave_batch::task::task_execution::BatchTaskExecution<risingwave_batch::task::context::ComputeNodeContext>::async_execute::{{closure}}::{{closure}}>::{{closure}}::{{closure}}>>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
 110: core::ptr::mut_ptr::<impl *mut T>::drop_in_place
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mut_ptr.rs:1461:18
 111: async_task::raw::RawTask<F,T,S,M>::drop_future::{{closure}}
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.4.0/src/raw.rs:454:24
 112: async_task::utils::abort_on_panic
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.4.0/src/utils.rs:34:13
 113: async_task::raw::RawTask<F,T,S,M>::drop_future
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.4.0/src/raw.rs:453:9
 114: <async_task::runnable::Runnable<M> as core::ops::drop::Drop>::drop
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.4.0/src/runnable.rs:850:13
 115: core::ptr::drop_in_place<async_task::runnable::Runnable<alloc::sync::Weak<madsim::sim::task::TaskInfo>>>
             at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/ptr/mod.rs:498:1
 116: madsim::sim::task::Executor::run_all_ready
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:306:9
 117: madsim::sim::task::Executor::block_on
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/task/mod.rs:238:13
 118: madsim::sim::runtime::Runtime::block_on
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/mod.rs:126:9
 119: madsim::sim::runtime::builder::Builder::run::{{closure}}::{{closure}}::{{closure}}
             at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.22/src/sim/runtime/builder.rs:128:35
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
thread '<unnamed>' panicked at /home/mrcroxx/.cargo/registry/src/index.crates.io-6f17d22bba15001f/async-task-4.4.0/src/utils.rs:17:5:
aborting the process
stack backtrace:
   0:     0x55cc0209ce4c - std::backtrace_rs::backtrace::libunwind::trace::he8dc9e634c3d5b69
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
   1:     0x55cc0209ce4c - std::backtrace_rs::backtrace::trace_unsynchronized::hcc1fc094cc2c1843
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
   2:     0x55cc0209ce4c - std::sys_common::backtrace::_print_fmt::he6c8e9059e21630d
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/sys_common/backtrace.rs:67:5
   3:     0x55cc0209ce4c - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h145361bce15f4a6a
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/sys_common/backtrace.rs:44:22
   4:     0x55cc020d2ee0 - core::fmt::rt::Argument::fmt::hb890552c7d6e8c2b
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/fmt/rt.rs:142:9
   5:     0x55cc020d2ee0 - core::fmt::write::h97d71643815ece8a
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/core/src/fmt/mod.rs:1117:17
   6:     0x55cc020979ef - std::io::Write::write_fmt::h9145d12d2fb312aa
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/io/mod.rs:1762:15
   7:     0x55cc0209cc34 - std::sys_common::backtrace::_print::h34d8c7ca46068513
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/sys_common/backtrace.rs:47:5
   8:     0x55cc0209cc34 - std::sys_common::backtrace::print::h69de4357a88b4f9d
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/sys_common/backtrace.rs:34:9
   9:     0x55cc0209e3e7 - std::panicking::default_hook::{{closure}}::hbf7d8b588847f0d6
  10:     0x55cc0209e14f - std::panicking::default_hook::h797e21049b5f2e1d
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/panicking.rs:292:9
  11:     0x55cc0209e8d8 - std::panicking::rust_panic_with_hook::h4b1939882c095f9c
                               at /rustc/249624b5043013d18c00f0401ca431c1a6baa8cd/library/std/src/panicking.rs:731:13
malloc_consolidate(): unaligned fastbin chunk detected

How to add madsim support for opendal?

Hello, I am the maintainer of OpenDAL. This project is interesting and I would like to extend it with OpenDAL madsim support. Is there any way to do this without copying all opendal code to this repo?

Improving behaviour to failed socket address parsing

At present, a failure in parsing a socket address from a str results in a panic. Recognizing that constructing an I/O error with payload requires heap allocation and is thus costly, an error is still returned if a DNS lookup fails. It could be argued that returning an error in this scenario is a more preferable behavior than just panic, isn't it?

Replace std::sync::<Lock> with parking_lot lib

The main API difference between the two libs is poison detection. Poison detection is useful when panic happens in some thread while others are still runing, but it's not the case in this lib. In a rpc lib, we usually panic when the whole system breaks and can't be recovered, otherwise we should not panic. In another word there's no need to detection mutex poison in madsim.

Apart from poison detection parking_lot locks have better performance in most cases. So switch to parking_lot sync lock is the obvious choice from my view.

How to deal with thread spawned outside of madsim simulation

Hi, thanks for maintaining this fantastic project! I have a question regarding the scenario of invoking external C libraries that may create system threads.

For instance, I've been using rocksdb on madsim runtime, the main thread of rocksdb operates inside the madsim context while the threads it spawned do not, thereby leading to an inconsistency.

Any suggestions on this? Currently I have to allow spawning a system thread and call rocksdb functions inside it.

simulate fs

Now we only have a very simple file system simulator for the raft use. We need to improve that.

simulate local time and clock going backwards

Now we only have a global time that goes monotonically. However, in the real world, each node has its own local system time. And the system time can be set by the user, causing the potential clock backward. This is a very tricky scenario where applications are likely to be crashed. We should simulate this behavior.

tonic: simulate connection broken

The real-world gRPC sets up a TCP connection between client and server. So when a node is down, the other side will return an error of connection broken down. However, in our simulation, the underlying network is a connectionless model, so nothing will happen when the peer is down.

bug: the first RPC will timeout after kill-restart

https://github.com/huang-jl/madsim-bug/blob/main/src/lib.rs

[ WARN][0.114723s][0.0.0.0:0][madsim_test::tests] Server crash!
[ WARN][5.114723s][0.0.0.0:0][madsim_test::tests] Server restart!
[ INFO][10.114723s][10.0.0.2:1234][madsim_test] Send Incr request from 10.0.0.2:1234 to 10.0.0.1:8000
[ WARN][11.114723s][10.0.0.2:1234][madsim_test] Send incr fail: RPC timeout  <---
[ INFO][11.114723s][10.0.0.2:1234][madsim_test] Send Incr request from 10.0.0.2:1234 to 10.0.0.1:8000
[ INFO][11.124569s][10.0.0.1:8000][madsim_test] Receive increment request, local addr = 10.0.0.1:8000
[ INFO][11.128317s][10.0.0.2:1234][madsim_test] Send incr success

wrong `tokio::runtime::Runtime::enter` implementation

Related #175

Reproduce

[package]
name = "test-tokio-madsim"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "macros", "rt-multi-thread"] }
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    tokio::spawn(async {
        println!("Hello, world!");
    });
}
thread 'main' panicked at /Users/tianyizhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.23/src/sim/runtime/context.rs:27:44:
there is no reactor running, must be called from the context of a Madsim runtime
stack backtrace:
   0: rust_begin_unwind
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/std/src/panicking.rs:597:5
   1: core::panicking::panic_fmt
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/core/src/panicking.rs:72:14
   2: core::panicking::panic_display
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/core/src/panicking.rs:178:5
   3: core::panicking::panic_str
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/core/src/panicking.rs:152:5
   4: core::option::expect_failed
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/core/src/option.rs:1979:5
   5: core::option::Option<T>::expect
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/core/src/option.rs:888:21
   6: madsim::sim::runtime::context::current_task::{{closure}}
             at /Users/tianyizhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.23/src/sim/runtime/context.rs:27:22
   7: std::thread::local::LocalKey<T>::try_with
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/std/src/thread/local.rs:270:16
   8: std::thread::local::LocalKey<T>::with
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/std/src/thread/local.rs:246:9
   9: madsim::sim::runtime::context::current_task
             at /Users/tianyizhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.23/src/sim/runtime/context.rs:27:5
  10: madsim::sim::task::Spawner::current
             at /Users/tianyizhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.23/src/sim/task/mod.rs:577:20
  11: madsim::sim::task::spawn
             at /Users/tianyizhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/madsim-0.2.23/src/sim/task/mod.rs:655:5
  12: test_tokio_madsim::main
             at ./src/main.rs:4:5
  13: core::ops::function::FnOnce::call_once
             at /rustc/4578435e1695863d921c7763d5a0add98f8e3869/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

[NEED HELP] How to call a service from another service?

I am trying to simulate several services communicating via madsim RPC and I don't know how to call a service from another service. The Endpoint::call method is applied on an instance of Endpoint but I haven't found a way to get the Endpoint of the current service. See below a small example of what I'm trying to do.

#[derive(Serialize, Deserialize, Request)]
#[rtype("String")]
pub struct HttpGetRequest(pub String);

#[derive(Clone)]
pub struct FrontendService {
    redisAddr: SocketAddr,
}

#[madsim::service]
impl FrontendService {
    pub fn new(redisAddr: SocketAddr) -> Self {
        FrontendService {
            redisAddr,
        }
    }

    #[rpc]
    async fn serve_http_request(&self, req: HttpGetRequest) -> String {
        let reply = <CURRENT_ENDPOINT?>.call(self.redisAddr, redis::GetRedisRequest(req.0.clone())).await.unwrap();
        reply
    }
}

I tried multiple approaches:

  • The method server_on consumes the endpoint and Endpoint is not clonable so I can't pass the endpoint after the initial binding.
  • The creation of Endpoint from the method Endpoint::connect doesn't work in this context.
  • I can't use Arc as the serve_on method consumes an instance of Endpoint.

Is it a supported scenario?
If yes how can I do that?

If I can solve this problem, I will submit a PR containing an example of a microservices topology communicating via RPC and running on several simulated nodes. I think this kind of complete example doesn't exist yet in the repo and it would probably save time for future users. Feel free to point me to such an example if it exists.

fix local bind address

Currently, if a socket binds 0.0.0.0, the network simulator will replace it with the node IP (e.g. 192.168.1.1), or panic if the node IP is uninitialized.
The correct behavior is to keep the local address as 0.0.0.0 and store the remote address 192.168.1.1 for other nodes.

Process termination.

Processes may terminate in a number of ways, eg SIGTERM, SIGKILL, machine down. Currently, Madsim does not simulate processes termination very well.

Madsim provides kill function to terminate a host, but the semantic of the kill function is unclear.

See this code:

struct Foo(String);

impl Foo {
    fn new(name: &str) -> Foo {
        println!("foo::new in {}", name);
        Foo(name.to_owned())
    }
}

impl Drop for Foo {
    fn drop(&mut self) {
        println!("foo::drop in {}!", self.0);
    }
}

#[madsim::main]
async fn main() {
    let handle = madsim::Handle::current().create_host("127.0.0.1:10086").unwrap();
    handle.spawn(async move {
        let _foo = Foo::new("madsim");
        loop {
            madsim::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }).detach();

    madsim::time::sleep(std::time::Duration::from_secs(1)).await;
    madsim::Handle::current().kill("127.0.0.1:1".parse().unwrap());
    loop {
        madsim::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

Run this code with features sim enabled, we will find that foo::drop is called. This means that the kill function behaves more like SIGTERM than SIGKILL, and this function cannot simulate some unexpected events (machine failure, out of memory, etc.). We have no way to know that when all async tasks on that host will be dropped, so it's still hard to determine a suitable time to restart host.

If we replace sleep in spawned task with futures::future::pending::<()>().await. We will see that the drop function is not called, which looks like SIGKILL (but not same as SIGKILL).

It seems difficult to simulate SIGKILL or machine failure. If the drop function is not called, there may be resource leaks (memory, file locks, etc.)

Another problem is that drop function won't be called automatically after process termination. See this code:

struct Foo(String);

impl Foo {
    fn new(name: &str) -> Foo {
        println!("foo::new in {}", name);
        Foo(name.to_owned())
    }
}

impl Drop for Foo {
    fn drop(&mut self) {
        println!("foo::drop in {}!", self.0);
    }
}

#[madsim::main]
async fn main() {
    let handle = madsim::Handle::current().create_host("127.0.0.1:10086").unwrap();
    handle.spawn(async move {
        let _foo = Foo::new("madsim");
        loop {
            madsim::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    }).detach();

    madsim::time::sleep(std::time::Duration::from_secs(1)).await;
}

Foo::drop won't be called run this code with feature sim or std. This makes it difficult for applications that use madsim to exit gracefully. It seems would be better to provide an async fn as an entry function when the host is created, like spawn_blocking in tokio. When this entry function exits, the host is considered finished. And provide a joinhandle to wait for the host to finish.

support tokio's tracing feature

error: failed to select a version for `madsim-tokio`.
    ... required by package `risingwave_ctl v0.1.9 (/Users/skyzh/Work/risingwave/src/ctl)`
    ... which satisfies path dependency `risingwave_ctl` (locked to 0.1.9) of package `risingwave_cmd v0.1.9 (/Users/skyzh/Work/risingwave/src/cmd)`
versions that meet the requirements `=0.2.0-alpha.3` (locked to 0.2.0-alpha.3) are: 0.2.0-alpha.3

the package `risingwave_ctl` depends on `madsim-tokio`, with features: `tracing` but `madsim-tokio` does not have these features.

we may see if we can pass this feature flag to tokio.

`tokio::time::sleep(Duration::ZERO)` is not simulated properly

Having two tasks, t1 sleeps in a loop with a non-zero duration, t2 with Duration::ZERO, I observe that t2 always wins and t1 does not make any progress. Switching t2 to yield_now instead of sleep has no effect.
IRL it looks like sleep(Duration::ZERO) always yields for at least 1 ms.

Docs: Add caveat about madsim reproducibility

Initially observed here: risingwavelabs/risingwave#7901

Across various build environments the behaviour of RNG in a system may change, if they use from_entropy for instance.
Or even just compilation might produce different ordering of rng writes and reads.

It should behave the same only with the same build.
For example: we can run the original binary, it should yield the same result with the same SEED.
Alternatively: Ensure build environment is the same (compiler, os, commit).

Should add some caveat about this in docs + output on how to reproduce when madsim cluster crashes.

Don't expose `NetLocalHandle` or make it !Send

NetLocalHandle is the entry point of all network operations. However, it should not be sent to other threads. One thread using a handle from other thread would cause it to block forever. The current API design is likely to be misused. For example, users tend to store the handle in a global state. To prevent that, we should either hide this type and make its methods static, or make this type !Send.

tonic: reduce dependencies on `cfg(madsim)`

Currently when building in simulation mode (with --cfg madsim), madsim-tonic still depends on tonic and re-exports some of its types. However, tonic has a huge dependence tree. Most of these crates are useless in our simulation. Let's figure out how to get rid of them in order to save compile time.

Add `#[service]` macro

Currently building a server on top of the RPC interface needs a lot boilerplate code:

pub struct Server {...}

impl Server {
    pub fn new() -> Arc<Self> {
        let server = Arc::new(Server {...});
        server.add_rpc_handler();
        server
    }

    // boilerplate!
    fn add_rpc_handler(self: Arc<Self>) {
        let net = NetLocalHandle::current();

        let this = self.clone();
        net.add_rpc_handler(move |req: Ping| {
            let this = this.clone();
            async move { this.ping(req) }
        });
    }

    fn ping(&self, req: Ping) {
        // handle RPC...
    }
}

We hope to provide a procedural macro to make that easy:

#[madsim::service]
impl Server {
    #[ctor]
    pub fn new() -> Arc<Self> {
        Arc::new(Server {...})
    }

    #[rpc]
    fn ping(&self, req: Ping) {
        // handle RPC...
    }
}

print/validate MADSIM_CONFIG_HASH

I see this line in the output, but it seems this hash is nowhere printed and valiadated ๐Ÿ˜„

and make sure `MADSIM_CONFIG_HASH=1AD2E4ABC41F3553`

Peer address of TCP connection

Issue

For now, when an endpoint accepts a connection, it will wait for the peer to send the real peer address (i.e. binding address). Later the endpoint will save this real peer address instead of the socket's remote address:

https://github.com/madsys-dev/madsim/blob/5a7a858547cc350408ec9f78261062295c3fcca8/madsim/src/std/net/tcp.rs#L81-L99

However, for example, when two endpoints R1 and R2 want to connect to S, R1 and R2 may bind to the same address like 0.0.0.0:xxxx. In this situation, S cannot distinguishes R1 and R2's connection.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.