Giter VIP home page Giter VIP logo

Comments (8)

Abhishek8394 avatar Abhishek8394 commented on June 12, 2024 1

Hey @epwalsh I have a working mechanism to push/pull messages to/from redis! I still need to implement things like ack, retry and close. But I wanted you to take a look at the implementation so far. It can be ran from the example celery app too.

Issues / concerns:

  • The main consume loop only seems to be able to pick up tasks that were produced before but not after its launch. I cannot seem to find the correct place to invoke the waker. Or maybe it has nothing to do with waker but the objects returned from Stream's next method.

  • Is it correct that we cannot do async work inside the DeliveryStream ? I currently clone the redis::Client for each channel because MultiplexeConnection works for async calls only. Not sure if cloning redis::Client creates a new connection each time.

Code at redis-broker branch

from rusty-celery.

epwalsh avatar epwalsh commented on June 12, 2024

Depends on async pub sub: redis-rs/redis-rs#287. This is now merged but not released

UPDATE: Actually see

from rusty-celery.

Abhishek8394 avatar Abhishek8394 commented on June 12, 2024

I would like to take a stab at this. I can try mimicking the way amqp broker is implemented.

from rusty-celery.

epwalsh avatar epwalsh commented on June 12, 2024

@Abhishek8394 that would be much appreciated!

from rusty-celery.

Abhishek8394 avatar Abhishek8394 commented on June 12, 2024

@epwalsh Does this look like the correct way to do it?

  • Create a redis list where list key is queue name.
  • Each queue also gets a corresponding *-processing list. Just one processing list.
  • On consume(queue) it expects a future stream.
    • loop on a named queue, move item from there into processing list.
  • On ack
    • Remove item from *-processing processing queue
  • On retry
    • Push item back in *-processing queue named queue.?
  • The role of *_channel is not clear to me, it seems like an AMQP thing so far. The MultiplexedConnection would do the role of both? Maybe create dummy channel that wraps around multiplexed connection?

Broker

struct RedisBroker{
    connection: MultiplexedConnection, 
    consume_channel: Something?,
    produce_channel: Something?,
    queues: HashSet<String>, // Use queue names as redis list key
}

from rusty-celery.

epwalsh avatar epwalsh commented on June 12, 2024

Hey @Abhishek8394, I'm really not sure the best way to do it, but I think the best source to look at would be the Python equivalent from the kombu package: https://github.com/celery/kombu/blob/master/kombu/transport/redis.py

Sorry I can't be more help at the moment, I'm wrapped up in another project for the next week or two

from rusty-celery.

Abhishek8394 avatar Abhishek8394 commented on June 12, 2024

So it seems kombu uses n lists for n queues and a single processing queue. They also use a hashmap to keep track of "processing" jobs. I think we can do that either with redis::Pipeline or brpoplpush. However an issue I ran into is that the redis library expects a mutable connection object whereas the Broker trait provides immutable self refs. Two potential solutions below.

  1. Use RefCell to wrap connections. I tried this but the Send trait is not satisfied for something like &mut *conn.borrow_mut(). So not sure how to proceed further with this.
  2. Use unsafe code, haven't tried this but there should be a better way to do this.

code here

from rusty-celery.

epwalsh avatar epwalsh commented on June 12, 2024

Hey @Abhishek8394, I'd like to stick with the "interior mutability" pattern that the Broker currently has, i.e. keep the self refs immutable. So instead of RefCell you could use Mutex. See https://doc.rust-lang.org/book/ch16-03-shared-state.html#similarities-between-refcelltrct-and-mutextarct, for example.

from rusty-celery.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.