Skip to content

Commit

Permalink
Fix sagas in TypeScript and Java
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 18, 2024
1 parent 1c4670e commit fc4e1ae
Show file tree
Hide file tree
Showing 17 changed files with 316 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Build and start the example
./gradlew run
```

Register the services: `restate dep reg localhost:9080`
Register the services: `restate deployments register localhost:9080`

Make some requests:

Expand All @@ -45,3 +45,22 @@ Make some requests:
```shell
curl -X POST localhost:8080/PaymentProcessor/some-string-id/cancelPayment
```

- Have a look at the state:
```shell
restate kv get PaymentProcessor some-string-id
```
```
🤖 State:
―――――――――
Service PaymentProcessor
Key some-string-id
KEY VALUE
payment {
"accountId": "abc",
"amountCents": 100
}
status "CANCELLED"
```
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies {
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.1")

// Logging (optional)
implementation("org.apache.logging.log4j:log4j-core:2.20.0")
implementation("org.apache.logging.log4j:log4j-core:2.24.1")
}

// Set main class
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
/*
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate examples,
* which is released under the MIT license.
*
* You can find a copy of the license in the file LICENSE
* in the root directory of this repository or package or at
* https://github.com/restatedev/examples/
*/

package my.example;

import static my.example.types.PaymentStatus.*;
Expand All @@ -24,13 +13,13 @@
import my.example.types.PaymentStatus;
import my.example.types.Result;

/**
/*
* A service that processes the payment requests.
*
* <p>This is implemented as a virtual object to ensure that only one concurrent request can happen
* This is implemented as a virtual object to ensure that only one concurrent request can happen
* per payment-id. Requests are queued and processed sequentially per id.
*
* <p>Methods can be called multiple times with the same payment-id, but payment will be executed
* Methods can be called multiple times with the same payment-id, but payment will be executed
* only once. If a 'cancelPayment' is called for an id, the payment will either be undone, or
* blocked from being made in the future, depending on whether the cancel call comes before or after
* the 'makePayment' call.
Expand Down
94 changes: 78 additions & 16 deletions java/patterns-use-cases/microservices-sagas/README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,78 @@
# SAGAs / Compensations

An example of a trip reservation workflow, using the SAGAs pattern to
undo previous steps in case of an error.
This is a minimal version of the holiday reservation demo in the
[Restate Holiday Repository](https://github.com/restatedev/restate-holiday).

Durable Execution's guarantee to run code to the end in the presence
of failures, and to deterministically recover previous steps from the
journal, makes SAGAs easy.
Every step pushes a compensation action (an undo operation) to a stack.
in the case of an error, those operations are run.

The main requirement is that steps are implemented as journalled
operations, like `ctx.run()` (e.g., direct calls to 3rd party APIs) or
RPCs to other Restate-backed services (e.g., `FlightsClient.fromContex(ctx)`).
# Microservices: Sagas

An example of a trip reservation workflow, using the saga pattern to undo previous steps in case of an error.

Durable Execution's guarantee to run code to the end in the presence of failures, and to deterministically recover previous steps from the journal, makes sagas easy.
Every step pushes a compensation action (an undo operation) to a stack. In the case of an error, those operations are run.

The main requirement is that steps are implemented as journaled operations, like `ctx.run()` or RPC/messaging.

## Adding compensations
The example shows two ways you can implement the compensation, depending on the characteristics of the API/system you interact with.

The flight and car reservations work in a two-phase commit way, where you first create a reservation, get a reservation ID back, and then confirm or cancel the reservation with its ID.
In this case, you need to add the compensation to the list after creating the reservation, because you need the reservation ID to cancel it.
If the failure happens while making the reservation, you can be sure that it never takes effect, because you didn't confirm it.

The payment on the other hand uses a client generated idempotency key.
The payment goes through in one shot (single API call).
If we receive an error, we might not be sure if this occurred before or after the payment took effect.
Therefore, we need to add the compensation to the list before the payment is made.
If a failure happens during the payment, the compensation will run.
The downstream API then uses the idempotency key to check if the payment went through, and whether it needs to be refunded.

Note that the compensating action needs to be idempotent.

## Running the examples

1. [Start the Restate Server](https://docs.restate.dev/develop/local_dev) in a separate shell:
`restate-server`

2. Start the service: `./gradlew run`

3. Register the example at Restate server by calling
`restate -y deployment register localhost:9080`

## Demo scenario

Have a look at the logs to see how the compensations run in case of a terminal error.

Start the workflow:
```shell
curl -X POST localhost:8080/BookingWorkflow/trip12883/run -H 'content-type: application/json' -d '{
"flights": {
"flightId": "12345",
"passengerName": "John Doe"
},
"car": {
"pickupLocation": "Airport",
"rentalDate": "2024-12-16"
},
"paymentInfo": {
"cardNumber": "4111111111111111",
"amount": 1500
}
}'
```

Have a look at the logs to see the cancellations of the flight and car booking in case of a terminal error:
```shell
2024-12-18 11:35:48 INFO [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-18 11:35:49 INFO [Flights/reserve][inv_1ccelXW8IxuW6QpLWQu9ykt5aMAqRTl7pL] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-18 11:35:49 INFO [Flights/reserve][inv_1ccelXW8IxuW6QpLWQu9ykt5aMAqRTl7pL] dev.restate.patterns.activities.Flights - Flight reservation created with id: 35ab7c68-6f32-48f6-adb9-a2a74076f4df
2024-12-18 11:35:49 INFO [Flights/reserve][inv_1ccelXW8IxuW6QpLWQu9ykt5aMAqRTl7pL] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-18 11:35:49 INFO [CarRentals/reserve][inv_13cgaqr4XecK2ztj72BfVPuscdL1SJwMCZ] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-18 11:35:49 INFO [CarRentals/reserve][inv_13cgaqr4XecK2ztj72BfVPuscdL1SJwMCZ] dev.restate.patterns.activities.CarRentals - Car rental reservation created with id: c103022e-9dda-4a34-a6ef-0c95d2911b2c
2024-12-18 11:35:49 INFO [CarRentals/reserve][inv_13cgaqr4XecK2ztj72BfVPuscdL1SJwMCZ] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-18 11:35:49 ERROR [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.patterns.clients.PaymentClient - This payment should never be accepted! Aborting booking.
2024-12-18 11:35:49 INFO [Flights/cancel][inv_19STR0U1v5Xo5W2UsYS3rhZEI02VGDVJM5] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-18 11:35:49 INFO [Flights/cancel][inv_19STR0U1v5Xo5W2UsYS3rhZEI02VGDVJM5] dev.restate.patterns.activities.Flights - Flight reservation cancelled with id: 35ab7c68-6f32-48f6-adb9-a2a74076f4df
2024-12-18 11:35:49 INFO [Flights/cancel][inv_19STR0U1v5Xo5W2UsYS3rhZEI02VGDVJM5] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-18 11:35:49 INFO [CarRentals/cancel][inv_14PS98BWOeNn1zw3yn2RqJ0wSp7V5sEJMd] dev.restate.sdk.core.InvocationStateMachine - Start invocation
2024-12-18 11:35:49 INFO [CarRentals/cancel][inv_14PS98BWOeNn1zw3yn2RqJ0wSp7V5sEJMd] dev.restate.patterns.activities.CarRentals - Car rental reservation cancelled with id: c103022e-9dda-4a34-a6ef-0c95d2911b2c
2024-12-18 11:35:49 INFO [CarRentals/cancel][inv_14PS98BWOeNn1zw3yn2RqJ0wSp7V5sEJMd] dev.restate.sdk.core.InvocationStateMachine - End invocation
2024-12-18 11:35:49 INFO [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.patterns.clients.PaymentClient - Refunding payment with id: 1a640cda-bd5f-9751-b6b9-274817549b58
2024-12-18 11:35:49 WARN [BookingWorkflow/run][inv_12ogPnVefk1c3clc9wNhEa4pMxxRh9IRyx] dev.restate.sdk.core.ResolvedEndpointHandlerImpl - Error when processing the invocation
dev.restate.sdk.common.TerminalException: Payment could not be accepted!
... rest of trace ...
```
12 changes: 10 additions & 2 deletions java/patterns-use-cases/microservices-sagas/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import java.net.URI

plugins {
java
application
Expand All @@ -23,3 +21,13 @@ dependencies {
// Logging (optional)
implementation("org.apache.logging.log4j:log4j-core:2.24.1")
}

application {
mainClass.set("dev.restate.patterns.AppMain")
}

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dev.restate.patterns;

import dev.restate.patterns.activities.CarRentals;
import dev.restate.patterns.activities.Flights;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;

public class AppMain {
public static void main(String[] args) {
RestateHttpEndpointBuilder.builder()
.bind(new BookingWorkflow())
.bind(new CarRentals())
.bind(new Flights())
.buildAndListen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,99 +12,70 @@
package dev.restate.patterns;

import dev.restate.patterns.activities.*;
import dev.restate.patterns.clients.PaymentClient;
import dev.restate.patterns.types.BookingRequest;
import dev.restate.sdk.WorkflowContext;
import dev.restate.sdk.annotation.Workflow;
import dev.restate.sdk.common.TerminalException;

import java.util.ArrayList;
import java.util.List;

//
// SAGAs / Compensations
//
// An example of a trip reservation workflow, using the SAGAs pattern to
// undo previous steps in case of an error.
//
// Durable Execution's guarantee to run code to the end in the presence
// The durable execution's guarantee to run code to the end in the presence
// of failures, and to deterministically recover previous steps from the
// journal, makes SAGAs easy.
// journal, makes sagas easy.
// Every step pushes a compensation action (an undo operation) to a stack.
// in the case of an error, those operations are run.
//
// The main requirement is that steps are implemented as journalled
// operations, like `ctx.run()` or RPC calls/messages executed
// through the Restate Context.
// The main requirement is that steps are implemented as journaled
// operations, like `ctx.run()` or rpc/messaging.
//

/**
* Trip reservation workflow which has been instrumented with compensations. The workflow tries to
* reserve the flight and the car rental before it processes the payment. If at any point one of
* the calls fails or gets cancelled, then the trip reservation workflow will undo all
* successfully completed steps by running the compensations.
*
* <p>Note: that the compensation logic is purely implemented in the user code and runs durably
* until it completes. Moreover, an invocation failure and an invocation cancellation are handled
* in the exact same way by the caller.
*/
// Note: that the compensation logic is purely implemented in the user code
// and runs durably until it completes.
@Workflow
public class BookingWorkflow {

// The workflow parameters, like the car and flight to book, the
// payment details (card/token, amount, ...)
public record TravelBookingRequest( /* car, flights, payment info, ... */ ) { }

@Workflow
public void run(WorkflowContext context, TravelBookingRequest request) throws TerminalException {
// Create a list of compensations to run in case of a failure or cancellation.
final List<Runnable> compensations = new ArrayList<>();
public void run(WorkflowContext ctx, BookingRequest req) throws TerminalException {
// create a list of undo actions
List<Runnable> compensations = new ArrayList<>();

try {
// Reserve the flights and let Restate remember the reservation ID
final var flightsRpcClient = FlightsClient.fromContext(context);
final String flightReservationId =
flightsRpcClient
.reserve(new Flights.FlightBookingRequest(request))
.await();
// Register the compensation to undo the flight reservation.
compensations.add(() -> flightsRpcClient.cancel(flightReservationId).await());
// Reserve the flights; Restate remembers the reservation ID
var flightsRpcClient = FlightsClient.fromContext(ctx);
String flightBookingId = flightsRpcClient.reserve(req.flights()).await();
// Register the undo action for the flight reservation.
compensations.add(() -> flightsRpcClient.cancel(flightBookingId).await());

// Reserve the car and let Restate remember the reservation ID
final var carRentalRpcClient = CarRentalsClient.fromContext(context);
final String carReservationId =
carRentalRpcClient
.reserve(new CarRentals.CarRentalRequest(request))
.await();
// Register the compensation to undo the car rental reservation.
compensations.add(() -> carRentalRpcClient.cancel(carReservationId).await());
// Reserve the car; Restate remembers the reservation ID
var carRentalRpcClient = CarRentalsClient.fromContext(ctx);
String carBookingId = carRentalRpcClient.reserve(req.car()).await();
// Register the undo action for the car rental.
compensations.add(() -> carRentalRpcClient.cancel(carBookingId).await());

// call the payment service to make the payment and let Restate remember
// the payment ID
final var paymentRpcClient = PaymentClient.fromContext(context);
final String paymentId =
paymentRpcClient
.process(new Payment.PaymentRequest(request))
.await();
// Register the compensation to undo the payment.
compensations.add(() -> paymentRpcClient.refund(paymentId).await());
// Charge the payment; Generate a payment ID and store it in Restate
String paymentId = ctx.random().nextUUID().toString();
// Register the payment refund using the paymentId
compensations.add(() -> ctx.run(() -> PaymentClient.refund(paymentId)));
// Do the payment using the paymentId as idempotency key
ctx.run(() -> PaymentClient.charge(req.paymentInfo(), paymentId));

// confirm the reserved flight / rental
// failures here will still trigger the SAGA compensations
flightsRpcClient.confirm(flightReservationId).await();
carRentalRpcClient.confirm(carReservationId).await();
// confirm the flight and car reservations
flightsRpcClient.confirm(flightBookingId).await();
carRentalRpcClient.confirm(carBookingId).await();

} catch (TerminalException e) {

// Run the compensations
// undo all the steps up to this point by running the compensations
for (Runnable compensation : compensations) {
compensation.run();
}

// rethrow error to fail this workflow
throw new TerminalException(
e.getCode(),
String.format(
"Failed to reserve the trip: %s. Ran %d compensations.",
e.getMessage(), compensations.size()));
throw e;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,34 +1,40 @@
package dev.restate.patterns.activities;

import dev.restate.patterns.BookingWorkflow.TravelBookingRequest;
import dev.restate.sdk.Context;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.UUID;

@Service
public class CarRentals {
private static final Logger logger = LogManager.getLogger(CarRentals.class);

public record CarRentalRequest(TravelBookingRequest req /* rental details */) {}
public record CarRentalRequest(String pickupLocation, String rentalDate) {}

@Handler
public String reserve(Context ctx, CarRentalRequest request) {
// this should implement the communication with the rental
// provider's APIs
// just return a mock random id representing the reservation
return "car-" + UUID.randomUUID().toString();
String bookingId = UUID.randomUUID().toString();
logger.info("Car rental reservation created with id: {}", bookingId);
return bookingId;
}

@Handler
public void confirm(Context ctx, String carRentalBookingId) {
public void confirm(Context ctx, String bookingId) {
// this should implement the communication with the rental
// provider's APIs
logger.info("Car rental reservation confirmed with id: {}", bookingId);
}

@Handler
public void cancel(Context ctx, String carRentalBookingId) {
public void cancel(Context ctx, String bookingId) {
// this should implement the communication with the rental
// provider's APIs
logger.info("Car rental reservation cancelled with id: {}", bookingId);
}
}
Loading

0 comments on commit fc4e1ae

Please sign in to comment.