Support "become" to initialize actors with circular dependencies #38

Open
drozzy opened this issue Jul 16, 2014 · 2 comments

drozzy commented Jul 16, 2014

Would it be possible to support "become" functionality, as in Akka?
See:
http://doc.akka.io/docs/akka/snapshot/scala/actors.html#become-unbecome

Python seems particularly well suited to this kind of "hot-swapping". Thanks!
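
For readers unfamiliar with the idea, here is a minimal sketch of what "become"/"unbecome" could look like in plain Python. It assumes a stack of handler callables; the class `HotSwapActor` and its methods are hypothetical and are not part of Pykka's API.

```python
class HotSwapActor:
    """Toy stand-in for an actor; hypothetical, not Pykka's API."""

    def __init__(self):
        # Stack of behaviors; the top one handles the next message.
        self._behaviors = [self.angry]

    def become(self, behavior, discard_old=True):
        # Replace the current behavior, or push on top of it.
        if discard_old:
            self._behaviors[-1] = behavior
        else:
            self._behaviors.append(behavior)

    def unbecome(self):
        # Restore the previous behavior, but never pop the initial one.
        if len(self._behaviors) > 1:
            self._behaviors.pop()

    def on_receive(self, message):
        # Dispatch to whatever behavior is currently active.
        return self._behaviors[-1](message)

    def angry(self, message):
        if message == "bar":
            self.become(self.happy)
            return "switching to happy"
        return "I am already angry"

    def happy(self, message):
        if message == "foo":
            self.become(self.angry)
            return "switching to angry"
        return "I am already happy"
```

Because methods are ordinary objects in Python, swapping the active handler is just a list assignment, which is why this kind of hot-swapping fits the language well.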

jodal added the wishlist label Aug 10, 2014
jodal changed the title from "support for become?" to "Support "become" to initialize actors with circular dependencies" Jan 29, 2019

akatashev commented May 19, 2020

In addition to become and unbecome it could be useful to implement stash and unstash_all.
https://doc.akka.io/docs/akka/current/actors.html#stash

Unfortunately, this isn't really feasible with a standard Queue: unstash_all has to prepend the stashed messages to the mailbox, and the Queue classes give us no way to do that.

I believe it can be implemented with a PriorityQueue instead, using these priority levels:
0 - Control messages (notifications about child death, and so on).
1 - Unstashed messages (they should be processed before "normal" ones).
2 - Normal messages.
tell, for example, could then be changed this way:

    def tell(self, message):
        """
        Send message to actor without waiting for any response.

        Will generally not block, but if the underlying queue is full it will
        block until a free slot is available.

        :param message: message to send
        :type message: any

        :raise: :exc:`pykka.ActorDeadError` if actor is not available
        :return: nothing
        """
        if not self.is_alive():
            raise ActorDeadError(f"{self} not found")

        # list_of_control_message_types is a placeholder for a tuple of
        # control message classes.
        if isinstance(message, list_of_control_message_types):
            priority = 0
        else:
            priority = 2
        self.actor_inbox.put((priority, Envelope(message)))

ask would likewise use priority 2 for its normal messages.
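
The three priority levels can be tried out in isolation with a small sketch like the one below. It is not Pykka code: `PriorityMailbox` is a hypothetical wrapper, and the sequence counter is an added assumption so that messages with the same priority keep their arrival order.

```python
import itertools
import queue

# Priority levels as described above.
CONTROL, UNSTASHED, NORMAL = 0, 1, 2


class PriorityMailbox:
    """Hypothetical mailbox wrapper around queue.PriorityQueue."""

    def __init__(self):
        self._queue = queue.PriorityQueue()
        # Monotonic counter used as a tie-breaker within one priority level.
        self._sequence = itertools.count()

    def put(self, message, priority=NORMAL):
        self._queue.put((priority, next(self._sequence), message))

    def get(self):
        _priority, _sequence, message = self._queue.get()
        return message


mailbox = PriorityMailbox()
mailbox.put("normal-1")
mailbox.put("normal-2")
mailbox.put("unstashed-1", priority=UNSTASHED)
mailbox.put("child-died", priority=CONTROL)

# Control first, then unstashed, then normal messages in arrival order.
received = [mailbox.get() for _ in range(4)]
```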

I took a look at Akka and found that it stashes not the message content but the message Envelope itself.
Its ActorCell class has a var currentMessage: Envelope = _ field, and it is this currentMessage that gets stashed.

In Pykka, the Actor class could have a self.current_message: Envelope attribute as well:

    def _actor_loop(self):
        """
        The actor's event loop.

        This is the method that will be executed by the thread or greenlet.
        """
        try:
            self.on_start()
        except Exception:
            self._handle_failure(*sys.exc_info())

        while not self.actor_stopped.is_set():
            _, envelope = self.actor_inbox.get()
            self.current_message = envelope
            try:
                response = self._handle_receive(envelope.message)
                if envelope.reply_to is not None:
                    envelope.reply_to.set(response)
...

stash and unstash_all could then be implemented like this:

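    from collections import deque

    # Ideally created per instance in __init__, so actors don't share a stash.
    actor_message_stash: deque = deque()

    def _stash(self):
        self.actor_message_stash.append(self.current_message)

    def _unstash_all(self):
        while self.actor_message_stash:
            envelope = self.actor_message_stash.popleft()
            # Priority 1: processed before normal messages.
            self.actor_inbox.put((1, envelope))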
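
To see the effect of stashing end to end without Pykka, here is a self-contained toy simulation. Everything in it (`ToyActor`, the "init-done" message, the `drain` helper, the sequence counter) is an assumption made for illustration; it only mirrors the idea of stashing the current message and re-queuing it at priority 1.

```python
import itertools
import queue
from collections import deque

UNSTASHED, NORMAL = 1, 2


class ToyActor:
    """Hypothetical single-threaded actor used only to show the ordering."""

    def __init__(self):
        self.inbox = queue.PriorityQueue()
        self._sequence = itertools.count()  # keeps FIFO order per priority
        self.stash = deque()  # per-instance stash
        self.current_message = None
        self.ready = False
        self.handled = []

    def tell(self, message, priority=NORMAL):
        self.inbox.put((priority, next(self._sequence), message))

    def _stash(self):
        self.stash.append(self.current_message)

    def _unstash_all(self):
        # Re-queue stashed messages ahead of normal ones.
        while self.stash:
            self.tell(self.stash.popleft(), priority=UNSTASHED)

    def on_receive(self, message):
        if message == "init-done":
            self.ready = True
            self._unstash_all()
        elif not self.ready:
            self._stash()
        else:
            self.handled.append(message)

    def drain(self):
        # Process everything currently in the inbox, like a short actor loop.
        while not self.inbox.empty():
            _priority, _sequence, message = self.inbox.get()
            self.current_message = message
            self.on_receive(message)


actor = ToyActor()
for message in ["a", "b", "init-done", "c"]:
    actor.tell(message)
actor.drain()
```

Here "a" and "b" arrive before initialization finishes, get stashed, and are still handled before "c" once they are unstashed. With a plain FIFO queue they would end up behind "c".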

I believe (though I still need to try it) that this will also let us use the ask pattern successfully. We would just need to return None after stashing an ask request, to avoid executing envelope.reply_to.set(response) until the actual response is available.

I'll try it and post later whether this approach works or not.

@akatashev

It works perfectly for tells. However, I needed to implement dummy def __lt__(self, other) and def __gt__(self, other) methods on Envelope so that PriorityQueue can store them.
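
The need for comparison methods comes from how tuples are ordered: when two entries have the same priority, Python falls back to comparing the second element, here the envelope. The sketch below reproduces that with a hypothetical minimal `Envelope` class (Pykka's real class differs) and shows one possible alternative, a sequence counter as tie-breaker, which also keeps same-priority messages in arrival order.

```python
import itertools
import queue


class Envelope:
    """Hypothetical minimal envelope without comparison methods."""

    def __init__(self, message):
        self.message = message


# Two entries with equal priority force a comparison of the envelopes.
inbox = queue.PriorityQueue()
inbox.put((2, Envelope("first")))
try:
    inbox.put((2, Envelope("second")))
    tie_error = None
except TypeError as exc:
    tie_error = exc  # '<' is not supported between Envelope instances

# Alternative: a unique, increasing counter decides ties before the
# envelope is ever compared.
sequence = itertools.count()
safe_inbox = queue.PriorityQueue()
safe_inbox.put((2, next(sequence), Envelope("first")))
safe_inbox.put((2, next(sequence), Envelope("second")))
order = [safe_inbox.get()[2].message for _ in range(2)]
```

A dummy `__lt__` avoids the TypeError too, but it leaves the relative order of same-priority envelopes up to the heap, so the counter may be the safer choice if ordering matters.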

Unfortunately, stashing doesn't really work for asks, and I am not sure whether it can be fixed.
While ThreadingActor and GeventActor just return None for a stashed ask(message, block=True) request, EventletActor raises an exception:

    Traceback (most recent call last):
      File "~/.local/lib/python3.6/site-packages/eventlet/queue.py", line 118, in switch
        self.greenlet.switch(value)
      File "~/.local/lib/python3.6/site-packages/eventlet/greenthread.py", line 221, in main
        result = function(*args, **kwargs)
      File "~/GitHub/pykka/pykka/_actor.py", line 216, in _actor_loop
        envelope.reply_to.set_exception()
      File "~/GitHub/pykka/pykka/eventlet.py", line 83, in set_exception
        self.event.send_exception(*(exc_info or sys.exc_info()))
      File "~/.local/lib/python3.6/site-packages/eventlet/event.py", line 220, in send_exception
        return self.send(None, args)
      File "~/.local/lib/python3.6/site-packages/eventlet/event.py", line 162, in send
        assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
    AssertionError: Trying to re-send() an already-triggered event.

In this case Akka returns a future whose value can be received and processed later. That doesn't lead to exceptions or deadlocks, as long as we don't try to do something silly like Await.result(future, atMost = 3 seconds).
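
One plausible reading of the traceback is that the reply future gets completed twice: once when the stashed ask returns None, and again when the unstashed envelope is finally handled. A possible way around that, sketched here with the standard library's `concurrent.futures.Future` rather than Pykka's own future classes, is to leave the future pending while the envelope sits in the stash. The `STASHED` sentinel, `Envelope`, and `ToyActor` are all hypothetical, and for brevity `unstash_all` handles the envelopes directly instead of re-queuing them.

```python
from collections import deque
from concurrent.futures import Future

# Hypothetical sentinel: "this message was stashed, do not reply yet".
STASHED = object()


class Envelope:
    """Hypothetical envelope carrying an optional reply future."""

    def __init__(self, message, reply_to=None):
        self.message = message
        self.reply_to = reply_to


class ToyActor:
    def __init__(self):
        self.stash = deque()
        self.ready = False
        self.current_message = None

    def on_receive(self, message):
        if not self.ready:
            # Stash the whole envelope so the reply future travels with it.
            self.stash.append(self.current_message)
            return STASHED
        return message.upper()

    def handle(self, envelope):
        self.current_message = envelope
        response = self.on_receive(envelope.message)
        if response is STASHED:
            return  # leave the future pending
        if envelope.reply_to is not None:
            envelope.reply_to.set_result(response)

    def unstash_all(self):
        while self.stash:
            self.handle(self.stash.popleft())


actor = ToyActor()
future = Future()
actor.handle(Envelope("ping", reply_to=future))
pending_after_stash = not future.done()

actor.ready = True
actor.unstash_all()
reply = future.result(timeout=1)
```

With this shape the future is completed exactly once, after unstashing. A blocking ask would still wait until the actor unstashes the message, so a caller that blocks from inside the same actor could presumably still deadlock.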

I'll check later what happens when we try to use ask(message, block=False).
