
celery-dyrygent's People

Contributors

brabiega, jensenbox, lukaszgomulka, omerfeinberg, sadziu82

celery-dyrygent's Issues

Need some help

In Celery I have to implement a chord-like situation.

I am working on stock market data. Every hour I have to calculate some value for each stock based on the last 100 hours of data.
I have 10,000 stocks.

Currently I am using Python multiprocessing. It works fine.

But I am thinking of using Celery. As you mentioned in your article, Celery is slow in this regard: https://www.ovh.com/blog/doing-big-automation-with-celery/

So I want to use your Celery extension. Will it be as fast as the Python multiprocessing utility?
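For reference, the chord-like pattern described here (compute one value per stock in parallel, then run a single step once every result is in) can be sketched with the standard library alone, roughly what a multiprocessing setup already does. This is a minimal sketch, not a benchmark: `compute_indicator`, `chord_like` and the mean-of-last-100-prices calculation are hypothetical placeholders, and the executor is a parameter only so it can be swapped (for example for a thread pool in tests).

```python
from concurrent.futures import Executor, ProcessPoolExecutor
from statistics import mean
from typing import Callable, Dict, List, Sequence, TypeVar

R = TypeVar("R")


def compute_indicator(prices: Sequence[float]) -> float:
    # Hypothetical per-stock calculation: mean of the last 100 hourly prices.
    return mean(prices[-100:])


def chord_like(
    history: Dict[str, List[float]],
    callback: Callable[[Dict[str, float]], R],
    executor_factory: Callable[[], Executor] = ProcessPoolExecutor,
) -> R:
    symbols = list(history)
    # "Header": one independent job per stock, run in parallel.
    # chunksize batches jobs per worker process (thread pools ignore it).
    with executor_factory() as pool:
        values = pool.map(
            compute_indicator, (history[s] for s in symbols), chunksize=100
        )
        results = dict(zip(symbols, values))
    # "Body": runs exactly once, after every header result is available.
    return callback(results)


if __name__ == "__main__":
    # Hypothetical data: 10,000 stocks with 100 hourly prices each.
    data = {f"S{i}": [float(i + h) for h in range(100)] for i in range(10_000)}
    best = chord_like(data, lambda r: max(r, key=r.get))
    print(best)
```

The `__main__` guard matters with process pools on platforms that spawn rather than fork worker processes.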

on_error callbacks don't fire in task chains

I'm having trouble getting on_error task callbacks to fire from chains. I've created a test project here:
https://github.com/kriberg/dyrygent-test

This defines three tasks:

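import logging
import time

# `app` is assumed to be the project's Celery instance, defined elsewhere.
log = logging.getLogger(__name__)


@app.task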
def normal_task():
    log.info("normal task")
    time.sleep(2)


@app.task(throws=(Exception,))
def failing_task():
    log.info("failing task")
    time.sleep(2)
    raise Exception("failure")


@app.task
def callback(msg, *args, **kwargs):
    log.error(f"error called: {msg} {args} {kwargs}")

These are put into a chain:

from celery import chain

chain1 = chain(normal_task.si(), normal_task.si(), failing_task.si())
chain1.on_error(callback.si("Leaf chain 1 failed"))

Calling this with celery-dyrygent:

from celery_dyrygent.workflows import Workflow

wf = Workflow()
wf.set_retry_policy("random", 1, 3)
wf.add_celery_canvas(chain1)
result = wf.apply_async(options={"link_error": callback.si("wf error")})

This produces the following log:

[2021-11-08 12:55:24,163: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,168: INFO/ForkPoolWorker-8] Scheduling execution of task 15b38035-8e7b-4857-be5c-9c6e44f4f438 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:24,180: INFO/ForkPoolWorker-8] Tick done, took 0.015532ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:24,180: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:24,181: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:24,183: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:24,184: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:26,197: INFO/ForkPoolWorker-1] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0154468250002537s: None
[2021-11-08 12:55:26,201: INFO/ForkPoolWorker-8] Task 15b38035-8e7b-4857-be5c-9c6e44f4f438 is done, success
[2021-11-08 12:55:26,204: INFO/ForkPoolWorker-8] Scheduling execution of task 504684c4-257e-495f-8419-237c7442d954 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:26,206: INFO/ForkPoolWorker-8] Tick done, took 0.005649ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:26,207: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:26,208: INFO/ForkPoolWorker-1] normal task
[2021-11-08 12:55:26,211: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:26,212: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:28,212: INFO/ForkPoolWorker-1] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.004066970999702s: None
[2021-11-08 12:55:28,217: INFO/ForkPoolWorker-8] Task 504684c4-257e-495f-8419-237c7442d954 is done, success
[2021-11-08 12:55:28,220: INFO/ForkPoolWorker-8] Scheduling execution of task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 with options {'link_error': {'task': 'test.celery.callback', 'args': ['wf error'], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': True}}
[2021-11-08 12:55:28,222: INFO/ForkPoolWorker-8] Tick done, took 0.006836ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:28,224: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:28,225: INFO/ForkPoolWorker-1] failing task
[2021-11-08 12:55:28,227: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:28,228: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:30,230: ERROR/ForkPoolWorker-1] error called: wf error () {}
[2021-11-08 12:55:30,230: INFO/ForkPoolWorker-1] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')
[2021-11-08 12:55:30,233: INFO/ForkPoolWorker-8] Tick done, took 0.000878ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:30,237: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:30,238: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:32,425: INFO/ForkPoolWorker-8] Tick done, took 0.001262ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:32,431: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:32,433: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 2s
[2021-11-08 12:55:34,458: INFO/ForkPoolWorker-8] Tick done, took 0.000845ms, workflow running, waiting for 1 tasks
[2021-11-08 12:55:34,462: INFO/MainProcess] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] received
[2021-11-08 12:55:34,463: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] retry: Retry in 1s
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 has final state after 4 checks
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Task 3f02678a-c8b8-4e89-b348-4e4ef2ad2251 is done, failure, result '<class 'Exception'>(failure)'
[2021-11-08 12:55:36,429: INFO/ForkPoolWorker-8] Tick done, took 0.000660ms, workflow finished after 7 ticks
[2021-11-08 12:55:36,430: INFO/ForkPoolWorker-8] Task workflow-processor[1e02286f-b836-4269-bcca-f35fdc5523ea] succeeded in 0.0014809360000072047s: None

Here we see that the callback linked to the overall workflow fires as intended, but the callback set on the chain never fires.

Calling the same chain with Celery's own apply_async:

chain1.on_error(callback.si("master error"))
chain1.apply_async()

Produces this:

[2021-11-08 12:55:46,447: INFO/MainProcess] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] received
[2021-11-08 12:55:46,449: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:48,455: INFO/MainProcess] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] received
[2021-11-08 12:55:48,455: INFO/ForkPoolWorker-8] Task test.celery.normal_task[15b38035-8e7b-4857-be5c-9c6e44f4f438] succeeded in 2.0059917730031884s: None
[2021-11-08 12:55:48,456: INFO/ForkPoolWorker-8] normal task
[2021-11-08 12:55:50,462: INFO/ForkPoolWorker-8] Task test.celery.normal_task[504684c4-257e-495f-8419-237c7442d954] succeeded in 2.0055490619997727s: None
[2021-11-08 12:55:50,463: INFO/MainProcess] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] received
[2021-11-08 12:55:50,465: INFO/ForkPoolWorker-8] failing task
[2021-11-08 12:55:52,470: ERROR/ForkPoolWorker-8] error called: Leaf chain 1 failed () {}
[2021-11-08 12:55:52,471: ERROR/ForkPoolWorker-8] error called: master error () {}
[2021-11-08 12:55:52,471: INFO/ForkPoolWorker-8] Task test.celery.failing_task[3f02678a-c8b8-4e89-b348-4e4ef2ad2251] raised expected: Exception('failure')

Here both callbacks are triggered correctly.
Now, as we know, Celery doesn't handle large, complex canvases well, so using plain Celery isn't a good option.

Is this a limitation with dyrygent or is it a bug?

Cut a 0.9 release

It's been a few years since this was last released to PyPI, and there have been quite a few fixes since. Perhaps we can cut another release to the package index?

relation to celery-director

Hello,

May I ask whether the relevant parts of this repo have been included in celery-director, or what the relation is between these two OVH Celery projects?

Release version 0.7.0

Python 3 support was recently added.
Please prepare a new release, version 0.7.0.

celery-dyrygent appears to support only Python 2.x

This is the stack trace I get when attempting to run a workflow. I don't think more code is needed, but I can include it if you think it's necessary.

celery-worker_1    | [2020-04-12 15:11:57,579: INFO/MainProcess] Received task: workflow-processor[e529f4d8-3a0c-4717-af8f-83fd32aaf1dd]
celery-worker_1    | [2020-04-12 15:11:57,609: ERROR/ForkPoolWorker-2] Task workflow-processor[e529f4d8-3a0c-4717-af8f-83fd32aaf1dd] raised unexpected: AttributeError("'dict' object has no attribute 'iteritems'")
celery-worker_1    | Traceback (most recent call last):
celery-worker_1    |   File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 385, in trace_task
celery-worker_1    |     R = retval = fun(*args, **kwargs)
celery-worker_1    |   File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 650, in __protected_call__
celery-worker_1    |     return self.run(*args, **kwargs)
celery-worker_1    |   File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/tasks/processor.py", line 10, in workflow_processor
celery-worker_1    |     wf = Workflow.from_dict(workflow_dict)
celery-worker_1    |   File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 529, in from_dict
celery-worker_1    |     for node_id, node_dict in workflow_dict['nodes'].iteritems()
celery-worker_1    | AttributeError: 'dict' object has no attribute 'iteritems'
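The failing line calls `dict.iteritems()`, which was removed in Python 3. A minimal sketch of the Python 3 compatible form, with a hypothetical `nodes_from_dict` standing in for the loop in `Workflow.from_dict` (this is not the library's actual code, and `build_node` is an illustrative parameter):

```python
from typing import Any, Callable, Dict, TypeVar

N = TypeVar("N")


def nodes_from_dict(
    workflow_dict: Dict[str, Any], build_node: Callable[[Dict[str, Any]], N]
) -> Dict[str, N]:
    # dict.iteritems() no longer exists in Python 3. dict.items() returns a
    # view in Python 3 (a list in Python 2), so this line runs on both.
    return {
        node_id: build_node(node_dict)
        for node_id, node_dict in workflow_dict["nodes"].items()
    }
```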

dyrygent not passing output of task 1 as input to task 2 in a chain

Given a task called echo like this:

@app.task
def echo(*args, **kwargs):
    print('echo {} {}'.format(args, kwargs))
    return args

And then a workflow like this:

from celery import chain

celery_workflow = chain(echo.s(1), echo.s(2))

The output of the standard Celery workflow is correct and looks like this:

print(celery_workflow.apply_async())
[2022-09-06 03:12:28,534: WARNING/ForkPoolWorker-32] echo (1,) {}
[2022-09-06 03:12:28,550: INFO/MainProcess] Task worker2.echo[a88d5d4a-c531-46c0-9f50-34ef207d54e0] received
[2022-09-06 03:12:28,551: WARNING/ForkPoolWorker-1] echo ([1], 2) {}

But the output of the dyrygent version is wrong:

from celery_dyrygent.workflows import Workflow

workflow = Workflow()
workflow.add_celery_canvas(celery_workflow)
print(workflow.apply_async())
[2022-09-06 03:17:44,812: WARNING/ForkPoolWorker-1] echo (1,) {}
[2022-09-06 03:17:44,813: INFO/MainProcess] Task workflow-processor[c068d1df-f6a7-418f-8dd9-c152e3bbbc18] received
[2022-09-06 03:17:44,817: INFO/ForkPoolWorker-1] Task worker2.echo[664d33a1-c7f8-4b93-aef6-99a43d602576] succeeded in 0.0048081259010359645s: (1,)
[2022-09-06 03:17:44,906: INFO/MainProcess] Events of group {task} enabled by remote.
[2022-09-06 03:17:48,061: INFO/ForkPoolWorker-32] Task 664d33a1-c7f8-4b93-aef6-99a43d602576 is done, success
[2022-09-06 03:17:48,062: INFO/ForkPoolWorker-32] Scheduling execution of task 7913e555-38e4-4052-b1e9-0be756d4f9ba with options {}
[2022-09-06 03:17:48,064: INFO/ForkPoolWorker-32] Tick done, took 0.003532ms, workflow running, waiting for 1 tasks
[2022-09-06 03:17:48,065: INFO/MainProcess] Task worker2.echo[7913e555-38e4-4052-b1e9-0be756d4f9ba] received
[2022-09-06 03:17:48,066: WARNING/ForkPoolWorker-1] echo (2,) {}

Is this a bug or a feature?
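For reference, a simplified in-process model of the behaviour the standard Celery run shows: in a chain, a partial signature created with `.s()` receives the previous task's return value prepended to its own arguments, while an immutable `.si()` signature does not. Results also pass through a serializer on the way (JSON by default since Celery 4), which is why the tuple `(1,)` arrives as the list `[1]`. `run_chain` and the `Step` tuple are hypothetical and are not how Celery or dyrygent implement chains.

```python
import json
from typing import Any, Callable, List, Tuple

# A step is (function, its own args, immutable flag), loosely mirroring
# task.s(*args) (immutable=False) and task.si(*args) (immutable=True).
Step = Tuple[Callable[..., Any], Tuple[Any, ...], bool]


def run_chain(steps: List[Step]) -> Any:
    """Simplified, in-process model of how a Celery chain passes results."""
    value: Any = None
    for index, (func, args, immutable) in enumerate(steps):
        if index > 0 and not immutable:
            # The parent's return value is prepended to the partial's args.
            args = (value,) + args
        # Round-trip through JSON, as Celery's default serializer would,
        # so a returned tuple arrives at the next task as a list.
        value = json.loads(json.dumps(func(*args)))
    return value


def echo(*args: Any) -> Tuple[Any, ...]:
    print("echo {}".format(args))
    return args


if __name__ == "__main__":
    run_chain([(echo, (1,), False), (echo, (2,), False)])  # like echo.s(1) | echo.s(2)
    run_chain([(echo, (1,), False), (echo, (2,), True)])  # like echo.s(1) | echo.si(2)
```

The first call prints `echo (1,)` then `echo ([1], 2)`, matching the plain Celery log; the second prints `echo (1,)` then `echo (2,)`, which is what the dyrygent log shows for the second task.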

`tick` fails on deserialized Workflow

If I start a workflow, serialize it to JSON, and then recreate the original Workflow from that JSON, the tick function raises an exception.

This is the stack trace:

  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 411, in tick
    running = self._tick()
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 447, in _tick
    already_scheduled = self.are_scheduled(sigs_to_run)
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 385, in are_scheduled
    states = self.get_tasks_state([n.id for n in nodes_to_run])
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 271, in get_tasks_state
    for task_id in task_ids
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/workflows/workflow.py", line 271, in <dictcomp>
    for task_id in task_ids
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/celery/inspect.py", line 34, in get_task_state
    return ResultState(entities.AsyncResult(task_id))
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/celery/inspect.py", line 50, in __init__
    self.__inspect()
  File "/usr/local/lib/python3.7/site-packages/celery_dyrygent/celery/inspect.py", line 53, in __inspect
    self.success = self.result.successful()
  File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 319, in successful
    return self.state == states.SUCCESS
  File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 475, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 414, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.7/site-packages/celery/backends/base.py", line 451, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

The root cause is that inspect.get_task_state instantiates AsyncResult without specifying a result backend.

I don't know how that normally works. I will keep looking through my application code to make sure the issue is not on my side.

Celery 5 support

This is a great package and really helped me with automating large Celery workflows. I recently updated my app to Celery 5 and kept using this package without noticing any issues, but while debugging some of the tasks this week I noticed it was only built for Celery 4. I haven't noticed anything obviously incompatible, but I figured I would ask you first in case you had an answer. So, two questions:

  1. Do you plan to upgrade to Celery 5/add support?
  2. If not, is there anything that sticks out within this package that would not be compatible with Celery 5? I have looked through everything and can't seem to see anything that would cause any issues.

I would prefer to keep using this if possible, so I wanted to see whether I should fork it and make changes so I can use it with Celery 5, OR remove the package completely and go another route.

Thank you in advance, and I appreciate the good work!

Extend the capabilities of WorkflowNode

One of the things I wanted to do in my first experiments was to add data to the nodes so that I can serialize it.

I can offer a technical solution if you believe it's of interest.
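The idea of attaching data to nodes so that it survives serialization can be sketched as a node whose `to_dict`/`from_dict` carry an extra `data` field. `NodeWithData`, its fields and its methods are hypothetical and do not reflect dyrygent's actual `WorkflowNode` API; the point is only that the extra field has to be written out when serializing and read back, with a default, when deserializing.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class NodeWithData:
    """Hypothetical node carrying user data; not dyrygent's WorkflowNode."""

    id: str
    dependencies: List[str] = field(default_factory=list)
    # Free-form, JSON-serializable payload attached to the node.
    data: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "dependencies": list(self.dependencies),
            "data": dict(self.data),
        }

    @classmethod
    def from_dict(cls, node_dict: Dict[str, Any]) -> "NodeWithData":
        # .get() with defaults keeps older serialized nodes (without "data") loadable.
        return cls(
            id=node_dict["id"],
            dependencies=list(node_dict.get("dependencies", [])),
            data=dict(node_dict.get("data", {})),
        )


if __name__ == "__main__":
    node = NodeWithData("a", ["b"], {"symbol": "AAA"})
    print(json.dumps(node.to_dict()))
```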
