From c6a42544fb081844bbf4abb81d28accbd06c1e32 Mon Sep 17 00:00:00 2001 From: Giselle van Dongen Date: Thu, 12 Dec 2024 22:17:01 +0100 Subject: [PATCH] Improve typescript examples for basics and sagas --- typescript/basics/src/1_durable_execution.ts | 9 +-- .../basics/src/2_durable_execution_sagas.ts | 22 ++---- typescript/basics/src/3_workflows.ts | 6 +- typescript/basics/src/4_virtual_objects.ts | 7 +- typescript/basics/src/utils/stubs.ts | 6 +- .../microservices-sagas/README.md | 4 +- .../src/booking_workflow.ts | 71 ++++++++++++++++++ .../src/services/payments.ts | 2 +- .../microservices-sagas/src/workflow_saga.ts | 75 ------------------- 9 files changed, 90 insertions(+), 112 deletions(-) create mode 100644 typescript/patterns-use-cases/microservices-sagas/src/booking_workflow.ts delete mode 100644 typescript/patterns-use-cases/microservices-sagas/src/workflow_saga.ts diff --git a/typescript/basics/src/1_durable_execution.ts b/typescript/basics/src/1_durable_execution.ts index 5a00d980..a069bf84 100644 --- a/typescript/basics/src/1_durable_execution.ts +++ b/typescript/basics/src/1_durable_execution.ts @@ -31,20 +31,17 @@ restate // Parameters are durable across retries const { userId, creditCard, subscriptions } = req; - // 1. Generate an idempotency key - // This value is retained after a failure - const paymentId = ctx.rand.uuidv4(); + // Recoverable after failures + const stableIdempotencyKey = ctx.rand.uuidv4(); - // 2. Create a recurring payment via API call // Retried in case of timeouts, API downtime, etc. const { success } = await ctx.run(() => - createRecurringPayment(userId, creditCard, paymentId), + createRecurringPayment(creditCard, stableIdempotencyKey), ); if (!success) { return; } - // 3. Create subscriptions via API calls // Persists successful subscriptions and skip them on retries for (const subscription of subscriptions) { await ctx.run(() => createSubscription(userId, subscription)); diff --git a/typescript/basics/src/2_durable_execution_sagas.ts b/typescript/basics/src/2_durable_execution_sagas.ts index 4e0f79b4..4d898ed9 100644 --- a/typescript/basics/src/2_durable_execution_sagas.ts +++ b/typescript/basics/src/2_durable_execution_sagas.ts @@ -24,38 +24,26 @@ restate add: async (ctx: restate.Context, req: SubscriptionRequest) => { const { userId, creditCard, subscriptions } = req; - // We will add the undo actions to this list const compensations = []; try { const paymentId = ctx.rand.uuidv4(); // Register compensating actions for steps that need to be undone in case of a terminal error compensations.push(() => removeRecurringPayment(paymentId)); - const { success } = await ctx.run(() => - createRecurringPayment(userId, creditCard, paymentId), + const res = await ctx.run(() => + createRecurringPayment(creditCard, paymentId), ); - if (!success) { + if (!res.success) { return; } for (const subscription of subscriptions) { - // Register compensating actions for the subscriptions, to run in case of a terminal error - compensations.push(() => - removeSubscription(userId, subscription), - ); - const result = await ctx.run(() => - createSubscription(userId, subscription), - ); - // If the subscription already exists, then revert the payment and other subscriptions - // and surface the error to the user - if (result == "ALREADY_EXISTS") { - throw new restate.TerminalError("Duplicate subscription"); - } + compensations.push(() => removeSubscription(userId, subscription)); + await ctx.run(() => createSubscription(userId, subscription)); } } catch (err) { // On TerminalError, Restate runs compensations without retrying. // On other errors, Restate does not run compensations but retries from the last successful operation. if (err instanceof restate.TerminalError) { - console.error(">>> Terminal error occurred. Running compensations."); for (const compensation of compensations.reverse()) { await ctx.run(compensation); } diff --git a/typescript/basics/src/3_workflows.ts b/typescript/basics/src/3_workflows.ts index 046e70ee..0f991f18 100644 --- a/typescript/basics/src/3_workflows.ts +++ b/typescript/basics/src/3_workflows.ts @@ -18,7 +18,7 @@ import { createUserEntry, sendEmailWithLink } from "./utils/stubs"; // - Additional methods interact with the workflow. // Each workflow instance has a unique ID and runs only once (to success or failure). // -const singupWorkflow = restate.workflow({ +const signupWorkflow = restate.workflow({ name: "usersignup", handlers: { // --- The workflow logic --- @@ -50,9 +50,9 @@ const singupWorkflow = restate.workflow({ }, }); -export type SignupApi = typeof singupWorkflow; +export type SignupApi = typeof signupWorkflow; -restate.endpoint().bind(singupWorkflow).listen(9080); +restate.endpoint().bind(signupWorkflow).listen(9080); // or .handler() to run on Lambda, Deno, Bun, Cloudflare Workers, ... /* diff --git a/typescript/basics/src/4_virtual_objects.ts b/typescript/basics/src/4_virtual_objects.ts index 92684214..4908986f 100644 --- a/typescript/basics/src/4_virtual_objects.ts +++ b/typescript/basics/src/4_virtual_objects.ts @@ -23,12 +23,9 @@ import * as restate from "@restatedev/restate-sdk"; const greeterObject = restate.object({ name: "greeter", handlers: { - greet: async ( - ctx: restate.ObjectContext, - req: { greeting: string }, - ) => { + greet: async (ctx: restate.ObjectContext, req: { greeting: string }) => { // Access the state attached to this object (this 'name') - // State access and updates are exclusive and consistent with the invocations + // State access and updates are exclusive and consistent with the execution progress. let count = (await ctx.get("count")) ?? 0; count++; ctx.set("count", count); diff --git a/typescript/basics/src/utils/stubs.ts b/typescript/basics/src/utils/stubs.ts index 694834f7..0d518e00 100644 --- a/typescript/basics/src/utils/stubs.ts +++ b/typescript/basics/src/utils/stubs.ts @@ -9,6 +9,8 @@ * https://github.com/restatedev/examples/blob/main/LICENSE */ +import * as restate from "@restatedev/restate-sdk"; + /** * Utility to let the service crash with a probability to show how the system recovers. */ @@ -43,7 +45,8 @@ export function createSubscription( console.log(`>>> Creating subscription ${subscription} for user ${userId}`); if (Math.random() < 0.5) { - return "ALREADY_EXISTS"; + console.error("Duplicate subscription."); + throw new restate.TerminalError("Duplicate subscription"); } return "SUCCESS"; @@ -53,7 +56,6 @@ export function createSubscription( * Simulates calling a payment API, with a random probability of API downtime. */ export function createRecurringPayment( - userId: string, creditCard: string, paymentId: any, ): { success: boolean } { diff --git a/typescript/patterns-use-cases/microservices-sagas/README.md b/typescript/patterns-use-cases/microservices-sagas/README.md index 779881b5..8364fe7d 100644 --- a/typescript/patterns-use-cases/microservices-sagas/README.md +++ b/typescript/patterns-use-cases/microservices-sagas/README.md @@ -2,8 +2,6 @@ An example of a trip reservation workflow, using the SAGAs pattern to undo previous steps in case of an error. -This is a minimal version of the holiday reservation demo in the -[Restate Holiday Repository](https://github.com/restatedev/restate-holiday). Durable Execution's guarantee to run code to the end in the presence of failures, and to deterministically recover previous steps from the @@ -12,4 +10,4 @@ Every step pushes a compensation action (an undo operation) to a stack. in the case of an error, those operations are run. The main requirement is that steps are implemented as journald -operations, like `ctx.sideEffect()` or rpc/messaging. +operations, like `ctx.run()` or rpc/messaging. diff --git a/typescript/patterns-use-cases/microservices-sagas/src/booking_workflow.ts b/typescript/patterns-use-cases/microservices-sagas/src/booking_workflow.ts new file mode 100644 index 00000000..132af6e9 --- /dev/null +++ b/typescript/patterns-use-cases/microservices-sagas/src/booking_workflow.ts @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH + * + * This file is part of the Restate examples, + * which is released under the MIT license. + * + * You can find a copy of the license in the file LICENSE + * in the root directory of this repository or package or at + * https://github.com/restatedev/examples/ + */ + +import * as restate from "@restatedev/restate-sdk"; + +import { flights } from "./services/flights"; +import { cars } from "./services/cars"; +import { payments } from "./services/payments"; + +// +// An example of a trip reservation workflow, using the SAGAs pattern to +// undo previous steps in case of an error. +// +// The durable execution's guarantee to run code to the end in the presence +// of failures, and to deterministically recover previous steps from the +// journal, makes SAGAs easy. +// Every step pushes a compensation action (an undo operation) to a stack. +// in the case of an error, those operations are run. +// +// The main requirement is that steps are implemented as journald +// operations, like `ctx.run()` or rpc/messaging. +restate.endpoint().bind( + restate.workflow({ + name: "BookingWorkflow", + handlers: { + run: async (ctx: restate.WorkflowContext, tripID: string) => { + // create a list of undo actions + const compensations = []; + try { + + // call the flight API to reserve, keeping track of how to cancel + const flightBooking = await ctx.run(() => flights.reserve(tripID)); + compensations.push(() => flights.cancel(tripID, flightBooking)); + + // call the car rental service API to reserve, keeping track of how to cancel + const carBooking = await ctx.run(() => cars.reserve(tripID)); + compensations.push(() => cars.cancel(tripID, carBooking)); + + // call the payments API, keeping track of how to refund + const paymentId = ctx.rand.uuidv4(); + compensations.push(() => payments.refund({ paymentId })); + await ctx.run(() => payments.process({ tripID })); + + // confirm the flight and car reservations + await flights.confirm(tripID, flightBooking); + await cars.confirm(tripID, carBooking); + + } catch (e) { + + if (e instanceof restate.TerminalError) { + // undo all the steps up to this point by running the compensations + for (const compensation of compensations.reverse()) { + await compensation(); + } + } + + throw e; + } + } + }, + }) +).listen(9080) + diff --git a/typescript/patterns-use-cases/microservices-sagas/src/services/payments.ts b/typescript/patterns-use-cases/microservices-sagas/src/services/payments.ts index 200d30bf..b54e53f0 100644 --- a/typescript/patterns-use-cases/microservices-sagas/src/services/payments.ts +++ b/typescript/patterns-use-cases/microservices-sagas/src/services/payments.ts @@ -15,7 +15,7 @@ export const payments = { return "payment_id"; }, - refund: async (request: { tripID: string; paymentId: string }) => { + refund: async (request: { paymentId: string }) => { // refund the payment }, }; diff --git a/typescript/patterns-use-cases/microservices-sagas/src/workflow_saga.ts b/typescript/patterns-use-cases/microservices-sagas/src/workflow_saga.ts deleted file mode 100644 index e1598178..00000000 --- a/typescript/patterns-use-cases/microservices-sagas/src/workflow_saga.ts +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright (c) 2024 - Restate Software, Inc., Restate GmbH - * - * This file is part of the Restate examples, - * which is released under the MIT license. - * - * You can find a copy of the license in the file LICENSE - * in the root directory of this repository or package or at - * https://github.com/restatedev/examples/ - */ - -import * as restate from "@restatedev/restate-sdk"; - -import { flights } from "./services/flights"; -import { cars } from "./services/cars"; -import { payments } from "./services/payments"; - -// -// An example of a trip reservation workflow, using the SAGAs pattern to -// undo previous steps in case of an error. -// -// The durable execution's guarantee to run code to the end in the presence -// of failures, and to deterministically recover previous steps from the -// journal, makes SAGAs easy. -// Every step pushes a compensation action (an undo operation) to a stack. -// in the case of an error, those operations are run. -// -// The main requirement is that steps are implemented as journald -// operations, like `ctx.run()` or rpc/messaging. - -export default async (ctx: restate.Context, tripID: string) => { - // create a list of undo actions - const compensations = []; - try { - // - // call the flight API to reserve, keeping track of how to cancel - // - const flightBooking = await ctx.run("reserve a flight", () => - flights.reserve(tripID) - ); - compensations.push(() => flights.cancel(tripID, flightBooking)); - - // - // call the car rental service API to reserve, keeping track of how to cancel - // - const carBooking = await ctx.run("rent a car", () => cars.reserve(tripID)); - compensations.push(() => cars.cancel(tripID, carBooking)); - - // - // call the payments API, keeping track of how to refund - // - const paymentId = await ctx.run("process payment", () => - payments.process({ tripID }) - ); - - compensations.push(() => payments.refund({ tripID, paymentId })); - - // - // confirm the flight and car reservations - // - await flights.confirm(tripID, flightBooking); - await cars.confirm(tripID, carBooking); - - } catch (e) { - - if (e instanceof restate.TerminalError) { - // undo all the steps up to this point by running the compensations - for (const compensation of compensations.reverse()) { - await compensation(); - } - } - - throw e; - } -};