redisbeat's Introduction

Introduction

redisbeat is a Celery Beat scheduler that stores periodic tasks and their status in a Redis datastore.

With redisbeat, tasks can be added, removed, or modified without restarting Celery.

You can also add scheduled tasks dynamically, at the moment you need them.

Features

  1. Full-featured celery-beat scheduler.
  2. Dynamically add/remove/modify tasks.

Installation

redisbeat can be installed with setuptools or pip.

# pip install redisbeat

Alternatively, install from source by cloning this repository:

# git clone https://github.com/liuliqiang/redisbeat.git
# cd redisbeat
# python setup.py install

Docker-compose demo

redisbeat provides a Docker demo in the example folder:

# cd redisbeat/example
# docker-compose up -d

Once Compose is running, you can see it working with the following commands:

  1. Celery worker logs

    # docker-compose logs worker
    
  2. Celery beat logs

    # docker-compose logs beat
    
  3. Dynamically add the task sub

    # docker exec -it beat python add_task.py
    
  4. Dynamically remove the task sub

    # docker exec -it beat python rem_task.py
    

Running demo locally without Docker

To try the demo locally, install the requirements with pip and run it as a regular Python project. In tasks.py, change the Redis host from 'redis' to 'localhost' in both the Celery instance and the config:

#(...)
app = Celery('tasks', backend='redis://redis:6379',
                broker='redis://redis:6379')

app.conf.update(
    CELERY_REDIS_SCHEDULER_URL = 'redis://redis:6379',
#(...)
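
If you would rather not edit each URL by hand, the host swap can be done programmatically. The following is a minimal sketch using only the standard library; the helper name with_host is hypothetical and is not part of redisbeat or Celery.

```python
from urllib.parse import urlsplit


def with_host(url, host):
    """Return url with its hostname replaced by host.

    Port, path (for example a database index such as /1), and any
    credentials in the URL are preserved.
    """
    parts = urlsplit(url)
    # Keep "user:password@" if the URL carries credentials.
    userinfo, sep, _ = parts.netloc.rpartition("@")
    hostport = host if parts.port is None else "{}:{}".format(host, parts.port)
    return parts._replace(netloc=userinfo + sep + hostport).geturl()


# Hypothetical usage: derive the local URL from the Docker one.
local_url = with_host("redis://redis:6379", "localhost")
```

The resulting value could then be passed as backend, broker, and CELERY_REDIS_SCHEDULER_URL in tasks.py.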

Commands to start worker and beat:

# celery worker -A tasks -l INFO
# celery beat -A tasks -S redisbeat.RedisScheduler -l INFO

Configuration and Usage

Configuration for redisbeat is similar to the standard Celery configuration for beat. You can configure redisbeat as follows:

# encoding: utf-8

from datetime import timedelta
from celery.schedules import crontab
from celery import Celery

app = Celery('tasks', backend='redis://localhost:6379',
             broker='redis://localhost:6379')

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'perminute': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y

To add a new task dynamically, you can use code like the following in __main__:

#!/usr/bin/env python
# encoding: utf-8
from datetime import timedelta
from celery import Celery
from redisbeat.scheduler import RedisScheduler


app = Celery('tasks', backend='redis://localhost:6379',
             broker='redis://localhost:6379')

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'perminute': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

@app.task
def add(x, y):
    return x + y

@app.task
def sub(x, y):
    return x - y

if __name__ == "__main__":
    scheduler = RedisScheduler(app=app)
    scheduler.add(**{
        'name': 'sub-perminute',
        'task': 'tasks.sub',
        'schedule': timedelta(seconds=3),
        'args': (1, 1)
    })

Adding a task takes two steps:

  1. Create a RedisScheduler object from the Celery app
  2. Add new tasks through the RedisScheduler object
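
To make the add/modify/remove life cycle concrete without a running Redis, here is a small in-memory sketch. The class InMemorySchedule is a hypothetical stand-in, not redisbeat code. It assumes entries are identified by their name and that adding an entry under an existing name replaces it; whether RedisScheduler behaves the same way should be checked against the library source.

```python
from datetime import timedelta


class InMemorySchedule:
    """Hypothetical stand-in for a Redis-backed schedule, keyed by entry name."""

    def __init__(self):
        self._entries = {}

    def add(self, name, task, schedule, args=()):
        # Assumption: adding under an existing name replaces the old entry,
        # which is how "modify" is modeled here.
        self._entries[name] = {
            "task": task,
            "schedule": schedule,
            "args": tuple(args),
        }

    def remove(self, name):
        # Returns True if an entry was removed, False if the name was unknown.
        return self._entries.pop(name, None) is not None

    def get(self, name):
        return self._entries.get(name)

    def names(self):
        return sorted(self._entries)


schedule = InMemorySchedule()
# Step 2 from the list above: add a task.
schedule.add(name="sub-perminute", task="tasks.sub",
             schedule=timedelta(seconds=3), args=(1, 1))
# Modify it by adding again under the same name with a new interval.
schedule.add(name="sub-perminute", task="tasks.sub",
             schedule=timedelta(seconds=10), args=(1, 1))
modified_interval = schedule.get("sub-perminute")["schedule"]
# Remove it by name.
removed = schedule.remove("sub-perminute")
```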

Alternatively, you can define the settings in your Celery configuration file, like any other configuration:

from datetime import timedelta

CELERY_BEAT_SCHEDULER = 'redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://localhost:6379/1'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_tasks'
CELERYBEAT_SCHEDULE = {
    'perminute': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (1, 1)
    }
}
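
The sketch below shows one way a scheduler could resolve these settings, falling back to defaults when a key is missing. It is illustrative only: resolve_scheduler_settings is a hypothetical helper, and the fallback values are assumptions taken from the beat startup output quoted in the "Remote Redis" issue further down, not confirmed redisbeat defaults.

```python
# Assumed fallbacks, inferred from the startup log quoted in the
# "Remote Redis" issue; not confirmed defaults of redisbeat.
ASSUMED_DEFAULTS = {
    "CELERY_REDIS_SCHEDULER_URL": "redis://localhost:6379",
    "CELERY_REDIS_SCHEDULER_KEY": "celery:beat:order_tasks",
}


def resolve_scheduler_settings(conf):
    """Merge user configuration over the assumed defaults.

    Only keys listed in ASSUMED_DEFAULTS are considered, and a value of
    None counts as "not set".
    """
    resolved = dict(ASSUMED_DEFAULTS)
    for key in ASSUMED_DEFAULTS:
        value = conf.get(key)
        if value is not None:
            resolved[key] = value
    return resolved


settings = resolve_scheduler_settings(
    {"CELERY_REDIS_SCHEDULER_URL": "redis://localhost:6379/1"}
)
```

Under this model, a setting that never reaches the scheduler silently falls back to localhost, which is worth checking first when beat connects to an unexpected Redis instance.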

Multiple node support

When running redisbeat in a multi-node deployment, it uses a Redis lock to prevent the same task from being executed multiple times.

CELERY_REDIS_MULTI_NODE_MODE = True
CELERY_REDIS_SCHEDULER_LOCK_TTL = 30

This is an experimental feature. To use redisbeat in a production environment, set CELERY_REDIS_MULTI_NODE_MODE = False so that redisbeat does not use it.
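
The general idea behind a lock with a TTL can be sketched in memory as follows. This is not redisbeat's implementation; TTLLock and FakeClock are hypothetical names, and the sketch only assumes that the lock behaves like a "set if absent, with expiry" key, so that a node that crashes while holding the lock cannot block the others forever.

```python
import time


class TTLLock:
    """In-memory imitation of a 'set if absent, with expiry' lock."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._locks = {}  # lock name -> (owner, expires_at)

    def acquire(self, name, owner, ttl):
        now = self._clock()
        held = self._locks.get(name)
        if held is not None and held[1] > now:
            return False  # someone holds an unexpired lock
        self._locks[name] = (owner, now + ttl)
        return True

    def release(self, name, owner):
        held = self._locks.get(name)
        if held is not None and held[0] == owner:
            del self._locks[name]
            return True
        return False  # not held, or held by another owner


class FakeClock:
    """Manually advanced clock so the example does not need to sleep."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
lock = TTLLock(clock=clock)
first = lock.acquire("beat", "node-a", ttl=30)   # node-a becomes the leader
second = lock.acquire("beat", "node-b", ttl=30)  # node-b is refused
clock.now = 31.0                                 # node-a's lock expires
third = lock.acquire("beat", "node-b", ttl=30)   # node-b can take over
```

In this model the ttl argument plays the role of CELERY_REDIS_SCHEDULER_LOCK_TTL: a shorter value means faster failover but a higher chance that a slow node loses its lock mid-run.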

redisbeat's People

Contributors

adrienyhuel, b0g3r, ilyaglow, isaqueprofeta, kai3341, liuliqiang, luke0922, poteri, shiprashalini, svajipay

redisbeat's Issues

Is there a remove or adjust example?

This package is very convenient, but I could not find an example of removing or adjusting a task.
Could you add an example of removing or adjusting a task?
Thank you!

jsonpickle issues parsing datetime

I noticed this project's requirements.txt file lists Celery 4, and I'm using Celery 5, which is probably the root of this bug. Unfortunately I cannot downgrade Celery, since I'm using Python 3.11 and Celery 4 depends on an older version of vine that has not yet been updated to deal with the removal of inspect.formatargspec. I'm not exactly sure what is causing the error, but I found that adding the following jsonpickle handler fixed the issue.

from datetime import datetime

import jsonpickle

# https://stackoverflow.com/a/35634809/6169961
class DatePickleISO8601(jsonpickle.handlers.DatetimeHandler):
    def flatten(self, obj, data):
        pickler = self.context
        if not pickler.unpicklable:
            return str(obj)
        cls, args = obj.__reduce__()
        flatten = pickler.flatten
        payload = obj.isoformat()
        args = [payload] + [flatten(i, reset=False) for i in args[1:]]
        data["__reduce__"] = (flatten(cls, reset=False), args)
        return data

    def restore(self, data):
        cls, args = data["__reduce__"]
        unpickler = self.context
        restore = unpickler.restore
        cls = restore(cls, reset=False)
        value = datetime.fromisoformat(args[0])
        return value


jsonpickle.handlers.registry.register(datetime, DatePickleISO8601)

Hopefully this saves someone else a few hours. If you are open to PRs, I'd be happy to put one together.

[feature] functionality to execute the task as django-celery-beat does:

Suppose we schedule a task to run every hour, and we create the scheduled task at 2 PM.

  • The next run would be Feb. 24, 2020, 3:00 p.m. Now imagine the scheduler, redisbeat.RedisScheduler, is kept off for 4 hours.

  • The time is now 7 PM. As soon as redisbeat.RedisScheduler starts again, I believe it should execute the missed task immediately, but only once, because the scheduled run time is in the past and the intervals were missed. The next run time should then be scheduled automatically.
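
The requested behavior can be sketched with plain datetime arithmetic. This is an illustration of the proposal, not code from redisbeat or django-celery-beat; the function name catch_up is hypothetical, and the sketch assumes fixed-interval schedules only.

```python
from datetime import datetime, timedelta


def catch_up(last_run, interval, now):
    """Decide whether to run once now and when the next run should be.

    Returns (run_now, next_run). If one or more intervals were missed
    while the scheduler was off, the task runs a single time and the
    next run is aligned to the original interval grid.
    """
    due = last_run + interval
    if now < due:
        return False, due
    # timedelta // timedelta gives the number of whole intervals elapsed.
    elapsed_intervals = (now - last_run) // interval
    next_run = last_run + (elapsed_intervals + 1) * interval
    return True, next_run


# Scenario from the issue: created at 2 PM, hourly, scheduler back at 7 PM.
created = datetime(2020, 2, 24, 14, 0)
run_now, next_run = catch_up(created, timedelta(hours=1),
                             now=datetime(2020, 2, 24, 19, 0))
```

Here the task runs once at 7 PM and the next run lands on 8 PM, instead of firing once for every missed hour.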

AttributeError: 'crontab' object has no attribute 'human_seconds'

settings:

CELERYBEAT_SCHEDULE = {     # default beat setting is CELERY_BEAT_SCHEDULE
    'task_check_scheduler': {
        'task': 'tasks.tasks.task_check_scheduler',
        'schedule': timedelta(seconds=60),
        "args": (None, 1, 3),
    },
    'task_check_scheduler_cron': {
        'task': 'tasks.tasks.task_check_scheduler',
        'schedule': crontab(minute='*/1', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'),
        "args": (None, 1, 3),
    }
}

error:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/python372/lib/python3.7/site-packages/celery/apps/beat.py", line 109, in start_scheduler
    service.start()
  File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 588, in start
    humanize_seconds(self.scheduler.max_interval))
  File "/home/python372/lib/python3.7/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 632, in scheduler
    return self.get_scheduler()
  File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 627, in get_scheduler
    lazy=lazy,
  File "/home/python372/lib/python3.7/site-packages/redisbeat/scheduler.py", line 48, in __init__
    Scheduler.__init__(self, *args, **kwargs)
  File "/home/python372/lib/python3.7/site-packages/celery/beat.py", line 226, in __init__
    self.setup_schedule()
  File "/home/python372/lib/python3.7/site-packages/redisbeat/scheduler.py", line 72, in setup_schedule
    for entry in tasks))
  File "/home/python372/lib/python3.7/site-packages/redisbeat/scheduler.py", line 72, in <genexpr>
    for entry in tasks))
AttributeError: 'crontab' object has no attribute 'human_seconds'

The setup_schedule function should be updated like this:

    def setup_schedule(self):
        # init entries
        self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
        tasks = [jsonpickle.decode(entry) for entry in self.rdb.zrange(self.key, 0, -1)]
        # linfo('Current schedule:\n' + '\n'.join(
        #       str('task: ' + entry.task + '; each: ' + entry.schedule.human_seconds)
        #       for entry in tasks))
        for entry in tasks:
            if hasattr(entry.schedule, 'human_seconds'):
                linfo('Current schedule:\n' + str('task: ' + entry.task + '; each: ' + entry.schedule.human_seconds))
            else:
                cron = '{minute} {hour} {day_of_month} {month_of_year} {day_of_week}'.format(
                    minute=entry.schedule._orig_minute,
                    hour=entry.schedule._orig_hour,
                    day_of_month=entry.schedule._orig_day_of_month,
                    month_of_year=entry.schedule._orig_month_of_year,
                    day_of_week=entry.schedule._orig_day_of_week,
                )
                linfo('Current schedule:\n' + str('task: ' + entry.task + '; cron: ' + cron))

Problematic dependency on jsonpickle==1.2

Output from https://github.com/pyupio/safety:

safety check --full-report
+==============================================================================+
|                                                                              |
|                               /$$$$$$            /$$                         |
|                              /$$__  $$          | $$                         |
|           /$$$$$$$  /$$$$$$ | $$  \__//$$$$$$  /$$$$$$   /$$   /$$           |
|          /$$_____/ |____  $$| $$$$   /$$__  $$|_  $$_/  | $$  | $$           |
|         |  $$$$$$   /$$$$$$$| $$_/  | $$$$$$$$  | $$    | $$  | $$           |
|          \____  $$ /$$__  $$| $$    | $$_____/  | $$ /$$| $$  | $$           |
|          /$$$$$$$/|  $$$$$$$| $$    |  $$$$$$$  |  $$$$/|  $$$$$$$           |
|         |_______/  \_______/|__/     \_______/   \___/   \____  $$           |
|                                                          /$$  | $$           |
|                                                         |  $$$$$$/           |
|  by pyup.io                                              \______/            |
|                                                                              |
+==============================================================================+
| REPORT                                                                       |
| checked 200 packages, using default DB                                       |
+============================+===========+==========================+==========+
| package                    | installed | affected                 | ID       |
+============================+===========+==========================+==========+
| jsonpickle                 | 1.2       | <=1.4.1                  | 39319    |
+==============================================================================+
| Jsonpickle through 1.4.1 allows remote code execution during deserialization |
| of a malicious payload through the decode() function. See CVE-2020-22083.    |
+==============================================================================+

Can redisbeat work with a newer version of jsonpickle? setup.py requires that exact version:

'jsonpickle==1.2',
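
For a quick local check of whether a pinned version falls in the affected range reported above (<=1.4.1), a comparison on parsed version tuples is enough. This is a rough sketch with hypothetical helper names; it only handles purely numeric dotted versions, and a real project would more likely rely on a tool such as safety or a proper version-parsing library.

```python
def parse_version(text):
    """Turn a purely numeric dotted version such as '1.4.1' into (1, 4, 1)."""
    return tuple(int(part) for part in text.split("."))


def is_affected(installed, last_affected="1.4.1"):
    """True if installed is at or below the last affected release."""
    return parse_version(installed) <= parse_version(last_affected)


pinned_is_affected = is_affected("1.2")
```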

Remote Redis

Hello!

I tried to use Redis running in a remote Docker container, but all my attempts were unsuccessful.
Every time I run celerybeatredis, celery beat uses localhost instead of the remote address (redis://redis:6379).

celery beat v4.1.0 (latentcall) is starting.
__    -    ... __   -        _
LocalTime -> 2018-01-06 18:46:45
Configuration ->
    . broker -> redis://redis:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> redisbeat.scheduler.RedisScheduler
    . db -> redis://localhost:6379, key -> celery:beat:order_tasks
    . logfile -> [stderr]@%WARNING

However, the celery worker uses the correct Redis address.

I tried this in Docker:

environment:
      CELERY_BROKER_URL: 'redis://redis:6379'
      CELERY_RESULT_BACKEND: 'redis://redis:6379'
      CELERY_TASK_SERIALIZER: 'json'
      CELERY_RESULT_SERIALIZER: 'json'
      CELERY_RESULT_BACKEND: "redis"
      CELERY_REDIS_HOST: "redis"
      CELERY_REDIS_PORT: 6379
      CELERY_REDIS_DB: 0
      CELERY_BEAT_SCHEDULER: 'redisbeat.RedisScheduler'
      CELERY_REDIS_SCHEDULER_URL: 'redis://redis:6379/1'
      CELERY_REDIS_SCHEDULER_KEY: 'celery:beat:order_test'
      WAIT_HOSTS: redis:6379, elasticsearch:9200

I also tried this in a .py file:

CELERY_BEAT_SCHEDULER = 'redisbeat.RedisScheduler'
CELERY_REDIS_SCHEDULER_URL = 'redis://redis:6379'
CELERY_REDIS_SCHEDULER_KEY = 'celery:beat:order_test'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = "redis"
CELERY_REDIS_HOST = "redis"
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 1
CELERY_BROKER_URL = 'redis://redis:6379/0'
BROKER_URL = CELERY_BROKER_URL
CELERY_RESULT_BACKEND = 'redis://redis:6379/1'

app = Celery('test', backend='redis://{}'.format(REDIS),
             broker='redis://{}'.format(REDIS))

app.conf.broker_url = 'redis://redis:6379/0'
app.conf.result_backend = 'redis://redis:6379/0'

I also tried putting these settings in a Celery config file and passing it to celery beat with the --config option.

How can I use a remote Redis with celerybeatredis?

each time init RedisScheduler - repeated call

When I run celery beat -A manage.celery_app -S RedisScheduler, it initializes a new RedisScheduler object and follows this call chain: __init__->setup_schedule->merge_inplace.

merge_inplace() loads the tasks from the configuration together with the tasks already stored in Redis.
I have a doubt about what happens when you add or remove a job like this:

if __name__ == "__main__":
    schduler = RedisScheduler(app=app)
    schduler.add(**{
        'name': 'sub-perminute',
        'task': 'tasks.sub',
        'schedule': timedelta(seconds=3),
        'args': (1, 1)
    })

I wonder whether this calls the chain __init__->setup_schedule->merge_inplace again, which would mean all the tasks have to be reloaded first.

"remove" doesn't seem to work

I tried to cancel the job using "scheduler.remove(app)",
but the task is still received and executed.

How to set up a timezone properly?

LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Kolkata'
USE_I18N = True
USE_L10N = True
USE_TZ = True

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND =  CELERY_BROKER_URL
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_REDBEAT_REDIS_URL = 'redis://localhost:6379/1'
CELERY_ENABLED_UTC = False
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERYBEAT_MAX_LOOP_INTERVAL = 1  # redbeat likes fast loops ## keep it 5
CELERYBEAT_SCHEDULER = 'redbeat.RedBeatScheduler'
CELERY_TASK_SOFT_TIME_LIMIT = 14400
CELERY_TASK_TIME_LIMIT = 14420
interval_c = celery.schedules.schedule(run_every=20)  # seconds
entry = RedBeatSchedulerEntry('demo_task','mainapp.tasks.Demo_Task_Run', interval_c, args=["arbaz"],options={'queue':'mainqueue'},app=app)
entry.save()
print(entry.key)

As soon as I run the scheduler, the task is executed only once and does not repeat. This behavior occurs when CELERY_TIMEZONE is set.

version 1.2.5 is broken

Using the version 1.2.5, I got the following error:

  File "/home/runner/.cache/pypoetry/virtualenvs/rero-ils-dOBfF0Z2-py3.9/lib/python3.9/site-packages/redisbeat/scheduler.py", line 95
    debug("old_entries: %s",old_entries_dict))

List and iterate scheduled tasks

I wonder if anyone besides me is interested in a couple helper functions to list and iterate scheduled tasks.

Something like this:

diff --git a/redisbeat/scheduler.py b/redisbeat/scheduler.py
index a2e58fc..52f291d 100644
--- a/redisbeat/scheduler.py
+++ b/redisbeat/scheduler.py
@@ -155,3 +155,11 @@ class RedisScheduler(Scheduler):
     def info(self):
         # return infomation about Schedule
         return '    . db -> {self.schedule_url}, key -> {self.key}'.format(self=self)
+
+    def list(self):
+        # return a list with celery.beat.ScheduleEntry objects
+        return [pickle.loads(entry) for entry in self.rdb.zrange(self.key, 0, MAXINT)]
+
+    def itertasks(self):
+        # return a generator of celery.beat.ScheduleEntry objects
+        return (pickle.loads(entry) for entry in self.rdb.zrange(self.key, 0, MAXINT))

@yetship, I can send a PR if you wish

Can't see current schedule on INFO

The current schedule is always empty in my INFO log.

My dependencies from requirements.txt:

celery==4.3.0
redis==3.3.8
jsonpickle==1.0
redisbeat==1.1.4

Does this make sense, or am I doing something wrong?

Error Installing redisbeat via pip

$ pip install redisbeat
Collecting redisbeat
  Using cached redisbeat-0.1.0.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/private/var/folders/qk/1npqk9fd1m146__ylc2c36cc0000gn/T/pip-build-px8vPV/redisbeat/setup.py", line 13, in <module>
        with open(path.join(here, 'README.md'), encoding='utf-8') as f:
      File "/Users/moonlace/PycharmProjects/wolfe/lib/python2.7/codecs.py", line 884, in open
        file = __builtin__.open(filename, mode, buffering)
    IOError: [Errno 2] No such file or directory: '/private/var/folders/qk/1npqk9fd1m146__ylc2c36cc0000gn/T/pip-build-px8vPV/redisbeat/README.md'

    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/qk/1npqk9fd1m146__ylc2c36cc0000gn/T/pip-build-px8vPV/redisbeat/

Dynamically adding a task has a problem

Hi, author.
When I execute python add_task.py, I first get this error:
[2017-09-04 17:55:12,343: ERROR/MainProcess] Received unregistered task of type u'tasks.sub3'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[[99, 100], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: u'tasks.sub3'

But when I restart the celery worker and celery beat, it works properly. Why is that?

remove_task.py shows the same problem.
I look forward to your reply.

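Problem adding a task

If all tasks are removed with scheduler.remove and scheduler.add is then called again, the task is not executed. celery beat has to be restarted before the task starts running again.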

Hi, I encountered the following error while using this framework. Could you please advise on how to resolve it? My Redis is in cluster mode.

File "/data/tasks.py", line 177, in <module>
scheduler = RedisScheduler(app=app)
File "/usr/local/lib/python3.9/site-packages/redisbeat/scheduler.py", line 48, in __init__
Scheduler.__init__(self, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/celery/beat.py", line 257, in __init__
self.setup_schedule()
File "/usr/local/lib/python3.9/site-packages/redisbeat/scheduler.py", line 68, in setup_schedule
self.merge_inplace(self.app.conf.CELERYBEAT_SCHEDULE)
File "/usr/local/lib/python3.9/site-packages/redisbeat/scheduler.py", line 75, in merge_inplace
old_entries = self.rdb.zrangebyscore(self.key, 0, MAXINT, withscores=True)
File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 2861, in zrangebyscore
return self.execute_command(*pieces, **options)
File "/usr/local/lib/python3.9/site-packages/redis/client.py", line 898, in execute_command
conn = self.connection or pool.get_connection(command_name, **options)
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 1183, in get_connection
connection.connect()
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 558, in connect
self.on_connect()
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 655, in on_connect
if nativestr(self.read_response()) != 'OK':
File "/usr/local/lib/python3.9/site-packages/redis/connection.py", line 747, in read_response
raise response
redis.exceptions.ResponseError: SELECT is not allowed in cluster mode

ImportError: Missing redis library (pip install redis)

Hi @yetship, I have already installed redis, but when I run celery worker -A tasks -l info on the command line, I always receive ImportError: Missing redis library (pip install redis). Could you please help with this problem? I am using Ubuntu 16.04 with Python 2.7 and Python 3.5 in a virtualenvwrapper virtual environment.
