Skip to content

Commit

Permalink
Synchronize node lifecycle transitions (#1507)
Browse files Browse the repository at this point in the history
This places protections around node lifecycle transitions with many async steps. Previously it was possible to
trigger these simultaneously in ways that caused crashes due to interference between them. They may still be
invoked in parallel but will execute sequentially. Protected "*WithMutex" methods are available for composing the
operations when the mutex is already held.
  • Loading branch information
lauckhart authored Dec 11, 2024
1 parent 3e2a368 commit 6d80d8b
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 9 deletions.
15 changes: 15 additions & 0 deletions packages/general/src/util/Mutex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ export class Mutex implements PromiseLike<unknown> {
}
}

/**
 * Enqueue work and expose its outcome as a promise.
 *
 * The task is handed to {@link run} together with the optional cancel callback.  The returned promise resolves
 * with the value the task produces, or rejects with whatever the task throws or rejects with.
 *
 * NOTE(review): nothing in this method settles the returned promise if the task is never executed; confirm how
 * {@link run} uses the cancel callback and whether callers need to handle that case.
 *
 * @param task factory for the work to perform
 * @param cancel optional callback forwarded unchanged to {@link run}
 * @returns a promise for the task's result
 */
produce<T>(task: () => PromiseLike<T>, cancel?: () => void): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        // Wrap the task so its outcome (including a synchronous throw) is routed to the outer promise
        const work = async () => {
            try {
                const result = await task();
                resolve(result);
            } catch (e) {
                reject(e);
            }
        };

        this.run(work, cancel);
    });
}

/**
* Cancel remaining work and perform one last task with the Mutex held.
*/
Expand Down
26 changes: 22 additions & 4 deletions packages/node/src/node/Node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ export abstract class Node<T extends Node.CommonRootEndpoint = Node.CommonRootEn
* Bring the node online.
*/
async start() {
    // Serialize with other lifecycle transitions; the actual work lives in startWithMutex()
    const { mutex } = this.lifecycle;
    await mutex.produce(() => this.startWithMutex());
}

protected async startWithMutex() {
this.env.runtime.add(this);

try {
Expand Down Expand Up @@ -123,6 +127,10 @@ export abstract class Node<T extends Node.CommonRootEndpoint = Node.CommonRootEn
* Once the node is offline you may use {@link start} to bring the node online again.
*/
async cancel() {
    // Serialize with other lifecycle transitions; the actual work lives in cancelWithMutex()
    const { mutex } = this.lifecycle;
    await mutex.produce(() => this.cancelWithMutex());
}

protected async cancelWithMutex() {
if (!this.#runtime) {
return;
}
Expand All @@ -133,17 +141,27 @@ export abstract class Node<T extends Node.CommonRootEndpoint = Node.CommonRootEn
}

/**
 * Close the node.  Runs {@link closeWithMutex} via the lifecycle mutex.
 */
override async close() {
    const { mutex } = this.lifecycle;
    await mutex.produce(() => this.closeWithMutex());
}

/**
 * Perform the close transition.  The caller must already hold the lifecycle mutex.
 *
 * If a runtime is present it is canceled first, then the base class close runs.
 */
protected async closeWithMutex() {
    // The runtime is not designed to operate with a node that is shutting down so destroy it before performing
    // actual close.
    //
    // Use the *WithMutex variant here: the public cancel() enqueues on the lifecycle mutex, which this method's
    // caller already holds, so awaiting cancel() from here would never complete.
    if (this.#runtime) {
        await this.cancelWithMutex();
    }

    await super.close();
}

/**
 * Reset the node.  Runs {@link resetWithMutex} via the lifecycle mutex.
 */
override async reset() {
    const { mutex } = this.lifecycle;
    await mutex.produce(() => this.resetWithMutex());
}

/**
* Perform the reset by delegating to the base class implementation.
*
* This is the variant that {@link reset} runs via the lifecycle mutex; call it directly only when the mutex is
* already held.
*/
protected async resetWithMutex() {
return super.reset();
}

/**
* Create the network runtime.
*/
Expand Down Expand Up @@ -177,7 +195,7 @@ export abstract class Node<T extends Node.CommonRootEndpoint = Node.CommonRootEn
}

/**
 * Destruction hook: cancel the node, run base class destruction, then remove this node from
 * {@link DiagnosticSource}.
 */
override async [Construction.destruct]() {
    // Call cancelWithMutex() directly rather than the public cancel().  cancel() queues on the lifecycle mutex, so
    // if this hook runs while a transition such as close() holds the mutex, awaiting cancel() would block forever.
    // NOTE(review): assumes callers of this hook already hold the lifecycle mutex; confirm.
    await this.cancelWithMutex();
    await super[Construction.destruct]();
    DiagnosticSource.delete(this);
}
Expand Down
18 changes: 17 additions & 1 deletion packages/node/src/node/NodeLifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { ActionContext } from "#behavior/context/ActionContext.js";
import { Endpoint } from "#endpoint/Endpoint.js";
import { EndpointLifecycle } from "#endpoint/properties/EndpointLifecycle.js";
import { AsyncObservable, Observable } from "#general";
import { AsyncObservable, Mutex, Observable } from "#general";

/**
* Extended lifecycle information that only applies to root endpoints.
Expand All @@ -21,10 +21,13 @@ export class NodeLifecycle extends EndpointLifecycle {
#initialized = Observable<[isCommissioned: boolean]>();
#isOnline = false;
#isCommissioned = false;
#mutex: Mutex;

constructor(endpoint: Endpoint) {
super(endpoint);

this.#mutex = new Mutex(endpoint);

this.#online.on(() => {
this.#isOnline = true;
});
Expand Down Expand Up @@ -97,4 +100,17 @@ export class NodeLifecycle extends EndpointLifecycle {
get decommissioned() {
// Accessor for the private #decommissioned member
return this.#decommissioned;
}

/**
* Mutex for protecting node lifecycle transitions.
*
* Methods that implement complex async lifecycle transitions use this mutex to ensure conflicting operations cannot
* intermingle.
*
* Generally methods that hold this mutex have a protected "*WithMutex" variant. This allows for nesting of logic
* that requires the mutex without causing deadlock: code that already holds the mutex calls the "*WithMutex"
* variant directly instead of the public method, which would queue behind the current holder.
*
* @returns the {@link Mutex} created in the constructor
*/
get mutex() {
return this.#mutex;
}
}
10 changes: 7 additions & 3 deletions packages/node/src/node/ServerNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,24 @@ export class ServerNode<T extends ServerNode.RootEndpoint = ServerNode.RootEndpo
* Perform a factory reset of the node.
*/
override async erase() {
    // Serialize with other lifecycle transitions; the actual work lives in eraseWithMutex()
    const { mutex } = this.lifecycle;
    await mutex.produce(() => this.eraseWithMutex());
}

