diff --git a/python/patterns-use-cases/README.md b/python/patterns-use-cases/README.md index f01d6b79..01b3b183 100644 --- a/python/patterns-use-cases/README.md +++ b/python/patterns-use-cases/README.md @@ -136,14 +136,14 @@ avoiding accidental state corruption and concurrency issues. ### Running the example 1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` -2. Start the service: `./gradlew -PmainClass=my.example.statefulactors.MachineOperator run` +2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/statefulactors/machine_operator:app` 3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` ### Demo scenario Invoke the state machine transitions like ```shell -curl -X POST localhost:8080/MachineOperator/my-machine/setUp +curl -X POST localhost:8080/machine-operator/my-machine/setUp ``` To illustrate the concurrency safety here, send multiple requests without waiting on @@ -154,52 +154,36 @@ the log of the service that the requests queue per object key and safely execute unaffected by crashes and recoveries. ```shell -(curl -X POST localhost:8080/MachineOperator/a/setUp &) -(curl -X POST localhost:8080/MachineOperator/a/tearDown &) -(curl -X POST localhost:8080/MachineOperator/b/setUp &) -(curl -X POST localhost:8080/MachineOperator/b/setUp &) -(curl -X POST localhost:8080/MachineOperator/b/tearDown &) +(curl -X POST localhost:8080/machine-operator/a/setUp &) +(curl -X POST localhost:8080/machine-operator/a/tearDown &) +(curl -X POST localhost:8080/machine-operator/b/setUp &) +(curl -X POST localhost:8080/machine-operator/b/setUp &) +(curl -X POST localhost:8080/machine-operator/b/tearDown &) echo "executing..." ``` For example: ```shell -2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] my.example.statefulactors.utils.MachineOperations - a beginning transition to UP -2024-12-19 09:12:22 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] my.example.statefulactors.utils.MachineOperations - b beginning transition to UP -2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] my.example.statefulactors.utils.MachineOperations - b is now running -2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] my.example.statefulactors.utils.MachineOperations - a is now running -2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_1dceKvwtEc2n5doRPWFKzl2mKeGSpwxxO9] dev.restate.sdk.core.InvocationStateMachine - End invocation -2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T30Ad4teHAPrb0QzkrcjlGV] dev.restate.sdk.core.InvocationStateMachine - End invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T0AjO2JedeGnkGYK7Uvtnod] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] my.example.statefulactors.utils.MachineOperations - a beginning transition to down -2024-12-19 09:12:27 INFO [MachineOperator/setUp][inv_174rq2A9bm3T0AjO2JedeGnkGYK7Uvtnod] dev.restate.sdk.core.InvocationStateMachine - End invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down -2024-12-19 09:12:27 ERROR [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - A failure happened! -2024-12-19 09:12:27 WARN [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation -java.lang.RuntimeException: A failure happened! -...rest of trace... -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down -2024-12-19 09:12:27 ERROR [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - A failure happened! -2024-12-19 09:12:27 WARN [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation -java.lang.RuntimeException: A failure happened! -...rest of trace... -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down -2024-12-19 09:12:27 ERROR [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - A failure happened! -2024-12-19 09:12:27 WARN [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation -java.lang.RuntimeException: A failure happened! -...rest of trace... -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - Start invocation -2024-12-19 09:12:27 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b beginning transition to down -2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] my.example.statefulactors.utils.MachineOperations - a is now down -2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_1dceKvwtEc2n2EW92WkrNSTF5E4UMjYAJX] dev.restate.sdk.core.InvocationStateMachine - End invocation -2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] my.example.statefulactors.utils.MachineOperations - b is now down -2024-12-19 09:12:32 INFO [MachineOperator/tearDown][inv_174rq2A9bm3T2s4ghDhTXRkFKH3ZLp8Jtn] dev.restate.sdk.core.InvocationStateMachine - End invocation +[2024-12-19 17:07:31,572] [698757] [INFO] - Beginning transition of a to up +[2024-12-19 17:07:31,749] [698759] [INFO] - Beginning transition of b to up +[2024-12-19 17:07:31,749] [698759] [ERROR] - A failure happened! +... rest of trace ... +Exception: A failure happened! +[2024-12-19 17:07:31,809] [698759] [INFO] - Beginning transition of b to up +[2024-12-19 17:07:31,809] [698759] [ERROR] - A failure happened! +... rest of trace ... +Exception: A failure happened! +[2024-12-19 17:07:31,931] [698759] [INFO] - Beginning transition of b to up +[2024-12-19 17:07:31,931] [698759] [ERROR] - A failure happened! +... rest of trace ... +Exception: A failure happened! +[2024-12-19 17:07:32,183] [698759] [INFO] - Beginning transition of b to up +[2024-12-19 17:07:36,581] [698757] [INFO] - Done transitioning a to up +[2024-12-19 17:07:36,583] [698757] [INFO] - Beginning transition of a to down +[2024-12-19 17:07:37,191] [698759] [INFO] - Done transitioning b to up +[2024-12-19 17:07:37,195] [698759] [INFO] - Beginning transition of b to down +[2024-12-19 17:07:41,592] [698757] [INFO] - Done transitioning a to down +[2024-12-19 17:07:42,198] [698759] [INFO] - Done transitioning b to down ``` ## Microservices: Payment State Machine @@ -368,8 +352,6 @@ Workflow logs: You see the call to `resultAsEmail` after the upload took too long, and the sending of the email. - - ## Async Tasks: Payment Signals - Combining Sync and Async (Webhook) Responses from Stripe This example issues a payment request to Stripe. diff --git a/python/patterns-use-cases/src/eventtransactions/user_feed.py b/python/patterns-use-cases/src/eventtransactions/user_feed.py index ac548dea..3cd497e5 100644 --- a/python/patterns-use-cases/src/eventtransactions/user_feed.py +++ b/python/patterns-use-cases/src/eventtransactions/user_feed.py @@ -2,7 +2,7 @@ from restate import VirtualObject, ObjectContext from datetime import timedelta -from src.eventtransactions.utils import create_post, get_post_status, update_user_feed, SocialMediaPost +from src.eventtransactions.utils import create_post, get_post_status, update_user_feed, SocialMediaPost, Status # Processing events (from Kafka) to update various downstream systems # - Journaling actions in Restate and driving retries from Restate, recovering @@ -25,7 +25,7 @@ async def process_post(ctx: ObjectContext, post: SocialMediaPost): # Delay processing until content moderation is complete (handler suspends when on FaaS). # This only blocks other posts for this user (Virtual Object), not for other users. - while await ctx.run("post status", lambda: get_post_status(post_id)) == "PENDING": + while await ctx.run("post status", lambda: get_post_status(post_id)) == Status.PENDING: await ctx.sleep(timedelta(seconds=5)) await ctx.run("update feed", lambda: update_user_feed(user_id, post_id)) diff --git a/python/patterns-use-cases/src/eventtransactions/utils.py b/python/patterns-use-cases/src/eventtransactions/utils.py index 291b3ce5..9bf4ca39 100644 --- a/python/patterns-use-cases/src/eventtransactions/utils.py +++ b/python/patterns-use-cases/src/eventtransactions/utils.py @@ -10,8 +10,10 @@ class SocialMediaPost(BaseModel): metadata: str -PENDING = "PENDING" -DONE = "DONE" +class Status: + PENDING = "PENDING" + DONE = "DONE" + logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s') diff --git a/python/patterns-use-cases/src/statefulactors/machine_operator.py b/python/patterns-use-cases/src/statefulactors/machine_operator.py new file mode 100644 index 00000000..cdf57561 --- /dev/null +++ b/python/patterns-use-cases/src/statefulactors/machine_operator.py @@ -0,0 +1,49 @@ +import restate +from restate import VirtualObject, ObjectContext +from utils import bring_up_machine, Status, tear_down_machine + +# This is a State Machine implemented with a Virtual Object +# +# - The object holds the state of the state machine and defines the methods +# to transition between the states. +# - The object's unique id identifies the state machine. Many parallel state +# machines exist, but only state machine (object) exists per id. +# - The "single-writer-per-key" characteristic of virtual objects ensures +# that one state transition per state machine is in progress at a time. +# Additional transitions are enqueued for that object, while a transition +# for a machine is still in progress. +machine_operator = VirtualObject("machine-operator") + + +@machine_operator.handler("setUp") +async def set_up(ctx: ObjectContext): + machine_id = ctx.key() + + # Ignore duplicate calls to 'setUp' + status = await ctx.get("status") + if status == Status.UP: + return f"{machine_id} is already up, so nothing to do" + + # Bringing up a machine is a slow process that frequently crashes + await bring_up_machine(ctx, machine_id) + ctx.set("status", Status.UP) + + return f"{machine_id} is now up" + + +@machine_operator.handler("tearDown") +async def tear_down(ctx: ObjectContext): + machine_id = ctx.key() + + status = await ctx.get("status") + if status != Status.UP: + return f"{machine_id} is not up, cannot tear down" + + # Tearing down a machine is a slow process that frequently crashes + await tear_down_machine(ctx, machine_id) + ctx.set("status", Status.DOWN) + + return f"{machine_id} is now down" + + +app = restate.app([machine_operator]) diff --git a/python/patterns-use-cases/src/statefulactors/utils.py b/python/patterns-use-cases/src/statefulactors/utils.py new file mode 100644 index 00000000..75b1b9d5 --- /dev/null +++ b/python/patterns-use-cases/src/statefulactors/utils.py @@ -0,0 +1,33 @@ +from datetime import timedelta + +import restate +import logging +import os +import random + +logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(process)d] [%(levelname)s] - %(message)s') + + +class Status: + UP = "UP" + DOWN = "DOWN" + + +async def bring_up_machine(ctx: restate.Context, machine_id: str): + logging.info(f"Beginning transition of {machine_id} to up") + maybe_crash(0.4) + await ctx.sleep(timedelta(seconds=5)) + logging.info(f"Done transitioning {machine_id} to up") + + +async def tear_down_machine(ctx: restate.Context, machine_id: str): + logging.info(f"Beginning transition of {machine_id} to down") + maybe_crash(0.4) + await ctx.sleep(timedelta(seconds=5)) + logging.info(f"Done transitioning {machine_id} to down") + + +def maybe_crash(probability: float = 0.5) -> None: + if random.random() < probability: + logging.error("A failure happened!") + raise Exception("A failure happened!")