diff --git a/python/patterns-use-cases/README.md b/python/patterns-use-cases/README.md index 98324ef6..0990e886 100644 --- a/python/patterns-use-cases/README.md +++ b/python/patterns-use-cases/README.md @@ -225,8 +225,8 @@ and failures. ### Running this example 1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` -2. Start the service: `./gradlew -PmainClass=my.example.statemachinepayments.AppMain run` -3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` +2. Start the service: `python -m hypercorn --config hypercorn-config.toml src/statemachinepayments/payment_processor:app` +3. Register the services (with `--force` to override the endpoint during **development**): `restate -y deployments register --force localhost:9080` ### Demo scenario Send some requests: @@ -234,7 +234,7 @@ Send some requests: - Make a payment ```shell curl -X POST localhost:8080/PaymentProcessor/some-string-id/makePayment -H 'content-type: application/json' \ - -d '{ "accountId": "abc", "amountCents": 100 }' + -d '{ "account_id": "abc", "amount_cents": 100 }' ``` - Cancel a payment. The 'key' parameter is the idempotency token, there is no further request data. @@ -256,8 +256,8 @@ restate kv get PaymentProcessor some-string-id KEY VALUE payment { - "accountId": "abc", - "amountCents": 100 + "account_id": "abc", + "amount_cents": 100 } status "CANCELLED" ``` diff --git a/python/patterns-use-cases/src/durablerpc/product_service.py b/python/patterns-use-cases/src/durablerpc/product_service.py index 8442d43a..a0b64b9b 100644 --- a/python/patterns-use-cases/src/durablerpc/product_service.py +++ b/python/patterns-use-cases/src/durablerpc/product_service.py @@ -7,7 +7,7 @@ @product_service.handler() async def reserve(ctx: ObjectContext) -> bool: if await ctx.get("reserved"): - print(f"Product already reserved {ctx.key}") + print(f"Product already reserved {ctx.key()}") return False print(f"Reserving product {ctx.key()}") ctx.set("reserved", True) diff --git a/python/patterns-use-cases/src/statemachinepayments/__init__.py b/python/patterns-use-cases/src/statemachinepayments/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/patterns-use-cases/src/statemachinepayments/accounts/__init__.py b/python/patterns-use-cases/src/statemachinepayments/accounts/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/patterns-use-cases/src/statemachinepayments/accounts/accounts.py b/python/patterns-use-cases/src/statemachinepayments/accounts/accounts.py new file mode 100644 index 00000000..51d638c1 --- /dev/null +++ b/python/patterns-use-cases/src/statemachinepayments/accounts/accounts.py @@ -0,0 +1,39 @@ +from restate import ObjectContext, VirtualObject +from restate.exceptions import TerminalError +import random + +from src.statemachinepayments.types import Result + +# A simple virtual object, to track accounts. +# This is for simplicity to make this example work self-contained. +# This should be a database in a real scenario +account = VirtualObject("account") + +# The key under which we store the balance +BALANCE = "balance" + + +@account.handler() +async def deposit(ctx: ObjectContext, amount_cents: int): + if amount_cents <= 0: + raise TerminalError("Amount must be greater than 0") + + balance_cents = await ctx.get(BALANCE) or initialize_random_amount() + ctx.set(BALANCE, balance_cents + amount_cents) + + +@account.handler() +async def withdraw(ctx: ObjectContext, amount_cents: int) -> Result: + if amount_cents <= 0: + raise TerminalError("Amount must be greater than 0") + + balance_cents = await ctx.get(BALANCE) or initialize_random_amount() + if balance_cents < amount_cents: + return Result(success=False, message=f"Insufficient funds: {balance_cents} cents") + + ctx.set(BALANCE, balance_cents - amount_cents) + return Result(success=True, message="Withdrawal successful") + + +def initialize_random_amount() -> int: + return random.randint(100_000, 200_000) diff --git a/python/patterns-use-cases/src/statemachinepayments/payment_processor.py b/python/patterns-use-cases/src/statemachinepayments/payment_processor.py new file mode 100644 index 00000000..251eccfa --- /dev/null +++ b/python/patterns-use-cases/src/statemachinepayments/payment_processor.py @@ -0,0 +1,88 @@ +import restate +from pydantic import BaseModel +from restate import VirtualObject, ObjectContext +from datetime import timedelta +from accounts.accounts import account +import accounts.accounts as account_service +from src.statemachinepayments.types import Result + +# A service that processes the payment requests. +# This is implemented as a virtual object to ensure that only one concurrent request can happen +# per payment-id. Requests are queued and processed sequentially per id. +# Methods can be called multiple times with the same payment-id, but payment will be executed +# only once. If a 'cancelPayment' is called for an id, the payment will either be undone, or +# blocked from being made in the future, depending on whether the cancel call comes before or after +# the 'makePayment' call. +payment_processor = VirtualObject("PaymentProcessor") + +# The key under which we store the status. +STATUS = "status" + +# The key under which we store the original payment request. +PAYMENT = "payment" + +EXPIRY_TIMEOUT = timedelta(days=1) + + +class Payment(BaseModel): + account_id: str + amount_cents: int + + +class PaymentStatus: + NEW = "NEW" + COMPLETED_SUCCESSFULLY = "COMPLETED_SUCCESSFULLY" + CANCELED = "CANCELED" + + +@payment_processor.handler("makePayment") +async def make_payment(ctx: ObjectContext, payment: Payment) -> Result: + payment_id = ctx.key() + status = await ctx.get(STATUS) or PaymentStatus.NEW + + if status == PaymentStatus.CANCELED: + return Result(success=False, message="Payment already cancelled") + if status == PaymentStatus.COMPLETED_SUCCESSFULLY: + return Result(success=False, message="Payment already completed in prior call") + + # Charge the target account + payment_result = await ctx.object_call(account_service.withdraw, key=payment.account_id, arg=payment.amount_cents) + + # Remember only on success, so that on failure (when we didn't charge) the external + # caller may retry this (with the same payment-id), for the sake of this example + if payment_result.success: + ctx.set(STATUS, PaymentStatus.COMPLETED_SUCCESSFULLY) + ctx.set(PAYMENT, payment.model_dump()) + ctx.object_send(expire, payment_id, send_delay=EXPIRY_TIMEOUT, arg=None) + + return payment_result + + +@payment_processor.handler("cancelPayment") +async def cancel_payment(ctx: ObjectContext): + status = await ctx.get(STATUS) or PaymentStatus.NEW + + if status == PaymentStatus.NEW: + # not seen this payment-id before, mark as canceled, in case the cancellation + # overtook the actual payment request (on the external caller's side) + ctx.set(STATUS, PaymentStatus.CANCELED) + ctx.object_send(expire, ctx.key(), send_delay=EXPIRY_TIMEOUT, arg=None) + + elif status == PaymentStatus.CANCELED: + pass + + elif status == PaymentStatus.COMPLETED_SUCCESSFULLY: + # remember this as cancelled + ctx.set(STATUS, PaymentStatus.CANCELED) + + # undo the payment + payment = Payment(**await ctx.get(PAYMENT)) + ctx.object_send(account_service.deposit, key=payment.account_id, arg=payment.amount_cents) + + +@payment_processor.handler() +async def expire(ctx: ObjectContext): + ctx.clear_all() + + +app = restate.app([payment_processor, account]) diff --git a/python/patterns-use-cases/src/statemachinepayments/types.py b/python/patterns-use-cases/src/statemachinepayments/types.py new file mode 100644 index 00000000..f7f97395 --- /dev/null +++ b/python/patterns-use-cases/src/statemachinepayments/types.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class Result(BaseModel): + success: bool + message: str \ No newline at end of file