Giter VIP home page Giter VIP logo

pykka's Introduction

๐ŸŒ€ Pykka

Pykka makes it easier to build concurrent applications.

CI Docs Coverage PyPI


Pykka is a Python implementation of the actor model. The actor model introduces some simple rules to control the sharing of state and cooperation between execution units, which makes it easier to build concurrent applications.

For a quickstart guide and a complete API reference, see the documentation.

Installation

Pykka requires Python 3.8 or newer.

Pykka is available from PyPI:

python3 -m pip install pykka

Project resources

License

Pykka is copyright 2010-2024 Stein Magnus Jodal and contributors. Pykka is licensed under the Apache License, Version 2.0.

pykka's People

Contributors

benjixx avatar chris-martin avatar darshandzend avatar dependabot[bot] avatar fossilet avatar jodal avatar jrcamp avatar jstasiak avatar lime-green avatar robinsonsk avatar tamland avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pykka's Issues

Future.map can apply function more than once

Let r be a result of Future.map call. Its type is Future. When I call r.get() multiple times, it will recompute its value that many times. This behavior differs from ordinary "non-map" Futures which cache computed value.

Failing unit-test for illustration:

def test_map_computes_each_value_only_once(self):
    def f(x):
        f.calls_count += 1
    f.calls_count = 0

    future = self.results[0].map(f)
    self.results[0].set(30)
    future.get(timeout=0)
    future.get(timeout=0)

    self.assertEqual(f.calls_count, 1)

To recap, two problems here:

  1. Mapped function call can be expensive, for example, it can start new actors.
  2. Future.get() semantics differs for Futures obtained with map and ask.

Same issue applies to filter/reduce.

Sender attribute?

Is there an equivalent of a sender attribute on the actor?
This is to allow me to send messages to the "sender" --- and is drastically different from the "ask" semantics.

Consider, for example, a child actor keeping track of "failures" and sending a message to parent saying "Failure Limit Reached".

In this case the semantics of the parent would be something like this:

  1. Spawn Child actor.
  2. Listen to "Failure Limit Reached" messages and restart or replace child actor with a new one.

This cannot be accomplished with an "ask" --- since once the child is spawned, the parent cares not for it and carries on with its duties.

P.S.: As a side note, documentation seems to favor the "ask" way of doing things (see "replying to messages" in pykka docs), which is not how the general actor system usually functions. The preferred way is to use "tell" and only resort to "ask" in select cases. Akka also prefers tell:
http://doc.akka.io/docs/akka/snapshot/scala/actors.html

Dead letters equivalent?

Is there an equivalent of dead letters in pykka?
I.e. ability to see all the message (at least in a log) that were not delivered anywhere?

Currently, I implement this as follows in my actor:

if message.get('command') == 'poll':
        # logic
else:
        logging.log(logging.DEBUG, 'Dead letters: ' + str(message))

proxy() does not play well with mock objects

Execution completely freezes up when trying to get a proxy of an actor that's got a bit of constructor injection going on.

I'm specifically using Voidspace mock's MagicMock object:

# ...
mock_credit_card_processor = MagicMock()
foobar = Foobar.start(mock_credit_card_processor)
foobar.proxy()       # blocked indefinitely here!
# ...

... where the code to Foobar looks something like this:

class FooBar(pykka.ThreadingActor):
    def __init__(self, credit_card_processor):
        super(FooBar, self).__init__()
        self.credit_card_processor = credit_card_processor

I suspect because mock objects don't behave as pykka expects members to behave when doing introspection.

Parts of traceback is lost when reraising exceptions

This issue was reported by @adamcik at IRC after review of the Pykka source code.

At https://github.com/jodal/pykka/blob/master/pykka/gevent.py#L37 a gevent.Timeout exceptions is reraised wrapped in a pykka.Timeout exception. This reraise does not keep the traceback of the original exception.

At https://github.com/jodal/pykka/blob/master/pykka/future.py#L93 a exception thrown in a ThreadingActor is reraised in the thread of a consumer of a ThreadingFuture. This reraise does not keep the traceback of the original exception.

The exceptions should be reraised using the three-argument raise syntax:

raise exc_class, exc_instance, traceback

E.g.:

raise _Timeout, _Timeout(e), sys.exc_info()[2]

In cases where the exception is thrown in one thread and reraised in another thread, the first thread must make sure to pass on both the exception object and the traceback, found at sys.exc_info()[2].

How do pykka actors send a message to themselves?

I'm writing a simple Producer actor that's supposed to spit out a string every second. At creation time, it doesn't do anything.
When I send it a 'start' message it should start producing the string I nominate to the target I nominate.
When I send it a 'stop' message it should stop producing.

If we were doing this like Erlang, there'd be a sort of tail-recursive call that serves as the continuation of the process. But that's all a bit deep for Python. All we have is the on_receive function, which may or may not block.

My first stab looks something like this:

class StringProducer(GeventActor):
    def __init__(self, target, string='UNDEFINED STRING'):
        super(StringProducer, self).__init__()
        self.string = string
        self.target = target

    def on_receive(self):
        while True:
            self.target.tell({'produced':self.string})
            gevent.sleep(1)

