ovh / celery-dyrygent
Celery extension which allows orchestrating 100/1000/10000 tasks combined into a complex workflow
License: Other
In Celery I have to implement a chord-like situation.
I am working on stock market data. Every hour I have to calculate some value for each stock based on the last 100 hours of data.
I have 10,000 stocks.
Currently I am using Python multiprocessing. It works fine.
But I am considering Celery. As you mentioned in your article, Celery is slow in this regard: https://www.ovh.com/blog/doing-big-automation-with-celery/
So I want to use your Celery extension. Will it have the same speed as the Python multiprocessing utility?
I'm having some issues with triggering the on_error task callbacks from chains. I've created a test project here:
https://github.com/kriberg/dyrygent-test
This defines three tasks:

import logging
import time

from celery import Celery

app = Celery("test")
log = logging.getLogger(__name__)

@app.task
def normal_task():
    log.info("normal task")
    time.sleep(2)

@app.task(throws=(Exception,))
def failing_task():
    log.info("failing task")
    time.sleep(2)
    raise Exception("failure")

@app.task
def callback(msg, *args, **kwargs):
    log.error(f"error called: {msg} {args} {kwargs}")
These are put into a chain:
chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
chain1.on_error(callback.si("Leaf chain 1 failed"))
Calling this with celery-dyrygent:
wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
result = wf.apply_async(options={"link_error": callback.si("wf error")})
This produces the following log:
[2021-11-08 12:55:24,163: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,168: INFO/ForkPoolWorker-8] Scheduling execution of task 15b38035-8e7b-4857-be5c-9c6e44f4f438 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:24,180: INFO/ForkPoolWorker-8] Tick done, took 0.015532ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:24,180: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:24,181: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:24,183: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,184: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:26,197: INFO/ForkPoolWorker-1] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0154468250002537s: None
[2021-11-08 12:55:26,201: INFO/ForkPoolWorker-8] Task 15b38035-8e7b-4857-be5c-9c6e44f4f438 is done, success
[2021-11-08 12:55:26,204: INFO/ForkPoolWorker-8] Scheduling execution of task 504684c4-257e-495f-8419-237c7442d954 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:26,206: INFO/ForkPoolWorker-8] Tick done, took 0.005649ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:26,207: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:26,208: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:26,211: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:26,212: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:28,212: INFO/ForkPoolWorker-1] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.004066970999702s: None
[2021-11-08 12:55:28,217: INFO/ForkPoolWorker-8] Task 504684c4-257e-495f-8419-237c7442d954 is done, success
[2021-11-08 12:55:28,220: INFO/ForkPoolWorker-8] Scheduling execution of task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:28,222: INFO/ForkPoolWorker-8] Tick done, took 0.006836ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:28,224: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:28,225: INFO/ForkPoolWorker-1] failing task
[2021-11-08 12:55:28,227: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:28,228: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:30,230: ERROR/ForkPoolWorker-1] error called: wf error () {}
[2021-11-08 12:55:30,230: INFO/ForkPoolWorker-1] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
[2021-11-08 12:55:30,233: INFO/ForkPoolWorker-8] Tick done, took 0.000878ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:30,237: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:30,238: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:32,425: INFO/ForkPoolWorker-8] Tick done, took 0.001262ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:32,431: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:32,433: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:34,458: INFO/ForkPoolWorker-8] Tick done, took 0.000845ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:34,462: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:34,463: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 has final state after 4 checks
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 is done, failure, result '<class 'Exception'>(failure)'
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Tick done, took 0.000660ms, workflow finished after 7 ticks
[2021-11-08 12:55:36,430: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] succeeded in 0.0014809360000072047s: None
Here we see that the callback linked to the overall workflow triggers as intended, but the callback set on the chain never fires.
Calling the same chain with celery apply_async:
chain1.on_error(callback.si("master error"))
chain1.apply_async()
Produces this:
[2021-11-08 12:55:46,447: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:46,449: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:48,455: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:48,455: INFO/ForkPoolWorker-8] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0059917730031884s: None
[2021-11-08 12:55:48,456: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:50,462: INFO/ForkPoolWorker-8] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.0055490619997727s: None
[2021-11-08 12:55:50,463: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:50,465: INFO/ForkPoolWorker-8] failing task
[2021-11-08 12:55:52,470: ERROR/ForkPoolWorker-8] error called: Leaf chain 1 failed () {}
[2021-11-08 12:55:52,471: ERROR/ForkPoolWorker-8] error called: master error () {}
[2021-11-08 12:55:52,471: INFO/ForkPoolWorker-8] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
Here both callbacks are triggered correctly.
Now, as we know, Celery doesn't handle a large, complex canvas well, so using plain Celery isn't a good option.
Is this a limitation of dyrygent, or is it a bug?
It's been a few years since this was released to PyPI and there have been quite a few fixes since. Perhaps we can cut another release of this to the package index?
Hello,
May I ask whether the relevant parts of this repo have been included in celery-director, or what is the relation between those two OVH Celery projects?
Python 3 support was recently added.
Please prepare a new release with version 0.7.0.
Hello, I need to use this library for a project, so I've created a PyPI release: https://pypi.org/project/celery-dyrygent/
It would be appropriate to transfer ownership to you. Do you have a PyPI user that I can transfer ownership to?
This is the stack trace I get when attempting to run a workflow. I do not believe it's necessary to include more code, but I can if you think it is.
celery-worker_1 | [2020-04-12 15:11:57,579: INFO/MainProcess] Received task: workflow-processor[e529f4d8-3a0c-4717-af8f-83fd32aaf1dd]
celery-worker_1 | [2020-04-12 15:11:57,609: ERROR/ForkPoolWorker-2] Task workflow-processor[e529f4d8-3a0c-4717-af8f-83fd32aaf1dd] raised unexpected: AttributeError("'dict' object has no attribute 'iteritems'")
celery-worker_1 | Traceback (most recent call last):
celery-worker_1 | File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 385, in trace_task
celery-worker_1 | R = retval = fun(*args, **kwargs)
celery-worker_1 | File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 650, in __protected_call__
celery-worker_1 | return self.run(*args, **kwargs)
celery-worker_1 | File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/tasks/processor.py", line 10, in workflow_processor
celery-worker_1 | wf = Workflow.from_dict(workflow_dict)
celery-worker_1 | File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 529, in from_dict
celery-worker_1 | for node_id, node_dict in workflow_dict['nodes'].iteritems()
celery-worker_1 | AttributeError: 'dict' object has no attribute 'iteritems'
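The traceback points at a Python 2 leftover: dict.iteritems() no longer exists in Python 3. A sketch of the fix, with an invented dict shape for illustration:

```python
# Python 2's dict.iteritems() was removed in Python 3; dict.items() is the
# portable replacement (a view in Python 3, a list in Python 2).
workflow_dict = {"nodes": {"a": {"id": "a"}, "b": {"id": "b"}}}  # invented shape

nodes = {
    node_id: node_dict
    for node_id, node_dict in workflow_dict["nodes"].items()
}
print(sorted(nodes))  # prints ['a', 'b']
```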
Given a task called echo, like this:

@app.task
def echo(*args, **kwargs):
    print('echo {} {}'.format(args, kwargs))
    return args
And then a workflow like this:
celery_workflow = chain(echo.s(1), echo.s(2))
The output of the standard Celery workflow is correct and looks like this:
print(celery_workflow.apply_async())
[2022-09-06 03:12:28,534: WARNING/ForkPoolWorker-32] echo (1,) {}
[2022-09-06 03:12:28,550: INFO/MainProcess] Task worker2.echo[a88d5d4a-c531-46c0-9f50-34ef207d54e0] received
[2022-09-06 03:12:28,551: WARNING/ForkPoolWorker-1] echo ([1], 2) {}
But the output of the dyrygent version is wrong:

workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)
print(workflow.apply_async())
[2022-09-06 03:17:44,812: WARNING/ForkPoolWorker-1] echo (1,) {}
[2022-09-06 03:17:44,813: INFO/MainProcess] Task workflow-processor[c068d1df-f6a7-418f-8dd9-c152e3bbbc18] received
[2022-09-06 03:17:44,817: INFO/ForkPoolWorker-1] Task worker2.echo[664d33a1-c7f8-4b93-aef6-99a43d602576] succeeded in 0.0048081259010359645s: (1,)
[2022-09-06 03:17:44,906: INFO/MainProcess] Events of group {task} enabled by remote.
[2022-09-06 03:17:48,061: INFO/ForkPoolWorker-32] Task 664d33a1-c7f8-4b93-aef6-99a43d602576 is done, success
[2022-09-06 03:17:48,062: INFO/ForkPoolWorker-32] Scheduling execution of task 7913e555-38e4-4052-b1e9-0be756d4f9ba with options {}
[2022-09-06 03:17:48,064: INFO/ForkPoolWorker-32] Tick done, took 0.003532ms, workflow running, waiting for 1 tasks
[2022-09-06 03:17:48,065: INFO/MainProcess] Task worker2.echo[7913e555-38e4-4052-b1e9-0be756d4f9ba] received
[2022-09-06 03:17:48,066: WARNING/ForkPoolWorker-1] echo (2,) {}
Is this a bug or a feature?
If I start a workflow, serialize it to JSON, and then recreate my original Workflow, the tick function will raise an exception.
This is the stack trace:
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 411, in tick
running = self._tick()
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 447, in _tick
already_scheduled = self.are_scheduled(sigs_to_run)
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 385, in are_scheduled
states = self.get_tasks_state([n.id for n in nodes_to_run])
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 271, in get_tasks_state
for task_id in task_ids
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 271, in <dictcomp>
for task_id in task_ids
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/celery/inspect.py", line 34, in get_task_state
return ResultState(entities.AsyncResult(task_id))
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/celery/inspect.py", line 50, in __init__
self.__inspect()
File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/celery/inspect.py", line 53, in __inspect
self.success = self.result.successful()
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 319, in successful
return self.state == states.SUCCESS
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 475, in state
return self._get_task_meta()['status']
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 414, in _get_task_meta
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/usr/local/lib/python3.7/site-packages/celery/backends/base.py", line 451, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
The root cause is that inspect.get_task_state tries to instantiate AsyncResult without specifying a result backend.
I do not know how that works normally. I will keep looking in my application code to make sure the issue is not on my side.
This is a great package and it really helped me with automating large Celery workflows. I recently updated my app to use Celery 5 and continued to use this package without noticing any issues, but I started to debug some of the tasks this week and noticed this was only built for Celery 4. I haven't noticed anything obvious in terms of incompatibility, but figured I would ask you first in case you had an answer. So, two questions:
I would prefer to keep using this if possible, so I wanted to see whether I should fork and make any changes needed to use it with Celery 5, OR remove the package completely and try to go another route.
Thank you in advance and appreciate the good work!
One of the things I wanted to do in my first experiments was to add data to the nodes, so that I can serialize it.
I can offer a technical solution if you believe it's of interest.