Comments (8)
Hey @epwalsh I have a working mechanism to push/pull messages to/from redis! I still need to implement things like ack, retry and close. But I wanted you to take a look at the implementation so far. It can be ran from the example celery app too.
Issues / concerns:
-
The main consume loop only seems to be able to pick up tasks that were produced before but not after its launch. I cannot seem to find the correct place to invoke the waker. Or maybe it has nothing to do with waker but the objects returned from Stream's next method.
-
Is it correct that we cannot do async work inside the
DeliveryStream
? I currently clone theredis::Client
for each channel because MultiplexeConnection works for async calls only. Not sure if cloningredis::Client
creates a new connection each time.
Code at redis-broker branch
from rusty-celery.
Depends on async pub sub: redis-rs/redis-rs#287. This is now merged but not released
UPDATE: Actually see
from rusty-celery.
I would like to take a stab at this. I can try mimicking the way amqp broker is implemented.
from rusty-celery.
@Abhishek8394 that would be much appreciated!
from rusty-celery.
@epwalsh Does this look like the correct way to do it?
- Create a redis list where list key is queue name.
Each queue also gets a correspondingJust one*-processing
list.processing
list.- On
consume(queue)
it expects a future stream.- loop on a named queue, move item from there into
processing
list.
- loop on a named queue, move item from there into
- On
ack
- Remove item from
*-processing
processing
queue
- Remove item from
- On
retry
- Push item back in
named queue.?*-processing
queue
- Push item back in
- The role of
*_channel
is not clear to me, it seems like an AMQP thing so far. The MultiplexedConnection would do the role of both? Maybe create dummy channel that wraps around multiplexed connection?
Broker
struct RedisBroker{
connection: MultiplexedConnection,
consume_channel: Something?,
produce_channel: Something?,
queues: HashSet<String>, // Use queue names as redis list key
}
from rusty-celery.
Hey @Abhishek8394, I'm really not sure the best way to do it, but I think the best source to look at would be the Python equivalent from the kombu
package: https://github.com/celery/kombu/blob/master/kombu/transport/redis.py
Sorry I can't be more help at the moment, I'm wrapped up in another project for the next week or two
from rusty-celery.
So it seems kombu uses n
lists for n
queues and a single processing queue. They also use a hashmap to keep track of "processing" jobs. I think we can do that either with redis::Pipeline
or brpoplpush
. However an issue I ran into is that the redis library expects a mutable connection object whereas the Broker
trait provides immutable self
refs. Two potential solutions below.
- Use
RefCell
to wrap connections. I tried this but theSend
trait is not satisfied for something like&mut *conn.borrow_mut()
. So not sure how to proceed further with this. - Use unsafe code, haven't tried this but there should be a better way to do this.
from rusty-celery.
Hey @Abhishek8394, I'd like to stick with the "interior mutability" pattern that the Broker
currently has, i.e. keep the self
refs immutable. So instead of RefCell
you could use Mutex
. See https://doc.rust-lang.org/book/ch16-03-shared-state.html#similarities-between-refcelltrct-and-mutextarct, for example.
from rusty-celery.
Related Issues (20)
- rustls support
- Add task options that enable the user to send negative acknowledgements and rely on broker's retry mechanisms HOT 2
- Prefetch not working properly
- No connection was performed when setting `broker_connection_retry` to `false`
- How do I run the main method in beat_app.rs in a thread
- Cannot connect to broker with x-max-priority set HOT 2
- How to get TaskResult from AsyncResult? HOT 1
- New release? HOT 3
- Call for new features HOT 5
- BrokerBuilder is not Send HOT 6
- Socket was readable but we read 0. This usually means that the connection is half closed this mark it as broken HOT 7
- Celery task blocking main thread in actix web HOT 3
- does the RedisBroker support password
- WARN celery::broker::redis:Setting heartbeat on redis broker has no effect on anything HOT 2
- support streams protocol
- is it possible to define the log output level HOT 1
- Postgres as a broker/backend
- Limiting the rate of consuming HOT 4
- Delivery acknowledgement timeout on best practices
- Flower integration HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rusty-celery.