
rusty-celery's Issues

pass additional context to task on_failure / on_success callbacks

Right now the task callback methods on_failure and on_success only take the error and the returned value, respectively, as arguments. However, it might be useful to provide additional information to these callbacks, such as the task ID. We could pass this information in a context struct like this:

async fn on_failure(&mut self, ctx: &Context, err: &Error);

and

async fn on_success(&mut self, ctx: &Context, returned: &Self::Returns);
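A minimal sketch of what such a Context struct might carry. The field names here are illustrative assumptions, not a settled API; a real version could also include things like the queue name or the ETA:

```rust
// Hypothetical Context passed to on_failure / on_success. Every field name
// below is an assumption for illustration, not part of rusty-celery's API.
#[derive(Debug, Clone)]
pub struct Context {
    pub task_id: String,   // unique ID assigned to this task instance
    pub task_name: String, // registered name of the task
    pub retries: u32,      // how many times this task has been retried
}

fn main() {
    let ctx = Context {
        task_id: "f27b24a1".into(),
        task_name: "add".into(),
        retries: 0,
    };
    // A callback could use the context for logging, metrics, etc.
    println!("task {} (id {}) finished", ctx.task_name, ctx.task_id);
}
```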

Document best practices

For example,

  • Tasks should never panic. Return an error with ? instead.
  • Task timeouts only work when the task is non-blocking. If you write a task with an infinite loop, the whole worker will freeze indefinitely.
  • Error handling should be done by returning either ErrorKind::ExpectedError or ErrorKind::UnexpectedError.
  • Use non-blocking functions for IO, such as those from tokio.
  • Tuning prefetch_count: for CPU-bound tasks, set prefetch_count to the number of CPUs. For IO-bound tasks, set it much higher. In general, try to route CPU-bound and IO-bound tasks to separate queues.
  • Each task should do one thing and do it well. Ideally there should be at most one possible point of failure in a task.
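The "never panic, return an error" guideline can be sketched with a stand-in ErrorKind enum modeled on the two variants named above (the enum and the parse_port function are illustrative, not the crate's actual types):

```rust
// Stand-in for the ErrorKind described above, for illustration only.
#[derive(Debug)]
enum ErrorKind {
    ExpectedError(String),   // anticipated failures, e.g. bad input
    UnexpectedError(String), // genuine bugs or unforeseen conditions
}

// A task body should propagate failures as errors rather than calling
// unwrap() or panic!(), so the worker can catch and record them.
fn parse_port(input: &str) -> Result<u16, ErrorKind> {
    input
        .trim()
        .parse::<u16>()
        .map_err(|e| ErrorKind::ExpectedError(e.to_string()))
}

fn main() {
    assert_eq!(parse_port("8080").unwrap(), 8080);
    assert!(matches!(parse_port("oops"), Err(ErrorKind::ExpectedError(_))));
    println!("ok");
}
```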

Support "baskets": long term scheduler backends

The current system (and essentially how Python Celery does it) for handling tasks with a far-off ETA is to have workers consume them as usual but use the delay_for async function to delay execution until the ETA is reached, which could be a while.

In the meantime that task is still taking up resources on a worker and, more importantly, is overriding the backpressure mechanism (the prefetch_count configuration setting): in order for the worker to continue consuming other tasks while it holds onto delayed ones, it has to tell the broker to increase its channel's prefetch_count behind the scenes.

If it didn't do this and the worker kept receiving tasks with a far-off ETA, the initial prefetch_count would soon be reached and the broker would stop sending tasks to this worker. Unless more workers were spun up, the broker would pile up with messages since it has nowhere to send them. So no new tasks (even those without a future ETA) could be executed until the worker finishes some of the tasks it is holding onto.

So we choose the lesser of two evils: increasing the prefetch_count. In other words, the worker says "hey, thanks for this task, but I can't do anything with it right now, so do you want to just give me another one?". And that's all fine unless there are a ton of tasks with a far-off ETA, in which case the worker will keep taking in more of them until it runs out of memory.

The solution to this is to offload those tasks - tasks with a far-off ETA - somewhere else. Someplace where the chance of running out of memory or storage space is a lot lower, and where the cost of additional memory or storage space is a lot cheaper.

Any traditional database works well for this as long as you can index by the ETA, which you should be able to do with pretty much any database since you can represent the ETA by an integer or float. Then workers (or a dedicated worker solely for this purpose) just need to occasionally poll the database for tasks that are due soon.
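As a sketch of this idea, here is an in-memory stand-in for an ETA-indexed "basket", using an ordered map in place of a real database table (in production this would be, say, a SQL table with an index on the eta column, plus the polling loop described above):

```rust
use std::collections::BTreeMap;

// In-memory stand-in for an ETA-indexed basket. The BTreeMap keeps tasks
// ordered by ETA (seconds since epoch), which is what an index on the eta
// column would buy us in a real database.
struct Basket {
    tasks: BTreeMap<u64, Vec<String>>, // eta -> serialized task payloads
}

impl Basket {
    fn new() -> Self {
        Basket { tasks: BTreeMap::new() }
    }

    // Store a far-off task instead of holding it in worker memory.
    fn store(&mut self, eta: u64, task: String) {
        self.tasks.entry(eta).or_default().push(task);
    }

    // Drain everything due at or before `now`. This range query is the
    // part a poller would run periodically against the database.
    fn due(&mut self, now: u64) -> Vec<String> {
        let keys: Vec<u64> = self.tasks.range(..=now).map(|(k, _)| *k).collect();
        keys.into_iter()
            .flat_map(|k| self.tasks.remove(&k).unwrap())
            .collect()
    }
}

fn main() {
    let mut basket = Basket::new();
    basket.store(100, "send_email".into());
    basket.store(200, "cleanup".into());
    // Polling at t=150 only returns the task whose ETA has passed.
    let ready = basket.due(150);
    assert_eq!(ready, vec!["send_email".to_string()]);
    println!("{} task(s) due", ready.len());
}
```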

Do we need to handle CPU bound tasks differently?

Is the tokio multi-threaded work-stealing runtime enough?

One option is to handle CPU-bound and IO-bound tasks differently by using spawn_blocking or a separate thread pool, instead of spawn, for CPU-bound tasks.

Something to think about though is that our timeout functionality doesn't work with blocking tasks, i.e. a blocking task could run forever. This is a problem even if we were to use spawn_blocking, because even though the timeout would return an error after the duration was exceeded, the thread that was spawned could still run forever in the background.
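This can be demonstrated with plain std threads, which have the same property as the spawn_blocking scenario: the timeout fires, but nothing actually cancels the spawned work (the function name below is just for this sketch):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Wait on a blocking "task" with a deadline. Returns true if the task
// finished before the timeout. Note that on timeout the spawned thread
// keeps running in the background — nothing here can stop it, which is
// exactly the problem described above.
fn finished_in_time(task_ms: u64, timeout_ms: u64) -> bool {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Simulates a CPU-bound or otherwise blocking task.
        thread::sleep(Duration::from_millis(task_ms));
        let _ = tx.send(());
    });
    rx.recv_timeout(Duration::from_millis(timeout_ms)).is_ok()
}

fn main() {
    // The timeout returns an error, but the 200ms thread is still alive.
    assert!(!finished_in_time(200, 50));
    println!("timed out, but the thread is still running");
}
```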

A second option is to switch over to async-std entirely: https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/ (note: this new runtime hasn't shipped yet as of the current version, 1.4). We would still have the same issue with timeouts, though.
