Skip to content

Commit

Permalink
Update Python basics examples
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 20, 2024
1 parent bbd629c commit d96e09c
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 374 deletions.
32 changes: 6 additions & 26 deletions python/basics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,29 @@ about how they work and how they can be run.
finished actions. The example applies a series of updates and permission setting changes
to user's profile.

* **[Durable Execution with Compensations](app/2_durable_execution_compensation.py):**
Reliably compensating / undoing previous actions upon unrecoverable errors halfway
through multi-step change. This is the same example as above, extended for cases where
a part of the change cannot be applied (conflict) and everything has to roll back.

* **[Workflows](app/3_workflows.py):** Workflows are durable execution tasks that can
* **[Workflows](app/2_workflows.py):** Workflows are durable execution tasks that can
be submitted and awaited. They have an identity and can be signaled and queried
through durable promises. The example is a user-signup flow that takes multiple
operations, including verifying the email address.

* **[Virtual Objects](app/4_virtual_objects.py):** Stateful serverless objects
* **[Virtual Objects](app/3_virtual_objects.py):** Stateful serverless objects
to manage durable consistent state and state-manipulating logic.

* **[Kafka Event-processing](app/5_events_processing.py):** Processing events to
update various downstream systems with durable event handlers, event-delaying,
in a strict-per-key order.

* **[Stateful Event-processing](app/6_events_state.py):** Populating state from
events and making is queryable via RPC handlers.


### Running the examples

To set up the example, use the following sequence of commands.

1. Setup the virtual env:
```shell
python3 -m venv .venv
source .venv/bin/activate
```

2. Install the requirements:
```shell
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

3. Start the app as follows:
- Durable execution example: `python -m hypercorn --config hypercorn-config.toml app/1_durable_execution.py:app`
- Durable execution with compensations example: `python -m hypercorn --config hypercorn-config.toml app/2_durable_execution_compensation.py:app`
- Workflows example: `python -m hypercorn --config hypercorn-config.toml app/3_workflows.py:app`
- Virtual Objects example: `python -m hypercorn --config hypercorn-config.toml app/4_virtual_objects.py:app`
- Kafka Event-processing example: `python -m hypercorn --config hypercorn-config.toml app/5_events_processing.py:app`
- Stateful Event-processing example: `python -m hypercorn --config hypercorn-config.toml app/6_events_state.py:app`
- Workflows example: `python -m hypercorn --config hypercorn-config.toml app/2_workflows.py:app`
- Virtual Objects example: `python -m hypercorn --config hypercorn-config.toml app/3_virtual_objects.py:app`

4. Start the Restate Server ([other options here](https://docs.restate.dev/develop/local_dev)):
```shell
Expand Down
96 changes: 38 additions & 58 deletions python/basics/app/1_durable_execution.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,55 @@
#
# Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH
#
# This file is part of the Restate examples,
# which is released under the MIT license.
#
# You can find a copy of the license in file LICENSE in the root
# directory of this repository or package, or at
# https:#github.com/restatedev/sdk-typescript/blob/main/LICENSE
from typing import TypedDict
import uuid

from restate import Service, Context
import restate
from pydantic import BaseModel
from restate import Service, Context
from utils import create_recurring_payment, create_subscription

from utils import apply_user_role, apply_permission, UpdateRequest
# Restate lets you implement resilient applications.
# Restate ensures handler code runs to completion despite failures:
# - Automatic retries
# - Restate tracks the progress of execution, and prevents re-execution of completed work on retries
# - Regular code and control flow, no custom DSLs

role_update = Service("roleUpdate")
# Applications consist of services with handlers that can be called over HTTP or Kafka.
subscription_service = Service("SubscriptionService")


# This is an example of the benefits of Durable Execution.
# Durable Execution ensures code runs to the end, even in the presence of
# failures. This is particularly useful for code that updates different systems and needs to
# make sure all updates are applied:
#
# - Failures are automatically retried, unless they are explicitly labeled
# as terminal errors
# - Restate tracks execution progress in a journal.
# Work that has already been completed is not repeated during retries.
# Instead, the previously completed journal entries are replayed.
# This ensures that stable deterministic values are used during execution.
# - Durable executed functions use the regular code and control flow,
# no custom DSLs
#
class SubscriptionRequest(BaseModel):
user_id: str
credit_card: str
subscriptions: list[str]

@role_update.handler(name="applyRoleUpdate")
async def apply_role_update(ctx: Context, update: UpdateRequest):
# parameters are durable across retries
user_id, role, permissions = update["user_id"], update["role"], update["permissions"]

# Apply a change to one system (e.g., DB upsert, API call, ...).
# The side effect persists the result with a consensus method so
# any later code relies on a deterministic result.
success = await ctx.run("apply_user_role", lambda: apply_user_role(user_id, role))
if not success:
return
@subscription_service.handler()
async def add(ctx: Context, req: SubscriptionRequest):
# Stable idempotency key: Restate persists the result of
# all `ctx` actions and recovers them after failures
payment_id = await ctx.run("payment id", lambda: str(uuid.uuid4()))

