liuliqiang / redisbeat
celery redis scheduler, dynamic add/modify/delete task from celery.
License: MIT License
Suppose we schedule a task to run every hour, and we create the schedule at 2 PM. The next run would then be Feb. 24, 2020, 3:00 p.m.
Now imagine the scheduler, redisbeat.RedisScheduler, is kept off for 4 hours, so the time is now 7 PM.
As soon as we start redisbeat.RedisScheduler again, I believe it should execute the missed task immediately, exactly once, because the scheduled run time is in the past and several intervals were missed. The next run time should then be set automatically.
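The behavior requested above can be sketched in a few lines. This is only an illustration of the idea, not redisbeat's actual logic: the function name catch_up is hypothetical, and aligning the next run to the original interval boundary (rather than to the restart time) is an assumption.

```python
from datetime import datetime, timedelta


def catch_up(last_run, interval, now):
    """Return (run_now, next_run) for an interval task after scheduler downtime.

    If at least one run was missed, the task is due exactly once, and the
    next run lands on the first interval boundary after `now`.
    """
    due = last_run + interval
    if now < due:
        # Nothing was missed; keep the originally planned run time.
        return False, due
    # Whole intervals that elapsed after the first missed due time.
    missed = (now - due) // interval
    next_run = due + (missed + 1) * interval
    return True, next_run


# Values from the report: created at 2 PM, hourly, scheduler restarted at 7 PM.
created = datetime(2020, 2, 24, 14, 0)
restarted = datetime(2020, 2, 24, 19, 0)
run_now, next_run = catch_up(created, timedelta(hours=1), restarted)
```

With these inputs the task is flagged to run once on restart, and the following run is placed at 8 PM instead of replaying each of the missed hours.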
Support for redis sentinel is absent.
Hi, author.
When I run python add_task.py, I first get an error:
[2017-09-04 17:55:12,343: ERROR/MainProcess] Received unregistered task of type u'tasks.sub3'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[99, 100], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: u'tasks.sub3'
But when I restart both celery worker and celery beat, it works properly. Why is that?
remove_task.py produces the same error.
I look forward to your reply.
PeriodicTask tasks mutate app.conf.beat_schedule
https://github.com/celery/celery/blob/595b9d717bafaf31e99b749e13e765df5e1a50f1/celery/task/base.py#L259-L267
But redisbeat uses the unpacked version of setting:
redisbeat/redisbeat/scheduler.py
Line 68 in 88446a1
In my project we use the periodic_task decorator, so this looks like a problem for us :sad:
I tried to cancel the job by calling "scheduler.remove(app)",
but the task still keeps being received and executed.
File "/data/tasks.py", line 177, in <module>
scheduler = RedisScheduler(app=app)
File "/usr/local/lib/python3.9/site-packages/redisbeat/scheduler.py", line 48, in __init__
Scheduler.__init__(self, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/beat.py", line 257, in __init__
self.setup_schedule()
File "/usr/local/lib/python3.9/site-packages/redisbeat/scheduler.py", line 68, in setup_schedule
self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
File "/usr/local/lib/python3.9/site-packages/redisbeat/scheduler.py", line 75, in merge_inplace
old_entries = self.rdb.zrangebyscore(self.key, 0, MAXINT, withscores=True)
File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 2861, in zrangebyscore
return self.execute_command(*pieces, **options)
File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 898, in execute_command
conn = self.connection or pool.get_connection(command_name, **options)
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 1183, in get_connection
connection.connect()
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 558, in connect
self.on_connect()
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 655, in on_connect
if nativestr(self.read_response()) != 'OK':
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 747, in read_response
raise response
redis.exceptions.ResponseError: SELECT is not allowed in cluster mode
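Redis Cluster only supports database 0 and rejects the SELECT command, and the traceback shows the failure happening while the client handles the connection setup. One thing worth checking is therefore whether the scheduler URL carries a non-zero database index. The helper below is a small sketch for that check; the name redis_db_index and the example URLs are hypothetical, and whether this is the actual cause here is an assumption.

```python
from urllib.parse import urlsplit


def redis_db_index(url):
    """Return the database index encoded in a redis:// URL path (0 if absent)."""
    path = urlsplit(url).path.lstrip("/")
    return int(path) if path else 0


# A URL ending in /1 asks the client to switch to database 1,
# which a cluster-mode server does not allow.
needs_select = redis_db_index("redis://redis:6379/1") != 0
plain_url_ok = redis_db_index("redis://redis:6379") == 0
```

If the index is non-zero, pointing the scheduler at database 0 may avoid the SELECT call; full cluster support would still need a cluster-aware client.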
settings:
CELERYBEAT_SCHEDULE = {  # default beat setting is CELERY_BEAT_SCHEDULE
    'task_check_scheduler': {
        'task': 'tasks.tasks.task_check_scheduler',
        'schedule': timedelta(seconds=60),
        "args": (None, 1, 3),
    },
    'task_check_scheduler_cron': {
        'task': 'tasks.tasks.task_check_scheduler',
        'schedule': crontab(minute='*/1', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'),
        "args": (None, 1, 3),
    }
}
error:
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/python372/lib/python3.7/site-packages/celery/apps/beat.py", line 109, in start_scheduler
service.start()
File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 588, in start
humanize_seconds(self.scheduler.max_interval))
File "/home/python372/lib/python3.7/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 632, in scheduler
return self.get_scheduler()
File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 627, in get_scheduler
lazy=lazy,
File "/home/python372/lib/python3.7/site-packages/redisbeat/scheduler.py", line 48, in __init__
Scheduler.__init__(self, *args, **kwargs)
File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 226, in __init__
self.setup_schedule()
File "/home/python372/lib/python3.7/site-packages/redisbeat/scheduler.py", line 72, in setup_schedule
for entry in tasks))
File "/home/python372/lib/python3.7/site-packages/redisbeat/scheduler.py", line 72, in <genexpr>
for entry in tasks))
AttributeError: 'crontab' object has no attribute 'human_seconds'
The setup_schedule function should be updated so that it also handles crontab schedules, like this:
def setup_schedule(self):
    # init entries
    self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
    tasks = [jsonpickle.decode(entry) for entry in self.rdb.zrange(self.key, 0, -1)]
    # linfo('Current schedule:\n' + '\n'.join(
    #     str('task: ' + entry.task + '; each: ' + entry.schedule.human_seconds)
    #     for entry in tasks))
    for entry in tasks:
        # interval schedules expose human_seconds; crontab schedules do not
        if hasattr(entry.schedule, 'human_seconds'):
            linfo('Current schedule:\n' + str('task: ' + entry.task + '; each: ' + entry.schedule.human_seconds))
        else:
            cron = '{minute} {hour} {day_of_month} {month_of_year} {day_of_week}'.format(
                minute=entry.schedule._orig_minute,
                hour=entry.schedule._orig_hour,
                day_of_month=entry.schedule._orig_day_of_month,
                month_of_year=entry.schedule._orig_month_of_year,
                day_of_week=entry.schedule._orig_day_of_week,
            )
            linfo('Current schedule:\n' + str('task: ' + entry.task + '; cron: ' + cron))
I would like redisbeat to support AMQP,
for example RabbitMQ.
Output from https://github.com/pyupio/safety:
safety check --full-report
+==============================================================================+
| REPORT |
| checked 200 packages, using default DB |
+============================+===========+==========================+==========+
| package | installed | affected | ID |
+============================+===========+==========================+==========+
| jsonpickle | 1.2 | <=1.4.1 | 39319 |
+==============================================================================+
| Jsonpickle through 1.4.1 allows remote code execution during deserialization |
| of a malicious payload through the decode() function. See CVE-2020-22083. |
+==============================================================================+
Can redisbeat work with a newer version of jsonpickle? setup.py requires that exact version:
Line 75 in c1a9a4f
When I install the library via pip install redisbeat, I see errors related to sys.maxint, which was removed in Python 3. It works fine if I pull the code and install from source.
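For anyone stuck on the released package, a compatibility shim along these lines is a common way to cover both interpreters. It is a sketch, not the code in the repository: the name MAXINT matches what the tracebacks in other reports show being passed to zrangebyscore, but how the published release defines it is an assumption.

```python
import sys

# sys.maxint exists only on Python 2; sys.maxsize is available on Python 3
# (and on Python 2.6+), so fall back to it when maxint is missing.
MAXINT = getattr(sys, "maxint", sys.maxsize)
```

On Python 3 this simply resolves to sys.maxsize, which is large enough to serve as an upper score bound.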
Is the PyPI package not being updated? It shows an old release date of March 2017.
https://pypi.org/project/redisbeat/
$ pip install redisbeat
Collecting redisbeat
Using cached redisbeat-0.1.0.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/private/var/folders/qk/1npqk9fd1m146__ylc2c36cc0000gn/T/pip-build-px8vPV/redisbeat/setup.py", line 13, in <module>
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
File "/Users/moonlace/PycharmProjects/wolfe/lib/python2.7/codecs.py", line 884, in open
file = __builtin__.open(filename, mode, buffering)
IOError: [Errno 2] No such file or directory: '/private/var/folders/qk/1npqk9fd1m146__ylc2c36cc0000gn/T/pip-build-px8vPV/redisbeat/README.md'
----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/qk/1npqk9fd1m146__ylc2c36cc0000gn/T/pip-build-px8vPV/redisbeat/
When I run celery beat -A manage.celery_app -S RedisScheduler, it initializes a new RedisScheduler object and follows this call chain: __init__ -> setup_schedule -> merge_inplace.
In merge_inplace() it loads the tasks from the config together with the tasks already stored in Redis.
However, I have a doubt about what happens when you add or remove a job like this:
if __name__ == "__main__":
    schduler = RedisScheduler(app=app)
    schduler.add(**{
        'name': 'sub-perminute',
        'task': 'tasks.sub',
        'schedule': timedelta(seconds=3),
        'args': (1, 1)
    })
I wonder whether this runs the chain __init__ -> setup_schedule -> merge_inplace again, which would mean all the tasks have to be reloaded first?
Dynamically adding periodic tasks works fine.
But when I shut down the scheduler and restart it, the periodic task is overridden.
Is there a way to avoid this?
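One way to think about the question above is as a merge policy between the static config and what is already persisted. The sketch below is not redisbeat's merge_inplace; it uses plain dicts in place of schedule entries, and the function name merge_schedules and the task name tasks.add are hypothetical. It only illustrates a policy in which persisted entries win on a name clash.

```python
from datetime import timedelta


def merge_schedules(configured, stored):
    """Combine static config entries with entries already persisted.

    Entries in `stored` win over same-named entries in `configured`,
    so tasks added or changed at runtime survive a restart.
    """
    merged = dict(configured)
    merged.update(stored)
    return merged


configured = {
    "sub-perminute": {"task": "tasks.sub", "schedule": timedelta(seconds=60)},
}
stored = {
    "sub-perminute": {"task": "tasks.sub", "schedule": timedelta(seconds=3)},
    "added-at-runtime": {"task": "tasks.add", "schedule": timedelta(seconds=10)},
}
merged = merge_schedules(configured, stored)
```

The trade-off is that a change made in the config file no longer takes effect for a name that already exists in the store, so a real implementation would need an explicit way to reset an entry.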
Using version 1.2.5, I get the following error:
File "/home/runner/.cache/pypoetry/virtualenvs/rero-ils-dOBfF0Z2-py3.9/lib/python3.9/site-packages/redisbeat/scheduler.py", line 95
debug("old_entries: %s",old_entries_dict))
I noticed this project's requirements.txt file lists Celery 4, and I'm using Celery 5, which is probably the root of this bug. Unfortunately I cannot downgrade Celery, since I'm using Python 3.11 and Celery 4 depends on an older version of vine that hasn't been updated to deal with the removal of inspect.formatargspec. I'm not exactly sure what is causing the error, but I found that adding the following jsonpickle handler fixed the issue.
# https://stackoverflow.com/a/35634809/6169961
from datetime import datetime

import jsonpickle


class DatePickleISO8601(jsonpickle.handlers.DatetimeHandler):
    def flatten(self, obj, data):
        pickler = self.context
        if not pickler.unpicklable:
            return str(obj)
        cls, args = obj.__reduce__()
        flatten = pickler.flatten
        # store the datetime as an ISO 8601 string instead of the raw payload
        payload = obj.isoformat()
        args = [payload] + [flatten(i, reset=False) for i in args[1:]]
        data["__reduce__"] = (flatten(cls, reset=False), args)
        return data

    def restore(self, data):
        cls, args = data["__reduce__"]
        unpickler = self.context
        restore = unpickler.restore
        cls = restore(cls, reset=False)
        # rebuild the datetime from the ISO 8601 string
        value = datetime.fromisoformat(args[0])
        return value


jsonpickle.handlers.registry.register(datetime, DatePickleISO8601)
Hopefully this saves someone else a few hours. If you are open to PRs, I'd be happy to put one together.
This package is very convenient,
but I couldn't find an example of removing or adjusting a task.
Could you write a remove or adjust example?
Thank you!
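Hello, this package is really convenient,
but I couldn't find any examples of removing or adjusting tasks.
Could you provide some examples of removing or adjusting tasks?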
I wonder if anyone besides me is interested in a couple helper functions to list and iterate scheduled tasks.
Something like this:
diff --git a/redisbeat/scheduler.py b/redisbeat/scheduler.py
index a2e58fc..52f291d 100644
--- a/redisbeat/scheduler.py
+++ b/redisbeat/scheduler.py
@@ -155,3 +155,11 @@ class RedisScheduler(Scheduler):
     def info(self):
         # return infomation about Schedule
         return ' . db -> {self.schedule_url}, key -> {self.key}'.format(self=self)
+
+    def list(self):
+        # return a list with celery.beat.ScheduleEntry objects
+        return [pickle.loads(entry) for entry in self.rdb.zrange(self.key, 0, MAXINT)]
+
+    def itertasks(self):
+        # return a generator of celery.beat.ScheduleEntry objects
+        return (pickle.loads(entry) for entry in self.rdb.zrange(self.key, 0, MAXINT))
@yetship, I can send a PR if you wish
Hi @yetship, I have already installed redis, but when I run celery worker -A tasks -l info on the command line, I always get ImportError: Missing redis library (pip install redis). Could you please help with this problem? I am using Ubuntu 16.04, with Python 2.7 and Python 3.5 in virtualenvwrapper virtual environments.
The "Current schedule" output is always empty in my INFO log.
My versions, from requirements.txt:
celery==4.3.0
redis==3.3.8
jsonpickle==1.0
redisbeat==1.1.4
Does this make sense to you, or am I doing something wrong?
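If all tasks are removed with schduler.remove and a task is then added again with schduler.add, the task does not run. celery beat has to be restarted before execution starts again.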
Hello!
I tried to use a remote Redis running in Docker, but all my attempts were unsuccessful.
Every time I run celerybeatredis, celery beat uses localhost instead of the remote address (redis://redis:6379).
celery beat v4.1.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2018-01-06 18:46:45
Configuration ->
. broker -> redis://redis:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> redisbeat.scheduler.RedisScheduler
. db -> redis://localhost:6379, key -> celery:beat:order_tasks
. logfile -> [stderr]@%WARNING
But the celery worker uses the correct Redis address.
I tried these settings in Docker:
environment:
CELERY_BROKER_URL: 'redis://redis:6379'
CELERY_RESULT_BACKEND: 'redis://redis:6379'
CELERY_TASK_SERIALIZER: 'json'
CELERY_RESULT_SERIALIZER: 'json'
CELERY_RESULT_BACKEND: "redis"
CELERY_REDIS_HOST: "redis"
CELERY_REDIS_PORT: 6379
CELERY_REDIS_DB: 0
CELERY_BEAT_SCHEDULER: 'redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL: 'redis://redis:6379/1'
CELERY_REDIS_SCHEDULER_KEY: 'celery:beat:order_test'
WAIT_HOSTS: redis:6379, elasticsearch:9200
I also tried them in a .py file like this:
CELERY_BEAT_SCHEDULER = 'redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://redis:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_test'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = "redis"
CELERY_REDIS_HOST = "redis"
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 1
CELERY_BROKER_URL = 'redis://redis:6379/0'
BROKER_URL = CELERY_BROKER_URL
CELERY_RESULT_BACKEND = 'redis://redis:6379/1'
app = Celery('test', backend='redis://{}'.format(REDIS),
broker='redis://{}'.format(REDIS))
app.conf.broker_url = 'redis://redis:6379/0'
app.conf.result_backend = 'redis://redis:6379/0'
I also tried putting these settings in a celery config file and passing the --config option to celery beat.
How can I use a remote Redis for celerybeatredis?
Setting CELERY_REDIS_SCHEDULER_URL in the format 'redis://:' + passw + "@" + host + ":" + str(port) + "/1" does not work and results in timeouts. The same URL format is otherwise valid for Celery.
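When a URL is assembled by string concatenation like this, one thing worth ruling out is a password containing characters such as '@', ':' or '/', which can shift where the host part begins. The sketch below percent-encodes the password before building the URL. The helper name build_redis_url, the host, and the password are hypothetical, and it is an assumption that the client unquotes the password when it parses the URL; whether this explains the timeouts reported here is unknown.

```python
from urllib.parse import quote, unquote, urlsplit


def build_redis_url(host, port, password, db=1):
    """Build a redis:// URL with the password percent-encoded."""
    # safe="" also encodes '/', which quote() would otherwise leave as-is.
    return "redis://:{}@{}:{}/{}".format(quote(password, safe=""), host, port, db)


url = build_redis_url("redis.example.internal", 6379, "p@ss:w/rd")
parts = urlsplit(url)
```

Parsing the result back shows the host and port are recovered intact even though the password contains URL delimiters.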
Currently, tasks can only be hard-coded via the @app.task decorator.
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Kolkata'
USE_I18N = True
USE_L10N = True
USE_TZ = True
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_REDBEAT_REDIS_URL = 'redis://localhost:6379/1'
CELERY_ENABLED_UTC = False
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERYBEAT_MAX_LOOP_INTERVAL = 1 # redbeat likes fast loops ## keep it 5
CELERYBEAT_SCHEDULER = 'redbeat.RedBeatScheduler'
CELERY_TASK_SOFT_TIME_LIMIT = 14400
CELERY_TASK_TIME_LIMIT = 14420
interval_c = celery.schedules.schedule(run_every=20) # seconds
entry = RedBeatSchedulerEntry('demo_task','mainapp.tasks.Demo_Task_Run', interval_c, args=["arbaz"],options={'queue':'mainqueue'},app=app)
entry.save()
print(entry.key)
As soon as I run the scheduler, the task is executed only once and does not repeat. This behavior occurs when CELERY_TIMEZONE is set.