Skip to content

Commit

Permalink
Add Python payments state machine example
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 20, 2024
1 parent d79b5e5 commit bbd629c
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 6 deletions.
10 changes: 5 additions & 5 deletions python/patterns-use-cases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,16 @@ and failures.

### Running this example
1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server`
2. Start the service: `./gradlew -PmainClass=my.example.statemachinepayments.AppMain run`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`
2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/statemachinepayments/payment_processor:app`
3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080`

### Demo scenario
Send some requests:

- Make a payment
```shell
curl -X POST localhost:8080/PaymentProcessor/some-string-id/makePayment -H 'content-type: application/json' \
-d '{ "accountId": "abc", "amountCents": 100 }'
-d '{ "account_id": "abc", "amount_cents": 100 }'
```

- Cancel a payment. The 'key' parameter is the idempotency token, there is no further request data.
Expand All @@ -256,8 +256,8 @@ restate kv get PaymentProcessor some-string-id
KEY VALUE
payment {
"accountId": "abc",
"amountCents": 100
"account_id": "abc",
"amount_cents": 100
}
status "CANCELLED"
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
@product_service.handler()
async def reserve(ctx: ObjectContext) -> bool:
if await ctx.get("reserved"):
print(f"Product already reserved {ctx.key}")
print(f"Product already reserved {ctx.key()}")
return False
print(f"Reserving product {ctx.key()}")
ctx.set("reserved", True)
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from restate import ObjectContext, VirtualObject
from restate.exceptions import TerminalError
import random

from src.statemachinepayments.types import Result

# A simple virtual object, to track accounts.
# This is for simplicity to make this example work self-contained.
# This should be a database in a real scenario
account = VirtualObject("account")

# The key under which we store the balance
BALANCE = "balance"


@account.handler()
async def deposit(ctx: ObjectContext, amount_cents: int):
if amount_cents <= 0:
raise TerminalError("Amount must be greater than 0")

balance_cents = await ctx.get(BALANCE) or initialize_random_amount()
ctx.set(BALANCE, balance_cents + amount_cents)


@account.handler()
async def withdraw(ctx: ObjectContext, amount_cents: int) -> Result:
if amount_cents <= 0:
raise TerminalError("Amount must be greater than 0")

balance_cents = await ctx.get(BALANCE) or initialize_random_amount()
if balance_cents < amount_cents:
return Result(success=False, message=f"Insufficient funds: {balance_cents} cents")

ctx.set(BALANCE, balance_cents - amount_cents)
return Result(success=True, message="Withdrawal successful")


def initialize_random_amount() -> int:
return random.randint(100_000, 200_000)
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import restate
from pydantic import BaseModel
from restate import VirtualObject, ObjectContext
from datetime import timedelta
from accounts.accounts import account
import accounts.accounts as account_service
from src.statemachinepayments.types import Result

# A service that processes the payment requests.
# This is implemented as a virtual object to ensure that only one concurrent request can happen
# per payment-id. Requests are queued and processed sequentially per id.
# Methods can be called multiple times with the same payment-id, but payment will be executed
# only once. If a 'cancelPayment' is called for an id, the payment will either be undone, or
# blocked from being made in the future, depending on whether the cancel call comes before or after
# the 'makePayment' call.
payment_processor = VirtualObject("PaymentProcessor")

# The key under which we store the status.
STATUS = "status"

# The key under which we store the original payment request.
PAYMENT = "payment"

EXPIRY_TIMEOUT = timedelta(days=1)


class Payment(BaseModel):
account_id: str
amount_cents: int


class PaymentStatus:
NEW = "NEW"
COMPLETED_SUCCESSFULLY = "COMPLETED_SUCCESSFULLY"
CANCELED = "CANCELED"


@payment_processor.handler("makePayment")
async def make_payment(ctx: ObjectContext, payment: Payment) -> Result:
payment_id = ctx.key()
status = await ctx.get(STATUS) or PaymentStatus.NEW

if status == PaymentStatus.CANCELED:
return Result(success=False, message="Payment already cancelled")
if status == PaymentStatus.COMPLETED_SUCCESSFULLY:
return Result(success=False, message="Payment already completed in prior call")

# Charge the target account
payment_result = await ctx.object_call(account_service.withdraw, key=payment.account_id, arg=payment.amount_cents)

# Remember only on success, so that on failure (when we didn't charge) the external
# caller may retry this (with the same payment-id), for the sake of this example
if payment_result.success:
ctx.set(STATUS, PaymentStatus.COMPLETED_SUCCESSFULLY)
ctx.set(PAYMENT, payment.model_dump())
ctx.object_send(expire, payment_id, send_delay=EXPIRY_TIMEOUT, arg=None)

return payment_result


@payment_processor.handler("cancelPayment")
async def cancel_payment(ctx: ObjectContext):
status = await ctx.get(STATUS) or PaymentStatus.NEW

if status == PaymentStatus.NEW:
# not seen this payment-id before, mark as canceled, in case the cancellation
# overtook the actual payment request (on the external caller's side)
ctx.set(STATUS, PaymentStatus.CANCELED)
ctx.object_send(expire, ctx.key(), send_delay=EXPIRY_TIMEOUT, arg=None)

elif status == PaymentStatus.CANCELED:
pass

elif status == PaymentStatus.COMPLETED_SUCCESSFULLY:
# remember this as cancelled
ctx.set(STATUS, PaymentStatus.CANCELED)

# undo the payment
payment = Payment(**await ctx.get(PAYMENT))
ctx.object_send(account_service.deposit, key=payment.account_id, arg=payment.amount_cents)


@payment_processor.handler()
async def expire(ctx: ObjectContext):
ctx.clear_all()


app = restate.app([payment_processor, account])
6 changes: 6 additions & 0 deletions python/patterns-use-cases/src/statemachinepayments/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pydantic import BaseModel


class Result(BaseModel):
success: bool
message: str

0 comments on commit bbd629c

Please sign in to comment.