
Comments (74)

AlJohri avatar AlJohri commented on May 18, 2024 7

Hi, does anyone have an example of a gevent worker or thread worker that works with the latest rq?

from rq.

orihomie avatar orihomie commented on May 18, 2024 3

Any updates on this?
Personally I'd prefer to use the built-in multiprocessing Pool, but it gives me a "cannot pickle thread.lock object" error.

from rq.

sylvinus avatar sylvinus commented on May 18, 2024 1

+1

from rq.

bsergean avatar bsergean commented on May 18, 2024 1

I would love a multiprocessing based worker ... right now I'm thinking about starting multiple services and all of them just start one worker. That works too, but it's more complicated to configure.

from rq.

samuelcolvin avatar samuelcolvin commented on May 18, 2024 1

[shameless self promotion]

arq is built using asyncio and therefore allows hundreds of I/O-intensive jobs to run concurrently.

from rq.

rizplate avatar rizplate commented on May 18, 2024 1

Our use case needs concurrent workers, not asynchronous or gevent ones. We were using a gevent-based framework before, and it was creating issues with everything in the non-gevent world.

If anything, I would prefer multiprocessing as an option, and not gevent/asyncio.

from rq.

pcdinh avatar pcdinh commented on May 18, 2024

Yes, it would be a very nice feature.

from rq.

nvie avatar nvie commented on May 18, 2024

@kennethreitz That's definitely a plan. Are you suggesting to replace the whole pop-then-fork model inside the worker? gevent definitely is promising. Any suggestions on supporting either multiprocessing or gevent?

from rq.

ibeex avatar ibeex commented on May 18, 2024

Since multiprocessing is included in python maybe it should be preferred.

from rq.

nvie avatar nvie commented on May 18, 2024

Both have their areas of usefulness. Roughly, multiprocessing is better at CPU-bound jobs, while gevent will profit from I/O heavy jobs. Building a clean API (so only little documentation is required) is the hardest part.

from rq.

kennethreitz avatar kennethreitz commented on May 18, 2024

Multiprocessing is a good default.

It should be pretty straightforward, I think. Celery and gunicorn both abstract it to simple configuration strings: 'gevent' and 'eventlet'.

from rq.

hcarvalhoalves avatar hcarvalhoalves commented on May 18, 2024

Please consider that multiprocessing is not functional on BSD platforms. If you go down that route, you can fall back to threading (and be stuck with the GIL) like this:

hcarvalhoalves/duplicity@cdc35e5

from rq.

kennethreitz avatar kennethreitz commented on May 18, 2024

Multiprocessing works fine on Windows.

from rq.

jgelens avatar jgelens commented on May 18, 2024

Currently working on a gevent worker. Other async libs shouldn't be too difficult either once this is done.

from rq.

anoopk6 avatar anoopk6 commented on May 18, 2024

When will this be available? Is there a beta version that can be tried?

from rq.

jgelens avatar jgelens commented on May 18, 2024

It's in my fork for now https://github.com/jgelens/rq/
Please check the examples dir.

from rq.

anoopk6 avatar anoopk6 commented on May 18, 2024

Thanks. I went through the code and examples. I have a few questions.

  • How do I select gevent-type workers when starting rqworker? I didn't see any command-line options to switch to gevent.
  • From worker.py it seems the slaves are using a semaphore (_slave_semaphore). Why is this required? Is it only to get the running state of greenlets?
  • Is there a plan to support eventlet as well?
  • Will this be merged into the official one?

from rq.

jgelens avatar jgelens commented on May 18, 2024
  1. bin/rqworker doesn't have an option for this (yet). For now import the GeventWorker instead of Worker to use Gevent.
  2. The semaphore is indeed only there to get the running state. Do you know an easier or better way?
  3. No plan currently, but it's easy to implement once the GeventWorker is done.
  4. I think so, @nvie will decide once the gevent worker is fully implemented.

from rq.

anoopk6 avatar anoopk6 commented on May 18, 2024

As greenlets use cooperative (non-preemptive) scheduling, I think a simple integer increment/decrement should do for the state use case.
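
A minimal sketch of that idea, for illustration only (this is not code from the fork; `RunningCounter` and `run_job` are hypothetical names). It assumes jobs run under cooperative scheduling, where a plain integer needs no lock because switches happen only at explicit yield points:

```python
class RunningCounter:
    """Tracks how many jobs are currently running.

    Hypothetical helper. A bare integer is only safe here because
    greenlets switch at explicit yield points; with preemptive OS
    threads the increment/decrement would need a lock.
    """

    def __init__(self):
        self.running = 0

    def __enter__(self):
        self.running += 1
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs even if the job raised, so the count cannot leak.
        self.running -= 1
        return False  # do not swallow the exception

    @property
    def busy(self):
        return self.running > 0


counter = RunningCounter()


def run_job(job):
    """Run a callable while it is counted as 'running'."""
    with counter:
        return job()
```

The worker can then poll `counter.busy` where it previously inspected the semaphore.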

from rq.

nvie avatar nvie commented on May 18, 2024

Oh yes, this will be pulled into the main repo—definitely! I still need to take a look at it, though, but I've been too busy lately.

from rq.

anoopk6 avatar anoopk6 commented on May 18, 2024

Thanks, are any timelines planned for this feature?

from rq.

nvie avatar nvie commented on May 18, 2024

Nothing planned. If I don't see any problems with it for future development and it works reliably, I'll pull. As I said, I simply haven't had the time to play with it yet.

from rq.

nicholasaiello avatar nicholasaiello commented on May 18, 2024

I ran some tests using the existing Worker and GeventWorker, and found some impressive results. My workers are running a simple database update to a MongoDB replica set, and for 1000 iterations the completion time went from ~3 minutes to ~8 seconds.

Any plans to pull this in sometime in the near future?

from rq.

nvie avatar nvie commented on May 18, 2024

I acknowledge that @jgelens' work on the gevent workers is really interesting, and I would love to have it available in RQ. However, I'm still a bit hesitant to pull, since the implementation currently isn't really DRY and if changes are made to the core worker logic, the changes have to be made to both the naïve and the gevent worker implementations. The end result is increased complexity and I wouldn't be confident anymore to make changes to the worker core.

I would still love to pull the concurrent worker stuff in, but I think the core of the worker implementation needs some layer of abstraction (think BaseWorker), which both the Worker and the GeventWorker could subclass. My gut says that if we come up with the right abstraction, both subclasses could have a really simple implementation and big changes would only have to happen to BaseWorker.

To summarise, I think Jeffrey's work is excellent, and it's definitely not his fault that I'm not pulling yet. It's just that the current RQ implementation needs some refactoring in order to support gevent elegantly.

from rq.

selwin avatar selwin commented on May 18, 2024

We currently already have regular and horse type workers. Instead of making a BaseWorker, I think we should make a new Horse class which handles job dequeues and executions, this can then be subclassed to make other types of horses.

Very rough pseudo code to demonstrate what I have in mind (don't even know if this makes sense):


class Horse(object):

    def fetch_job(self):
        return Queue.dequeue_any(self.queues, blocking=True,
                    connection=self.connection)

    def perform(self, job):
        result = job.perform()
        return result

    def work(self):
        while True: # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.perform(job)  # was self.execute(job), which is not defined on Horse


class Worker(object):

    def dispatch_horses(self, type, *args, **kwargs):
        # start horses here
        Horse(*args, **kwargs).work()

    def work(self, burst=False, type=None, *args, **kwargs):
        # ...
        self.dispatch_horses(type=type, *args, **kwargs)
        # ...

So essentially Worker would just be in charge of starting, monitoring and stopping the Horses (so perhaps the terminology could be changed here, since a worker is typically in charge of executing the jobs).

Thoughts?

from rq.

nvie avatar nvie commented on May 18, 2024

Thanks for your thoughts, @selwin!

Currently, the worker is responsible for dequeueing and unpickling the job function and the horse is responsible for execution (in an isolated context). The GeventWorker uses a different approach: the worker only spawns slaves and just waits until they're all finished. The slaves themselves are responsible for both dequeueing/unpickling and execution. The latter is not due to limitations of gevent, however. It's just implemented that way.

I've stated this before: The main reason for the current implementation has to do with stability. When you spawn a child process (with fork(), or multiprocessing, or whatever) you get an isolated execution context, which has a few nice benefits. Some of which are:

  1. If a process crashes (by a segfault in a C module for example), only the child crashes;
  2. Additionally, the worker will always be responsive and can easily kill the child after a time out;
  3. Also, memory leaks caused in the child can never affect the main worker. The child is killed after every job, so memory should never grow, even when running rqworker for long periods of time.

These characteristics I'm not willing to let go.
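
To make those three properties concrete, here is a rough sketch of fork-per-job isolation with a timeout. It is an illustration under stated assumptions, not RQ's actual worker code: `run_isolated` is a hypothetical name, and it relies on `os.fork()`, so it is POSIX only.

```python
import os
import signal
import time


def run_isolated(func, timeout):
    """Run func() in a forked child and report how the child ended.

    Returns ("exit", code), ("signal", signum) or ("timeout", None).
    """
    pid = os.fork()
    if pid == 0:
        # Child: do the work, then leave without running parent cleanup.
        try:
            func()
            os._exit(0)
        except BaseException:
            os._exit(1)

    # Parent: stays responsive and polls the child until the deadline.
    deadline = time.monotonic() + timeout
    while True:
        done, status = os.waitpid(pid, os.WNOHANG)
        if done == pid:
            break
        if time.monotonic() >= deadline:
            os.kill(pid, signal.SIGKILL)  # the parent can always kill the child
            os.waitpid(pid, 0)            # reap it so no zombie remains
            return ("timeout", None)
        time.sleep(0.01)

    if os.WIFSIGNALED(status):
        # The child died from a signal (e.g. a segfault); the parent survives.
        return ("signal", os.WTERMSIG(status))
    return ("exit", os.WEXITSTATUS(status))
```

Because the child exits after every job, any memory it leaked is returned to the operating system, and a crash in the child is observed by the parent only as an exit status.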

Therefore, I like it that Worker is responsible for the dequeueing loop. I could maybe live with a Horse that's responsible for that, but the main dequeue/unpickle/execute loop should not be fully inside the Horse, as it loses execution isolation (see 3). Two possible implementation scenarios as I see it:

  • Add another level of "spawning" under the Horse, to provide execution isolation;
  • Keep the Horse responsible for what it's doing right now: merely (isolated) execution of jobs. The Worker then stays responsible for collecting work and managing its horses, only now it manages potentially many of them.

My gut feeling says the latter approach is simpler and feels more natural to me. What we need to change, however, is that it should not make the assumption of a single horse and instead support spawning/controlling multiple horses. Besides the aspect of supporting multiple horses, we should also delegate the actual creation and managing of them to a subclass, which knows about the implementation details for doing so (e.g. fork(), multiprocessing, gevent).

What we might need is a pool of horses (?), which can be size=1 for the default (forking) worker. The Worker loop could then be:

while True:
    result = Queue.dequeue_any(self.queues, wait_for_job=True, ...)
    ...
    job, queue = result
    self.spawn_horse(job)

The spawn_horse() method would then spawn a horse to execute the given job, using a free pool slot. The nice thing about pools is that they are auto-managed (idle pool members are reused, and dying pool members are replaced).

Provided we use pool semantics to dispatch jobs to horses, and even use that for the simplest case (our current single-horse-by-forking), we have an abstraction that could work for every type of concurrency implementation. Threading, multiprocessing and gevent all support the pool model in some way or the other.

The thing that worries me most is how we can keep the signal handling / abortion behaviour the same. My experiences with multiprocessing pools are that there's little control over accessing/killing members explicitly from the managing process. I might need to familiarise myself a bit more with this and see how we would keep bullet 2 in practice. I'm not sure how compatible gevent is with respect to this, too.

More thoughts?

from rq.

nvie avatar nvie commented on May 18, 2024

Oh, wait a minute. I suddenly remember something.

When I first wrote the worker implementation, I did consider supporting a pool for multiple members. This is what was so nasty about it:

  • gevent.pool.spawn() typically blocks when there is no free slot. By that time, work has already been read from Redis, so if you stop the worker while spawn() is blocking, you lose a job;
  • multiprocessing.Pool.apply_async() never blocks and instead buffers work in its internal queue. In effect, there's a chance that all work is read from Redis and gets queued up in the worker process instead. Again, you lose work if the worker is then killed (before the work is carried out).

Thinking about this again, we might need another pool primitive that works with all of fork(), multiprocessing, threading, and gevent.

What we might need is something like this:

  • We should be able to "claim" pool members before reading any work from Redis. Those members are spawned (started or reused), initialised and ready to perform work immediately when handed. When no free slots are available, claim() should be blocking;
  • Pool members can be asked to die gracefully after finishing their current job.
  • There's direct access to the pool members' PIDs, in order to forcefully stop their execution (by killing their processes). I'm not sure how this even works with gevent, though. @jgelens, can you shed some light on this? Is force-killing even possible for gevent "children"?
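
The "claim before you dequeue" idea in the first bullet can be sketched with threads and a semaphore. This is only an illustration: `SlotPool` and `worker_loop` are hypothetical names, and a `queue.Queue` stands in for Redis.

```python
import queue
import threading


class SlotPool:
    """Hands out a fixed number of execution slots (hypothetical helper)."""

    def __init__(self, size):
        self._slots = threading.BoundedSemaphore(size)

    def claim(self, timeout=None):
        # Blocks while every slot is busy; returns False on timeout.
        return self._slots.acquire(timeout=timeout)

    def release(self):
        self._slots.release()

    def run(self, func, *args):
        """Run func in a thread that owns an already-claimed slot."""
        def target():
            try:
                func(*args)
            finally:
                self.release()  # free the slot even if the job failed
        thread = threading.Thread(target=target)
        thread.start()
        return thread


def worker_loop(pool, jobs, results):
    """Claim a slot first, and only then take work off the queue."""
    threads = []
    while True:
        pool.claim()                 # 1. wait for capacity
        try:
            job = jobs.get_nowait()  # 2. only now read work
        except queue.Empty:
            pool.release()           # nothing to do: hand the slot back
            break
        threads.append(pool.run(lambda j=job: results.append(j * 2)))
    for thread in threads:
        thread.join()
```

Since a job is never read before a slot is held, stopping the loop while `claim()` blocks cannot lose work that has already left the queue.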

from rq.

jgelens avatar jgelens commented on May 18, 2024

@nvie Killing greenlets is certainly possible.
I'll look into the discussion above later this week and post my thoughts.

from rq.

selwin avatar selwin commented on May 18, 2024

I think making Worker responsible for dequeueing jobs and passing them on to horses is complicated for the reasons @nvie stated above: it needs to know whether any horses are available for work before running dequeue, or you risk losing jobs.

With regard to execution isolation, I think it just needs to be implemented within each horse's work method. So, for example (again, only for rough illustration purposes, please be gentle ;)

class Horse(object):

    def warm_shutdown(self):
        # Warm shutdown logic here
        pass

    def cold_shutdown(self):
        # Cold shutdown logic
        pass

    def fetch_job(self):
        return Queue.dequeue_any(self.queues, blocking=True, connection=self.connection)

    def perform(self, job):
        result = None  # so the return below works even when the job fails
        try:
            result = job.perform()
            job.status = 'finished'
        except Exception:
            job.status = 'failed'
            # handle error
        return result

    def work(self):
        while True: # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.perform(job)

class ForkingHorse(Horse):

    def fork_and_perform_job(self, job):
        # some forking action here, calling self.perform(job)
        pass

    def work(self):
        while True: # This loop is for regular worker, a Gevent one would obviously be different
            job = self.fetch_job()
            result = self.fork_and_perform_job(job)

class GeventHorse(Horse):

    def __init__(self, *args, **kwargs):
        self.slaves = kwargs.pop('slaves', 1)
        self.slave_workers = []
        self.slave_counter = self.slaves
        super(GeventHorse, self).__init__(*args, **kwargs)

    def work(self, burst=False):
        # Gevent work logic here
        pass

class Worker(object):

    def warm_shutdown(self):
        self.horse.warm_shutdown()

    def cold_shutdown(self):
        self.horse.cold_shutdown()

    def dispatch_horses(self, type, *args, **kwargs):
        # start horses here
        Horse(*args, **kwargs).work()

    def work(self, burst=False, type=None, *args, **kwargs):
        # ...
        self.dispatch_horses(type=type, *args, **kwargs)
        # ...

Thoughts?

from rq.

nicholasaiello avatar nicholasaiello commented on May 18, 2024

Why not have the GeventHorse extend Greenlet, so that you can closely manage each process?

class GeventHorse(Horse, Greenlet):

    def __init__(self, *args, **kwargs):
        super(GeventHorse, self).__init__(*args, **kwargs)

    def warm_shutdown(self):
        # wait for this greenlet process to complete
        self.join()

    def cold_shutdown(self):
        # Kill this greenlet process
        self.kill()

    # Horse execute
    def execute(self, burst=False):
        self.start()

    # Greenlet _run
    def _run(self):
        # Gevent work logic here
        pass

from rq.

nvie avatar nvie commented on May 18, 2024

@selwin, I agree that dequeueing jobs from within the horse makes the problem much simpler. So it might not be as bad an idea as I originally thought.

gevent provides great primitives for implementing pools with blocking spawn() calls, which is exactly what we need.

However, after playing a bit with multiprocessing.Pool, I found my gut feeling to be correct: multiprocessing.Pool does not block on apply_async when all children are already busy. Instead, it queues up work internally. See this Gist for a demonstration.

We need to either monkey patch multiprocessing.Pool, subclass it to fix this behaviour, or manually come up with something similar to the way gevent works on this. FYI: I came across this SO thread that discusses something similar. The solution would be so simple, if only multiprocessing would support passing through the maxsize of its internal queue in the Pool() constructor...
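
One way to get the blocking behaviour without monkey patching is to guard submission with a semaphore that is released from the completion callbacks. The sketch below is an assumption-laden illustration, not the approach from the linked Gist or SO thread: `BoundedPool` is a hypothetical name, and it uses `multiprocessing.pool.ThreadPool` (which shares the `Pool` API, including the non-blocking `apply_async`) so that the example stays self-contained.

```python
import threading
import time
from multiprocessing.pool import ThreadPool


class BoundedPool:
    """Wraps a pool so that submit() blocks while all workers are busy."""

    def __init__(self, size):
        self._pool = ThreadPool(size)
        self._free = threading.Semaphore(size)

    def submit(self, func, *args):
        self._free.acquire()  # block here instead of buffering work

        def done(_result_or_error):
            self._free.release()  # a worker is free again

        # Release the slot on success and on failure alike.
        return self._pool.apply_async(
            func, args, callback=done, error_callback=done
        )

    def close(self):
        self._pool.close()
        self._pool.join()


def time_submissions(submit, count, duration):
    """Return how long it takes just to hand over `count` sleep tasks."""
    start = time.monotonic()
    for _ in range(count):
        submit(time.sleep, duration)
    return time.monotonic() - start
```

With a pool of two, handing over six 0.2-second tasks through `BoundedPool.submit` takes roughly as long as the first four tasks need to run, whereas a bare `apply_async` returns almost immediately and leaves the backlog in the pool's internal queue.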

from rq.

selwin avatar selwin commented on May 18, 2024

@nicholasaiello I don't have much experience using gevent so yes, it could probably be much improved when we get around to implementing it. Your input will be much appreciated :)

@nvie I think as long as we keep the dequeue and perform within the same process (or child processes), everything should fall into place quite elegantly. I haven't had time to play with real code, but I think something like this would work for horses with multiple slaves:

from multiprocessing import Pool


def dequeue_and_perform_loop(horse, slave):
    while True:
        # A plain function has no `self`; use the horse's own connection.
        job = Queue.dequeue_any(horse.queues, blocking=True, connection=horse.connection)
        horse.perform_job(job)


class MultiProcessingHorse(Horse):

    def __init__(self, num_processes=1, *args, **kwargs):
        self.slaves = []  # init slaves here
        self.pool = Pool(processes=num_processes)

    def work(self, *args, **kwargs):
        for slave in self.slaves:
            self.pool.apply_async(dequeue_and_perform_loop, [self, slave])

I'll try to play around with this over the weekend.

from rq.

nvie avatar nvie commented on May 18, 2024

OK. This is what I'm thinking about currently:

The main worker loop could be reduced to this:

def work(self):
    """Forever, keep spawning children (this is like how a "pre-fork" server works)."""
    while True:
        self.spawn_child()

The function spawn_child() should be implemented in concrete subclasses, one for each supported concurrency technique. It must:

  1. Claim an execution slot, or block until such a free slot is available;
  2. Create a second unit of execution (process/greenlet/thread) and do all the fetch+execute work in there. When done, release the slot from that thread/process/greenlet.

Point 2 is simple. Thought should be given to how we can assure point 1. The blocking behaviour is absolutely crucial since work() will be stuck in an endless loop otherwise.

The gevent implementation is fairly simple: using gevent.pool, and pool.spawn, it natively has the desired blocking spawn behaviour already.

The forking worker implementation could use a semaphore to guard the forking. What's extra nice about this is that we gain the ability to fork multiple times, instead of the current one horse limit. The current RQ (< 0.4) implementation is comparable to a forking worker with a semaphore value of 1 (a simple lock).

For a sample implementation, see: https://gist.github.com/3541415
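
The gist itself is not reproduced here. As a rough, independent sketch of the same shape (POSIX only, because it uses `os.fork()`), the following tracks slots with a counted set of child PIDs in place of a semaphore. The class body, `child_work` and `max_children` are assumptions made for illustration.

```python
import os
import time


class ForkingWorker:
    """Pre-fork style loop: at most `slots` children run at once."""

    def __init__(self, slots, child_work):
        self.slots = slots
        self.child_work = child_work  # callable run inside each child
        self.children = set()

    def _reap_one(self):
        # Block until any child exits, then free its slot.
        pid, _status = os.wait()
        self.children.discard(pid)

    def spawn_child(self):
        # 1. Claim a slot, blocking while all of them are taken.
        while len(self.children) >= self.slots:
            self._reap_one()
        # 2. Fork; the child does its fetch+execute work and exits.
        pid = os.fork()
        if pid == 0:
            try:
                self.child_work()
                os._exit(0)
            except BaseException:
                os._exit(1)
        self.children.add(pid)

    def work(self, max_children):
        # A real worker would loop forever; bounded here so the sketch ends.
        for _ in range(max_children):
            self.spawn_child()
        while self.children:
            self._reap_one()
```

With `slots=1` this degenerates to one horse at a time, which matches the comparison to a simple lock made above.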

I'm fairly certain that a ThreadingWorker or MultiprocessingWorker would be easy to do, too.

Now, all that remains is how to fit in the signal handling stuff in there, so we keep the current behaviour no matter what concurrency mechanism is used.

But... I'd say: ✨ progress ✨!

from rq.

selwin avatar selwin commented on May 18, 2024

@nvie I took a stab at porting ForkingWorker from your gist to rq, it's still in a pretty rough state and doesn't actually perform jobs yet but here's what I found from this quick experiment (feel free to correct me if I'm wrong):

  1. Worker subclasses need to implement a few methods, like handle_cold_shutdown and is_busy, to handle SIGINT
  2. Graceful termination of a worker instance while a job is in progress is an issue. Unlike in the original Worker implementation, the parent process doesn't wait for the child process to finish before doing another BLPOP. Perhaps we can tweak the request_stop method to loop indefinitely and check whether all children are finished before terminating.

Here's the source code https://github.com/selwin/rq/blob/new-worker/rq/new_worker.py and here's how to run it:

from rq.queue import Queue
from rq.new_worker import ForkingWorker
from redis import Redis

redis = Redis()
queue = Queue('default', connection=redis)
worker = ForkingWorker(num_processes=4, queues=[queue], connection=redis)
worker.work()

I'd appreciate any feedback.

from rq.

selwin avatar selwin commented on May 18, 2024

@nvie I just read your comment again and it seems I misunderstood how you want it to be implemented. Now that I finally understand, I think the way you suggested would be simpler to implement.

from rq.

nvie avatar nvie commented on May 18, 2024

I'm going to be looking at this really soon! (Fixing some production issues now, first.)

from rq.

nvie avatar nvie commented on May 18, 2024

Thanks for giving this thread some new input—it needed it!

from rq.

nicholasaiello avatar nicholasaiello commented on May 18, 2024
  • Like *

from rq.

selwin avatar selwin commented on May 18, 2024

Here's another experiment at implementing concurrent workers, modeled closely after your previous comment. This one properly handles warm and cold shutdown for ForkingWorker and also introduces a few new methods that need to be implemented by Worker subclasses.

I wrote this on a separate branch so should we want to, we can compare the pros and cons of both implementations and see which one we like better.

https://github.com/selwin/rq/blob/new-worker-redux/rq/new_worker.py

As usual, comments welcome :)

from rq.

nvie avatar nvie commented on May 18, 2024

@selwin That looks great—thanks for taking the time to work on this!

One strong piece of advice on trying to implement this immediately in RQ: don't. I went down this route several times, only to be so deep into implementation details that the only way out was throwing everything away and starting over. The gist tries to keep the code focused and limited to the absolute minimum that the worker structure should be, isolated from any RQ functionality.

These lines, for example, introduce a regression (this behaviour was changed in 0.3.3 just last week), so let's just forget about the RQ details and make something generic:

  • the worker is just a main loop, spawning children endlessly (the children block if no free slot is available)
  • the children do the actual work (in the minimal example, let that work be print "hello from {pid}"; randomly pause; print "done from {pid}", instead of doing anything with Redis)
  • the worker handles ctrl+c and kindly starts waiting for children to finish
  • the children may not even notice; they shouldn't be aware that the ctrl+c happened
  • if, during this waiting period, another ctrl+c is pressed, the children should receive a SIGKILL signal from the worker
  • no processes may be left orphaned (check ps -ef output): in other words, you may not get your shell prompt back before all "done"s are printed
  • children may not be terminated when a single ctrl+c is pressed: the worker should wait indefinitely for them to finish. Every child should eventually report a "done" line.
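
The two-stage ctrl+c behaviour in these bullets can be sketched as a small state machine for process-based children. The names `ShutdownController` and `ignore_sigint_in_child` are hypothetical, and the `kill` parameter is injectable only so that the logic can be exercised without real processes:

```python
import os
import signal


class ShutdownController:
    """First SIGINT: stop spawning and wait. Second SIGINT: kill children."""

    def __init__(self, child_pids, kill=os.kill):
        self.child_pids = child_pids  # a set the worker keeps up to date
        self.kill = kill              # injectable for testing
        self.stop_requested = False

    def handle_sigint(self, signum, frame):
        if not self.stop_requested:
            # Warm shutdown: the main loop checks this flag, stops
            # spawning, and waits for running children to finish.
            self.stop_requested = True
            return
        # Cold shutdown: a second ctrl+c force-kills what is still running.
        for pid in list(self.child_pids):
            try:
                self.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                pass  # that child has already exited

    def install(self):
        signal.signal(signal.SIGINT, self.handle_sigint)


def ignore_sigint_in_child():
    """Call in each child right after fork.

    A terminal ctrl+c is delivered to the whole foreground process
    group, so children must ignore SIGINT to stay unaware of it.
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
```

This covers forked or multiprocessing children only; greenlets share the parent's process and signal handlers, so a gevent backend would need a different mechanism.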

Everything above should work similarly for {forking,gevent,multiprocessing}, which is hard. As of this moment, I have been unable to prevent ctrl+c's from breaking child greenlet execution, for instance. This means that the last bullet isn't satisfied in my experiments with the gevent worker.

(Also, my forking worker's code was also broken in the same way when I had the gevent monkey patch time.sleep()—you're warned!)

Shall we therefore build upon my gist to implement all the specs above, for all the different workers? Also, it would be nice to add some arg handling to the main script, so you can test the behaviour of each backend quickly, for example ./new_worker.py gevent to set up gevent patching and instantiate a gevent worker. Whatever backend you select, the behaviour should be the same. (I wouldn't know a way of automatically testing this.)

Thanks for all the awesome work so far!

from rq.

nvie avatar nvie commented on May 18, 2024

I've started doing the above-mentioned desired implementation in a separate project. Check it out here: https://github.com/nvie/new_workers

Behaviourally, this project is at the point where worker termination behaves the same for every concurrency backend, which is exactly what I wanted!

It will need a little cleanup, and a demonstration/proof that this will work for a threading/multiprocessing pool implementation, too. After that, we can bite the bullet and apply this to RQ and rewrite the Worker class.

I'm finally getting the feeling that we can actually make it to a 0.4 with concurrency soon. (And that's a first :))

from rq.

selwin avatar selwin commented on May 18, 2024

@nvie any update on this issue? :)

from rq.

nvie avatar nvie commented on May 18, 2024

Not yet. Structural lack of time, I'm afraid :(
I'm hoping to get something done this Wednesday on my flight to PyCon, which should give me ~8 hrs of hacking time.

from rq.

selwin avatar selwin commented on May 18, 2024

Cool, hopefully it will be fruitful :)

Sent from my phone

On Mar 9, 2013, at 7:44 PM, Vincent Driessen [email protected]
wrote:

Not yet. Structural lack of time, I'm afraid :(
I'm hoping to get something done this Wednesday on my flight to PyCon,
which should give me ~8 hrs of hacking time.



from rq.

spleeyah avatar spleeyah commented on May 18, 2024

+1 would love to see this

from rq.

jheusser avatar jheusser commented on May 18, 2024

👍 that would be extremely useful. Fork on every job is prohibitive for my usage.

from rq.

bwzhou avatar bwzhou commented on May 18, 2024

Reading through this thread gives me the impression that the planned support for concurrency is only for a static number of children that is set when starting the worker. If we knew upfront the number of children we want for a worker, why not just start that number of workers using the old worker implementation? In other words, will there be support for dynamic number of children in this new concurrent worker?

from rq.

faliev avatar faliev commented on May 18, 2024

Any update on this issue? In resque there is a jobs-per-fork plugin where you can specify the number of jobs to run before a fork needs to happen. Is there an easy way to do this now?

from rq.

selwin avatar selwin commented on May 18, 2024

@faliev The master branch already supports custom worker classes, so you can run rqworker --worker-class='path.to.my.Worker'

There's an example gevent based worker implementation here.

I'll try to get some time to refactor a few methods to make writing custom workers easier this weekend. If you do end up implementing a custom worker class, please let me know which hooks are needed.

from rq.

faliev avatar faliev commented on May 18, 2024

After thinking about it for a while, I think the ideal (for me) implementation would be a prefork model, similar to Apache workers. The starting process would load up the environment and fork n workers. They would process some configured number of jobs. An outside process could be used to monitor the memory footprint of each worker and send a signal to shut down if something looks iffy, in which case the parent process would fork a fresh copy.

Do you have an interest in adding prefork worker type to codebase?

from rq.

nvie avatar nvie commented on May 18, 2024

Yes, pre-fork would definitely be one of the supported worker types.

from rq.

lechup avatar lechup commented on May 18, 2024

Has anyone written any GeventWorker custom class that works with rq==0.42? There were some changes in Worker class since @sylvinus put his version here #303 ...

Any Gist would be welcome!
Thanks in advance

from rq.

sylvinus avatar sylvinus commented on May 18, 2024

@lechup: Sorry I've not kept my patch up to date with latest RQ. After lots of experimenting we concluded RQ needed more than a custom worker class to properly use gevent so we built a new taskqueue implementation inspired by RQ but built from the ground up to support only gevent workers:
http://github.com/pricingassistant/mrq

from rq.

lechup avatar lechup commented on May 18, 2024

Thanks for pointing MRQ out. I'll give it a shot!

PS: But still if someone has custom Worker Class / example with Gevent or Threads I'll be grateful for sharing.

from rq.

jhorman avatar jhorman commented on May 18, 2024

@lechup I added an updated example #303 (comment)

from rq.

lechup avatar lechup commented on May 18, 2024

Thanks! Maybe we could add it somehow to core? Set the default to current 'subprocess' worker class to preserve backward compatibility but arrange workers in module/file:

rq.workers.gevent.GeventWorker + rq.workers.subprocess.SubprocessWorker + rq.workers.base.Worker?

and add all those in module init?

rq.workers.GeventWorker + rq.workers.SubprocessWorker + rq.workers.Worker?

What do You think? @nvie ?

from rq.

joostdevries avatar joostdevries commented on May 18, 2024

I see that this issue has been stale for a while but as someone who is just switching to RQ, I'd like to offer my thoughts:

We moved away from Celery because of what kind of beast it has become supporting different backends, brokers and serializers. If RQ wants to support concurrency, I'd pick one method (preferably one that doesn't add any extra dependencies) and delegate any other implementation to plugins/other libs. (rq-gevent) etc.

I LOVE the fact that I can quickly glance over the entire RQ source and understand what's going on. With Celery + Kombu that was completely impossible.

from rq.

selwin avatar selwin commented on May 18, 2024

@joostdevries I'm completely in agreement with you. Ideally, I'd like to have a reference implementation using multiprocessing in the core library. The rest can live as separate, third party libraries implementing worker classes using different concurrency methods. I just haven't had the time/need to work on this feature yet.

from rq.

Koed00 avatar Koed00 commented on May 18, 2024

I think having a standard implementation using multiprocessing is the way to go. You don't want to get caught up in too many dependencies. Personally, I do have a practical need for this. My projects run on Heroku and that means I have to dedicate a 512 MB quad-core dyno to a single worker process. I've been moving longer-running tasks to RQ and these are blocking the high-priority short tasks. This has led me to set up a second worker on a separate dyno for high-priority tasks. A single dyno could easily manage all of these, and more, with multiprocessing.

from rq.

arikfr avatar arikfr commented on May 18, 2024

@Koed00 as a workaround you can run multiple processes on a single Dyno (without defining multiple process types). Maybe these days there are more intelligent ways to do it, but you can run Honcho (to start multiple processes) as the process Heroku runs for you in this dyno.

from rq.

Koed00 avatar Koed00 commented on May 18, 2024

@arikfr starting multiple workers with honcho is definitely a way to do this and I've tried it. It works, but it feels hacky and the teardown of the workers fails more often than not.

I've been trying a more direct approach with:

import multiprocessing

workers = []
for i in range(4):
    # `worker` is an already-constructed rq Worker instance
    p = multiprocessing.Process(target=worker.work)
    workers.append(p)
    p.start()

and this seems to work fine too, except again for the teardown.
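
One possible way to make the teardown explicit is to terminate and join every process from the parent, escalating to a kill after a grace period. This is a hedged sketch, not a confirmed fix for the failure described here: `start_workers` and `stop_workers` are hypothetical helpers, `target` stands in for `worker.work`, and the "fork" start method is assumed (POSIX only).

```python
import multiprocessing
import time


def start_workers(target, count):
    """Start `count` processes that each run target()."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX
    procs = [ctx.Process(target=target) for _ in range(count)]
    for proc in procs:
        proc.start()
    return procs


def stop_workers(procs, grace=5.0):
    """Ask every process to stop, then force-kill the stragglers."""
    for proc in procs:
        proc.terminate()  # sends SIGTERM
    deadline = time.monotonic() + grace
    for proc in procs:
        # Share one grace period across all processes.
        proc.join(max(0.0, deadline - time.monotonic()))
    for proc in procs:
        if proc.is_alive():
            proc.kill()   # SIGKILL; cannot be ignored
            proc.join()
```

Calling `stop_workers(workers)` from a `finally` block, or from the parent's own signal handler, keeps the shutdown order under the parent's control.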

from rq.

chenxin-csu avatar chenxin-csu commented on May 18, 2024

I think it's quite a simple way to use multiprocessing! But what do you mean by the teardown issue?

from rq.

Koed00 avatar Koed00 commented on May 18, 2024

@chenxin-csu When you use multiprocessing like this, you should terminate the pool instead of having each individual process handle the exit. Otherwise the pool process will throw exceptions.

from rq.

Koed00 avatar Koed00 commented on May 18, 2024

I've been trying to think of a strategy for multiprocessing and have been testing this one today:

  • worker spawns a pool (stable) of horses at init
  • horses start looking at a shared queue and picking up jobs from there
  • worker can push jobs on the queue at any time.
  • worker replenishes stable in case of accidental death.
  • on exit worker waits for the stable to finish the remaining queue items

I've put up a gist multiworker which shows the basic plan. Feedback is welcome.
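The plan above could be sketched roughly as follows. This is not the gist itself, only an illustration under assumptions: `Stable`, `horse` and the squaring "job" are hypothetical placeholders, the shared queue is a `multiprocessing` queue rather than Redis, and the `fork` start method makes it Unix-only.

```python
import multiprocessing

# Assumption: Unix with the fork start method.
ctx = multiprocessing.get_context("fork")
SENTINEL = None  # tells a horse to leave the stable


def horse(jobs, results):
    """Pick up jobs from the shared queue until the sentinel arrives."""
    while True:
        job = jobs.get()
        if job is SENTINEL:
            break
        results.put(job * job)  # placeholder for running the real job


class Stable:
    """A fixed-size pool of horses fed from one shared queue."""

    def __init__(self, size):
        self.jobs = ctx.Queue()
        self.results = ctx.Queue()
        self.horses = [self._spawn() for _ in range(size)]

    def _spawn(self):
        p = ctx.Process(target=horse, args=(self.jobs, self.results))
        p.start()
        return p

    def push(self, job):
        """The worker can push jobs at any time."""
        self.jobs.put(job)

    def replenish(self):
        """Replace horses that died, e.g. because a job crashed them."""
        self.horses = [h if h.is_alive() else self._spawn() for h in self.horses]

    def close(self):
        """Queue one sentinel per horse behind the remaining jobs, then wait."""
        for _ in self.horses:
            self.jobs.put(SENTINEL)
        for h in self.horses:
            h.join()
```

A short usage example: results are read before `close()` so that the children are not joined while they still hold unflushed queue data. A job that was running when its horse died is lost in this sketch.

```python
stable = Stable(2)
for n in range(5):
    stable.push(n)
squares = sorted(stable.results.get(timeout=10) for _ in range(5))
stable.close()
```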

from rq.

kyleterry avatar kyleterry commented on May 18, 2024

Just weighing in on gevent here. I recently dropped gevent for all my Python programming because there seems to be no hope for it ever supporting Python 3. I would use an asyncio-falling-back-to-tulip solution instead. Otherwise RQ will forever be locked into the Python 2 branch.

from rq.

jhorman avatar jhorman commented on May 18, 2024

The nice thing about gevent is if a threaded version of RQ was developed, gevent monkey patching will work. It probably doesn't need to be a gevent specific implementation. Seems useful to have threaded, multiproc, and asyncio.
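As a rough illustration of the threaded option, here is a minimal sketch that runs I/O-bound jobs on a standard-library thread pool. Nothing in it is RQ API; `io_job` and `run_jobs` are hypothetical names. The point about gevent is that `gevent.monkey.patch_all()` swaps the standard threading and socket modules for cooperative versions, so code written against plain threads like this could, in principle, run on greenlets without a gevent-specific worker.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def io_job(n):
    """Placeholder for an I/O-bound job, such as an HTTP request."""
    time.sleep(0.05)
    return n * 2


def run_jobs(jobs, max_threads=4):
    """Run jobs concurrently on a thread pool; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        return list(pool.map(io_job, jobs))
```

Threads only help while jobs spend their time waiting on I/O; CPU-bound jobs would still need separate processes.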

from rq.

sylvinus avatar sylvinus commented on May 18, 2024

gevent is working great for our RQ-inspired project: http://github.com/pricingassistant/mrq

Still, we've found it's great to have both greenlets and processes to deal with different job requirements (io-bound & cpu-bound). We use supervisord so we can launch something like mrq-worker --processes 3 --greenlets 10 to have 30 total greenlets. I'm not sure adding threads would be very useful if you have both these cases covered.

What is quite tricky to do (and I think this is what was delaying the new_worker experiments here) is managing signals and worker termination the same way for all configs. We had to build a complete test suite to get it right: https://github.com/pricingassistant/mrq/blob/master/tests/test_interrupts.py

Happy to provide further feedback! We spent quite a lot of time on this...

from rq.

Koed00 avatar Koed00 commented on May 18, 2024

Gevent will probably always be faster if you are processing many small tasks, but as soon as you introduce a few intensive tasks, everything gets clogged up. In those cases multiprocessing will make better use of your server resources to get those slow tasks out of the way. That's the road I took with the RQ-inspired project I started this week.

from rq.

imaia avatar imaia commented on May 18, 2024

I would also like to know if there is a "best way" to use eventlet with rq; will spawn_n work?

from rq.

zyv avatar zyv commented on May 18, 2024

Too bad there is still no support for worker concurrency in the core; thinking of wrapping the default worker in Supervisor to get a simple multiprocessing thing: http://supervisord.org ...

from rq.

samuelcolvin avatar samuelcolvin commented on May 18, 2024

gevent and asyncio are very different animals, I wouldn't say "asyncio isn't suitable because gevent doesn't work". Then again switching to asyncio will require a complete rewrite, so I understand it might not be that easy to adopt.

Personally, I think managing separate processes is outside the bounds of rq. Generally, whatever environment you're using to run the worker will provide a sophisticated way of running the worker multiple times, e.g. both docker compose and heroku allow you to increase the worker count for a service/dyno type.

from rq.

ghsobetter avatar ghsobetter commented on May 18, 2024

I want to know how many workers I can run.

from rq.

cw1427 avatar cw1427 commented on May 18, 2024

Is it possible to use Go goroutines or Rust to implement the worker and wrap it as a Python module?

from rq.

ccrvlh avatar ccrvlh commented on May 18, 2024

For future reference, a first test with a GeventWorker is linked here: #885

from rq.
