Job queues in python with asyncio and redis.
See documentation for more details.
Fast job queuing and RPC in python with asyncio and redis.
Home Page: https://arq-docs.helpmanual.io/
License: MIT License
I want to send some messages from the worker to the front end (which sends the jobs out to the worker), and I want to close a job depending on the response it receives. How can I achieve that?
Useful for: including getting extra() from exceptions and information on the worker.
Suppose I add a job to a queue and later add another job with the same job_id: even before the job completes, status() returns JobStatus.complete.
Example:
import asyncio
from aiohttp import ClientSession
from arq import create_pool
from arq.connections import RedisSettings

async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)

async def startup(ctx):
    ctx['session'] = ClientSession()

async def shutdown(ctx):
    await ctx['session'].close()

async def main():
    redis = await create_pool(RedisSettings())
    job_list = []
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        job = await redis.enqueue_job('download_content', url, _job_id=url)
        job_list.append(job)
    for j_ in job_list:
        print(j_.job_id)
        print(await j_.status())

# WorkerSettings defines the settings to use when creating the worker,
# it's used by the arq cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Running this twice:
(sanic) ╭─adam@adams-MacBook-Air ~/sidekik
╰─$ python arqq_.py
https://facebook.com
JobStatus.queued
https://microsoft.com
JobStatus.queued
https://github.com
JobStatus.queued
(sanic) ╭─adam@adams-MacBook-Air ~/sidekik
╰─$ python arqq_.py
https://facebook.com
JobStatus.complete
https://microsoft.com
JobStatus.complete
https://github.com
JobStatus.complete
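As far as I can tell, arq's status check looks for a stored result under the job ID before it looks at the queue, and results are kept for a while after a job finishes, so a new job that reuses a finished job's _job_id reports JobStatus.complete straight away. If the ID only needs to be recognisable, one workaround is to make it unique per enqueue. unique_job_id below is a hypothetical helper, not part of arq:

```python
from uuid import uuid4


def unique_job_id(prefix: str) -> str:
    """Build a job ID that stays readable but is unique per enqueue.

    Hypothetical helper: the prefix (e.g. the URL) keeps the ID recognisable,
    while the random suffix avoids colliding with an earlier job's stored result.
    """
    return f'{prefix}:{uuid4().hex}'


# Usage inside main() from the example above (requires a running Redis):
#     job = await redis.enqueue_job('download_content', url, _job_id=unique_job_id(url))
```

The trade-off is that the application must remember the generated ID if it wants to look the job up again later.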
Particularly if the function is not registered.
After some playing around, I’ve finally figured things out and am able to run some workers but now I am getting this error:
00:01:18: error serializing result of 353fa5bd0bcf4556982727732b82a34f:add_async_task even after replacing result
Traceback (most recent call last):
File "/Users/sml/.virtualenvs/cr-api-web/lib/python3.7/site-packages/arq/jobs.py", line 184, in serialize_result
return serializer(data)
TypeError: can't pickle _asyncio.Future objects
CRITICAL:arq.jobs:error serializing result of 353fa5bd0bcf4556982727732b82a34f:add_async_task even after replacing result
Traceback (most recent call last):
File "/Users/sml/.virtualenvs/cr-api-web/lib/python3.7/site-packages/arq/jobs.py", line 184, in serialize_result
return serializer(data)
TypeError: can't pickle _asyncio.Future objects
Here’s my source containing the worker:
import importlib
from arq import create_pool
from arq.connections import RedisSettings
from cr.config import Config

async def add_async_task(ctx, method_name=None, args=None, kwargs=None, use_es=False):
    module_name, func_name = method_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    method = getattr(module, func_name)
    if use_es:
        kwargs['es'] = ctx['es']
    await method(*args, **kwargs)

async def startup(ctx):
    ctx['es'] = Config().es

async def shutdown(ctx):
    await ctx['es'].close()

async def setup(sanic_app):
    sanic_app.ARQ_REDIS = await create_pool(
        RedisSettings(
            host=Config().config.redis.host,
            port=Config().config.redis.port,
            password=Config().config.redis.password,
        )
    )

async def get_arq_redis(request):
    return request.app.ARQ_REDIS

class WorkerSettings:
    functions = [add_async_task]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = RedisSettings(
        host=Config().config.redis.host,
        port=Config().config.redis.port,
        password=Config().config.redis.password,
    )
Basically, I am trying to dynamically run any coroutine that it can see within the module by passing a fully qualified module and function name together with its arguments. Is this at all possible?
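If I read the traceback correctly, the "even after replacing result" wording suggests the unpicklable object is not the return value: arq also stores the job's arguments alongside its result, and kwargs['es'] = ctx['es'] adds the Elasticsearch client (which holds asyncio futures) to the very kwargs dict that was passed in as a job argument. Dynamic dispatch itself looks feasible. A minimal sketch of the same dispatcher that copies the arguments before adding the client, and also checks that the target is a coroutine function:

```python
import importlib
import inspect


async def add_async_task(ctx, method_name, args=None, kwargs=None, use_es=False):
    # Resolve 'package.module.func' into the function object.
    module_name, func_name = method_name.rsplit('.', 1)
    method = getattr(importlib.import_module(module_name), func_name)
    if not inspect.iscoroutinefunction(method):
        raise TypeError(f'{method_name} is not a coroutine function')

    # Work on copies: the job's own args/kwargs are stored (and pickled)
    # with the result, so they must not end up holding live clients.
    call_args = list(args or ())
    call_kwargs = dict(kwargs or {})
    if use_es:
        call_kwargs['es'] = ctx['es']

    # Return the coroutine's result; it should be plain, picklable data.
    return await method(*call_args, **call_kwargs)
```

Returning the awaited value (rather than discarding it) also makes the job's result available through job.result().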
Allow the length at which log messages are truncated with an ellipsis to be changed.
test_no_jobs and test_health_check_direct fail with the same error:
pytest -k test_health_check_direct
Test session starts (platform: linux, Python 3.7.4, pytest 4.4.1, pytest-sugar 0.9.2)
rootdir: /home/miki/exp/arq, inifile: setup.cfg, testpaths: tests
plugins: mock-1.10.4, sugar-0.9.2, timeout-1.3.3, cov-2.6.1, aiohttp-0.3.0, toolbox-0.4
timeout: 5.0s
timeout method: signal
timeout func_only: False
collecting ...
―――――――――――――――――――――――――――――――――――― test_health_check_direct ――――――――――――――――――――――――――――――――――――
loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
def test_health_check_direct(loop):
class Settings:
pass
> assert check_health(Settings) == 1
tests/test_worker.py:41:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arq/worker.py:610: in check_health
loop = asyncio.get_event_loop()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asyncio.unix_events._UnixDefaultEventLoopPolicy object at 0x7f080bc70210>
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
> % threading.current_thread().name)
E RuntimeError: There is no current event loop in thread 'MainThread'.
/usr/lib64/python3.7/asyncio/events.py:644: RuntimeError
tests/test_worker.py ⨯ 100% ██████████
Results (0.31s):
1 failed
- tests/test_worker.py:37 test_health_check_direct
97 deselected
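Judging by the traceback, the main thread's event loop has been explicitly set to None (probably by the loop fixture of an earlier test), and in that state asyncio.get_event_loop() raises instead of creating a new loop. A minimal sketch of a synchronous entry point that does not depend on a "current" loop, assuming the health check only needs to run one coroutine; check_health_sketch and _read_health_check_key are hypothetical stand-ins, not arq's code:

```python
import asyncio


async def _read_health_check_key():
    # Hypothetical stand-in for the Redis lookup the real health check performs.
    await asyncio.sleep(0)
    return None


def check_health_sketch() -> int:
    """Return 0 if a health record was found, 1 otherwise (hypothetical contract)."""
    # asyncio.run() creates a fresh event loop, runs the coroutine and closes
    # the loop, so it works even when no current loop is set for this thread.
    value = asyncio.run(_read_health_check_key())
    return 0 if value is not None else 1
```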
Run in a similar way to the health check.
Separate class or associated with an actor?
20:16:07 WorkProcess: dft ran in 1.169s ← Downloader.download_content ● 88346
20:16:34 WorkProcess: shutting down worker, waiting for 6 jobs to finish
20:16:34 WorkProcess: shutting down worker after 460.835s ◆ 6 jobs done ◆ 0 failed ◆ 0 timed out
20:16:34 WorkProcess: Worker exiting after an unhandled error: CancelledError
Traceback (most recent call last):
File "/…/venv/lib/python3.5/site-packages/arq/worker.py", line 353, in start_worker
worker.run_until_complete()
File "/…/venv/lib/python3.5/site-packages/arq/worker.py", line 130, in run_until_complete
self.loop.run_until_complete(self.run())
File "/…/.pyenv/versions/3.5.2/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/…/.pyenv/versions/3.5.2/lib/python3.5/asyncio/futures.py", line 266, in result
raise CancelledError
concurrent.futures._base.CancelledError
20:16:34 MainProcess: worker process 17776 exited badly with exit code 1
Line 295 in fd59bfc: if pool is None here, log a warning and close.
The extra "supervisor" process is unnecessary and should be removed.
It should be possible to requeue jobs if the worker goes down.
Drains are a start, but they're confusing to use.
Better to redesign so that jobs are restarted automatically.
Would be useful.
Just stops the current job without logging the error or printing a traceback.
When testing, it would be useful to be able to run pending jobs but not retries.
Hi @samuelcolvin, thanks for arq!
I'd like to integrate arq with aiohttp web application to run some long jobs in the background.
Do you have some examples for this?
If I understand correctly, I can create a remote worker specifying the Redis server and run it.
Then inside the app I can create an Actor, using the same Redis server, that sends jobs to the worker.
Is that correct?
Shadows should have some way to access "local" data, e.g. for objects shared between many jobs where you don't want extra Redis calls in every job.
Add an option so that the "recording health" log message isn't printed if no new jobs have occurred.
Currently it only logs when it fails to connect; there should also be a log message when the connection succeeds.
The simple usage code at https://arq-docs.helpmanual.io/ does not work: the program returns without any results. Could you please fix it?
__init__ takes context, then has run or call.
Also allows custom logging and name.
I've built a fairly substantial flask application which uses arq for all of its background processing. I'm pretty happy with the solution but see one deficiency that I'd like to try to address. I'd love to have some clean way to integrate visibility/monitoring of the queues that my application is leveraging.
Since rq-dashboard was designed to monitor http://python-rq.org/ queues, and since you have been a significant contributor to RQ, I hoped you might offer some pointers on how I could integrate such a web front end to monitor arq queues, jobs, and workers. I'm looking to give my application's users greater insight into what is happening behind the scenes.
I'd like to know whether forking and reworking rq-dashboard might be feasible for me to pursue, and I'm hoping for a little guidance.
Separately, I suppose I'm also pushing for arq to offer console-based "rq info" type capabilities similar to what is described at http://python-rq.org/docs/monitoring/
so jobs can be started with actor.job(), awaited with await actor.job(), or called directly with await actor.job.direct().
Hello.
I'm trying to select another Redis database for create_pool.
This is the simple code:
redis = await arq_create_pool(settings=RedisSettings(host='localhost', port=6379, database=3))
job = await redis.enqueue_job('get_messages')
print(job.job_id)
print(await job.info())
print(await job.status())
print(await job.result())
When I use the default settings, like
redis = await arq_create_pool(RedisSettings())
everything works fine.
For example:
bf04177b3c7443778a6b0786b8b435ac
{'enqueue_time': datetime.datetime(2019, 4, 1, 12, 34, 15, 939000), 'job_try': None, 'function': 'get_messages', 'args': (), 'kwargs': {}, 'score': 1554122055939}
JobStatus.queued
[My data is here]
But when I try to use another database:
redis = await arq_create_pool(settings=RedisSettings(host='localhost', port=6379, database=3))
the script hangs on print(await job.result()).
In the arq log I don't see any actions.
In Redis database 3 I see only two keys:
arq:queue = score: 1554122478867, member: ef48cff3cd0e46ec86b9c2ae8642491a
arq:job:ef48cff3cd0e46ec86b9c2ae8642491a = {}
Where is my mistake?
I have a problem, and I don't know what is causing it.
I ran a scheduled (cron) arq task in production every two hours, at the first minute. Then I changed it to run every night at 12:05, but it still runs every two hours.
Is there a command to check the execution times of scheduled tasks?
It would be easier to track the job status if the app knows the job id. Also, it would be easier to cancel the job.
y@wuza $ ./venv/bin/arq
Traceback (most recent call last):
File "./venv/bin/arq", line 7, in <module>
from arq.cli import cli
File "/…/venv/lib/python3.5/site-packages/arq/__init__.py", line 6, in <module>
from .worker import * # type: ignore
File "/…/venv/lib/python3.5/site-packages/arq/worker.py", line 39, in <module>
SIG_SUPERVISOR = signal.SIGRTMIN + 7
AttributeError: module 'signal' has no attribute 'SIGRTMIN'
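signal.SIGRTMIN only exists on platforms with real-time signals such as Linux; macOS does not define it, hence the import-time AttributeError. A sketch of a guarded definition, assuming that falling back to SIGUSR2 is acceptable for the supervisor signal (that fallback is an assumption, not arq's choice):

```python
import signal

# Real-time signals are Linux-specific; macOS has no signal.SIGRTMIN.
_SIGRTMIN = getattr(signal, 'SIGRTMIN', None)

if _SIGRTMIN is not None:
    SIG_SUPERVISOR = _SIGRTMIN + 7
else:
    # Assumed fallback: SIGUSR2 exists on both Linux and macOS.
    SIG_SUPERVISOR = signal.SIGUSR2
```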
How am I supposed to create multiple queues with arq v0.16?
Looking at the old docs, I see that you could specify a queue name, but now you can't.
What was the purpose of removing queues from the API?
I tried to start workers with different functions, but if a worker doesn't see a registered function it just deletes the job.
So, if I want to use multiple queues on the same Redis instance, do I have to use multiple Redis databases (which only have numerical names)?
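Later arq versions (not v0.16) appear to let each worker listen on its own queue via a queue_name setting, with enqueue_job(..., _queue_name=...) choosing the queue per job; the default queue is arq:queue. A sketch under that assumption, with hypothetical queue and function names:

```python
EMAIL_QUEUE = 'arq:queue:email'    # hypothetical queue names
REPORT_QUEUE = 'arq:queue:reports'


async def send_email(ctx, address):
    return f'sent to {address}'


async def build_report(ctx, report_id):
    return f'report {report_id}'


class EmailWorkerSettings:
    functions = [send_email]
    queue_name = EMAIL_QUEUE


class ReportWorkerSettings:
    functions = [build_report]
    queue_name = REPORT_QUEUE


async def enqueue_examples(redis):
    # `redis` is an ArqRedis pool from create_pool(); each job goes to the
    # queue whose worker has the matching function registered.
    await redis.enqueue_job('send_email', 'user@example.com', _queue_name=EMAIL_QUEUE)
    await redis.enqueue_job('build_report', 42, _queue_name=REPORT_QUEUE)
```

Each settings class would then be run as its own worker process, e.g. arq my_module.EmailWorkerSettings and arq my_module.ReportWorkerSettings (my_module is a placeholder), so no worker ever sees a function it doesn't know.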
Would be useful to be able to get a list of enqueued jobs (with args) when testing.
Should "just work" when encoded.
Perhaps it should also allow pickling the job class; see #50.
Tasks should not take 20 seconds to finish; I suspect something has gone wrong.
Also, with loads of pending tasks (20) waiting for one to finish, there should be a way to suppress them.
17:35:12 WorkProcess: drain waiting 20.0s for 19 tasks to finish
17:35:32 WorkProcess: dft ran in 43.929s ← Sender.send ●
Exception in callback Drain._job_callback(<Task cancell...orker.py:154>>)
handle: <Handle Drain._job_callback(<Task cancell...orker.py:154>>)>
Traceback (most recent call last):
File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
self._callback(*self._args)
File "/home/samuel/code/arq/arq/drain.py", line 154, in _job_callback
task_exception = task.exception()
concurrent.futures._base.CancelledError
Exception in callback Drain._job_callback(<Task cancell...orker.py:154>>)
handle: <Handle Drain._job_callback(<Task cancell...orker.py:154>>)>
Traceback (most recent call last):
File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
self._callback(*self._args)
File "/home/samuel/code/arq/arq/drain.py", line 154, in _job_callback
task_exception = task.exception()
concurrent.futures._base.CancelledError
Exception in callback Drain._job_callback(<Task cancell...orker.py:154>>)
handle: <Handle Drain._job_callback(<Task cancell...orker.py:154>>)>
Traceback (most recent call last):
File "/usr/lib/python3.6/asyncio/events.py", line 127, in _run
self._callback(*self._args)
File "/home/samuel/code/arq/arq/drain.py", line 154, in _job_callback
task_exception = task.exception()
concurrent.futures._base.CancelledError
Hello.
First, thanks for the great work.
But I can't find the answer to my question in the docs.
So maybe someone can help me.
from arq import create_pool as arq_create_pool
from arq.connections import RedisSettings

async def startup(ctx):
    redis_cache = await arq_create_pool(RedisSettings(host='localhost', port=6379, database=1))
    ctx['redis_cache'] = redis_cache
And use it in my function later
async def get_messages(ctx):
    redis_cache = ctx['redis_cache']
    print(f"LAST_MAIL_ID: {await redis_cache.get('last_id')}")

class WorkerSettings:
    functions = [get_messages]
    on_startup = startup
    on_shutdown = shutdown  # shutdown is defined elsewhere (not shown)
Do I need to close this pool in on_shutdown?
How can I run workers from a Python file, rather than from the terminal like this?
# arq my_file.WorkerName
How can I work with the output log? Can I disable it or redirect it to a file?
Thanks
Hi, in arq is it possible to get the status of a job that has been submitted? Currently, in version 0.14, awaiting a job returns None. Is there a way to get the job ID and query the status?
Add flushall and flushdb.
I am reading your docs but I can't find the CLI options. I would like to start multiple workers like I can in RQ. Is this possible? If so, how?
Thanks!
to use instead of getting a connection.
Or just an example of how to use a pipeline directly.
Checks for any failed jobs and raises an exception from one of those failed jobs; otherwise returns the number of successful jobs.
Useful when testing; could also call async_run.
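The behaviour described above can be sketched in plain Python over a list of finished job outcomes; JobOutcome, FailedJobs and check_outcomes are hypothetical names for illustration, not arq's API:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class JobOutcome:
    # Hypothetical record of one finished job.
    job_id: str
    success: bool
    error: Optional[BaseException] = None


class FailedJobs(RuntimeError):
    """Raised when at least one job failed (hypothetical exception type)."""


def check_outcomes(outcomes: List[JobOutcome]) -> int:
    failed = [o for o in outcomes if not o.success]
    if failed:
        first = failed[0]
        # Chain the original exception so its traceback shows up in test output.
        raise FailedJobs(f'{len(failed)} job(s) failed, first: {first.job_id}') from first.error
    # No failures: report how many jobs succeeded.
    return len(outcomes)
```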
Is supporting something like Redis Sentinel within the scope of this project? I think it could be added with a few changes to connections.py.
If it is within scope, I can send in a PR.
I'm getting an aioredis.errors.WatchVariableError: ('WatchVariableError errors:', 'WATCH variable has changed') when I run my queue.
I noticed that you had already raised a similar issue at aio-libs-abandoned/aioredis-py#558.
Are you still seeing the same issue using arq? Do you by chance have a workaround for this?
I have a simple queue setup, and the only thing I'm doing differently is using aioredis.create_sentinel_pool() to generate the Redis connection pool before feeding it to the redis_pool parameter on the worker.
r = RedisConfig
sentinel_client_pool = aioredis.sentinel.create_sentinel_pool(
[(r.host, r.port)],
db=r.database,
password=r.password,
timeout=r.timeout,
encoding='utf8'
)
sentinel_client_pool = await sentinel_client_pool
redis_master = sentinel_client_pool.master_for('mymaster')
redis_pool = ArqRedis(redis_master)
Hi,
thanks for your work.
I am currently evaluating switching to an async web framework, and one big piece of my existing code is Celery tasks. The problem is that not all of them can be converted to async, because async versions of the underlying transport libraries don't exist.
So the question is: is it possible to run sync tasks in a thread with arq?
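One standard asyncio approach, not specific to arq, is to wrap each blocking function in a small async job that hands it to a thread pool with loop.run_in_executor. A sketch, where legacy_sync_task and the ctx['pool'] executor are hypothetical:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def legacy_sync_task(x: int) -> int:
    # Stand-in for an existing blocking (e.g. former Celery) task.
    time.sleep(0.01)
    return x * 2


async def run_legacy_task(ctx, x: int) -> int:
    # Offload the blocking call to a thread so the worker's event loop stays
    # free to run other jobs; None would mean the loop's default executor.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(ctx.get('pool'), legacy_sync_task, x)


async def startup(ctx):
    ctx['pool'] = ThreadPoolExecutor(max_workers=4)


async def shutdown(ctx):
    ctx['pool'].shutdown(wait=True)


class WorkerSettings:
    functions = [run_legacy_task]
    on_startup = startup
    on_shutdown = shutdown
```

One caveat: if a job times out, the awaiting coroutine is cancelled but the thread cannot be interrupted, so the blocking call keeps running until it returns. CPU-bound tasks would be better served by a ProcessPoolExecutor.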
By checking the source code it seems to me that a worker always takes as many jobs as possible from the queue, independently of how many free actors are there:
Lines 260 to 264 in b2d397a
I think this has great potential for improvement. If the queue is large, a worker will keep all the job IDs in memory without being able to execute all of those in a meaningful time. On the contrary, if the semaphore indicates that it's full, why take jobs? (Or maybe we should take just a few.) I was also planning to run multiple instances of my worker, and in that case the parallelism would benefit from each worker taking a subset of jobs that they can manage.
With that in mind I propose that the above call to Redis is changed to:
job_ids = await self.pool.zrangebyscore(self.queue_name, limit=limit)
where limit is the number of free spots in the semaphore or some multiple of that (ideally configurable; I think the optimal value would mostly depend on the use case, but it's probably slightly above 1 in most of them).
I also don't quite understand the usefulness of now. I think that in the great majority of cases, all the enqueued jobs have a timestamp below the current timestamp. But even if they had a slightly higher timestamp, why would a free worker not run it?
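The proposed limit can be sketched as a small pure function; jobs_to_fetch and its factor parameter are hypothetical. Note that zrangebyscore takes no limit argument. With redis-py, the equivalent is start and num, which must be given together. On the question about now: if a job's score is the time it becomes due, jobs deferred into the future would have scores above now, and reading only up to now keeps them from starting early. That reading is an assumption worth checking against the enqueue code.

```python
def jobs_to_fetch(max_jobs: int, running: int, factor: float = 1.5) -> int:
    """How many job IDs to read from the queue (hypothetical helper).

    Only ask Redis for roughly as many jobs as there are free slots, times a
    small factor so the next job is ready as soon as a slot frees up.
    """
    free = max(max_jobs - running, 0)
    if free == 0:
        return 0  # every slot is busy: don't fetch anything this round
    return max(1, int(free * factor))


# With redis-py (hypothetical surrounding code), skipping the call when limit == 0:
#     limit = jobs_to_fetch(max_jobs, running)
#     job_ids = await redis.zrangebyscore(queue_name, '-inf', now, start=0, num=limit)
```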
When first initialising a redis server it sometimes causes connections to hang indefinitely.
Solution is to connect to redis on startup and print the version etc, then timeout and retry until a good connection is established.
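A sketch of that retry loop, where connect is any coroutine function that opens the connection (for example a thin wrapper around create_pool) and connect_with_retry is a hypothetical helper. asyncio.wait_for bounds each attempt, so a hung connection is cancelled instead of blocking forever:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    *,
    attempts: int = 5,
    timeout: float = 1.0,
    delay: float = 1.0,
) -> T:
    """Call `connect` until it succeeds, bounding each attempt with a timeout."""
    last_exc: BaseException = RuntimeError('no connection attempts made')
    for attempt in range(1, attempts + 1):
        try:
            # wait_for cancels a hung attempt after `timeout` seconds.
            return await asyncio.wait_for(connect(), timeout)
        except (OSError, asyncio.TimeoutError) as exc:
            last_exc = exc
            print(f'redis connection attempt {attempt}/{attempts} failed: {exc!r}')
            if attempt < attempts:
                await asyncio.sleep(delay)
    raise last_exc
```

Once a connection succeeds, the caller can log the server version (e.g. from the INFO command) as the issue suggests.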
Would make watching the logs much easier.
Log from compose down with arq:
morpheus_worker_1|INFO arq.work got signal: SIGTERM, waiting for worker pid=24 to finish...
morpheus_worker_1|INFO arq.work pid=24, got signal proxied from main process, stopping...
morpheus_worker_1|Exception in callback Loop._read_from_self
morpheus_worker_1|handle: <Handle Loop._read_from_self>
morpheus_worker_1|Traceback (most recent call last):
morpheus_worker_1| File "uvloop/cbhandles.pyx", line 52, in uvloop.loop.Handle._run (uvloop/loop.c:48414)
morpheus_worker_1| File "uvloop/loop.pyx", line 242, in uvloop.loop.Loop._read_from_self (uvloop/loop.c:9481)
morpheus_worker_1| File "uvloop/loop.pyx", line 253, in uvloop.loop.Loop._process_self_data (uvloop/loop.c:9726)
morpheus_worker_1| File "uvloop/loop.pyx", line 267, in uvloop.loop.Loop._handle_signal (uvloop/loop.c:9912)
morpheus_worker_1| File "/usr/local/lib/python3.6/site-packages/arq/worker.py", line 356, in handle_proxy_signal
morpheus_worker_1| raise HandledExit()
morpheus_worker_1|arq.worker.HandledExit
morpheus_worker_1|WARNING arq.work pid=24, got signal: SIGALRM again, forcing exit
morpheus_worker_1|Exception in callback Loop._read_from_self
morpheus_worker_1|handle: <Handle Loop._read_from_self>
morpheus_worker_1|Traceback (most recent call last):
morpheus_worker_1| File "uvloop/cbhandles.pyx", line 52, in uvloop.loop.Handle._run (uvloop/loop.c:48414)
morpheus_worker_1| File "uvloop/loop.pyx", line 242, in uvloop.loop.Loop._read_from_self (uvloop/loop.c:9481)
morpheus_worker_1| File "uvloop/loop.pyx", line 253, in uvloop.loop.Loop._process_self_data (uvloop/loop.c:9726)
morpheus_worker_1| File "uvloop/loop.pyx", line 267, in uvloop.loop.Loop._handle_signal (uvloop/loop.c:9912)
morpheus_worker_1| File "/usr/local/lib/python3.6/site-packages/arq/worker.py", line 370, in handle_sig_force
morpheus_worker_1| raise ImmediateExit('force exit')
morpheus_worker_1|arq.worker.ImmediateExit: force exit
Like gunicorn does.
This should prevent minor memory leaks in Actors from becoming a problem, and should solve tutorcruncher/morpheus#68.
So that if we ever change the data in jobs, people who shut down a worker while jobs are queued and then update don't get errors.
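One way to get that is to store a format version with every job and upgrade old payloads on read. A sketch under that assumption; JOB_FORMAT_VERSION, encode_job, decode_job and the v1 layout are all hypothetical, and pickle should only ever be used on data you trust:

```python
import pickle

JOB_FORMAT_VERSION = 2  # hypothetical: bump whenever the stored job layout changes


def encode_job(function: str, args: tuple, kwargs: dict) -> bytes:
    return pickle.dumps({'v': JOB_FORMAT_VERSION, 'f': function, 'a': args, 'k': kwargs})


def decode_job(data: bytes) -> dict:
    job = pickle.loads(data)
    version = job.get('v', 1)  # payloads written before versioning count as v1
    if version == 1:
        # Hypothetical upgrade path: the v1 layout had no kwargs field.
        job = {'v': JOB_FORMAT_VERSION, 'f': job['f'], 'a': job['a'], 'k': {}}
    elif version != JOB_FORMAT_VERSION:
        raise ValueError(f'unsupported job format version {version}')
    return job
```

Jobs queued by an old worker then still decode after an update instead of failing.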