Skip to content

Commit

Permalink
Add stateful actors python
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 19, 2024
1 parent 4fa755f commit f30fed9
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 49 deletions.
72 changes: 27 additions & 45 deletions python/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ avoiding accidental state corruption and concurrency issues.

### Running the example
1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `./gradlew -PmainClass=my.example.statefulactors.MachineOperator run`
2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/statefulactors/machine_operator:app`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

### Demo scenario

Invoke the state machine transitions like
```shell
curl -X POST localhost:8080/MachineOperator/my-machine/setUp
curl -X POST localhost:8080/machine-operator/my-machine/setUp
```

To illustrate the concurrency safety here, send multiple requests without waiting on
Expand All @@ -154,52 +154,36 @@ the log of the service that the requests queue per object key and safely execute
unaffected by crashes and recoveries.

```shell
(curl -X POST localhost:8080/MachineOperator/a/setUp &)
(curl -X POST localhost:8080/MachineOperator/a/tearDown &)
(curl -X POST localhost:8080/MachineOperator/b/setUp &)
(curl -X POST localhost:8080/MachineOperator/b/setUp &)
(curl -X POST localhost:8080/MachineOperator/b/tearDown &)
(curl -X POST localhost:8080/machine-operator/a/setUp &)
(curl -X POST localhost:8080/machine-operator/a/tearDown &)
(curl -X POST localhost:8080/machine-operator/b/setUp &)
(curl -X POST localhost:8080/machine-operator/b/setUp &)
(curl -X POST localhost:8080/machine-operator/b/tearDown &)
echo "executing..."
```

For example:
```shell
2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] my.example.statefulactors.utils.MachineOperations - a beginning transition to UP
2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] my.example.statefulactors.utils.MachineOperations - b beginning transition to UP
2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] my.example.statefulactors.utils.MachineOperations - b is now running
2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] my.example.statefulactors.utils.MachineOperations - a is now running
2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T0AjO2JedeGnkGYK7Uvtnod] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] my.example.statefulactors.utils.MachineOperations - a beginning transition to down
2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T0AjO2JedeGnkGYK7Uvtnod] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down
2024-12-19 09:12:27 ERROR [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - A failure happened!
2024-12-19 09:12:27 WARN [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation
java.lang.RuntimeException: A failure happened!
...rest of trace...
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down
2024-12-19 09:12:27 ERROR [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - A failure happened!
2024-12-19 09:12:27 WARN [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation
java.lang.RuntimeException: A failure happened!
...rest of trace...
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down
2024-12-19 09:12:27 ERROR [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - A failure happened!
2024-12-19 09:12:27 WARN [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation
java.lang.RuntimeException: A failure happened!
...rest of trace...
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down
2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] my.example.statefulactors.utils.MachineOperations - a is now down
2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b is now down
2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - End invocation
[2024-12-19 17:07:31,572] [698757] [INFO] - Beginning transition of a to up
[2024-12-19 17:07:31,749] [698759] [INFO] - Beginning transition of b to up
[2024-12-19 17:07:31,749] [698759] [ERROR] - A failure happened!
... rest of trace ...
Exception: A failure happened!
[2024-12-19 17:07:31,809] [698759] [INFO] - Beginning transition of b to up
[2024-12-19 17:07:31,809] [698759] [ERROR] - A failure happened!
... rest of trace ...
Exception: A failure happened!
[2024-12-19 17:07:31,931] [698759] [INFO] - Beginning transition of b to up
[2024-12-19 17:07:31,931] [698759] [ERROR] - A failure happened!
... rest of trace ...
Exception: A failure happened!
[2024-12-19 17:07:32,183] [698759] [INFO] - Beginning transition of b to up
[2024-12-19 17:07:36,581] [698757] [INFO] - Done transitioning a to up
[2024-12-19 17:07:36,583] [698757] [INFO] - Beginning transition of a to down
[2024-12-19 17:07:37,191] [698759] [INFO] - Done transitioning b to up
[2024-12-19 17:07:37,195] [698759] [INFO] - Beginning transition of b to down
[2024-12-19 17:07:41,592] [698757] [INFO] - Done transitioning a to down
[2024-12-19 17:07:42,198] [698759] [INFO] - Done transitioning b to down
```

## Microservices: Payment State Machine
Expand Down Expand Up @@ -368,8 +352,6 @@ Workflow logs:

You see the call to `resultAsEmail` after the upload took too long, and the sending of the email.



## Async Tasks: Payment Signals - Combining Sync and Async (Webhook) Responses from Stripe

This example issues a payment request to Stripe.
Expand Down
4 changes: 2 additions & 2 deletions python/patterns-use-cases/src/eventtransactions/user_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from restate import VirtualObject, ObjectContext
from datetime import timedelta

from src.eventtransactions.utils import create_post, get_post_status, update_user_feed, SocialMediaPost
from src.eventtransactions.utils import create_post, get_post_status, update_user_feed, SocialMediaPost, Status

# Processing events (from Kafka) to update various downstream systems
# - Journaling actions in Restate and driving retries from Restate, recovering
Expand All @@ -25,7 +25,7 @@ async def process_post(ctx: ObjectContext, post: SocialMediaPost):

# Delay processing until content moderation is complete (handler suspends when on FaaS).
# This only blocks other posts for this user (Virtual Object), not for other users.
while await ctx.run("post status", lambda: get_post_status(post_id)) == "PENDING":
while await ctx.run("post status", lambda: get_post_status(post_id)) == Status.PENDING:
await ctx.sleep(timedelta(seconds=5))

await ctx.run("update feed", lambda: update_user_feed(user_id, post_id))
Expand Down
6 changes: 4 additions & 2 deletions python/patterns-use-cases/src/eventtransactions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ class SocialMediaPost(BaseModel):
metadata: str


PENDING = "PENDING"
DONE = "DONE"
class Status:
PENDING = "PENDING"
DONE = "DONE"


logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s')

Expand Down
49 changes: 49 additions & 0 deletions python/patterns-use-cases/src/statefulactors/machine_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import restate
from restate import VirtualObject, ObjectContext
from utils import bring_up_machine, Status, tear_down_machine

# This is a State Machine implemented with a Virtual Object
#
# - The object holds the state of the state machine and defines the methods
# to transition between the states.
# - The object's unique id identifies the state machine. Many parallel state
# machines exist, but only state machine (object) exists per id.
# - The "single-writer-per-key" characteristic of virtual objects ensures
# that one state transition per state machine is in progress at a time.
# Additional transitions are enqueued for that object, while a transition
# for a machine is still in progress.
machine_operator = VirtualObject("machine-operator")


@machine_operator.handler("setUp")
async def set_up(ctx: ObjectContext):
machine_id = ctx.key()

# Ignore duplicate calls to 'setUp'
status = await ctx.get("status")
if status == Status.UP:
return f"{machine_id} is already up, so nothing to do"

# Bringing up a machine is a slow process that frequently crashes
await bring_up_machine(ctx, machine_id)
ctx.set("status", Status.UP)

return f"{machine_id} is now up"


@machine_operator.handler("tearDown")
async def tear_down(ctx: ObjectContext):
machine_id = ctx.key()

status = await ctx.get("status")
if status != Status.UP:
return f"{machine_id} is not up, cannot tear down"

# Tearing down a machine is a slow process that frequently crashes
await tear_down_machine(ctx, machine_id)
ctx.set("status", Status.DOWN)

return f"{machine_id} is now down"


app = restate.app([machine_operator])
33 changes: 33 additions & 0 deletions python/patterns-use-cases/src/statefulactors/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from datetime import timedelta

import restate
import logging
import os
import random

logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s')


class Status:
UP = "UP"
DOWN = "DOWN"


async def bring_up_machine(ctx: restate.Context, machine_id: str):
logging.info(f"Beginning transition of {machine_id} to up")
maybe_crash(0.4)
await ctx.sleep(timedelta(seconds=5))
logging.info(f"Done transitioning {machine_id} to up")


async def tear_down_machine(ctx: restate.Context, machine_id: str):
logging.info(f"Beginning transition of {machine_id} to down")
maybe_crash(0.4)
await ctx.sleep(timedelta(seconds=5))
logging.info(f"Done transitioning {machine_id} to down")


def maybe_crash(probability: float = 0.5) -> None:
if random.random() < probability:
logging.error("A failure happened!")
raise Exception("A failure happened!")

0 comments on commit f30fed9

Please sign in to comment.