From 67de8556457c7172c56f0b9e681a89fa5d4f353b Mon Sep 17 00:00:00 2001 From: Fabian Martinez <46371672+famarting@users.noreply.github.com> Date: Mon, 25 Nov 2024 17:39:36 +0100 Subject: [PATCH] support set custom status (#1) * support set custom status Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * update license header Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * fix test Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> * fix e2e test Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --------- Signed-off-by: Fabian Martinez <46371672+famarting@users.noreply.github.com> --- examples/activity-sequence.ts | 2 + src/task/context/orchestration-context.ts | 7 ++ src/worker/orchestration-execute-result.ts | 14 +++ src/worker/orchestration-executor.ts | 5 +- src/worker/runtime-orchestration-context.ts | 6 + src/worker/task-hub-grpc-worker.ts | 7 +- test/e2e/orchestration.spec.ts | 7 +- test/unit/orchestration_executor.spec.ts | 122 +++++++++++--------- 8 files changed, 107 insertions(+), 63 deletions(-) create mode 100644 src/worker/orchestration-execute-result.ts diff --git a/examples/activity-sequence.ts b/examples/activity-sequence.ts index ac57d40..a6eddf3 100644 --- a/examples/activity-sequence.ts +++ b/examples/activity-sequence.ts @@ -28,6 +28,8 @@ import { TaskHubGrpcWorker } from "../src/worker/task-hub-grpc-worker"; const result3 = yield ctx.callActivity(hello, "London"); cities.push(result3); + ctx.setCustomStatus("sequence done"); + return cities; }; diff --git a/src/task/context/orchestration-context.ts b/src/task/context/orchestration-context.ts index cb49151..834b117 100644 --- a/src/task/context/orchestration-context.ts +++ b/src/task/context/orchestration-context.ts @@ -86,4 +86,11 @@ export abstract class OrchestrationContext { * @param saveEvents {boolean} A flag indicating whether to add any unprocessed external events in the new orchestration history. */ abstract continueAsNew(newInput: any, saveEvents: boolean): void; + + /** + * Sets the custom status + * + * @param status {string} The new custom status + */ + abstract setCustomStatus(status: string): void; } diff --git a/src/worker/orchestration-execute-result.ts b/src/worker/orchestration-execute-result.ts new file mode 100644 index 0000000..5574d03 --- /dev/null +++ b/src/worker/orchestration-execute-result.ts @@ -0,0 +1,14 @@ +// Copyright 2024 The Dapr Authors. +// Licensed under the MIT License. + +import * as pb from "../proto/orchestrator_service_pb"; + +export class OrchestrationExecuteResult { + actions: pb.OrchestratorAction[]; + customStatus: string; + + constructor(actions: pb.OrchestratorAction[], customStatus: string) { + this.actions = actions; + this.customStatus = customStatus; + } +} diff --git a/src/worker/orchestration-executor.ts b/src/worker/orchestration-executor.ts index b8fd7a0..adafe6f 100644 --- a/src/worker/orchestration-executor.ts +++ b/src/worker/orchestration-executor.ts @@ -16,6 +16,7 @@ import { enumValueToKey } from "../utils/enum.util"; import { getOrchestrationStatusStr, isEmpty } from "../utils/pb-helper.util"; import { OrchestratorNotRegisteredError } from "./exception/orchestrator-not-registered-error"; import { StopIterationError } from "./exception/stop-iteration-error"; +import { OrchestrationExecuteResult } from "./orchestration-execute-result"; import { Registry } from "./registry"; import { RuntimeOrchestrationContext } from "./runtime-orchestration-context"; @@ -36,7 +37,7 @@ export class OrchestrationExecutor { instanceId: string, oldEvents: pb.HistoryEvent[], newEvents: pb.HistoryEvent[], - ): Promise { + ): Promise { if (!newEvents?.length) { throw new OrchestrationStateError("The new history event list must have at least one event in it"); } @@ -79,7 +80,7 @@ export class OrchestrationExecutor { const actions = ctx.getActions(); console.log(`${instanceId}: Returning ${actions.length} action(s)`); - return actions; + return new OrchestrationExecuteResult(actions, ctx._customStatus); } private async processEvent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { diff --git a/src/worker/runtime-orchestration-context.ts b/src/worker/runtime-orchestration-context.ts index ae42ad6..a928ef9 100644 --- a/src/worker/runtime-orchestration-context.ts +++ b/src/worker/runtime-orchestration-context.ts @@ -27,6 +27,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { _pendingEvents: Record[]>; _newInput?: any; _saveEvents: any; + _customStatus: string; constructor(instanceId: string) { super(); @@ -45,6 +46,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this._pendingEvents = {}; this._newInput = undefined; this._saveEvents = false; + this._customStatus = ""; } get instanceId(): string { @@ -330,4 +332,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { this.setContinuedAsNew(newInput, saveEvents); } + + setCustomStatus(status: string) { + this._customStatus = status; + } } diff --git a/src/worker/task-hub-grpc-worker.ts b/src/worker/task-hub-grpc-worker.ts index 53f79b1..048b708 100644 --- a/src/worker/task-hub-grpc-worker.ts +++ b/src/worker/task-hub-grpc-worker.ts @@ -217,11 +217,14 @@ export class TaskHubGrpcWorker { try { const executor = new OrchestrationExecutor(this._registry); - const actions = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList()); + const result = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList()); res = new pb.OrchestratorResponse(); res.setInstanceid(req.getInstanceid()); - res.setActionsList(actions); + res.setActionsList(result.actions); + const cs = new StringValue(); + cs.setValue(result.customStatus); + res.setCustomstatus(cs); } catch (e: any) { console.error(e); console.log(`An error occurred while trying to execute instance '${req.getInstanceid()}': ${e.message}`); diff --git a/test/e2e/orchestration.spec.ts b/test/e2e/orchestration.spec.ts index 03daf47..694beff 100644 --- a/test/e2e/orchestration.spec.ts +++ b/test/e2e/orchestration.spec.ts @@ -64,6 +64,8 @@ describe("Durable Functions", () => { numbers.push(current); } + ctx.setCustomStatus("foobaz"); + return numbers; }; @@ -81,6 +83,7 @@ describe("Durable Functions", () => { expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(state?.serializedInput).toEqual(JSON.stringify(1)); expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])); + expect(state?.serializedCustomStatus).toEqual("foobaz"); }, 31000); it("should be able to run fan-out/fan-in", async () => { @@ -179,13 +182,13 @@ describe("Durable Functions", () => { await taskHubWorker.start(); const id = await taskHubClient.scheduleNewOrchestration(orchestratorParent, SUB_ORCHESTRATION_COUNT); - const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 43); expect(state); expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(state?.failureDetails).toBeUndefined(); expect(activityCounter).toEqual(SUB_ORCHESTRATION_COUNT * ACTIVITY_COUNT); - }, 31000); + }, 45000); it("should allow waiting for multiple external events", async () => { const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { diff --git a/test/unit/orchestration_executor.spec.ts b/test/unit/orchestration_executor.spec.ts index 0a4c2be..05c0c72 100644 --- a/test/unit/orchestration_executor.spec.ts +++ b/test/unit/orchestration_executor.spec.ts @@ -46,8 +46,8 @@ describe("Orchestration Executor", () => { newExecutionStartedEvent(name, TEST_INSTANCE_ID, JSON.stringify(testInput)), ]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()).not.toBeNull(); const expectedOutput = [testInput, TEST_INSTANCE_ID, startTime.toISOString(), false]; @@ -62,8 +62,8 @@ describe("Orchestration Executor", () => { const name = registry.addOrchestrator(emptyOrchestrator); const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()).not.toBeNull(); expect(completeAction?.getResult()?.getValue()).toEqual('"done"'); @@ -73,8 +73,8 @@ describe("Orchestration Executor", () => { const name = "Bogus"; const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("OrchestratorNotRegisteredError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).not.toBeNull(); @@ -95,7 +95,8 @@ describe("Orchestration Executor", () => { newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined), ]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const actions = result.actions; expect(actions).not.toBeNull(); expect(actions.length).toEqual(1); expect(actions[0]?.constructor?.name).toEqual(OrchestratorAction.name); @@ -120,8 +121,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTimerFiredEvent(1, expectedFireAt)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()).not.toBeNull(); expect(completeAction?.getResult()?.getValue()).toEqual('"done"'); @@ -138,7 +139,8 @@ describe("Orchestration Executor", () => { const name = registry.addOrchestrator(orchestrator); const newEvents = [newExecutionStartedEvent(name, TEST_INSTANCE_ID, undefined)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, [], newEvents); + const actions = result.actions; expect(actions).not.toBeNull(); expect(actions.length).toEqual(1); expect(actions[0]?.constructor?.name).toEqual(OrchestratorAction.name); @@ -163,8 +165,8 @@ describe("Orchestration Executor", () => { const encodedOutput = JSON.stringify("done!"); const newEvents = [newTaskCompletedEvent(1, encodedOutput)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); console.log(completeAction?.getFailuredetails()); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); @@ -187,8 +189,8 @@ describe("Orchestration Executor", () => { const encodedOutput = JSON.stringify("done!"); const newEvents = [newTaskCompletedEvent(1, encodedOutput)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); console.log(completeAction?.getFailuredetails()); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); @@ -211,8 +213,8 @@ describe("Orchestration Executor", () => { const ex = new Error("Kah-BOOOOM!!!"); const newEvents = [newTaskFailedEvent(1, ex)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message); @@ -241,8 +243,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTimerFiredEvent(1, fireAt)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -263,8 +265,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTaskCompletedEvent(1, "done!")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -287,8 +289,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTaskCompletedEvent(1)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -312,8 +314,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newTaskCompletedEvent(1)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -339,8 +341,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newSubOrchestrationCompletedEvent(1, "42")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); @@ -363,8 +365,8 @@ describe("Orchestration Executor", () => { const ex = new Error("Kah-BOOOOM!!!"); const newEvents = [newSubOrchestrationFailedEvent(1, ex)]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message); @@ -387,8 +389,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newSubOrchestrationCompletedEvent(1, "42")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -417,8 +419,8 @@ describe("Orchestration Executor", () => { ]; const newEvents = [newSubOrchestrationCompletedEvent(1, "42")]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("NonDeterminismError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain("1"); @@ -444,17 +446,17 @@ describe("Orchestration Executor", () => { // Execute the orchestration until it is waiting for an external event. // The result should be an empty list of actions because the orchestration didn't schedule any work let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - expect(actions.length).toBe(0); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + expect(result.actions.length).toBe(0); // Now send an external event to the orchestration and execute it again. // This time the orcehstration should complete oldEvents = newEvents; newEvents = [newEventRaisedEvent("my_event", "42")]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); }); @@ -479,7 +481,8 @@ describe("Orchestration Executor", () => { // Execute the orchestration // It should be in a running state waiting for the timer to fire let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let actions = result.actions; expect(actions.length).toBe(1); expect(actions[0].hasCreatetimer()).toBeTruthy(); @@ -491,7 +494,8 @@ describe("Orchestration Executor", () => { oldEvents = newEvents; newEvents = [newTimerFiredEvent(1, timerDueTime)]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + actions = result.actions; const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); @@ -515,15 +519,16 @@ describe("Orchestration Executor", () => { // It should be in a running state because it was suspended prior // to the processing the event raised event let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let actions = result.actions; expect(actions.length).toBe(0); // Resume the orchestration, it should complete successfully oldEvents.push(...newEvents); newEvents = [newResumeEvent()]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + actions = result.actions; const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual("42"); @@ -544,12 +549,10 @@ describe("Orchestration Executor", () => { // Execute the orchestration // It should be in a running state waiting for an external event - let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const executor = new OrchestrationExecutor(registry); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); expect(completeAction?.getResult()?.getValue()).toEqual(JSON.stringify("terminated!")); }); @@ -577,9 +580,9 @@ describe("Orchestration Executor", () => { // Execute the orchestration, it should be in a running state waiting for the timer to fire const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual( pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW, ); @@ -626,8 +629,8 @@ describe("Orchestration Executor", () => { ]; const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const actions = result.actions; // The result should be 10 "taskScheduled" actions with inputs from 0 to 9 expect(actions.length).toEqual(10); @@ -674,13 +677,15 @@ describe("Orchestration Executor", () => { // we expect the orchestrator to be running // it should however return 0 actions, since it is still waiting for the other 5 tasks to complete let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents.slice(0, 4)); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents.slice(0, 4)); + let actions = result.actions; expect(actions.length).toBe(0); // Now test with the full set of new events // we expect the orchestration to complete executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + actions = result.actions; const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); @@ -727,9 +732,9 @@ describe("Orchestration Executor", () => { // Now test with the full set of new events // We expect the orchestration to complete const executor = new OrchestrationExecutor(registry); - const actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + const result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); - const completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); + const completeAction = getAndValidateSingleCompleteOrchestrationAction(result.actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); expect(completeAction?.getFailuredetails()?.getErrortype()).toEqual("TaskFailedError"); expect(completeAction?.getFailuredetails()?.getErrormessage()).toContain(ex.message); @@ -762,7 +767,8 @@ describe("Orchestration Executor", () => { let oldEvents: any[] = []; let newEvents = [newExecutionStartedEvent(orchestratorName, TEST_INSTANCE_ID)]; let executor = new OrchestrationExecutor(registry); - let actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + let actions = result.actions; expect(actions.length).toEqual(2); expect(actions[0].hasScheduletask()).toBeTruthy(); @@ -781,7 +787,8 @@ describe("Orchestration Executor", () => { let encodedOutput = JSON.stringify(hello(null, "Tokyo")); newEvents = [newTaskCompletedEvent(1, encodedOutput)]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + actions = result.actions; let completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput); @@ -791,7 +798,8 @@ describe("Orchestration Executor", () => { encodedOutput = JSON.stringify(hello(null, "Seattle")); newEvents = [newTaskCompletedEvent(2, encodedOutput)]; executor = new OrchestrationExecutor(registry); - actions = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + result = await executor.execute(TEST_INSTANCE_ID, oldEvents, newEvents); + actions = result.actions; completeAction = getAndValidateSingleCompleteOrchestrationAction(actions); expect(completeAction?.getOrchestrationstatus()).toEqual(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); expect(completeAction?.getResult()?.getValue()).toEqual(encodedOutput);