Skip to content

Commit

Permalink
support set custom status (#1)
Browse files Browse the repository at this point in the history
* support set custom status

Signed-off-by: Fabian Martinez <[email protected]>

* update license header

Signed-off-by: Fabian Martinez <[email protected]>

* fix test

Signed-off-by: Fabian Martinez <[email protected]>

* fix e2e test

Signed-off-by: Fabian Martinez <[email protected]>

---------

Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting authored Nov 25, 2024
1 parent 33a8fb6 commit 67de855
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 63 deletions.
2 changes: 2 additions & 0 deletions examples/activity-sequence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import { TaskHubGrpcWorker } from "../src/worker/task-hub-grpc-worker";
const result3 = yield ctx.callActivity(hello, "London");
cities.push(result3);

ctx.setCustomStatus("sequence done");

return cities;
};

Expand Down
7 changes: 7 additions & 0 deletions src/task/context/orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,11 @@ export abstract class OrchestrationContext {
* @param saveEvents {boolean} A flag indicating whether to add any unprocessed external events in the new orchestration history.
*/
abstract continueAsNew(newInput: any, saveEvents: boolean): void;

/**
* Sets the custom status
*
* @param status {string} The new custom status
*/
abstract setCustomStatus(status: string): void;
}
14 changes: 14 additions & 0 deletions src/worker/orchestration-execute-result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2024 The Dapr Authors.
// Licensed under the MIT License.

import * as pb from "../proto/orchestrator_service_pb";

export class OrchestrationExecuteResult {
actions: pb.OrchestratorAction[];
customStatus: string;

constructor(actions: pb.OrchestratorAction[], customStatus: string) {
this.actions = actions;
this.customStatus = customStatus;
}
}
5 changes: 3 additions & 2 deletions src/worker/orchestration-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { enumValueToKey } from "../utils/enum.util";
import { getOrchestrationStatusStr, isEmpty } from "../utils/pb-helper.util";
import { OrchestratorNotRegisteredError } from "./exception/orchestrator-not-registered-error";
import { StopIterationError } from "./exception/stop-iteration-error";
import { OrchestrationExecuteResult } from "./orchestration-execute-result";
import { Registry } from "./registry";
import { RuntimeOrchestrationContext } from "./runtime-orchestration-context";

Expand All @@ -36,7 +37,7 @@ export class OrchestrationExecutor {
instanceId: string,
oldEvents: pb.HistoryEvent[],
newEvents: pb.HistoryEvent[],
): Promise<pb.OrchestratorAction[]> {
): Promise<OrchestrationExecuteResult> {
if (!newEvents?.length) {
throw new OrchestrationStateError("The new history event list must have at least one event in it");
}
Expand Down Expand Up @@ -79,7 +80,7 @@ export class OrchestrationExecutor {
const actions = ctx.getActions();
console.log(`${instanceId}: Returning ${actions.length} action(s)`);

return actions;
return new OrchestrationExecuteResult(actions, ctx._customStatus);
}

private async processEvent(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise<void> {
Expand Down
6 changes: 6 additions & 0 deletions src/worker/runtime-orchestration-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
_pendingEvents: Record<string, CompletableTask<any>[]>;
_newInput?: any;
_saveEvents: any;
_customStatus: string;

constructor(instanceId: string) {
super();
Expand All @@ -45,6 +46,7 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {
this._pendingEvents = {};
this._newInput = undefined;
this._saveEvents = false;
this._customStatus = "";
}

get instanceId(): string {
Expand Down Expand Up @@ -330,4 +332,8 @@ export class RuntimeOrchestrationContext extends OrchestrationContext {

this.setContinuedAsNew(newInput, saveEvents);
}

setCustomStatus(status: string) {
this._customStatus = status;
}
}
7 changes: 5 additions & 2 deletions src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ export class TaskHubGrpcWorker {

try {
const executor = new OrchestrationExecutor(this._registry);
const actions = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList());
const result = await executor.execute(req.getInstanceid(), req.getPasteventsList(), req.getNeweventsList());

res = new pb.OrchestratorResponse();
res.setInstanceid(req.getInstanceid());
res.setActionsList(actions);
res.setActionsList(result.actions);
const cs = new StringValue();
cs.setValue(result.customStatus);
res.setCustomstatus(cs);
} catch (e: any) {
console.error(e);
console.log(`An error occurred while trying to execute instance '${req.getInstanceid()}': ${e.message}`);
Expand Down
7 changes: 5 additions & 2 deletions test/e2e/orchestration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ describe("Durable Functions", () => {
numbers.push(current);
}

ctx.setCustomStatus("foobaz");

return numbers;
};

Expand All @@ -81,6 +83,7 @@ describe("Durable Functions", () => {
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.serializedInput).toEqual(JSON.stringify(1));
expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]));
expect(state?.serializedCustomStatus).toEqual("foobaz");
}, 31000);

it("should be able to run fan-out/fan-in", async () => {
Expand Down Expand Up @@ -179,13 +182,13 @@ describe("Durable Functions", () => {
await taskHubWorker.start();

const id = await taskHubClient.scheduleNewOrchestration(orchestratorParent, SUB_ORCHESTRATION_COUNT);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30);
const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 43);

expect(state);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED);
expect(state?.failureDetails).toBeUndefined();
expect(activityCounter).toEqual(SUB_ORCHESTRATION_COUNT * ACTIVITY_COUNT);
}, 31000);
}, 45000);

it("should allow waiting for multiple external events", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any {
Expand Down
Loading

0 comments on commit 67de855

Please sign in to comment.