Skip to content

Commit

Permalink
Added support for recursion and a decision ending in [*]
Browse files Browse the repository at this point in the history
  • Loading branch information
omervk committed Feb 23, 2024
1 parent dbf3e22 commit 16503c3
Show file tree
Hide file tree
Showing 13 changed files with 317 additions and 12 deletions.
20 changes: 14 additions & 6 deletions packages/design/src/scaffold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ function inferStepsAndDecisions(diagramStatements: Statement[]): {
}

if (to === '[*]') {
steps[from].whatsNext = EndState
if (!fromDecision) {
steps[from].whatsNext = EndState
}
} else {
if (!toDecision) {
steps[to] = steps[to] ?? {
Expand Down Expand Up @@ -304,16 +306,22 @@ function escapeString(str: string) {
}

function variableFromDecision(decision: Decision) {
const outcomes = decision.outcomes.map(
o =>
`
const outcomes = decision.outcomes.map(o => {
if (o.target !== EndState.name) {
return `
${camelCase(o.name)}(payload: ${stepNameToPayloadName(o.target)}) {
return {
targetTaskName: '${escapeString(o.target)}',
payload,
}
},`,
)
},`
} else {
return `
${camelCase(o.name)}() {
return CompleteWorkflow;
},`
}
})

return `protected ${camelCase(decision.name)} = {${outcomes.join('')}
}`
Expand Down
41 changes: 41 additions & 0 deletions packages/design/test/resources/recursion.generated.typescript
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* This file is automatically generated. It gets overwritten on build */
import {CompleteWorkflow, WrongTimingError, WorkflowBase} from "@gamgee/run";
import {StateStore} from "@gamgee/interfaces/store";
import {WorkflowTask} from "@gamgee/interfaces/task";

import {CountDownPayload} from "./recursion";

export abstract class RecursionWorkflowBase extends WorkflowBase {
protected constructor() {
super('RecursionWorkflow');

super._registerStep({ name: 'countDown', run: this.countDown, attempts: 1, backoffMs: 1000 });
}

async submit(payload: CountDownPayload, store: StateStore, uniqueInstanceId?: string): Promise<string> {
const task = await super._enqueue('countDown', payload, store, uniqueInstanceId);
return task.instanceId;
}

abstract countDown(payload: CountDownPayload): Promise<ReturnType<(typeof this.decision)['zero']>> | Promise<ReturnType<(typeof this.decision)['nonZero']>>;

protected decision = {
zero() {
return CompleteWorkflow;
},
nonZero(payload: CountDownPayload) {
return {
targetTaskName: 'countDown',
payload,
}
},
}

protected _registerStep() {
throw new WrongTimingError();
}

protected _enqueue(): Promise<WorkflowTask> {
throw new WrongTimingError();
}
}
12 changes: 12 additions & 0 deletions packages/design/test/resources/recursion.mermaid
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
title: RecursionWorkflow
---

stateDiagram-v2
state decision <<choice>>

direction LR
[*] --> countDown
countDown --> decision
decision --> [*]: zero
decision --> countDown: nonZero
3 changes: 2 additions & 1 deletion packages/run/docs/examples/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Examples

1. Simple workflow with one step: [simple-workflow](./simple-workflow).
2. A workflow with a decision between going with the left or right branches: [conditions](./conditions).
2. A workflow with a decision between going with the left or right branches: [conditions](./conditions).
3. A workflow counting down recursively: [recursion](./recursion).
18 changes: 18 additions & 0 deletions packages/run/docs/examples/recursion/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
```mermaid
---
title: RecursionWorkflow
---
stateDiagram-v2
direction TB
state decision <<choice>>
[*] --> countDown
countDown --> decision
decision --> [*]: zero
decision --> countDown: nonZero
```

A workflow counting down recursively.

[[Diagram Source](./recursion.mermaid)] [[Generated Scaffold](./recursion.generated.ts)] [[Implementation](./recursion.ts)]
41 changes: 41 additions & 0 deletions packages/run/docs/examples/recursion/recursion.generated.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* This file is automatically generated. It gets overwritten on build */
import {CompleteWorkflow, WrongTimingError, WorkflowBase} from "@gamgee/run";
import {StateStore} from "@gamgee/interfaces/store";
import {WorkflowTask} from "@gamgee/interfaces/task";

import {CountDownPayload} from "./recursion";

export abstract class RecursionWorkflowBase extends WorkflowBase {
protected constructor() {
super('RecursionWorkflow');

super._registerStep({ name: 'countDown', run: this.countDown, attempts: 1, backoffMs: 1000 });
}

async submit(payload: CountDownPayload, store: StateStore, uniqueInstanceId?: string): Promise<string> {
const task = await super._enqueue('countDown', payload, store, uniqueInstanceId);
return task.instanceId;
}

abstract countDown(payload: CountDownPayload): Promise<ReturnType<(typeof this.decision)['zero']>> | Promise<ReturnType<(typeof this.decision)['nonZero']>>;

protected decision = {
zero() {
return CompleteWorkflow;
},
nonZero(payload: CountDownPayload) {
return {
targetTaskName: 'countDown',
payload,
}
},
}

protected _registerStep() {
throw new WrongTimingError();
}

protected _enqueue(): Promise<WorkflowTask> {
throw new WrongTimingError();
}
}
12 changes: 12 additions & 0 deletions packages/run/docs/examples/recursion/recursion.mermaid
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
title: RecursionWorkflow
---

stateDiagram-v2
state decision <<choice>>

direction LR
[*] --> countDown
countDown --> decision
decision --> [*]: zero
decision --> countDown: nonZero
17 changes: 17 additions & 0 deletions packages/run/docs/examples/recursion/recursion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { RecursionWorkflowBase } from './recursion.generated'

export type CountDownPayload = number

export class RecursionWorkflow extends RecursionWorkflowBase {
constructor() {
super()
}

countDown(count: CountDownPayload) {
if (count === 0) {
return Promise.resolve(this.decision.zero())
}

return Promise.resolve(this.decision.nonZero(count - 1))
}
}
8 changes: 4 additions & 4 deletions packages/run/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ type StepResult = {
payload: JSONValue
}

type StepDefinition<T, R extends StepResult> = {
type StepDefinition<T> = {
name: string
run: (payload: T) => Promise<R>
run: (payload: T) => Promise<StepResult>
attempts: number
backoffMs: number
}
Expand All @@ -36,14 +36,14 @@ function getCurrentOTelContext(): { traceparent: string; tracestate: string } {

export abstract class WorkflowBase {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly steps: { [name: string]: Omit<StepDefinition<any, StepResult>, 'name'> } = {}
private readonly steps: { [name: string]: Omit<StepDefinition<any>, 'name'> } = {}

protected constructor(readonly workflowType: string) {
// TODO: When trying to register two distinct implementations with the same name, throw (e.g. use a hash in the generated file)
workflowFactory.register(workflowType, () => new (this.constructor as new () => this)())
}

protected _registerStep<T extends JSONValue, R extends StepResult>(def: StepDefinition<T, R>): void {
protected _registerStep<T>(def: StepDefinition<T>): void {
this.steps[def.name] = Object.assign({}, def, { run: def.run.bind(this) })
}

Expand Down
2 changes: 1 addition & 1 deletion packages/run/test/test-02-conditions/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('test conditions workflow', () => {
},
{
name: 'ConditionsWorkflow.left',
parentSpanId: spansByTraceId?.[0]?.spanId,
parentSpanId: spansByTraceId[0].spanId,
status: { code: SpanStatusCode.OK, message: 'Workflow Completed' },
attributes: {
testId,
Expand Down
12 changes: 12 additions & 0 deletions packages/run/test/test-03-recursion/recursion.mermaid
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
title: RecursionWorkflow
---

stateDiagram-v2
state decision <<choice>>

direction LR
[*] --> countDown
countDown --> decision
decision --> [*]: zero
decision --> countDown: nonZero
27 changes: 27 additions & 0 deletions packages/run/test/test-03-recursion/recursion.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { RecursionWorkflowBase } from './recursion.generated'
import { trace } from '@opentelemetry/api'

export type CountDownPayload = { testId: string; count: number; failuresRequested: number }

const failures: { [testId: string]: number } = {}

export class RecursionWorkflow extends RecursionWorkflowBase {
constructor() {
super()
}

countDown(payload: CountDownPayload) {
trace.getActiveSpan()?.setAttributes(payload)

if (payload.failuresRequested > (failures[payload.testId] ??= 0)) {
failures[payload.testId]++
throw new Error('Task failed successfully :)')
}

if (payload.count === 0) {
return Promise.resolve(this.decision.zero())
}

return Promise.resolve(this.decision.nonZero(Object.assign({}, payload, { count: payload.count - 1 })))
}
}
116 changes: 116 additions & 0 deletions packages/run/test/test-03-recursion/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import InMemoryStateStore from '@gamgee/test/stateStores/in-memory'
import { WorkflowWorker } from '../../src/worker'
import { FetchStrategy } from '@gamgee/interfaces/store'
import { TestsTraceExporter } from '../tests-trace-exporter'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'
import { Span, SpanStatusCode, trace } from '@opentelemetry/api'
import { expect } from '@jest/globals'
import { RecursionWorkflow } from './recursion'

function randomString(): string {
return Math.random().toString(36).slice(2)
}

describe('test recursion workflow', () => {
const testsTraceExporter = new TestsTraceExporter()
const sdk: NodeSDK = new NodeSDK({
spanProcessor: new SimpleSpanProcessor(testsTraceExporter),
})
sdk.start()

afterAll(async () => {
await sdk.shutdown()
})

it.concurrent('runs the workflow', async () => {
const testId = randomString()
const store = new InMemoryStateStore()

const parentSpanContext = await trace
.getTracer('test')
.startActiveSpan(expect.getState().currentTestName!, { root: true }, async (span: Span) => {
const workflow = new RecursionWorkflow()
await workflow.submit(
{
testId,
count: 4,
failuresRequested: 0,
},
store,
)

const worker = new WorkflowWorker()
const result = await worker.executeWaitingWorkflow(
store,
{ workflowType: workflow.workflowType },
FetchStrategy.Random,
1000,
)

expect(result).toStrictEqual('Workflow Completed')

return span.spanContext()
})

expect(store.getStats()).toStrictEqual({
taskUpdatesSeen: 6, // Create, ran countDown(4), ran countDown(3), ran countDown(2), ran countDown(1), and done
tasksRemaining: 0,
unrecoverableTasks: 0,
})

const spansByTraceId = testsTraceExporter.getSpansByTraceId(parentSpanContext.traceId)
expect(spansByTraceId).toMatchObject([
{
name: 'RecursionWorkflow.countDown',
parentSpanId: parentSpanContext.spanId,
status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' },
attributes: {
testId,
count: 4,
failuresRequested: 0,
},
},
{
name: 'RecursionWorkflow.countDown',
parentSpanId: spansByTraceId[0].spanId,
status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' },
attributes: {
testId,
count: 3,
failuresRequested: 0,
},
},
{
name: 'RecursionWorkflow.countDown',
parentSpanId: spansByTraceId[1].spanId,
status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' },
attributes: {
testId,
count: 2,
failuresRequested: 0,
},
},
{
name: 'RecursionWorkflow.countDown',
parentSpanId: spansByTraceId[2].spanId,
status: { code: SpanStatusCode.OK, message: 'Continuing to countDown' },
attributes: {
testId,
count: 1,
failuresRequested: 0,
},
},
{
name: 'RecursionWorkflow.countDown',
parentSpanId: spansByTraceId[3].spanId,
status: { code: SpanStatusCode.OK, message: 'Workflow Completed' },
attributes: {
testId,
count: 0,
failuresRequested: 0,
},
},
])
})
})

0 comments on commit 16503c3

Please sign in to comment.