protected async eraseWithMutex() {
try {
await this.construction;

// Go offline before performing reset
const isOnline = this.lifecycle.isOnline;
if (isOnline) {
await this.cancel();
await this.cancelWithMutex();
}

// Inform user
this.statusUpdate("resetting to factory defaults");

// Reset in-memory state
await this.reset();
await this.resetWithMutex();

// Reset persistent state
await this.resetStorage();
Expand All @@ -137,7 +141,7 @@ export class ServerNode<T extends ServerNode.RootEndpoint = ServerNode.RootEndpo

// Go back online if we were online at time of reset, otherwise just await reinitialization
if (isOnline) {
await this.start();
await this.startWithMutex();
} else {
await this.construction.ready;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/node/test/node/ServerNodeTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ describe("ServerNode", () => {
await node.cancel();
}

await node.erase();
await MockTime.resolve(node.erase());

// Confirm previous online state is resumed
expect(node.lifecycle.isOnline).equals(mode === "online");
Expand Down
4 changes: 4 additions & 0 deletions packages/protocol/src/mdns/MdnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const logger = Logger.get("MDNS");
export class MdnsService {
#broadcaster?: MdnsBroadcaster;
#scanner?: MdnsScanner;
#env: Environment;
readonly #construction: Construction<MdnsService>;
readonly #enableIpv4: boolean;
readonly limitedToNetInterface?: string;
Expand All @@ -31,6 +32,7 @@ export class MdnsService {
}

constructor(environment: Environment, options?: MdnsService.Options) {
this.#env = environment;
environment.set(MdnsService, this);
environment.runtime.add(this);

Expand Down Expand Up @@ -78,6 +80,8 @@ export class MdnsService {
}

async [Symbol.asyncDispose]() {
this.#env.delete(MdnsService, this);

await this.#construction.close(async () => {
const broadcasterDisposal = MaybePromise.then(this.#broadcaster?.close(), undefined, e =>
logger.error("Error disposing of MDNS broadcaster", e),
Expand Down

0 comments on commit 6d80d8b

Please sign in to comment.