Skip to content

Commit

Permalink
Shutdown and promise optimizations (#1586)
Browse files Browse the repository at this point in the history
* Simplify WriteFile Promise

* Simplify Subscription Update  Promise

* optimize direct update after update

... and only send when non empty to address edge cases

* Subscription log tweaks

* Optimize log when clearing subscriptions

* Close sessions in parallel

... this mainly optimized the time needed for subscription finalization because "parallel" and not sequencial

* makes sure to send all Mdns messages

... and not come back after first error

* Optimize ServerNode shutdown logic

... to make sure that sessions are closed after broadcaster started to shut down but before network is down

* Changelog

* Optimize Promise handling in various places

* remove generic when we can not properly unwrap it

* fix tests
  • Loading branch information
Apollon77 authored Dec 31, 2024
1 parent b98540a commit 689db45
Show file tree
Hide file tree
Showing 22 changed files with 150 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The main work (all changes without a GitHub username in brackets in the below li

- @matter/node
- Enhancement: Each new PASE session now automatically arms the failsafe timer for 60s as required by specs
- Enhancement: Optimizes Node shutdown logic to close sessions and subscriptions before shutting down the network
- Fix: Fixes withBehaviors() method on endpoints

- @matter/nodejs-ble
Expand Down
15 changes: 15 additions & 0 deletions packages/general/src/MatterError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,21 @@ export class MatterAggregateError extends AggregateError {
}
return AggregateError[Symbol.hasInstance](instance);
}

/**
* Wait for all promises to settle and throw an error if any of them reject as MatterAggregateError
* (or extended class). Promise results are not returned.
* TODO: Enhance the types between call and result to be better unwrapped
*/
static async allSettled(promises: Iterable<unknown>, message = "Errors happened"): Promise<unknown[]> {
const results = await Promise.allSettled(promises);
const errors = results.filter(result => result.status === "rejected").map(result => result.reason);

if (errors.length) {
throw new this(errors, message);
}
return (results as PromiseFulfilledResult<unknown>[]).map(result => result.value);
}
}

