diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/README.md b/java/patterns-use-cases/async-tasks-parallelize-work/README.md deleted file mode 100644 index c2e1bbdc..00000000 --- a/java/patterns-use-cases/async-tasks-parallelize-work/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# Hello world - Java HTTP example - -Sample project configuration of a Restate service using the Java SDK and HTTP server. - -Have a look at the [Java Quickstart guide](https://docs.restate.dev/get_started/quickstart?sdk=java) for more information on how to use this project. - -To run: - -```shell -./gradlew run -``` \ No newline at end of file diff --git a/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/PaymentService.java b/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/PaymentService.java index a7f8889e..2fd76ca4 100644 --- a/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/PaymentService.java +++ b/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/PaymentService.java @@ -23,7 +23,6 @@ import dev.restate.sdk.common.Serde; import dev.restate.sdk.common.TerminalException; import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; -import my.example.types.PaymentRequest; import my.example.utils.PaymentUtils; import my.example.utils.StripeUtils; import org.apache.logging.log4j.LogManager; @@ -53,6 +52,8 @@ public class PaymentService { intent -> intent.toJson().getBytes(), bytes -> ApiResource.GSON.fromJson(new String(bytes), PaymentIntent.class)); + public record PaymentRequest(Long amount, String paymentMethodId, boolean delayedStatus) {} + @Handler public void processPayment(Context ctx, PaymentRequest request) { PaymentUtils.verifyPaymentRequest(request); diff --git a/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/types/PaymentRequest.java b/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/types/PaymentRequest.java deleted file mode 100644 index a2b52d04..00000000 --- a/java/patterns-use-cases/async-tasks-payment-signals/src/main/java/my/example/types/PaymentRequest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH - * - * This file is part of the Restate examples, - * which is released under the MIT license. - * - * You can find a copy of the license in the file LICENSE - * in the root directory of this repository or package or at - * https://github.com/restatedev/examples/ - */ -package my.example.types; - -public class PaymentRequest { - private final Long amount; - private final String paymentMethodId; - private final boolean delayedStatus; - - public PaymentRequest(Long amount, String paymentMethodId, boolean delayedStatus) { - this.amount = amount; - this.paymentMethodId = paymentMethodId; - this.delayedStatus = delayedStatus; - } - - public Long getAmount() { - return amount; - } - - public String getPaymentMethodId() { - return paymentMethodId; - } - - public boolean isDelayed() { - return delayedStatus; - } -} diff --git a/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/UserFeed.java b/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/UserFeed.java index 3ddc986b..55f9cca2 100644 --- a/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/UserFeed.java +++ b/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/UserFeed.java @@ -4,7 +4,6 @@ import dev.restate.sdk.annotation.Handler; import dev.restate.sdk.annotation.VirtualObject; import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; -import my.example.types.SocialMediaPost; import java.time.Duration; @@ -16,6 +15,8 @@ @VirtualObject public class UserFeed { + public record SocialMediaPost(String content, String metadata) {} + // Connect a handler to a Kafka topic. Restate manages the Kafka subscription and offsets. // Events are pushed in order to the Virtual Object (Kafka key = object key). @Handler diff --git a/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/types/SocialMediaPost.java b/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/types/SocialMediaPost.java deleted file mode 100644 index f4213567..00000000 --- a/java/patterns-use-cases/event-processing-transactional-handlers/src/main/java/my/example/types/SocialMediaPost.java +++ /dev/null @@ -1,3 +0,0 @@ -package my.example.types; - -public record SocialMediaPost(String content, String metadata) {} \ No newline at end of file diff --git a/java/patterns-use-cases/microservices-payment-state-machines/src/main/java/my/example/utils/TypeChecks.java b/java/patterns-use-cases/microservices-payment-state-machines/src/main/java/my/example/utils/TypeChecks.java deleted file mode 100644 index e69de29b..00000000 diff --git a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/AppMain.java b/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/AppMain.java deleted file mode 100644 index 45fc8e61..00000000 --- a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/AppMain.java +++ /dev/null @@ -1,15 +0,0 @@ -package dev.restate.patterns; - -import dev.restate.patterns.activities.CarRentals; -import dev.restate.patterns.activities.Flights; -import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; - -public class AppMain { - public static void main(String[] args) { - RestateHttpEndpointBuilder.builder() - .bind(new BookingWorkflow()) - .bind(new CarRentals()) - .bind(new Flights()) - .buildAndListen(); - } -} diff --git a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/BookingWorkflow.java b/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/BookingWorkflow.java index d2202c5d..e1f06fba 100644 --- a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/BookingWorkflow.java +++ b/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/BookingWorkflow.java @@ -39,6 +39,12 @@ @Workflow public class BookingWorkflow { + public record BookingRequest( + Flights.FlightBookingRequest flights, + CarRentals.CarRentalRequest car, + PaymentClient.PaymentInfo paymentInfo + ) {} + @Workflow public void run(WorkflowContext ctx, BookingRequest req) throws TerminalException { // create a list of undo actions @@ -78,4 +84,12 @@ public void run(WorkflowContext ctx, BookingRequest req) throws TerminalExceptio throw e; } } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder() + .bind(new BookingWorkflow()) + .bind(new CarRentals()) + .bind(new Flights()) + .buildAndListen(); + } } diff --git a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/clients/PaymentClient.java b/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/activities/PaymentClient.java similarity index 100% rename from java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/clients/PaymentClient.java rename to java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/activities/PaymentClient.java diff --git a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/types/BookingRequest.java b/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/types/BookingRequest.java deleted file mode 100644 index 87eaf4b5..00000000 --- a/java/patterns-use-cases/microservices-sagas/src/main/java/dev/restate/patterns/types/BookingRequest.java +++ /dev/null @@ -1,12 +0,0 @@ -package dev.restate.patterns.types; - -import dev.restate.patterns.activities.CarRentals; -import dev.restate.patterns.activities.Flights; -import dev.restate.patterns.clients.PaymentClient; - -public record BookingRequest( - Flights.FlightBookingRequest flights, - CarRentals.CarRentalRequest car, - PaymentClient.PaymentInfo paymentInfo -) {} - diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/.gitignore b/java/patterns-use-cases/patterns-use-cases/.gitignore similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/.gitignore rename to java/patterns-use-cases/patterns-use-cases/.gitignore diff --git a/java/patterns-use-cases/patterns-use-cases/README.md b/java/patterns-use-cases/patterns-use-cases/README.md new file mode 100644 index 00000000..4663d93c --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/README.md @@ -0,0 +1,467 @@ +# Java Patterns and Use Cases + + +Common tasks and patterns implemented with Restate: + +| Category | Use case / Name | Difficulty | Description | +|------------------|--------------------------------------------------------------------------------------|-------------|-------------------------------------------------------------------------------------------------------------| +| Microservices | Durable RPC: [code](src/java/my/example/durablerpc) | Basic | Restate persists requests and makes sure they execute exactly-once. | +| Microservices | Sagas: [code](src/java/my/example/sagas) | Basic | Preserve consistency by tracking undo actions and running them when code fails halfway through. | +| Microservices | [Stateful Actors](patterns-use-cases/microservices-stateful-actors) | Basic | State machine with a set of transitions, built as a Restate Virtual Object for automatic state persistence. | +| Microservices | [Payment state machines](patterns-use-cases/microservices-payment-state-machines) | Advanced | State machine example that tracks a payment process, ensuring consistent processing and cancellations. | +| Async tasks | [(Delayed) Task Queue](patterns-use-cases/async-tasks-queue) | Basic | Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. | +| Async tasks | [Parallelizing work](patterns-use-cases/async-tasks-parallelize-work) | Intermediate | Execute a list of tasks in parallel and then gather their result. | +| Async tasks | [Slow async data upload](patterns-use-cases/async-tasks-data-upload) | Intermediate | Kick of a synchronous task (e.g. data upload) and turn it into an asynchronous one if it takes too long. | +| Async tasks | [Payments: async signals processing](patterns-use-cases/async-tasks-payment-signals) | Advanced | Handling async payment callbacks for slow payments, with Stripe. | +| Event processing | [Transactional handlers](patterns-use-cases/event-processing-transactional-handlers) | Basic | Processing events (from Kafka) to update various downstream systems in a transactional way. | +| Event processing | [Enriching streams](patterns-use-cases/event-processing-enrichment) | Basic | Stateful functions/actors connected to Kafka and callable over RPC. | +| Patterns | [Durable Promises](patterns-use-cases/pattern-durable-promises) | Advanced | Implementation of Promises/Futures that are durable across processes and failures. | +| Patterns | [Priority Queue](patterns-use-cases/pattern-priority-queue) | Advanced | Example of implementing a priority queue to manage task execution order. | + + + +## Microservices: Durable RPC + +This example shows an example of: +- **Durable RPC**: once a request has reached Restate, it is guaranteed to be processed +- **Exactly-once processing**: Ensure that duplicate requests are not processed multiple times via idempotency keys + +The example shows how you can programmatically submit a requests to a Restate service. +Every request gets processed durably, and deduplicated based on the idempotency key. + +The example shows a client that receives product reservation requests and forwards them to the product service. +The [Product service](restate-app/app.ts) is a Restate service that durably processes the reservation requests and deduplicates them. +Each product can be reserved only once. + +## Microservices: Sagas + +An example of a trip reservation workflow, using the saga pattern to undo previous steps in case of an error. + +Durable Execution's guarantee to run code to the end in the presence of failures, and to deterministically recover previous steps from the journal, makes sagas easy. +Every step pushes a compensation action (an undo operation) to a stack. In the case of an error, those operations are run. + +The main requirement is that steps are implemented as journaled operations, like `ctx.run()` or RPC/messaging. + +### Adding compensations +The example shows two ways you can implement the compensation, depending on the characteristics of the API/system you interact with. + +The flight and car reservations work in a two-phase commit way, where you first create a reservation, get a reservation ID back, and then confirm or cancel the reservation with its ID. +In this case, you need to add the compensation to the list after creating the reservation, because you need the reservation ID to cancel it. +If the failure happens while making the reservation, you can be sure that it never takes effect, because you didn't confirm it. + +The payment on the other hand uses a client generated idempotency key. +The payment goes through in one shot (single API call). +If we receive an error, we might not be sure if this occurred before or after the payment took effect. +Therefore, we need to add the compensation to the list before the payment is made. +If a failure happens during the payment, the compensation will run. +The downstream API then uses the idempotency key to check if the payment went through, and whether it needs to be refunded. + +Note that the compensating action needs to be idempotent. + +### Running this example +1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` +2. Start the service: `./gradlew -PmainClass=my.example.sagas.BookingWorkflow run` +3. Register the services: `restate -y deployments register localhost:9080` + +### Demo scenario + +Have a look at the logs to see how the compensations run in case of a terminal error. + +Start the workflow: +```shell +curl -X POST localhost:8080/BookingWorkflow/trip12883/run -H 'content-type: application/json' -d '{ + "flights": { + "flightId": "12345", + "passengerName": "John Doe" + }, + "car": { + "pickupLocation": "Airport", + "rentalDate": "2024-12-16" + }, + "paymentInfo": { + "cardNumber": "4111111111111111", + "amount": 1500 + } +}' +``` + +Have a look at the logs to see the cancellations of the flight and car booking in case of a terminal error: +```shell +2024-12-18 11:35:48 INFO [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 11:35:49 INFO [Flights/reserve][inv_1ccelXW8IxuW6QpLWQu9ykt5aMAqRTl7pL] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 11:35:49 INFO [Flights/reserve][inv_1ccelXW8IxuW6QpLWQu9ykt5aMAqRTl7pL] dev.restate.patterns.activities.Flights - Flight reservation created with id: 35ab7c68-6f32-48f6-adb9-a2a74076f4df +2024-12-18 11:35:49 INFO [Flights/reserve][inv_1ccelXW8IxuW6QpLWQu9ykt5aMAqRTl7pL] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-18 11:35:49 INFO [CarRentals/reserve][inv_13cgaqr4XecK2ztj72BfVPuscdL1SJwMCZ] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 11:35:49 INFO [CarRentals/reserve][inv_13cgaqr4XecK2ztj72BfVPuscdL1SJwMCZ] dev.restate.patterns.activities.CarRentals - Car rental reservation created with id: c103022e-9dda-4a34-a6ef-0c95d2911b2c +2024-12-18 11:35:49 INFO [CarRentals/reserve][inv_13cgaqr4XecK2ztj72BfVPuscdL1SJwMCZ] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-18 11:35:49 ERROR [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.patterns.clients.PaymentClient - This payment should never be accepted! Aborting booking. +2024-12-18 11:35:49 INFO [Flights/cancel][inv_19STR0U1v5Xo5W2UsYS3rhZEI02VGDVJM5] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 11:35:49 INFO [Flights/cancel][inv_19STR0U1v5Xo5W2UsYS3rhZEI02VGDVJM5] dev.restate.patterns.activities.Flights - Flight reservation cancelled with id: 35ab7c68-6f32-48f6-adb9-a2a74076f4df +2024-12-18 11:35:49 INFO [Flights/cancel][inv_19STR0U1v5Xo5W2UsYS3rhZEI02VGDVJM5] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-18 11:35:49 INFO [CarRentals/cancel][inv_14PS98BWOeNn1zw3yn2RqJ0wSp7V5sEJMd] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 11:35:49 INFO [CarRentals/cancel][inv_14PS98BWOeNn1zw3yn2RqJ0wSp7V5sEJMd] dev.restate.patterns.activities.CarRentals - Car rental reservation cancelled with id: c103022e-9dda-4a34-a6ef-0c95d2911b2c +2024-12-18 11:35:49 INFO [CarRentals/cancel][inv_14PS98BWOeNn1zw3yn2RqJ0wSp7V5sEJMd] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-18 11:35:49 INFO [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.patterns.clients.PaymentClient - Refunding payment with id: 1a640cda-bd5f-9751-b6b9-274817549b58 +2024-12-18 11:35:49 WARN [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation +dev.restate.sdk.common.TerminalException: Payment could not be accepted! +... rest of trace ... +``` + +## Microservices: Payment State Machine + +This example shows how to build a reliable payment state machine. + +The state machine ensures that payments are processed once, not duplicated, +can be revoked, and that concurrent payment requests and cancellations sort +out consistently. + +The example illustrates the following aspects: + +- Payment requests use a token to identify payments (stripe-style) +- Restate tracks the status of each payment request by token in internal state. +- A payment can be cancelled, which prevents it from succeeding later, or rolls it back, if + it was already processed. +- Virtual Object concurrency ensures that requests and cancellations don't produce + tricky race conditions. +- Expiry of tokens is handled through Restate's internal timers. + +Despite the relatively few lines of code (no careful synchronization, retries, or other recovery logic), +this application maintains a high level of consistency in the presence of concurrent external requests +and failures. + +### Running this example +1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` +2. Start the service: `./gradlew -PmainClass=my.example.statemachinepayments.AppMain run` +3. Register the services: `restate -y deployments register localhost:9080` + +### Demo scenario +Send some requests: + +- Make a payment + ```shell + curl -X POST localhost:8080/PaymentProcessor/some-string-id/makePayment -H 'content-type: application/json' \ + -d '{ "accountId": "abc", "amountCents": 100 }' + ``` + +- Cancel a payment. The 'key' parameter is the idempotency token, there is no further request data. + + ```shell + curl -X POST localhost:8080/PaymentProcessor/some-string-id/cancelPayment + ``` + +- Have a look at the state: +```shell +restate kv get PaymentProcessor some-string-id +``` +``` +🤖 State: +――――――――― + + Service PaymentProcessor + Key some-string-id + + KEY VALUE + payment { + "accountId": "abc", + "amountCents": 100 + } + status "CANCELLED" +``` + +## Async Tasks: (Delayed) Tasks Queue + +Use Restate as a queue. Schedule tasks for now or later and ensure the task is only executed once. + +Files to look at: +- [Task Submitter](src/task_submitter.ts): schedules tasks via send requests with and idempotency key. + - The **send requests** put the tasks in Restate's queue. The task submitter does not wait for the task response. + - The **idempotency key** in the header is used by Restate to deduplicate requests. + - If a delay is set, the task will be executed later and Restate will track the timer durably, like a **delayed task queue**. +- [Async Task Worker](src/async_task_worker.ts): gets invoked by Restate for each task in the queue. + +## Async Tasks: Async Data Upload + +This example shows how to use the Restate SDK to **kick of a synchronous task and turn it into an asynchronous one if it takes too long**. + +The example implements a [data upload service](src/main/java/my/example/DataUploadService.java), that creates a bucket, uploads data to it, and then returns the URL. + +The [upload client](src/main/java/my/example/UploadClient.java) does a synchronous request to upload the file, and the server will respond with the URL. + +If the upload takes too long, however, the client asks the upload service to send the URL later in an email. + +### Running the examples +1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` +2. Start the service: `./gradlew -PmainClass=my.example.dataupload.DataUploadService run` +3. Register the services: `restate -y deployments register localhost:9080` + +## Demo scenario + +Run the upload client with a userId: `./gradlew -PmainClass=my.example.UploadClient run --args="someone21"` + +This will submit an upload workflow to the data upload service. +The workflow will run only once per ID, so you need to provide a new ID for each run. + +Have a look at the logs to see how the execution switches from synchronously waiting to the response to requesting an email: + +#### Fast upload + +Client logs: +``` +2024-12-18 15:02:34 INFO my.example.UploadClient - Uploading data for user someone212 +2024-12-18 15:02:36 INFO my.example.UploadClient - Fast upload... URL was https://s3-eu-central-1.amazonaws.com/257587941/ +``` +Workflow logs: +``` +2024-12-18 15:02:34 INFO [DataUploadService/run][inv_17cZwACLnO7f5m1BjN7SKoQpuyycCmWwnv] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 15:02:34 INFO [DataUploadService/run][inv_17cZwACLnO7f5m1BjN7SKoQpuyycCmWwnv] my.example.utils.DataOperations - Creating bucket with URL https://s3-eu-central-1.amazonaws.com/257587941/ +2024-12-18 15:02:34 INFO [DataUploadService/run][inv_17cZwACLnO7f5m1BjN7SKoQpuyycCmWwnv] my.example.utils.DataOperations - Uploading data to target https://s3-eu-central-1.amazonaws.com/257587941/. ETA: 1500 ms +2024-12-18 15:02:36 INFO [DataUploadService/run][inv_17cZwACLnO7f5m1BjN7SKoQpuyycCmWwnv] dev.restate.sdk.core.InvocationStateMachine - End invocation +``` + +#### Slow upload + +Client logs: +``` +2024-12-18 15:02:41 INFO my.example.UploadClient - Uploading data for user someone2122 +2024-12-18 15:02:46 INFO my.example.UploadClient - Slow upload... Mail the link later +``` + +Workflow logs: +``` +2024-12-18 15:02:41 INFO [DataUploadService/run][inv_1koakM2GXxcN2Co3aM3pSrQJokiqnyR7MJ] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 15:02:41 INFO [DataUploadService/run][inv_1koakM2GXxcN2Co3aM3pSrQJokiqnyR7MJ] my.example.utils.DataOperations - Creating bucket with URL https://s3-eu-central-1.amazonaws.com/493004051/ +2024-12-18 15:02:41 INFO [DataUploadService/run][inv_1koakM2GXxcN2Co3aM3pSrQJokiqnyR7MJ] my.example.utils.DataOperations - Uploading data to target https://s3-eu-central-1.amazonaws.com/493004051/. ETA: 10000 ms +2024-12-18 15:02:46 INFO [DataUploadService/resultAsEmail][inv_1koakM2GXxcN7veCWCBDo77G0P2BIX7KFz] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-18 15:02:51 INFO [DataUploadService/run][inv_1koakM2GXxcN2Co3aM3pSrQJokiqnyR7MJ] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-18 15:02:51 INFO [DataUploadService/resultAsEmail][inv_1koakM2GXxcN7veCWCBDo77G0P2BIX7KFz] my.example.utils.EmailClient - Sending email to https://s3-eu-central-1.amazonaws.com/493004051/ with url someone2122@example.com +2024-12-18 15:02:51 INFO [DataUploadService/resultAsEmail][inv_1koakM2GXxcN7veCWCBDo77G0P2BIX7KFz] dev.restate.sdk.core.InvocationStateMachine - End invocation +``` +You see the call to `resultAsEmail` after the upload took too long, and the sending of the email. + +## Async Tasks: Payment Signals - Combining Sync and Async (Webhook) Responses from Stripe + +This example issues a payment request to Stripe. +When calling Stripe, the result often comes synchronously as a response API call. +But sometimes, an immediate answer is not possible, and especially some payment +methods (like IBAN transfers or Klarna) frequently only return "processing" to notify +you later via a webhook. + +This example combines both paths in a single function that reliably waits for both +paths, if needed, thus giving you a single long-running synchronous function. +This is useful, for example, when the payment is processed completely asynchronously, +like during periodic charging of a subscription. + +And because we have a durable execution system that suspends and resumes state +and promises, we can actually combine this into a single reliably promise/async-function. + +### Running the Example + +This example works end-to-end with Stripe. You need a Stripe account to run it. +If you want to run everything locally, you also need a tool like _ngrok_ to forward +webhooks to your local machine. + +1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell: `restate-server` +2. Start the service: `./gradlew -PmainClass=my.example.signalspayments.PaymentService run` +3. Register the services: `restate -y deployments register localhost:9080` + +4. Create a free Stripe test account. This requires no verification, but you can only work + with test data, not make real payments. Good enough for this example. + +5. In the Stripe UI, go to "Developers" -> "API Keys" and copy the _secret key_ (`sk_test_...`). + Add it to the [StripeUtils.java](./src/main/java/my/example/utils/StripeUtils.java) file. Because this is a dev-only + API key, it supports only test data, so it isn't super sensitive. + +6. Run launch _ngrok_: Get a free account and download the binary, or launch a docker container. + Make it forward HTTP calls to local port `8080` + - `NGROK_AUTHTOKEN= ngrok http 8080` + - or `docker run --rm -it -e NGROK_AUTHTOKEN= --network host ngrok/ngrok http 8080` (on Linux command). + Copy the public URL that ngrok shows you: `https://.ngrok-free.app` + +7. Go to the Stripe UI and create a webhook. Select all _"Payment Intent"_ event types. Put the ngrok + public URL + `/PaymentService/processWebhook` as the webhook URL (you need to update this whenever you stop/start ngrok). + Example: `https://.ngrok-free.app/PaymentService/processWebhooks` + +8. Put the webhook secret (`whsec_...`) to the [StripeUtils.java](./src/main/java/my/example/utils/StripeUtils.java) file. + +Use as test data `pm_card_visa` for a successful payment and `pm_card_visa_chargeDeclined` for a declined payment. +Because the test data rarely triggers an async response, this example's tools can mimic that +if you add `"delayedStatus": true` to the request. + +```shell +curl localhost:8080/PaymentService/processPayment -H 'content-type: application/json' -d '{ + "paymentMethodId": "pm_card_visa", + "amount": 109, + "delayedStatus": true +}' +``` + +A few notes: +* You would usually submit payment calls through Restate also with an idempotency token, + like: ` -H 'idempotency-key: my-id-token'` +* The webhook setup with ngrok is not trivial and can easily be wrong. You might end up with + some payments waiting for the webhooks. You can use the CLI to cancel them: + `npx restate inv list` and `npx restate inv cancel `. +* Here is an opportunity for the SAGAs pattern to cancel payments in that case. + + +## Event Processing Example: Transactional Handlers with Durable Side Effects and Timers + +Processing events (from Kafka) to update various downstream systems. +- Durable side effects with retries and recovery of partial progress +- Events get sent to objects based on the Kafka key. + For each key, Restate ensures that events are processed sequentially and in order. + Slow events on other keys do not block processing (high fan-out, no head-of-line waiting). +- Ability to delay events when the downstream systems are busy, without blocking + entire partitions. + + +### Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. + +2. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` + +3. Start the service: `./gradlew -PmainClass=my.example.eventtransactions.UserFeed run` + +4. Register the example at Restate server by calling `restate -y deployment register localhost:9080`. + +5. Let Restate subscribe to the Kafka topic `social-media-posts` and invoke `UserFeed/processPost` on each message. +```shell +curl localhost:9070/subscriptions -H 'content-type: application/json' \ +-d '{ + "source": "kafka://my-cluster/social-media-posts", + "sink": "service://UserFeed/processPost", + "options": {"auto.offset.reset": "earliest"} +}' +``` + +### Demo scenario + +Start a Kafka producer and send some messages to the `social-media-posts` topic: +```shell +docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic social-media-posts --property parse.key=true --property key.separator=: +``` + +Let's submit some posts for two different users: +``` +userid1:{"content": "Hi! This is my first post!", "metadata": "public"} +userid2:{"content": "Hi! This is my first post!", "metadata": "public"} +userid1:{"content": "Hi! This is my second post!", "metadata": "public"} +``` + +Our Kafka broker only has a single partition so all these messages end up on the same partition. +You can see in the logs how events for different users are processed in parallel, but events for the same user are processed sequentially: + +```shell +2024-12-17 18:07:43 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Creating post 300dbd34-eae8-4875-8a71-c18b14e2aed7 for user userid1 +2024-12-17 18:07:43 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds +2024-12-17 18:07:46 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-17 18:07:46 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Creating post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 for user userid2 +2024-12-17 18:07:46 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Content moderation for post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 is still pending... Will check again in 5 seconds +2024-12-17 18:07:48 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds +2024-12-17 18:07:56 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Content moderation for post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 is done +2024-12-17 18:07:56 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] my.example.utils.Stubs - Updating user feed for user userid2 with post 011443bb-a47d-43a0-8df4-d2c4ea50b3b8 +2024-12-17 18:07:56 INFO [UserFeed/processPost][inv_1eZjTF0DbaEl3UzViEbqNPu6FZK4Y8KBAB] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-17 18:07:58 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds +2024-12-17 18:09:03 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is still pending... Will check again in 5 seconds +2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Content moderation for post 300dbd34-eae8-4875-8a71-c18b14e2aed7 is done +2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] my.example.utils.Stubs - Updating user feed for user userid1 with post 300dbd34-eae8-4875-8a71-c18b14e2aed7 +2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN17cPZQm43rQZxiPr0qNmhP] dev.restate.sdk.core.InvocationStateMachine - End invocation +2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] dev.restate.sdk.core.InvocationStateMachine - Start invocation +2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Creating post 738f0f12-8191-4702-bf49-59e1604ee799 for user userid1 +2024-12-17 18:09:08 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Content moderation for post 738f0f12-8191-4702-bf49-59e1604ee799 is still pending... Will check again in 5 seconds +2024-12-17 18:09:48 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Content moderation for post 738f0f12-8191-4702-bf49-59e1604ee799 is done +2024-12-17 18:09:48 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] my.example.utils.Stubs - Updating user feed for user userid1 with post 738f0f12-8191-4702-bf49-59e1604ee799 +2024-12-17 18:09:48 INFO [UserFeed/processPost][inv_13puWeoWJykN0lJ761afYGoczigaKJDzWh] dev.restate.sdk.core.InvocationStateMachine - End invocation +``` + +As you see, slow events do not block other slow events. +Restate effectively created a queue per user ID. + +The handler creates the social media post and waits for content moderation to finish. +If the moderation takes long, and there is an infrastructure crash, then Restate will trigger a retry. +The handler will fast-forward to where it was, will recover the post ID and will continue waiting for moderation to finish. + +You can try it out by killing Restate or the service halfway through processing a post. + + +## Event Processing Example: Event Enrichment + +This example shows an example of: +- **Event enrichment** over different sources: RPC and Kafka +- **Stateful actors / Digital twins** updated over Kafka +- **Streaming join** +- Populating state from events and making it queryable via RPC handlers. + +The example implements a package delivery tracking service. +Packages are registered via an RPC handler, and their location is updated via Kafka events. +The Package Tracker Virtual Object tracks the package details and its location history. + +### Running the example + +1. Start the Kafka broker via Docker Compose: `docker compose up -d`. + +2. Start Restate Server with the Kafka broker configuration in a separate shell: `restate-server --config-file restate.toml` + +3. Start the service: `./gradlew -PmainClass=my.example.eventenrichment.PackageTracker run` + +4. Register the example at Restate server by calling + `restate -y deployment register localhost:9080`. + +5. Let Restate subscribe to the Kafka topic `package-location-updates` and invoke `PackageTracker/updateLocation` on each message. +```shell +curl localhost:9070/subscriptions -H 'content-type: application/json' \ +-d '{ + "source": "kafka://my-cluster/package-location-updates", + "sink": "service://PackageTracker/updateLocation", + "options": {"auto.offset.reset": "earliest"} +}' +``` + +### Demo scenario + +1. Register a new package via the RPC handler: +```shell +curl localhost:8080/PackageTracker/package1/registerPackage \ + -H 'content-type: application/json' -d '{"finalDestination": "Bridge 6, Amsterdam"}' +``` + +2. Start a Kafka producer and publish some messages to update the location of the package on the `package-location-updates` topic: +```shell +docker exec -it broker kafka-console-producer --bootstrap-server broker:29092 --topic package-location-updates --property parse.key=true --property key.separator=: +``` +Send messages like +``` +package1:{"timestamp": "2024-10-10 13:00", "location": "Pinetree Road 5, Paris"} +package1:{"timestamp": "2024-10-10 14:00", "location": "Mountain Road 155, Brussels"} +``` + +3. Query the package location via the RPC handler: +```shell +curl localhost:8080/PackageTracker/package1/getPackageInfo +``` +or via the CLI: `npx restate kv get package-tracker package1` + +You can see how the state was enriched by the initial RPC event and the subsequent Kafka events: +``` +🤖 State: +――――――――― + + Service package-tracker + Key package1 + + KEY VALUE + package-info { + "finalDestination": "Bridge 6, Amsterdam", + "locations": [ + { + "location": "Pinetree Road 5, Paris", + "timestamp": "2024-10-10 13:00" + }, + { + "location": "Mountain Road 155, Brussels", + "timestamp": "2024-10-10 14:00" + } + ] + } +``` diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/build.gradle.kts b/java/patterns-use-cases/patterns-use-cases/build.gradle.kts similarity index 89% rename from java/patterns-use-cases/async-tasks-parallelize-work/build.gradle.kts rename to java/patterns-use-cases/patterns-use-cases/build.gradle.kts index bedcad8a..3ee1c389 100644 --- a/java/patterns-use-cases/async-tasks-parallelize-work/build.gradle.kts +++ b/java/patterns-use-cases/patterns-use-cases/build.gradle.kts @@ -25,13 +25,17 @@ dependencies { implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.16.1") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1") + // Stripe + implementation("com.stripe:stripe-java:25.7.0") + implementation("com.google.code.gson:gson:2.10.1") + // Logging (optional) implementation("org.apache.logging.log4j:log4j-core:2.24.1") } // Set main class application { - mainClass.set(project.property("mainClass") as String) + mainClass.set("my.example.FanOutWorker") } tasks.withType { diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/gradle/wrapper/gradle-wrapper.jar b/java/patterns-use-cases/patterns-use-cases/gradle/wrapper/gradle-wrapper.jar similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/gradle/wrapper/gradle-wrapper.jar rename to java/patterns-use-cases/patterns-use-cases/gradle/wrapper/gradle-wrapper.jar diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/gradle/wrapper/gradle-wrapper.properties b/java/patterns-use-cases/patterns-use-cases/gradle/wrapper/gradle-wrapper.properties similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/gradle/wrapper/gradle-wrapper.properties rename to java/patterns-use-cases/patterns-use-cases/gradle/wrapper/gradle-wrapper.properties diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/gradlew b/java/patterns-use-cases/patterns-use-cases/gradlew similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/gradlew rename to java/patterns-use-cases/patterns-use-cases/gradlew diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/gradlew.bat b/java/patterns-use-cases/patterns-use-cases/gradlew.bat similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/gradlew.bat rename to java/patterns-use-cases/patterns-use-cases/gradlew.bat diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/DataUploadService.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/DataUploadService.java new file mode 100644 index 00000000..c2bcc818 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/DataUploadService.java @@ -0,0 +1,39 @@ +package my.example.dataupload; + +import dev.restate.sdk.JsonSerdes; +import dev.restate.sdk.SharedWorkflowContext; +import dev.restate.sdk.WorkflowContext; +import dev.restate.sdk.annotation.Shared; +import dev.restate.sdk.annotation.Workflow; +import dev.restate.sdk.common.DurablePromiseKey; +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; +import my.example.dataupload.utils.EmailClient; + +import static my.example.dataupload.utils.DataOperations.createS3Bucket; +import static my.example.dataupload.utils.DataOperations.uploadData; + +@Workflow +public class DataUploadService { + + private static final DurablePromiseKey URL_PROMISE = + DurablePromiseKey.of("url", JsonSerdes.STRING); + + @Workflow + public String run(WorkflowContext ctx) { + String url = ctx.run(JsonSerdes.STRING, () -> createS3Bucket()); + ctx.run(() -> uploadData(url)); + + ctx.promiseHandle(URL_PROMISE).resolve(url); + return url; + } + + @Shared + public void resultAsEmail(SharedWorkflowContext ctx, String email) { + String url = ctx.promise(URL_PROMISE).awaitable().await(); + ctx.run(() -> EmailClient.send(url, email)); + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder().bind(new DataUploadService()).buildAndListen(9082); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/UploadClient.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/UploadClient.java new file mode 100644 index 00000000..d98dded4 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/UploadClient.java @@ -0,0 +1,56 @@ +package my.example.dataupload; + +import dev.restate.sdk.client.Client; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +// The upload client calls the data upload workflow and awaits the result for 5 seconds. +// If the workflow doesn't complete within that time, it asks the +// workflow to send the upload url via email instead. +public class UploadClient { + private static final Logger logger = LogManager.getLogger(UploadClient.class); + + private static final String RESTATE_URL = "http://localhost:8080"; + + public void uploadData(String userId, String email) { + logger.info("Uploading data for user {}", userId); + + // Submit the workflow + Client restateClient = Client.connect(RESTATE_URL); + var uploadClient = DataUploadServiceClient.fromClient(restateClient, userId); + uploadClient.submit(); + + String url; + try { + // Wait for the workflow to complete or timeout + url = uploadClient.workflowHandle().attachAsync() + .orTimeout(5, TimeUnit.SECONDS) + .join(); + } catch (Exception e) { + if (e.getCause() instanceof TimeoutException) { + logger.info("Slow upload... Mail the link later"); + uploadClient.resultAsEmail(email); + return; + } + throw e; + } + + // ... process directly ... + logger.info("Fast upload... URL was {}", url); + } + + //-------------------------------------------------------------------------------- + // This client would be used in some other part of the system. + // For the sake of this example, we are calling it here from the main method, so you can test the example. + public static void main(String[] args) { + if (args.length < 1) { + System.err.println("Specify the userId as the argument: " + + "./gradlew run -PmainClass=my.example.UploadClient --args=\"userId123\""); + System.exit(1); + } + new UploadClient().uploadData(args[0], args[0] + "@example.com"); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/utils/DataOperations.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/utils/DataOperations.java new file mode 100644 index 00000000..8cb70fa5 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/utils/DataOperations.java @@ -0,0 +1,25 @@ +package my.example.dataupload.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class DataOperations { + private static final Logger logger = LogManager.getLogger(DataOperations.class); + + public static String createS3Bucket(){ + String bucket = String.valueOf((long) (Math.random() * 1_000_000_000)); + String bucketUrl = "https://s3-eu-central-1.amazonaws.com/" + bucket + "/"; + logger.info("Creating bucket with URL {}", bucketUrl); + return bucketUrl; + } + + public static void uploadData(String url){ + long timeRemaining = Math.random() < 0.5 ? 1500 : 10000; + logger.info("Uploading data to target {}. ETA: {} ms", url, timeRemaining); + try { + Thread.sleep(timeRemaining); + } catch (InterruptedException e) { + logger.error("Upload failed: {}", e.getMessage()); + } + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/utils/EmailClient.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/utils/EmailClient.java new file mode 100644 index 00000000..5e476011 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/dataupload/utils/EmailClient.java @@ -0,0 +1,12 @@ +package my.example.dataupload.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class EmailClient { + private static final Logger logger = LogManager.getLogger(EmailClient.class); + + public static void send(String email, String url){ + logger.info("Sending email to {} with url {}", email, url); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/durablerpc/MyClient.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/durablerpc/MyClient.java new file mode 100644 index 00000000..da463ce3 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/durablerpc/MyClient.java @@ -0,0 +1,21 @@ +package my.example.durablerpc; + +import dev.restate.sdk.client.CallRequestOptions; +import dev.restate.sdk.client.Client; + +public class MyClient { + + public static String RESTATE_URL = "http://localhost:8080"; + + public boolean reserveProduct(String productId, String reservationId) { + Client restateClient = Client.connect(RESTATE_URL); + + // Durable RPC call to the product service + // Restate registers the request and makes sure runs to completion exactly once + boolean reserved = ProductServiceClient.fromClient(restateClient, productId) + // Restate deduplicates requests with the same idempotency key + .reserve(CallRequestOptions.DEFAULT.withIdempotency(reservationId)); + + return reserved; + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/durablerpc/ProductService.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/durablerpc/ProductService.java new file mode 100644 index 00000000..d78da570 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/durablerpc/ProductService.java @@ -0,0 +1,39 @@ +package my.example.durablerpc; + +import dev.restate.sdk.JsonSerdes; +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.VirtualObject; +import dev.restate.sdk.common.StateKey; +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/* + * Have a look at the ProductServiceClient class to see how to call Restate handlers + * programmatically from another process. + */ +@VirtualObject +public class ProductService { + private static final Logger logger = LogManager.getLogger(ProductService.class); + + private static final StateKey RESERVED = StateKey.of("reserved", JsonSerdes.BOOLEAN); + + + @Handler + public boolean reserve(ObjectContext ctx) { + if (ctx.get(RESERVED).orElse(false)){ + logger.info("Product already reserved"); + return false; + } + logger.info("Reserving product"); + ctx.set(RESERVED, true); + return true; + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder() + .bind(new ProductService()) + .buildAndListen(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/PackageTracker.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/PackageTracker.java new file mode 100644 index 00000000..4476f0a3 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/PackageTracker.java @@ -0,0 +1,61 @@ +package my.example.eventenrichment; + +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.SharedObjectContext; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Shared; +import dev.restate.sdk.annotation.VirtualObject; +import dev.restate.sdk.common.StateKey; +import dev.restate.sdk.common.TerminalException; +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; +import dev.restate.sdk.serde.jackson.JacksonSerdes; +import my.example.eventenrichment.types.LocationUpdate; +import my.example.eventenrichment.types.PackageInfo; + +// Package tracking system: +// Digital twin representing a package in delivery with real-time location updates. +// Handlers get called over HTTP or Kafka. +@VirtualObject +public class PackageTracker { + + private static final StateKey PACKAGE_INFO = + StateKey.of("package-info", JacksonSerdes.of(PackageInfo.class)); + + // Called first by the seller over HTTP + @Handler + public void registerPackage(ObjectContext ctx, PackageInfo packageInfo){ + // Store the package details in the state + ctx.set(PACKAGE_INFO, packageInfo); + } + + // Connected to a Kafka topic for real-time location updates + @Handler + public void updateLocation(ObjectContext ctx, LocationUpdate locationUpdate){ + var packageInfo = ctx.get(PACKAGE_INFO) + .orElseThrow(() -> new TerminalException("Package not found")); + + // Update the package info with the new location + packageInfo.addLocation(locationUpdate); + ctx.set(PACKAGE_INFO, packageInfo); + } + + // Called by the delivery dashboard to get the package details + @Shared + public PackageInfo getPackageInfo(SharedObjectContext ctx){ + return ctx.get(PACKAGE_INFO) + .orElseThrow(() -> new TerminalException("Package not found")); + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder() + .bind(new PackageTracker()) + .buildAndListen(9081); + } +} + +// Example API Usage: +/* +curl localhost:8080/PackageTracker/package123/registerPackage -H 'content-type: application/json' -d '{ "finalDestination": "Bridge 6, Amsterdam"}' +curl localhost:8080/PackageTracker/package123/updateLocation -H 'content-type: application/json' -d '{ "timestamp": "2024-12-11T12:00:00Z", "location": "Warehouse A" }' +curl localhost:8080/PackageTracker/package123/getPackageInfo +*/ \ No newline at end of file diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/types/LocationUpdate.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/types/LocationUpdate.java new file mode 100644 index 00000000..70487034 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/types/LocationUpdate.java @@ -0,0 +1,3 @@ +package my.example.eventenrichment.types; + +public record LocationUpdate (String timestamp, String location) {} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/types/PackageInfo.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/types/PackageInfo.java new file mode 100644 index 00000000..25f5a072 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventenrichment/types/PackageInfo.java @@ -0,0 +1,33 @@ +package my.example.eventenrichment.types; + +import java.util.ArrayList; +import java.util.List; + +public class PackageInfo { + private String finalDestination; + private List locations = new ArrayList<>(); + + public PackageInfo(String finalDestination) { + this.finalDestination = finalDestination; + } + + public String getFinalDestination() { + return finalDestination; + } + + public void setFinalDestination(String finalDestination) { + this.finalDestination = finalDestination; + } + + public List getLocations() { + return locations; + } + + public void setLocations(List locations) { + this.locations = locations; + } + + public void addLocation(LocationUpdate locationUpdate) { + this.locations.add(locationUpdate); + } +} \ No newline at end of file diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventtransactions/UserFeed.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventtransactions/UserFeed.java new file mode 100644 index 00000000..c8240aac --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventtransactions/UserFeed.java @@ -0,0 +1,44 @@ +package my.example.eventtransactions; + +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.VirtualObject; +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; + +import java.time.Duration; + +import static dev.restate.sdk.JsonSerdes.STRING; +import static my.example.eventtransactions.utils.Stubs.*; + +// Implement transactional event handlers: +// e.g. update various downstream systems via API calls +@VirtualObject +public class UserFeed { + + public record SocialMediaPost(String content, String metadata) {} + + // Connect a handler to a Kafka topic. Restate manages the Kafka subscription and offsets. + // Events are pushed in order to the Virtual Object (Kafka key = object key). + @Handler + public void processPost(ObjectContext ctx, SocialMediaPost post) { + String userId = ctx.key(); + + // Durable side effects: Restate persists intermediate results and replays them on failure. + String postId = ctx.run(STRING, () -> createPost(userId, post)); + + // No restrictions on the handler code: loops, sleeps, etc. + while(ctx.run(STRING, () -> getPostStatus(postId)).equals("PENDING")) { + // Delay processing until content moderation is complete (handler suspends when on FaaS). + // This only blocks other posts for this user (Virtual Object), not for other users. + ctx.sleep(Duration.ofMillis(5000)); + } + + ctx.run(() -> updateUserFeed(userId, postId)); + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder() + .bind(new UserFeed()) + .buildAndListen(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventtransactions/utils/Stubs.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventtransactions/utils/Stubs.java new file mode 100644 index 00000000..3eb03899 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/eventtransactions/utils/Stubs.java @@ -0,0 +1,31 @@ +package my.example.eventtransactions.utils; + +import my.example.eventtransactions.UserFeed; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.UUID; + +public class Stubs { + private static final Logger logger = LogManager.getLogger(Stubs.class); + + public static String createPost(String userId, UserFeed.SocialMediaPost post) { + String postId = UUID.randomUUID().toString(); + logger.info("Creating post {} for user {}", postId, userId); + return postId; + } + + public static String getPostStatus(String postId) { + if (Math.random() < 0.8) { + logger.info("Content moderation for post {} is still pending... Will check again in 5 seconds", postId); + return "PENDING"; + } else { + logger.info("Content moderation for post {} is done", postId); + return "DONE"; + } + } + + public static void updateUserFeed(String userId, String postId) { + logger.info("Updating user feed for user {} with post {}", userId, postId); + } +} diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/src/main/java/my/example/parallelizework/FanOutWorker.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/parallelizework/FanOutWorker.java similarity index 99% rename from java/patterns-use-cases/async-tasks-parallelize-work/src/main/java/my/example/parallelizework/FanOutWorker.java rename to java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/parallelizework/FanOutWorker.java index 185897fe..a060be28 100644 --- a/java/patterns-use-cases/async-tasks-parallelize-work/src/main/java/my/example/parallelizework/FanOutWorker.java +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/parallelizework/FanOutWorker.java @@ -7,6 +7,7 @@ import dev.restate.sdk.annotation.Service; import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; import dev.restate.sdk.serde.jackson.JacksonSerdes; + import java.util.ArrayList; import java.util.List; diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/src/main/java/my/example/parallelizework/utils/DataProcessingUtils.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/parallelizework/utils/DataProcessingUtils.java similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/src/main/java/my/example/parallelizework/utils/DataProcessingUtils.java rename to java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/parallelizework/utils/DataProcessingUtils.java diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/queue/AsyncTaskService.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/queue/AsyncTaskService.java new file mode 100644 index 00000000..a46b1f69 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/queue/AsyncTaskService.java @@ -0,0 +1,25 @@ +package my.example.queue; + +import dev.restate.sdk.Context; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Service; +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; + +@Service +public class AsyncTaskService { + + public record TaskOpts(String key, String taskName, String payload) {} + + @Handler + public String runTask(Context ctx, TaskOpts params) { + return someHeavyWork(params); + } + + private String someHeavyWork(TaskOpts params) { + return "someHeavyWork"; + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder().bind(new AsyncTaskService()).buildAndListen(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/queue/TaskSubmitter.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/queue/TaskSubmitter.java new file mode 100644 index 00000000..123cba45 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/queue/TaskSubmitter.java @@ -0,0 +1,42 @@ +package my.example.queue; + +import dev.restate.sdk.JsonSerdes; +import dev.restate.sdk.client.CallRequestOptions; +import dev.restate.sdk.client.Client; +import dev.restate.sdk.client.SendResponse; + +import java.time.Duration; + +/* + * Restate is as a sophisticated task queue, with extra features like: + * - delaying execution and reliable timers + * - stateful tasks + * - queues per key (>< per partition; slow tasks for a key don't block others) + * - retries and recovery upon failures + * + * Every handler in Restate is executed asynchronously and can be treated + * as a reliable asynchronous task. + */ +public class TaskSubmitter { + + private static final String RESTATE_URL = "http://localhost:8080"; + private static final Client restateClient = Client.connect(RESTATE_URL); + + public void submitAndAwaitTasks(AsyncTaskService.TaskOpts taskOpts) { + + // submit the task; similar to publishing a message to a queue + // Restate ensures the task is executed exactly once + SendResponse handle = + AsyncTaskServiceClient.fromClient(restateClient) + // optionally add a delay to execute the task later + .send(/*Duration.ofDays(1)*/) + .runTask( + taskOpts, + // use a stable uuid as an idempotency key + CallRequestOptions.DEFAULT.withIdempotency("dQw4w9WgXcQ") + ); + + // await the handler's result; optionally from another process + String result = restateClient.invocationHandle(handle.getInvocationId(), JsonSerdes.STRING).attach(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/BookingWorkflow.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/BookingWorkflow.java new file mode 100644 index 00000000..86a4b7e8 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/BookingWorkflow.java @@ -0,0 +1,83 @@ +package my.example.sagas; + +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; +import my.example.sagas.activities.*; +import dev.restate.sdk.WorkflowContext; +import dev.restate.sdk.annotation.Workflow; +import dev.restate.sdk.common.TerminalException; + +import java.util.ArrayList; +import java.util.List; + +// +// An example of a trip reservation workflow, using the SAGAs pattern to +// undo previous steps in case of an error. +// +// The durable execution's guarantee to run code to the end in the presence +// of failures, and to deterministically recover previous steps from the +// journal, makes sagas easy. +// Every step pushes a compensation action (an undo operation) to a stack. +// in the case of an error, those operations are run. +// +// The main requirement is that steps are implemented as journaled +// operations, like `ctx.run()` or rpc/messaging. +// +// Note: that the compensation logic is purely implemented in the user code +// and runs durably until it completes. +@Workflow +public class BookingWorkflow { + + public record BookingRequest( + Flights.FlightBookingRequest flights, + CarRentals.CarRentalRequest car, + PaymentClient.PaymentInfo paymentInfo + ) {} + + @Workflow + public void run(WorkflowContext ctx, BookingRequest req) throws TerminalException { + // create a list of undo actions + List compensations = new ArrayList<>(); + + try { + // Reserve the flights; Restate remembers the reservation ID + var flightsRpcClient = FlightsClient.fromContext(ctx); + String flightBookingId = flightsRpcClient.reserve(req.flights()).await(); + // Register the undo action for the flight reservation. + compensations.add(() -> flightsRpcClient.cancel(flightBookingId).await()); + + // Reserve the car; Restate remembers the reservation ID + var carRentalRpcClient = CarRentalsClient.fromContext(ctx); + String carBookingId = carRentalRpcClient.reserve(req.car()).await(); + // Register the undo action for the car rental. + compensations.add(() -> carRentalRpcClient.cancel(carBookingId).await()); + + // Charge the payment; Generate a payment ID and store it in Restate + String paymentId = ctx.random().nextUUID().toString(); + // Register the payment refund using the paymentId + compensations.add(() -> ctx.run(() -> PaymentClient.refund(paymentId))); + // Do the payment using the paymentId as idempotency key + ctx.run(() -> PaymentClient.charge(req.paymentInfo(), paymentId)); + + // confirm the flight and car reservations + flightsRpcClient.confirm(flightBookingId).await(); + carRentalRpcClient.confirm(carBookingId).await(); + + } catch (TerminalException e) { + // undo all the steps up to this point by running the compensations + for (Runnable compensation : compensations) { + compensation.run(); + } + + // rethrow error to fail this workflow + throw e; + } + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder() + .bind(new BookingWorkflow()) + .bind(new CarRentals()) + .bind(new Flights()) + .buildAndListen(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/CarRentals.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/CarRentals.java new file mode 100644 index 00000000..11d4c24c --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/CarRentals.java @@ -0,0 +1,40 @@ +package my.example.sagas.activities; + +import dev.restate.sdk.Context; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.UUID; + +@Service +public class CarRentals { + private static final Logger logger = LogManager.getLogger(CarRentals.class); + + public record CarRentalRequest(String pickupLocation, String rentalDate) {} + + @Handler + public String reserve(Context ctx, CarRentalRequest request) { + // this should implement the communication with the rental + // provider's APIs + // just return a mock random id representing the reservation + String bookingId = UUID.randomUUID().toString(); + logger.info("Car rental reservation created with id: {}", bookingId); + return bookingId; + } + + @Handler + public void confirm(Context ctx, String bookingId) { + // this should implement the communication with the rental + // provider's APIs + logger.info("Car rental reservation confirmed with id: {}", bookingId); + } + + @Handler + public void cancel(Context ctx, String bookingId) { + // this should implement the communication with the rental + // provider's APIs + logger.info("Car rental reservation cancelled with id: {}", bookingId); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/Flights.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/Flights.java new file mode 100644 index 00000000..a28c13d9 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/Flights.java @@ -0,0 +1,40 @@ +package my.example.sagas.activities; + +import dev.restate.sdk.Context; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.UUID; + +@Service +public class Flights { + private static final Logger logger = LogManager.getLogger(Flights.class); + + public record FlightBookingRequest(String flightId, String passengerName) {} + + @Handler + public String reserve(Context ctx, FlightBookingRequest request) { + // this should implement the communication with the flight + // provider's APIs + // just return a mock random id representing the reservation + String bookingId = UUID.randomUUID().toString(); + logger.info("Flight reservation created with id: {}", bookingId); + return bookingId; + } + + @Handler + public void confirm(Context ctx, String flightBookingId) { + // this should implement the communication with the flight + // provider's APIs + logger.info("Flight reservation confirmed with id: {}", flightBookingId); + } + + @Handler + public void cancel(Context ctx, String flightBookingId) { + // this should implement the communication with the flight + // provider's APIs + logger.info("Flight reservation cancelled with id: {}", flightBookingId); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/PaymentClient.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/PaymentClient.java new file mode 100644 index 00000000..6e7066b7 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/sagas/activities/PaymentClient.java @@ -0,0 +1,36 @@ +package my.example.sagas.activities; + +import dev.restate.sdk.common.TerminalException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PaymentClient { + private static final Logger logger = LogManager.getLogger(PaymentClient.class); + + public record PaymentInfo(String cardNumber, Long amount) { } + + public static void charge(PaymentInfo request, String paymentId) { + // This should implement the actual payment processing, or communication + // to the external provider's APIs. + // Here, we just simulate payment failure to show how the compensations run. + + if (Math.random() < 0.5) { + logger.error("This payment should never be accepted! Aborting booking."); + throw new TerminalException("Payment could not be accepted!"); + } + + if (Math.random() < 0.8) { + logger.error("A payment failure happened! Will retry..."); + throw new RuntimeException("A payment failure happened! Will retry..."); + } + + logger.info("Payment with id {} was successful!", paymentId); + } + + public static void refund(String paymentId) { + // refund the payment identified by this paymentId + // this should implement the actual payment processing, or communication + // to the external provider's APIs + logger.info("Refunding payment with id: {}", paymentId); + } +} \ No newline at end of file diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/PaymentService.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/PaymentService.java new file mode 100644 index 00000000..25f5d018 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/PaymentService.java @@ -0,0 +1,121 @@ +package my.example.signalspayments; + +import com.stripe.model.Event; +import com.stripe.model.PaymentIntent; +import com.stripe.net.ApiResource; +import dev.restate.sdk.Awakeable; +import dev.restate.sdk.Context; +import dev.restate.sdk.annotation.Accept; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.Raw; +import dev.restate.sdk.annotation.Service; +import dev.restate.sdk.common.Serde; +import dev.restate.sdk.common.TerminalException; +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; +import my.example.signalspayments.utils.PaymentUtils; +import my.example.signalspayments.utils.StripeUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +// +// The payment handlers that issues calls to Stripe. +// - the result often comes synchronously as a response API call. +// - some requests (and some payment methods) only return "processing" and +// notify later via a webhook. +// +// This example combines both paths in a single function that reliably waits for both +// paths, if needed, thus giving you a single long-running synchronous function. +// Durable execution and the persistent awakeable promises combine this into a single +// reliably promise/async-function. +// +// See README on how to run this example (needs a Stripe test account). +// +@Service +public class PaymentService { + + private static final Logger logger = LogManager.getLogger(PaymentService.class); + private static final StripeUtils stripe = new StripeUtils(); + + private static final Serde paymentIntentSerde = + Serde.using( + intent -> intent.toJson().getBytes(), + bytes -> ApiResource.GSON.fromJson(new String(bytes), PaymentIntent.class)); + + public record PaymentRequest(Long amount, String paymentMethodId, boolean delayedStatus) {} + + @Handler + public void processPayment(Context ctx, PaymentRequest request) { + PaymentUtils.verifyPaymentRequest(request); + + // Generate a deterministic idempotency key + String idempotencyKey = ctx.random().nextUUID().toString(); + + // Initiate a listener for external calls for potential webhook callbacks + Awakeable webhookPromise = ctx.awakeable(paymentIntentSerde); + + // Make a synchronous call to the payment service + PaymentIntent paymentIntent = + ctx.run( + "Stripe call", + paymentIntentSerde, + () -> { + // create payment intent + return stripe.createPaymentIntent( + request.paymentMethodId(), + request.amount(), + idempotencyKey, + webhookPromise.id(), + request.delayedStatus()); + }); + + if (!paymentIntent.getStatus().equals("processing")) { + // The synchronous call to Stripe had already been completed. + // That was fast :) + logger.info("Request {} was processed synchronously!", idempotencyKey); + PaymentUtils.ensureSuccess(paymentIntent.getStatus()); + } + + // We did not get the response on the synchronous path, talking to Stripe. + // No worries, Stripe will let us know when it is done processing via a webhook. + logger.info( + "Synchronous response for {} yielded 'processing', awaiting webhook call...", + idempotencyKey); + + // We will now wait for the webhook call to complete this promise. + // Check out the handler below. + PaymentIntent processedPaymentIntent = webhookPromise.await(); + + logger.info("Received webhook call for idempotency key: {}", idempotencyKey); + PaymentUtils.ensureSuccess(processedPaymentIntent.getStatus()); + } + + @Handler + public boolean processWebhook( + Context ctx, + // The raw request is the webhook call from Stripe that we will verify in the handler + @Accept("*/*") @Raw byte[] request) { + Event event = stripe.parseWebhookCall(request, ctx.request().headers().get("stripe-signature")); + + if (!PaymentUtils.isPaymentIntent(event)) { + logger.info("Unhandled event type: {}", event.getType()); + return true; + } + + PaymentIntent paymentIntent = stripe.parseAsPaymentIntent(event); + logger.info("Received webhook call for payment intent: {}", paymentIntent.toJson()); + + String webhookPromise = paymentIntent.getMetadata().get(PaymentUtils.RESTATE_CALLBACK_ID); + + if (webhookPromise == null) { + throw new TerminalException( + 400, "Missing callback property: " + PaymentUtils.RESTATE_CALLBACK_ID); + } + + ctx.awakeableHandle(webhookPromise).resolve(paymentIntentSerde, paymentIntent); + return true; + } + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder().bind(new PaymentService()).buildAndListen(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/utils/PaymentUtils.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/utils/PaymentUtils.java new file mode 100644 index 00000000..e3df370b --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/utils/PaymentUtils.java @@ -0,0 +1,34 @@ +package my.example.signalspayments.utils; + +import com.stripe.model.Event; +import dev.restate.sdk.common.TerminalException; +import my.example.signalspayments.PaymentService; + +public class PaymentUtils { + + public static final String RESTATE_CALLBACK_ID = "restate_callback_id"; + + public static void ensureSuccess(String status) { + switch (status) { + case "succeeded" -> {} + case "requires_payment_method", "canceled" -> + throw new TerminalException("Payment declined" + status); + default -> { + throw new IllegalStateException("Unhandled status: " + status); + } + } + } + + public static boolean isPaymentIntent(Event event) { + return event.getType().startsWith("payment_intent"); + } + + public static void verifyPaymentRequest(PaymentService.PaymentRequest request) { + if (request.amount() <= 0) { + throw new TerminalException("Amount must be larger than zero"); + } + if (request.paymentMethodId() == null) { + throw new TerminalException("Payment method ID missing in request"); + } + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/utils/StripeUtils.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/utils/StripeUtils.java new file mode 100644 index 00000000..511a0b1c --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/signalspayments/utils/StripeUtils.java @@ -0,0 +1,93 @@ +package my.example.signalspayments.utils; + +import com.stripe.StripeClient; +import com.stripe.exception.SignatureVerificationException; +import com.stripe.exception.StripeException; +import com.stripe.model.Event; +import com.stripe.model.EventDataObjectDeserializer; +import com.stripe.model.PaymentIntent; +import com.stripe.net.RequestOptions; +import com.stripe.net.Webhook; +import com.stripe.param.PaymentIntentCreateParams; +import com.stripe.param.PaymentIntentCreateParams.ConfirmationMethod; +import dev.restate.sdk.common.TerminalException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class StripeUtils { + + Logger logger = LogManager.getLogger(StripeUtils.class); + private final String stripeSecretKey = "sk_test_..."; + private final String webhookSecret = "whsec_..."; + private final StripeClient stripe; + + public StripeUtils() { + stripe = StripeClient.builder().setApiKey(stripeSecretKey).build(); + } + + public PaymentIntent createPaymentIntent( + String paymentMethodId, + Long amount, + String idempotencyKey, + String webhookPromiseId, + boolean delayedStatus) { + + try { + PaymentIntent paymentIntent = + stripe + .paymentIntents() + .create( + new PaymentIntentCreateParams.Builder() + .setPaymentMethod(paymentMethodId) + .setAmount(amount) + .setCurrency("USD") + .setConfirm(true) + .setConfirmationMethod(ConfirmationMethod.AUTOMATIC) + .setReturnUrl("https://restate.dev/") + .putMetadata("restate_callback_id", webhookPromiseId) + .build(), + RequestOptions.builder().setIdempotencyKey(idempotencyKey).build()); + + if (delayedStatus) { + paymentIntent.setStatus("processing"); + } + + return paymentIntent; + } catch (StripeException err) { + logger.error("Payment error: " + err.getMessage()); + // Simulate delayed notifications for testing + try { + PaymentIntent paymentIntent = err.getStripeError().getPaymentIntent(); + if (delayedStatus) { + paymentIntent.setStatus("processing"); + return paymentIntent; + } else { + throw new TerminalException( + "Payment declined: " + paymentIntent.getStatus() + " - " + err.getMessage()); + } + } catch (NullPointerException exc) { + throw new TerminalException("Payment error: " + exc.getMessage()); + } + } + } + + public Event parseWebhookCall(byte[] request, String sig) { + try { + return Webhook.constructEvent(new String(request), sig, webhookSecret); + } catch (SignatureVerificationException e) { + throw new TerminalException(400, "Invalid Stripe signature"); + } + } + + public PaymentIntent parseAsPaymentIntent(Event event) { + EventDataObjectDeserializer dataObjectDeserializer = event.getDataObjectDeserializer(); + PaymentIntent paymentIntent = null; + if (dataObjectDeserializer.getObject().isPresent()) { + paymentIntent = (PaymentIntent) dataObjectDeserializer.getObject().get(); + } else { + throw new TerminalException(500, "No Stripe object found in event"); + } + + return paymentIntent; + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statefulactors/MachineOperator.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statefulactors/MachineOperator.java new file mode 100644 index 00000000..5d4358c4 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statefulactors/MachineOperator.java @@ -0,0 +1,63 @@ +package my.example.statefulactors; + +import dev.restate.sdk.JsonSerdes; +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.VirtualObject; +import dev.restate.sdk.common.StateKey; +import dev.restate.sdk.serde.jackson.JacksonSerdes; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import static my.example.statefulactors.utils.MachineOperations.bringUpMachine; +import static my.example.statefulactors.utils.MachineOperations.tearDownMachine; + +// This is a State Machine implemented with a Virtual Object +// +// - The object holds the state of the state machine and defines the methods +// to transition between the states. +// - The object's unique id identifies the state machine. Many parallel state +// machines exist, but only state machine (object) exists per id. +// - The "single-writer-per-key" characteristic of virtual objects ensures +// that one state transition per state machine is in progress at a time. +// Additional transitions are enqueued for that object, while a transition +// for a machine is still in progress. +@VirtualObject +public class MachineOperator { + + enum Status { UP, DOWN } + private static final StateKey STATUS = StateKey.of("state", JacksonSerdes.of(Status.class)); + + @Handler + public String setUp(ObjectContext ctx) { + String machineId = ctx.key(); + + // Ignore duplicate calls to start + var status = ctx.get(STATUS).orElse(Status.DOWN); + if (status.equals(Status.UP)) { + return machineId + " is already running"; + } + + // Bringing up a machine is a slow process that frequently crashes + bringUpMachine(ctx, machineId); + ctx.set(STATUS, Status.UP); + + return machineId + " is now running"; + } + + @Handler + public String tearDown(ObjectContext ctx) { + String machineId = ctx.key(); + + var status = ctx.get(STATUS).orElse(Status.DOWN); + if (!status.equals(Status.UP)) { + return machineId + " is not up, cannot tear down"; + } + + // Tearing down a machine is a slow process that frequently crashes + tearDownMachine(ctx, machineId); + ctx.set(STATUS, Status.DOWN); + + return machineId + " is now down"; + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statefulactors/utils/MachineOperations.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statefulactors/utils/MachineOperations.java new file mode 100644 index 00000000..31f4ca30 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statefulactors/utils/MachineOperations.java @@ -0,0 +1,43 @@ +package my.example.statefulactors.utils; + +import dev.restate.sdk.Context; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Duration; + +public class MachineOperations { + private static final Logger logger = LogManager.getLogger(MachineOperations.class); + private static final boolean killProcess = System.getenv("CRASH_PROCESS") != null; + + public static void maybeCrash(double probability) { + if (Math.random() < probability) { + logger.error("A failure happened!"); + + if (killProcess) { + logger.error("--- CRASHING THE PROCESS ---"); + System.exit(1); + } else { + throw new RuntimeException("A failure happened!"); + } + } + } + + public static void bringUpMachine(Context ctx, String machineId) { + logger.info("{} beginning transition to UP", machineId); + + // Some long fragile process + maybeCrash(0.4); + ctx.sleep(Duration.ofMillis(5000)); + logger.info("{} is now running", machineId); + } + + public static void tearDownMachine(Context ctx, String machineId) { + logger.info("{} beginning transition to down", machineId); + + // Some long fragile process + maybeCrash(0.4); + ctx.sleep(Duration.ofMillis(5000)); + logger.info("{} is now down", machineId); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/AppMain.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/AppMain.java new file mode 100644 index 00000000..244eabd2 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/AppMain.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate examples, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/ + */ +package my.example.statemachinepayments; + +import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder; +import my.example.statemachinepayments.accounts.Account; + +public class AppMain { + + public static void main(String[] args) { + RestateHttpEndpointBuilder.builder() + .bind(new Account()) + .bind(new PaymentProcessor()) + .buildAndListen(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/PaymentProcessor.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/PaymentProcessor.java new file mode 100644 index 00000000..33fa3867 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/PaymentProcessor.java @@ -0,0 +1,100 @@ +package my.example.statemachinepayments; + +import static my.example.statemachinepayments.types.PaymentStatus.*; +import static my.example.statemachinepayments.types.PaymentStatus.*; + +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.VirtualObject; +import dev.restate.sdk.common.StateKey; +import dev.restate.sdk.serde.jackson.JacksonSerdes; +import my.example.statemachinepayments.accounts.AccountClient; +import my.example.statemachinepayments.types.*; + +import java.time.Duration; + +/* + * A service that processes the payment requests. + * + * This is implemented as a virtual object to ensure that only one concurrent request can happen + * per payment-id. Requests are queued and processed sequentially per id. + * + * Methods can be called multiple times with the same payment-id, but payment will be executed + * only once. If a 'cancelPayment' is called for an id, the payment will either be undone, or + * blocked from being made in the future, depending on whether the cancel call comes before or after + * the 'makePayment' call. + */ +@VirtualObject +public class PaymentProcessor { + + /** The key under which we store the status. */ + private static final StateKey STATUS = + StateKey.of("status", JacksonSerdes.of(PaymentStatus.class)); + + /** The key under which we store the original payment request. */ + private static final StateKey PAYMENT = + StateKey.of("payment", JacksonSerdes.of(Payment.class)); + + private static final Duration EXPIRY_TIMEOUT = Duration.ofDays(1); + + @Handler + public Result makePayment(ObjectContext ctx, Payment payment) { + final String paymentId = ctx.key(); + final PaymentStatus status = ctx.get(STATUS).orElse(NEW); + + if (status == CANCELLED) { + return new Result(false, "Payment already cancelled"); + } + if (status == COMPLETED_SUCCESSFULLY) { + return new Result(false, "Payment already completed in prior call"); + } + + // Charge the target account + Result paymentResult = + AccountClient.fromContext(ctx, payment.getAccountId()) + .withdraw(payment.getAmountCents()) + .await(); + + // Remember only on success, so that on failure (when we didn't charge) the external + // caller may retry this (with the same payment-id), for the sake of this example + if (paymentResult.isSuccess()) { + ctx.set(STATUS, COMPLETED_SUCCESSFULLY); + ctx.set(PAYMENT, payment); + PaymentProcessorClient.fromContext(ctx, paymentId).send(EXPIRY_TIMEOUT).expire(); + } + + return paymentResult; + } + + @Handler + public void cancelPayment(ObjectContext ctx) { + PaymentStatus status = ctx.get(STATUS).orElse(NEW); + + switch (status) { + case NEW -> { + // not seen this payment-id before, mark as canceled, in case the cancellation + // overtook the actual payment request (on the external caller's side) + ctx.set(STATUS, CANCELLED); + PaymentProcessorClient.fromContext(ctx, ctx.key()).send(EXPIRY_TIMEOUT).expire(); + } + + case CANCELLED -> {} + + case COMPLETED_SUCCESSFULLY -> { + // remember this as cancelled + ctx.set(STATUS, CANCELLED); + + // undo the payment + Payment payment = ctx.get(PAYMENT).get(); + AccountClient.fromContext(ctx, payment.getAccountId()) + .send() + .deposit(payment.getAmountCents()); + } + } + } + + @Handler + public void expire(ObjectContext ctx) { + ctx.clearAll(); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/accounts/Account.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/accounts/Account.java new file mode 100644 index 00000000..ee01c212 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/accounts/Account.java @@ -0,0 +1,49 @@ +package my.example.statemachinepayments.accounts; + +import dev.restate.sdk.JsonSerdes; +import dev.restate.sdk.ObjectContext; +import dev.restate.sdk.annotation.Handler; +import dev.restate.sdk.annotation.VirtualObject; +import dev.restate.sdk.common.StateKey; +import dev.restate.sdk.common.TerminalException; +import my.example.statemachinepayments.types.Result; + +// +// A simple virtual object, to track accounts. +// This is for simplicity to make this example work self-contained. +// This should be a database in a real scenario +// +@VirtualObject +public class Account { + + private static final StateKey BALANCE = StateKey.of("balance", JsonSerdes.LONG); + + @Handler + public void deposit(ObjectContext ctx, Long amountCents) { + if (amountCents <= 0) { + throw new TerminalException("Amount must be greater than 0"); + } + + long balanceCents = ctx.get(BALANCE).orElse(initializeRandomAmount()); + ctx.set(BALANCE, balanceCents + amountCents); + } + + @Handler + public Result withdraw(ObjectContext ctx, Long amountCents) { + if (amountCents <= 0) { + throw new TerminalException("Amount must be greater than 0"); + } + + long balanceCents = ctx.get(BALANCE).orElse(initializeRandomAmount()); + if (balanceCents < amountCents) { + return new Result(false, "Insufficient funds: " + balanceCents + " cents"); + } + + ctx.set(BALANCE, balanceCents - amountCents); + return new Result(true, "Withdrawal successful"); + } + + private long initializeRandomAmount() { + return (long) (Math.random() * 100_000 + 100_000); + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/Payment.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/Payment.java new file mode 100644 index 00000000..3c01d522 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/Payment.java @@ -0,0 +1,28 @@ +package my.example.statemachinepayments.types; + +import dev.restate.sdk.common.TerminalException; + +public class Payment { + private final String accountId; + private final long amountCents; + + public Payment(String accountId, long amountCents) { + if (accountId == null || accountId.isEmpty()) { + throw new TerminalException("Account ID is required"); + } + if (amountCents <= 0) { + throw new TerminalException("Amount must be greater than 0"); + } + + this.accountId = accountId; + this.amountCents = amountCents; + } + + public String getAccountId() { + return accountId; + } + + public long getAmountCents() { + return amountCents; + } +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/PaymentStatus.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/PaymentStatus.java new file mode 100644 index 00000000..ffe4e3ed --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/PaymentStatus.java @@ -0,0 +1,7 @@ +package my.example.statemachinepayments.types; + +public enum PaymentStatus { + NEW, + COMPLETED_SUCCESSFULLY, + CANCELLED; +} diff --git a/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/Result.java b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/Result.java new file mode 100644 index 00000000..a455d163 --- /dev/null +++ b/java/patterns-use-cases/patterns-use-cases/src/main/java/my/example/statemachinepayments/types/Result.java @@ -0,0 +1,24 @@ +package my.example.statemachinepayments.types; + +public class Result { + private final boolean success; + private final String reason; + + public Result(boolean success, String reason) { + this.success = success; + this.reason = reason; + } + + public boolean isSuccess() { + return success; + } + + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "Result{" + "success=" + success + ", reason='" + reason + '\'' + '}'; + } +} diff --git a/java/patterns-use-cases/async-tasks-parallelize-work/src/main/resources/log4j2.properties b/java/patterns-use-cases/patterns-use-cases/src/main/resources/log4j2.properties similarity index 100% rename from java/patterns-use-cases/async-tasks-parallelize-work/src/main/resources/log4j2.properties rename to java/patterns-use-cases/patterns-use-cases/src/main/resources/log4j2.properties diff --git a/typescript/patterns-use-cases/async-tasks-queue/src/task_submitter.ts b/typescript/patterns-use-cases/async-tasks-queue/src/task_submitter.ts index d7863730..2e88cc36 100644 --- a/typescript/patterns-use-cases/async-tasks-queue/src/task_submitter.ts +++ b/typescript/patterns-use-cases/async-tasks-queue/src/task_submitter.ts @@ -25,8 +25,8 @@ async function submitAndAwaitTask(task: TaskOpts) { .runTask( task, // use a stable uuid as an idempotency key - // optionally, execute the task later via SendOpts.from({ delay: 1000 }) - SendOpts.from({ idempotencyKey: task.id }) + // optionally, execute the task later by adding a delay + SendOpts.from({ idempotencyKey: task.id, /*delay: 1000*/ }) ); // await the task's result diff --git a/typescript/patterns-use-cases/microservices-stateful-actors/src/machine_management.ts b/typescript/patterns-use-cases/microservices-stateful-actors/src/machine_management.ts index a15aefad..689f2e3c 100644 --- a/typescript/patterns-use-cases/microservices-stateful-actors/src/machine_management.ts +++ b/typescript/patterns-use-cases/microservices-stateful-actors/src/machine_management.ts @@ -1,5 +1,5 @@ import * as restate from "@restatedev/restate-sdk"; -import {bringUpMachine, State, tearDownMachine} from "./utils/utils"; +import {bringUpMachine, Status, tearDownMachine} from "./utils/utils"; // This is a State Machine implemented with a Virtual Object // @@ -19,36 +19,31 @@ const machineManagement = restate.object({ const machineId = ctx.key; // Ignore duplicate calls to 'setUp' - const state = await ctx.get("state"); - if (state === State.UP) { + const status = await ctx.get("status"); + if (status === Status.UP) { return `${machineId} is already up, so nothing to do`; } // Bringing up a machine is a slow process that frequently crashes - ctx.console.info(`Beginning transition of ${machineId} to UP`); await bringUpMachine(ctx, machineId); + ctx.set("status", Status.UP); - ctx.console.info(`Done transitioning ${machineId} to UP`); - ctx.set("state", State.UP); return `${machineId} is now up`; }, tearDown: async (ctx: restate.ObjectContext) => { const machineId = ctx.key; - const state = await ctx.get("state"); - if (state !== State.UP) { - ctx.console.info(`${machineId} is not UP, cannot tear down`); - return `${machineId} is not yet UP`; + const status = await ctx.get("status"); + if (status !== Status.UP) { + return `${machineId} is not up, cannot tear down`; } // Tearing down a machine is a slow process that frequently crashes - ctx.console.info(`Beginning transition of ${machineId} to DOWN`); await tearDownMachine(ctx, machineId); + ctx.set("status", Status.DOWN); - ctx.console.info(`Done transitioning ${machineId} to DOWN`); - ctx.set("state", State.DOWN); - return `${machineId} is now DOWN`; + return `${machineId} is now down`; }, }, }); diff --git a/typescript/patterns-use-cases/microservices-stateful-actors/src/utils/utils.ts b/typescript/patterns-use-cases/microservices-stateful-actors/src/utils/utils.ts index a4e5001d..d69ec261 100644 --- a/typescript/patterns-use-cases/microservices-stateful-actors/src/utils/utils.ts +++ b/typescript/patterns-use-cases/microservices-stateful-actors/src/utils/utils.ts @@ -1,18 +1,22 @@ import * as restate from "@restatedev/restate-sdk"; -export enum State { +export enum Status { UP = "UP", DOWN = "DOWN", } export async function bringUpMachine(ctx: restate.Context, machineId: string){ + ctx.console.info(`Beginning transition of ${machineId} to up`); maybeCrash(0.4); await ctx.sleep(5000); + ctx.console.info(`Done transitioning ${machineId} to up`); } export async function tearDownMachine(ctx: restate.Context, machineId: string){ + ctx.console.info(`Beginning transition of ${machineId} to down`); maybeCrash(0.4); await ctx.sleep(5000); + ctx.console.info(`Done transitioning ${machineId} to down`); } const killProcess: boolean = Boolean(process.env.CRASH_PROCESS);