Skip to content

Commit

Permalink
Add hierarchy of actors. Add minor improvements. Update examples. Upd…
Browse files Browse the repository at this point in the history
…ate README.
  • Loading branch information
kweimann committed Nov 27, 2020
1 parent 0f25903 commit c4abbd8
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 196 deletions.
174 changes: 61 additions & 113 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,143 +2,91 @@

Minimalistic implementation of the actor concurrency model powered by asyncio.

---
##

* Installation: `pip install pyctor` (requires Python `>= 3.7.3`)
* Installation:
* `pip install pyctor` (requires Python `>= 3.7.3`)
* [Examples](examples)
* [API](#api)

## Example
## Overview

Imagine a teacher and two students, Sarah and Mike, at a playground ready to play a round of ping-pong. The teacher throws the ball to Mike and the match between the two kids begins. Should one of them decide they've had enough and leave the playground, the other one will follow. Meanwhile, the teacher watches the students and leaves with them once they've finished playing.

Let's start with the implementation of a student, i.e., Sarah and Mike. The students exchange ping-pong messages until one of them randomly decides to stop:
Pyctor is a lightweight implementation of the actor model for concurrent systems. Actors should only modify their own private state. They can influence other actors in the system through messaging. In Pyctor, we define actors by subclassing the `Actor` class:

```python
@dataclass
class Play:
student: ActorRef # play with this student


class Student(Actor): # subclass of Actor
# Every Actor must implement `receive` to receive messages.
class MyActor(pyctor.Actor):
async def receive(self, sender, message):
if isinstance(message, Play):
# Student was told to play.
# Send a message to the other student which will start the match.
self.tell(message.student, 'ping!')
elif message == 'bye!':
# The other student has stopped so this student stops as well.
self.stop()
elif random.random() < 0.25:
# With a probability of 0.25 leave the playground and notify the other student.
self.tell(sender, 'bye!')
self.stop()
elif message == 'ping!':
# Respond with "pong" after 1 second.
self.schedule_tell(sender, 'pong!', delay=1)
elif message == 'pong!':
# Respond with "ping" after 1 second.
self.schedule_tell(sender, 'ping!', delay=1)
print(f'[{datetime.now()}] {sender} to {self}: {message}')
print(f'received message: {message}')
```

Upon creation, the teacher creates the two students and asks the system to send a `Terminated` message if a student stops. If no students are left, the teacher shuts down the entire system:

```python
class Teacher(Actor):
def __init__(self):
super().__init__()
self.students = 0

async def started(self):
# Create 2 students: Mike and Sarah.
mike = self.system.create(Student(), name='Mike')
sarah = self.system.create(Student(), name='Sarah')
# Get notified when students stop.
self.watch(mike)
self.watch(sarah)
# Tell Mike to play with Sarah.
self.tell(mike, Play(sarah))
# Initialize the number of students to 2.
self.students = 2
print(f'[{datetime.now()}] {self}: it\'s playtime!')

async def receive(self, sender, message):
if isinstance(message, Terminated):
# Student has stopped.
self.students -= 1
if self.students == 0:
# All students have stopped so shut down.
self.system.shutdown()

async def stopped(self):
print(f'[{datetime.now()}] {self}: time to go back.')
```
Every actor must override the `receive` method. Inside it, the actor responds to the received messages, e.g., by modifying the internal state, sending new messages or creating new actors.

Finally, we implement the top-level entry point `main()` function that creates the actor system, the teacher, and waits for the system to shut down.
All actors live in a single ecosystem called `ActorSystem`. The actor system controls the lifecycle of every actor and handles message passing between actors. Actors may override lifecycle methods (e.g., `started`, `restarted` or `stopped`) that will be called by the system at specific points throughout actor's life. In the main function below, which serves as an entry point, we create the actor system and the first actor:

```python
import asyncio
import random
from dataclasses import dataclass
from datetime import datetime

from pyctor import ActorSystem, Actor, ActorRef, Terminated

# [Play code omitted]
async def main():
system = pyctor.ActorSystem() # create actor system
my_actor = system.create(MyActor()) # create first actor
system.tell(my_actor, 'hello') # send message to the actor
await system.stopped() # await system shutdown
```

# [Student code omitted]
The `create` method starts an actor and returns a reference for that actor. Actors may create child actors, forming a hierarchy of actors in the system. Actors should not be modified directly. Instead, they should communicate through messages. Messages are sent with the `tell` method which expects a reference of the actor that shall receive the message. The actor system guarantees that the messages are received in the same order they were sent in.

# [Teacher code omitted]

async def main():
# Create the actor system.
system = ActorSystem()
# Create the teacher.
system.create(Teacher(), name='Teacher')
# Run until system is shutdown.
await system.stopped()
We run the actor system by passing the main function to the `asyncio.run` method:

```python
if __name__ == '__main__':
asyncio.run(main())
```

An output of this script could be:

```
[2020-06-21 12:53:41.598117] Teacher: it's playtime!
[2020-06-21 12:53:41.598117] Teacher to Mike: Play(student=Sarah)
[2020-06-21 12:53:42.598700] Mike to Sarah: ping!
[2020-06-21 12:53:43.600073] Sarah to Mike: pong!
[2020-06-21 12:53:44.601047] Mike to Sarah: ping!
[2020-06-21 12:53:44.602062] Sarah to Mike: bye!
[2020-06-21 12:53:44.603047] Teacher: time to go back.
```
You can find further examples in the [examples](examples) directory.

## API

Quick overview of the API:

* `pyctor.ActorSystem`
* `create(actor, name=None)` Start an actor.
* `tell(actor, message)` Deliver a message to an actor.
* `schedule_tell(actor, message, *, delay=None, period=None)` Schedule a message to be delivered to an actor at some time.
* `stop(actor)` Stop an actor.
* `shutdown(timeout=None)` Initialize a shutdown of the actor system.
* `stopped()` Await system shutdown.
* `create(actor, name=None)`<br/>
Start an actor.
* `tell(actor, message)`<br/>
Deliver a message to an actor.
* `schedule_tell(actor, message, *, delay=None, period=None)`<br/>
Schedule a message to be delivered to an actor at some time. The message may also be delivered periodically.
* `stop(actor)`<br/>
Stop an actor.
* `shutdown(timeout=None)`<br/>
Initialize a shutdown of the actor system.
* `stopped()`<br/>
Await system shutdown.

* `pyctor.Actor`
* `receive(sender, message)` Must be implemented by the class inheriting from Actor. It is used by the system to deliver the messages.
* `tell(actor, message)` Deliver a message to another actor.
* `schedule_tell(actor, message, *, delay=None, period=None)` Schedule a message to be delivered to another actor at some time.
* `watch(actor)` Watch another actor and receive a `Terminated(actor)` message when the watched actor stops.
* `unwatch(actor)` Stop watching another actor.
* `stop()` Stop the actor.
* `started()` Called by the system before the actor starts to receive messages.
* `restarted(sender, message, error)` Called by the system if receiving the message caused an error. Actor will continue to receive messages.
* `stopped()` Called by the system before stopping the actor. The actor will not receive messages anymore.
* `receive(sender, message)`<br/>
This method is called by the system every time a message is received by the actor. It must be implemented by the class inheriting from Actor.
* `create(actor, name=None)`<br/>
Start a child actor. The child actor will be watched.
* `tell(actor, message)`<br/>
Deliver a message to another actor.
* `schedule_tell(actor, message, *, delay=None, period=None)`<br/>
Schedule a message to be delivered to another actor at some time. The message may also be delivered periodically.
* `watch(actor)`<br/>
Watch another actor and receive a `Terminated(actor)` message when the watched actor stops.
* `unwatch(actor)`<br/>
Stop watching another actor.
* `stop()`<br/>
Stop the actor.

Lifecycle callbacks:

* `started()`<br/>
Called by the system before the actor starts to receive messages.
* `restarted(sender, message, error)`<br/>
Called by the system if receiving the message caused an error. Actor will continue to receive messages.
* `stopped()`<br/>
Called by the system before stopping the actor. The actor will not receive messages anymore.

* System messages:
* `PoisonPill()` kills an actor.
* `Terminated(actor)` is sent to all watchers when the actor stops.
* `DeadLetter(actor, message)` is sent back to the sender if the actor was unable to receive the message.
* `PoisonPill()`<br/>
Stop an actor.
* `Terminated(actor)`<br/>
Notify that the actor has stopped.
* `DeadLetter(actor, message)`<br/>
Notify the sender that the actor did not receive the message.
23 changes: 12 additions & 11 deletions examples/01_getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class Play:


class Student(Actor): # subclass of Actor
"""Students exchange ping-pong messages until one of them randomly decides to stop."""

# Every Actor must implement `receive` to receive messages.
async def receive(self, sender, message):
if isinstance(message, Play):
Expand All @@ -31,37 +33,36 @@ async def receive(self, sender, message):
elif message == 'pong!':
# Respond with "ping" after 1 second.
self.schedule_tell(sender, 'ping!', delay=1)
print(f'[{datetime.now()}] {sender} to {self}: {message}')
print(f'[{datetime.now()}] {sender.name} to {self.name}: {message}')


class Teacher(Actor):
"""Teacher creates the two students and watches them.
If students stop playing, the teacher shuts down the entire system."""

def __init__(self):
super().__init__()
self.students = 0

async def started(self):
# Create 2 students: Mike and Sarah.
mike = self.system.create(Student(), name='Mike')
sarah = self.system.create(Student(), name='Sarah')
# Get notified when students stop.
self.watch(mike)
self.watch(sarah)
# Create 2 students: Mike and Sarah. Children are watched by default.
mike = self.create(Student(), name='Mike')
sarah = self.create(Student(), name='Sarah')
self.students = 2
# Tell Mike to play with Sarah.
self.tell(mike, Play(sarah))
# Initialize the number of students to 2.
self.students = 2
print(f'[{datetime.now()}] {self}: it\'s playtime!')

async def receive(self, sender, message):
if isinstance(message, Terminated):
# Student has stopped.
self.students -= 1
print(f'[{datetime.now()}] {message.actor.name} has stopped playing')
if self.students == 0:
# All students have stopped so shut down.
self.system.shutdown()

async def stopped(self):
print(f'[{datetime.now()}] {self}: time to go back.')
print(f'[{datetime.now()}] {self.name}: time to go back.')


async def main():
Expand Down
11 changes: 4 additions & 7 deletions examples/02_master_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class WorkFailed:

class Worker(Actor):
"""Worker does work and reports back to the master."""

def __init__(self, master):
super().__init__()
self.master = master

async def started(self):
self.watch(self.master)
self.tell(self.master, WorkerReady())

async def receive(self, sender, message):
Expand All @@ -45,9 +45,6 @@ async def receive(self, sender, message):
elif isinstance(message, (WorkDone, WorkFailed)):
self.tell(self.master, message)
self.tell(self.master, WorkerReady())
elif isinstance(message, Terminated):
if message.actor == self.master:
self.stop()

async def do_work_report_back(self, work):
try:
Expand All @@ -63,6 +60,7 @@ async def do_work(self, work):
class Master(Actor):
"""Master receives work from other actors, distributes the work
across its workers and sends the results back."""

def __init__(self, worker_init, num_workers):
super().__init__()
self.worker_init = worker_init
Expand All @@ -73,10 +71,9 @@ def __init__(self, worker_init, num_workers):

async def started(self):
for i in range(self.num_workers):
worker = self.system.create(
self.create(
self.worker_init(self.actor_ref),
name=f'Worker-{i+1}')
self.watch(worker)

async def receive(self, sender, message):
if isinstance(message, WorkerReady):
Expand Down Expand Up @@ -133,7 +130,7 @@ def __init__(self):
self.failed = 0

async def started(self):
self.master = self.system.create(
self.master = self.create(
Master(CountWorker, num_workers=parallelism),
name='Master')
print(f'[{datetime.now()}] {self.name}: Start the count!')
Expand Down
Loading

0 comments on commit c4abbd8

Please sign in to comment.