Skip to content

Commit

Permalink
Fix cluster start/stop (#386)
Browse files Browse the repository at this point in the history
* Fix bugs in cluster-manager start/stop

* Initialize cluster with lock held
  • Loading branch information
dandavison authored Oct 17, 2024
1 parent 486d428 commit b95d2b5
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 27 deletions.
5 changes: 5 additions & 0 deletions message-passing/safe-message-handlers/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ export async function startCluster(): Promise<void> {
await activities.sleep(100); // Simulate RPC
}

/**
 * Simulated cluster-shutdown activity.
 * Logs the action and stands in for a real shutdown RPC with a short sleep.
 */
export async function shutdownCluster(): Promise<void> {
const simulatedRpcDelayMs = 100;
activities.log.info('Shutting down cluster');
// Simulate RPC
await activities.sleep(simulatedRpcDelayMs);
}

export async function assignNodesToJob(input: AssignNodesToJobInput): Promise<void> {
activities.log.info(`Assigning nodes ${input.nodes} to job ${input.jobName}`);
await activities.sleep(100); // Simulate RPC
Expand Down
45 changes: 28 additions & 17 deletions message-passing/safe-message-handlers/src/cluster-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { Mutex } from 'async-mutex';
import {
AssignNodesToJobUpdateInput,
ClusterManagerState,
ClusterState,
ClusterManagerStateSummary,
DeleteJobUpdateInput,
} from './types';

const { assignNodesToJob, unassignNodesForJob, startCluster } = wf.proxyActivities<typeof activities>({
const { assignNodesToJob, unassignNodesForJob, startCluster, shutdownCluster } = wf.proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});

Expand All @@ -27,8 +28,7 @@ export class ClusterManager {

constructor(state?: ClusterManagerState) {
this.state = state ?? {
clusterStarted: false,
clusterShutdown: false,
clusterState: ClusterState.NOT_STARTED,
nodes: new Map<string, string | null>(),
maxAssignedNodes: 0,
};
Expand All @@ -37,27 +37,39 @@ export class ClusterManager {
}

async startCluster(): Promise<void> {
await startCluster();
this.state.clusterStarted = true;
for (let i = 0; i < 25; i++) {
this.state.nodes.set(i.toString(), null);
if (this.state.clusterState !== ClusterState.NOT_STARTED) {
// This is used as a Signal handler so we log a warning but cannot return an error.
wf.log.warn(`Cannot start cluster in state ${this.state.clusterState}`);
return;
}
await this.nodesMutex.runExclusive(async () => {
await startCluster();
this.state.clusterState = ClusterState.STARTED;
for (let i = 0; i < 25; i++) {
this.state.nodes.set(i.toString(), null);
}
});
wf.log.info('Cluster started');
}

async shutDownCluster(): Promise<void> {
await wf.condition(() => this.state.clusterStarted);
this.state.clusterShutdown = true;
// Update handler: shuts the cluster down via the shutdownCluster activity.
// Returns true so clients calling executeUpdate can confirm the result.
// Throws ApplicationFailure (rejecting the update) unless the cluster is STARTED.
async shutdownCluster(): Promise<true> {
if (this.state.clusterState !== ClusterState.STARTED) {
// This is used as an Update handler, so we return an error to the caller.
throw new wf.ApplicationFailure(`Cannot shutdown cluster in state ${this.state.clusterState}`);
}
await shutdownCluster();
// State is updated only after the activity call returns; if the activity
// throws, clusterState remains STARTED.
this.state.clusterState = ClusterState.SHUTTING_DOWN;
wf.log.info('Cluster shutdown');
return true;
}

async assignNodesToJob(input: AssignNodesToJobUpdateInput): Promise<ClusterManagerStateSummary> {
await wf.condition(() => this.state.clusterStarted);
if (this.state.clusterShutdown) {
await wf.condition(() => this.state.clusterState === ClusterState.STARTED);
if (this.state.clusterState === ClusterState.SHUTTING_DOWN) {
// If you want the client to receive a failure, either add an update validator and throw the
// exception from there, or throw an ApplicationFailure. Other exceptions in the handler will
// cause the workflow to keep retrying and get it stuck.
throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is already shut down');
throw new wf.ApplicationFailure('Cannot assign nodes to a job: Cluster is shutting down');
}
return await this.nodesMutex.runExclusive(async (): Promise<ClusterManagerStateSummary> => {
// Idempotency guard: do nothing if the job already has nodes assigned.
Expand All @@ -83,8 +95,8 @@ export class ClusterManager {
}

async deleteJob(input: DeleteJobUpdateInput) {
await wf.condition(() => this.state.clusterStarted);
if (this.state.clusterShutdown) {
await wf.condition(() => this.state.clusterState === ClusterState.STARTED);
if (this.state.clusterState === ClusterState.SHUTTING_DOWN) {
// If you want the client to receive a failure, either add an update validator and throw the
// exception from there, or throw an ApplicationFailure. Other exceptions in the handler will
// cause the workflow to keep retrying and get it stuck.
Expand All @@ -105,8 +117,7 @@ export class ClusterManager {

getState(): ClusterManagerState {
return {
clusterStarted: this.state.clusterStarted,
clusterShutdown: this.state.clusterShutdown,
clusterState: this.state.clusterState,
nodes: this.state.nodes,
maxAssignedNodes: this.state.maxAssignedNodes,
};
Expand Down
6 changes: 4 additions & 2 deletions message-passing/safe-message-handlers/src/run-simulation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WorkflowHandle } from '@temporalio/client';

import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterSignal } from './workflows';
import { assignNodesToJobUpdate, startClusterSignal, deleteJobUpdate, shutdownClusterUpdate } from './workflows';
import { startClusterManager } from './client';
import { setTimeout } from 'timers/promises';

Expand All @@ -23,7 +23,9 @@ async function runSimulation(wf: WorkflowHandle, delaySeconds?: number): Promise
}
await Promise.all(deletionUpdates);

await wf.signal(shutdownClusterSignal);
if (!(await wf.executeUpdate(shutdownClusterUpdate))) {
throw new Error('Failed to shutdown cluster');
}
}

async function main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { TestWorkflowEnvironment } from '@temporalio/testing';
import { before, describe, it } from 'mocha';
import { bundleWorkflowCode, WorkflowBundleWithSourceMap, DefaultLogger, Runtime, Worker } from '@temporalio/worker';
import * as activities from '../activities';
import * as client from '@temporalio/client';
import {
clusterManagerWorkflow,
assignNodesToJobUpdate,
startClusterSignal,
shutdownClusterSignal,
shutdownClusterUpdate,
deleteJobUpdate,
getClusterStatusQuery,
} from '../workflows';
Expand Down Expand Up @@ -81,7 +82,7 @@ describe('cluster manager', function () {
);
assert.equal(queryResult.maxAssignedNodes, request1.numNodes + request2.numNodes);
// Terminate the workflow and check that workflow returns same value as obtained from last query.
await workflow.signal(shutdownClusterSignal);
assert.equal(await workflow.executeUpdate(shutdownClusterUpdate), true);
const wfResult = await workflow.result();
assert.deepEqual(wfResult, queryResult);
});
Expand Down
9 changes: 7 additions & 2 deletions message-passing/safe-message-handlers/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
export interface ClusterManagerState {
clusterStarted: boolean;
clusterShutdown: boolean;
clusterState: ClusterState;
nodes: Map<string, string | null>;
maxAssignedNodes: number;
}
Expand Down Expand Up @@ -28,3 +27,9 @@ export interface ClusterManagerWorkflowResult {
numCurrentlyAssignedNodes: number;
numBadNodes: number;
}

// Lifecycle states of the managed cluster. Replaces the former pair of
// booleans (clusterStarted/clusterShutdown), making invalid combinations
// unrepresentable.
export enum ClusterState {
NOT_STARTED = 'NOT_STARTED',
STARTED = 'STARTED',
SHUTTING_DOWN = 'SHUTTING_DOWN',
}
12 changes: 8 additions & 4 deletions message-passing/safe-message-handlers/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import {
AssignNodesToJobUpdateInput,
ClusterManagerInput,
ClusterManagerStateSummary,
ClusterState,
DeleteJobUpdateInput,
} from './types';

export const startClusterSignal = wf.defineSignal('startCluster');
export const shutdownClusterSignal = wf.defineSignal('shutdownCluster');
export const shutdownClusterUpdate = wf.defineUpdate('shutdownCluster');
export const assignNodesToJobUpdate = wf.defineUpdate<ClusterManagerStateSummary, [AssignNodesToJobUpdateInput]>(
'allocateNodesToJob'
);
Expand All @@ -21,7 +22,7 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
// Message-handling API
//
wf.setHandler(startClusterSignal, () => manager.startCluster());
wf.setHandler(shutdownClusterSignal, () => manager.shutDownCluster());
wf.setHandler(shutdownClusterUpdate, () => manager.shutdownCluster());

// This is an update as opposed to a signal because the client may want to wait for nodes to be
// allocated before sending work to those nodes. Returns the array of node names that were
Expand Down Expand Up @@ -49,8 +50,11 @@ export async function clusterManagerWorkflow(input: ClusterManagerInput = {}): P
// lies in the message-processing handlers implemented in the ClusterManager class. The main
// workflow itself simply waits until the cluster is shutdown, or the workflow needs to
// continue-as-new.
await wf.condition(() => manager.state.clusterShutdown || wf.workflowInfo().continueAsNewSuggested);
if (!manager.state.clusterShutdown) {
await wf.condition(() => manager.state.clusterState === ClusterState.STARTED);
await wf.condition(
() => manager.state.clusterState === ClusterState.SHUTTING_DOWN || wf.workflowInfo().continueAsNewSuggested
);
if (manager.state.clusterState !== ClusterState.SHUTTING_DOWN) {
// You should typically wait for all async handlers to finish before
// completing a workflow or continuing as new. If the main workflow method
// is scheduling activities or child workflows, then you should typically
Expand Down

0 comments on commit b95d2b5

Please sign in to comment.