tuomur / celery_sqlalchemy_scheduler Goto Github PK
View Code? Open in Web Editor NEWCelery scheduler that uses SQLAlchemy for storage.
License: MIT License
Celery scheduler that uses SQLAlchemy for storage.
License: MIT License
After searching alot, i finally found this. But i dont know how to run this.
See, i added the schedule and schedule model in my flask project and set the CELERYBEAT_SCHEDULER = 'scheduler.sqlalchemy_scheduler:DatabaseScheduler'
to my project and your file name. The app is running and so is the beat. now how to add periodic task dynamically ? any code snippet ?
Tables are created in db too
Celery beat working with the sqlalchemy
Configuration ->
. broker -> redis://localhost:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> app.sqlalchemy_scheduler.DatabaseScheduler
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)
[2015-11-13 16:40:37,838: INFO/MainProcess] beat: Starting...
First of all thanks for the good job, it really helped me a lot.
I used it in a flask app and adding, updating scheduled tasks runs very well.
However when I tried to delete a DatabaseSchedulerEntry
dbse = dbsession.query(DatabaseSchedulerEntry).filter_by(name='myTask').first()
if dbse:
dbsession.delete(dbse)
dbsession.commit()
Everything seems ok however when the execution time comes beat gives an error and quits.
[2016-01-25 21:18:03,354: INFO/MainProcess] Scheduler: Sending due task myTask (add)
[2016-01-25 21:18:03,361: ERROR/MainProcess] Message Error: UPDATE statement on table 'celery_schedules' expected to update 1 row(s); 0 were matched.test_app
[' File "vir_python/bin/celery", line 11, in <module>\n sys.exit(main())\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/__main__.py", line 30, in main\n main()\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main\n cmd.execute_from_commandline(argv)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/celery.py", line 769, in execute_from_commandline\n super(CeleryCommand, self).execute_from_commandline(argv)))\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/base.py", line 311, in execute_from_commandline\n return self.handle_argv(self.prog_name, argv[1:])\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/celery.py", line 761, in handle_argv\n return self.execute(command, argv)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/celery.py", line 693, in execute\n ).run_from_argv(self.prog_name, argv[1:], command=argv[0])\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/base.py", line 315, in run_from_argv\n sys.argv if argv is None else argv, command)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/base.py", line 377, in handle_argv\n return self(*args, **options)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/base.py", line 274, in __call__\n ret = self.run(*args, **kwargs)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/bin/beat.py", line 79, in run\n return beat().run()\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/apps/beat.py", line 83, in run\n self.start_scheduler()\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/apps/beat.py", line 112, in start_scheduler\n beat.start()\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 463, in start\n interval = self.scheduler.tick()\n', ' File "/Users/savassen/DEV/test_app/test_app_AdminPanel/sqlalchemy_scheduler.py", line 150, in tick\n Scheduler.tick(self)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 221, in tick\n next_time_to_run = self.maybe_due(entry, self.publisher)\n', ' File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 207, in maybe_due\n exc, traceback.format_stack(), exc_info=True)\n']
Traceback (most recent call last):
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 204, in maybe_due
result = self.apply_async(entry, publisher=publisher)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 245, in apply_async
entry = self.reserve(entry)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 238, in reserve
new_entry = self.schedule[entry.name] = next(entry)
File "/Users/savassen/DEV/test_app/test_app_AdminPanel/sqlalchemy_scheduler.py", line 64, in __next__
dbsession.commit()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 801, in commit
self.transaction.commit()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 392, in commit
self._prepare_impl()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
self.session.flush()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2015, in flush
self._flush(objects)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2133, in _flush
transaction.rollback(_capture_exception=True)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2097, in _flush
flush_context.execute()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
rec.execute(self)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
uow
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 170, in save_obj
mapper, table, update)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 716, in _emit_update_statements
(table.description, len(records), rows))
StaleDataError: UPDATE statement on table 'celery_schedules' expected to update 1 row(s); 0 were matched.
[2016-01-25 21:18:03,367: CRITICAL/MainProcess] beat raised exception <class 'sqlalchemy.exc.InvalidRequestError'>: InvalidRequestError("This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: UPDATE statement on table 'celery_schedules' expected to update 1 row(s); 0 were matched.",)
Traceback (most recent call last):
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/apps/beat.py", line 112, in start_scheduler
beat.start()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 472, in start
self.sync()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 475, in sync
self.scheduler.close()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/celery/beat.py", line 285, in close
self.sync()
File "/Users/savassen/DEV/test_app/test_app_AdminPanel/sqlalchemy_scheduler.py", line 165, in sync
self._schedule = self._all_as_schedule()
File "/Users/savassen/DEV/test_app/test_app_AdminPanel/sqlalchemy_scheduler.py", line 129, in _all_as_schedule
for row in query:
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2571, in __iter__
return self._execute_and_instances(context)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2584, in _execute_and_instances
close_with_result=True)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2575, in _connection_from_session
**kw)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 893, in connection
execution_options=execution_options)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
engine, execution_options)
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
self._assert_active()
File "/Users/savassen/DEV/test_app/vir_python/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
% self._rollback_exception
InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: UPDATE statement on table 'celery_schedules' expected to update 1 row(s); 0 were matched.
Can you help me with this issue?
It works fine as like djceley's DatabaseScheule did. But the only problem is that it doesn't seem to send tasks that were added in runtime, then i restart the celery-beat, the tasks that were added before will be sent successfully.
So, is it possible to dynamically add/remove tasks without restarting celery-beat?
Thanks.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.