diff --git a/packages/design/src/scaffold.ts b/packages/design/src/scaffold.ts index a3d8ca7..527d1a4 100644 --- a/packages/design/src/scaffold.ts +++ b/packages/design/src/scaffold.ts @@ -136,7 +136,9 @@ function inferStepsAndDecisions(diagramStatements: Statement[]): { } if (to === '[*]') { - steps[from].whatsNext = EndState + if (!fromDecision) { + steps[from].whatsNext = EndState + } } else { if (!toDecision) { steps[to] = steps[to] ?? { @@ -304,16 +306,22 @@ function escapeString(str: string) { } function variableFromDecision(decision: Decision) { - const outcomes = decision.outcomes.map( - o => - ` + const outcomes = decision.outcomes.map(o => { + if (o.target !== EndState.name) { + return ` ${camelCase(o.name)}(payload: ${stepNameToPayloadName(o.target)}) { return { targetTaskName: '${escapeString(o.target)}', payload, } - },`, - ) + },` + } else { + return ` + ${camelCase(o.name)}() { + return CompleteWorkflow; + },` + } + }) return `protected ${camelCase(decision.name)} = {${outcomes.join('')} }` diff --git a/packages/design/test/resources/recursion.generated.typescript b/packages/design/test/resources/recursion.generated.typescript new file mode 100644 index 0000000..f3f6c3c --- /dev/null +++ b/packages/design/test/resources/recursion.generated.typescript @@ -0,0 +1,41 @@ +/* This file is automatically generated. It gets overwritten on build */ +import {CompleteWorkflow, WrongTimingError, WorkflowBase} from "@gamgee/run"; +import {StateStore} from "@gamgee/interfaces/store"; +import {WorkflowTask} from "@gamgee/interfaces/task"; + +import {CountDownPayload} from "./recursion"; + +export abstract class RecursionWorkflowBase extends WorkflowBase { + protected constructor() { + super('RecursionWorkflow'); + + super._registerStep({ name: 'countDown', run: this.countDown, attempts: 1, backoffMs: 1000 }); + } + + async submit(payload: CountDownPayload, store: StateStore, uniqueInstanceId?: string): Promise<string> { + const task = await super._enqueue('countDown', payload, store, uniqueInstanceId); + return task.instanceId; + } + + abstract countDown(payload: CountDownPayload): Promise<ReturnType<(typeof this.decision)['zero']>> | Promise<ReturnType<(typeof this.decision)['nonZero']>>; + + protected decision = { + zero() { + return CompleteWorkflow; + }, + nonZero(payload: CountDownPayload) { + return { + targetTaskName: 'countDown', + payload, + } + }, + } + + protected _registerStep() { + throw new WrongTimingError(); + } + + protected _enqueue(): Promise<WorkflowTask> { + throw new WrongTimingError(); + } +} \ No newline at end of file diff --git a/packages/design/test/resources/recursion.mermaid b/packages/design/test/resources/recursion.mermaid new file mode 100644 index 0000000..2f9843e --- /dev/null +++ b/packages/design/test/resources/recursion.mermaid @@ -0,0 +1,12 @@ +--- +title: RecursionWorkflow +--- + +stateDiagram-v2 + state decision <<choice>> + + direction LR + [*] --> countDown + countDown --> decision + decision --> [*]: zero + decision --> countDown: nonZero \ No newline at end of file diff --git a/packages/run/docs/examples/README.md b/packages/run/docs/examples/README.md index 01f300b..d4d284c 100644 --- a/packages/run/docs/examples/README.md +++ b/packages/run/docs/examples/README.md @@ -1,4 +1,5 @@ # Examples 1. Simple workflow with one step: [simple-workflow](./simple-workflow). -2. A workflow with a decision between going with the left or right branches: [conditions](./conditions). \ No newline at end of file +2. A workflow with a decision between going with the left or right branches: [conditions](./conditions). +3. A workflow counting down recursively: [recursion](./recursion). \ No newline at end of file diff --git a/packages/run/docs/examples/recursion/README.md b/packages/run/docs/examples/recursion/README.md new file mode 100644 index 0000000..a52aa62 --- /dev/null +++ b/packages/run/docs/examples/recursion/README.md @@ -0,0 +1,18 @@ +```mermaid +--- +title: RecursionWorkflow +--- + +stateDiagram-v2 + direction TB + state decision <<choice>> + + [*] --> countDown + countDown --> decision + decision --> [*]: zero + decision --> countDown: nonZero +``` + +A workflow counting down recursively. + +[[Diagram Source](./recursion.mermaid)] [[Generated Scaffold](./recursion.generated.ts)] [[Implementation](./recursion.ts)] \ No newline at end of file diff --git a/packages/run/docs/examples/recursion/recursion.generated.ts b/packages/run/docs/examples/recursion/recursion.generated.ts new file mode 100644 index 0000000..f3f6c3c --- /dev/null +++ b/packages/run/docs/examples/recursion/recursion.generated.ts @@ -0,0 +1,41 @@ +/* This file is automatically generated. It gets overwritten on build */ +import {CompleteWorkflow, WrongTimingError, WorkflowBase} from "@gamgee/run"; +import {StateStore} from "@gamgee/interfaces/store"; +import {WorkflowTask} from "@gamgee/interfaces/task"; + +import {CountDownPayload} from "./recursion"; + +export abstract class RecursionWorkflowBase extends WorkflowBase { + protected constructor() { + super('RecursionWorkflow'); + + super._registerStep({ name: 'countDown', run: this.countDown, attempts: 1, backoffMs: 1000 }); + } + + async submit(payload: CountDownPayload, store: StateStore, uniqueInstanceId?: string): Promise<string> { + const task = await super._enqueue('countDown', payload, store, uniqueInstanceId); + return task.instanceId; + } + + abstract countDown(payload: CountDownPayload): Promise<ReturnType<(typeof this.decision)['zero']>> | Promise<ReturnType<(typeof this.decision)['nonZero']>>; + + protected decision = { + zero() { + return CompleteWorkflow; + }, + nonZero(payload: CountDownPayload) { + return { + targetTaskName: 'countDown', + payload, + } + }, + } + + protected _registerStep() { + throw new WrongTimingError(); + } + + protected _enqueue(): Promise<WorkflowTask> { + throw new WrongTimingError(); + } +} \ No newline at end of file diff --git a/packages/run/docs/examples/recursion/recursion.mermaid b/packages/run/docs/examples/recursion/recursion.mermaid new file mode 100644 index 0000000..2f9843e --- /dev/null +++ b/packages/run/docs/examples/recursion/recursion.mermaid @@ -0,0 +1,12 @@ +--- +title: RecursionWorkflow +--- + +stateDiagram-v2 + state decision <<choice>> + + direction LR + [*] --> countDown + countDown --> decision + decision --> [*]: zero + decision --> countDown: nonZero \ No newline at end of file diff --git a/packages/run/docs/examples/recursion/recursion.ts b/packages/run/docs/examples/recursion/recursion.ts new file mode 100644 index 0000000..f5adc67 --- /dev/null +++ b/packages/run/docs/examples/recursion/recursion.ts @@ -0,0 +1,17 @@ +import { RecursionWorkflowBase } from './recursion.generated' + +export type CountDownPayload = number + +export class RecursionWorkflow extends RecursionWorkflowBase { + constructor() { + super() + } + + countDown(count: CountDownPayload) { + if (count === 0) { + return Promise.resolve(this.decision.zero()) + } + + return Promise.resolve(this.decision.nonZero(count - 1)) + } +} diff --git a/packages/run/src/workflow.ts b/packages/run/src/workflow.ts index dc107fe..632e054 100644 --- a/packages/run/src/workflow.ts +++ b/packages/run/src/workflow.ts @@ -19,9 +19,9 @@ type StepResult = { payload: JSONValue } -type StepDefinition<T, R extends StepResult> = { +type StepDefinition<T> = { name: string - run: (payload: T) => Promise<R> + run: (payload: T) => Promise<StepResult> attempts: number backoffMs: number } @@ -36,14 +36,14 @@ function getCurrentOTelContext(): { traceparent: string; tracestate: string } { export abstract class WorkflowBase { // eslint-disable-next-line @typescript-eslint/no-explicit-any - private readonly steps: { [name: string]: Omit<StepDefinition<any, StepResult>, 'name'> } = {} + private readonly steps: { [name: string]: Omit<StepDefinition<any>, 'name'> } = {} protected constructor(readonly workflowType: string) { // TODO: When trying to register two distinct implementations with the same name, throw (e.g. use a hash in the generated file) workflowFactory.register(workflowType, () => new (this.constructor as new () => this)()) } - protected _registerStep<T extends JSONValue, R extends StepResult>(def: StepDefinition<T, R>): void { + protected _registerStep<T>(def: StepDefinition<T>): void { this.steps[def.name] = Object.assign({}, def, { run: def.run.bind(this) }) } diff --git a/packages/run/test/test-02-conditions/test.ts b/packages/run/test/test-02-conditions/test.ts index 0cae64b..7be0871 100644 --- a/packages/run/test/test-02-conditions/test.ts +++ b/packages/run/test/test-02-conditions/test.ts @@ -77,7 +77,7 @@ describe('test conditions workflow', () => { }, { name: 'ConditionsWorkflow.left', - parentSpanId: spansByTraceId?.[0]?.spanId, + parentSpanId: spansByTraceId[0].spanId, status: { code: SpanStatusCode.OK, message: 'Workflow Completed' }, attributes: { testId, diff --git a/packages/run/test/test-03-recursion/recursion.mermaid b/packages/run/test/test-03-recursion/recursion.mermaid new file mode 100644 index 0000000..2f9843e --- /dev/null +++ b/packages/run/test/test-03-recursion/recursion.mermaid @@ -0,0 +1,12 @@ +--- +title: RecursionWorkflow +--- + +stateDiagram-v2 + state decision <<choice>> + + direction LR + [*] --> countDown + countDown --> decision + decision --> [*]: zero + decision --> countDown: nonZero \ No newline at end of file diff --git a/packages/run/test/test-03-recursion/recursion.ts b/packages/run/test/test-03-recursion/recursion.ts new file mode 100644 index 0000000..f8a4fa9 --- /dev/null +++ b/packages/run/test/test-03-recursion/recursion.ts @@ -0,0 +1,27 @@ +import { RecursionWorkflowBase } from './recursion.generated' +import { trace } from '@opentelemetry/api' + +export type CountDownPayload = { testId: string; count: number; failuresRequested: number } + +const failures: { [testId: string]: number } = {} + +export class RecursionWorkflow extends RecursionWorkflowBase { + constructor() { + super() + } + + countDown(payload: CountDownPayload) { + trace.getActiveSpan()?.setAttributes(payload) + + if (payload.failuresRequested > (failures[payload.testId] ??= 0)) { + failures[payload.testId]++ + throw new Error('Task failed successfully :)') + } + + if (payload.count === 0) { + return Promise.resolve(this.decision.zero()) + } + + return Promise.resolve(this.decision.nonZero(Object.assign({}, payload, { count: payload.count - 1 }))) + } +} diff --git a/packages/run/test/test-03-recursion/test.ts b/packages/run/test/test-03-recursion/test.ts new file mode 100644 index 0000000..fcd1935 --- /dev/null +++ b/packages/run/test/test-03-recursion/test.ts @@ -0,0 +1,116 @@ +import InMemoryStateStore from '@gamgee/test/stateStores/in-memory' +import { WorkflowWorker } from '../../src/worker' +import { FetchStrategy } from '@gamgee/interfaces/store' +import { TestsTraceExporter } from '../tests-trace-exporter' +import { NodeSDK } from '@opentelemetry/sdk-node' +import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base' +import { Span, SpanStatusCode, trace } from '@opentelemetry/api' +import { expect } from '@jest/globals' +import { RecursionWorkflow } from './recursion' + +function randomString(): string { + return Math.random().toString(36).slice(2) +} + +describe('test recursion workflow', () => { + const testsTraceExporter = new TestsTraceExporter() + const sdk: NodeSDK = new NodeSDK({ + spanProcessor: new SimpleSpanProcessor(testsTraceExporter), + }) + sdk.start() + + afterAll(async () => { + await sdk.shutdown() + }) + + it.concurrent('runs the workflow', async () => { + const testId = randomString() + const store = new InMemoryStateStore() + + const parentSpanContext = await trace + .getTracer('test') + .startActiveSpan(expect.getState().currentTestName!, { root: true }, async (span: Span) => { + const workflow = new RecursionWorkflow() + await workflow.submit( + { + testId, + count: 4, + failuresRequested: 0, + }, + store, + ) + + const worker = new WorkflowWorker() + const result = await worker.executeWaitingWorkflow( + store, + { workflowType: workflow.workflowType }, + FetchStrategy.Random, + 1000, + ) + + expect(result).toStrictEqual('Workflow Completed') + + return span.spanContext() + }) + + expect(store.getStats()).toStrictEqual({ + taskUpdatesSeen: 6, // Create, ran countDown(4), ran countDown(3), ran countDown(2), ran countDown(1), and done + tasksRemaining: 0, + unrecoverableTasks: 0, + }) + + const spansByTraceId = testsTraceExporter.getSpansByTraceId(parentSpanContext.traceId) + expect(spansByTraceId).toMatchObject([ + { + name: 'RecursionWorkflow.countDown', + parentSpanId: parentSpanContext.spanId, + status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' }, + attributes: { + testId, + count: 4, + failuresRequested: 0, + }, + }, + { + name: 'RecursionWorkflow.countDown', + parentSpanId: spansByTraceId[0].spanId, + status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' }, + attributes: { + testId, + count: 3, + failuresRequested: 0, + }, + }, + { + name: 'RecursionWorkflow.countDown', + parentSpanId: spansByTraceId[1].spanId, + status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' }, + attributes: { + testId, + count: 2, + failuresRequested: 0, + }, + }, + { + name: 'RecursionWorkflow.countDown', + parentSpanId: spansByTraceId[2].spanId, + status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' }, + attributes: { + testId, + count: 1, + failuresRequested: 0, + }, + }, + { + name: 'RecursionWorkflow.countDown', + parentSpanId: spansByTraceId[3].spanId, + status: { code: SpanStatusCode.OK, message: 'Workflow Completed' }, + attributes: { + testId, + count: 0, + failuresRequested: 0, + }, + }, + ]) + }) +})