
celery-once's People

Contributors

abildin, cameronmaske, gustavoalmeida, imomaliev, jwpe, lologhi, philipgarnero, pkariz, snake575, sobolevn, streeter, xuhcc


celery-once's Issues

Controlling lock behaviour on task failure

Hi, thanks for the superb library.

I have a long-running task; it takes a single argument, user_id.

@task(name="some_name", base=QueueOnce)
def task_a(user_id):
    ...

I did a system restart while this task was running, which made the task fail. I use celery-once for this task because it's scheduled to run every X minutes, and it works superbly at preventing task_a() from running at the same time for the same user_id.

But I can also invoke task_a() manually. When I tried to do so with the user_id of the failed task, I didn't get any warning about task_a() being AlreadyQueued or anything - the task just wasn't sent to my workers. It's as if celery stopped communicating for task_a() when invoked with that specific user_id.

After one hour had passed, which is my global lock timeout for celery-once, I could invoke the task manually.

I find this a bit odd. I realize that the task wasn't invoked at all because there was a lock acquired on it, but shouldn't I have seen an error, warning, or any other indication that this task is being intentionally ignored?
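For what it's worth, the default behaviour is for delay/apply_async to raise AlreadyQueued while the lock is held, so the call site can be told explicitly that the invocation was skipped; if once={'graceful': True} is configured, the call instead returns silently, which would match the behaviour described above. A minimal sketch of catching it, reusing task_a and user_id from the snippet above:

import logging

from celery_once import AlreadyQueued  # raised by delay/apply_async while the lock is held

logger = logging.getLogger(__name__)

try:
    task_a.delay(user_id)
except AlreadyQueued:
    # Explicit signal that the call was intentionally skipped because a lock
    # for this user_id is still held (e.g. left over from the failed run).
    logger.warning("task_a already queued or running for user_id=%s", user_id)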

Support redlock

Given that I am a celery-once consumer, I should be able to configure the use of Redlock as the lock provider, so that I can take advantage of the distributed-lock algorithm to solve Redis replication race conditions.

Tasks with autoretry_for stuck in locked state

I've discovered a bug in the get_key method which prevents the lock from being released.

@app.task(name='mytask', base=celery_once.QueueOnce)
def mytask(foo, bar):
    ...

mytask.delay(foo=1, bar=2)

The getcallargs(self.run, *args, **kwargs) call returns {'foo': 1, 'bar': 2}.
The get_key call returns qo_mytask_foo-1_bar-2. Everything works properly.

If I add the autoretry_for argument:

@app.task(name='mytask_retry', base=celery_once.QueueOnce, autoretry_for=(Exception, ))
def mytask_retry(foo, bar):
    ...

mytask_retry.delay(foo=1, bar=2)

The getcallargs(self.run, *args, **kwargs) call returns {'args': (), 'kwargs': {'foo': 1, 'bar': 2}}.
The get_key call returns qo_mytask_retry_args-()_kwargs={'bar': 2, 'foo': 1}.
The {'bar': 2, 'foo': 1} part is unsorted and sometimes the order changes to {'foo': 1, 'bar': 2}.

The same issue occurs when a nested dict is passed as a task argument. For example, mytask(foo={'spam': 10, 'eggs': 20}, bar=2). The get_key call returns qo_mytask_foo-{'eggs': 20, 'spam': 10}_bar-2 and sometimes qo_mytask_foo-{'spam': 10, 'eggs': 20}_bar-2.

This leads to lock release issues, because the get_key result differs between the locking stage and the release stage.
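Until the underlying wrapping issue is fixed (see the autoretry_for issue further down and PR #75), one possible stopgap is to make key generation deterministic yourself. A sketch, not celery-once's actual implementation: derive the key from a canonical JSON dump of the call arguments, so dict ordering can no longer change the key between the locking and release stages.

import json

from celery_once import QueueOnce


class DeterministicQueueOnce(QueueOnce):
    """Sketch: build the lock key from a canonical JSON dump of args/kwargs
    so (nested) dict ordering cannot produce different keys at lock time
    and release time. Assumes task arguments are JSON-serialisable."""

    def get_key(self, args=None, kwargs=None):
        canonical = json.dumps(
            {"args": list(args or ()), "kwargs": kwargs or {}},
            sort_keys=True,   # stable ordering for dict keys
            default=str,      # fall back to str() for exotic types
        )
        return "qo_{}_{}".format(self.name, canonical)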

Is there any way to queue a task instead of rejecting it?

From what I see in the documentation, if I try to queue the same task twice I will get an AlreadyQueued exception.

Is there any way for the task to still be queued, but not actually start until the previous one (with the same key) has finished, instead of just being dropped?
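Not out of the box, as far as I know. One workaround sketch: catch AlreadyQueued and hand the call to a small ordinary task that keeps retrying the enqueue until the previous run's lock is gone. enqueue_later and retry_in are invented names, and AlreadyQueued is assumed importable from celery_once:

from celery import current_app, shared_task
from celery_once import AlreadyQueued


@shared_task
def enqueue_later(task_name, args, kwargs, retry_in=60):
    """Sketch: keep trying to enqueue a QueueOnce task by name until its
    lock has been released, instead of dropping the call."""
    task = current_app.tasks[task_name]  # look the task up in the app registry
    try:
        task.apply_async(args=args, kwargs=kwargs)
    except AlreadyQueued:
        # Still locked; check again in retry_in seconds.
        enqueue_later.apply_async(
            args=(task_name, args, kwargs, retry_in), countdown=retry_in)

This doesn't give strict ordering, but it avoids silently dropping work.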

Test files should not be installed

Installing collected packages: celery-once
Setting utime for /home/hosts/develop/venv/lib/python2.7/site-packages/tests/conftest.py
Setting utime for /home/hosts/develop/venv/lib/python2.7/site-packages/tests/__init__.py
Setting utime for /home/hosts/develop/venv/lib/python2.7/site-packages/tests/__init__.pyc
Setting utime for /home/hosts/develop/venv/lib/python2.7/site-packages/tests/conftest.pyc
...
Successfully installed celery-once-0.1.4
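For reference, the usual fix for this packaging problem (a sketch, not necessarily how this project's setup.py is written) is to exclude the test package when declaring packages:

# setup.py (sketch)
from setuptools import find_packages, setup

setup(
    name="celery_once",
    packages=find_packages(exclude=["tests", "tests.*"]),  # keep tests out of the sdist/wheel
)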

Advantage over vanilla celery

Hello, could you outline the advantage of using celery_once over vanilla celery? I can start a worker with concurrency=1 and route tasks to it to achieve (what appears to be) the same thing. For example:

Start a celery worker with a named queue:

celery worker -n conversion-worker -Q conversion --concurrency 1

Celery queues configuration:

config['CELERY_QUEUES'] = (
    Queue('conversion', Exchange('conversion', type='direct'), routing_key='convert'),
)

And route the task to that queue:

@app.celery.task(queue="conversion")
def convert():
    pass

Thanks!

Getting celery beat error 'SchedulingError: Couldn't apply scheduled task'

Hi,
Thank you for writing a useful project for celery.

I am attempting to use this with scheduled tasks that run every minute, configured in settings.py using a celery beat schedule like the one below:

CELERYBEAT_SCHEDULE = {
    'check_products': {
      'task': 'com.ixlayer.tasks.check_products', 'schedule': timedelta(minutes=1),
    },
}


However sometimes I get an exception in the process that runs celery beat.

Traceback (most recent call last):
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery/beat.py", line 222, in apply_entry
    result = self.apply_async(entry, producer=producer, advance=False)
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery/beat.py", line 328, in apply_async
    entry, exc=exc)), sys.exc_info()[2])
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery/beat.py", line 320, in apply_async
    **entry.options)
  File "/Users/kowsari/workspace/ixlayer-be/venv/lib/python2.7/site-packages/celery_once/tasks.py", line 99, in apply_async
    raise e
SchedulingError: Couldn't apply scheduled task check_products:

I am using this configuration.

app.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': settings.BROKER_URL,
    'default_timeout': 60 * 60
  }
}

Is this celerybeat getting the AlreadyQueued exception? Or is this something else?
Help is much appreciated.

Thanks
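If the underlying error is AlreadyQueued (the previous minute's run still holding the lock), beat surfaces it as a SchedulingError because apply_async raised. The graceful option makes apply_async return quietly instead of raising, which is usually what you want for beat-driven tasks. A sketch, assuming the task definition looks roughly like this:

from celery_once import QueueOnce


@app.task(name='com.ixlayer.tasks.check_products', base=QueueOnce,
          once={'graceful': True})
def check_products():
    ...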

Track the task id to enable revoking and requeuing

I'd like to use celery-once to manage the refresh of a PostgreSQL materialized view when the user updates the underlying tables. The user performs many updates in succession and refreshing the view takes about 5 minutes. Therefore, I want to:

  • submit the task with countdown=120 so that I wait for 2 minutes after the last update to see if there are any more updates before executing the task
  • if a new task is submitted during that countdown, then I reset the countdown (or replace the old task submission with the new one)
  • once the countdown is reached and the task execution starts, then the next update should put a new entry on the queue

In order to do that, I think that I need to trap the AlreadyQueued error and use it to revoke the existing task and clear the lock and then submit it again. That would be much easier if the lock stored the id of the task that is going to be executed, rather than the timeout.

Does holding the timeout as the lock value serve any purpose? Or, by the time the remaining seconds reach zero, has the lock expired anyway, so when you attempt to get the key it no longer exists?
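Celery-once itself doesn't store the task id (as noted, the lock value holds the timeout), but the workflow above can be approximated by tracking the id yourself. A rough sketch, assuming the Redis backend and a QueueOnce task refresh_task(view_name); the pending:* bookkeeping key and the debounce_refresh helper are invented for illustration:

import redis

from celery_once import AlreadyQueued

r = redis.Redis.from_url('redis://localhost:6379/0')  # assumed: same Redis as the locks


def debounce_refresh(refresh_task, view_name, countdown=120):
    try:
        result = refresh_task.apply_async(args=(view_name,), countdown=countdown)
    except AlreadyQueued:
        old_id = r.get('pending:' + view_name)
        if old_id:
            # Revoke the task still waiting on its countdown...
            refresh_task.app.control.revoke(old_id.decode())
        # ...clear its lock, and submit a fresh one, resetting the countdown.
        refresh_task.once_backend.clear_lock(
            refresh_task.get_key(args=(view_name,)))
        result = refresh_task.apply_async(args=(view_name,), countdown=countdown)
    r.set('pending:' + view_name, result.id)
    return result

If the old task has already started running, revoke won't stop it, but in that case a fresh queue entry is exactly what the third bullet asks for.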

Lock gets deleted shortly after it's set

My setup is intricate, so this may not be the fault of celery-once... I'm working on an example repo to reproduce, but if anyone has seen this before, I'd love to know.

Specifically, what I'm doing is monitoring the redis server while running my tasks, and I see something like GET, SETEX, DEL, on the celery-once key, all in quick succession.

No such file or directory: '/tmp/celery_once/' when using the file-based backend

My tasks crash and no longer run due to a 'no such file' error.
I'm using the file-based backend and this is my setup.

settings.py

CELERY_ONCE = {
    'backend': 'celery_once.backends.File',
    'settings': {
        'location': '/tmp/celery_once/',
        'default_timeout': 60 * 60
    }
}

CELERY_BROKER_URL = 'pyamqp://rabbitmq:5672'
CELERY_RESULT_BACKEND = 'django-db'

CELERYD_HIJACK_ROOT_LOGGER = False

# use json format for everything

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'


celery.py

from __future__ import absolute_import
import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.

__all__ = [
'celery',
'QueueOnce',
]
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'autobets.settings')
os.environ.setdefault('DJANGO_CONFIGURATION', 'Development')
import configurations

configurations.setup()

app = Celery('autobets')

# Using a string here means the worker will not have to
# pickle the object when using Windows.

app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.ONCE = settings.CELERY_ONCE
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py

@shared_task(bind=True, base=QueueOnce, once={'graceful': True})
def get_events(self):
    # do stuff bla bla bla
    ...

Before using celery-once, tasks would run as normal.

Do I need to create the /tmp backend directory first, or does celery-once create it?

Stack trace:

[2020-02-13 20:11:43,505: INFO/MainProcess] mingle: searching for neighbors
[2020-02-13 20:11:44,810: INFO/MainProcess] mingle: all alone
[2020-02-13 20:11:44,865: WARNING/MainProcess] /usr/local/lib/python3.6/site-packages/celery/fixups/django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2020-02-13 20:11:44,866: INFO/MainProcess] celery@9f2b7c337579 ready.
[2020-02-13 20:11:44,868: INFO/MainProcess] Received task: apimb.tasks.get_events[b423ad7e-cf2b-4137-8f2d-25bb934dc148]  
[2020-02-13 20:11:44,872: INFO/MainProcess] Received task: apimb.tasks.get_balance[6af923d7-f1a1-451a-bc8d-e4f07f3b2f06]  
[2020-02-13 20:11:44,876: INFO/MainProcess] Received task: apimb.tasks.get_orders[3c30ddee-22a2-4831-b256-0511ed1b8cc6]  
[2020-02-13 20:11:45,416: WARNING/ForkPoolWorker-1] login has been calledAPIClient
[2020-02-13 20:11:45,502: WARNING/ForkPoolWorker-1] GET Balance task has been called now
[2020-02-13 20:11:45,586: WARNING/ForkPoolWorker-2] login has been calledAPIClient
[2020-02-13 20:11:45,637: INFO/ForkPoolWorker-1] Task apimb.tasks.get_balance[6af923d7-f1a1-451a-bc8d-e4f07f3b2f06] succeeded in 0.6551041000020632s: None
[2020-02-13 20:11:45,652: WARNING/ForkPoolWorker-2] GET EVENTS task has been called now
[2020-02-13 20:11:45,706: WARNING/ForkPoolWorker-1] /usr/local/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: FileNotFoundError(2, 'No such file or directory'):
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/usr/local/lib/python3.6/site-packages/celery_once/tasks.py", line 141, in after_return
    self.once_backend.clear_lock(key)
  File "/usr/local/lib/python3.6/site-packages/celery_once/backends/file.py", line 76, in clear_lock
    os.remove(lock_path)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/celery_once/qo_apimb.tasks.ge_52094e3196ca05f36eb013e2b749ce03'

  exc, exc_info.traceback)))
[2020-02-13 20:11:45,750: ERROR/ForkPoolWorker-1] Task apimb.tasks.get_balance[6af923d7-f1a1-451a-bc8d-e4f07f3b2f06] raised unexpected: FileNotFoundError(2, 'No such file or directory')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/usr/local/lib/python3.6/site-packages/celery_once/tasks.py", line 141, in after_return
    self.once_backend.clear_lock(key)
  File "/usr/local/lib/python3.6/site-packages/celery_once/backends/file.py", line 76, in clear_lock
    os.remove(lock_path)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/celery_once/qo_apimb.tasks.ge_52094e3196ca05f36eb013e2b749ce03'
[2020-02-13 20:11:45,813: WARNING/ForkPoolWorker-1] GET ORDERS task has been called now
[2020-02-13 20:11:45,860: INFO/ForkPoolWorker-1] Task apimb.tasks.get_orders[3c30ddee-22a2-4831-b256-0511ed1b8cc6] succeeded in 0.10413699999844539s: None
[2020-02-13 20:11:45,875: WARNING/ForkPoolWorker-1] /usr/local/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: FileNotFoundError(2, 'No such file or directory'):
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/usr/local/lib/python3.6/site-packages/celery_once/tasks.py", line 141, in after_return
    self.once_backend.clear_lock(key)
  File "/usr/local/lib/python3.6/site-packages/celery_once/backends/file.py", line 76, in clear_lock
    os.remove(lock_path)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/celery_once/qo_apimb.tasks.ge_55d76a4bebdad047aea38a8fd7e07c8e'

  exc, exc_info.traceback)))
[2020-02-13 20:11:45,937: ERROR/ForkPoolWorker-1] Task apimb.tasks.get_orders[3c30ddee-22a2-4831-b256-0511ed1b8cc6] raised unexpected: FileNotFoundError(2, 'No such file or directory')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/usr/local/lib/python3.6/site-packages/celery_once/tasks.py", line 141, in after_return
    self.once_backend.clear_lock(key)
  File "/usr/local/lib/python3.6/site-packages/celery_once/backends/file.py", line 76, in clear_lock
    os.remove(lock_path)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/celery_once/qo_apimb.tasks.ge_55d76a4bebdad047aea38a8fd7e07c8e'
[2020-02-13 20:11:48,857: INFO/ForkPoolWorker-2] Task apimb.tasks.get_events[b423ad7e-cf2b-4137-8f2d-25bb934dc148] succeeded in 3.861357500001759s: None
[2020-02-13 20:11:48,878: WARNING/ForkPoolWorker-2] /usr/local/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: FileNotFoundError(2, 'No such file or directory'):
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/usr/local/lib/python3.6/site-packages/celery_once/tasks.py", line 141, in after_return
    self.once_backend.clear_lock(key)
  File "/usr/local/lib/python3.6/site-packages/celery_once/backends/file.py", line 76, in clear_lock
    os.remove(lock_path)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/celery_once/qo_apimb.tasks.ge_f98116d19c287c5287360c9ef915d089'

  exc, exc_info.traceback)))
[2020-02-13 20:11:48,910: ERROR/ForkPoolWorker-2] Task apimb.tasks.get_events[b423ad7e-cf2b-4137-8f2d-25bb934dc148] raised unexpected: FileNotFoundError(2, 'No such file or directory')
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/usr/local/lib/python3.6/site-packages/celery_once/tasks.py", line 141, in after_return
    self.once_backend.clear_lock(key)
  File "/usr/local/lib/python3.6/site-packages/celery_once/backends/file.py", line 76, in clear_lock
    os.remove(lock_path)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/celery_once/qo_apimb.tasks.ge_f98116d19c287c5287360c9ef915d089'
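On the question of whether the directory must exist up front: I can't say from the trace whether the File backend creates its location directory for you, so one cheap thing to rule out while debugging (a sketch) is creating it explicitly where the app is configured:

import os

# Make sure the configured lock directory exists before producers/workers use it.
os.makedirs('/tmp/celery_once/', exist_ok=True)

Note that the traceback itself fails in clear_lock, i.e. the lock file is already gone by the time the task finishes, so a missing directory may not be the whole story.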


Only delay and apply_async check against the lock.

Hello,

I am struggling to make celery-once check against the lock when using celery.send_task.
Can you explain why it fails to check against the lock when using send_task?
Is it possible to add this feature to the library?

I am exposing the Flower REST API to end users so they can call my tasks, and it would be great if celery_once could prevent calling a task that is already queued when it goes through the send_task endpoint.

Thanks,

Regards
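For context, send_task only builds a message from the task's name; it never touches the task class, so QueueOnce.apply_async (where the lock check lives) is bypassed. A workaround sketch, assuming the task module is importable (and therefore registered) in the process making the call; send_task_once is an invented helper name:

from celery import current_app


def send_task_once(name, args=None, kwargs=None, **options):
    """Sketch: route by task name, but go through the task object's
    apply_async so QueueOnce's lock check still runs."""
    task = current_app.tasks[name]  # raises KeyError if the task isn't registered here
    return task.apply_async(args=args, kwargs=kwargs, **options)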

Suggestion: Custom key generation function

Hi,
First of all, thanks for creating this library!

Related to the key collisions issue #19, I was wondering if passing a function that generates the key from the task's arguments as an alternative to specifying the arguments would be a good solution?

In my case, I need to add the tenant's name somewhere in the key because I want the tasks to only be locked for the same tenant.

The proposed solution seems to address this kind of issue in a generic way.
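In the meantime, tenant-scoped locks can be had without a new option by overriding get_key in a QueueOnce subclass. A sketch; get_current_tenant is a hypothetical hook for however your application resolves the active tenant:

from celery_once import QueueOnce


def get_current_tenant():
    # Hypothetical: return the active tenant's name from wherever your app
    # tracks it (thread-local, request context, task argument, ...).
    return "default-tenant"


class TenantQueueOnce(QueueOnce):
    """Sketch: prefix the generated lock key with the tenant so tasks only
    lock against other runs for the same tenant."""

    def get_key(self, args=None, kwargs=None):
        base_key = super(TenantQueueOnce, self).get_key(args, kwargs)
        return "{}_{}".format(get_current_tenant(), base_key)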

Don't release lock after task ends

I want a task to not release the lock when it returns. Effectively, when task_A is called with the same signature, I want it to fail gracefully until redis expires the lock. This ensures the same task can't be called with the same signature until the cache releases the lock after the default timeout.

I tried unlock_before_run but that doesn't seem to work. With my current setup, Once blocks tasks from being run in parallel, but I want to enforce an hour-long blocking period before a task with the same signature can be re-run. (For context, I'm doing this to prevent multiple notification emails from being sent within a short time period in my app.)
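One way to approximate this today (a sketch, not an official option) is a QueueOnce subclass whose after_return does nothing, since after_return is where celery-once normally clears the lock; the backend's timeout then becomes the only thing that releases it:

from celery_once import QueueOnce


class HoldLockUntilTimeout(QueueOnce):
    """Sketch: never clear the lock when the task finishes; rely on the
    lock's TTL (default_timeout / per-task timeout) to release it. A crashed
    or redeployed worker will also leave the lock in place for the full
    timeout, so use with care."""

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Intentionally skip QueueOnce.after_return, which would call
        # self.once_backend.clear_lock(key).
        return None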

Long running tasks do not seem to hold the lock

I have some very long running processes (48-60h), but I've noticed the locks of running tasks are released before the task finishes, and thus other workers start processing the same task.
If I simulate the long task by a simple sleep command of a few minutes, all task locks are preserved as expected. Am I missing a setting?
Below are my settings for Celery 3.1.20

# -*- coding: utf-8 -*-
BROKER_URL = 'redis://localhost:6379/8'

BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 262800}  # 73 hours.

# Celery result backend
CELERY_RESULT_BACKEND = 'redis://localhost:6379/8'
CELERY_RESULT_PERSISTENT = True

CELERY_ACKS_LATE = True
CELERYD_CONCURRENCY = 1
CELERYD_PREFETCH_MULTIPLIER = 1

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Amsterdam'
CELERY_ENABLE_UTC = True

# List of modules to import when celery starts.
CELERY_IMPORTS = ('worker.tasks', )

# CeleryOnce for task locking
ONCE_REDIS_URL = 'redis://localhost:6379/8'
ONCE_DEFAULT_TIMEOUT = 259200  # 72 hours

Bug: OSError Filename too long

I'm using the File backend and I'm passing a relatively large dictionary as a param to the celery task. Is it possible to use a hash or something else instead of the function params to generate the task filename?
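One workaround sketch is to hash the generated key in a QueueOnce subclass, so the File backend's filename length no longer depends on the size of the arguments:

import hashlib

from celery_once import QueueOnce


class HashedKeyQueueOnce(QueueOnce):
    """Sketch: replace the argument-derived key with a fixed-length digest.
    Deduplication still works on the same arguments, just via the hash."""

    def get_key(self, args=None, kwargs=None):
        raw_key = super(HashedKeyQueueOnce, self).get_key(args, kwargs)
        digest = hashlib.md5(raw_key.encode("utf-8")).hexdigest()
        return "qo_{}_{}".format(self.name, digest)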

AttributeError: 'Settings' object has no attribute 'ONCE'

I always receive this error in the celery logs after execution:

[2017-07-10 15:40:33,053: CRITICAL/MainProcess] Task provider.KENO.tasks.add[1da681d2-2815-4257-b139-01facaaef67d] INTERNAL ERROR: AttributeError(u"'Settings' object has no attribute 'ONCE'",)
Traceback (most recent call last):
  File "/Users/deanchristianarmada/Desktop/projects/radars/radar/lib/python2.7/site-packages/celery/app/trace.py", line 299, in trace_task
    state, retval, uuid, args, kwargs, None,
  File "/Users/deanchristianarmada/Desktop/projects/radars/radar/lib/python2.7/site-packages/celery_once/tasks.py", line 129, in after_return
    self.once_backend.clear_lock(key)
  File "/Users/deanchristianarmada/Desktop/projects/radars/radar/lib/python2.7/site-packages/celery_once/tasks.py", line 52, in once_backend
    return import_backend(self.once_config)
  File "/Users/deanchristianarmada/Desktop/projects/radars/radar/lib/python2.7/site-packages/celery_once/tasks.py", line 48, in once_config
    return self.config.ONCE
  File "/Users/deanchristianarmada/Desktop/projects/radars/radar/lib/python2.7/site-packages/celery/datastructures.py", line 353, in __getattr__
    type(self).__name__, k))
AttributeError: 'Settings' object has no attribute 'ONCE'

My celery version is 3.1.24. I tried 4.0.2 and the error still exists.

My code is:

from celery import Celery
from celery_once import QueueOnce

celery = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')
celery.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://127.0.0.1:6379',
    'default_timeout': 60 * 60
  }
}

@celery.task(base=QueueOnce)
def add(x, y):
    return x + y

I run it through the terminal with celery -A proj worker -l info and call it from python manage.py shell like:

>>> from app import add
>>> x = add.apply_async(args=[6, 74])
>>> 

get_key function returns a different key for the same function.

I'm using celery_once==3.0.0 in my project.

I have a task like:

@celery.task(base=QueueOnce)
def calculate_something():
    ...

But when I run the task a second time, it always throws an AlreadyQueued exception. It seems the key isn't deleted after the function ends.

So I created a class

class TestQueueOnce(QueueOnce):
    ...

and called get_key and printed the key in apply_async and after_return.

I found the keys are different!

In after_return, the key is app.worker.analyze_tasks.calculate_something_args-()_kwargs-{}
In apply_async, the key is app.worker.analyze_tasks.calculate_something

Django

It does not work in Django applications. Are you aware?

cannot be used as base class?

In celery 4.0, we can use a class to create a task like this:

from celery.task import Task

class DemoTask(Task):

    def run(self, *args, **kwargs):
        # do something
        ...

But when using celery-once, how do I do that? I tried:

from celery_once import QueueOnce

class DemoTask2(QueueOnce):

    def run(self, *args, **kwargs):
        # do something
        ...

but the DemoTask2 task does not get loaded.
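Celery 4 dropped the metaclass that used to auto-register Task subclasses, so a class that merely inherits from QueueOnce is never registered on its own. A sketch, assuming app is your existing Celery instance and that your Celery 4.x provides app.register_task:

from celery_once import QueueOnce


class DemoTask2(QueueOnce):
    name = 'demo.task2'  # give class-based tasks an explicit name

    def run(self, *args, **kwargs):
        # do something
        ...


# app is your existing Celery() instance; registering an instance makes the
# task discoverable by name on both the caller and the worker.
demo_task2 = app.register_task(DemoTask2())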

celery-once and Redlock

I hope this issue doesn't come across as too pedantic, but the README mentions that celery-once uses the Redlock algorithm.

After digging a bit it looks like redis.lock.Lock uses a plain lock on a single instance, whereas Redlock is about using multiple Redis instances in parallel for fault-tolerance.

The confusion probably comes from the fact that both algorithms are presented on the same page of the Redis documentation.

Task conflict detection at runtime

One thing that confused me is that celery_once tries to detect conflicts and acquire locks before putting a task into the queue, not when a task is pulled from the queue to run. Do you have opinions about also including a task existence check as part of the QueueOnce.__call__ method?

With Flask-SQLAlchemy query, I get RuntimeError: Working outside of application context.

Thank you for the great work on this library!

Info:

  • OS name and version: macOS High Sierra v10.13.3
  • Broker: RabbitMQ
  • Other details: RedBeat, Flask-SQLAlchemy

Following the usage directions worked perfectly out of the box, but once I needed to look up a record in my database, I got a RuntimeError: Working outside of application context.

This code works

@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task(id):
    sleep(20)
    return "Done!"

This code doesn't work

@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task(id):
    result = CustomORModel.query.filter_by(id=id).first()
    do_something_with(result)
    return "Fails!!"

Using celery's base Task class plays nicely with my db lookup--that is, it works when I remove base=QueueOnce, once={'graceful': True}, but I need the task locking mechanism offered by celery-once.

Anyone else have experience with this? Maybe, I'm missing something pretty basic in my setup or the way I'm calling my query.

Any help would be greatly appreciated! Thanks in advance.

Connie
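The usual cause of "Working outside of application context" is that the worker runs the task body without a pushed Flask app context; many Flask/Celery setups get that from a context-pushing base task, which is lost when the base is switched to QueueOnce. A sketch that combines the two, reusing the names from the snippets above and assuming your Flask app object is importable as flask_app:

from celery_once import QueueOnce

from myapp import flask_app  # hypothetical: wherever your Flask app lives


class ContextQueueOnce(QueueOnce):
    """Sketch: push a Flask application context around task execution so
    Flask-SQLAlchemy queries work inside the worker."""

    def __call__(self, *args, **kwargs):
        with flask_app.app_context():
            return super(ContextQueueOnce, self).__call__(*args, **kwargs)


@celery.task(base=ContextQueueOnce, once={'graceful': True})
def slow_task(id):
    result = CustomORModel.query.filter_by(id=id).first()
    do_something_with(result)
    return "Done!"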

get_key doesn't work with autoretry_for

OS: debian 9 stretch

When autoretry_for is passed as a kwarg to a task, celery wraps task.run method and stores original task.run method in task._orig_run. Currently get_key always checks task.run and the inspection of arguments doesn't return the right values. The solution is to check if _orig_run exists on task and if it does, inspect this method for arguments. I have created a PR #75 which includes a proposed fix and a test which covers that problem.
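For readers hitting this before the fix lands, here is a sketch of the described approach as a subclass, mirroring the get_key code quoted elsewhere on this page (the queue_once_key import path is assumed; the real change is in PR #75):

from inspect import getcallargs

from celery import Task
from celery_once import QueueOnce
from celery_once.helpers import queue_once_key  # helper celery-once uses internally


class AutoretryAwareQueueOnce(QueueOnce):
    """Sketch: build the lock key from the unwrapped task body so the
    autoretry_for wrapper doesn't change the inspected signature."""

    def get_key(self, args=None, kwargs=None):
        restrict_to = self.once.get('keys', None)
        args = args or ()
        kwargs = kwargs or {}
        # Prefer the original run method stored by celery's autoretry wrapping.
        run = getattr(self, '_orig_run', self.run)
        call_args = getcallargs(run, *args, **kwargs)
        if isinstance(call_args.get('self'), Task):
            del call_args['self']
        return queue_once_key(self.name, call_args, restrict_to)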

Ability to chain tasks

I tried with many task dedupe libs including yours and this doesn't work:

(task1.si(42) | task2.s(1)).delay()
(task1.si(42) | task2.s(2)).delay()

I want task1(42) to be deduped, and then both task2(1) and task2(2) to run after it finishes. Is this possible?

Allow not setting any timeout on the lock

This is more a feature request/suggestion than a bug.

I suggest adding the possibility of setting timeout = None on the task (and also on the ONCE_DEFAULT_TIMEOUT configuration key). When doing so, no TTL would be set on the lock keys. Only the completion (or failure) of the task itself would remove the lock, in the Task.after_return method.

I think this could be useful for regular tasks with a highly variable execution time, depending on the amount of work they have to perform. In this case, we just want to make sure the task is not executed concurrently multiple times, instead of having to guess an appropriate timeout.

It also should not break any pre-existing behaviour. What do you think?

Using redis through a unix socket

I'd like to use redis over a unix socket instead of a TCP connection. I changed ONCE_REDIS_URL (ONCE_REDIS_URL = 'unix:/tmp/redis.sock:1') but I got an error when celery_once tried to set the redis port.

/usr/local/lib/python2.7/dist-packages/redis/connection.pyc in __init__(self, host, port, db, password, socket_timeout, socket_connect_timeout, socket_keepalive, socket_keepalive_options, retry_on_timeout, encoding, encoding_errors, decode_responses, parser_class, socket_read_size)
    397         self.pid = os.getpid()
    398         self.host = host
--> 399         self.port = int(port)
    400         self.db = db
    401         self.password = password

TypeError: int() argument must be a string or a number, not 'NoneType'

Support other backends than Redis

Memcached comes to mind for distributed locks; it would be nice to be able to use it instead of Redis when it's more readily available.

Suggestion: Add how many tasks are permitted

I have some use cases where it is useful to control how many concurrent tasks are permitted. I currently work around it by passing distinguishing parameters, for example:

  sender.add_periodic_task(10, cleaning_file.s("1"), name='Cleaning Worker 1')
  sender.add_periodic_task(10, cleaning_file.s("3"), name='Cleaning Worker 2')

It would be nice to have that kind of configuration, such as a parameter called max_task:

@app.task(base=QueueOnce, once={'graceful': True, 'max_task': 2}, max_retries=0)
def cleaning_file(key="default"):

Task with base QueueOnce makes my celery task invisible.

When I use rabbitmq as my celery broker it works fine. But if I use redis as the celery broker and the celery-once backend at the same time, my QueueOnce task becomes invisible - the worker doesn't receive the task.

By the way, if I remove QueueOnce it works fine.

1 Running, 1 Queued (per set of keys)

EDIT: replaced "bucket" with segment since it's a group unified by a key but not a fixed number of "buckets" as the term would normally be used

The "toy" case on the need for 1 running and 1 queued goes something like... assume we're updating a standard deviation field for a specific segment of values:

  • We add a value to the DB
  • We queue that segment for calculation
  • The task starts and loads the segment (and starts work)
  • We add a value to the DB
  • We try to queue the segment (but celery-once blocks it)
  • The task finishes (without incorporating the latest value)

If we don't queue a second copy when we add the new value, the stddev ends up out of date. The "1 running" limitation is due to the nature of the task -- which is not idempotent (and can't be made so).

It sounds like:

  • The package will always ensure only one is queued
  • unlock_before_run would make it possible to have a running and a queued
  • #26 shows how to ensure only 1 is running

However, the documentation on unlock_before_run states "any retry of the task won't re-enable the lock" and #26 calls retry. Together it sounds like this will put an arbitrary number of tasks into the retry queue. Does my analysis sound right so far?

Is there some reason we can't/shouldn't wrap the retry call to attempt to restore the lock (or raise/quit, preventing duplicates in the queue) if unlock_before_run is configured?

Clear old locks when server starts?

If the server goes down while a task is running and has acquired a lock, the lock is still in place when the server comes back up. It isn't released until the default timeout has expired and there doesn't seem to be any clean mechanism for dealing with this.

A similar project to this called celery-singleton has a clear_locks API that removes any locks. This can be called when workers are first ready.

from celery.signals import worker_ready
from celery_singleton import clear_locks
from somewhere import celery_app

@worker_ready()
def unlock_all(**kwargs):
    clear_locks(celery_app)

Does such a mechanism exist for this project? Maybe I'm missing it. If it doesn't exist, would this be a welcome feature?
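I'm not aware of a built-in equivalent in celery-once. A workaround sketch for the Redis backend, assuming every lock key carries the qo_ prefix seen elsewhere on this page and that nothing else in that Redis database shares the prefix:

import redis
from celery.signals import worker_ready


@worker_ready.connect
def clear_stale_celery_once_locks(**kwargs):
    """Sketch: delete every celery-once lock key when a worker starts.
    Only safe if a single worker (or a coordinated deploy) owns the locks;
    otherwise this may clear locks held by healthy workers."""
    client = redis.Redis.from_url('redis://localhost:6379/0')  # your ONCE url
    for key in client.scan_iter(match='qo_*'):
        client.delete(key)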

Keyword-arg as list of str not handled properly

I'm facing an issue when sending a keyword argument containing a list of strings.

The task is called as

>>> import_files.delay(access_type='local:files', filepaths=map(os.path.abspath, ['tmp/tmpfile1', 'tmp/tmpfile2']))

The getcallargs(self.run, *args, **kwargs) call returns the following dict:

{
    'access_type': 'local:files', 
    'server': None, 
    'user': None, 
    'filepaths': ['/home/br/code/project/project/tmp/tmpfile1', '/home/br/code/project/project/tmp/tmpfile2'], 
    'password': None
}

which generates the key

"qo_externalapi.tasks.marin.import_marin_files_access_type-local:files_filepaths-['/home/br/code/project/project/tmp/tmpfile1', '/home/br/code/project/project/tmp/tmpfile2']_password-None_server-None_user-None"

When the task call returns, the getcallargs(self.run, *args, **kwargs) call now returns the following dict:

{
    'access_type': 'local:files', 
    'server': None, 
    'user': None, 
    'filepaths': [u'/home/br/code/project/project/tmp/tmpfile1', u'/home/br/code/project/project/tmp/tmpfile2'], 
    'password': None
}

which generates the key

"qo_externalapi.tasks.marin.import_marin_files_access_type-local:files_filepaths-[u'/home/br/code/project/project/tmp/tmpfile1', u'/home/br/code/project/project/tmp/tmpfile2']_password-None_server-None_user-None"

The (subtle) difference is that inspect.getcallargs added the "u" prefix in front of each string in the filepaths kwargs. As this results in two different keys, I can only delay a task once, until the lock expires.

Key collisions

When sending Django models as args to tasks, part of the key is generated by converting the model to a string. This causes key collisions for us, since we have multiple objects that output the same string from __unicode__.

I propose something like this https://github.com/chripede/celery-once/commit/a2b465ce8fd1aa6593cfb5bfe7712320e792bf5f to allow objects to create unique keys.

The code in my django model would look something like this

    def __unicode__(self):
        return self.domain

    def celery_once_key(self):
        return "{}-{}".format(self.pk, self.domain)

UnicodeDecodeError

When one of the parameters contains a non-ASCII character, this happens:

Task x[42ce145b-9364-4961-b9ea-12cd192aa49c] raised unexpected: UnicodeDecodeError('ascii', 'l\xc3\xa5n', 8, 9, 'ordinal not in range(128)')
...
 line 76, in queue_once_key
    key = "_".join(keys)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 8: ordinal not in range(128)

Python 2.7

Issue working with Sentry celery integration

I'm trying to use celery-once with the Sentry celery integration, but I'm running into an issue with how Sentry patches the task tracer.

The issue I'm seeing is that the key before and after the task is patched are different. I believe this is because of how inspect.getcallargs is working with Sentry's wrapped run method.

Would there be anything wrong with storing the key on the task instance like this:

    def get_key(self, args=None, kwargs=None):
        """
        Generate the key from the name of the task (e.g. 'tasks.example') and
        args/kwargs.
        """
        if not hasattr(self, '_key'):
            restrict_to = self.once.get('keys', None)
            args = args or {}
            kwargs = kwargs or {}
            call_args = getcallargs(
                    getattr(self, '_orig_run', self.run), *args, **kwargs)
            # Remove the task instance from the kwargs. This only happens when the
            # task has the 'bind' attribute set to True. We remove it, as the task
            # has a memory pointer in its repr, that will change between the task
            # caller and the celery worker
            if isinstance(call_args.get('self'), Task):
                del call_args['self']
            self._key = queue_once_key(self.name, call_args, restrict_to)
        return self._key

Default lock keys use process memory address

I tried using QueueOnce tasks and see that lock keys they generate look like this:
qo_simple_test_task_args-(<@task: simple_test_task of __main__ at 0x10af06d38>,)_kwargs-{}

I see two problems with this key:

  • of __main__
  • at 0x10af06d38

Problem with of __main__
If the entry point for the process that called the task weren't __main__, this part would be different. I'm not sure how I would achieve such an entry point, but I just don't see why the process entry point should be part of the task key - the same task posted from processes with different entry points would be mapped to different lock keys.

Problem with at 0x10af06d38
This is a memory address and will be different for every process. If I have two separate processes posting the same task, they will be mapped to two different keys and both will get queued - not the expected behaviour for a QueueOnce task. I was able to verify this problem simply by running the same task-posting code from two different terminal windows.

Are both conscious choices? They really don't seem very natural defaults to me.

For completeness - here are library versions I used to produce above key:

In [1]: import redis, celery, celery_once

In [2]: redis.__version__
Out[2]: '3.2.0'

In [3]: celery.__version__
Out[3]: '4.2.1'

In [4]: celery_once.__version__
Out[4]: '2.1.0'

Bug: Issue with shared_task

Doesn't work with shared_task.
When used with shared_task, the task doesn't get auto-discovered. Just importing QueueOnce makes all the tasks undiscoverable.

Suggestion: Allow checking if task is pending without enqueuing it

I have a class with a method that asynchronously executes a maintenance operation of sorts. I want certain methods of this class to raise an exception if this maintenance operation is in progress. celery-once looks ideal for my use case, except that I would like to be able to query whether the task is in progress without enqueuing it.

Looking at the code, this should just be a case of pulling out all except the last line of QueueOnce.apply_async into another method, say "already_queued".

Would there be interest in a PR for this?
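Not that I'm aware of. As a stopgap for the Redis backend, here is a sketch that checks for the lock key directly without enqueuing anything; already_queued is an invented helper, and the Redis URL must match your ONCE settings:

import redis


def already_queued(task, *args, **kwargs):
    """Sketch: report whether a QueueOnce lock is currently held for this
    task and these arguments, without touching the queue."""
    client = redis.Redis.from_url('redis://localhost:6379/0')
    key = task.get_key(args=args, kwargs=kwargs)
    return client.exists(key) > 0

Usage (names hypothetical): call already_queued(my_maintenance_task, account_id) before deciding whether the other methods should raise.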