I don't even bother looking at the command type (start or stop) because I feel stuck already. Is this the way to do it?
I would start a producer and give it a poke as to begin its production loop:

p1 = StringProducer.start(buffer, "apple")
p1.tell({'command':'start'})
gevent.sleep(5)
p1.tell({'command':'stop'})

How would you guys do it?

Pykka fails on 2.6.4

.................EEEEE............EEEE..........EEEEEEEEEE........................................
======================================================================
ERROR: test_actor_field_can_be_read_using_get_postfix (tests.field_access_test.ThreadingFieldAccessTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/field_access_test.py", line 25, in setUp
    self.proxy = self.ActorWithFields.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithFields (urn:uuid:69f97bea-e2cf-4db3-ba07-d99849a858b4)
pykka: DEBUG: Registered ActorWithFields (urn:uuid:69f97bea-e2cf-4db3-ba07-d99849a858b4)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_actor_field_can_be_set_using_assignment (tests.field_access_test.ThreadingFieldAccessTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/field_access_test.py", line 25, in setUp
    self.proxy = self.ActorWithFields.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithFields (urn:uuid:28a9ef5d-295b-486f-89bd-07410dbc4742)
pykka: DEBUG: Registered ActorWithFields (urn:uuid:28a9ef5d-295b-486f-89bd-07410dbc4742)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_actor_get_attributes_contains_traversable_attributes (tests.field_access_test.ThreadingFieldAccessTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/field_access_test.py", line 25, in setUp
    self.proxy = self.ActorWithFields.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithFields (urn:uuid:f77d47b0-4f25-4fd8-9914-3a6071e41c50)
pykka: DEBUG: Registered ActorWithFields (urn:uuid:f77d47b0-4f25-4fd8-9914-3a6071e41c50)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_attr_of_traversable_attr_can_be_read (tests.field_access_test.ThreadingFieldAccessTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/field_access_test.py", line 25, in setUp
    self.proxy = self.ActorWithFields.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithFields (urn:uuid:93a7e81e-bc09-44c1-bf13-e5c4ff1c8a63)
pykka: DEBUG: Registered ActorWithFields (urn:uuid:93a7e81e-bc09-44c1-bf13-e5c4ff1c8a63)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_private_field_access_raises_exception (tests.field_access_test.ThreadingFieldAccessTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/field_access_test.py", line 25, in setUp
    self.proxy = self.ActorWithFields.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithFields (urn:uuid:df13169e-da21-4ccf-a9a6-1178032983a9)
pykka: DEBUG: Registered ActorWithFields (urn:uuid:df13169e-da21-4ccf-a9a6-1178032983a9)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_calling_unknown_method_raises_attribute_error (tests.method_call_test.ThreadingMethodCallTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/method_call_test.py", line 22, in setUp
    self.proxy = self.ActorWithMethods.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithMethods (urn:uuid:81f0b360-8b14-4ce5-9332-a35403af4bf5)
pykka: DEBUG: Registered ActorWithMethods (urn:uuid:81f0b360-8b14-4ce5-9332-a35403af4bf5)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_can_call_method_that_was_added_at_runtime (tests.method_call_test.ThreadingMethodCallTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/method_call_test.py", line 22, in setUp
    self.proxy = self.ActorWithMethods.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithMethods (urn:uuid:d4559500-067a-4b2d-b201-3f24284898fe)
pykka: DEBUG: Registered ActorWithMethods (urn:uuid:d4559500-067a-4b2d-b201-3f24284898fe)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_functional_method_call_returns_correct_value (tests.method_call_test.ThreadingMethodCallTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/method_call_test.py", line 22, in setUp
    self.proxy = self.ActorWithMethods.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithMethods (urn:uuid:49cd2895-6b89-4789-a383-f7a59f39d18b)
pykka: DEBUG: Registered ActorWithMethods (urn:uuid:49cd2895-6b89-4789-a383-f7a59f39d18b)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_side_effect_of_method_is_observable (tests.method_call_test.ThreadingMethodCallTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/method_call_test.py", line 22, in setUp
    self.proxy = self.ActorWithMethods.start().proxy()
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 372, in proxy
    return ActorProxy(self)
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started ActorWithMethods (urn:uuid:2f17931e-5581-45e2-9cdc-18e7791006ba)
pykka: DEBUG: Registered ActorWithMethods (urn:uuid:2f17931e-5581-45e2-9cdc-18e7791006ba)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_dir_on_proxy_lists_attributes_of_the_actor (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:a236a814-2e57-44e1-8087-a19fda552084)
pykka: DEBUG: Registered AnActor (urn:uuid:a236a814-2e57-44e1-8087-a19fda552084)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_dir_on_proxy_lists_private_attributes_of_the_proxy (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:8bea43a8-6693-463f-8574-b408abc30be4)
pykka: DEBUG: Registered AnActor (urn:uuid:8bea43a8-6693-463f-8574-b408abc30be4)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_refs_proxy_method_returns_a_proxy (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:79f3b0bf-d3c3-4fb9-9b07-25affae1f9d4)
pykka: DEBUG: Registered AnActor (urn:uuid:79f3b0bf-d3c3-4fb9-9b07-25affae1f9d4)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_repr_contains_actor_class_name (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:e803ccdc-0304-4e44-92f4-e45e35959641)
pykka: DEBUG: Registered AnActor (urn:uuid:e803ccdc-0304-4e44-92f4-e45e35959641)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_repr_contains_actor_urn (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:89240e62-6a67-4fc7-9b24-274b902b1e0d)
pykka: DEBUG: Registered AnActor (urn:uuid:89240e62-6a67-4fc7-9b24-274b902b1e0d)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_repr_contains_attr_path (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:87c24bcc-0b65-4e05-bf96-58c47d9a1564)
pykka: DEBUG: Registered AnActor (urn:uuid:87c24bcc-0b65-4e05-bf96-58c47d9a1564)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_repr_is_wrapped_in_lt_and_gt (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:81e8d81b-a7b9-471e-b71b-19e50c8a4572)
pykka: DEBUG: Registered AnActor (urn:uuid:81e8d81b-a7b9-471e-b71b-19e50c8a4572)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_repr_reveals_that_this_is_a_proxy (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:e1d5e262-ba97-4e55-b77c-43a6e89be475)
pykka: DEBUG: Registered AnActor (urn:uuid:e1d5e262-ba97-4e55-b77c-43a6e89be475)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_str_contains_actor_class_name (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:6277eb8c-c7b5-4e93-8d2a-cac28a336a64)
pykka: DEBUG: Registered AnActor (urn:uuid:6277eb8c-c7b5-4e93-8d2a-cac28a336a64)
--------------------- >> end captured logging << ---------------------

======================================================================
ERROR: test_str_contains_actor_urn (tests.proxy_test.ThreadingProxyTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/adamcik/dev/pykka/tests/proxy_test.py", line 27, in setUp
    self.proxy = ActorProxy(self.AnActor.start())
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 50, in __init__
    self._actor_attributes = self._get_attributes()
  File "/home/adamcik/dev/pykka/pykka/proxy.py", line 54, in _get_attributes
    {'command': 'pykka_get_attributes'})
  File "/home/adamcik/dev/pykka/pykka/actor.py", line 347, in send_request_reply
    return future.get(timeout=timeout)
  File "/home/adamcik/dev/pykka/pykka/future.py", line 97, in get
    return self.get()
  File "/home/adamcik/dev/pykka/pykka/future.py", line 91, in get
    raise self._value
AttributeError: class Queue has no attribute '__mro__'
-------------------- >> begin captured logging << --------------------
pykka: DEBUG: Started AnActor (urn:uuid:265f6a07-617f-45a9-bcfe-602a496fed25)
pykka: DEBUG: Registered AnActor (urn:uuid:265f6a07-617f-45a9-bcfe-602a496fed25)
--------------------- >> end captured logging << ---------------------

Name             Stmts   Miss  Cover   Missing
----------------------------------------------
pykka                3      0   100%   
pykka.actor        124      3    98%   9-11, 205
pykka.future        40      5    88%   6-8, 48, 57, 66
pykka.gevent        29      3    90%   26, 33-34
pykka.proxy         41      0   100%   
pykka.registry      31      0   100%   
----------------------------------------------
TOTAL              268     11    96%   
----------------------------------------------------------------------
Ran 98 tests in 0.804s

FAILED (errors=19)

Add broadcast to ActorRegistry?

Would something along the lines of ActorRegistry.broadcast(message, target=ActorClass) make sense? Would at least be a nice abstraction for idle in Mopidy if nothing else :)

    def broadcast(self, message, target=None):
        if target:
            refs = self.get_by_class(target)
        else:
            refs = self.get_all()

        for ref in refs:
            ref.send_one_way(message)

If this is out of the intended scope for pykka I could always just built a broadcast helper function somewhere in Mopidy :)

Base Message Class?

hi,
first, thank you for creating such a useful package in python :)
I just start to try out pykka. just a suggestion.
the message pass through actors is an dict, do you think we could have a base message class to extend? I think it might be more powerful than dict.

Actors that drive their own event processing (or maybe have idle processing)

As I currently understand Pykka actors, they are entirely event driven in the sense that they don't do anything unless they've received an event. What I'm looking for is a way to have an actor that "does things" all of the time, and which handles messages when it's ready.

The motivating example I've got is an actor which monitors a camera (say, a web cam), examining an image every second. When it detects a face in the image, it send a message to some other actor. The obvious (perhaps naive) approach to this is an actor that sits in a loop, taking pictures and, after each picture, explicitly letting events be processed. Something like this:

class CameraActor(pykka.Actor):
    . . .
   def process_events(self):
       if self.its_been_one_second():
           img = take_picture()
           if face_in_img(img): 
               self.target.tell({'face_rect': face_rect(img)})
       super().process_events()

Or something like that. Perhaps an "idle handler" is a better approach. The main point, though, is that I'd like the actor to be able to do processing even in the absence of any external stimulus, e.g. events.

I think I can simulate this currently by having the actor send messages to itself. This feels to me like a lot of ceremony for something that's conceptually simple.

I've looked briefly at Actor._actor_loop(), and this seems like the likely place to insert these kinds of changes.

In any case, hopefully you can understand what I'm looking for. Is something like this supported in Pykka already, or would it require changes? Maybe this is just a bad idea, but then I wonder how I should implement something like my CameraActor.

Explicitly stopping actors weirdness

Hey,

Love pykka. I'm just dabbling with concurrent programming so please bear with me.

I understand that there is no supervisor, however I am trying to explicitly stop actors and am not able to get them to stop with a keyboard inturrupt.

class send_flow(pykka.ThreadingActor):
    # ....

def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    try:
        for i in range(0,4):
            locals()['actor_{0}'.format(i)] = send_flow.start()
    except KeyboardInterrupt:
        LOGGER.info("Stopping actors...")
        # Close all actors to stop blocking
        for i in pykka.ActorRegistry.get_all():
            LOGGER.info("Stopping Actor: " + i.actor_urn + " ...")
            i.stop()
            LOGGER.info("Stopped Actor: " + i.actor_urn)
        LOGGER.info("Stopped actors.")

if __name__ == '__main__':
    main()

I have also tried the pykka.ActorRegistry.stop_all() and that didn't work either.

Any help would be much appreciated.

Nicer syntax for creating/comparing messages

Is there any way we can create messages with some kind of class-inheriance, and have some tools for comparing them?

E.g.:

class Tick(Message):
   def __init__(self, param1, param2):
       self.param1 = param1
       self.param2 = param2

self.actor_ref.tell(Tick('1', '2'))

def on_receive(self, message):
      if message |match| Tick:
          pass

I tried doing this on my own, but doing "name" on a class does not seem to be thread safe! I got really weird errors of actors sending messages of the wrong type!

Reverse actor stopping order (was: Check that actor is still runable after message has been recieved.)

On line https://github.com/jodal/pykka/blob/master/pykka/actor.py#L164 the check for the run-able state is done before inbox.get() which can block for any amount of time, during this waiting run-able can very well have changed.

Symptom of this problem is not finding the refs that you expect in the actor registry. This happens easily when stress-testing with a lot of clients in my current mopidy branch. For each new connection a ThreadingActor is started and as socket data is received it is delivered as a message. Then I CTRL^C mopidy and it stops all the actors, however, since some of my actors have messages in their inbox still they will go on to handle them despite being stopped. Part of this handling includes looking for other non-running actors which of course fails.

ActorRef.ask and on_receive/reply_to semantics

Right now the documentation say the suggested way of replying to message is:

def on_receive(self, message):
    # ...
    if 'reply_to' in message:
        message['reply_to'].set('some value')
    # ...

However that's what _actor_loop already does (

if 'reply_to' in message:
), so there are 2 ways of replying to message at the moment - the one above and this one:

def on_receive(self, message):
    # ...
    return 'some value'

IMO this is one way too many, I believe it's reasonable to change the official on_receive semantics regarding replying to messages to "whatever gets returned/raised from on_receive will be sent back to the sender if they expect the response at, if they don't await the response then the return value is discarded".

That'd IMO result in a slightly cleaner and a bit less repetitive code. Also modifying message and putting reply_to in it would become unnecessary so I'd get rid of it as well.

I appreciate that this is a change in the API, the question is how big is it's impact on already existing projects, use cases etc. I, for one, can't think of use case that'd require using reply_to instead of just returning the value.

Pykka test support

One of the main advantages of using the Actor model is its ease of testing.

Every actor is by default isolated and so verifying the individual behaviours of Actors should be easy because we can provide each actor with fake / stub collaborators and verify that the right messages (of the appropriate formats) are sent and received.

However, it's not simple to capture this in the form of an assertion in a unit test.

Perhaps a couple of examples of how Pykka actors can be tested with popular test/mocking frameworks would help here.

Improve readme/introduction

Pykka's readme doesn't do a great job at explaining what it is, why it is and what it's useful for.

This can be illustrated with first impressions like this: https://twitter.com/ronnypfannsch/status/288784239854514176

A better readme/introduction should:

  • Explain why Pykka was made, with usecase/examples.
  • Explain how Pykka is implemented, e.g. a future is simply a queue. Explain that everything except proxies are really simple and thin abstractions on top of Queue/Thread or AsyncResult/Greenlet. Try to explain proxies as well.
  • Explain what Pykka is not, does not support: maybe list major features from Akka that Pykka doesn't have, e.g. remote actors.

geventactor: Exception KeyError ()

I simply replace ThreadingActor with GeventActor in the official examples/plain_actor.py, the code is below:

#! /usr/bin/env python

import pykka
import gevent
import gevent.monkey
from pykka.gevent import GeventActor

gevent.monkey.patch_all()


#class PlainActor(pykka.ThreadingActor):
class PlainActor(GeventActor):

    def __init__(self):
        super(PlainActor, self).__init__()
        self.stored_messages = []

    def on_receive(self, message):
        if message.get('command') == 'get_messages':
            return self.stored_messages
        else:
            self.stored_messages.append(message)


if __name__ == '__main__':
    actor = PlainActor.start()
    actor.tell({'no': 'Norway', 'se': 'Sweden'})
    actor.tell({'a': 3, 'b': 4, 'c': 5})
    print(actor.ask({'command': 'get_messages'}))
    actor.stop()

After run this I saw:
Exception KeyError: KeyError(4536872784,) in <module 'threading' from '/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.pyc'> ignored

My environments:
python 2.7.10
pykka 1.2.1
gevent 1.0.2

Pykka actors auto scheduling

Hello pykka contributors,

is there any equivalent and easy way to implement some message scheduling like we have in akka lib ;

InjectorAkkaSystemActivator.getSystem().scheduler().schedule(Duration.Zero(),
                      Duration.create(millisecondPeriod, TimeUnit.MILLISECONDS),
                      actor,
                      message,
                      InjectorAkkaSystemActivator.getSystem().dispatcher(),
                      null));

Or do we need to implement this kind of scheduler ourself ? Do you have some best practices on that topic ?

Cheers,

Mathilde

`is_alive()` on `ActorProxy` may cause `ActorDeadError`

Using Pykka 0.12.2.

  1. Get a proxy for a running actor.
  2. ActorRegistry.stop_all()
  3. Call proxy.actor_ref.is_alive() results in the following exception:
Traceback (most recent call last):
  File "/home/jodal/dev/mopidy/tests/frontends/mpris/root_interface_test.py", line 17, in tearDown
    if self.backend.actor_ref.is_alive():
  File "/usr/lib/pymodules/python2.6/pykka/proxy.py", line 81, in __getattr__
    self._update_attrs()
  File "/usr/lib/pymodules/python2.6/pykka/proxy.py", line 61, in _update_attrs
    {'command': 'pykka_get_attributes'})
  File "/usr/lib/pymodules/python2.6/pykka/actor.py", line 426, in send_request_reply
    self.send_one_way(message)
  File "/usr/lib/pymodules/python2.6/pykka/actor.py", line 394, in send_one_way
    raise _ActorDeadError('%s not found' % self)
ActorDeadError: DummyBackend (urn:uuid:0d3a1518-0d19-48ef-b691-b359f177bcda) not found

can't start new thread

image

every actor is need call self.stop() ?

when i call self.stop() in every actor๏ผŒ there has no exception raisedใ€‚ใ€‚

image

how can i controll the actor numbers?

thanks!

How can I stop an eventlet actor on Ctrl+C?

So I am trying to move way for ThreadingActors so my system doesn't eat up 50 threads... Decided to go with Eventlet.

How do I handle the "main" file where everything starts up, and when uses sends kill signal or presses Ctrl+C the script stops?

Currently I have my main loop setup with eventlet actors like so (this used to be time.sleep with threading actors):

while True:
    eventlet.sleep(1)

and my signal handler is responsible for stopping all the actors when the application shuts down:

def signal_handler(s, frame):
    logger.info('You pressed Ctrl+C. Stopping all systems.')
    # Stop all top level actors in reverse order
    poller.stop()
    source_cache.stop()
    harvester.stop()
    linked_store.stop()
    result_store.stop()
    logger.info("All systems stopped.")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

However, this does not work. Signal handler seems to never execute. Instead I get the error:

        Traceback (most recent call last):
      File "c:\Users\drozdyuka\Projects\env\Scripts\harvester-script.py", line 9, in
     <module>
        load_entry_point('ResourceRepositoryNetwork==0.1.9', 'console_scripts', 'har
    vester')()
      File "c:\users\drozdyuka\projects\lpsscf\resourcerepositorynetwork\resourcerep
    ositorynetwork\harvester\harvester_runner.py", line 124, in main
        sleep(1)
      File "c:\Users\drozdyuka\Projects\env\lib\site-packages\eventlet\greenthread.p
    y", line 32, in sleep
        hub.switch()
      File "c:\Users\drozdyuka\Projects\env\lib\site-packages\eventlet\hubs\hub.py",
     line 293, in switch

        return self.greenlet.switch()
      File "c:\Users\drozdyuka\Projects\env\lib\site-packages\eventlet\hubs\hub.py",
     line 345, in run
        self.wait(sleep_time)
      File "c:\Users\drozdyuka\Projects\env\lib\site-packages\eventlet\hubs\selects.
    py", line 34, in wait
        time.sleep(seconds)
      File "c:\users\drozdyuka\projects\lpsscf\resourcerepositorynetwork\resourcerep
    ositorynetwork\harvester\harvester_runner.py", line 106, in signal_handler
        poller.stop()
      File "build\bdist.win32\egg\pykka\actor.py", line 511, in stop
      File "build\bdist.win32\egg\pykka\eventlet.py", line 65, in get
      File "build\bdist.win32\egg\pykka\future.py", line 69, in get
      File "build\bdist.win32\egg\pykka\actor.py", line 502, in _stop_result_convert
    er
      File "build\bdist.win32\egg\pykka\eventlet.py", line 79, in get
      File "c:\Users\drozdyuka\Projects\env\lib\site-packages\eventlet\event.py", li
    ne 121, in wait
        return hubs.get_hub().switch()
      File "c:\Users\drozdyuka\Projects\env\lib\site-packages\eventlet\hubs\hub.py",
     line 279, in switch
        assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
    AssertionError: Cannot switch to MAINLOOP from MAINLOOP
    (env) /C/Users/drozdyuka/Projects/lpsscf [hotfix-actors|? 8]

I even tried wrapping the things into exception handlers, but weird things are happening.

Has anyone had any luck with this?

Set ThreadingActors thread name to class name

Currently the threads name is set to PykkaThreadingActor, when debuging threading issues this isn't really helpful. Adding the actual class name to the thread name would be really nice :-) Should be a on line fix in _start_actor_loop().

Remote actors

Just wondering if there were any plans to add remote actor support. Pyro might be interesting to look at even though its goal is basic object remoting and remote method invocation.

How comes I can't see Pykka debug log messages?

EDIT: I've solved my problem. I thought the pykka logger was somehow buggered but in actual fact, file configuration caused the issue. See the last comment on this issue.

I feel the solution could be documented in Pykka, mainly because it was non-trivial for me to find out that:

  • how logging was supposed to work when a library obtains their own logger, e.g. pykka
  • whether logging causes problems when working with multiple threads (it doesn't)
  • whether logging.getLogger('pykka') gets the exact instance of the logger as Pykka does (it does)
  • how then to set the logger's level to DEBUG; how the root logger's level also needs to be set to DEBUG for messages to show.

Gevent can't work

class GeventPrintActor(GeventActor):
    def print_text(self, message):
        print message
        self.stop()


class ThreadingPrintActor(pykka.ThreadingActor):
    def print_text(self, message):
        print message
        self.stop()


GeventPrintActor.start().proxy().print_text("GeventPrintActor")
ThreadingPrintActor.start().proxy().print_text("ThreadingPrintActor")

Can't see GeventPrintActor in console

Default implementation for `on_failure`?

Hi,

I started playing around with Pykka using the GeventActor. Somehow, my sample code wasn't printing anything, so I dug around, and found out that I was looking for a nonexistent key in one of the actors. However, the exception that caused the error wasn't being raised. I then found out about the on_failure method, where I can get the errors to be printed. Would it not make more sense to have a default implementation for on_failure so that errors are printed by default?

For instance, here's some sample code that causes this problem:

import pykka
import pykka.gevent
import gevent

class Greeter(pykka.gevent.GeventActor):

    #def on_failure(self, exception_type, exception_value, traceback):
    #    print exception_type
    #    print exception_value
    #    print traceback

    def on_receive(self, msg=None):
        if 'name' in msg:
            self.greeting = "Hi, {}".format(msg['name'])
        else:
            self.greeting = "Yo!" + msg['bo']
        print(self.greeting)

actor_ref = Greeter.start()
actor_ref.tell({})

gevent.wait()

When on_failure is uncommented, the KeyError exception is printed. Having something like this already implemented would be helpful for newcomers.

Stopping actors: Stop & Poison Pill

Pykka is lacking the true "stop" functionality. The current implementation is really the "Poison Pill" implementation from Akka:
http://doc.akka.io/docs/akka/snapshot/scala/actors.html#Stopping_actors

This processes all messages in the inbox, and then when it gets to the {"command":"pykka_stop"} message - it shuts down.

The other kind of stop, which should be invoked on the context, clears out the inbox of all messages, allows the actor to finish its work and stops it then.

Maximum

Hello, I love the "proxy" feature of Pykka! But I often run in to this pattern that yields RuntimeError: maximum recursion depth exceeded while calling a Python object. I can't pass a proxy in to the constructor of another actor.

import pykka

class MySupervisor(pykka.ThreadingActor):

    def __init__(self):
        super(MySupervisor, self).__init__()
        self.use_daemon_thread = True
        self.in_future = self.actor_ref.proxy()
        self.worker = Worker.start(self.in_future).proxy()

class Worker(pykka.ThreadingActor):

    def __init__(self, supervisor):
        super(Worker, self).__init__()
        self.use_daemon_thread = True

sup = MySupervisor.start().proxy()

Proxy to self as attribute on actor causes deadlock

Your documentation says that referencing your internal proxy is ideal. This is exactly what I want to do, but it hangs every time. It gets out of the constructor and does nothing. Is this a race condition on proxy (internal vs external)?

Details:

  • code below
  • Python 2.7.8
#! /usr/bin/env python
from pykka import ThreadingFuture, ThreadingActor


def go_command():
    actor = HungActor.start()
    proxy = actor.proxy()
    proxy.run()
    actor.stop()


class HungActor(ThreadingActor):
    def __init__(self):
        super(HungActor, self).__init__()
        proxy = self.actor_ref.proxy()
        print proxy
        self.proxy = proxy
        print "out of constructor"

    def run(self):
        print "run"
        self.proxy.a()

    def a(self):
        print "a async"
        self.proxy.b()

    def b(self):
        print "b async"


if __name__ == '__main__':
    # scriptine allows for quick easy scripting
    # where this script will run via `corelogic_data.py command`
    # commands are any def with {SOME_NAME}_command
    import scriptine

    scriptine.run()

to run:
./crap_actor.py go

Can not import GeventActor

version detail:

  • gevent==1.0.2
  • Pykka==1.2.1
  • Mac Yosemite 10.10.4

pykka.GeventActor

Traceback (most recent call last):
File "", line 1, in
AttributeError: 'module' object has no attribute 'GeventActor'

how can i import GeventActor?

Initialising an actor system with cyclic references

Pykka's API (.start method) doesn't really let the user make actor systems with cyclic referential relationships like:

a -> b -> c -> a

where a -> b denotes that actor a holds a reference to actor b (probably for the better!)

However, I find this useful.

What is idiomatic Pykka for initialising such a ring of actors?

Implement remoting functionality

I know that the project describes it self as not trying to be a port of Akka, but as far as I know, is the most feature complete implementation of the actor model in python.

I think it would be a great improvement for the python community to have an Akka-like implementation of the actor model.

And the remoting like functionality is one thing I couldn't find well implemented in python. I believe something implemented over zeromq would be great. Maybe implementing the gossip model for clustering too.

What do you think?

Infinite loop when making proxy after having made one internal to the obj.

I am working on a project where I would be using multiple actors for a webcrawler. My design goal was to have a central hub that would handle all the actors. However I ran into an issue when trying to have the hub have both a proxy of itself as well as other actors. The problem can be demonstrated just with:

from pykka.gevent import GeventActor

class foo(GeventActor):
    def __init__(self):
        super(foo, self).__init__()
        self.p = self.actor_ref.proxy()
        print(self.p)
    def m(self):
        print("Method executed")

A = foo.start()
X = A.proxy()

This locks into an infinite loop of:

File "/usr/local/lib/python2.7/dist-packages/pykka/proxy.py", line 111,
    in _get_attributes 'traversable': self._is_traversable_attribute(attr),
File "/usr/local/lib/python2.7/dist-packages/pykka/proxy.py", line 140,
    in _is_traversable_attribute return hasattr(attr, 'pykka_traversable')
File "/usr/local/lib/python2.7/dist-packages/pykka/proxy.py", line 158,
    in __getattr__ self._known_attrs = self._get_attributes()

I've testing this with both Gevent and the standard Thread models and gotten the same result. I'm on Python 2.7.3 and Pykka 1.2.0.

Stopping and Actor should stop its children

I think this is one of the biggest selling points of an actor system. Stopping an actor SHOULD stop all of its children.

Without this, we are back in the land of "wild threads" and memory leaks.

This behaviour should be a direct consequence of Stopping actors. See #46.

Support multiple actor systems

I want to replace the global ActorRegistry with an "actor system" concept. You should be able to run multiple actor systems in the same process at the same time. Any actor lookups, like you today do using ActorRegistry, should only return results from the same actor system. Two actors in two different actor systems should under normal API usage not be able to get references to each other. In other words, an actor system should be isolated from other actor systems.

The use case here is mostly to get rid of the global state maintained by ActorRegistry, not to make it possible to have multiple actor systems in the same process. That's just a convenient side effect. Isolated actor systems will also be of great help in tests, and should make parallel test running possible since tests no longer share any global state.

Today's ActorRegistry API can maybe be kept around as a convenience wrapper around a default actor system.

Support a mix of threading and gevent based actors

With the upcoming release of gevent 1.0 and it's support for multiple threads, supporting a mix of ThreadingActor and GeventActor in the same process, that can communicate with each other, is an attractive extension of Pykka.

AttributeError: Queue instance has no attribute 'not_full'

Using Pykka 0.12.2-1 and Python 2.7.1-5ubuntu2.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/pymodules/python2.7/pykka/actor.py", line 331, in run
    return Actor._run(self)
  File "/usr/lib/pymodules/python2.7/pykka/actor.py", line 174, in _run
    message['reply_to'].set_exception(exception)
  File "/usr/lib/pymodules/python2.7/pykka/future.py", line 104, in set_exception
    self.set(exception)
  File "/usr/lib/pymodules/python2.7/pykka/future.py", line 101, in set
    self._queue.put(value)
  File "/usr/lib/python2.7/Queue.py", line 118, in put
    self.not_full.acquire()
AttributeError: Queue instance has no attribute 'not_full'

Implementation of ActorSystem, Dispatchers and Executors

Hi, recently I have been learning about Scala and Akka and I became amazed by such technologies. The problem is that I have some ongoing projects that are already binded to Python language. I was looking for similar things in Python and found Pykka. It looks great to me, as I don't need all the features that Akka have, so I am considering seriously to start using it. Congratulations for the current work !

However there is something that I don't understand from the current design. Why actors are binded to a given threading technology and can not be decided on runtime, for example from the user configuration ? When you define class MyActor(ThreadingActor) there is no way to use this same actor logic with gevent without touching the source code (let's assume that gevent monkey patch doesn't exist). Or imagine you want to expand pykka to use asyncio and want to start using it in current projects...

From what I have been seeing in the source code it does not look so difficult to solve this problem, but it would require to change a bit the overall design of Pykka. I have already seen the issue #18 and I completely agree with it. This would allow not only having independent actor systems but also having an inbox, future and event loop factory per system, which is the only thing that {Threading,Gevent,Eventlet}Actor's are doing (please correct me if I am wrong).

It would be called ActorSystem and have an actor factory method actorOf (just like in Akka) that would create actor instances with a reference to the actor system that created it. The ActorSystem would implement the methods _create_inbox, _create_future and _create_event_loop that would delegate to the selected threading technology implementation.

ActorSystem would have a constructor that would allow to configure it. As a replacement for Akka Props it could be used the builtin library functools.partial.

Just as a reference:

import pykka
from functools import partial

class Greeter(pykka.Actor):
    def __init__(self, greeting='Hi there!'):
        super(Greeter, self).__init__()
        self.greeting = greeting

    def on_receive(self, message):
        print(self.greeting)

system = pykka.ActorSystem(using=pykka.THREADS)
actor_ref = system.actorOf(partial(Greeter, greeting='Hi you!'))

What do you think ? If we agree on a design I could make a fork and start working on it if anyone else is interested in doing it.

Supervision

Currently, the only way to know that the "child" actor failed, is to employ the 'ask' pattern:

actor = MyActor.start()
try:
      actor.ask({'command':'hey'})
except Exception:
      # actor failed!

However, if we are using tell, the is no way to "detect" a failure:

actor.tell({'command':'throw me an exception!'})
# Everything is great!

Akka handles this by registering an actor for "death watch":
http://doc.akka.io/docs/akka/snapshot/general/supervision.html

In pykka we can implement a similar thing manually by providing an actor with a parent argument and overriding on_failure in the MyActor:

class MyActor(ThreadingActor):
    def __init__(self, parent):
        super(MyActor, self).__init__()
        self.parent = parent

    def on_failure(self, exception_type, exception_value, traceback):
        self.parent.tell({'command': 'death', 'exception': exception_value})

Then the parent actor would simply have to handle it, like so:

class Parent(ThreadingActor):
    def __init__(self, parent):
        super(Parent, self).__init__()
        self.child = MyActor.start(parent=self.actor_ref)

    def on_receive(self, message):
        if message.get('command') == 'death:            
            # Child actor died....
            if not self.child.is_alive():
                # restart it...
                self.child = MyActor.start(parent=self.actor_ref)

P.S.: On the readme page, where it says what "pykka is not" --- supervision is one of the mentions.
Is there any specific reasons (aside from time/effort) that its not? I.e. are you opposed to the idea?

Handle stop being called from on_start()

Following extract from an actor manages to get stop called twice.

    def on_start(self):                                                                                                                                        
        try:                                                                                                                                                   
            self.send_response([u'OK MPD %s' % VERSION])                                                                                                       
            self.request_loop()                                                                                                                                
        except gobject.GError, e:                                                                                                                              
            pass                                                                                                                                               
        self.stop()                                                                                                                                            

    def on_stop(self):                                                                                                                                         
        self.channel.close()                                                                                                                                   
        del self.sock                                                                                                                                          

    def close(self):                                                                                                                                           
        self.channel.close()  

proxy() does not play well with patched methods

Let's say we have an exciting actor that does nothing:

class NopActor(pykka.ThreadingActor):
    def __init__(self):
        super(NopActor, self).__init__()

    def do_nothing(self):
        return

If we try to patch it using Voidspace mock to verify it gets called somehow, somewhere.

        @patch.object(NopActor, 'do_nothing')
        def should_be_able_to_spy_on_actors(mock_method):
            nop_actor = NopActor.start().proxy()    # blocks indefinitely
            nop_actor.do_nothing().get()
            mock_method.assert_called_once_with()

The test hangs indefinitely. This could be related to or the same issue as #26.

I have no identified a workaround for this case.

Create ActorProxy from actor's self.actor_ref

Sending messages to one self is a common way of implementing recursion using actors. This is doable in Pykka using e.g.:

self.actor_ref.tell({'some': 'message'})

Obviously, blocking on getting the value from a future encapsulating a result calculated by one self will deadlock the actor, as the actor will not be able to respond to the message while waiting for the response. It's biting its own tail:

future = self.actor_ref.ask({'some': 'message'})
future.get()   # will never return

Analogous, it would be nice to be able to create an ActorProxy around self.actor_ref and call methods on the proxy instead of sending plain messages. As @zombiecalypse discovered and pointed out in a mail to me February 17, this currently doesn't work. An example:

proxy = self.actor_ref.proxy()
proxy.foo()   # will never return

Even though we don't block on any future here, the proxy.foo() method call will never return. This is because the proxy object on it's first use will query the actor about what fields and methods it has available before it performs the method call or field access it was asked to do.

To make the above code work, we need to either:

  • Rewrite the proxy implementation so that we don't need to ask about available methods and fields first. Some of the work may be done on the receiving side of the call. Some of the work may be done by looking at the callees class, with the possible loss of free access to dynamically added attributes.
  • Or, we can special-case the creation of a proxy to itself. Since the actor has full access to itself, it can preseed the proxy object with information about itself, so that the proxy never needs to ask for what attributes exist, and thus never block.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.