From 457471c23f93d23d16ab56c21659e3bdc36b9c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Mendoza=20P=C3=A9rez?= Date: Sun, 2 Jun 2024 19:01:44 +0200 Subject: [PATCH 1/5] added batch interator example --- .scripts/list-of-samples.json | 1 + batch/.eslintignore | 3 ++ batch/.eslintrc.js | 48 +++++++++++++++++ batch/.gitignore | 2 + batch/.npmrc | 1 + batch/.nvmrc | 1 + batch/.post-create | 18 +++++++ batch/.prettierignore | 1 + batch/.prettierrc | 2 + batch/README.md | 3 ++ batch/package.json | 58 +++++++++++++++++++++ batch/src/iterator/README.md | 21 ++++++++ batch/src/iterator/activities.ts | 19 +++++++ batch/src/iterator/client.ts | 25 +++++++++ batch/src/iterator/worker.ts | 17 ++++++ batch/src/iterator/workflows.ts | 88 ++++++++++++++++++++++++++++++++ batch/tsconfig.json | 12 +++++ 17 files changed, 320 insertions(+) create mode 100644 batch/.eslintignore create mode 100644 batch/.eslintrc.js create mode 100644 batch/.gitignore create mode 100644 batch/.npmrc create mode 100644 batch/.nvmrc create mode 100644 batch/.post-create create mode 100644 batch/.prettierignore create mode 100644 batch/.prettierrc create mode 100644 batch/README.md create mode 100644 batch/package.json create mode 100644 batch/src/iterator/README.md create mode 100644 batch/src/iterator/activities.ts create mode 100644 batch/src/iterator/client.ts create mode 100644 batch/src/iterator/worker.ts create mode 100644 batch/src/iterator/workflows.ts create mode 100644 batch/tsconfig.json diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index bf1f908b..4ea67bb5 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -3,6 +3,7 @@ "activities-cancellation-heartbeating", "activities-dependency-injection", "activities-examples", + "batch", "child-workflows", "continue-as-new", "cron-workflows", diff --git a/batch/.eslintignore b/batch/.eslintignore new file mode 100644 index 00000000..7bd99a41 --- /dev/null +++ b/batch/.eslintignore @@ 
-0,0 +1,3 @@ +node_modules +lib +.eslintrc.js \ No newline at end of file diff --git a/batch/.eslintrc.js b/batch/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/batch/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/batch/.gitignore b/batch/.gitignore new file mode 100644 index 00000000..a9f4ed54 --- /dev/null +++ b/batch/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules \ No newline at end of file diff --git a/batch/.npmrc b/batch/.npmrc new file mode 100644 index 00000000..9cf94950 --- /dev/null +++ b/batch/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/batch/.nvmrc b/batch/.nvmrc new file mode 100644 index 00000000..b6a7d89c --- /dev/null +++ b/batch/.nvmrc @@ -0,0 +1 @@ +16 diff --git a/batch/.post-create b/batch/.post-create new 
file mode 100644 index 00000000..a682bb78 --- /dev/null +++ b/batch/.post-create @@ -0,0 +1,18 @@ +To begin development, install the Temporal CLI: + + Mac: {cyan brew install temporal} + Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + + {cyan temporal server start-dev} + +Use Node version 16+: + + Mac: {cyan brew install node@16} + Other: https://nodejs.org/en/download/ + +Then, in the project directory, using two other shells, run these commands: + + {cyan npm run start-iterator.watch} + {cyan npm run workflow-iterator} diff --git a/batch/.prettierignore b/batch/.prettierignore new file mode 100644 index 00000000..7951405f --- /dev/null +++ b/batch/.prettierignore @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/batch/.prettierrc b/batch/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/batch/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/batch/README.md b/batch/README.md new file mode 100644 index 00000000..dfb86c08 --- /dev/null +++ b/batch/README.md @@ -0,0 +1,3 @@ +# Batch Examples + +## [Iterator](./src/iterator/README.md) diff --git a/batch/package.json b/batch/package.json new file mode 100644 index 00000000..44d46f5e --- /dev/null +++ b/batch/package.json @@ -0,0 +1,58 @@ +{ + "name": "temporal-activities-examples", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "lint": "eslint .", + "start-iterator": "ts-node src/iterator/worker.ts", + "start-iterator.watch": "nodemon src/iterator/worker.ts", + "workflow-iterator": "ts-node src/iterator/client.ts", + "format": "prettier --config .prettierrc 'src/**/*.ts' --write" + }, + "nodemonConfig": { + "execMap": { + "ts": "ts-node" + }, + "ext": "ts", + "watch": [ + "src" + ] + }, + "dependencies": { + "@temporalio/activity": "^1.10.1", + "@temporalio/client": "^1.10.1", + "@temporalio/worker": "^1.10.1", +
"@temporalio/workflow": "^1.10.1", + "axios": "^0.26.0", + "node-fetch": "2.x" + }, + "devDependencies": { + "@temporalio/nyc-test-coverage": "^1.10.1", + "@temporalio/testing": "^1.10.1", + "@tsconfig/node16": "^1.0.0", + "@types/jest": "^27.5.1", + "@types/mocha": "8.x", + "@types/node": "^16.11.43", + "@types/node-fetch": "^2.5.12", + "@types/sinon": "^10.0.4", + "@types/uuid": "^8.3.4", + "@typescript-eslint/eslint-plugin": "^5.0.0", + "@typescript-eslint/parser": "^5.0.0", + "eslint": "^7.32.0", + "eslint-config-prettier": "^8.3.0", + "eslint-plugin-deprecation": "^1.2.1", + "jest": "^28.1.0", + "mocha": "8.x", + "nodemon": "^2.0.12", + "nyc": "15.1.0", + "prettier": "^2.3.2", + "sinon": "^11.1.2", + "source-map-support": "^0.5.21", + "ts-jest": "^28.0.2", + "ts-node": "^10.2.1", + "typescript": "^4.2.2", + "uuid": "^8.3.2" + } +} diff --git a/batch/src/iterator/README.md b/batch/src/iterator/README.md new file mode 100644 index 00000000..65b51889 --- /dev/null +++ b/batch/src/iterator/README.md @@ -0,0 +1,21 @@ +# Batch Iterator + +A sample implementation of the Workflow iterator pattern. + +A workflow starts a configured number of Child Workflows in parallel. Each child processes a single record. +After all children close (complete or fail), the parent calls continue-as-new and starts the children for the next page of records. + +The parent tracks and returns the total number of records processed and the number of failed ones. + +This allows processing a set of records of any size. The advantage of this approach is simplicity. +The main disadvantage is that it processes records in batches, with each batch waiting for the slowest child workflow. + +A variation of this pattern runs activities instead of child workflows. + +## Running this sample + +1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). +2. Navigate to the parent directory (`batch`), in the parent directory: + 1. 
`npm install` to install dependencies. + 2. `npm run start-iterator.watch` to start the Worker. + 3. In another shell, `npm run workflow-iterator` to run the Workflow. diff --git a/batch/src/iterator/activities.ts b/batch/src/iterator/activities.ts new file mode 100644 index 00000000..ffe25170 --- /dev/null +++ b/batch/src/iterator/activities.ts @@ -0,0 +1,19 @@ +export async function getRecords(pageSize: number, offset: number) { + const PAGE_COUNT = 2; + const result = []; + if (offset < pageSize * PAGE_COUNT) { + for (let i = 0; i < pageSize; i++) { + result.push(new Record(offset + i)); + } + } + return result; +} + +export class Record { + public readonly id: any; + public readonly description: string; + constructor(id: number) { + this.id = id; + this.description = 'record number ' + this.id; + } +} diff --git a/batch/src/iterator/client.ts b/batch/src/iterator/client.ts new file mode 100644 index 00000000..182f3d8e --- /dev/null +++ b/batch/src/iterator/client.ts @@ -0,0 +1,25 @@ +import { Connection, Client } from '@temporalio/client'; +import { processBatch } from './workflows'; + +async function run() { + const connection = await Connection.connect(); + const client = new Client({ connection }); + + const handle = await client.workflow.start(processBatch, { + taskQueue: 'tq-iterator-wf', + workflowId: 'iterator-wf', + args: [ + { + pageSize: 5, + offset: 0, + }, + ], + }); + const result = await handle.result(); + console.log('Execution result:', result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/batch/src/iterator/worker.ts b/batch/src/iterator/worker.ts new file mode 100644 index 00000000..cdb35e35 --- /dev/null +++ b/batch/src/iterator/worker.ts @@ -0,0 +1,17 @@ +import { Worker } from '@temporalio/worker'; +import * as activities from './activities'; + +async function run() { + const worker = await Worker.create({ + workflowsPath: require.resolve('./workflows'), + activities, + taskQueue: 'tq-iterator-wf', 
+ }); + + await worker.run(); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/batch/src/iterator/workflows.ts b/batch/src/iterator/workflows.ts new file mode 100644 index 00000000..d92e19a8 --- /dev/null +++ b/batch/src/iterator/workflows.ts @@ -0,0 +1,88 @@ +import { ChildWorkflowHandle, continueAsNew, log, sleep, startChild, workflowInfo } from '@temporalio/workflow'; + +import { proxyActivities } from '@temporalio/workflow'; +import type * as activities from './activities'; +import { Record } from './activities'; +import { ApplicationFailure } from '@temporalio/workflow'; + +const { getRecords } = proxyActivities({ + startToCloseTimeout: '1 minute', +}); + +export async function processBatch(batch: Batch, previousExecutionResult?: Result): Promise { + // load the items we want to process + const records: Record[] = await getRecords(batch.pageSize, batch.offset); + + // Starts a child per record asynchronously. + const handles: Array> = await Promise.all( + records.map((record) => { + return startChild(recordProcessor, { + workflowId: workflowInfo().workflowId + '/child-' + record.id, + args: [record], + }); + }) + ); + + const totalProcessedRecords = previousExecutionResult + ? previousExecutionResult.totalProcessedRecords + handles.length + : handles.length; + let failedRecords = previousExecutionResult ? previousExecutionResult.failedRecords : 0; + //wait for all child workflows to complete or fail + for (const handle of handles) { + await handle.result().catch(() => { + //intentionally failing 1/5 child workflows, track child workflows failures. 
+ failedRecords++; + }); + } + + const executionResult = { + totalProcessedRecords, + failedRecords, + }; + + //Complete the workflow if there are no more record to process + if (records.length == 0) { + return executionResult; + } + + //Continue as new to process the next batch + return continueAsNew( + { + pageSize: batch.pageSize, + offset: batch.offset + records.length, + }, + executionResult + ); +} + +export async function recordProcessor(record: Record): Promise { + log.info(`Processing record ${JSON.stringify(record)} in child workflow `); + + //Sleep random time between 1000 and 2000 ms + const maxSleep = 2000; + const minSleep = 1000; + + await sleep(Math.floor(Math.random() * (maxSleep - minSleep + 1) + minSleep)); + + //intentionally failing 1/5 child workflows + if (record.id % 5 == 0) { + throw ApplicationFailure.nonRetryable( + `Intentionally failing the child workflow with input ${JSON.stringify(record)}` + ); + } +} + +export class Batch { + public readonly pageSize: number; + public readonly offset: number; + + constructor(pageSize: number, offset: number) { + this.pageSize = pageSize; + this.offset = offset; + } +} + +interface Result { + totalProcessedRecords: number; + failedRecords: number; +} diff --git a/batch/tsconfig.json b/batch/tsconfig.json new file mode 100644 index 00000000..6ff187f6 --- /dev/null +++ b/batch/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@tsconfig/node16/tsconfig.json", + "version": "4.4.2", + "compilerOptions": { + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src/**/*.ts"] +} From 9557ee0946c17cf6447ff16bc0c32e08ab7ecb64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Mendoza=20P=C3=A9rez?= Date: Sun, 2 Jun 2024 19:13:03 +0200 Subject: [PATCH 2/5] added batch interator example --- batch/package.json | 30 +++++++----------------------- batch/src/iterator/README.md | 2 +- batch/src/iterator/workflows.ts | 2 +- 3 files changed, 9 
insertions(+), 25 deletions(-) diff --git a/batch/package.json b/batch/package.json index 44d46f5e..72fcee86 100644 --- a/batch/package.json +++ b/batch/package.json @@ -1,5 +1,5 @@ { - "name": "temporal-activities-examples", + "name": "child-workflows", "version": "0.1.0", "private": true, "scripts": { @@ -24,35 +24,19 @@ "@temporalio/activity": "^1.10.1", "@temporalio/client": "^1.10.1", "@temporalio/worker": "^1.10.1", - "@temporalio/workflow": "^1.10.1", - "axios": "^0.26.0", - "node-fetch": "2.x" + "@temporalio/workflow": "^1.10.1" }, "devDependencies": { - "@temporalio/nyc-test-coverage": "^1.10.1", - "@temporalio/testing": "^1.10.1", "@tsconfig/node16": "^1.0.0", - "@types/jest": "^27.5.1", - "@types/mocha": "8.x", - "@types/node": "^16.11.43", - "@types/node-fetch": "^2.5.12", - "@types/sinon": "^10.0.4", - "@types/uuid": "^8.3.4", + "@types/node": "^18.0.0", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint": "^7.32.0", "eslint-config-prettier": "^8.3.0", "eslint-plugin-deprecation": "^1.2.1", - "jest": "^28.1.0", - "mocha": "8.x", - "nodemon": "^2.0.12", - "nyc": "15.1.0", - "prettier": "^2.3.2", - "sinon": "^11.1.2", - "source-map-support": "^0.5.21", - "ts-jest": "^28.0.2", - "ts-node": "^10.2.1", - "typescript": "^4.2.2", - "uuid": "^8.3.2" + "nodemon": "^2.0.22", + "prettier": "^2.8.8", + "ts-node": "^10.9.2", + "typescript": "^4.4.2" } } diff --git a/batch/src/iterator/README.md b/batch/src/iterator/README.md index 65b51889..87189263 100644 --- a/batch/src/iterator/README.md +++ b/batch/src/iterator/README.md @@ -15,7 +15,7 @@ A variation of this pattern runs activities instead of child workflows. ## Running this sample 1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation). -2. Navigate to the parent directory (`batch`), in the parent directory: +2. Navigate to the parent directory (`batch`), and run: 1. `npm install` to install dependencies. 2. 
`npm run start-iterator.watch` to start the Worker. 3. In another shell, `npm run workflow-iterator` to run the Workflow. diff --git a/batch/src/iterator/workflows.ts b/batch/src/iterator/workflows.ts index d92e19a8..b5c6ed19 100644 --- a/batch/src/iterator/workflows.ts +++ b/batch/src/iterator/workflows.ts @@ -10,7 +10,7 @@ const { getRecords } = proxyActivities({ }); export async function processBatch(batch: Batch, previousExecutionResult?: Result): Promise { - // load the items we want to process + // load the records to process in this batch const records: Record[] = await getRecords(batch.pageSize, batch.offset); // Starts a child per record asynchronously. From 0d80a5695a37020c9f29da875d50ec827563c938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Mendoza=20P=C3=A9rez?= Date: Sun, 2 Jun 2024 19:17:41 +0200 Subject: [PATCH 3/5] added batch interator example --- batch/src/iterator/workflows.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/batch/src/iterator/workflows.ts b/batch/src/iterator/workflows.ts index b5c6ed19..ea2df879 100644 --- a/batch/src/iterator/workflows.ts +++ b/batch/src/iterator/workflows.ts @@ -27,10 +27,11 @@ export async function processBatch(batch: Batch, previousExecutionResult?: Resul ? previousExecutionResult.totalProcessedRecords + handles.length : handles.length; let failedRecords = previousExecutionResult ? previousExecutionResult.failedRecords : 0; - //wait for all child workflows to complete or fail + + //wait for all child workflows to close for (const handle of handles) { await handle.result().catch(() => { - //intentionally failing 1/5 child workflows, track child workflows failures. + //intentionally failing 1/5 child workflow, track child workflows failures. 
failedRecords++; }); } @@ -40,7 +41,7 @@ export async function processBatch(batch: Batch, previousExecutionResult?: Resul failedRecords, }; - //Complete the workflow if there are no more record to process + //Complete the workflow if there are no more records to process if (records.length == 0) { return executionResult; } @@ -58,13 +59,13 @@ export async function processBatch(batch: Batch, previousExecutionResult?: Resul export async function recordProcessor(record: Record): Promise { log.info(`Processing record ${JSON.stringify(record)} in child workflow `); - //Sleep random time between 1000 and 2000 ms const maxSleep = 2000; const minSleep = 1000; + //sleep to simulate record processing await sleep(Math.floor(Math.random() * (maxSleep - minSleep + 1) + minSleep)); - //intentionally failing 1/5 child workflows + //intentionally failing 1/5 child workflow if (record.id % 5 == 0) { throw ApplicationFailure.nonRetryable( `Intentionally failing the child workflow with input ${JSON.stringify(record)}` From f80934fed64caa5a359003cf68c1546f4f896cd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Mendoza=20P=C3=A9rez?= Date: Sun, 2 Jun 2024 19:23:59 +0200 Subject: [PATCH 4/5] set node dependency to > 1.16.x.x --- batch/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch/package.json b/batch/package.json index 72fcee86..3903fb7f 100644 --- a/batch/package.json +++ b/batch/package.json @@ -28,7 +28,7 @@ }, "devDependencies": { "@tsconfig/node16": "^1.0.0", - "@types/node": "^18.0.0", + "@types/node": "^16.11.43", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint": "^7.32.0", From da8e77361acf7bb9404ce771e1f93c5a7a8413b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Mendoza=20P=C3=A9rez?= Date: Sun, 2 Jun 2024 21:04:27 +0200 Subject: [PATCH 5/5] add doc --- batch/src/iterator/activities.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/batch/src/iterator/activities.ts 
b/batch/src/iterator/activities.ts index ffe25170..4289e171 100644 --- a/batch/src/iterator/activities.ts +++ b/batch/src/iterator/activities.ts @@ -1,4 +1,5 @@ export async function getRecords(pageSize: number, offset: number) { + // This always returns 2 pages; a real implementation would iterate over an existing dataset or file. const PAGE_COUNT = 2; const result = []; if (offset < pageSize * PAGE_COUNT) {