saq's People

Contributors

acreux, aivashov, barakalon, beedub, evgenii-moriakhin, fadeddexofan, grigi, jnordberg, maxyme-perceiv, ovresko, paul-finary, peterschutt, pierec, sondrelg, tobymao, vchan

saq's Issues

Dashboard doesn't show jobs

I am having a lot of trouble getting jobs to show up in the dashboard

Screenshot 2023-03-13 at 6 56 40 PM

Here is my setup:

class Queue(saq.Queue):
    """[SAQ Queue](https://github.com/tobymao/saq/blob/master/saq/queue.py)

    Configures `orjson` for JSON serialization/deserialization if not otherwise configured.

    Parameters
    ----------
    *args : Any
        Passed through to `saq.Queue.__init__()`
    **kwargs : Any
        Passed through to `saq.Queue.__init__()`
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        kwargs.setdefault("dump", encoder.encode)
        kwargs.setdefault("load", decoder.decode)
        kwargs.setdefault("name", "background-worker")
        super().__init__(*args, **kwargs)

    def namespace(self, key: str) -> str:
        """Make the namespace unique per app."""
        return f"{settings.app.slug}:{self.name}:{key}"


queue = Queue(redis)

...

queue_settings = {
    "queue": queue,
    "concurrency": 10,
}

def start_worker(functions: Collection[WorkerFunction]) -> None:
    """Start the SAQ worker (and its web UI) with the given functions.

    Args:
        functions: Functions to be called via the async workers.
    """
    # return Worker(queue, functions, concurrency=10)
    # saq.start
    queue_settings["functions"] = functions
    start("app.lib.worker.queue_settings", web=True, port=8080)

    res = await queue.enqueue(
        "initiate_training",
        data=data,
        key=initiate_training_namespace(order_id),
        timeout=30,
        retries=20,
        retry_delay=5,
        retry_backoff=True,
    )

And these are my Redis keys:

1) "starlite-pg-redis-docker:background-worker:stats:30d27d7c-c20b-11ed-adf6-0242ac130003"
2) "starlite-pg-redis-docker:background-worker:incomplete"
3) "saq:job:background-worker:initiate_training:2c7880db-90a3-4791-baa0-83ace2d0098b"
4) "starlite-pg-redis-docker:background-worker:stats"
5) "starlite-pg-redis-docker:background-worker:schedule"
6) "starlite-pg-redis-docker:background-worker:sweep"
7) "saq:job:background-worker:initiate_training:c673af4b-fcd5-43cc-9c66-36de903e51a6"

Do you see anything wrong with what I am doing?

Docs & Features

Hi, we've been using Celery for a while on our projects, but it's been a pain with all the complexity and confusing/conflicting documentation (not to mention the lack of it).

There are a few features we are looking for in a new job queue library; I was wondering what your thoughts on them are, and what your plans are going forward.

  • Progress / Metadata: we heavily use metadata on Celery tasks (self.update_state(state='PENDING', meta={'current': i, 'total': NTOTAL}) on a task with bind=True). I couldn't find anything about more granular/custom state handling in any of the libraries we've looked at (Arq, RQ, Saq, Dramatiq), so I'm wondering what the plans/thoughts on that are. I would imagine that I could do something similar using the context, maybe?
  • Dynamic Schedule: we have used Redbeat to make Celery's Beat module dynamic, so we can turn a schedule on/off programmatically (that wasn't possible with Celery's Beat alone). Given the examples I would think this should be possible, but I couldn't find a way to do it.
  • Retry Logic: the way we currently use retry logic is a bit odd, raise self.retry(exc=Exception(str(error)), countdown=countdown, max_retries=5), but it's been working well... I'm wondering how I would do retries using Saq (see the sketch right after this list).
  • Gevent: our app runs with about 250 MB of memory, so leveraging N workers can be taxing on RAM (tasks are complex, with many functions, and I/O heavy, taking up to 3-4 minutes). In this scenario Celery's gevent worker with concurrency has been great: not many workers, a ton of parallel tasks. I think Dramatiq has implemented greenlets, but RQ hasn't (there is code somewhere in the wild that implements a custom gevent worker, if I'm not mistaken).
  • Different Queues / Priority: while we don't use the priority concept directly, we do leverage different queues (and workers that execute on specific queues only); I'm wondering how we could implement this in Saq.
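
For reference, retries and named queues already look expressible with the enqueue call and Queue.from_url(..., name=...) snippets that appear elsewhere on this page; here is a rough sketch based on those (the function name is made up), not a statement about feature parity with Celery:

from saq import Queue

# Separate queues: each worker process can be pointed at its own named queue.
default_queue = Queue.from_url("redis://localhost", name="default")
heavy_queue = Queue.from_url("redis://localhost", name="heavy")

async def submit():
    # Retry behaviour is configured per enqueue call, as in the dashboard issue above:
    # retried up to 5 times, 2s initial delay, exponential backoff.
    await heavy_queue.enqueue(
        "long_running_task",  # hypothetical function name
        retries=5,
        retry_delay=2,
        retry_backoff=True,
        timeout=300,
    )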

Also, just to add to the discussion and share my experiences:

  • RQ: for now we decided not to go that route because of the lack of concurrency; the API seems pretty nice though.
  • Celery: a huge pain to set up, manage and debug. Bad, confusing documentation, and even worse error messages (think of Celery and Flask contexts).
  • Dramatiq: seems nice; I tend to like RQ's API better, but the scheduler has to be a third-party library. I still want to invest more time in understanding the library.
  • Arq: seems great also, but it does look like it is less mature than the libraries above (documentation, material, help, etc.); I also couldn't explicitly find the things I was looking for.

I know this is quite a list in terms of features; I was just wondering what your thoughts are and whether you reckon it could make sense for the future. I'm probably not the most knowledgeable person, but I would be more than happy to help!

Thanks!

cronjob is not respecting current timezone

It's scheduled based on EPOCH, as croniter is called without any timezone information. Maybe add a timezone attribute to the CronJob class and use it to create a datetime.datetime instance to pass to croniter?
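
A rough sketch of the proposal from the caller's side; croniter does accept a timezone-aware start time, while the timezone attribute on CronJob itself is hypothetical:

import datetime
from zoneinfo import ZoneInfo

from croniter import croniter

# Compute the next run from a timezone-aware "now" instead of a naive/EPOCH-based one.
tz = ZoneInfo("Europe/Paris")  # hypothetical tz attribute on CronJob
aware_now = datetime.datetime.now(tz)
next_run = croniter("0 9 * * *", aware_now).get_next(datetime.datetime)  # 09:00 local time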

Stopping Worker Cancels Running Tasks

Could stopping the worker get an option to wait for all tasks to complete rather than cancelling them? It would be useful to have a way to block new jobs from being processed and wait for the existing jobs to finish.

# change - allow an option to skip task.cancel() and set a flag to block processing new tasks
async def stop(self) -> None:
    """Stop the worker and cleanup."""
    self.event.set()
    all_tasks = list(self.tasks)
    self.tasks.clear()
    for task in all_tasks:
        task.cancel()
    await asyncio.gather(*all_tasks, return_exceptions=True)

# change - check the stop flag from the worker and don't schedule new tasks
def _process(self, previous_task: Task | None = None) -> None:
    if previous_task:
        self.tasks.discard(previous_task)

    if not self.event.is_set():
        new_task = asyncio.create_task(self.process())
        self.tasks.add(new_task)
        new_task.add_done_callback(self._process)
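
A sketch of the option I'm proposing, reusing the attributes from the snippet above (self.event and self.tasks as in the current worker); this is an illustration of the requested behaviour, not saq's implementation:

import asyncio

async def stop_gracefully(self, timeout: float | None = None) -> None:
    """Stop scheduling new jobs and wait for in-flight tasks instead of cancelling them."""
    self.event.set()  # _process() checks this flag and stops creating new tasks
    pending = list(self.tasks)
    self.tasks.clear()
    if pending:
        await asyncio.wait_for(
            asyncio.gather(*pending, return_exceptions=True),
            timeout=timeout,
        )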

Error in a long task

I start a long task using the Playwright framework.

ERROR:saq:Error processing job Job<function=fgis, kwargs={'cmd': 'parse', 'cn': '67:17:0010333:59'}, queue=fgis, id=saq:job:fgis:57646e99-7bcd-11ed-b618-2d9cfaea7267, scheduled=0, progress=0.0, start_ms=23, attempts=1, status=active, meta={}>
Traceback (most recent call last):
  File "/root/playwright/saq_srv.py", line 18, in fgis
    return await w.job(cmd, **kvargs)
  File "/root/playwright/rosreestr/fgis/worker.py", line 51, in job
    self.current = await self.search.cadnum(cn, regions[cn[:2]])
  File "/root/playwright/rosreestr/fgis/_page/search.py", line 14, in cadnum
    await page.locator(".v-filterselect-input").first.type(region, delay=100)
  File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/async_api/_generated.py", line 15250, in type
    await self._impl_obj.type(
  File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_locator.py", line 558, in type
    return await self._frame.type(
  File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_frame.py", line 710, in type
    await self._channel.send("type", locals_to_params(locals()))
  File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 44, in send
    return await self._connection.wrap_api_call(
  File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 419, in wrap_api_call
    return await cb()
  File "/root/playwright/.pw/lib/python3.10/site-packages/playwright/_impl/_connection.py", line 70, in inner_send
    done, _ = await asyncio.wait(
  File "/usr/lib/python3.10/asyncio/tasks.py", line 384, in wait
    return await _wait(fs, timeout, return_when, loop)
  File "/usr/lib/python3.10/asyncio/tasks.py", line 491, in _wait
    await waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/root/playwright/.pw/lib/python3.10/site-packages/saq/worker.py", line 247, in process
    result = await asyncio.wait_for(task, job.timeout)
  File "/usr/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

Relax redis dependency

Hi @tobymao, great project!

The redis dependency is specified as >4.2,<4.4. Any reason why we need <4.4? Not a big deal, but the readme only mentions >4.2.

cron job updating

Hi! I'm interested in, and have a rough draft of, a PR for updating already-scheduled cron jobs with new times. Is this something you'd be interested in / open to?

Add authentication to web dashboard

It would be nice to have some authentication for the web dashboard, like Flower, so that not everyone can start/stop/retry tasks.

It doesn't have to be anything fancy, HTTP Basic Auth with only one user should be enough.

Alternatively, each request could require a ?token=XXX query parameter, where the token is passed in through an environment variable.
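
As a sketch of the token idea, a plain aiohttp middleware could look like the following (saq's web UI is aiohttp-based, but how such a middleware would be wired into saq's app is left open, and SAQ_WEB_TOKEN is a made-up name):

import os

from aiohttp import web

EXPECTED_TOKEN = os.environ.get("SAQ_WEB_TOKEN")  # hypothetical environment variable

@web.middleware
async def token_auth(request, handler):
    # Reject any request that doesn't carry the expected ?token=XXX query parameter.
    if EXPECTED_TOKEN and request.query.get("token") != EXPECTED_TOKEN:
        raise web.HTTPUnauthorized()
    return await handler(request)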

Mechanism for transferring state from enqueing process to worker

Hi @tobymao,

The before_process and after_process hooks are great. Do you think it would also be possible to add an event hook for on_enqueue?

Celery has the before_task_publish event - this is really what I'd like to mirror.

# celery example

@before_task_publish.connect
def transfer_correlation_id(headers: Dict[str, str], **kwargs: Any) -> None:
    """
    Attach request ID to the task headers.
    """
    cid = correlation_id.get() 
    if cid:
        headers[header_key] = cid

I need to be able to pass state from this event handler to the worker, so there needs to be some data structure involved that is passed along. Celery uses a plain dict, I believe.

If you think this is a good idea, I'd also be happy to open a PR for this myself 👍
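
To make the request concrete, this is the kind of saq-side hook I have in mind; the registration call is hypothetical, while job.meta is the dict visible in the job repr elsewhere on this page:

from contextvars import ContextVar

correlation_id = ContextVar("correlation_id", default=None)

async def transfer_correlation_id(job) -> None:
    # Copy request-scoped state onto the job so the worker can read it later.
    cid = correlation_id.get()
    if cid:
        job.meta["correlation_id"] = cid

# queue.register_before_enqueue(transfer_correlation_id)  # hypothetical registration API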

The PYTHONOPTIMIZE variable is not supported

Hi!

I build my application in a Docker image and set the PYTHONOPTIMIZE variable to 1 or 2. Since task deserialization removes the queue value from the task dictionary via an assert, the following error occurs:

Traceback (most recent call last):
  File "/some/path/python3.11/site-packages/saq/worker.py", line 180, in poll
    await func(arg or sleep)
  File "/some/path/python3.11/site-packages/saq/queue.py", line 271, in sweep
    job = self.deserialize(job_bytes)
  File "/some/path/python3.11/site-packages/saq/queue.py", line 130, in deserialize
    return Job(**job_dict, queue=self)
TypeError: saq.job.Job() got multiple values for keyword argument 'queue'

I would still like to use this variable in the docker image. Is there a way to fix this error?

Thanks!
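
To illustrate the failure mode (this is not saq's actual code): under PYTHONOPTIMIZE, assert statements are stripped entirely, so any side effect inside them never runs.

job_dict = {"queue": "default", "function": "test"}

# With optimizations off, this pops "queue" as a side effect; with PYTHONOPTIMIZE
# set, the whole statement is removed and "queue" stays in the dict, which is what
# leads to "got multiple values for keyword argument 'queue'" above.
assert job_dict.pop("queue", None) is not None

# A side-effect-free alternative behaves identically with or without -O:
# queue_name = job_dict.pop("queue", None)
# assert queue_name is not None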

Running multiple workers

As part of the comparisons with arq, you mention you can easily run multiple workers to leverage cpu cores.

I thought this implied some logic for marking which functions are run. However, if I run multiple worker processes with the same settings object, the first-started worker is the only one that executes tasks.

What am I missing? Is it just that you can assign different functions to different workers?

Context Worker

The example modifies the context in startup, which doesn't work:

async def startup(ctx):
    ctx["db"] = await create_db()

The following code works:
ctx['worker'].context["db"] = await create_db()

Beginners ask for help

Hello, experts. My previous script used asyncio.Queue and a list to run asynchronous tasks. I stumbled upon saq, and the web feature attracted me: being able to check the progress of tasks in real time is great.
But I don't know how to change my script to use saq. This isn't a bug report, and I don't know who else to ask, so I hope it's OK to post it in the issues.

Aborting jobs and readding them

I have a queue for build jobs (90-second jobs). Whenever the source code changes I want to post a new job.
I set the key of the job to the repository name, so changes in the same repository get the same job key.
If a change in the source code happens during a build, I abort the job and enqueue it again.

job = await queue.job(key)
if job:
    await queue.abort(job, 'Abort job and add new job instead')
await queue.enqueue(Job(key=key, ...))

But - that won't work. The job isn't started.

Is abort not meant to be used like this? Any other way of handling this situation?

Upkeep error

Hi @tobymao!

Out of nowhere, our saq process started emitting "Upkeep task failed unexpectedly" errors, where the exception logger contains the following traceback:

Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/saq/worker.py", line 180, in poll
    await func(arg or sleep)
  File "/opt/venv/lib/python3.11/site-packages/saq/worker.py", line 167, in schedule
    scheduled = await self.queue.schedule(lock)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/saq/queue.py", line 248, in schedule
    return await self._schedule_script(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/redis/commands/core.py", line 4983, in __call__
    return await client.evalsha(self.sha, len(keys), *args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py", line 487, in execute_command
    return await conn.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
    return await do()
           ^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py", line 463, in _send_command_parse_response
    return await self.parse_response(conn, command_name, **options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/redis/asyncio/client.py", line 505, in parse_response
    response = await connection.read_response()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/redis/asyncio/connection.py", line 963, in read_response
    raise response from None
redis.exceptions.ResponseError: user_script:12: too many results to unpack script: 15c892840ce49156b99eaf77c367913a8c069ed5, on @user_script:12.

Not sure if this is really a saq issue, but any ideas as to what could be wrong? I've flushed scripts in the redis instance, but the problem persists.

Url web prefix

Hi! Could you add a URL prefix option to saq's positional arguments, for running the web UI behind a proxy?

Add type annotations

It would be really great if this project had PEP 484 type hints, for better editor support and integration with Mypy.

I'd be willing to help with this.

How to use Saq Web in docker

I'm trying to open the saq dashboard in Docker.
I'm using the config from a starlite template (https://github.com/starlite-api/starlite-pg-redis-docker).

Here is the docker-compose file, with some changes:
version: "3.9"
services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    volumes:
      - cache:/data

  db:
    image: postgres:latest
    volumes:
      - db:/var/lib/postrgresql/data/
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres

  mailhog:
    image: mailhog/mailhog
    ports:
      - "1025:1025"
      - "8025:8025"

  app:
    image: test/starlite:1.0
    build:
      context: .
      target: install
      args:
        INSTALL_ARGS: "--no-root"
    command: scripts/entry
    depends_on:
      - db
      - mailhog
      - redis
    ports:
      - "8000:8000"
      
    env_file:
      - .env
    volumes:
      - .:/app:cached
  saq:
    image: test/starlite:1.0
    command: python -m saq --web app.lib.workers
    ports:
      - "8080:8080"

volumes:
  db: {}
  cache: {}

It doesn't work if I try to call the worker with lib.worker or app.app.lib.worker.
Do you have any suggestions for how to reach the web interface in this situation?

duplicate jobs displayed in monitor

screenshot of http://localhost:8080/queues/default

Screenshot 2023-11-18 at 1 55 39 AM

As you can see in the screenshot, there are two rows for jobs with an identical key.

How to reproduce:

Run redis with docker:

  1. docker pull redis
  2. docker run --name redis-broker -p 6379:6379 -d redis
  3. confirm it's running with docker ps

Within a root folder, make two files: worker.py and main.py.

#worker.py
import asyncio

from saq import CronJob, Queue

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def cron(ctx):
    print("i am a cron job")

async def startup(ctx):
    print("i am a start up")

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

async def sleeper(ctx, *, a):
    await asyncio.sleep(a)
    return {"a": a}


async def adder(ctx, *, a, b):
    await asyncio.sleep(1)
    return a + b

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test, adder, sleeper],
    "concurrency": 10,
    # "cron_jobs": [CronJob(cron, cron="* * * * * */5")], # run every 5 seconds
    "startup": startup,
    # "shutdown": shutdown,
    # "before_process": before_process,
    # "after_process": after_process,
}
#main.py
import asyncio
import random
import time

from saq import CronJob, Queue
from worker import queue

async def cron_job(ctx):
    print("executing cron job")


async def enqueue(count):
    queue = Queue.from_url("redis://localhost")
    for _ in range(count):
        await queue.enqueue("sleeper", a=5, timeout=40)


if __name__ == "__main__":
    now = time.time()
    asyncio.run(enqueue(count=1))
    print(time.time() - now)

  1. run saq worker.settings --web
  2. run python3 main.py
  3. you will see two identical rows in the jobs section at http://localhost:8080/queues/default

Example usage in an ASGI framework?

Hi, I am trying to use saq in an ASGI framework called starlite. I'm struggling to get a basic SAQ implementation working smoothly.

Below is a minimal example where the worker gets started and the job gets scheduled, but during shutdown, the code hangs. It seems like the jobs don't get aborted or cleaned up.

Can you help suggest a working configuration?

Python Requirements:

starlite
uvicorn
saq
python-dotenv

Save file as main.py and run using: uvicorn main:app

import asyncio
import logging
import os

import uvicorn
from dotenv import load_dotenv
from saq import CronJob, Queue, Worker
from starlite import LoggingConfig, Starlite, get

########################################################
#################### ENV & LOGGING CONFIG  ################

load_dotenv()

REDIS_URL = os.getenv("REDIS_URL")

logging_config = LoggingConfig(
    loggers={
        "app": {
            "level": "DEBUG",
            "handlers": ["queue_listener"],
            "propagate": False,
        }
    }
)

logger = logging.getLogger("app")


########################################################
################  SCHEDULED JOB ########################

async def scheduled_job(ctx) -> None:
    logger.info("I'm scheduled to run by SAQ")


#########################################################
###### SAQ CONFIG: SCHEDULES REQUEST EVERY 10 SECONDS ###### 

queue = Queue.from_url(REDIS_URL)

tb = CronJob(scheduled_job, cron="* * * * * */10")  # Every 10 seconds
worker = Worker(queue=queue, functions=[], cron_jobs=[tb])


async def tasks_on_startup() -> None:
    logger.info("Starting SAQ worker")

    asyncio.create_task(worker.start())
    asyncio.create_task(worker.schedule())

    logger.info("SAQ started and tasks scheduled")


async def tasks_on_shutdown() -> None:
    logger.info("Stopping SAQ worker")

    asyncio.create_task(worker.abort())
    asyncio.create_task(worker.stop())

    logger.info("SAQ worker should be dead")


##################################
######## BASIC STARLITE APP #########

@get("/")
def hello_world() -> dict[str, str]:
    """Handler function that returns a greeting dictionary."""
    return {"hello": "world"}


app = Starlite(
    route_handlers=[hello_world],
    on_startup=[tasks_on_startup],
    on_shutdown=[tasks_on_shutdown],
    logging_config=logging_config,
    debug=True,
)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
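
One variation that may be worth trying (a sketch, not a verified fix): await the worker shutdown instead of wrapping it in create_task, so the loop isn't torn down while stop() is still pending.

async def tasks_on_shutdown() -> None:
    logger.info("Stopping SAQ worker")
    await worker.stop()  # wait for the worker to settle instead of fire-and-forget
    logger.info("SAQ worker stopped")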

Fast clear all in queue

Is there some way to quickly abort all jobs in a queue? I'm able to enqueue at ~6k operations per second, but aborting slows down to just 30 per second. The bottleneck seems to be the Redis server, which is pegged at 100% CPU during this.

cron jobs running at millisecond interval in tests

Greetings, I have saq running cron jobs only.

While testing the job, I sometimes get flaky tests.

Looking at the logs, I discovered that when a test fails, saq is running the very same cron job at a very short interval (microseconds apart, according to the logs), while my test fixture is supposed to run it every second.

Below is the way I'm using the fixture in this test.

At first I thought I might have another saq process (my dev env) interfering with my tests, but I'm pretty confident it doesn't pick up any jobs: the queue names are different, and I don't see any jobs processed in it.

I then thought maybe invoking await worker.schedule() would "add" jobs, but the test is flaky whether that line is commented out or not.

So I'm at a loss as to why the very same job, supposed to run every second, sometimes runs twice, milliseconds apart.

Hope you may have an idea :)

@pytest.mark.anyio
async def test_order_seller_expired_before_approved(worker):
    _ = asyncio.create_task(worker.start())
    # await worker.schedule()
    await worker.process()
    await asyncio.sleep(5)

@pytest.fixture(scope="session")
async def saq_queue(
    app_settings_test: AppSettings,
) -> AsyncGenerator[Queue, None]:
    _redis = Queue.from_url(app_settings_test.redis_url, name="worker-tests")
    yield _redis
    await _redis.redis.flushall()
    await _redis.disconnect()


@pytest.fixture
async def worker(
    saq_queue: Queue, app_settings_test: AppSettings
) -> AsyncGenerator[Worker, None]:
    from .worker import check_not_paid_not_expired
    from .worker import startup, shutdown

    _worker = Worker(
        saq_queue,
        functions=[],
        cron_jobs=[
            CronJob(check_not_paid_not_expired, cron="* * * * * */1", unique=False),
        ],
        startup=partial(startup, app_settings=app_settings_test),
        shutdown=shutdown,
    )
    yield _worker
    await _worker.stop()
    logger.debug("worker end")

Feature request: Task priority

First off, thank you very much for this very useful framework.

I do apologise in advance if this isn't the correct place to raise a feature request or if you're not looking at requests at all. In any case some basic task prioritisation would be very useful. The ideal implementation would allow multiple worker nodes (a single resource pool) to pick up tasks from a single task queue that handles the prioritisation internally, abstracted away from the consumer.

To illustrate:

from saq import Queue


# Create queue
queue = Queue.from_url("redis://localhost")

# Enqueue tasks (We assume a higher number is a higher priority)
low_priority_task = await queue.enqueue("some_task", a=5, priority=25)
high_priority_task = await queue.enqueue("some_task", a=5, priority=100)
medium_priority_task = await queue.enqueue("some_task", a=5, priority=50)

Now, when a worker starts up after these jobs have been enqueued successfully, the jobs will be popped in this order:

  • high_priority_task
  • medium_priority_task
  • low_priority_task

P.S. Queue-based priorities can be nice but often have significant infrastructure/configuration trade-offs so I much prefer Enqueue-time prioritisation.

`ModuleNotFoundError` on `saq` start

Hello! Thank you for nice framework!

I'm trying to use it, but getting an exception on attempt to start worker:

$ saq app.worker.settings
Traceback (most recent call last):
  File "********/saq-test/.venv/bin/saq", line 8, in <module>
    sys.exit(main())
  File "********/saq-test/.venv/lib/python3.10/site-packages/saq/__main__.py", line 73, in main
    start(
  File "********/saq-test/.venv/lib/python3.10/site-packages/saq/worker.py", line 283, in start
    settings = import_settings(settings)
  File "********/saq-test/.venv/lib/python3.10/site-packages/saq/worker.py", line 278, in import_settings
    module = importlib.import_module(module_path)
  File "/usr/lib/python3.10/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 992, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1004, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'app'

Project structure:

 ├── app
 │   ├── __init__.py
 │   └── worker.py
 ├── poetry.lock
 └── pyproject.toml

worker.py contents:

from typing import Any, Dict

from saq import Queue


async def startup(_: Dict[str, Any]) -> None:
    print("Hello from startup job!")


settings = {
    "queue": Queue.from_url("redis://localhost"),
    "functions": [],
    "concurrency": 8,
    "startup": startup,
}

If I try to use importlib from interpreter, everything is OK:

>>> import importlib
>>> importlib.import_module("app.worker")
<module 'app.worker' from '********/saq-test/app/worker.py'>
>>> _.settings
{'queue': <saq.queue.Queue object at 0x7f81fcd52740>, 'functions': [], 'concurrency': 8, 'startup': <function startup at 0x7f81fcd681f0>}
>>>

I've found that the current directory is not in sys.path, and used PYTHONPATH to fix the problem:

$ PYTHONPATH="$PYTHONPATH:$PWD" saq app.worker.settings
Hello from startup job!

Could you help me figure out whether this is a bug or my fault?

Show completed tasks in web UI

The web UI currently only shows in-progress and queued Tasks. Completed tasks do not show up, but it would be nice to be able to view them. It could be put behind a switch/env var too.

I can help with this.

Make repository and project itself more friendly for future contributions

I want to implement the plan indicated in the title and prepare the repository for future contributions and possible participation in Hacktoberfest 2023.

Here is the plan as I see it, point by point:

  1. Move all package and build management to PDM, similar to what was done recently in Pydantic.
  2. Use Ruff and Black (already used) for all code-style checking, linting and formatting.
  3. Add pre-commit hooks.
  4. Revisit type annotations with a stricter and more precise mypy configuration.
  5. Migrate the old unittest-based test suites to pytest.
  6. Write a CONTRIBUTING.md guide to cover all these steps.
  7. Add the hacktoberfest topic to the repo 😄

Why are you using a Semaphore instead of a BlockingConnectionPool?

Hi,

I migrated from ARQ to SAQ (again: great job with this library, I find it way cleaner), and I faced some issues while migrating from aioredis 1.3.1 (used by ARQ) to redis-py 4.2.0 (used by SAQ) in other parts of my project.

During my investigation, I found out that you implemented a way to throttle the number of connections to Redis using a semaphore (_op_sem) instead of redis-py's BlockingConnectionPool (which, with the max_connections set to your max_concurrent_ops and timeout set to None, would have the same behaviour: wait for a connection to be available before executing a command).

So I wonder, is there a reason you chose to implement what seems to be your own BlockingConnectionPool instead of using redis-py's?

Can I use a custom serializer?

I want to enqueue a job whose arguments include a class from a 3rd-party lib. It is not JSON serializable, but I can serialize it with pickle.dumps and pickle.loads without problems. How can I use pickle instead?

Thanks in advance, kindaway.
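
Based on the dump/load parameters shown elsewhere on this page (Queue(dump=..., load=...)), I imagine something like the following; whether from_url forwards these keyword arguments is an assumption I haven't verified:

import pickle

from saq import Queue

# Replace the default JSON serializer with pickle for both directions.
queue = Queue.from_url(
    "redis://localhost",
    dump=pickle.dumps,
    load=pickle.loads,
)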

Novice question

I'm new to the microservices world, but it is required of me at work: we have an ecommerce site that we want to migrate to a microservice architecture. So the question is, if I have one microservice on one machine and another on a different machine, and I am using saq (which I should thank you for, because it has everything I love; I have tried it and it is very straightforward), how do I make the other machine know there is a job it needs to work on? Thanks.
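
If it helps frame the question: in the examples on this page, both machines simply talk to the same Redis instance, so a sketch (with hypothetical host and task names) would be:

from saq import Queue

# Machine A (producer): enqueue work onto the shared queue.
queue = Queue.from_url("redis://shared-redis-host:6379")

async def submit_order(order_id: int) -> None:
    await queue.enqueue("process_order", order_id=order_id)  # hypothetical task name

# Machine B (consumer): run `saq myapp.worker.settings`, where the settings dict
# points at the same Redis URL and registers process_order in "functions".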

Key error when calling `queue.apply` with `ttl=-1`

When calling queue.apply with ttl=-1, it raises a KeyError trying to access a non-existent result index.

If this edge case can't be handled, it would be great if the API checked for jobs with immediate expiry and raised a ValueError with an explanation.

Running an action after a task fails and there are no more retries left

Is there a built-in way to run code after a job fails and there are no more retries left? In my use case, I'm trying to update a related value in a database if the job is unsuccessful.

Or is there a way to get all the failed jobs? I guess querying Redis directly is one answer.

Loving the library, by the way!
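
A sketch of what I imagine, using the after_process hook and the job attributes visible in log lines elsewhere on this page (status, attempts, kwargs, retries); Status.FAILED and the database helper are assumptions rather than verified API:

from saq.job import Status  # Status.COMPLETE appears in the saq/job.py tracebacks above

async def after_process(ctx):
    job = ctx["job"]
    # Only react once the final attempt has failed.
    if job.status == Status.FAILED and job.attempts >= job.retries:
        await mark_order_failed(ctx["db"], **job.kwargs)  # hypothetical DB update

settings = {
    # ... queue, functions, etc.
    "after_process": after_process,
}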

May I ask the experts how the "swept" error is caused, and how to troubleshoot it?

Traceback (most recent call last):
  File "/usr/local/saq-master/examples/del.py", line 95, in <module>
    asyncio.run(enqueue())
  File "/root/anaconda3/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/usr/local/saq-master/examples/del.py", line 89, in enqueue
    result = await queue.apply(batch_delete_url.name, uris=uris)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.11/site-packages/saq/queue.py", line 580, in apply
    results = await self.map(job_or_func, timeout=timeout, iter_kwargs=[kwargs])
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.11/site-packages/saq/queue.py", line 670, in map
    raise exc
saq.queue.JobError: Job saq:job:default:3a32d23c-9806-11ee-bd3c-000c29216232 failed

The above job failed with the following error:

Traceback (most recent call last):
  File "/root/anaconda3/lib/python3.11/site-packages/saq/worker.py", line 260, in process
    result = await asyncio.wait_for(task, job.timeout if job.timeout else None)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.11/asyncio/tasks.py", line 442, in wait_for
    return await fut
           ^^^^^^^^^
  File "/usr/local/saq-master/examples/del.py", line 65, in batch_delete_url
    delete_urls = await queue.map(
                  ^^^^^^^^^^^^^^^^
  File "/root/anaconda3/lib/python3.11/site-packages/saq/queue.py", line 670, in map
    raise exc
saq.queue.JobError: Job saq:job:default:3a3341ea-9806-11ee-afa6-000c29216232 aborted

The above job failed with the following error:

swept

Issues in the sample project

Hi!

I'm trying a sample project in the examples folder.

And I found an "error" that appears in my terminal.

Here are the full logs:

ERROR:saq:Traceback (most recent call last):
  File "/Users/user/Projects/saq/saq/worker.py", line 228, in process
    await job.finish(Status.COMPLETE, result=result)
  File "/Users/user/Projects/saq/saq/job.py", line 218, in finish
    await self.queue.finish(self, status, result=result, error=error)
  File "/Users/user/Projects/saq/saq/queue.py", line 378, in finish
    await pipe.execute()
  File "/Users/user/Projects/saq/venv/lib/python3.9/site-packages/redis-4.3.1-py3.9.egg/redis/asyncio/client.py", line 1334, in execute
    conn = await self.connection_pool.get_connection("MULTI", self.shard_hint)
  File "/Users/user/Projects/saq/venv/lib/python3.9/site-packages/redis-4.3.1-py3.9.egg/redis/asyncio/connection.py", line 1490, in get_connection
    async with self._lock:
  File "/Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/locks.py", line 14, in __aenter__
    await self.acquire()
  File "/Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/locks.py", line 120, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-3508' coro=<Worker.process() running at /Users/user/Projects/saq/saq/worker.py:228> cb=[Worker._process(), _gather.<locals>._done_callback() at /Users/user/.pyenv/versions/3.9.11/lib/python3.9/asyncio/tasks.py:767]> got Future <Future pending> attached to a different loop

Lots of errors above appear in my terminal.

Here is the command to reproduce:

Terminal 1:

docker run -p 6379:6379 redis

Terminal 2:

saq examples.simple.settings --web

Terminal 3:

python examples/simple.py

There's a slight modification in examples/simple.py:

import asyncio
import random
import time

from saq import CronJob, Queue


async def sleeper(ctx, *, a):
    await asyncio.sleep(a)
    print("sleeper", a)
    return {"a": a}


async def adder(ctx, *, a, b):
    await asyncio.sleep(1)
    print("adder", a, b)
    return a + b


async def cron_job(ctx):
    print("executing cron job")


settings = {
    "functions": [sleeper, adder],
    "concurrency": 100,
    "cron_jobs": [CronJob(cron_job, cron="* * * * * */5")],
}


async def enqueue(func, **kwargs):
    queue = Queue.from_url("redis://localhost")
    for _ in range(10000):
        await queue.enqueue(func, **{k: v() for k, v in kwargs.items()})

async def main():
    now = time.time()
    await enqueue("sleeper", a=random.random)
    await enqueue("adder", a=random.random, b=random.random)
    print(time.time() - now)

if __name__ == "__main__":
    asyncio.run(main())

Question:

  • Does SAQ support enqueueing this many jobs? I tried reducing the count from 10000 to 100, and there seems to be no problem.

Thanks!

Queue timeout

Hi!

I've just sat down to take a stab at rewriting one of our services from arq to saq, and it's been going well! Thanks for creating this 👍 I was also happy to see that it's compatible with the new redis v4.2+ stuff!

I just have one question I wasn't easily able to answer: is there a way to set a queue-specific timeout? I have two queues, where one is reserved for long-running jobs. I'd rather not pass timeout to every enqueue call if possible 🙂 Can I specify a timeout either in the settings, or on the queue instance?

Support dependencies among jobs

What is the best way to do job dependencies with saq, like in rq? In rq, users can specify a depends_on argument when creating a job, and the job is only enqueued once its dependencies are finished. I had thought I could hack the before_enqueue and/or before_process hooks for this, but it looks like the job will be enqueued/processed no matter what happens in the hook. If anyone can point me to a good way to support this with saq, I can write the code and do a PR.

BTW, thanks for a fast, lightweight, yet powerful package!
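
One generic pattern I've considered (not a saq feature): enqueue the dependent job from inside the parent function once it finishes, reusing the module-level queue object from the worker settings (names below are hypothetical):

async def parent_job(ctx, *, a):
    result = await do_parent_work(a)  # hypothetical work
    # Only reached if the parent didn't raise, i.e. the dependency is satisfied.
    await queue.enqueue("child_job", parent_result=result)
    return result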

Support redis 4.5.x?

  1. There's a race condition bug with redis <=4.4.3 and <=4.5.3, which was addressed in the next bugfix versions.
  2. I'm trying to install litestar 2.0.0alpha3, which depends on redis >=4.4.4 and >=4.5.4 (they explicitly require this to avoid that race condition bug).

Job is not performed if there is a scheduled job

I have problems getting saq to work properly when using scheduled tasks.

As soon as I enqueue() a Job with scheduled set to something other than 0, I run into problems.

The scheduled job gets executed, but any jobs I try to enqueue in the time before the scheduled job runs are silently ignored. They are not even performed after the scheduled job; they are simply gone.

I've tried using multiple workers, but that doesn't help. Any help in debugging this, or in understanding how to set this up, would be appreciated.

Best way to globally override Job defaults ?

Hi,

I'd like to change the default parameters for all my jobs (e.g. generating a key from the job name and its arguments, setting the ttl to -1, increasing the timeout, etc.), but I have a bunch of jobs (10+).

I thought about defining a sort of default_job_kwargs dict and passing it to enqueue(), but that looks easy to forget and thus error-prone. I also thought about defining my jobs as instances of Job instead of functions and applying default_job_kwargs there instead of at enqueue time, but then again, that's easy to forget when defining a new job.

Is there a preferred way to globally override the defaults instead of passing those arguments every time I enqueue/implement a job?

Thanks for the help!

Default job arguments

Is there some way to specify default arguments for jobs? For example, if I want every job to have retry_backoff on by default, how would I do that?
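
One generic workaround would be a thin wrapper around enqueue that call sites can still override (a sketch, not a library feature):

DEFAULT_JOB_KWARGS = {"retry_backoff": True, "retries": 5, "timeout": 60}

async def enqueue_with_defaults(queue, func_name, **kwargs):
    # Explicit kwargs win over the defaults.
    return await queue.enqueue(func_name, **{**DEFAULT_JOB_KWARGS, **kwargs})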

not working in windows due to asyncio has no add_signal_handler

Windows doesn't have signals like SIGTERM as Linux does, which causes a NotImplementedError to be thrown when running saq.
The best solution I have right now is to skip add_signal_handler if the OS is Windows:

if os.name != "nt":
    for signum in self.SIGNALS:
        loop.add_signal_handler(signum, self.event.set)

Question: How to run Web monitoring component as a separate process (without workers)

I'd like to be able to run the Web queue monitor as a separate process to multiple worker processes. I've tried using --workers 0 but it doesn't seem to respect the value of 0 and still picks up tasks from the queue.

This would be useful in Docker setups where you'd have one service that serves only the web monitoring (without any workers) and then multiple instances of a worker service. The issue here is that the worker instances conflict over the port they're exposing.

Readiness and liveness probes

Hi 👋

I just wanted to ask a quick question: have you spent any time considering what effective liveness/readiness probes might look like for saq @tobymao?

For a web server I think convention is to ping a healthcheck endpoint, or something equivalent. I'm not aware of an obvious equivalent for a task runner.

I'm sure this has been answered for other task runners, and I'm planning to do a little research now. Just asking in case you've either got an answer ready, or if there's an implementation detail unique to saq that would make sense to use for this 👍

(I'm not running the aiohttp part of saq, so I don't want to rely on that 🙂)

Testing your app that uses SAQ

I find I need to do a lot of mocking to unit test my app that uses SAQ.
It would be great to have a test double of Queue that basically does no-ops.

What would be needed to make this comprehensive enough to be included in the library as a test helper?
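
For discussion, this is roughly the double I have in mind; it records enqueue calls instead of talking to Redis and mirrors only the slice of the Queue API my app uses:

class FakeQueue:
    """Records enqueued jobs so tests can assert on them without Redis."""

    def __init__(self) -> None:
        self.enqueued: list[tuple[str, dict]] = []

    async def enqueue(self, func_name, **kwargs):
        self.enqueued.append((func_name, kwargs))
        return None  # a real Queue returns a Job; these tests only need the call record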
