Giter VIP home page Giter VIP logo

rusty-celery's Introduction




A Rust implementation of Celery for producing and consuming asynchronous tasks with a distributed message queue.


Build License Crates Docs Help wanted


We welcome contributions from everyone regardless of your experience level with Rust. For complete beginners, see HACKING_QUICKSTART.md.

If you already know the basics of Rust but are new to Celery, check out the Rusty Celery Book or the original Python Celery Project.

Quick start

Define tasks by decorating functions with the task attribute.

use celery::prelude::*;

#[celery::task]
fn add(x: i32, y: i32) -> TaskResult<i32> {
    Ok(x + y)
}

Create an app with the app macro and register your tasks with it:

let my_app = celery::app!(
    broker = AMQPBroker { std::env::var("AMQP_ADDR").unwrap() },
    tasks = [add],
    task_routes = [
        "*" => "celery",
    ],
).await?;

Then send tasks to a queue with

my_app.send_task(add::new(1, 2)).await?;

And consume tasks as a worker from a queue with

my_app.consume().await?;

Examples

The examples/ directory contains:

Prerequisites

If you already have an AMQP broker running you can set the environment variable AMQP_ADDR to your broker's URL (e.g., amqp://localhost:5672//, where the second slash at the end is the name of the default vhost). Otherwise simply run the helper script:

./scripts/brokers/amqp.sh

This will download and run the official RabbitMQ image (RabbitMQ is a popular AMQP broker).

Run the examples

Run Rust Celery app

You can consume tasks with:

cargo run --example celery_app consume

And you can produce tasks with:

cargo run --example celery_app produce [task_name]

Current supported tasks for this example are: add, buggy_task, long_running_task and bound_task

Run Python Celery app

Similarly, you can consume or produce tasks from Python by running

python examples/celery_app.py consume [task_name]

or

python examples/celery_app.py produce

You'll need to have Python 3 installed, along with the requirements listed in the requirements.txt file. You'll also have to provide a task name. This example implements 4 tasks: add, buggy_task, long_running_task and bound_task

Run Rust Beat app

You can start the Rust beat with:

cargo run --example beat_app

And then you can consume tasks from Rust or Python as explained above.

Road map and current state

โœ… = Supported and mostly stable, although there may be a few incomplete features.
โš ๏ธ = Partially implemented and under active development.
๐Ÿ”ด = Not supported yet but on-deck to be implemented soon.

Core

Status Tracking
Protocol โš ๏ธ
Producers โœ…
Consumers โœ…
Brokers โœ…
Beat โœ…
Backends ๐Ÿ”ด
Baskets ๐Ÿ”ด

Brokers

Status Tracking
AMQP โœ…
Redis โœ…

Backends

Status Tracking
RPC ๐Ÿ”ด
Redis ๐Ÿ”ด

rusty-celery's People

Contributors

abhishek8394 avatar ajesipow avatar al-jshen avatar azhicham avatar boylede avatar crash-g avatar dependabot-preview[bot] avatar dependabot[bot] avatar epwalsh avatar fourbytes avatar javednissar avatar johnedmonds avatar keruspe avatar mkucijan avatar morenol avatar palfrey avatar pastre avatar peter-formlogic avatar pidelport avatar rksm avatar tjhall13 avatar xulaus avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rusty-celery's Issues

Do we need to handle CPU bound tasks differently?

Is the tokio multi-threaded work-stealing runtime enough?

One option is to handle CPU vs IO bound tasks differently by using spawn_blocking or another thread pool instead of spawn for CPU bound tasks.

Something to think about though is that our timeout functionality doesn't work with blocking tasks, i.e. a blocking task could run forever. This is a problem even if we were to use spawn_blocking, because even though the timeout would return an error after the duration was exceeded, the thread that was spawned could still run forever in the background.

A second option is to completely switch over to async-std: https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/ (NOTE: this isn't shipped yet as of the current version 1.4)? We would still have the same issue with timeouts though.

pass additional context to task on_failure / on_success callbacks

Right now the task callback methods on_failure and on_success only take the error and returned value, respectively, as arguments. However it might be useful to provide additional information to these callbacks such as the task ID. We could pass this information in a context struct like this:

async fn on_failure(&mut self, ctx: &Context, err: &Error);

and

async fun on_success(&mut self, ctx: &Context, returned: &Self::Returns);

Document best practices

For example,

  • Tasks should never panic. Return an error with ? instead.
  • Task timeouts only work when task is non-blocking. If your wrote a task with an infinite loop, the whole worker would freeze indefinitely.
  • Error handling should be done by returned either ErrorKind::ExpectedError or ErrorKind::UnexpectedError
  • Use non blocking functions for IO, such as those from tokio
  • Tuning prefetch_count: For CPU-bound tasks, set prefetch_count to num CPUs. For IO-bound tasks, set much, much higher. Generally try to separate CPU bound and IO bound tasks to different queues.
  • Each task should do one thing and do it well. Ideally there should only be at most one possible point of failure in a task.

Support "baskets": long term scheduler backends

The current system (and essentially how Python Celery does it) for handling tasks with a far-off ETA is to have workers consume them as usual but use the delay_for async function to delay execution until ETA is reached, which could be a while.

In the meantime that task is still taking up resources on a worker and more importantly is overriding the backpressure mechanism - the prefetch_count configuration setting - because in order for the worker to continue consuming other tasks while it is holding onto delayed tasks it needs to tell the broker to increase its channel's prefetch_count behind the scenes.

If it didn't do this and the worker kept receiving tasks with a far-off ETA, the initial prefetch_count would soon be reached and so the broker would stop sending tasks to this worker. Absent of more workers being spun up, this would cause the broker to pile up with messages since it has no where to send them. So no new tasks (even those without a future ETA) could be executed until the worker executes some of the tasks it is holding onto.

So we choose the lesser of the two evils: increasing the prefetch_count. In other words, the worker says "hey, thanks for this task, but I can't do anything with it right now so want to just give me another one?". And that's all fine unless there are a ton of tasks with far-off ETA, in which case the worker will keep taking in more of these until it runs out of memory.

The solution to this is to offload those tasks - tasks with a far-off ETA - somewhere else. Someplace where the chance of running out of memory or storage space is a lot lower, and where the cost of additional memory or storage space is a lot cheaper.

Any traditional database works well for this as long as you can index by the ETA, which you should be able to do with pretty much any database since you can represent the ETA by an integer or float. Then workers (or a dedicated worker solely for this purpose) just need to occasionally poll the database for tasks that are due soon.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.