rq's Introduction

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and designed to have a low barrier to entry, so it can be integrated into your web stack easily.

RQ requires Redis >= 3.0.0.

Full documentation can be found here.

Support RQ

If you find RQ useful, please consider supporting this project via Tidelift.

Getting started

First, run a Redis server, of course:

$ redis-server

To put jobs on queues, you don't have to do anything special; just define your typically lengthy or blocking function:

import requests

def count_words_at_url(url):
    """Just an example function that's called async."""
    resp = requests.get(url)
    return len(resp.text.split())

Then, create an RQ queue:

from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())

And enqueue the function call:

from my_module import count_words_at_url
job = queue.enqueue(count_words_at_url, 'http://nvie.com')
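
Enqueueing returns a Job instance, which you can use to inspect the call later (a minimal check, assuming a worker is running; see "The worker" below):

import time

print(job.id)            # the job's ID, assigned at enqueue time
time.sleep(2)            # give a running worker a moment to pick it up
print(job.get_status())  # e.g. 'queued', 'started', or 'finished'
print(job.result)        # return value of count_words_at_url; None until finished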

Scheduling jobs is similarly easy (say_hello here stands for any importable function, like count_words_at_url above):

from datetime import datetime, timedelta

# Schedule job to run at 9:15, October 10th
job = queue.enqueue_at(datetime(2019, 10, 10, 9, 15), say_hello)

# Schedule job to run in 10 seconds
job = queue.enqueue_in(timedelta(seconds=10), say_hello)

Retrying failed jobs is also supported:

from rq import Retry

# Retry up to 3 times; failed jobs are requeued immediately
queue.enqueue(say_hello, retry=Retry(max=3))

# Retry up to 3 times, with configurable intervals between retries
queue.enqueue(say_hello, retry=Retry(max=3, interval=[10, 30, 60]))

For a more complete example, refer to the docs. But this is the essence.

The worker

To start executing enqueued function calls in the background, start a worker from your project's directory:

$ rq worker --with-scheduler
*** Listening for work on default
Got count_words_at_url('http://nvie.com') from default
Job result = 818
*** Listening for work on default

That's about it.

Installation

Simply use the following command to install the latest released version:

pip install rq

If you want the cutting edge version (that may well be broken), use this:

pip install git+https://github.com/rq/rq.git@master#egg=rq

Related Projects

If you use RQ, check out the repositories below, which might be useful in your RQ-based project.

Project history

This project has been inspired by the good parts of Celery, Resque and this snippet, and has been created as a lightweight alternative to the heaviness of Celery or other AMQP-based queueing implementations.

rq's Issues

Error when running rqinfo -wQ when no queues exist

When there are no queues, running rqinfo -wQ yields an error.

$ bin/rqinfo -wQ
Traceback (most recent call last):
  File "bin/rqinfo", line 147, in <module>
    main()
  File "bin/rqinfo", line 141, in main
    show_workers(opts, args, parser)
  File "bin/rqinfo", line 114, in show_workers
    max_qname = max(map(lambda q: len(q.name), queues.keys()))
ValueError: max() arg is an empty sequence
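
A small guard avoids the crash when the queue mapping is empty (a self-contained sketch of the fix; _Q is a stand-in for rq.Queue):

class _Q(object):
    def __init__(self, name):
        self.name = name

queues = {}  # no queues exist: the case that crashes above

# Guard max() against an empty sequence instead of letting it raise
name_lengths = [len(q.name) for q in queues]
max_qname = max(name_lengths) if name_lengths else 0
print(max_qname)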

Be more consistent in the API

Currently, the Queue has a few dequeue methods, each returning a different data type:

  • dequeue() returns an unpickled message (i.e. a 4-tuple)
  • dequeue_any() returns a tuple with a queue name and a pickled string representation of the 4-tuple

Quite inconsistent.

Make job id available in a job function

Hi,

I would like to store job details (job ID, etc.) in a DB (other than Redis) when I enqueue a function, and then update those details when the function is executed by the worker. It would be nice if the job ID were available inside the job function for updating the job details in the DB.

Would it be possible to pass the job ID to the job function?

Currently, as a hack, I am updating the kwargs passed to the job function to include the job ID, like
self.kwargs.update({"_id": self._id}).
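
Note that current RQ exposes the executing job inside the job function via get_current_job(), which avoids the kwargs hack (a sketch; update_job_record is a hypothetical stand-in for your own DB layer, not part of RQ):

from rq import get_current_job

def update_job_record(job_id, **fields):
    # hypothetical stand-in for your own DB layer
    print('job', job_id, fields)

def count_and_record(url):
    job = get_current_job()  # the Job instance this worker is executing
    update_job_record(job.id, status='running')
    result = len(url.split('/'))
    update_job_record(job.id, status='done', result=result)
    return result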

Bind to Redis connection on queue/worker creation time

Rather than binding at access time (for example, when enqueueing), bind to the Redis connection at creation time.

This supports use cases where multiple Redis connections are used while still being managed by the existing connection stack mechanism.
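
For reference, this is how creation-time binding looks with the current API, where each queue or worker is handed its connection explicitly (a minimal sketch with two separate connections):

from redis import Redis
from rq import Queue, Worker

conn_a = Redis(db=0)
conn_b = Redis(db=1)

# Bound to their respective connections at creation time, not at access time
queue_a = Queue('default', connection=conn_a)
worker_b = Worker(['default'], connection=conn_b)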

Rewrite the jobs data model

The current model of serialising jobs and putting them on queues directly poses some limitations on the advanced stuff we want to build into RQ. Mainly:

  • jobs aren't individually addressable anymore (which is just as bad as the Celery behaviour)
  • due to the above, messages can't be pulled off queues and can't easily be requeued (at least not in arbitrary order)
  • tracking jobs isn't easily possible

We can make a simple change to the persistence model for jobs: store only the job ID on the queue, and put all of the job data inside separate Redis keys (one key per job), containing the serialised (pickled) properties as values (a sketch of this model follows the list below).

This solves the following problems:

  • this fundamentally makes jobs addressable directly (i.e. their properties can be grabbed directly if you know their job ID)
  • jobs can be taken off from queues ("unqueued") by simply removing their job key (workers could be changed to ignore any job ID on a queue if there exists no corresponding job key)
  • job status information can eventually be added directly inside the job key (whatever implementation is suitable here)
  • requeueing jobs is now extremely easy, as it will be equivalent to pushing the job ID onto any queue
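
A minimal sketch of this model using redis-py directly (key names mirror RQ's rq:job:/rq:queue: conventions; JSON stands in for pickle for readability):

import json
import uuid

from redis import Redis

redis = Redis()

def enqueue(queue_name, func_name, args):
    # Job data lives in its own key; only the job ID goes on the queue list
    job_id = str(uuid.uuid4())
    redis.hset('rq:job:' + job_id, mapping={
        'func': func_name,
        'args': json.dumps(args),
    })
    redis.rpush('rq:queue:' + queue_name, job_id)
    return job_id

def dequeue(queue_name):
    while True:
        job_id = redis.lpop('rq:queue:' + queue_name)
        if job_id is None:
            return None
        data = redis.hgetall(b'rq:job:' + job_id)
        if data:  # skip dangling IDs whose job key was removed ("unqueued")
            return job_id, data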

Make reliable when Redis connection is dropped

rqworker should run reliably over long periods of time, without crashing or stopping under normal circumstances. When redis-server is killed and immediately restarted (i.e. a connection hiccup), rqworker should keep running normally.

Currently, it crashes with the Python Redis driver's redis.exceptions.ConnectionError.
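
A worker loop can survive such a hiccup by catching the driver's ConnectionError and retrying with a small delay, instead of crashing (a sketch, not RQ's actual implementation):

import time

import redis

conn = redis.Redis()

while True:
    try:
        # BLPOP blocks until work arrives; the timeout keeps the loop responsive
        item = conn.blpop(['rq:queue:default'], timeout=5)
        if item is not None:
            queue_key, payload = item
            print('got job payload:', payload)  # stand-in for real job handling
    except redis.exceptions.ConnectionError:
        # redis-server went away (e.g. a restart); back off and retry
        time.sleep(2)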

Look into this bug

[2011-11-28 13:57] INFO: worker: *** Listening for work on default
^C[2011-11-28 13:57] DEBUG: worker: Got SIGINT signal.
[2011-11-28 13:57] WARNING: worker: Warm shut down. Press Ctrl+C again for a cold shutdown.
[2011-11-28 13:57] DEBUG: worker: Stopping after current horse is finished.

^C[2011-11-28 13:57] WARNING: worker: Cold shut down.
[2011-11-28 13:57] DEBUG: worker: Taking down horse 54577 with me.
[2011-11-28 13:57] DEBUG: worker: Registering death
Traceback (most recent call last):
File "bin/rqworker", line 62, in <module>
    main()
File "bin/rqworker", line 59, in main
    w.work(burst=args.burst)
File "/Users/vincent/Projects/rq/rq/worker.py", line 246, in work
    job = Queue.dequeue_any(self.queues, wait_for_job)
File "/Users/vincent/Projects/rq/rq/queue.py", line 171, in dequeue_any
    queue_key, blob = conn.blpop(queue_keys)
File "/Users/vincent/.virtualenvs/rq/lib/python2.7/site-packages/redis/client.py", line 587, in blpop
    return self.execute_command('BLPOP', *keys)
File "/Users/vincent/.virtualenvs/rq/lib/python2.7/site-packages/redis/client.py", line 277, in execute_command
    return self.parse_response(connection, command_name, **options)
File "/Users/vincent/.virtualenvs/rq/lib/python2.7/site-packages/redis/client.py", line 287, in parse_response
    response = connection.read_response()
File "/Users/vincent/.virtualenvs/rq/lib/python2.7/site-packages/redis/connection.py", line 247, in read_response
    response = self._parser.read_response()
File "/Users/vincent/.virtualenvs/rq/lib/python2.7/site-packages/redis/connection.py", line 65, in read_response
    response = self.read()
File "/Users/vincent/.virtualenvs/rq/lib/python2.7/site-packages/redis/connection.py", line 59, in read
    return self._fp.readline()[:-2]
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/socket.py", line 447, in readline
    data = self._sock.recv(self._rbufsize)
File "/Users/vincent/Projects/rq/rq/worker.py", line 198, in request_force_stop
    os.kill(self.horse_pid, signal.SIGKILL)
OSError: [Errno 3] No such process

Make Jobs directly pickle'able

q.enqueue(f, args, kwargs)
=>
job = Job(f, args, kwargs)
job.origin = queue name
job.timestamp = now()
job.rv_key = random key
=>
q._push(job)
serialises the job object (pickles it) and writes it out to Redis

q.dequeue() / Q.dequeue_any()
=>
q._pop() / Q._pop_any()
pops from Redis and deserialises the Job. Keeps the raw Redis data around in case depickling fails (to requeue it to the failure queue).

(Still not clear, though, who is responsible for the requeueing! I think the Worker should be, but it is Friday afternoon. So please revisit this on Monday :)
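
A minimal sketch of a directly pickleable Job along these lines (field names follow the pseudocode above; this is not RQ's actual class):

import pickle
import uuid

class Job(object):
    def __init__(self, func, args, kwargs):
        self.func = func          # must be importable at module level to pickle
        self.args = args
        self.kwargs = kwargs
        self.origin = None        # queue name, set by the queue on push
        self.timestamp = None     # set to now() by the queue on push
        self.rv_key = 'rq:result:%s' % uuid.uuid4()  # where the return value goes

    def perform(self):
        return self.func(*self.args, **self.kwargs)

# Round trip: the whole job object survives pickling
job = Job(len, (['a', 'b', 'c'],), {})
restored = pickle.loads(pickle.dumps(job))
assert restored.perform() == 3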

Worker concurrency?

It'd be nice if there were a built-in mechanism to use multiprocessing, eventlet, or gevent to have rqworker handle n tasks concurrently.

I don't think it would complicate things.
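
In the meantime, one workaround is to fork n workers yourself (a sketch using the standard library's multiprocessing on top of RQ's public Worker API):

from multiprocessing import Process

from redis import Redis
from rq import Queue, Worker

def run_worker():
    conn = Redis()  # each process gets its own connection
    Worker([Queue('default', connection=conn)], connection=conn).work()

if __name__ == '__main__':
    n = 4  # number of concurrent worker processes
    procs = [Process(target=run_worker) for _ in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()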