# Loop over the permission settings and apply them.
# Each operation through the Restate context is journaled
# and recovery restores results of previous operations from the journal
# without re-executing them.
for permission in permissions:
await ctx.run("apply_permission", lambda: apply_permission(user_id, permission))
# Retried in case of timeouts, API downtime, etc.
pay_ref = await ctx.run("recurring payment",
lambda: create_recurring_payment(req.credit_card, payment_id))

# Persists successful subscriptions and skip them on retries
for subscription in req.subscriptions:
await ctx.run("subscription",
lambda: create_subscription(req.user_id, subscription, pay_ref))

app = restate.app(services=[role_update])

# Create an HTTP endpoint to serve your services on port 9080
# or use .handler() to run on Lambda, Deno, Bun, Cloudflare Workers, ...
app = restate.app([subscription_service])

#
# See README for details on how to start and connect Restate.
#
# When invoking this function (see below for sample request), it will apply all
# role and permission changes, regardless of crashes.
# You will see all lines of the type "Applied permission remove:allow for user Sam Beckett"
# in the log, across all retries. You will also see that re-tries will not re-execute
# previously completed actions again, so each line occurs only once.
# Check the README to learn how to run Restate.
# Then invoke this function and see in the log how it recovers.
# Each action (e.g. "created recurring payment") is only logged once across all retries.
# Retries did not re-execute the successful operations.
#
# curl localhost:8080/roleUpdate/applyRoleUpdate -H 'content-type: application/json' -d \
# curl localhost:8080/SubscriptionService/add -H 'content-type: application/json' -d \
# '{
# "user_id": "Sam Beckett",
# "role": "content-manager",
# "permissions" : [
# { "permissionKey": "add", "setting": "allow" },
# { "permissionKey": "remove", "setting": "allow" },
# { "permissionKey": "share", "setting": "block" }
# ]
# "credit_card": "1234-5678-9012-3456",
# "subscriptions" : ["Netflix", "Disney+", "HBO Max"]
# }'
87 changes: 0 additions & 87 deletions python/basics/app/2_durable_execution_compensation.py

This file was deleted.

56 changes: 56 additions & 0 deletions python/basics/app/2_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import uuid
import restate
from pydantic import BaseModel
from restate import Workflow, WorkflowContext, WorkflowSharedContext
from app.utils import create_user_entry, send_email_with_link


# Workflow for user signup and email verification.
# - Main workflow in run() method
# - Additional methods interact with the workflow.
# Each workflow instance has a unique ID and runs only once (to success or failure).
user_signup = Workflow("usersignup")


class User(BaseModel):
name: str
email: str


# --- The workflow logic ---
@user_signup.main()
async def run(ctx: WorkflowContext, user: User) -> bool:
# workflow ID = user ID; workflow runs once per user
user_id = ctx.key()

# Durably executed action; write to other system
async def create_user():
return await create_user_entry(user)
await ctx.run("create_user", create_user)

# Send the email with the verification link
secret = await ctx.run("secret", lambda: str(uuid.uuid4()))
await ctx.run("send_email", lambda: send_email_with_link(user_id, user.email, secret))

# Wait until user clicked email verification link
# Promise gets resolved or rejected by the other handlers
click_secret = await ctx.promise("email_link").value()
return click_secret == secret


# --- Other handlers interact with the workflow via queries and signals ---
@user_signup.handler()
async def click(ctx: WorkflowSharedContext, secret: str):
# Send data to the workflow via a durable promise
await ctx.promise("email_link").resolve(secret)

app = restate.app(services=[user_signup])
# You can deploy this as a container, Lambda, etc. - Invoke it over HTTP via: curl
# localhost:8080/usersignup/signup-userid1/run/send -H 'content-type: application/json' \
# -d '{ "name": "Bob", "email": "[email protected]" }'
#
# - Resolve the email link via:
# curl localhost:8080/usersignup/signup-userid1/verifyEmail
#
# - Attach back to the workflow to get the result:
# curl localhost:8080/restate/workflow/usersignup/userid1/attach
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import restate
from restate import VirtualObject, ObjectContext

greeter_object = VirtualObject("greeter")


#
# Virtual Objects hold state and have methods to interact with the object.
# An object is identified by a unique id - only one object exists per id.
#
Expand All @@ -13,7 +9,9 @@
# method execution.
#
# Virtual Objects are _Stateful Serverless_ constructs.
#
greeter_object = VirtualObject("greeter")


@greeter_object.handler()
async def greet(ctx: ObjectContext, greeting: str) -> str:
# Access the state attached to this object (this 'name')
Expand Down
Loading

0 comments on commit d96e09c

Please sign in to comment.