diff --git a/README.md b/README.md index c85d809..104b82f 100644 --- a/README.md +++ b/README.md @@ -2,143 +2,91 @@ Minimalistic implementation of the actor concurrency model powered by asyncio. ---- +## -* Installation: `pip install pyctor` (requires Python `>= 3.7.3`) +* Installation: + * `pip install pyctor` (requires Python `>= 3.7.3`) +* [Examples](examples) * [API](#api) -## Example +## Overview -Imagine a teacher and two students, Sarah and Mike, at a playground ready to play a round of ping-pong. The teacher throws the ball to Mike and the match between the two kids begins. Should one of them decide they've had enough and leave the playground, the other one will follow. Meanwhile, the teacher watches the students and leaves with them once they've finished playing. - -Let's start with the implementation of a student, i.e., Sarah and Mike. The students exchange ping-pong messages until one of them randomly decides to stop: +Pyctor is a lightweight implementation of the actor model for concurrent systems. Actors should only modify their own private state. They can influence other actors in the system through messaging. In Pyctor, we define actors by subclassing the `Actor` class: ```python -@dataclass -class Play: - student: ActorRef # play with this student - - -class Student(Actor): # subclass of Actor - # Every Actor must implement `receive` to receive messages. +class MyActor(pyctor.Actor): async def receive(self, sender, message): - if isinstance(message, Play): - # Student was told to play. - # Send a message to the other student which will start the match. - self.tell(message.student, 'ping!') - elif message == 'bye!': - # The other student has stopped so this student stops as well. - self.stop() - elif random.random() < 0.25: - # With a probability of 0.25 leave the playground and notify the other student. - self.tell(sender, 'bye!') - self.stop() - elif message == 'ping!': - # Respond with "pong" after 1 second. 
- self.schedule_tell(sender, 'pong!', delay=1) - elif message == 'pong!': - # Respond with "ping" after 1 second. - self.schedule_tell(sender, 'ping!', delay=1) - print(f'[{datetime.now()}] {sender} to {self}: {message}') + print(f'received message: {message}') ``` -Upon creation, the teacher creates the two students and asks the system to send a `Terminated` message if a student stops. If no students are left, the teacher shuts down the entire system: - -```python -class Teacher(Actor): - def __init__(self): - super().__init__() - self.students = 0 - - async def started(self): - # Create 2 students: Mike and Sarah. - mike = self.system.create(Student(), name='Mike') - sarah = self.system.create(Student(), name='Sarah') - # Get notified when students stop. - self.watch(mike) - self.watch(sarah) - # Tell Mike to play with Sarah. - self.tell(mike, Play(sarah)) - # Initialize the number of students to 2. - self.students = 2 - print(f'[{datetime.now()}] {self}: it\'s playtime!') - - async def receive(self, sender, message): - if isinstance(message, Terminated): - # Student has stopped. - self.students -= 1 - if self.students == 0: - # All students have stopped so shut down. - self.system.shutdown() - - async def stopped(self): - print(f'[{datetime.now()}] {self}: time to go back.') -``` +Every actor must override the `receive` method. Inside it, the actor responds to the received messages, e.g., by modifying the internal state, sending new messages or creating new actors. -Finally, we implement the top-level entry point `main()` function that creates the actor system, the teacher, and waits for the system to shut down. +All actors live in a single ecosystem called `ActorSystem`. The actor system controls the lifecycle of every actor and handles message passing between actors. Actors may override lifecycle methods (e.g., `started`, `restarted` or `stopped`) that will be called by the system at specific points throughout actor's life. 
In the main function below, which serves as an entry point, we create the actor system and the first actor: ```python -import asyncio -import random -from dataclasses import dataclass -from datetime import datetime - -from pyctor import ActorSystem, Actor, ActorRef, Terminated - -# [Play code omitted] +async def main(): + system = pyctor.ActorSystem() # create actor system + my_actor = system.create(MyActor()) # create first actor + system.tell(my_actor, 'hello') # send message to the actor + await system.stopped() # await system shutdown +``` -# [Student code omitted] +The `create` method starts an actor and returns a reference for that actor. Actors may create child actors, forming a hierarchy of actors in the system. Actors should not be modified directly. Instead, they should communicate through messages. Messages are sent with the `tell` method which expects a reference of the actor that shall receive the message. The actor system guarantees that the messages are received in the same order they were sent in. -# [Teacher code omitted] - -async def main(): - # Create the actor system. - system = ActorSystem() - # Create the teacher. - system.create(Teacher(), name='Teacher') - # Run until system is shutdown. - await system.stopped() +We run the actor system by passing the main function to the `asyncio.run` method: +```python if __name__ == '__main__': asyncio.run(main()) ``` -An output of this script could be: - -``` -[2020-06-21 12:53:41.598117] Teacher: it's playtime! -[2020-06-21 12:53:41.598117] Teacher to Mike: Play(student=Sarah) -[2020-06-21 12:53:42.598700] Mike to Sarah: ping! -[2020-06-21 12:53:43.600073] Sarah to Mike: pong! -[2020-06-21 12:53:44.601047] Mike to Sarah: ping! -[2020-06-21 12:53:44.602062] Sarah to Mike: bye! -[2020-06-21 12:53:44.603047] Teacher: time to go back. -``` +You can find further examples in the [examples](examples) directory. 
## API -Quick overview of the API: - * `pyctor.ActorSystem` - * `create(actor, name=None)` Start an actor. - * `tell(actor, message)` Deliver a message to an actor. - * `schedule_tell(actor, message, *, delay=None, period=None)` Schedule a message to be delivered to an actor at some time. - * `stop(actor)` Stop an actor. - * `shutdown(timeout=None)` Initialize a shutdown of the actor system. - * `stopped()` Await system shutdown. + * `create(actor, name=None)`
+ Start an actor. + * `tell(actor, message)`
+ Deliver a message to an actor. + * `schedule_tell(actor, message, *, delay=None, period=None)`
+ Schedule a message to be delivered to an actor at some time. The message may also be delivered periodically. + * `stop(actor)`
+ Stop an actor. + * `shutdown(timeout=None)`
+ Initiate a shutdown of the actor system. + * `stopped()`
+ Await system shutdown. * `pyctor.Actor` - * `receive(sender, message)` Must be implemented by the class inheriting from Actor. It is used by the system to deliver the messages. - * `tell(actor, message)` Deliver a message to another actor. - * `schedule_tell(actor, message, *, delay=None, period=None)` Schedule a message to be delivered to another actor at some time. - * `watch(actor)` Watch another actor and receive a `Terminated(actor)` message when the watched actor stops. - * `unwatch(actor)` Stop watching another actor. - * `stop()` Stop the actor. - * `started()` Called by the system before the actor starts to receive messages. - * `restarted(sender, message, error)` Called by the system if receiving the message caused an error. Actor will continue to receive messages. - * `stopped()` Called by the system before stopping the actor. The actor will not receive messages anymore. + * `receive(sender, message)`
+ Called by the system every time the actor receives a message. It must be implemented by every class inheriting from `Actor`. + * `create(actor, name=None)`
+ Start a child actor. The child actor will be watched. + * `tell(actor, message)`
+ Deliver a message to another actor. + * `schedule_tell(actor, message, *, delay=None, period=None)`
+ Schedule a message to be delivered to another actor at some time. The message may also be delivered periodically. + * `watch(actor)`
+ Watch another actor and receive a `Terminated(actor)` message when the watched actor stops. + * `unwatch(actor)`
+ Stop watching another actor. + * `stop()`
+ Stop the actor. + + Lifecycle callbacks: + + * `started()`
+ Called by the system before the actor starts to receive messages. + * `restarted(sender, message, error)`
+ Called by the system if receiving the message caused an error. The actor will continue to receive messages. + * `stopped()`
+ Called by the system before stopping the actor. The actor will not receive messages anymore. * System messages: - * `PoisonPill()` kills an actor. - * `Terminated(actor)` is sent to all watchers when the actor stops. - * `DeadLetter(actor, message)` is sent back to the sender if the actor was unable to receive the message. \ No newline at end of file + * `PoisonPill()`
+ Stop an actor. + * `Terminated(actor)`
+ Notify that the actor has stopped. + * `DeadLetter(actor, message)`
+ Notify the sender that the actor did not receive the message. \ No newline at end of file diff --git a/examples/01_getting_started.py b/examples/01_getting_started.py index 58313d5..e9f1166 100644 --- a/examples/01_getting_started.py +++ b/examples/01_getting_started.py @@ -12,6 +12,8 @@ class Play: class Student(Actor): # subclass of Actor + """Students exchange ping-pong messages until one of them randomly decides to stop.""" + # Every Actor must implement `receive` to receive messages. async def receive(self, sender, message): if isinstance(message, Play): @@ -31,37 +33,36 @@ async def receive(self, sender, message): elif message == 'pong!': # Respond with "ping" after 1 second. self.schedule_tell(sender, 'ping!', delay=1) - print(f'[{datetime.now()}] {sender} to {self}: {message}') + print(f'[{datetime.now()}] {sender.name} to {self.name}: {message}') class Teacher(Actor): + """Teacher creates the two students and watches them. + If students stop playing, the teacher shuts down the entire system.""" + def __init__(self): super().__init__() self.students = 0 async def started(self): - # Create 2 students: Mike and Sarah. - mike = self.system.create(Student(), name='Mike') - sarah = self.system.create(Student(), name='Sarah') - # Get notified when students stop. - self.watch(mike) - self.watch(sarah) + # Create 2 students: Mike and Sarah. Children are watched by default. + mike = self.create(Student(), name='Mike') + sarah = self.create(Student(), name='Sarah') + self.students = 2 # Tell Mike to play with Sarah. self.tell(mike, Play(sarah)) - # Initialize the number of students to 2. - self.students = 2 print(f'[{datetime.now()}] {self}: it\'s playtime!') async def receive(self, sender, message): if isinstance(message, Terminated): - # Student has stopped. self.students -= 1 + print(f'[{datetime.now()}] {message.actor.name} has stopped playing') if self.students == 0: # All students have stopped so shut down. 
self.system.shutdown() async def stopped(self): - print(f'[{datetime.now()}] {self}: time to go back.') + print(f'[{datetime.now()}] {self.name}: time to go back.') async def main(): diff --git a/examples/02_master_worker.py b/examples/02_master_worker.py index e14dafa..616769d 100644 --- a/examples/02_master_worker.py +++ b/examples/02_master_worker.py @@ -31,12 +31,12 @@ class WorkFailed: class Worker(Actor): """Worker does work and reports back to the master.""" + def __init__(self, master): super().__init__() self.master = master async def started(self): - self.watch(self.master) self.tell(self.master, WorkerReady()) async def receive(self, sender, message): @@ -45,9 +45,6 @@ async def receive(self, sender, message): elif isinstance(message, (WorkDone, WorkFailed)): self.tell(self.master, message) self.tell(self.master, WorkerReady()) - elif isinstance(message, Terminated): - if message.actor == self.master: - self.stop() async def do_work_report_back(self, work): try: @@ -63,6 +60,7 @@ async def do_work(self, work): class Master(Actor): """Master receives work from other actors, distributes the work across its workers and sends the results back.""" + def __init__(self, worker_init, num_workers): super().__init__() self.worker_init = worker_init @@ -73,10 +71,9 @@ def __init__(self, worker_init, num_workers): async def started(self): for i in range(self.num_workers): - worker = self.system.create( + self.create( self.worker_init(self.actor_ref), name=f'Worker-{i+1}') - self.watch(worker) async def receive(self, sender, message): if isinstance(message, WorkerReady): @@ -133,7 +130,7 @@ def __init__(self): self.failed = 0 async def started(self): - self.master = self.system.create( + self.master = self.create( Master(CountWorker, num_workers=parallelism), name='Master') print(f'[{datetime.now()}] {self.name}: Start the count!') diff --git a/pyctor/actor.py b/pyctor/actor.py index 3494416..7be7feb 100644 --- a/pyctor/actor.py +++ b/pyctor/actor.py @@ -4,24 +4,24 
@@ import logging import uuid from dataclasses import dataclass -from typing import Union +from typing import Union, List, Dict, Set @dataclass(repr=False, eq=False, frozen=True) class ActorRef: # serializable reference which represents an Actor actor_id: str + path: str name: str def __eq__(self, other): return isinstance(other, ActorRef) and self.actor_id == other.actor_id def __hash__(self): return hash(self.actor_id) - def __repr__(self): return self.name - def __str__(self): return self.name + def __repr__(self): return self.path + def __str__(self): return self.path class Actor: # inheritable Actor class # noinspection PyTypeChecker def __init__(self): - self.actor_ref: ActorRef = None - self.system: ActorSystem = None + self._context: '_ActorContext' = None async def receive( self, @@ -39,6 +39,28 @@ async def receive( """ raise NotImplementedError + # noinspection PyProtectedMember + def create( + self, + actor: 'Actor', + name: str = None + ) -> ActorRef: + """Initialize and asynchronously start a child actor. The child actor will be watched. + + Args: + actor: Actor instance to start. + name: Name of the actor. + + Returns: + Actor reference. + """ + child_actor_ref = self.system._create( + actor=actor, + parent=self.actor_ref, + name=name) + self.watch(child_actor_ref) + return child_actor_ref + # noinspection PyProtectedMember def tell( self, @@ -112,7 +134,7 @@ def unwatch( ) -> None: """Stop watching another actor. - This function can be called even if the actor is not watched anymore. + This method can be called even if the actor is not watched anymore. Args: actor: Watched actor. @@ -158,28 +180,46 @@ async def restarted( None """ logging.exception('%s failed to receive message %s from %s', - self, message, sender, exc_info=error) + self.actor_ref, message, sender, exc_info=error) async def stopped(self) -> None: """Called by the system to let this actor know that they will not receive messages anymore. 
+ When this method is called, the actor's children have already been terminated. Returns: None """ pass + @property + def actor_ref(self) -> ActorRef: + return self._context.actor_ref + + @property + def system(self) -> 'ActorSystem': + return self._context.system + + @property + def parent(self) -> ActorRef: + return self._context.parent + @property def name(self) -> str: return self.actor_ref.name - def __str__(self): return self.name - def __repr__(self): return self.name + @property + def path(self) -> str: + return self.actor_ref.path + + def __str__(self): return self.path + def __repr__(self): return self.path class ActorSystem: # ActorSystem controls all the actors def __init__(self): - self._actors = {} - self._stopped = asyncio.Event() + self._actors: Dict[ActorRef, '_ActorContext'] = {} + self._is_stopped = asyncio.Event() + self.children: List[ActorRef] = [] def create( self, @@ -196,26 +236,17 @@ def create( Actor reference. Examples: - Start an actor by passing the actor instance to this function. + Start the actor by passing its instance to this method. >> class MyActor(Actor): ... 
async def receive(self, sender, message): pass >> my_actor = ActorSystem().create(MyActor()) """ - if not isinstance(actor, Actor): - raise ValueError(f'Not an actor: {actor}') - actor_id = uuid4() - if not name: - name = f'{type(actor).__name__}-{actor_id}' - actor_ref = ActorRef(actor_id, name) - actor.actor_ref = actor_ref - actor.system = self - actor_ctx = _ActorContext() - actor_ctx.lifecycle = asyncio.get_event_loop().create_task( - self._actor_lifecycle_loop(actor, actor_ref, actor_ctx)) - self._actors[actor_ref] = actor_ctx - return actor_ref + return self._create( + actor=actor, + parent=None, + name=name) def tell( self, @@ -233,7 +264,8 @@ def tell( """ self._tell( actor=actor, - message=message) + message=message, + sender=None) def schedule_tell( self, @@ -244,6 +276,7 @@ def schedule_tell( period: Union[None, int, float] = None ) -> asyncio.Task: """Schedule a message to be delivered to an actor at some time. + The message may also be delivered periodically. Args: actor: Actor that will receive a message from the system. @@ -257,6 +290,7 @@ def schedule_tell( return self._schedule_tell( actor=actor, message=message, + sender=None, delay=delay, period=period) @@ -274,7 +308,8 @@ def stop( """ self._tell( actor=actor, - message=PoisonPill()) + message=PoisonPill(), + sender=None) def shutdown( self, @@ -291,37 +326,77 @@ def shutdown( """ asyncio.create_task(self._shutdown(timeout=timeout)) - async def stopped(self) -> None: + def stopped(self): """Await system shutdown. Returns: Awaitable until the system is fully shutdown. 
""" - await self._stopped.wait() + return self._is_stopped.wait() + + def _create( + self, + actor: Actor, + *, + parent: Union[None, Actor, ActorRef], # parent is None if system is the parent + name: str = None + ) -> ActorRef: + if not isinstance(actor, Actor): + raise ValueError(f'Not an actor: {actor}') + if parent: + parent = self._validate_actor_ref(parent) + parent_ctx = self._actors[parent] + child_idx = len(parent_ctx.children) + 1 + else: + child_idx = len(self.children) + 1 + if not name: + name = f'{type(actor).__name__}-{child_idx}' + if parent: + path = f'{parent.path}/{name}' + else: + path = name + actor_id = uuid4() + actor_ref = ActorRef(actor_id=actor_id, path=path, name=name) + actor_ctx = _ActorContext(self, actor_ref, parent) + actor_ctx.lifecycle = asyncio.get_event_loop().create_task( + self._actor_lifecycle_loop(actor, actor_ref, actor_ctx)) + actor._context = actor_ctx + self._actors[actor_ref] = actor_ctx + if parent: + # parent_ctx is assigned above + # noinspection PyUnboundLocalVariable + parent_ctx.children.append(actor_ref) + else: + self.children.append(actor_ref) + return actor_ref def _tell( self, actor: Union[Actor, ActorRef], message: 'any', *, - sender: Union[None, Actor, ActorRef] = None, # sender is None if the system sends the message + sender: Union[None, Actor, ActorRef], # sender is None if system sends the message ) -> None: actor = self._validate_actor_ref(actor) if sender: sender = self._validate_actor_ref(sender) + if sender not in self._actors: + raise ValueError(f'Sender does not exist: {sender}') if actor in self._actors: actor_ctx = self._actors[actor] actor_ctx.letterbox.put_nowait((sender, message)) elif sender: deadletter = DeadLetter(actor=actor, message=message) - self._tell(sender, deadletter) + self._tell(sender, deadletter, sender=None) + else: + logging.warning('Failed to deliver message %s to %s', message, actor) def _schedule_tell( self, actor: Union[Actor, ActorRef], message: 'any', *, - sender: 
Union[None, Actor, ActorRef] = None, # sender is None if the system sends the message + sender: Union[None, Actor, ActorRef], # sender is None if the system sends the message delay: Union[None, int, float] = None, period: Union[None, int, float] = None ) -> asyncio.Task: @@ -339,7 +414,7 @@ async def _schedule_tell_loop( actor: Union[Actor, ActorRef], message: 'any', *, - sender: Union[None, Actor, ActorRef] = None, # sender is None if the system sends the message + sender: Union[None, Actor, ActorRef], # sender is None if system sends the message ts: Union[None, int, float] = None, period: Union[None, int, float] = None ) -> None: @@ -349,7 +424,7 @@ async def _schedule_tell_loop( if delay > 0: await asyncio.sleep(delay) self._tell(actor, message, sender=sender) - if not period or actor not in self._actors: + if not period: break delay = period @@ -368,8 +443,8 @@ def _watch( raise ValueError(f'Actor cannot watch themselves: {actor}') actor_ctx = self._actors[actor] other_ctx = self._actors[other] - actor_ctx.watching.append(other) - other_ctx.watched_by.append(actor) + actor_ctx.watching.add(other) + other_ctx.watched_by.add(actor) def _unwatch( self, @@ -380,29 +455,28 @@ def _unwatch( if actor not in self._actors: raise ValueError(f'Actor does not exist: {actor}') other = self._validate_actor_ref(other) - if other not in self._actors: - return # other actor has been terminated so unwatch is obsolete if actor == other: raise ValueError(f'Actor cannot unwatch themselves: {actor}') actor_ctx = self._actors[actor] - other_ctx = self._actors[other] if other in actor_ctx.watching: actor_ctx.watching.remove(other) - other_ctx.watched_by.remove(actor) + if other in self._actors: + other_ctx = self._actors[other] + if actor in other_ctx.watched_by: + other_ctx.watched_by.remove(actor) async def _shutdown( self, timeout: Union[None, int, float] = None ) -> None: if self._actors: - lifecycle_tasks = [] - for actor_ref, actor_ctx in self._actors.items(): + for actor_ref in 
self.children: # children propagate stop messages self.stop(actor_ref) - lifecycle_tasks.append(actor_ctx.lifecycle) + lifecycle_tasks = [actor_ctx.lifecycle for actor_ctx in self._actors.values()] done, pending = await asyncio.wait(lifecycle_tasks, timeout=timeout) for lifecycle_task in pending: lifecycle_task.cancel() - self._stopped.set() + self._is_stopped.set() async def _actor_lifecycle_loop( self, @@ -410,29 +484,63 @@ async def _actor_lifecycle_loop( actor_ref: ActorRef, actor_ctx: '_ActorContext' ) -> None: + # start actor try: await actor.started() - while True: - sender, message = await actor_ctx.letterbox.get() - if isinstance(message, PoisonPill): - break + actor_ctx.receiving_messages = True + except Exception as e: + logging.exception('Exception raised while awaiting start of %s', actor_ref, exc_info=e) + # receive messages + while actor_ctx.receiving_messages: + sender, message = await actor_ctx.letterbox.get() + if isinstance(message, PoisonPill): + break + try: + await actor.receive(sender, message) + except Exception as e: try: - await actor.receive(sender, message) - except Exception as e: await actor.restarted(sender, message, e) + except Exception as e: + logging.exception('Exception raised while awaiting restart of %s', actor_ref, exc_info=e) + actor_ctx.receiving_messages = False + # stop children + children_stopping = [] + for child in actor_ctx.children: + child_ctx = self._actors[child] + children_stopping.append(child_ctx.is_stopped.wait()) + self.stop(child) + if children_stopping: + await asyncio.wait(children_stopping) + # stop actor + try: await actor.stopped() - finally: - # notify others that actor has been terminated - for other in actor_ctx.watched_by: - self._tell(other, Terminated(actor_ref)) - other_ctx = self._actors[other] - other_ctx.watching.remove(actor_ref) - # unwatch other actors - for other in actor_ctx.watching: - other_ctx = self._actors[other] - other_ctx.watched_by.remove(actor_ref) - # remove the actor - del 
self._actors[actor_ref] + except Exception as e: + logging.exception('Exception raised while awaiting stop of %s', actor_ref, exc_info=e) + # notify others that actor has been terminated + for other in actor_ctx.watched_by: + self._tell(other, Terminated(actor_ref), sender=None) + other_ctx = self._actors[other] + other_ctx.watching.remove(actor_ref) + # unwatch other actors + for other in actor_ctx.watching: + other_ctx = self._actors[other] + other_ctx.watched_by.remove(actor_ref) + # update parent's children + if actor_ctx.parent: + parent_ctx = self._actors[actor_ctx.parent] + parent_ctx.children.remove(actor_ref) + else: + self.children.remove(actor_ref) + # flush actor's letterbox and send deadletters + while not actor_ctx.letterbox.empty(): + sender, message = actor_ctx.letterbox.get_nowait() + if sender and sender != actor_ref: + deadletter = DeadLetter(actor_ref, message) + self._tell(sender, deadletter, sender=None) + # messages to this actor are ignored, because the actor is not receiving messages anymore + # remove the actor + actor_ctx.is_stopped.set() + del self._actors[actor_ref] @staticmethod def _validate_actor_ref( @@ -446,11 +554,25 @@ def _validate_actor_ref( class _ActorContext: # context class that holds the internal state of an actor - def __init__(self): + def __init__( + self, + system: ActorSystem, + actor_ref: ActorRef, + parent: ActorRef + ): + self.system: ActorSystem = system + self.actor_ref: ActorRef = actor_ref + self.parent: ActorRef = parent self.letterbox = asyncio.Queue() # unbounded queue self.lifecycle = None # main task controlling actor's lifecycle - self.watching = [] # this actor is watching other actors in the list - self.watched_by = [] # this actor is watched by other actors in the list + # watching, watched_by and children are not exposed to the actor + # because they are updated before the actor receives a Terminated message. 
+ # This leads to an inconsistent state if the actor has not received multiple Terminated messages yet. + self.watching: Set[ActorRef] = set() # this actor is watching other actors in the set + self.watched_by: Set[ActorRef] = set() # this actor is watched by other actors in the set + self.children: List[ActorRef] = [] # actor's children + self.is_stopped = asyncio.Event() + self.receiving_messages = False # Messages