async-task's People

Contributors

dependabot[bot], ivmarkov, johntitor, js2xxx, khoover, laizy, lucretiel, michaelwoerister, mystenmark, mystor, notgull, osiewicz, skade, skifire13, taiki-e, victorkoenders, yoshuawuyts

async-task's Issues

Efficiently support bounded queues for scheduling?

I'm interested in using a bounded queue for scheduling, but all the existing examples use an unbounded queue and I'm not sure how to make switching to a bounded queue efficient and runtime safe (i.e. no panics). I'd like my spawn function to instead return a kind of busy/full error. I've created an example repo that shows some of the challenges I faced trying to get this working: https://github.com/jasta/async-task-bounded-example, which does work but with a couple of major caveats:

  1. It seems we need to inefficiently track the spawned Tasks and loop through all of them each time runnable.run() is called, to make sure the finished ones are cleared out. This tracking tells us whether we could ever have more than n unfinished spawned tasks, which might all spuriously wake up at the same time and fill our queue beyond its capacity of n.
  2. It's not clear if schedule() can ever occur twice before run(), which would definitely break (1) above if it did. I know it's not supposed to happen nominally, but can it under edge cases? run() seems idempotent, so if it were happening today, even by mistake, I'm not sure you'd see any adverse runtime behaviour.

I'd propose that to make this more reliable and efficient we add an API alongside run() that returns whether the task is finished (maybe an enum return type that tells us strictly more than the current run() does). Perhaps also add a stateful check and associated tests for schedule to make sure it can't run twice, if that is indeed the contract; or if not, find another way to guarantee that we can efficiently protect against the schedule queue becoming full.

For some context, the goal is to make it possible to call wake() (and thus schedule tasks) from an ISR in an embedded system, where it is important that we not block the routine or call into libc functions, including malloc (see Introduction to RTOS - Hardware Interrupts for more reading).

EDIT: The repository has been updated with an efficient and correct solution.
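
For illustration, here is a minimal sketch of the failure-reporting spawn described above, using std's sync_channel as a stand-in for a real ISR-safe bounded queue (try_spawn and its error type are hypothetical, not part of the crate's API):

use std::future::Future;
use std::sync::mpsc::{SyncSender, TrySendError};
use async_task::{Runnable, Task};

// Hypothetical helper: spawn onto a bounded queue, surfacing "queue full"
// as an error instead of panicking. The caveat from the issue still applies:
// wakes after the first schedule also go through try_send, so a correct
// executor must guarantee a free slot for every live task.
fn try_spawn<F>(future: F, queue: SyncSender<Runnable>) -> Result<Task<F::Output>, ()>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let wake_queue = queue.clone();
    let schedule = move |runnable: Runnable| {
        // Dropping a runnable on a full queue would lose the task; a real
        // implementation must make this send infallible (hence the capacity
        // tracking discussed above).
        let _ = wake_queue.try_send(runnable);
    };
    let (runnable, task) = async_task::spawn(future, schedule);
    // The initial schedule is where we can report "busy/full" to the caller.
    match queue.try_send(runnable) {
        Ok(()) => Ok(task),
        // The dropped Runnable (and Task) cancel the task; the caller
        // receives a busy/full error instead of a panic.
        Err(TrySendError::Full(_) | TrySendError::Disconnected(_)) => Err(()),
    }
}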

fails to test examples without feature std

The command cargo test --all-targets --no-default-features fails like this:

514s error[E0425]: cannot find function `spawn_local` in crate `async_task`
514s   --> examples/spawn-local.rs:22:40
514s    |
514s 22 |     let (runnable, task) = async_task::spawn_local(future, schedule);
514s    |                                        ^^^^^^^^^^^ not found in `async_task`
514s 
514s For more information about this error, try `rustc --explain E0425`.
514s error: could not compile `async-task` due to previous error

Option to avoid panicking when polling a cancelled task

In panic=abort environments, polling a Task that was cancelled due to the Runnable being dropped will currently cause a panic and abort the entire process.

Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),

It would be nice to have an alternative option for polling cancelled tasks which doesn't panic, either producing a Result or Option to indicate that the task was cancelled, or continuing to produce Poll::Pending and cancelling the blocked task silently (e.g. by dropping the waker).

Exposing the Task<T>::poll_task method publicly should be sufficient to allow downstream crates to customize the behavior they want with wrappers, though async_task could also expose a wrapper type itself which delegates the Future::poll implementation directly to poll_task without the unwrap call.
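
For concreteness, the proposed wrapper could look like this sketch (poll_task is private today, so this is a hypothetical API shape, not code that compiles against the current crate):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_task::Task;

// Hypothetical wrapper: resolves to None instead of panicking when the
// task was cancelled. Assumes poll_task were made public.
pub struct NonPanickingTask<T>(Task<T>);

impl<T> Future for NonPanickingTask<T> {
    type Output = Option<T>; // None means the task was cancelled

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate straight to poll_task, skipping the
        // `.expect("task has failed")` in Task's Future impl.
        self.get_mut().0.poll_task(cx)
    }
}

(For what it's worth, newer async-task releases expose a wrapper along these lines as FallibleTask, via Task::fallible.)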

Unclear behaviour of casting ptr of RawTask to ptr of Header

Hi, I'm going through this repo in order to understand async runtimes better. I'm not able to understand a part of the codebase:

Task's field ptr is a pointer to RawTask. But in Task's methods, I see it being cast to a pointer to Header.

impl<T, M> Task<T, M> {
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let ptr = self.ptr.as_ptr();
        let header = ptr as *const Header<M>;
        ...
    }
    ...
}

From what I understand, this can happen since the pointer to the Header is the first field in RawTask:

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, T, S, M> {
    /// The task header.
    pub(crate) header: *const Header<M>,
    ...
}

According to this post, shouldn't this be only possible with #[repr(C)] on RawTask?
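
As a standalone illustration of the rule that post describes (not async-task's actual layout), a struct-to-first-field cast is only guaranteed once the layout is pinned down:

// With the default (Rust) representation, field order in memory is
// unspecified, so casting a struct pointer to a pointer to its "first"
// field is only defined behaviour with a fixed layout such as #[repr(C)].
#[repr(C)]
struct Outer {
    first: u32, // guaranteed to live at offset 0 by repr(C)
    second: u64,
}

fn read_first(outer: *const Outer) -> u32 {
    // Sound only because of #[repr(C)]: a pointer to Outer is also a
    // valid pointer to its first field.
    unsafe { *(outer as *const u32) }
}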

When does schedule get called?

I thought it is called when the waker is woken.

If schedule is !Send + !Sync, what will happen if the waker is woken on another thread?

Support for statically allocating tasks (no_alloc usage)

Motivation

It is currently possible to use async_task with no_std, but it requires having alloc available because tasks are dynamically allocated.

In memory-constrained environments, such as embedded microcontroller devices, it's often useful to statically declare everything and not have any dynamic allocator. The main advantage is that you have compile-time guarantees that your program will never run out of RAM (The linker knows how much RAM the target device has, and will error if all the static variables won't fit).

Generally in an embedded device the following kinds of tasks are present:

  • tasks that start up at boot and run forever
  • tasks that are started and stopped, but never run multiple instances concurrently
  • tasks that run a bounded number of concurrent instances (usually small, 4-16 instances)

It is very rare that you want an arbitrary number of tasks of arbitrary types mixed together. You rarely have enough RAM for it, and it tends to cause fragmentation problems and unpredictable out of RAM errors.

It would be a huge boon for embedded if async_task allowed statically pre-allocating tasks.

Additionally, this would be especially useful combined with #![feature(type_alias_impl_trait)] (rust-lang/rust#63063), which makes it possible to name the future types of async fns. (Naming the type is needed so the user can declare the static variable containing the task.)

API proposal

The API could be something like this:

// maybe R is not needed
pub struct StaticTask<F, R, S, T> { /* storage for a raw task */ } 

impl<F, R, S, T> StaticTask<F, R, S, T>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
    S: Fn(Task<T>) + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    // Create a new StaticTask in the "free" state.
    // The bit pattern of the return value must be only zero bits and uninitialized bits, so
    // StaticTasks can be placed in the .bss section (otherwise they'd go into .data, which wastes flash space).
    pub const fn new() -> Self { ... }

    // If self is in the "free" state, change it to the "used" state, initialize it with the
    // given future, and return the task and join handle.
    // If self is in the "used" state, return None.
    pub fn spawn(&'static self, future: F, schedule: S, tag: T) -> Option<(Task<T>, JoinHandle<R, T>)> { .. }
}

This would be used like this:

static MY_TASK: StaticTask<MyFuture, MyFuture::Output, ??, ()> = StaticTask::new();

fn main() {
    if let Some((t, j)) = MY_TASK.spawn(my_do_something(), |t| { /* schedule t */ }, ()) {
        t.schedule();
    } else {
        // spawn failed because the static task is already running; return some error
    }
}

When the task is no longer running (i.e. when it would be freed if it were dynamically allocated), the StaticTask is returned to the "free" state, so it can be used by .spawn() again.

This would make it possible to mix statically-allocated and dynamically-allocated tasks in the same executor.

Having so many generic arguments in StaticTask is somewhat ugly because the user has to manually specify them, but this is something executor libraries could abstract (i.e. export a newtype so you only have to set F). A higher-level executor API could look like this:

static MY_TASK: my_executor::Task<MyFuture> = my_executor::Task::new();

fn main() {
  // dynamically allocate
  my_executor::spawn(my_do_something());

  // statically allocate
  MY_TASK.spawn(my_do_something());

  my_executor::run()
}

Alternatives

Add an API where the user can specify a custom allocator for spawning, via some trait. Still, the library would have to export a type so that user code can know the size required for a RawTask of a given future, so buffers of the right size can be statically allocated.

[Feature Request] a non-async way to poll a JoinHandle for a result

Currently I'm working on a project that mixes async and sync code, and my general pattern for dealing with this has been to create oneshot channels and spawn tasks which send to the oneshot channel, and on the sync side, poll for results using try_recv.

Would it be possible or sensible to have such a non-async poll method directly on the JoinHandle returned by the executor? Can I get the same result by using Future::poll with a no-op Waker?

Right now there are only a few sanctioned "sync <-> async bridge" methods like oneshot's try_recv, and I'm not sure how much the rust async ecosystem in general is aware of this use case. None of the popular executors I see return join handles that can be non-async polled. They may all work with no-op wakers, but my worry would be that I would somehow cause a panic by polling without honoring waking, but maybe that's guaranteed not to happen? It would also be nicer if there was a sanctioned, optimized way to do this generally.

This pattern shows up a lot when you have a hard or soft real-time loop that is waiting on the result of async code, so games and similar applications.
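
As a sketch of the no-op waker idea (plain std, using Waker::noop from Rust 1.85+; nothing here is async-task-specific):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

// Poll a future once with a waker that does nothing. If this returns
// None, nobody will ever wake us, so the caller must re-poll on its own
// schedule; that is exactly the real-time-loop pattern described above.
fn try_take<F: Future + Unpin>(fut: &mut F) -> Option<F::Output> {
    let mut cx = Context::from_waker(Waker::noop());
    match Pin::new(fut).poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}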

spawn_local for no_std ?

Hi,

I'm looking to build a small executor with async-task that runs in the idle task of the RTIC (formerly RTFM) framework.

I would like to run non-Send futures in this guaranteed single-thread environment. This would be a very similar function to the current spawn_local, but with no thread id checks and only references to core and alloc crates.

Do you think this can work?
If so, I will submit a PR soon.

Why manual call of schedule is needed on runnable?

let schedule = |runnable| QUEUE.send(runnable).unwrap();
let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);

// Schedule the task and await its output.
runnable.schedule(); // <- this line
assert_eq!(smol::future::block_on(task), 3);

Frankly, it's not very ergonomic. Why not have the future start being scheduled once it's awaited, leaning on Rust's lazy future behaviour?
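
The split exists so the executor decides where and when the first run happens, but an auto-scheduling convenience is easy to layer on top (a sketch, not the crate's API):

use std::future::Future;
use async_task::{Runnable, Task};

// Hypothetical convenience: spawn and immediately schedule, so awaiting
// the returned Task is all the caller has to do.
fn spawn_scheduled<F, S>(future: F, schedule: S) -> Task<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable) + Send + Sync + 'static,
{
    let (runnable, task) = async_task::spawn(future, schedule);
    runnable.schedule(); // the first run goes through `schedule` right away
    task
}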

Implicit boxing of large futures causes excessive monomorphization

async-task contains an optimisation for handling large futures in the definition of spawn_unchecked. This leads to excessive IR size, as one branch instantiates RawTask with Fut and the other does so with Pin<Box<Fut>>. This probably gets eliminated within LLVM (as the branch itself is trivial), but it's still a bummer that this cannot be truly determined at compile time. I took several stabs recently at getting rid of the unnecessary instantiation, without luck. I do understand why we need the boxing, but it'd be nice to not spend time on generating code we're gonna throw away in LLVM anyways.
Getting rid of large-future-boxing reduces the LLVM IR size for my medium-size (~1.5M LLVM IR lines in debug) crate by 7%. This is also replicated in examples from this crate:

Example name      LLVM IR before   LLVM IR after   % of original
spawn             18276            15631           85.52%
spawn-local       39801            34537           86.77%
spawn-on-thread   18667            16031           85.87%
with-metadata     32834            24887           75.79%

Related: rust-lang/rust#85836
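
A toy reduction of the pattern in question (paraphrasing the shape of the branch, not the crate's actual code):

use std::future::Future;
use std::mem;

// Stand-in for RawTask allocation: each distinct type that reaches this
// function produces its own monomorphised copy in the IR.
fn allocate_like<F: Future>(_future: F) {}

fn spawn_like<F: Future + 'static>(future: F) {
    // Even though only one branch is ever taken for a given F, both
    // instantiations are emitted: allocate_like::<F> and
    // allocate_like::<Pin<Box<F>>>.
    if mem::size_of::<F>() >= 2048 {
        allocate_like(Box::pin(future));
    } else {
        allocate_like(future);
    }
}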

Clarify safety of spawn_unchecked for Runnable lifetime

async_task::spawn_unchecked mentions the following requirement:

If future is not 'static, borrowed variables must outlive its Runnable.

The problem is that I don't see a feasible way to guarantee this. Currently I have the following code:

fn ensure_send<T: Send>(val: T) -> T { val }
fn ensure_send_sync<T: Send + Sync>(val: T) -> T { val }

// SAFETY: the runner may not live longer than ScopedExecutor.
// We ensure this by keeping a weak reference to the scheduler
// which is destroyed when the executor is dropped.
// The other conditions are checked by ensure_send(_sync).
let scheduler = Arc::downgrade(&self.scheduler);
async_task::Builder::new()
    .metadata(TaskMetadata { affinity, priority })
    .spawn_unchecked(
        ensure_send(|_| fut),
        ensure_send_sync(move |task| if let Some(s) = scheduler.upgrade() {
            s.add_task(task)
        }),
    );

I believe (and please do correct me if I'm wrong) this is an intended usage of spawn_unchecked to support a potential ScopedExecutor<'a> which can add fut: Future<...> + Send + 'a tasks. If the ScopedExecutor is dead, any existing Wakers will call schedule which does nothing as the weak pointer is dead.

However... this technically violates the safety requirement of spawn_unchecked! Because my schedule function gets called... with a Runnable! And nothing is preventing fut from storing any Waker it receives in its poll function inside some &'static Mutex<Waker>, which it can call way after 'a has expired. Sure, I immediately drop it since the weak pointer is dead, but technically a Runnable has outlived 'a.
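
A concrete form of the escape hatch described above might look like this (an illustration of the concern, in plain std):

use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

// A future that stashes its Waker in a 'static place. Waking it later
// creates a Runnable after the borrow lifetime 'a has already ended,
// which is what technically violates the documented requirement.
static STASHED: Mutex<Option<Waker>> = Mutex::new(None);

struct StashWaker;

impl Future for StashWaker {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        *STASHED.lock().unwrap() = Some(cx.waker().clone());
        Poll::Pending
    }
}

// Later, from anywhere, long after 'a:
// STASHED.lock().unwrap().take().unwrap().wake();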

Dummy task

Ability to create an already completed Task with something like Task::completed(...).

This would serve as a way to mock asynchronous functionality with synchronous code.

My specific use case is an interface which has to return a Task, but the actual code doesn't have a way to create said Task.
(I don't think this solution can be called sound, but it is what it is.)

This is very much inspired by C#.
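
With the current API, an already-completed Task can be approximated by spawning a ready future and running it on the spot; a minimal sketch:

// Build an already-completed Task by polling a ready future to completion
// immediately. The schedule callback is never invoked, because a future
// that finishes on its first poll is never rescheduled.
fn completed<T: Send + 'static>(value: T) -> async_task::Task<T> {
    let (runnable, task) = async_task::spawn(async move { value }, |_runnable| {
        // never called: the future completes on the first poll
    });
    runnable.run(); // polls to completion right here, synchronously
    task
}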

Run tests for async-executor in CI

Similarly to how patched tests for async-io are run in the CI workflow for polling, we should also run tests for async-executor in the CI for this crate.

Http clients for async-task

Hey, I'm currently writing the async Rust book for O'Reilly. I love this crate and I generally use it for teaching the concepts behind async runtimes. However, I usually use crates like Tokio at work because, well, that's just what industry seems to go for and I have no say in rewriting the entire stack. So when I got the opportunity to write the async Rust book, I was excited to put this crate into chapter three. However, I've quickly realised that although writing custom queues with this crate is a dream, the HTTP crates I usually lean on just don't work, as they're looking for Tokio runtimes etc. I'd love to have a complete runtime with HTTP for the book based on this crate. Are there any HTTP crates that work with async-task? If not, I'm up for collaborating on an HTTP crate that can work with async-task.

Custom allocators

I've been working on a specialized fork of async_executor focused on prioritized execution of high-performance CPU-bound tasks for Bevy. Thus far I've made a number of optimizations that have definitely helped with overhead, but spawning tasks remains one of the primary bottlenecks, with the cost coming from spawn_unchecked.

One of the primary use cases is a lifetime-limited, scoped batch of tasks, where we know the inner tasks cannot outlive the scope. Since the futures and tasks cannot outlive the scope, one of the more appealing options is to use bumpalo, or some thread-safe wrapper around it, to speed up allocation.

It'd be super helpful to be able to provide a version of the API that allows specifying your own allocator or a callback for allocating memory for use in non-nightly environments.

Can the extra scheduling of tasks be avoided when a Task is cancelled or detached?

The following code is logic from my dynamic-call-based project built on async-task. Here I didn't let the task be rescheduled to run, and it didn't seem to cause any problems.

  • task.rs
     //Cancel task execution (may be called only once because ownership will be taken for either a cancel or a drop)
    fn set_canceled(&mut self) {
        let raw = unsafe { self.raw.as_mut() };
        let mut state = raw.state.load(Ordering::Acquire);
        loop {
            //If the task has been completed or closed, it does not need to be cancelled.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }
            match raw.state.compare_exchange_weak(
                state,
                state | CLOSED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    //Notify the watcher if the task is in pending status
                    //Drop the future
                    //Reduce the reference count. At this point the Task handle is definitely not detached, so the RawTask is not really destroyed here.
                    //Only when the task is detached is the RawTask truly destroyed.
                    //The one notified here can only be another task.
                    if state & (SCHEDULED | RUNNING) == 0 {
                        raw.notify(None);
                        raw.drop_future();
                        raw.drop_ref();
                    }
                }
                Err(s) => state = s,
            }
        }
    }
    unsafe fn set_detached(&mut self) -> Option<T> {
        let raw = self.raw.as_mut();
        let mut output = None;
        //Most often a task is detached right after creation, at which point only a simple removal of the task-association mark is necessary
        if let Err(mut state) = raw.state.compare_exchange_weak(
            SCHEDULED | TASK | REFERENCE,
            SCHEDULED | REFERENCE,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            //Detach directly from RawTask
            state = raw.state.fetch_and(!TASK, Ordering::AcqRel);
            //If the RawTask has been completed but not closed
            //Then need to  take out  the results of the task.
            if state & (COMPLETED | CLOSED) == COMPLETED {
                output = Some((raw.output as *mut T).read());
            }
            //If the task reference is already zero when detach, the task needs to be destroyed.
            if state & !(REFERENCE - 1) == 0 {
                raw.destroy();
            }
        }
        output
    }
    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        unsafe {
            let raw = self.raw.as_mut();
            let mut state = raw.state.load(Ordering::Acquire);
            // If the task is cancelled, register ourselves as the awaiter and wait for the cancellation to finish.
            // No task output is retrieved in this case.
            if state & CLOSED != 0 {
                // If the task is scheduled or running, we need to wait until its future is dropped.
                if state & (SCHEDULED | RUNNING) != 0 {
                    // The awaiter is replaced by a waker associated with the current task.
                    raw.register(cx.waker());
                    // Reload the state after registering. It is possible changes occurred just
                    // before registration, so we need to check for that.
                    state = raw.state.load(Ordering::Acquire);
                    // If the task is still scheduled or running, we need to wait because its future has not been dropped yet.
                    if state & (SCHEDULED | RUNNING) != 0 {
                        return Poll::Pending;
                    } else {
                        // Otherwise, remove the waker we just registered.
                        let _rtn = raw.take();
                    }
                }
                return Poll::Ready(None);
            }
            // If the task is not completed, register the current waker.
            if state & COMPLETED == 0 {
                // The awaiter is replaced by a waker associated with the current task.
                raw.register(cx.waker());
                //  Reload the state after registering. It is possible that the task became
                // completed or closed just before registration so we need to check for that.
                state = raw.state.load(Ordering::Acquire);
               
                // It is not possible to get here, because we are the only one that can modify the RawTask state now.
                // if state & CLOSED != 0 {
                //     continue;
                // }
                // If the task is still not completed, we're blocked on it
                if state & COMPLETED == 0 {
                    return Poll::Pending;
                } else {
                    raw.take();
                }
            }
            raw.state.fetch_or(CLOSED, Ordering::AcqRel);
            // Take the output from the task.
            let output = raw.output as *mut T;
            return Poll::Ready(Some(output.read()));
        }
    }

  • raw.rs

    pub(crate) unsafe fn run(&mut self) -> bool {
        println!("runing");
        let mut state = self.state.load(Ordering::Acquire);
        loop {
            //Task switched to RUNNING state.
            let new = (state & !SCHEDULED) | RUNNING;
            match self
                .state
                .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => {
                    state = new;
                    break;
                }
                Err(s) => state = s,
            }
        }
        //Start executing the future
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
            self as *const _ as *const (),
            &Self::RAW_WAKER_VTABLE,
        )));
        let cx = &mut Context::from_waker(&waker);
        let poll = self.dynfn.as_mut().poll(cx);
        match poll {
            Poll::Ready(_) => {
                loop {
                    //Switch the task to the COMPLETED state
                    let new = if state & TASK == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };
                    match self.state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            if state & TASK == 0 || state & CLOSED != 0 {
                                self.drop_output();
                            }
                            if state & AWAITER != 0 {
                                self.notify();
                            }
                            self.drop_ref();
                            break;
                        }
                        Err(s) => {
                            state = s;
                        }
                    }
                }
            }

            Poll::Pending => loop {
                let new = if state & CLOSED != 0 {
                    state & !RUNNING & !SCHEDULED
                } else {
                    state & !RUNNING
                };
                match self.state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        if state & CLOSED != 0 {
                            if state & AWAITER != 0 {
                                self.notify();
                            }
                            self.drop_future();
                            self.drop_ref();
                        } else if state & SCHEDULED != 0 {
                            self.schedule();
                            return true;
                        }
                        break;
                    }
                    Err(s) => state = s,
                }
            },
        }
        false
    }

Add names to tasks

In order to make it easier to debug tasks, it should be possible to give tasks names.

I was thinking of an API like this:

Task::builder().name("my-task").spawn(async { println!("Hello world!"); });

Where, if the name() call were omitted, it would be the same as Task::spawn.
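
Until a dedicated name() exists, names can be approximated with the metadata support that newer async-task versions provide (a sketch; using &'static str as the metadata type is our choice here):

use async_task::{Builder, Runnable};

// Sketch: carry the name as task metadata. The executor can read it back
// from the Runnable (or the Task) when logging or debugging.
let schedule = |runnable: Runnable<&'static str>| {
    eprintln!("scheduling task {:?}", runnable.metadata());
    // ... push onto the run queue ...
};
let (runnable, task) = Builder::new()
    .metadata("my-task")
    .spawn(|_name| async { println!("Hello world!"); }, schedule);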

Re-using memory for RawTask

I'd like to re-use memory for a RawTask, because I create and destroy tasks in a tight loop. Would it be feasible to let users provide an object to do custom allocations, such as an instance of the Bumpalo allocator? Thank you!

Bug: Unit test with expected panic message randomly fails because of crate-specific panic message

I'm currently writing unit tests for my Bevy project, which uses the current async-task version.
My unit test looks like this:

#[test]
#[should_panic = "invalid `progress_query`: `QueryOutOfRange(2..=3)`"]

As expected, the code under test panics with the given message in one of the systems I wrote. However, the unit test sometimes still fails because my panic seems to cause another panic here with a different message text:

Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),

---- controller::test::forward::forward_query_forward_to_too_many_ticks stdout ----
note: panic did not contain expected string
panic message: "task has failed",
expected substring: "invalid progress_query: QueryOutOfRange(2..=3)"

In my case the async_task code is called by bevy here:

https://github.com/bevyengine/bevy/blob/93a131661de507eb711264b11965fe1d4bb13f12/crates/bevy_tasks/src/task_pool.rs#L173

It only happens sometimes though; I can repeat the test multiple times (without recompiling) and it fails only occasionally. Seems to be a race condition.

This is the full console output. My project is rather large so I could not set up a minimal example of this issue yet.

Running unittests src\lib.rs (target\debug\deps\library-55a25fc12b677daa.exe)

running 1 test
thread 'TaskPool (0)' panicked at 'invalid progress_query: QueryOutOfRange(2..=3)', src\controller\test.rs:163:25
stack backtrace:
0: std::panicking::begin_panic_handler
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\std\src\panicking.rs:556
1: core::panicking::panic_fmt
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\core\src\panicking.rs:142
2: library::controller::test::impl$3::run::closure$1<2>
at .\src\controller\test.rs:163
3: core::ops::function::impls::impl$3::call_mut<tuple$<bevy_ecs::system::system_param::Reslibrary::controller::Controller,bevy_ecs::system::commands::Commands>,library::controller::test::impl$3::run::closure_env$1<2> >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\ops\function.rs:297
4: bevy_ecs::system::function_system::impl$18::run::call_inner<tuple$<>,bevy_ecs::system::system_param::Reslibrary::controller::Controller,bevy_ecs::system::commands::Commands,ref_mut$<library::controller::test::impl$3::run::closure_env$1<2> > >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\system\function_system.rs:564
5: bevy_ecs::system::function_system::impl$18::run<tuple$<>,library::controller::test::impl$3::run::closure_env$1<2>,bevy_ecs::system::system_param::Reslibrary::controller::Controller,bevy_ecs::system::commands::Commands>
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\system\function_system.rs:567
6: bevy_ecs::system::function_system::impl$6::run_unsafe<tuple$<>,tuple$<>,tuple$<bevy_ecs::system::system_param::Reslibrary::controller::Controller,bevy_ecs::system::commands::Commands>,tuple$<>,library::controller::test::impl$3::run::closure_env$1<2> >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\system\function_system.rs:403
7: bevy_ecs::schedule::executor_parallel::impl$2::prepare_systems::async_block$0
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\schedule\executor_parallel.rs:194
8: core::future::from_generator::impl$1::poll<enum2$<bevy_ecs::schedule::executor_parallel::impl$2::prepare_systems::async_block_env$0> >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\mod.rs:91
9: async_executor::impl$4::spawn::async_block$0<tuple$<>,core::future::from_generator::GenFuture<enum2$<bevy_ecs::schedule::executor_parallel::impl$2::prepare_systems::async_block_env$0> > >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-executor-1.4.1\src\lib.rs:144
10: core::future::from_generator::impl$1::poll<enum2$<async_executor::impl$4::spawn::async_block_env$0<tuple$<>,core::future::from_generator::GenFuture<enum2$<bevy_ecs::schedule::executor_parallel::impl$2::prepare_systems::async_block_env$0> > > > >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\mod.rs:91
11: async_task::raw::RawTask<core::future::from_generator::GenFuture<enum2$<async_executor::impl$4::spawn::async_block_env$0<tuple$<>,core::future::from_generator::GenFuture<enum2$<bevy_ecs::schedule::executor_parallel::impl$2::prepare_systems::async_block_en
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-task-4.3.0\src\raw.rs:511
12: async_task::runnable::Runnable::run
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-task-4.3.0\src\runnable.rs:309
13: async_executor::impl$4::run::async_fn$0::async_block$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,async_channel::Recv<tuple$<> > >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-executor-1.4.1\src\lib.rs:235
14: core::future::from_generator::impl$1::poll<enum2$<async_executor::impl$4::run::async_fn$0::async_block_env$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,async_channel::Recv<tuple$<> > > > >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\mod.rs:91
15: futures_lite::future::impl$12::poll<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,async_channel::Recv<tuple$<> >,core::future::from_generator::GenFuture<enum2$<async_executor::impl$4::run::async_fn$0::async_block_env$0<enum2$<core::resu
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\futures-lite-1.12.0\src\future.rs:529
16: async_executor::impl$4::run::async_fn$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,async_channel::Recv<tuple$<> > >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-executor-1.4.1\src\lib.rs:242
17: core::future::from_generator::impl$1::poll<enum2$<async_executor::impl$4::run::async_fn_env$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,async_channel::Recv<tuple$<> > > > > at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\mod.rs:91
18: futures_lite::future::block_on::closure$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,core::future::from_generator::GenFuture<enum2$<async_executor::impl$4::run::async_fn_env$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvE
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\futures-lite-1.12.0\src\future.rs:89
19: std::thread::local::LocalKey<core::cell::RefCell<tuple$parking::Parker,core::task::wake::Waker > >::try_with<core::cell::RefCell<tuple$parking::Parker,core::task::wake::Waker >,futures_lite::future::block_on::closure_env$0<enum2$<core::result::Result<
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\std\src\thread\local.rs:446
20: std::thread::local::LocalKey<core::cell::RefCell<tuple$parking::Parker,core::task::wake::Waker > >::with<core::cell::RefCell<tuple$parking::Parker,core::task::wake::Waker >,futures_lite::future::block_on::closure_env$0<enum2$<core::result::Result<tupl
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\std\src\thread\local.rs:422
21: futures_lite::future::block_on<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,core::future::from_generator::GenFuture<enum2$<async_executor::impl$4::run::async_fn_env$0<enum2$<core::result::Result<tuple$<>,async_channel::RecvError> >,asy
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\futures-lite-1.12.0\src\future.rs:79
22: bevy_tasks::task_pool::impl$1::new_internal::closure$0::closure$0
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_tasks-0.8.1\src\task_pool.rs:120
note: Some details are omitted, run with RUST_BACKTRACE=full for
a verbose backtrace.
thread 'controller::test::forward::forward_query_forward_to_too_many_ticks' panicked at 'task has failed', C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-task-4.3.0\src\task.rs:426:45
stack backtrace:
0: std::panicking::begin_panic_handler
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\std\src\panicking.rs:556
1: core::panicking::panic_fmt
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\core\src\panicking.rs:142
2: core::panicking::panic_display
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\core\src\panicking.rs:72
3: core::panicking::panic_str
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\core\src\panicking.rs:56
4: core::option::expect_failed
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\core\src\option.rs:1873
5: enum2$<core::option::Option<tuple$<> > >::expect<tuple$<> >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\option.rs:738
6: async_task::task::impl$7::poll<tuple$<> >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-task-4.3.0\src\task.rs:426
7: bevy_tasks::task_pool::impl$1::scope::closure$0::async_block$0<bevy_ecs::schedule::executor_parallel::impl$1::run_systems::closure_env$0,tuple$<> >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_tasks-0.8.1\src\task_pool.rs:173
8: core::future::from_generator::impl$1::poll<enum2$<bevy_tasks::task_pool::impl$1::scope::closure$0::async_block_env$0<bevy_ecs::schedule::executor_parallel::impl$1::run_systems::closure_env$0,tuple$<> > > >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\mod.rs:91
9: core::future::future::impl$1::poll<ref_mut$<dyn$<core::future::future::Future<assoc$<Output,alloc::vec::Vec<tuple$<>,alloc::alloc::Global> > > > > >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\future.rs:124
10: async_executor::impl$9::spawn::async_block$0<alloc::vec::Vec<tuple$<>,alloc::alloc::Global>,core::pin::Pin<ref_mut$<dyn$<core::future::future::Future<assoc$<Output,alloc::vec::Vec<tuple$<>,alloc::alloc::Global> > > > > > >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-executor-1.4.1\src\lib.rs:376
11: core::future::from_generator::impl$1::poll<enum2$<async_executor::impl$9::spawn::async_block_env$0<alloc::vec::Vec<tuple$<>,alloc::alloc::Global>,core::pin::Pin<ref_mut$<dyn$<core::future::future::Future<assoc$<Output,alloc::vec::Vec<tuple$<>,alloc::alloc
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\future\mod.rs:91
12: async_task::raw::RawTask<core::future::from_generator::GenFuture<enum2$<async_executor::impl$9::spawn::async_block_env$0<alloc::vec::Vec<tuple$<>,alloc::alloc::Global>,core::pin::Pin<ref_mut$<dyn$<core::future::future::Future<assoc$<Output,alloc::vec::Vec
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-task-4.3.0\src\raw.rs:511
13: async_task::runnable::Runnable::run
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-task-4.3.0\src\runnable.rs:309
14: async_executor::Executor::try_tick
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-executor-1.4.1\src\lib.rs:181
15: async_executor::LocalExecutor::try_tick
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\async-executor-1.4.1\src\lib.rs:405
16: bevy_tasks::task_pool::impl$1::scope::closure$0<bevy_ecs::schedule::executor_parallel::impl$1::run_systems::closure_env$0,tuple$<> >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_tasks-0.8.1\src\task_pool.rs:202
17: std::thread::local::LocalKey<async_executor::LocalExecutor>::try_with<async_executor::LocalExecutor,bevy_tasks::task_pool::impl$1::scope::closure_env$0<bevy_ecs::schedule::executor_parallel::impl$1::run_systems::closure_env$0,tuple$<> >,alloc::vec::Vec<tu
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\std\src\thread\local.rs:446
18: std::thread::local::LocalKey<async_executor::LocalExecutor>::with<async_executor::LocalExecutor,bevy_tasks::task_pool::impl$1::scope::closure_env$0<bevy_ecs::schedule::executor_parallel::impl$1::run_systems::closure_env$0,tuple$<> >,alloc::vec::Vec<tuple$
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\std\src\thread\local.rs:422
19: bevy_tasks::task_pool::TaskPool::scope<bevy_ecs::schedule::executor_parallel::impl$1::run_systems::closure_env$0,tuple$<> >
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_tasks-0.8.1\src\task_pool.rs:148
20: bevy_ecs::schedule::executor_parallel::impl$1::run_systems
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\schedule\executor_parallel.rs:126
21: bevy_ecs::schedule::stage::impl$1::run
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\schedule\stage.rs:884
22: bevy_ecs::schedule::Schedule::run_once
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\schedule\mod.rs:342
23: bevy_ecs::schedule::impl$1::run
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_ecs-0.8.1\src\schedule\mod.rs:360
24: bevy_app::app::App::update
at C:\Users\Anonymous.cargo\registry\src\github.com-1ecc6299db9ec823\bevy_app-0.8.1\src\app.rs:119
25: library::controller::test::impl$3::run::closure$4<2>
at .\src\controller\test.rs:208
26: core::iter::traits::iterator::Iterator::for_each::call::closure$0<usize,library::controller::test::impl$3::run::closure_env$4<2> >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\iter\traits\iterator.rs:828
27: core::iter::traits::iterator::Iterator::fold<core::ops::range::Range,tuple$<>,core::iter::traits::iterator::Iterator::for_each::call::closure_env$0<usize,library::controller::test::impl$3::run::closure_env$4<2> > >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\iter\traits\iterator.rs:2414
28: core::iter::traits::iterator::Iterator::for_each<core::ops::range::Range,library::controller::test::impl$3::run::closure_env$4<2> >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\iter\traits\iterator.rs:831
29: library::controller::test::impl$3::run<2>
at .\src\controller\test.rs:208
30: library::controller::test::forward::forward_query_forward_to_too_many_ticks
at .\src\controller\test\forward.rs:94
31: library::controller::test::forward::forward_query_forward_to_too_many_ticks::closure$0
at .\src\controller\test\forward.rs:93
32: core::ops::function::FnOnce::call_once<library::controller::test::forward::forward_query_forward_to_too_many_ticks::closure_env$0,tuple$<> >
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c\library\core\src\ops\function.rs:251
33: core::ops::function::FnOnce::call_once
at /rustc/57f097ea25f2c05f424fc9b9dc50dbd6d399845c/library\core\src\ops\function.rs:251
note: Some details are omitted, run with RUST_BACKTRACE=full for
a verbose backtrace.
test controller::test::forward::forward_query_forward_to_too_many_ticks - should panic ... FAILED

failures:

---- controller::test::forward::forward_query_forward_to_too_many_ticks stdout ----
note: panic did not contain expected string
panic message: "task has failed",
expected substring: "invalid progress_query: QueryOutOfRange(2..=3)"

failures:
controller::test::forward::forward_query_forward_to_too_many_ticks

test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 5 filtered out; finished in 0.60s

Use portable-atomic

That would make it possible to use this crate on bare-metal targets that do not natively support atomics.

atomic-waker already does that, so why not here too?

Task randomly does not get rescheduled

Code to reproduce:

use std::{
    cell::RefCell,
    thread::{self, sleep},
    time::{Duration, Instant},
};

use async_task::{Runnable, Task};
use futures::{channel::mpsc, Future, StreamExt};

thread_local! {
    static RUNNABLE_LIST: RefCell<Vec<Runnable>> = RefCell::new(Vec::new());
}

fn poll_once() -> usize {
    let runnable_list: Vec<_> =
        RUNNABLE_LIST.with(|runnable_list| runnable_list.borrow_mut().drain(..).collect());
    let count = runnable_list.len();
    for runnable in runnable_list {
        let waker = runnable.waker();
        runnable.run();
        waker.wake();
    }
    count
}

fn spawn(task: impl Future<Output = ()> + Send + 'static) -> Task<()> {
    let (runnable, handle) = async_task::spawn(task, |runnable| {
        RUNNABLE_LIST.with(|runnable_list| runnable_list.borrow_mut().push(runnable));
    });
    runnable.schedule();
    handle
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    let handle = spawn(async move {
        loop {
            let _: () = rx.next().await.unwrap();
        }
    });

    thread::spawn(move || loop {
        sleep(Duration::from_millis(20));
        tx.unbounded_send(()).unwrap();
    });

    let start = Instant::now();
    while Instant::now() - start < Duration::from_millis(1 * 1000) {
        // assert_eq!(poll_once(), 1);
        poll_once();
    }
    assert_eq!(poll_once(), 1);
    drop(handle);
}

I understand that waking the runnable immediately and unconditionally wastes a lot of CPU, but I have a good reason to do it in my case.

The problem here is that poll_once returns 0 after a random duration, and probably cannot keep returning 1 for a full second. That means the task disappears from the system at some instant. From my observation it never appears again after that, even when the channel tx sends a new message.

One interesting point is that when I simplify the async task to futures::pending(), it never disappears. Is there anything I did incorrectly?

Run `poll_drop` destructors when invoking `Task::cancel`

The Task::cancel API is used to cancel a task through the remote handle. The Drop::poll_drop_ready RFC proposes a mechanism through which async destructors can be added to the language. I'd like for Task::cancel (or possibly a new API) to provide a way to cancel a task while giving the destructors a chance to run.

Example Usage

In Tide we allow people to spin up multiple Server instances in the same application. Once "async drop" becomes available, what we'd like to do is make it so that when a Server is dropped it stops accepting new connections, and then allows the existing connections to finish responding. This can then be used to e.g. reload configuration, or gracefully exit the application pending an upgrade.

The way to run multiple Tide servers is by using several instances of async_std::task::spawn. I'm not too sure yet whether concurrency adapters such as join and race need to be "async drop" aware as well. But it seems that at the very least it should be possible to invoke Task::cancel.

Implementation

The way to run "async drop" is by passing the future into an async context, and then dropping it. For example by passing it into a function with the following signature:

#[allow(dead_code)]
async fn async_drop<T>(x: T) {}

The way Task::cancel would work is when "cancel" is called, it would move the internal future into an async context, and then await that future. I'm unsure whether that requires an invocation of Box::pin(), but it's probably not unacceptable for cancellation to require an extra allocation.

Future developments

Another case I think is worth handling is:

let a = task::spawn(async { /* something */ });
let b = task::spawn(async { /* something */ });
a.race(b).await; // async destructors should run for the task that was dropped

There is not quite a precedent for this in thread::JoinHandle since synchronous code doesn't have suspension points, so intermediate cancellation isn't really a thing. But I think people would be surprised if async destructors didn't run in this setup, so I think it's at least worth considering if we can make this work.

Add blocking wrapper

Suggesting we add a blocking::Task, a thin blocking wrapper of Task that's more convenient to use in blocking code. API:

pub mod blocking {
    pub struct Task<T>(crate::Task<T>);

    impl<T> Task<T> {
        fn cancel(self);
        fn detach(self) -> Option<T>;
        fn wait(self) -> T;
    }

    impl<T> Deref for Task<T> {
        type Target = crate::Task<T>;
        ...
    }

    impl<T> From<crate::Task<T>> for Task<T>;
    impl<T> From<Task<T>> for crate::Task<T>;
}

For the implementation, we will want to depend either on pollster for its block_on, or on async-io; the latter IMO would be a bit of overkill, so if we go for async-io we probably want to feature-gate this.
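
For wait specifically, a dependency-free block_on along these lines (essentially what pollster provides) would be enough; a sketch in plain std:

use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Minimal thread-blocking block_on: poll, then sleep on a condvar until
// the waker fires. This is the core of what `wait` needs.
struct Signal {
    woken: Mutex<bool>,
    cond: Condvar,
}

impl Wake for Signal {
    fn wake(self: Arc<Self>) {
        *self.woken.lock().unwrap() = true;
        self.cond.notify_one();
    }
}

fn wait<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let signal = Arc::new(Signal { woken: Mutex::new(false), cond: Condvar::new() });
    let waker = Waker::from(signal.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // Block until wake() flips the flag, then try polling again.
        let mut woken = signal.woken.lock().unwrap();
        while !*woken {
            woken = signal.cond.wait(woken).unwrap();
        }
        *woken = false;
    }
}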

How can I clone a `Task`

I want to create a Task object and retry it later if the Task failed. Is there any way I can do this?

Build fails on nightly due to type_length_limit error

On version 3.0.0 with a recent nightly I get:

error: reached the type-length limit while instantiating `async_task::task::spawn::<std::f...ling::WorkStealingExecutor], ()>`
  --> ../../out/core.x64/../../third_party/rust_crates/vendor/async-task/src/task.rs:50:1
   |
50 | / pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
51 | | where
52 | |     F: Future<Output = R> + Send + 'static,
53 | |     R: Send + 'static,
...  |
73 | |     (task, handle)
74 | | }
   | |_^
   |
   = note: consider adding a `#![type_length_limit="1896049"]` attribute to your crate
