tobymao / saq
Simple Async Queues
Home Page: https://saq-py.readthedocs.io/en/latest/
License: MIT License
Hi @tobymao, great project!
The redis dependency is specified as >4.2,<4.4. Any reason why we need <4.4? Not a big deal, but the README only mentions >4.2.
The web UI currently only shows in-progress and queued Tasks. Completed tasks do not show up, but it would be nice to be able to view them. It could be put behind a switch/env var too.
I can help with this.
Hi! Can you add a URL prefix option to the saq command-line arguments, so the web UI can run behind a proxy?
Windows doesn't have signals like SIGTERM the way Linux does, which causes a NotImplementedError to be thrown when running saq.
The best solution I have right now is to skip add_signal_handler if the OS is Windows:
if os.name != "nt":
    for signum in self.SIGNALS:
        loop.add_signal_handler(signum, self.event.set)
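The same guard can be written without an OS check by catching the NotImplementedError that event loops without signal support raise. This is only a sketch: install_stop_handlers is a hypothetical helper, not part of saq, and the default signal tuple is an assumption.

```python
import signal


def install_stop_handlers(loop, on_stop, signals=(signal.SIGINT, signal.SIGTERM)):
    """Register on_stop for each signal; return the signals that were registered.

    Hypothetical helper: loops that do not support signal handlers
    (for example the default loop on Windows) are skipped silently.
    """
    registered = []
    for signum in signals:
        try:
            loop.add_signal_handler(signum, on_stop)
        except NotImplementedError:
            # This loop cannot install signal handlers; skip it.
            continue
        registered.append(signum)
    return registered
```

Catching the exception keeps the behaviour tied to what the loop actually supports rather than to the platform name.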
Hi,
I migrated from ARQ to SAQ (again: great job with this library, I find it way cleaner), and I faced some issues while migrating from aioredis 1.3.1 (used by ARQ) to redis-py 4.2.0 (used by SAQ) in other parts of my project.
During my investigation, I found out that you implemented a way to throttle the number of connections to Redis using a semaphore (_op_sem) instead of redis-py's BlockingConnectionPool (which, with max_connections set to your max_concurrent_ops and timeout set to None, would have the same behaviour: wait for a connection to be available before executing a command).
So I wonder, is there a reason you chose to implement what seems to be your own BlockingConnectionPool instead of using redis-py's?
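For readers unfamiliar with the pattern, the semaphore approach described in this question can be sketched with plain asyncio. ThrottledClient and its attributes are hypothetical names for illustration, not saq's actual code, and asyncio.sleep stands in for a Redis round trip.

```python
import asyncio


class ThrottledClient:
    """Hypothetical client that caps the number of concurrent operations."""

    def __init__(self, max_concurrent_ops: int) -> None:
        self._op_sem = asyncio.Semaphore(max_concurrent_ops)
        self.in_flight = 0
        self.peak = 0

    async def execute(self, delay: float = 0.01) -> None:
        # Callers beyond the limit wait here until a slot frees up.
        async with self._op_sem:
            self.in_flight += 1
            self.peak = max(self.peak, self.in_flight)
            try:
                await asyncio.sleep(delay)  # stands in for a Redis command
            finally:
                self.in_flight -= 1


async def demo(ops: int = 20, limit: int = 5) -> int:
    client = ThrottledClient(limit)
    await asyncio.gather(*(client.execute() for _ in range(ops)))
    return client.peak  # highest number of operations seen in flight
```

A blocking connection pool gives a similar effect by making callers wait for a free connection instead of a free semaphore slot.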
Hello! My previous script used asyncio.Queue and a list to run asynchronous tasks. I stumbled upon saq by accident, and the web feature caught my attention: being able to check task progress in real time is great.
But I don't know how to change my script to use saq. This isn't a bug report, and I didn't know where else to ask, so I'm taking the liberty of posting it as an issue. Is that OK?
Hi
I just wanted to ask a quick question: have you spent any time considering what effective liveness/readiness probes might look like for saq @tobymao?
For a web server I think convention is to ping a healthcheck endpoint, or something equivalent. I'm not aware of an obvious equivalent for a task runner.
I'm sure this has been answered for other task runners, and I'm planning to do a little research now. Just asking in case you've either got an answer ready, or if there's an implementation detail unique to saq that would make sense to use for this.
(I'm not running the aiohttp part of saq, so I don't want to rely on that.)
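One common pattern for task runners without an HTTP endpoint is a heartbeat file: the worker touches a file periodically and the probe checks how old it is. The sketch below uses only the standard library; beat and is_alive are hypothetical names, and nothing here is a saq feature.

```python
import time
from pathlib import Path
from typing import Optional


def beat(path: Path) -> None:
    """Called periodically by the worker process to record that it is alive."""
    path.touch()


def is_alive(path: Path, max_age: float, now: Optional[float] = None) -> bool:
    """Probe side: True if the heartbeat file was touched within max_age seconds."""
    current = time.time() if now is None else now
    try:
        age = current - path.stat().st_mtime
    except FileNotFoundError:
        # The worker has never written a heartbeat.
        return False
    return age <= max_age
```

A liveness probe could then run a small script that calls is_alive and exits non-zero when it returns False.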
It would be nice to have a way to pass a log config in SAQ like in ARQ like this: https://github.com/samuelcolvin/arq/blob/main/arq/cli.py#L41
Hi!
Support for creating, editing, and deleting cron jobs at runtime is sorely missing. Can you add this?
Thanks!
I'd like to be able to run the Web queue monitor as a separate process to multiple worker processes. I've tried using --workers 0 but it doesn't seem to respect the value of 0 and still picks up tasks from the queue.
This would be useful in docker setups where you'd have a service that serves up only the Web monitoring (without any workers) and then multiple instances of a Worker service. The issue here is that the worker instances conflict with the port they're exposing.
It's scheduled based on the epoch, as croniter is called without any timezone information. Maybe add a timezone attribute to the CronJob class and use it to create the datetime.datetime instance that is passed to croniter?
Hello! Thank you for nice framework!
I'm trying to use it, but getting an exception on attempt to start worker:
$ saq app.worker.settings
Traceback (most recent call last):
  File "********/saq-test/.venv/bin/saq", line 8, in <module>
    sys.exit(main())
  File "********/saq-test/.venv/lib/python3.10/site-packages/saq/__main__.py", line 73, in main
    start(
  File "********/saq-test/.venv/lib/python3.10/site-packages/saq/worker.py", line 283, in start
    settings = import_settings(settings)
  File "********/saq-test/.venv/lib/python3.10/site-packages/saq/worker.py", line 278, in import_settings
    module = importlib.import_module(module_path)
  File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'app'
Project structure:
├── app
│   ├── __init__.py
│   └── worker.py
├── poetry.lock
└── pyproject.toml
worker.py contents:
from typing import Any, Dict

from saq import Queue


async def startup(_: Dict[str, Any]) -> None:
    print("Hello from startup job!")


settings = {
    "queue": Queue.from_url("redis://localhost"),
    "functions": [],
    "concurrency": 8,
    "startup": startup,
}
If I try to use importlib from the interpreter, everything is OK:
>>> import importlib
>>> importlib.import_module("app.worker")
<module 'app.worker' from '********/saq-test/app/worker.py'>
>>> _.settings
{'queue': <saq.queue.Queue object at 0x7f81fcd52740>, 'functions': [], 'concurrency': 8, 'startup': <function startup at 0x7f81fcd681f0>}
>>>
I've found that the current directory is not in sys.path, and setting PYTHONPATH fixes the problem:
$ PYTHONPATH="$PYTHONPATH:$PWD" saq app.worker.settings
Hello from startup job!
Could you help me figure out whether this is a bug or my mistake?
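The difference comes from how sys.path is initialised: an interactive interpreter puts the current directory first, whereas an installed console script puts the script's own directory (here the virtualenv's bin) first. A loader that wants to resolve dotted paths relative to the working directory could add it explicitly, as in this sketch. load_settings is a hypothetical helper and not saq's import_settings.

```python
import importlib
import os
import sys


def load_settings(dotted_path: str):
    """Import 'package.module.attr' after making the current directory importable.

    Hypothetical helper for illustration only.
    """
    module_path, _, attr = dotted_path.rpartition(".")
    cwd = os.getcwd()
    if cwd not in sys.path:
        # Console scripts do not get the working directory on sys.path.
        sys.path.insert(0, cwd)
    importlib.invalidate_caches()
    module = importlib.import_module(module_path)
    return getattr(module, attr)
```

Setting PYTHONPATH, as done above, achieves the same result from outside the process.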
I'm trying to open the saq dashboard on Docker.
I'm using the config from a starlite template (https://github.com/starlite-api/starlite-pg-redis-docker).
Here is the docker compose file, with some changes:
version: "3.9"
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    volumes:
      - cache:/data
  db:
    image: postgres:latest
    volumes:
      - db:/var/lib/postrgresql/data/
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
  mailhog:
    image: mailhog/mailhog
    ports:
      - "1025:1025"
      - "8025:8025"
  app:
    image: test/starlite:1.0
    build:
      context: .
      target: install
      args:
        INSTALL_ARGS: "--no-root"
    command: scripts/entry
    depends_on:
      - db
      - mailhog
      - redis
    ports:
      - "8000:8000"
    env_file:
      - .env
    volumes:
      - .:/app:cached
  saq:
    image: test/starlite:1.0
    command: python -m saq --web app.lib.workers
    ports:
      - "8080:8080"
volumes:
  db: {}
  cache: {}
It doesn't work if I try to call the worker with lib.worker or app.app.lib.worker either.
Do you have any suggestions for how to start the web interface in this situation?
As part of the comparisons with arq, you mention you can easily run multiple workers to leverage cpu cores.
I thought this implied some logic of marking which functions are run. However, if I run multiple worker processes with the same settings object, the first worker started is the only one that executes tasks.
What am I missing? Is it just that you can assign different functions to different workers?
The example assigns to the context dict in startup, which doesn't work:
async def startup(ctx):
    ctx["db"] = await create_db()
The following code works:
ctx['worker'].context["db"] = await create_db()
Traceback (most recent call last):
File "/usr/local/saq-master/examples/del.py", line 95, in
asyncio.run(enqueue())
File "/root/anaconda3/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/root/anaconda3/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/anaconda3/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/usr/local/saq-master/examples/del.py", line 89, in enqueue
result = await queue.apply(batch_delete_url.name, uris=uris)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/anaconda3/lib/python3.11/site-packages/saq/queue.py", line 580, in apply
results = await self.map(job_or_func, timeout=timeout, iter_kwargs=[kwargs])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/anaconda3/lib/python3.11/site-packages/saq/queue.py", line 670, in map
raise exc
saq.queue.JobError: Job saq:job:default:3a32d23c-9806-11ee-bd3c-000c29216232 failed
The above job failed with the following error:
Traceback (most recent call last):
File "/root/anaconda3/lib/python3.11/site-packages/saq/worker.py", line 260, in process
result = await asyncio.wait_for(task, job.timeout if job.timeout else None)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/anaconda3/lib/python3.11/asyncio/tasks.py", line 442, in wait_for
return await fut
^^^^^^^^^
File "/usr/local/saq-master/examples/del.py", line 65, in batch_delete_url
delete_urls = await queue.map(
^^^^^^^^^^^^^^^^
File "/root/anaconda3/lib/python3.11/site-packages/saq/queue.py", line 670, in map
raise exc
saq.queue.JobError: Job saq:job:default:3a3341ea-9806-11ee-afa6-000c29216232 aborted
The above job failed with the following error:
swept
Hi, we've been using Celery for a while on our projects, but it's been a pain with all the complexity and confusing/conflicting documentation (not to say the lack of).
There are a few features we are looking for on a new job queue lib, I was wondering what are your thoughts on it, and plans going forward.
self.update_state(state='PENDING', meta={'current': i, 'total': NTOTAL}) on a task with bind=True. I couldn't find anything on more granular/custom state handling in any of the libs we've found (Arq, RQ, Saq, Dramatiq), so I'm wondering what the plans/thoughts are on that. I would imagine that I could do something similar using the context, maybe?
raise self.retry(exc=Exception(str(error)), countdown=countdown, max_retries=5) but it's been working well... wondering how I would go about doing retries using Saq.
Also, just to add to the discussion and share my experiences:
I know this is quite a list in terms of features, I was just wondering what your thoughts are and whether you reckon it could make sense for the future. I'm probably not the most knowledgeable person, but I would be more than happy to help!
Thanks!
Hi @tobymao, sorry for the issue spam.
Would you mind unpinning the redis requirement upper-bound? I see the original issue is resolved, and I need to be able to run 4.4+ to fix another issue I'm having, migrating from a standalone to cluster redis setup.
Specifically this line: https://github.com/tobymao/saq/blob/master/setup.py#L32
I want to try to implement the plan indicated in the title and prepare the repository for future contributions and possible participation in Hacktoberfest 2023.
Here is the plan as I see it, point by point:
CONTRIBUTING.md guide to cover all these steps
hacktoberfest topic to repo
It would be really great if this project had PEP 484 type hints, for better editor support and integration with Mypy.
I'd be willing to help with this.
Hi @tobymao,
The before_process and after_process hooks are great. Do you think it would also be possible to add an event hook for on_enqueue?
Celery has the before_task_publish event - this is really what I'd like to mirror.
# celery example
@before_task_publish.connect
def transfer_correlation_id(headers: Dict[str, str], **kwargs: Any) -> None:
    """
    Attach request ID to the task headers.
    """
    cid = correlation_id.get()
    if cid:
        headers[header_key] = cid
I need to be able to pass state from this event handler to the worker, so there needs to be some data structure involved that is passed to the worker. Celery uses a plain dict, I believe.
If you think this is a good idea, I'd also be happy to open a PR for this myself.
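To make the request concrete, here is a rough sketch of what such a hook could look like, using a plain dict as the carrier. Everything in it is hypothetical (before_enqueue_hooks, the enqueue stand-in, and the meta key); it does not describe saq's API.

```python
import contextvars
from typing import Any, Callable, Dict, List

# Hypothetical request-scoped correlation ID, set by web middleware.
correlation_id = contextvars.ContextVar("correlation_id", default="")

# Hypothetical registry of hooks that run just before a job is enqueued.
before_enqueue_hooks: List[Callable[[Dict[str, Any]], None]] = []


def transfer_correlation_id(meta: Dict[str, Any]) -> None:
    """Copy the current correlation ID into the job's metadata."""
    cid = correlation_id.get()
    if cid:
        meta["correlation_id"] = cid


def enqueue(function_name: str, **kwargs: Any) -> Dict[str, Any]:
    """Stand-in for a queue's enqueue: builds the job and runs the hooks."""
    job = {"function": function_name, "kwargs": kwargs, "meta": {}}
    for hook in before_enqueue_hooks:
        hook(job["meta"])
    return job
```

Because the hook writes into a dict that travels with the job, the worker side can read the same key back when it processes the job.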
I'm very happy to see this project, but it would be nice to have a website for the documentation.
Hi, how can I show scheduled jobs on the dashboard, with links to their details?
Currently there is only a scheduled job counter.
Hi!
I've just sat down to take a stab at rewriting one of our services from arq to saq, and it's been going well! Thanks for creating this! Was also happy to see that it's compatible with the new redis v4.2+ stuff!
I just have one question I wasn't easily able to answer: is there a way to set a queue-specific timeout? I have two queues, where one is reserved for long running jobs. I'd rather not pass timeout to every enqueue call if possible. Can I specify a timeout either in the settings, or on the queue-instance?
I want to enqueue a job which will have a class from a 3rd party lib. It is not json serializable, but I can serialize it with pickle.dumps and loads without problems. How can I use pickle instead?
Thanks in advance, kindaway.
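Another snippet on this page mentions that custom serializers can be passed through Queue(dump=, load=). Assuming those callables may exchange text, a pickle-based pair could look like the sketch below; the base64 step is an assumption made so the payload is a plain string, and pickle should only ever be used with data you trust.

```python
import base64
import pickle
from typing import Any


def dump(obj: Any) -> str:
    """Serialize any picklable object to an ASCII string."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def load(data) -> Any:
    """Inverse of dump; accepts str or bytes. Never call this on untrusted input."""
    return pickle.loads(base64.b64decode(data))


# Assumed usage, based on the Queue(dump=, load=) comment quoted on this page:
# queue = Queue(redis, dump=dump, load=load)
```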
I'm new to the microservice world, but it is required of me at work: we have an e-commerce site that we want to migrate to a microservice architecture. So the question is: if I have one microservice on one machine and another on a different machine, and I'm using saq (which I should thank you for, because it has everything I love; I have tried it and it is very straightforward), how do I make the other machine know there is a job it needs to work on? Thanks.
Is there some way to specify default arguments to jobs? For example, if I want to have every job have retry_backoff on by default, how would I do that?
I have problems getting saq to work properly when using scheduled tasks.
As soon as I enqueue() a Job with scheduled set to something else than 0 I run into problems.
The scheduled job gets executed, but any jobs I try to enqueue during the time until the scheduled job is executed are silently ignored. They are not even performed after the scheduled job, they are simply just gone.
I've tried to use multiple workers, but that doesn't help. Any help in debugging this or in understanding how to set this up would be appreciated.
Line 291 in 877c5dd
I missed cleaning this up from the generated types, it doesn't need to be typed this narrow.
I'll PR, just putting this here so I don't forget.
As you can see in the screenshot, there are two rows for jobs with identical key.
Run redis with docker:
docker pull redis
docker run --name redis-broker -p 6379:6379 -d redis
docker ps
Within a root folder, make two files: worker.py and main.py.
# worker.py
import asyncio

from saq import CronJob, Queue


# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}


async def cron(ctx):
    print("i am a cron job")


async def startup(ctx):
    print("i am a start up")


async def shutdown(ctx):
    await ctx["db"].disconnect()


async def before_process(ctx):
    print(ctx["job"], ctx["db"])


async def after_process(ctx):
    pass


async def sleeper(ctx, *, a):
    await asyncio.sleep(a)
    return {"a": a}


async def adder(ctx, *, a, b):
    await asyncio.sleep(1)
    return a + b


queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test, adder, sleeper],
    "concurrency": 10,
    # "cron_jobs": [CronJob(cron, cron="* * * * * */5")],  # run every 5 seconds
    "startup": startup,
    # "shutdown": shutdown,
    # "before_process": before_process,
    # "after_process": after_process,
}
# main.py
import asyncio
import random
import time

from saq import CronJob, Queue
from worker import queue


async def cron_job(ctx):
    print("excuting cron job")


async def enqueue(count):
    queue = Queue.from_url("redis://localhost")
    for _ in range(count):
        await queue.enqueue("sleeper", a=5, timeout=40)


if __name__ == "__main__":
    now = time.time()
    asyncio.run(enqueue(count=1))
    print(time.time() - now)
saq worker.settings --web
python3 main.py
What is the best way to do job dependencies with saq, like in rq? In rq, users can specify a depends_on argument when creating a job, and only enqueue the job if the dependencies are finished. I had thought I could hack the before_enqueue and/or before_process hook for this, but it looks like the job will be enqueued/processed no matter what happens in the hook. If anyone can help point me to a good way to support it with saq, I can write the code and do a PR.
BTW, thanks for a fast, lightweight, yet powerful package!
I start a long-running task using the Playwright framework:
ERROR:saq:Error processing job Job<function=fgis, kwargs={'cmd': 'parse', 'cn': '67:17:0010333:59'}, queue=fgis, id=saq:job:fgis:57646e99-7bcd-11ed-b618-2d9cfaea7267, scheduled=0, progress=0.0, start_ms=23, attempts=1, status=active, meta={}>
Traceback (most recent call last):
File "/root/playwright/saq_srv.py", line 18, in fgis
return await w.job(cmd, **kvargs)
File "/root/playwright/rosreestr/fgis/worker.py", line 51, in job
self.current = await self.search.cadnum(cn, regions[cn[:2]])
File "/root/playwright/rosreestr/fgis/_page/search.py", line 14, in cadnum
await page.locator(".v-filterselect-input").first.type(region, delay=100)
File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/async_api/_generated.py", line 15250, in type
await self._impl_obj.type(
File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_locator.py", line 558, in type
return await self._frame.type(
File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_frame.py", line 710, in type
await self._channel.send("type", locals_to_params(locals()))
File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 44, in send
return await self._connection.wrap_api_call(
File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 419, in wrap_api_call
return await cb()
File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 70, in inner_send
done, _ = await asyncio.wait(
File "/usr/lib/python3.10/asyncio/tasks.py", line 384, in wait
return await _wait(fs, timeout, return_when, loop)
File "/usr/lib/python3.10/asyncio/tasks.py", line 491, in _wait
await waiter
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
return fut.result()
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/root/playwright/.pw/lib/python3.10/site-packages/saq/worker.py", line 247, in process
result = await asyncio.wait_for(task, job.timeout)
File "/usr/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Hi, I am trying to use saq in an ASGI framework called starlite. I'm struggling to get a basic SAQ implementation working smoothly.
Below is a minimal example where the worker gets started and the job gets scheduled, but during shutdown, the code hangs. It seems like the jobs don't get aborted or cleaned up.
Can you help suggest a working configuration?
Python Requirements:
starlite
uvicorn
saq
python-dotenv
Save file as main.py and run using: uvicorn main:app
import asyncio
import logging
import os

import uvicorn
from dotenv import load_dotenv
from saq import CronJob, Queue, Worker
from starlite import LoggingConfig, Starlite, get

########################################################
#################### ENV & LOGGING CONFIG ################
load_dotenv()
REDIS_URL = os.getenv("REDIS_URL")

logging_config = LoggingConfig(
    loggers={
        "app": {
            "level": "DEBUG",
            "handlers": ["queue_listener"],
            "propagate": False,
        }
    }
)
logger = logging.getLogger("app")


########################################################
################ SCHEDULED JOB ########################
async def scheduled_job(ctx) -> None:
    logger.info("I'm scheduled to run by SAQ")


#########################################################
###### SAQ CONFIG: SCHEDULES REQUEST EVERY 10 SECONDS ######
queue = Queue.from_url(REDIS_URL)
tb = CronJob(scheduled_job, cron="* * * * * */10")  # Every 10 seconds
worker = Worker(queue=queue, functions=[], cron_jobs=[tb])


async def tasks_on_startup() -> None:
    logger.info("Starting SAQ worker")
    asyncio.create_task(worker.start())
    asyncio.create_task(worker.schedule())
    logger.info("SAQ started and tasks scheduled")


async def tasks_on_shutdown() -> None:
    logger.info("Stopping SAQ worker")
    asyncio.create_task(worker.abort())
    asyncio.create_task(worker.stop())
    logger.info("SAQ worker should be dead")


##################################
######## BASIC STARLITE APP #########
@get("/")
def hello_world() -> dict[str, str]:
    """Handler function that returns a greeting dictionary."""
    return {"hello": "world"}


app = Starlite(
    route_handlers=[hello_world],
    on_startup=[tasks_on_startup],
    on_shutdown=[tasks_on_shutdown],
    logging_config=logging_config,
    debug=True,
)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
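One thing that may contribute to the hang: the shutdown handler above only schedules tasks with asyncio.create_task and returns without awaiting them, and no reference to the startup task is kept. A general asyncio pattern, sketched here with a hypothetical BackgroundRunner wrapper, is to keep the task handle and await the stop call before cancelling. It assumes only that the worker exposes awaitable start() and stop() methods, as the snippet above uses; it is not a verified fix for this report.

```python
import asyncio


class BackgroundRunner:
    """Hypothetical wrapper that owns a worker's background task."""

    def __init__(self, worker) -> None:
        self.worker = worker
        self._task = None

    async def startup(self) -> None:
        # Keep a reference so the task can be cancelled and awaited later.
        self._task = asyncio.create_task(self.worker.start())

    async def shutdown(self) -> None:
        # Await the stop call instead of scheduling it and returning.
        await self.worker.stop()
        if self._task is not None:
            self._task.cancel()
            # return_exceptions=True swallows the resulting CancelledError.
            await asyncio.gather(self._task, return_exceptions=True)
```

The startup and shutdown methods could then be passed as the framework's on_startup and on_shutdown callbacks.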
Is there a built-in way to run code after the job fails and there are no more retries left? In my use case, I'm trying to update a related value in a database if the job is unsuccessful.
Or is there a way to get all the (failed) jobs? I guess querying Redis directly is one answer.
Loving the library, by the way!
I find I'm needing to do a lot of mocking to unit test my app that uses SAQ.
It would be great to have a test double of Queue that basically does NOPs.
What would be needed to make this comprehensive enough to be included in the library as a test helper?
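As a starting point for discussion, a recording no-op double could be as small as the sketch below. NullQueue is a hypothetical class; it mirrors only the enqueue, connect, and disconnect calls that appear in snippets on this page, and a comprehensive helper would need to cover more of the real Queue surface.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class NullQueue:
    """Hypothetical test double: records enqueue calls and does nothing else."""

    def __init__(self) -> None:
        self.enqueued: List[Tuple[Any, Dict[str, Any]]] = []

    async def connect(self) -> None:
        pass  # no Redis connection is opened

    async def disconnect(self) -> None:
        pass

    async def enqueue(self, job_or_func: Any, **kwargs: Any) -> None:
        # Record the call so tests can assert on it; nothing is sent anywhere.
        self.enqueued.append((job_or_func, kwargs))
        return None
```

Tests can then inject NullQueue wherever the application expects a queue and assert on the enqueued list.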
First off, thank you very much for this very useful framework.
I do apologise in advance if this isn't the correct place to raise a feature request or if you're not looking at requests at all. In any case some basic task prioritisation would be very useful. The ideal implementation would allow multiple worker nodes (a single resource pool) to pick up tasks from a single task queue that handles the prioritisation internally, abstracted away from the consumer.
To illustrate:
from saq import Queue
# Create queue
queue = Queue.from_url("redis://localhost")
# Enqueue tasks (We assume a higher number is a higher priority)
low_priority_task = await queue.enqueue("some_task", a=5, priority=25)
high_priority_task = await queue.enqueue("some_task", a=5, priority=100)
medium_priority_task = await queue.enqueue("some_task", a=5, priority=50)
Now when starting up a worker once these jobs have been enqueued successfully, these jobs will be popped in the order:
P.S. Queue-based priorities can be nice but often have significant infrastructure/configuration trade-offs so I much prefer Enqueue-time prioritisation.
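The ordering being requested can be illustrated in memory with heapq. PriorityBuffer is a hypothetical class that follows the post's stated assumption that a higher number means higher priority; it says nothing about how a Redis-backed queue would implement this.

```python
import heapq
import itertools
from typing import List, Tuple


class PriorityBuffer:
    """Hypothetical in-memory buffer: highest priority first, FIFO among equals."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._counter = itertools.count()  # tie-breaker preserving insertion order

    def push(self, name: str, priority: int) -> None:
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]
```

With the three tasks from the example above, the buffer would hand out the priority 100 task first, then 50, then 25.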
I have a queue for building jobs (90 seconds job). Whenever source code changes I want to post a new job.
I set the key of the job to the repository name so changes in the same repository get the same job key.
If a change in the source code happens during a build, I abort the job and enqueue it again.
job = await queue.job(key)
if job:
    await queue.abort(job, 'Abort job and add new job instead')
    await queue.enqueue(Job(key=key, ...))
But - that won't work. The job isn't started.
Is abort not meant to be used like this? Any other way of handling this situation?
Hi @tobymao!
Out of nowhere, our saq process started emitting Upkeep task failed unexpectedly errors, where the exception logger contains the following traceback:
Traceback (most recent call last):
File \"/opt/venv/lib/python3.11/site-packages/saq/worker.py\", line 180, in poll
await func(arg or sleep)
File \"/opt/venv/lib/python3.11/site-packages/saq/worker.py\", line 167, in schedule
scheduled = await self.queue.schedule(lock)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/saq/queue.py\", line 248, in schedule
return await self._schedule_script(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/redis/commands/core.py\", line 4983, in __call__
return await client.evalsha(self.sha, len(keys), *args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py\", line 487, in execute_command
return await conn.retry.call_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/retry.py\", line 59, in call_with_retry
return await do()
^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py\", line 463, in _send_command_parse_response
return await self.parse_response(conn, command_name, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py\", line 505, in parse_response
response = await connection.read_response()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File \"/opt/venv/lib/python3.11/site-packages/redis/asyncio/connection.py\", line 963, in read_response
raise response from None
redis.exceptions.ResponseError: user_script:12: too many results to unpack script: 15c892840ce49156b99eaf77c367913a8c069ed5, on @user_script:12.
Not sure if this is really a saq issue, but any ideas as to what could be wrong? I've flushed scripts in the redis instance, but the problem persists.
When calling queue.apply with ttl=-1, it raises a KeyError trying to access a non-existent result index.
If this edge case can't be handled, it would be great if the API checked for jobs with immediate expiry and raised a ValueError with an explanation.
It would be nice to have some authentication for the web dashboard, like flower, so not everyone can start/stop/retry tasks.
It doesn't have to be anything fancy, HTTP Basic Auth with only one user should be enough.
Alternatively, each request could require a ?token=XXX query parameter, where the token is passed in through an environment variable.
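The token variant could be checked with a constant-time comparison, as in this sketch. The environment variable name SAQ_WEB_TOKEN and the is_authorized helper are made up for illustration; saq defines neither.

```python
import hmac
import os
from typing import Mapping, Optional


def is_authorized(
    query_token: Optional[str],
    env_var: str = "SAQ_WEB_TOKEN",  # hypothetical variable name
    environ: Optional[Mapping[str, str]] = None,
) -> bool:
    """Return True only if the request token matches the configured one."""
    env = os.environ if environ is None else environ
    expected = env.get(env_var)
    if not expected or query_token is None:
        # No token configured or none supplied: deny by default.
        return False
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(query_token.encode(), expected.encode())
```

A web handler would call this with the value of the token query parameter and return 401 when it is False.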
Hi,
I'd like to change the default parameters for all my Jobs (e.g. generate a key via the job name and its arguments, setting the ttl to -1, increasing the timeout, etc.), but I have a bunch of jobs (10+).
I thought about defining a sort of default_job_kwargs and passing it to enqueue(), but it looks easy to forget and thus error-prone. I also thought about defining my jobs as instances of Job instead of functions, and using this default_job_kwargs there instead of when enqueueing them, but then again, it is easy to forget when defining a new job.
Is there a preferred way to globally override the defaults instead of passing those arguments every time I enqueue/implement a job?
If that's a feature you think would be good to have, do you have an opinion/idea about how it should be implemented?
Thanks for the help!
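Until there is built-in support, one workaround is a thin wrapper that merges defaults into every enqueue call, so the defaults live in one place. DefaultsQueue is a hypothetical class and assumes only that the wrapped object has an awaitable enqueue(job_or_func, **kwargs), as used elsewhere on this page.

```python
import asyncio
from typing import Any


class DefaultsQueue:
    """Hypothetical wrapper that applies default keyword arguments on enqueue."""

    def __init__(self, queue: Any, **defaults: Any) -> None:
        self._queue = queue
        self._defaults = defaults

    async def enqueue(self, job_or_func: Any, **kwargs: Any) -> Any:
        # Explicit arguments win over the defaults.
        merged = {**self._defaults, **kwargs}
        return await self._queue.enqueue(job_or_func, **merged)
```

For example, DefaultsQueue(queue, timeout=600, retry_backoff=True) would apply those two values unless a call overrides them.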
Could stopping the worker have an option to wait for all tasks to complete rather than cancelling them? It would be useful to have a way to block new jobs from being processed and wait for the existing jobs to finish.
# change - allow for option to skip the task cancel() and set flag to block processing new tasks
async def stop(self) -> None:
    """Stop the worker and cleanup."""
    self.event.set()
    all_tasks = list(self.tasks)
    self.tasks.clear()
    for task in all_tasks:
        task.cancel()
    await asyncio.gather(*all_tasks, return_exceptions=True)

# change - check for job processing flag from worker stop and don't schedule new tasks
def _process(self, previous_task: Task | None = None) -> None:
    if previous_task:
        self.tasks.discard(previous_task)

    if not self.event.is_set():
        new_task = asyncio.create_task(self.process())
        self.tasks.add(new_task)
        new_task.add_done_callback(self._process)
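The two changes proposed above can be tried out in isolation with a small stand-in. MiniWorker is a hypothetical class, not saq's Worker; it only shows a stop(wait=True) that skips cancellation and a submit() that refuses new work once the stop flag is set.

```python
import asyncio


class MiniWorker:
    """Hypothetical stand-in demonstrating a draining stop."""

    def __init__(self) -> None:
        self.event = asyncio.Event()  # set once stop has been requested
        self.tasks = set()

    def submit(self, coro):
        if self.event.is_set():
            coro.close()  # refuse new work while stopping
            return None
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def stop(self, wait: bool = False) -> None:
        self.event.set()
        all_tasks = list(self.tasks)
        if not wait:
            # Original behaviour: cancel everything that is still running.
            for task in all_tasks:
                task.cancel()
        # With wait=True this simply blocks until running tasks finish.
        await asyncio.gather(*all_tasks, return_exceptions=True)
```

A timeout around the gather call would be a sensible addition so a stuck job cannot block shutdown forever.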
hi! i'm interested in and have a rough draft for a PR for updating already scheduled cron jobs with new times. is this something you'd be interested in/open to?
Hi!
I'm trying a sample project in the examples folder.
And I found an "error" that appears in my terminal.
Here are the full logs:
ERROR:saq:Traceback (most recent call last):
File "/Users/user/Projects/saq/saq/worker.py", line 228, in process
await job.finish(Status.COMPLETE, result=result)
File "/Users/user/Projects/saq/saq/job.py", line 218, in finish
await self.queue.finish(self, status, result=result, error=error)
File "/Users/user/Projects/saq/saq/queue.py", line 378, in finish
await pipe.execute()
File "/Users/user/Projects/saq/venv/lib/python3.9/site-packages/redis-4.3.1-py3.9.egg/redis/asyncio/client.py", line 1334, in execute
conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
File "/Users/user/Projects/saq/venv/lib/python3.9/site-packages/redis-4.3.1-py3.9.egg/redis/asyncio/connection.py", line 1490, in get_connection
async with self._lock:
File "/Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
await self.acquire()
File "/Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/locks.py", line 120, in acquire
await fut
RuntimeError: Task <Task pending name='Task-3508' coro=<Worker.process() running at /Users/user/Projects/saq/saq/worker.py:228> cb=[Worker._process(), _gather.<locals>._done_callback() at /Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop
Lots of errors above appear in my terminal.
Here is the command to reproduce:
Terminal 1:
docker run -p 6379:6379 redis
Terminal 2:
saq examples.simple.settings --web
Terminal 3:
python examples/simple.py
There's a slight modification in examples/simple.py:
import asyncio
import random
import time

from saq import CronJob, Queue


async def sleeper(ctx, *, a):
    await asyncio.sleep(a)
    print("sleeper", a)
    return {"a": a}


async def adder(ctx, *, a, b):
    await asyncio.sleep(1)
    print("adder", a, b)
    return a + b


async def cron_job(ctx):
    print("excuting cron job")


settings = {
    "functions": [sleeper, adder],
    "concurrency": 100,
    "cron_jobs": [CronJob(cron_job, cron="* * * * * */5")],
}


async def enqueue(func, **kwargs):
    queue = Queue.from_url("redis://localhost")
    for _ in range(10000):
        await queue.enqueue(func, **{k: v() for k, v in kwargs.items()})


async def main():
    now = time.time()
    await enqueue("sleeper", a=random.random)
    await enqueue("adder", a=random.random, b=random.random)
    print(time.time() - now)


if __name__ == "__main__":
    asyncio.run(main())
Question:
Thanks!
Greetings, I have saq running cron jobs only.
While testing the job, I sometimes get flaky tests.
Looking at the logs, I discovered that when a test fails, saq is running the very same cron job at a very short interval (microseconds apart, according to the logs), while my test fixture is supposed to run it every second.
Below is how I'm using the fixture in this test.
At first I thought another saq process (my dev env) might be interfering with my tests, but I'm pretty confident it doesn't pick up any jobs: the queue names are different, and I don't see any jobs processed in it.
Secondly, I thought that invoking await worker.schedule() might "add" jobs, but the test is flaky whether that line is commented out or not.
So I'm at a loss as to why the very same job, supposed to run every second, sometimes runs twice, milliseconds apart.
Hope you may have an idea :)
@pytest.mark.anyio
async def test_order_seller_expired_before_approved(
_ = asyncio.create_task(worker.start())
# await worker.schedule()
await worker.process()
await asyncio.sleep(5)
@pytest.fixture(scope="session")
async def saq_queue(
    app_settings_test: AppSettings,
) -> AsyncGenerator[Queue, None]:
    _redis = Queue.from_url(app_settings_test.redis_url, name="worker-tests")
    yield _redis
    await _redis.redis.flushall()
    await _redis.disconnect()


@pytest.fixture
async def worker(
    saq_queue: Queue, app_settings_test: AppSettings
) -> AsyncGenerator[Worker, None]:
    from .worker import check_not_paid_not_expired
    from .worker import startup, shutdown

    _worker = Worker(
        saq_queue,
        functions=[],
        cron_jobs=[
            CronJob(check_not_paid_not_expired, cron="* * * * * */1", unique=False),
        ],
        startup=partial(startup, app_settings=app_settings_test),
        shutdown=shutdown,
    )
    yield _worker
    await _worker.stop()
    logger.debug("worker end")
Is there some way to quickly abort all jobs in a queue? I'm able to enqueue at ~6k operations per second, but aborting slows down to just 30 per second. The bottleneck seems to be the Redis server, which is pegged at 100% CPU during this.
I am having a lot of trouble getting jobs to show up in the dashboard.
Here is my setup:
class Queue(saq.Queue):
    """[SAQ Queue](https://github.com/tobymao/saq/blob/master/saq/queue.py)

    Configures `orjson` for JSON serialization/deserialization if not otherwise configured.

    Parameters
    ----------
    *args : Any
        Passed through to `saq.Queue.__init__()`
    **kwargs : Any
        Passed through to `saq.Queue.__init__()`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        kwargs.setdefault("dump", encoder.encode)
        kwargs.setdefault("load", decoder.decode)
        kwargs.setdefault("name", "background-worker")
        super().__init__(*args, **kwargs)

    def namespace(self, key: str) -> str:
        """Make the namespace unique per app."""
        return f"{settings.app.slug}:{self.name}:{key}"
queue = Queue(redis)
...
queue_settings = {
    "queue": queue,
    "concurrency": 10,
}


def start_worker(functions: Collection[WorkerFunction]) -> None:
    """
    Args:
        functions: Functions to be called via the async workers.
    Returns:
        The worker instance, instantiated with `functions`.
    """
    global queue_settings
    # return Worker(queue, functions, concurrency=10)
    # saq.start
    queue_settings["functions"] = functions
    start("app.lib.worker.queue_settings", web=True, port=8080)
res = await queue.enqueue(
    "initiate_training",
    data=data,
    key=initiate_training_namespace(order_id),
    timeout=30,
    retries=20,
    retry_delay=5,
    retry_backoff=True,
)
And these are my Redis keys:
1) "starlite-pg-redis-docker:background-worker:stats:30d27d7c-c20b-11ed-adf6-0242ac130003"
2) "starlite-pg-redis-docker:background-worker:incomplete"
3) "saq:job:background-worker:initiate_training:2c7880db-90a3-4791-baa0-83ace2d0098b"
4) "starlite-pg-redis-docker:background-worker:stats"
5) "starlite-pg-redis-docker:background-worker:schedule"
6) "starlite-pg-redis-docker:background-worker:sweep"
7) "saq:job:background-worker:initiate_training:c673af4b-fcd5-43cc-9c66-36de903e51a6"
Do you see anything wrong with what I am doing?
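One detail in the key listing above can be made visible with a short sketch: the bookkeeping keys carry the prefix produced by the overridden `namespace()`, while the job keys start with `saq:job:`. The snippet below only groups the seven listed keys by their first segment. Whether this prefix mismatch is what keeps the jobs out of the dashboard is an assumption, not something confirmed in this thread, and `group_by_prefix` is a hypothetical helper, not part of SAQ.

```python
from collections import defaultdict

# The seven keys exactly as listed in the report above.
KEYS = [
    "starlite-pg-redis-docker:background-worker:stats:30d27d7c-c20b-11ed-adf6-0242ac130003",
    "starlite-pg-redis-docker:background-worker:incomplete",
    "saq:job:background-worker:initiate_training:2c7880db-90a3-4791-baa0-83ace2d0098b",
    "starlite-pg-redis-docker:background-worker:stats",
    "starlite-pg-redis-docker:background-worker:schedule",
    "starlite-pg-redis-docker:background-worker:sweep",
    "saq:job:background-worker:initiate_training:c673af4b-fcd5-43cc-9c66-36de903e51a6",
]


def group_by_prefix(keys):
    """Group Redis keys by the text before the first ':' (hypothetical helper)."""
    groups = defaultdict(list)
    for key in keys:
        prefix = key.split(":", 1)[0]
        groups[prefix].append(key)
    return dict(groups)


groups = group_by_prefix(KEYS)
for prefix, members in groups.items():
    print(f"{prefix}: {len(members)} key(s)")
```

If the two groups are expected to share one prefix, comparing the job keys against what `queue.namespace(...)` returns would be a reasonable first check.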
Hi!
I build my application in a Docker image and set the PYTHONOPTIMIZE variable to 1 or 2. Because task deserialization removes the queue value from the task dictionary inside an assert statement, and assert statements are stripped when optimization is enabled, the following error occurs:
Traceback (most recent call last):
  File "/some/path/python3.11/site-packages/saq/worker.py", line 180, in poll
    await func(arg or sleep)
  File "/some/path/python3.11/site-packages/saq/queue.py", line 271, in sweep
    job = self.deserialize(job_bytes)
  File "/some/path/python3.11/site-packages/saq/queue.py", line 130, in deserialize
    return Job(**job_dict, queue=self)
TypeError: saq.job.Job() got multiple values for keyword argument 'queue'
I would still like to use this variable in the docker image. Is there a way to fix this error?
Thanks!
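To illustrate the mechanism described in this report, here is a minimal sketch of how a side effect placed inside an `assert` disappears under optimization. The two functions are hypothetical stand-ins written for this example, not SAQ's actual source. The sketch uses the built-in `compile()` with its `optimize` argument to mimic running with and without PYTHONOPTIMIZE in a single process.

```python
# Hypothetical stand-ins for the pattern described above; not SAQ's real code.
SOURCE = '''
def strip_queue_with_assert(job_dict, queue_name):
    # The pop() lives inside the assert, so it vanishes when asserts are stripped.
    assert job_dict.pop("queue") == queue_name
    return job_dict


def strip_queue_safely(job_dict, queue_name):
    # The pop() always runs; the check is a normal statement.
    found = job_dict.pop("queue")
    if found != queue_name:
        raise ValueError(f"job belongs to queue {found!r}, not {queue_name!r}")
    return job_dict
'''


def load(optimize):
    """Compile SOURCE at the given optimization level and return its namespace."""
    namespace = {}
    exec(compile(SOURCE, "<sketch>", "exec", optimize=optimize), namespace)
    return namespace


normal = load(0)     # asserts kept, like a default interpreter
optimized = load(1)  # asserts removed, like PYTHONOPTIMIZE=1

job = {"key": "abc", "queue": "default"}
normal_result = normal["strip_queue_with_assert"](dict(job), "default")
optimized_result = optimized["strip_queue_with_assert"](dict(job), "default")
safe_result = optimized["strip_queue_safely"](dict(job), "default")

print("queue" in normal_result)     # False: the assert ran and popped the key
print("queue" in optimized_result)  # True: the assert, and its pop(), were stripped
print("queue" in safe_result)       # False: the pop() is outside any assert
```

With the key left in the dictionary, a call such as `Job(**job_dict, queue=self)` receives `queue` twice, which matches the `TypeError` in the traceback. Moving the `pop()` out of the `assert`, as in the second function, is one way to keep the behavior identical at every optimization level.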