Skip to content

Commit

Permalink
durable rpc and task queue example
Browse files Browse the repository at this point in the history
  • Loading branch information
gvdongen committed Dec 12, 2024
1 parent 9e23962 commit c9d2f6c
Show file tree
Hide file tree
Showing 19 changed files with 559 additions and 15 deletions.
6 changes: 3 additions & 3 deletions typescript/basics/src/3_workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { createUserEntry, sendEmailWithLink } from "./utils/stubs";
// - Additional methods interact with the workflow.
// Each workflow instance has a unique ID and runs only once (to success or failure).
//
const myWorkflow = restate.workflow({
const singupWorkflow = restate.workflow({
name: "usersignup",
handlers: {
// --- The workflow logic ---
Expand Down Expand Up @@ -50,9 +50,9 @@ const myWorkflow = restate.workflow({
},
});

export type SignupApi = typeof myWorkflow;
export type SignupApi = typeof singupWorkflow;

restate.endpoint().bind(myWorkflow).listen();
restate.endpoint().bind(singupWorkflow).listen(9080);
// or .handler() to run on Lambda, Deno, Bun, Cloudflare Workers, ...

/*
Expand Down
5 changes: 5 additions & 0 deletions typescript/patterns-use-cases/async-task-queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Hello world - TypeScript example

Sample project configuration of a Restate service using the TypeScript SDK.

Have a look at the [TypeScript Quickstart guide](https://docs.restate.dev/get_started/quickstart?sdk=ts) for more information on how to use this project.
25 changes: 25 additions & 0 deletions typescript/patterns-use-cases/async-task-queue/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "restate-ts-template",
"version": "0.0.1",
"description": "Template for JavaScript/TypeScript services running with Restate (https://github.com/restatedev/) ",
"main": "app.js",
"type": "commonjs",
"scripts": {
"build": "tsc --noEmitOnError",
"prebundle": "rm -rf dist",
"bundle": "esbuild src/app.ts --bundle --minify --sourcemap --platform=node --target=es2020 --outfile=dist/index.js",
"postbundle": "cd dist && zip -r index.zip index.js*",
"app": "node ./dist/app.js",
"app-dev": "tsx watch ./src/app.ts"
},
"dependencies": {
"@restatedev/restate-sdk": "^1.4.0",
"@restatedev/restate-sdk-clients": "^1.4.0"
},
"devDependencies": {
"@types/node": "^20.14.2",
"esbuild": "^0.21.5",
"tsx": "^4.19.2",
"typescript": "^5.4.5"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// --------------- define async task logic as a service handler ---------------

import * as restate from "@restatedev/restate-sdk";
import { Context } from "@restatedev/restate-sdk";

const asyncTaskService = restate.service({
name: "taskWorker",
handlers: {
runTask: async (ctx: Context, params: TaskOpts) => {
return someHeavyWork(params);
},
},
});

export type AsyncTaskService = typeof asyncTaskService;

restate.endpoint().bind(asyncTaskService).listen(9080);


// ----------------------- Stubs to please the compiler -----------------------
export type TaskOpts = {id: string, task: string};

function someHeavyWork(work: TaskOpts) {
return "Work!";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import * as restate from "@restatedev/restate-sdk-clients";
import { AsyncTaskService, TaskOpts } from "./async_task_service";
import {SendOpts} from "@restatedev/restate-sdk-clients";

/*
* Restate is as a sophisticated task queue, with extra features like:
* - delaying execution and reliable timers
* - stateful tasks
* - queues per key (>< per partition; slow tasks for a key don't block others)
* - retries and recovery upon failures
*
* Every handler in Restate is executed asynchronously and can be treated
* as a reliable asynchronous task.
*/

const RESTATE_URL = "http://localhost:8080";

async function submitAndAwaitTask(task: TaskOpts) {
const restateClient = restate.connect({ url: RESTATE_URL });

// submit the task; similar to publishing a message to a queue
// Restate ensures the task is executed exactly once
const taskHandle = await restateClient
.serviceSendClient<AsyncTaskService>({ name: "taskWorker" })
.runTask(
task,
// use a stable uuid as an idempotency key
// optionally, execute the task later via SendOpts.from({ delay: 1000 })
SendOpts.from({ idempotencyKey: task.id })
);

// await the task's result
const result = await restateClient.result(taskHandle);
}

async function attachToTaskFromOtherProcess(taskHandle: string) {
const restateClient = restate.connect({ url: RESTATE_URL });
const result = await restateClient.result<string>(JSON.parse(taskHandle));
}
110 changes: 110 additions & 0 deletions typescript/patterns-use-cases/async-task-queue/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
{
"compilerOptions": {
/* Visit https://aka.ms/tsconfig to read more about this file */

/* Projects */
// "incremental": true, /* Save .tsbuildinfo files to allow for incremental compilation of projects. */
// "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */
// "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */
// "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */
// "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */
// "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */

/* Language and Environment */
"target": "esnext", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
"lib": ["esnext"], /* Specify a set of bundled library declaration files that describe the target runtime environment. */
// "jsx": "preserve", /* Specify what JSX code is generated. */
// "experimentalDecorators": true, /* Enable experimental support for legacy experimental decorators. */
// "emitDecoratorMetadata": true, /* Emit design-type metadata for decorated declarations in source files. */
// "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h'. */
// "jsxFragmentFactory": "", /* Specify the JSX Fragment reference used for fragments when targeting React JSX emit e.g. 'React.Fragment' or 'Fragment'. */
// "jsxImportSource": "", /* Specify module specifier used to import the JSX factory functions when using 'jsx: react-jsx*'. */
// "reactNamespace": "", /* Specify the object invoked for 'createElement'. This only applies when targeting 'react' JSX emit. */
// "noLib": true, /* Disable including any library files, including the default lib.d.ts. */
// "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */
// "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */

/* Modules */
"module": "nodenext", /* Specify what module code is generated. */
// "rootDir": "./", /* Specify the root folder within your source files. */
// "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
// "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
// "typeRoots": [], /* Specify multiple folders that act like './node_modules/@types'. */
// "types": [], /* Specify type package names to be included without being referenced in a source file. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
// "moduleSuffixes": [], /* List of file name suffixes to search when resolving a module. */
// "allowImportingTsExtensions": true, /* Allow imports to include TypeScript file extensions. Requires '--moduleResolution bundler' and either '--noEmit' or '--emitDeclarationOnly' to be set. */
// "resolvePackageJsonExports": true, /* Use the package.json 'exports' field when resolving package imports. */
// "resolvePackageJsonImports": true, /* Use the package.json 'imports' field when resolving imports. */
// "customConditions": [], /* Conditions to set in addition to the resolver-specific defaults when resolving imports. */
// "resolveJsonModule": true, /* Enable importing .json files. */
// "allowArbitraryExtensions": true, /* Enable importing files with any extension, provided a declaration file is present. */
// "noResolve": true, /* Disallow 'import's, 'require's or '<reference>'s from expanding the number of files TypeScript should add to a project. */

/* JavaScript Support */
"allowJs": true, /* Allow JavaScript files to be a part of your program. Use the 'checkJS' option to get errors from these files. */
// "checkJs": true, /* Enable error reporting in type-checked JavaScript files. */
// "maxNodeModuleJsDepth": 1, /* Specify the maximum folder depth used for checking JavaScript files from 'node_modules'. Only applicable with 'allowJs'. */

/* Emit */
"declaration": true, /* Generate .d.ts files from TypeScript and JavaScript files in your project. */
"declarationMap": true, /* Create sourcemaps for d.ts files. */
"sourceMap": true, /* Create source map files for emitted JavaScript files. */
"outDir": "./dist", /* Specify an output folder for all emitted files. */
// "emitDeclarationOnly": true, /* Only output d.ts files and not JavaScript files. */
// "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */
// "outFile": "./", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */
// "removeComments": true, /* Disable emitting comments. */
// "noEmit": true, /* Disable emitting files from a compilation. */
// "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */
// "importsNotUsedAsValues": "remove", /* Specify emit/checking behavior for imports that are only used for types. */
// "downlevelIteration": true, /* Emit more compliant, but verbose and less performant JavaScript for iteration. */
// "sourceRoot": "", /* Specify the root path for debuggers to find the reference source code. */
// "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */
// "inlineSources": true, /* Include source code in the sourcemaps inside the emitted JavaScript. */
// "emitBOM": true, /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */
// "newLine": "crlf", /* Set the newline character for emitting files. */
// "stripInternal": true, /* Disable emitting declarations that have '@internal' in their JSDoc comments. */
// "noEmitHelpers": true, /* Disable generating custom helper functions like '__extends' in compiled output. */
// "noEmitOnError": true, /* Disable emitting files if any type checking errors are reported. */
// "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. */
// "declarationDir": "./", /* Specify the output directory for generated declaration files. */
// "preserveValueImports": true, /* Preserve unused imported values in the JavaScript output that would otherwise be removed. */

/* Interop Constraints */
"esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */
"allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */
// "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */
// "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */
// "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */

/* Type Checking */
"strict": true, /* Enable all strict type-checking options. */
// "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */
// "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */
// "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */
// "strictBindCallApply": true, /* Check that the arguments for 'bind', 'call', and 'apply' methods match the original function. */
// "strictPropertyInitialization": true, /* Check for class properties that are declared but not set in the constructor. */
// "noImplicitThis": true, /* Enable error reporting when 'this' is given the type 'any'. */
// "useUnknownInCatchVariables": true, /* Default catch clause variables as 'unknown' instead of 'any'. */
// "alwaysStrict": true, /* Ensure 'use strict' is always emitted. */
// "noUnusedLocals": true, /* Enable error reporting when local variables aren't read. */
// "noUnusedParameters": true, /* Raise an error when a function parameter isn't read. */
// "exactOptionalPropertyTypes": true, /* Interpret optional property types as written, rather than adding 'undefined'. */
// "noImplicitReturns": true, /* Enable error reporting for codepaths that do not explicitly return in a function. */
// "noFallthroughCasesInSwitch": true, /* Enable error reporting for fallthrough cases in switch statements. */
// "noUncheckedIndexedAccess": true, /* Add 'undefined' to a type when accessed using an index. */
// "noImplicitOverride": true, /* Ensure overriding members in derived classes are marked with an override modifier. */
// "noPropertyAccessFromIndexSignature": true, /* Enforces using indexed accessors for keys declared using an indexed type. */
// "allowUnusedLabels": true, /* Disable error reporting for unused labels. */
// "allowUnreachableCode": true, /* Disable error reporting for unreachable code. */

/* Completeness */
"skipLibCheck": true, /* Skip type checking all .d.ts files. */
"skipDefaultLibCheck": true /* Skip type checking .d.ts files that are included with TypeScript. */
},
"include": ["src/"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,24 @@ type PaymentRequest = {
};

async function processPayment(ctx: restate.Context, request: PaymentRequest) {
const {paymentMethodId, amount, delayedStatus} = request;

verifyPaymentRequest(request);

// Generate a deterministic idempotency key
const idempotencyKey = ctx.rand.uuidv4();

// Initiate a listener for external calls for potential webhook callbacks
const webhookPromise = ctx.awakeable<Stripe.PaymentIntent>();
const { id: intentWebhookId, promise: intentPromise } = ctx.awakeable<Stripe.PaymentIntent>();

// Make a synchronous call to the payment service
const paymentIntent = await ctx.run("stripe call", () =>
stripe_utils.createPaymentIntent({
paymentMethodId: request.paymentMethodId,
amount: request.amount,
paymentMethodId,
amount,
idempotencyKey,
webhookPromiseId: webhookPromise.id,
delayedStatus: request.delayedStatus,
intentWebhookId,
delayedStatus,
})
);

Expand All @@ -70,7 +72,7 @@ async function processPayment(ctx: restate.Context, request: PaymentRequest) {

// We will now wait for the webhook call to complete this promise.
// Check out the handler below.
const processedPaymentIntent = await webhookPromise.promise;
const processedPaymentIntent = await intentPromise;

console.log(`Webhook call for ${idempotencyKey} received!`);
stripe_utils.ensureSuccess(processedPaymentIntent.status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export async function createPaymentIntent(request: {
paymentMethodId: string;
amount: number;
idempotencyKey: string;
webhookPromiseId: string;
intentWebhookId: string;
delayedStatus?: boolean;
}): Promise<Stripe.PaymentIntent> {
const requestOptions = {
Expand All @@ -66,7 +66,7 @@ export async function createPaymentIntent(request: {
confirmation_method: "automatic",
return_url: "https://restate.dev/", // some random URL
metadata: {
restate_callback_id: request.webhookPromiseId,
restate_callback_id: request.intentWebhookId,
},
},
requestOptions
Expand Down
22 changes: 22 additions & 0 deletions typescript/patterns-use-cases/microservices-durable-rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Microservices: Durable RPC

This example shows an example of:
- **Durable RPC** with exactly-once semantics
- **Requests with idempotency keys**: Ensuring that the same request is processed only once



## Running the Example

Run Restate locally (`npx restate-server`).

Run the Restate service: `cd restate-app && npm run app`

Register the service: `npx restate deployments register http://localhost:5000`

Run the Express service: `cd express-app && npm run app`

Send a request to the Express service:
```
curl -X POST localhost:5000/reserve/12345/abcde
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import * as restate from "@restatedev/restate-sdk-clients";
import { Opts } from "@restatedev/restate-sdk-clients";
import express, { Request, Response } from "express";
import { ProductService } from "product-service";

const app = express();

const RESTATE_URL = "http://localhost:8080";
const restateClient = restate.connect({ url: RESTATE_URL });

app.post("/reserve/:productId/:reservationId", async (req: Request, res: Response) => {
const { productId, reservationId } = req.params;

// Durable RPC call to the product service
// Restate registers the request and makes sure runs to completion exactly once
const products = restateClient
.objectClient<ProductService>({ name: "product" }, productId);
const reservation = await products.reserve(
// Restate deduplicates requests with the same idempotency key
Opts.from({ idempotencyKey: reservationId })
);

console.log("Reservation result", reservation);
return res.json(reservation);
});

app.listen(5000, () => {
console.log("Server is running on port 5000");
})
Loading

0 comments on commit c9d2f6c

Please sign in to comment.