Object.assign(MatterAggregateError, {
Expand Down
7 changes: 5 additions & 2 deletions packages/general/src/net/UdpMulticastServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { MatterAggregateError } from "#MatterError.js";
import { Logger } from "../log/Logger.js";
import { Cache } from "../util/Cache.js";
import { isIPv4 } from "../util/Ip.js";
Expand Down Expand Up @@ -92,15 +93,15 @@ export class UdpMulticastServer {
} else {
const netInterfaces =
netInterface !== undefined ? [{ name: netInterface }] : await this.network.getNetInterfaces();
await Promise.all(
await MatterAggregateError.allSettled(
netInterfaces.map(async ({ name: netInterface }) => {
const { ipV4, ipV6 } = (await this.network.getIpMac(netInterface)) ?? {
mac: "",
ipV4: [],
ipV6: [],
};
const ips = [...ipV4, ...ipV6];
await Promise.all(
await MatterAggregateError.allSettled(
ips.map(async ip => {
const iPv4 = ipV4.includes(ip);
const broadcastTarget = iPv4 ? this.broadcastAddressIpv4 : this.broadcastAddressIpv6;
Expand All @@ -116,8 +117,10 @@ export class UdpMulticastServer {
logger.info(`${netInterface}: ${(error as Error).message}`);
}
}),
`Error sending UDP Multicast message on interface ${netInterface}`,
);
}),
"Error sending UDP Multicast message",
);
}
}
Expand Down
10 changes: 8 additions & 2 deletions packages/general/src/storage/StorageContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { MatterAggregateError } from "#MatterError.js";
import { MaybePromise } from "../util/Promises.js";
import { Storage, StorageError, StorageOperationResult } from "./Storage.js";
import { SupportedStorageTypes } from "./StringifyTools.js";
Expand Down Expand Up @@ -92,7 +93,10 @@ export class StorageContext<S extends Storage = any> implements StorageContextFa
const keys = this.keys();
if (MaybePromise.is(keys)) {
return keys.then(keys => {
return Promise.all(keys.map(key => this.delete(key))).then(() => Promise.resolve());
return MatterAggregateError.allSettled(
keys.map(key => this.delete(key)),
"Error while clearing storage",
).then(() => {});
}) as StorageOperationResult<S>;
}
const promises = new Array<PromiseLike<void>>();
Expand All @@ -103,7 +107,9 @@ export class StorageContext<S extends Storage = any> implements StorageContextFa
}
});
if (promises.length > 0) {
return Promise.all(promises).then(() => Promise.resolve()) as StorageOperationResult<S>;
return MatterAggregateError.allSettled(promises, "Error while clearing storage").then(
() => {},
) as StorageOperationResult<S>;
}
return undefined as StorageOperationResult<S>;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/general/src/util/Promises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class PromiseTimeoutError extends MatterError {
/**
* Create a promise with a timeout.
*
* By default rejects with {@link PromiseTimeoutError} on timeout but you can override by supplying {@link cancel}.
* By default, rejects with {@link PromiseTimeoutError} on timeout but you can override by supplying {@link cancel}.
*
* @param timeoutMs the timeout in milliseconds
* @param promise a promise that resolves or rejects when the timed task completes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { Behavior } from "#behavior/Behavior.js";
import { BasicInformationBehavior } from "#behaviors/basic-information";
import { ImplementationError } from "#general";
import { ImplementationError, Logger, MatterAggregateError } from "#general";
import {
Ble,
FabricAuthority,
Expand All @@ -21,6 +21,8 @@ import { NetworkServer } from "../network/NetworkServer.js";
import { ActiveDiscoveries } from "./discovery/ActiveDiscoveries.js";
import type { Discovery } from "./discovery/Discovery.js";

const logger = Logger.get("ControllerBehavior");

/**
* Node controller functionality.
*
Expand Down Expand Up @@ -91,7 +93,9 @@ export class ControllerBehavior extends Behavior {
discovery.cancel();
}

await Promise.allSettled([...discoveries]);
await MatterAggregateError.allSettled([...discoveries], "Error while cancelling discoveries").catch(error =>
logger.error(error),
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,8 @@ export abstract class Discovery<T = unknown> extends CancelablePromise<T> {
);
}

Promise.allSettled(promises)
.then(results => {
const errors = Array<any>();

for (const result of results) {
if (result.status === "rejected") {
errors.push(result.reason);
}
}

if (errors.length) {
throw new DiscoveryAggregateError(errors, `${this} failed`);
}

this.#invokeCompleter();
})
DiscoveryAggregateError.allSettled(promises, `${this} failed`)
.then(() => this.#invokeCompleter())
.catch(this.#reject);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export abstract class NetworkRuntime {
} finally {
this.#owner.behaviors.internalsOf(NetworkBehavior).runtime = undefined;
}
await this.owner.prepareRuntimeShutdown();
await this.#owner.act(agent => this.owner.lifecycle.offline.emit(agent.context));
}

Expand Down
17 changes: 12 additions & 5 deletions packages/node/src/behavior/system/network/ServerNetworkRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,17 +315,24 @@ export class ServerNetworkRuntime extends NetworkRuntime {
this.#observers.close();

await this.owner.env.close(DeviceCommissioner);
await this.owner.env.close(DeviceAdvertiser);
// Shutdown the Broadcaster if DeviceAdvertiser is not initialized
// We kick-off the Advertiser shutdown to prevent re-announces when removing sessions and wait a bit later
const advertisementShutdown = this.owner.env.has(DeviceAdvertiser)
? this.owner.env.close(DeviceAdvertiser)
: this.#mdnsBroadcaster?.close();
this.#mdnsBroadcaster = undefined;

await this.owner.prepareRuntimeShutdown();

// Now all sessions are closed, so we wait for Advertiser to be gone
await advertisementShutdown;

await this.owner.env.close(ExchangeManager);
await this.owner.env.close(SecureChannelProtocol);
await this.owner.env.close(TransportInterfaceSet);

await this.#interactionServer?.[Symbol.asyncDispose]();
this.#interactionServer = undefined;

// DeviceAdvertiser does this but we do so here just in case DeviceAdvertiser did not initialize for some reason
await this.#mdnsBroadcaster?.close();
this.#mdnsBroadcaster = undefined;
}

protected override blockNewActivity() {
Expand Down
10 changes: 4 additions & 6 deletions packages/node/src/node/storage/ClientStoreService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,10 @@ export class ClientStoreFactory extends ClientStoreService {
store.construction.start();
}

const results = await Promise.allSettled(Object.values(this.#stores).map(store => store.construction.ready));
const errors = results.filter(result => result.status === "rejected").map(result => result.reason);

if (errors.length) {
throw new MatterAggregateError(errors, "Error loading one or more client stores");
}
await MatterAggregateError.allSettled(
Object.values(this.#stores).map(store => store.construction.ready),
"Error while initializing client stores",
);
}

allocateId() {
Expand Down
26 changes: 10 additions & 16 deletions packages/nodejs/src/storage/StorageBackendDiskAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
*/

import {
createPromise,
fromJson,
Logger,
MatterAggregateError,
MaybeAsyncStorage,
StorageError,
SupportedStorageTypes,
Expand Down Expand Up @@ -45,8 +45,9 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage {
async #finishAllWrites(filename?: string) {
// Let's try max up to 10 times to finish all writes out there, otherwise something is strange
for (let i = 0; i < 10; i++) {
await Promise.allSettled(
await MatterAggregateError.allSettled(
filename !== undefined ? [this.#writeFileBlocker.get(filename)] : this.#writeFileBlocker.values(),
"Error on finishing all file system writes to storage",
);
if (!this.#writeFileBlocker.size) {
return;
Expand Down Expand Up @@ -122,7 +123,7 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage {
for (const [key, value] of Object.entries(keyOrValues)) {
promises.push(this.#writeFile(this.buildStorageKey(contexts, key), toJson(value)));
}
await Promise.allSettled(promises);
await MatterAggregateError.allSettled(promises, "Error when writing values into filesystem storage");
}

/** According to Node.js documentation, writeFile is not atomic. This method ensures atomicity. */
Expand All @@ -133,18 +134,11 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage {
return this.#writeFile(fileName, value);
}

const { promise, rejecter, resolver } = createPromise<void>();

const promise = writeFile(this.filePath(fileName), value, "utf8").finally(() => {
this.#writeFileBlocker.delete(fileName);
});
this.#writeFileBlocker.set(fileName, promise);
writeFile(this.filePath(fileName), value, "utf8")
.then(() => {
this.#writeFileBlocker.delete(fileName);
resolver();
})
.catch(() => {
this.#writeFileBlocker.delete(fileName);
rejecter();
});

return promise;
}

Expand Down Expand Up @@ -187,7 +181,7 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage {
})(),
);
}
await Promise.all(promises);
await MatterAggregateError.allSettled(promises, "Error when reading values from filesystem storage");
return values;
}

Expand Down Expand Up @@ -227,6 +221,6 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage {
promises.push(rm(this.filePath(key), { force: true }));
}
}
await Promise.all(promises);
await MatterAggregateError.allSettled(promises, "Error when clearing all values from filesystem storage");
}
}
5 changes: 4 additions & 1 deletion packages/protocol/src/cluster/server/EventServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
ImplementationError,
InternalError,
isObject,
MatterAggregateError,
MaybePromise,
Storage,
StorageOperationResult,
Expand Down Expand Up @@ -112,7 +113,9 @@ export class EventServer<T = any, S extends Storage = any> {
}
this.eventList = [];
if (promises.length > 0) {
return Promise.all(promises).then(() => Promise.resolve()) as StorageOperationResult<S>;
return MatterAggregateError.allSettled(promises, "Error binding events to the event handlers").then(
() => {},
) as StorageOperationResult<S>;
}
return undefined as StorageOperationResult<S>;
}
Expand Down
9 changes: 4 additions & 5 deletions packages/protocol/src/events/NonvolatileEventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { Logger, MaybePromise, StorageContext } from "#general";
import { Logger, MatterAggregateError, MaybePromise, StorageContext } from "#general";
import { EventNumber } from "#types";
import { BaseEventStore } from "./BaseEventStore.js";
import { OccurrenceSummary } from "./EventStore.js";
Expand Down Expand Up @@ -100,10 +100,9 @@ export class NonvolatileEventStore extends BaseEventStore {

override close() {
if (this.#iops.size) {
return Promise.allSettled(this.#iops).then(
() => {},
() => {},
);
return MatterAggregateError.allSettled(this.#iops, "Error closing event store")
.then(() => {})
.catch(error => logger.error(error));
}
}

Expand Down
14 changes: 12 additions & 2 deletions packages/protocol/src/events/OccurrenceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { asyncNew, Construction, ImplementationError, isObject, Logger, MaybePromise } from "#general";
import {
asyncNew,
Construction,
ImplementationError,
isObject,
Logger,
MatterAggregateError,
MaybePromise,
} from "#general";
import {
EventNumber,
EventPriority,
Expand Down Expand Up @@ -262,7 +270,9 @@ export class OccurrenceManager {
this.#storedEventCount -= totalCulled;

if (asyncDrops.length) {
return Promise.allSettled(asyncDrops).then(() => {});
return MatterAggregateError.allSettled(asyncDrops, "Error dropping occurrences")
.then(() => {})
.catch(error => logger.error(error));
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions packages/protocol/src/interaction/InteractionServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -955,10 +955,16 @@ export class InteractionServer implements ProtocolHandler, InteractionRecipient
);

if (!keepSubscriptions) {
logger.debug(
`Clear subscriptions for Subscriber node ${session.peerNodeId} because keepSubscriptions=false`,
const clearedCount = await this.#context.sessions.clearSubscriptionsForNode(
fabric.fabricIndex,
session.peerNodeId,
true,
);
await this.#context.sessions.clearSubscriptionsForNode(fabric.fabricIndex, session.peerNodeId, true);
if (clearedCount > 0) {
logger.debug(
`Cleared ${clearedCount} subscriptions for Subscriber node ${session.peerNodeId} because keepSubscriptions=false`,
);
}
}

if (
Expand Down
Loading

0 comments on commit 689db45

Please sign in to comment.