diff --git a/CHANGELOG.md b/CHANGELOG.md index 213ae467c1..fb9aa851f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The main work (all changes without a GitHub username in brackets in the below li - @matter/node - Enhancement: Each new PASE session now automatically arms the failsafe timer for 60s as required by specs + - Enhancement: Optimizes Node shutdown logic to close sessions and subscriptions before shutting down the network - Fix: Fixes withBehaviors() method on endpoints - @matter/nodejs-ble diff --git a/packages/general/src/MatterError.ts b/packages/general/src/MatterError.ts index 90231a2655..0ead024c75 100644 --- a/packages/general/src/MatterError.ts +++ b/packages/general/src/MatterError.ts @@ -154,6 +154,21 @@ export class MatterAggregateError extends AggregateError { } return AggregateError[Symbol.hasInstance](instance); } + + /** + * Wait for all promises to settle and throw an error if any of them reject as MatterAggregateError + * (or extended class). Promise results are not returned. + * TODO: Enhance the types between call and result to be better unwrapped + */ + static async allSettled(promises: Iterable, message = "Errors happened"): Promise { + const results = await Promise.allSettled(promises); + const errors = results.filter(result => result.status === "rejected").map(result => result.reason); + + if (errors.length) { + throw new this(errors, message); + } + return (results as PromiseFulfilledResult[]).map(result => result.value); + } } Object.assign(MatterAggregateError, { diff --git a/packages/general/src/net/UdpMulticastServer.ts b/packages/general/src/net/UdpMulticastServer.ts index 189bad9c2a..566addb2e3 100644 --- a/packages/general/src/net/UdpMulticastServer.ts +++ b/packages/general/src/net/UdpMulticastServer.ts @@ -4,6 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { MatterAggregateError } from "#MatterError.js"; import { Logger } from "../log/Logger.js"; import { Cache } from "../util/Cache.js"; import { isIPv4 } from "../util/Ip.js"; @@ -92,7 +93,7 @@ export class UdpMulticastServer { } else { const netInterfaces = netInterface !== undefined ? [{ name: netInterface }] : await this.network.getNetInterfaces(); - await Promise.all( + await MatterAggregateError.allSettled( netInterfaces.map(async ({ name: netInterface }) => { const { ipV4, ipV6 } = (await this.network.getIpMac(netInterface)) ?? { mac: "", @@ -100,7 +101,7 @@ export class UdpMulticastServer { ipV6: [], }; const ips = [...ipV4, ...ipV6]; - await Promise.all( + await MatterAggregateError.allSettled( ips.map(async ip => { const iPv4 = ipV4.includes(ip); const broadcastTarget = iPv4 ? this.broadcastAddressIpv4 : this.broadcastAddressIpv6; @@ -116,8 +117,10 @@ export class UdpMulticastServer { logger.info(`${netInterface}: ${(error as Error).message}`); } }), + `Error sending UDP Multicast message on interface ${netInterface}`, ); }), + "Error sending UDP Multicast message", ); } } diff --git a/packages/general/src/storage/StorageContext.ts b/packages/general/src/storage/StorageContext.ts index e6af550748..64f598b7d0 100644 --- a/packages/general/src/storage/StorageContext.ts +++ b/packages/general/src/storage/StorageContext.ts @@ -4,6 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { MatterAggregateError } from "#MatterError.js"; import { MaybePromise } from "../util/Promises.js"; import { Storage, StorageError, StorageOperationResult } from "./Storage.js"; import { SupportedStorageTypes } from "./StringifyTools.js"; @@ -92,7 +93,10 @@ export class StorageContext implements StorageContextFa const keys = this.keys(); if (MaybePromise.is(keys)) { return keys.then(keys => { - return Promise.all(keys.map(key => this.delete(key))).then(() => Promise.resolve()); + return MatterAggregateError.allSettled( + keys.map(key => this.delete(key)), + "Error while clearing storage", + ).then(() => {}); }) as StorageOperationResult; } const promises = new Array>(); @@ -103,7 +107,9 @@ export class StorageContext implements StorageContextFa } }); if (promises.length > 0) { - return Promise.all(promises).then(() => Promise.resolve()) as StorageOperationResult; + return MatterAggregateError.allSettled(promises, "Error while clearing storage").then( + () => {}, + ) as StorageOperationResult; } return undefined as StorageOperationResult; } diff --git a/packages/general/src/util/Promises.ts b/packages/general/src/util/Promises.ts index 632625e646..df6351bb0f 100644 --- a/packages/general/src/util/Promises.ts +++ b/packages/general/src/util/Promises.ts @@ -74,7 +74,7 @@ export class PromiseTimeoutError extends MatterError { /** * Create a promise with a timeout. * - * By default rejects with {@link PromiseTimeoutError} on timeout but you can override by supplying {@link cancel}. + * By default, rejects with {@link PromiseTimeoutError} on timeout but you can override by supplying {@link cancel}. * * @param timeoutMs the timeout in milliseconds * @param promise a promise that resolves or rejects when the timed task completes diff --git a/packages/node/src/behavior/system/controller/ControllerBehavior.ts b/packages/node/src/behavior/system/controller/ControllerBehavior.ts index e6c48678fa..1aba860b06 100644 --- a/packages/node/src/behavior/system/controller/ControllerBehavior.ts +++ b/packages/node/src/behavior/system/controller/ControllerBehavior.ts @@ -6,7 +6,7 @@ import { Behavior } from "#behavior/Behavior.js"; import { BasicInformationBehavior } from "#behaviors/basic-information"; -import { ImplementationError } from "#general"; +import { ImplementationError, Logger, MatterAggregateError } from "#general"; import { Ble, FabricAuthority, @@ -21,6 +21,8 @@ import { NetworkServer } from "../network/NetworkServer.js"; import { ActiveDiscoveries } from "./discovery/ActiveDiscoveries.js"; import type { Discovery } from "./discovery/Discovery.js"; +const logger = Logger.get("ControllerBehavior"); + /** * Node controller functionality. * @@ -91,7 +93,9 @@ export class ControllerBehavior extends Behavior { discovery.cancel(); } - await Promise.allSettled([...discoveries]); + await MatterAggregateError.allSettled([...discoveries], "Error while cancelling discoveries").catch(error => + logger.error(error), + ); } } } diff --git a/packages/node/src/behavior/system/controller/discovery/Discovery.ts b/packages/node/src/behavior/system/controller/discovery/Discovery.ts index 269ffb9a21..2be1b5cc46 100644 --- a/packages/node/src/behavior/system/controller/discovery/Discovery.ts +++ b/packages/node/src/behavior/system/controller/discovery/Discovery.ts @@ -186,22 +186,8 @@ export abstract class Discovery extends CancelablePromise { ); } - Promise.allSettled(promises) - .then(results => { - const errors = Array(); - - for (const result of results) { - if (result.status === "rejected") { - errors.push(result.reason); - } - } - - if (errors.length) { - throw new DiscoveryAggregateError(errors, `${this} failed`); - } - - this.#invokeCompleter(); - }) + DiscoveryAggregateError.allSettled(promises, `${this} failed`) + .then(() => this.#invokeCompleter()) .catch(this.#reject); } diff --git a/packages/node/src/behavior/system/network/NetworkRuntime.ts b/packages/node/src/behavior/system/network/NetworkRuntime.ts index 5dce5a4cd6..4240ee43a6 100644 --- a/packages/node/src/behavior/system/network/NetworkRuntime.ts +++ b/packages/node/src/behavior/system/network/NetworkRuntime.ts @@ -49,7 +49,6 @@ export abstract class NetworkRuntime { } finally { this.#owner.behaviors.internalsOf(NetworkBehavior).runtime = undefined; } - await this.owner.prepareRuntimeShutdown(); await this.#owner.act(agent => this.owner.lifecycle.offline.emit(agent.context)); } diff --git a/packages/node/src/behavior/system/network/ServerNetworkRuntime.ts b/packages/node/src/behavior/system/network/ServerNetworkRuntime.ts index 96bf601d19..8cf7b8c1c3 100644 --- a/packages/node/src/behavior/system/network/ServerNetworkRuntime.ts +++ b/packages/node/src/behavior/system/network/ServerNetworkRuntime.ts @@ -315,17 +315,24 @@ export class ServerNetworkRuntime extends NetworkRuntime { this.#observers.close(); await this.owner.env.close(DeviceCommissioner); - await this.owner.env.close(DeviceAdvertiser); + // Shutdown the Broadcaster if DeviceAdvertiser is not initialized + // We kick-off the Advertiser shutdown to prevent re-announces when removing sessions and wait a bit later + const advertisementShutdown = this.owner.env.has(DeviceAdvertiser) + ? this.owner.env.close(DeviceAdvertiser) + : this.#mdnsBroadcaster?.close(); + this.#mdnsBroadcaster = undefined; + + await this.owner.prepareRuntimeShutdown(); + + // Now all sessions are closed, so we wait for Advertiser to be gone + await advertisementShutdown; + await this.owner.env.close(ExchangeManager); await this.owner.env.close(SecureChannelProtocol); await this.owner.env.close(TransportInterfaceSet); await this.#interactionServer?.[Symbol.asyncDispose](); this.#interactionServer = undefined; - - // DeviceAdvertiser does this but we do so here just in case DeviceAdvertiser did not initialize for some reason - await this.#mdnsBroadcaster?.close(); - this.#mdnsBroadcaster = undefined; } protected override blockNewActivity() { diff --git a/packages/node/src/node/storage/ClientStoreService.ts b/packages/node/src/node/storage/ClientStoreService.ts index 510f2ee54b..dac8b0a8e1 100644 --- a/packages/node/src/node/storage/ClientStoreService.ts +++ b/packages/node/src/node/storage/ClientStoreService.ts @@ -78,12 +78,10 @@ export class ClientStoreFactory extends ClientStoreService { store.construction.start(); } - const results = await Promise.allSettled(Object.values(this.#stores).map(store => store.construction.ready)); - const errors = results.filter(result => result.status === "rejected").map(result => result.reason); - - if (errors.length) { - throw new MatterAggregateError(errors, "Error loading one or more client stores"); - } + await MatterAggregateError.allSettled( + Object.values(this.#stores).map(store => store.construction.ready), + "Error while initializing client stores", + ); } allocateId() { diff --git a/packages/nodejs/src/storage/StorageBackendDiskAsync.ts b/packages/nodejs/src/storage/StorageBackendDiskAsync.ts index 2bf0734882..ed33798e41 100644 --- a/packages/nodejs/src/storage/StorageBackendDiskAsync.ts +++ b/packages/nodejs/src/storage/StorageBackendDiskAsync.ts @@ -5,9 +5,9 @@ */ import { - createPromise, fromJson, Logger, + MatterAggregateError, MaybeAsyncStorage, StorageError, SupportedStorageTypes, @@ -45,8 +45,9 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage { async #finishAllWrites(filename?: string) { // Let's try max up to 10 times to finish all writes out there, otherwise something is strange for (let i = 0; i < 10; i++) { - await Promise.allSettled( + await MatterAggregateError.allSettled( filename !== undefined ? [this.#writeFileBlocker.get(filename)] : this.#writeFileBlocker.values(), + "Error on finishing all file system writes to storage", ); if (!this.#writeFileBlocker.size) { return; @@ -122,7 +123,7 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage { for (const [key, value] of Object.entries(keyOrValues)) { promises.push(this.#writeFile(this.buildStorageKey(contexts, key), toJson(value))); } - await Promise.allSettled(promises); + await MatterAggregateError.allSettled(promises, "Error when writing values into filesystem storage"); } /** According to Node.js documentation, writeFile is not atomic. This method ensures atomicity. */ @@ -133,18 +134,11 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage { return this.#writeFile(fileName, value); } - const { promise, rejecter, resolver } = createPromise(); - + const promise = writeFile(this.filePath(fileName), value, "utf8").finally(() => { + this.#writeFileBlocker.delete(fileName); + }); this.#writeFileBlocker.set(fileName, promise); - writeFile(this.filePath(fileName), value, "utf8") - .then(() => { - this.#writeFileBlocker.delete(fileName); - resolver(); - }) - .catch(() => { - this.#writeFileBlocker.delete(fileName); - rejecter(); - }); + return promise; } @@ -187,7 +181,7 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage { })(), ); } - await Promise.all(promises); + await MatterAggregateError.allSettled(promises, "Error when reading values from filesystem storage"); return values; } @@ -227,6 +221,6 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage { promises.push(rm(this.filePath(key), { force: true })); } } - await Promise.all(promises); + await MatterAggregateError.allSettled(promises, "Error when clearing all values from filesystem storage"); } } diff --git a/packages/protocol/src/cluster/server/EventServer.ts b/packages/protocol/src/cluster/server/EventServer.ts index ddd4efd863..9dd668846d 100644 --- a/packages/protocol/src/cluster/server/EventServer.ts +++ b/packages/protocol/src/cluster/server/EventServer.ts @@ -10,6 +10,7 @@ import { ImplementationError, InternalError, isObject, + MatterAggregateError, MaybePromise, Storage, StorageOperationResult, @@ -112,7 +113,9 @@ export class EventServer { } this.eventList = []; if (promises.length > 0) { - return Promise.all(promises).then(() => Promise.resolve()) as StorageOperationResult; + return MatterAggregateError.allSettled(promises, "Error binding events to the event handlers").then( + () => {}, + ) as StorageOperationResult; } return undefined as StorageOperationResult; } diff --git a/packages/protocol/src/events/NonvolatileEventStore.ts b/packages/protocol/src/events/NonvolatileEventStore.ts index 100c6523cf..8b0ea8435b 100644 --- a/packages/protocol/src/events/NonvolatileEventStore.ts +++ b/packages/protocol/src/events/NonvolatileEventStore.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { Logger, MaybePromise, StorageContext } from "#general"; +import { Logger, MatterAggregateError, MaybePromise, StorageContext } from "#general"; import { EventNumber } from "#types"; import { BaseEventStore } from "./BaseEventStore.js"; import { OccurrenceSummary } from "./EventStore.js"; @@ -100,10 +100,9 @@ export class NonvolatileEventStore extends BaseEventStore { override close() { if (this.#iops.size) { - return Promise.allSettled(this.#iops).then( - () => {}, - () => {}, - ); + return MatterAggregateError.allSettled(this.#iops, "Error closing event store") + .then(() => {}) + .catch(error => logger.error(error)); } } diff --git a/packages/protocol/src/events/OccurrenceManager.ts b/packages/protocol/src/events/OccurrenceManager.ts index c7f86a3c62..fad7c75bc4 100644 --- a/packages/protocol/src/events/OccurrenceManager.ts +++ b/packages/protocol/src/events/OccurrenceManager.ts @@ -4,7 +4,15 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { asyncNew, Construction, ImplementationError, isObject, Logger, MaybePromise } from "#general"; +import { + asyncNew, + Construction, + ImplementationError, + isObject, + Logger, + MatterAggregateError, + MaybePromise, +} from "#general"; import { EventNumber, EventPriority, @@ -262,7 +270,9 @@ export class OccurrenceManager { this.#storedEventCount -= totalCulled; if (asyncDrops.length) { - return Promise.allSettled(asyncDrops).then(() => {}); + return MatterAggregateError.allSettled(asyncDrops, "Error dropping occurrences") + .then(() => {}) + .catch(error => logger.error(error)); } } } diff --git a/packages/protocol/src/interaction/InteractionServer.ts b/packages/protocol/src/interaction/InteractionServer.ts index 8355b7d3e3..30ab0c6d73 100644 --- a/packages/protocol/src/interaction/InteractionServer.ts +++ b/packages/protocol/src/interaction/InteractionServer.ts @@ -955,10 +955,16 @@ export class InteractionServer implements ProtocolHandler, InteractionRecipient ); if (!keepSubscriptions) { - logger.debug( - `Clear subscriptions for Subscriber node ${session.peerNodeId} because keepSubscriptions=false`, + const clearedCount = await this.#context.sessions.clearSubscriptionsForNode( + fabric.fabricIndex, + session.peerNodeId, + true, ); - await this.#context.sessions.clearSubscriptionsForNode(fabric.fabricIndex, session.peerNodeId, true); + if (clearedCount > 0) { + logger.debug( + `Cleared ${clearedCount} subscriptions for Subscriber node ${session.peerNodeId} because keepSubscriptions=false`, + ); + } } if ( diff --git a/packages/protocol/src/interaction/ServerSubscription.ts b/packages/protocol/src/interaction/ServerSubscription.ts index ee2632922d..327b71b497 100644 --- a/packages/protocol/src/interaction/ServerSubscription.ts +++ b/packages/protocol/src/interaction/ServerSubscription.ts @@ -8,13 +8,13 @@ import { NumberedOccurrence } from "#events/Occurrence.js"; import { InternalError, Logger, + MatterAggregateError, MatterError, MaybePromise, NetworkError, NoResponseTimeoutError, Time, Timer, - createPromise, isObject, } from "#general"; import { Specification } from "#model"; @@ -151,7 +151,9 @@ export class ServerSubscription extends Subscription { #lastUpdateTimeMs = 0; #updateTimer: Timer; - readonly #sendDelayTimer = Time.getTimer("Subscription delay", 50, () => this.#triggerSendUpdate()); + readonly #sendDelayTimer: Timer = Time.getTimer(`Subscription ${this.id} delay`, 50, () => + this.#triggerSendUpdate(), + ); readonly #outstandingAttributeUpdates = new Map>(); readonly #outstandingEventUpdates = new Set>(); readonly #attributeListeners = new Map< @@ -206,7 +208,9 @@ export class ServerSubscription extends Subscription { this.#maxIntervalMs = maxInterval; this.#sendIntervalMs = sendInterval; - this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => this.#prepareDataUpdate()); // will be started later + this.#updateTimer = Time.getTimer(`Subscription ${this.id} update`, this.#sendIntervalMs, () => + this.#prepareDataUpdate(), + ); // will be started later } private determineSendingIntervals( @@ -536,7 +540,7 @@ export class ServerSubscription extends Subscription { } this.#sendDelayTimer.start(); - this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => + this.#updateTimer = Time.getTimer(`Subscription update ${this.id}`, this.#sendIntervalMs, () => this.#prepareDataUpdate(), ).start(); } @@ -547,19 +551,16 @@ export class ServerSubscription extends Subscription { this.sendNextUpdateImmediately = true; return; } - const { promise, resolver } = createPromise(); - this.#sendUpdate() - .then(resolver) + this.currentUpdatePromise = this.#sendUpdate() .catch(error => logger.warn("Sending subscription update failed:", error)) .finally(() => (this.currentUpdatePromise = undefined)); - this.currentUpdatePromise = promise; } /** * Determine all attributes that have changed since the last update and send them tout to the subscriber. * Important: This method MUST NOT be called directly. Use triggerSendUpdate() instead! */ - async #sendUpdate() { + async #sendUpdate(onlyWithData = false) { // Get all outstanding updates, make sure the order is correct per endpoint and cluster const attributeUpdatesToSend = new Array>(); const attributeUpdates: Record[]> = {}; @@ -580,6 +581,11 @@ export class ServerSubscription extends Subscription { const eventUpdatesToSend = Array.from(this.#outstandingEventUpdates.values()); this.#outstandingEventUpdates.clear(); + + if (onlyWithData && attributeUpdatesToSend.length === 0 && eventUpdatesToSend.length === 0) { + return; + } + this.#lastUpdateTimeMs = Time.nowMs(); try { @@ -629,7 +635,7 @@ export class ServerSubscription extends Subscription { if (this.sendNextUpdateImmediately) { logger.debug("Sending delayed update immediately after last one was sent."); this.sendNextUpdateImmediately = false; - await this.#sendUpdate(); + await this.#sendUpdate(true); // Send but only if non-empty } } @@ -836,7 +842,7 @@ export class ServerSubscription extends Subscription { this.#sendDelayTimer.stop(); if (this.#outstandingAttributeUpdates.size > 0 || this.#outstandingEventUpdates.size > 0) { logger.debug( - `Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events`, + `Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events${this.isClosed ? " (for closing)" : ""}`, ); this.#triggerSendUpdate(); if (this.currentUpdatePromise) { @@ -853,7 +859,9 @@ export class ServerSubscription extends Subscription { if (this.attributeUpdatePromises.size) { const resolvers = [...this.attributeUpdatePromises.values()]; this.attributeUpdatePromises.clear(); - await Promise.all(resolvers); + await MatterAggregateError.allSettled(resolvers, "Error receiving all outstanding attribute values").catch( + error => logger.error(error), + ); } this.#updateTimer.stop(); this.#sendDelayTimer.stop(); @@ -870,9 +878,6 @@ export class ServerSubscription extends Subscription { attributes: AttributePathWithValueVersion[], events: EventPathWithEventData[], ) { - logger.debug( - `Sending subscription update message for ID ${this.id} with ${attributes.length} attributes and ${events.length} events`, - ); const exchange = this.#context.initiateExchange(this.peerAddress, INTERACTION_PROTOCOL_ID); if (exchange === undefined) return; if (attributes.length) { @@ -901,7 +906,7 @@ export class ServerSubscription extends Subscription { } else { await messenger.sendDataReport( { - suppressResponse: false, // Non empty data reports always need to send response + suppressResponse: false, // Non-empty data reports always need to send response subscriptionId: this.id, interactionModelRevision: Specification.INTERACTION_MODEL_REVISION, attributeReportsPayload: attributes.map(({ path, schema, value, version, attribute }) => ({ diff --git a/packages/protocol/src/mdns/MdnsServer.ts b/packages/protocol/src/mdns/MdnsServer.ts index 0db0d64fb6..8ad2fa1908 100644 --- a/packages/protocol/src/mdns/MdnsServer.ts +++ b/packages/protocol/src/mdns/MdnsServer.ts @@ -15,6 +15,7 @@ import { DnsRecordType, isDeepEqual, Logger, + MatterAggregateError, MAX_MDNS_MESSAGE_SIZE, Network, Time, @@ -252,7 +253,7 @@ export class MdnsServer { } async announce(announcedNetPort?: number) { - await Promise.all( + await MatterAggregateError.allSettled( (await this.#getMulticastInterfacesForAnnounce()).map(async ({ name: netInterface }) => { const records = await this.#records.get(netInterface); for (const [portType, portTypeRecords] of records) { @@ -263,11 +264,12 @@ export class MdnsServer { await Time.sleep("MDNS delay", 20 + Math.floor(Math.random() * 100)); // as per DNS-SD spec wait 20-120ms before sending more packets } }), - ); + "Error happened when announcing MDNS messages", + ).catch(error => logger.error(error)); } async expireAnnouncements(announcedNetPort?: number, type?: AnnouncementType) { - await Promise.all( + await MatterAggregateError.allSettled( this.#records.keys().map(async netInterface => { const records = await this.#records.get(netInterface); for (const [portType, portTypeRecords] of records) { @@ -300,7 +302,8 @@ export class MdnsServer { await Time.sleep("MDNS delay", 20 + Math.floor(Math.random() * 100)); // as per DNS-SD spec wait 20-120ms before sending more packets } }), - ); + "Error happened when expiring MDNS announcements", + ).catch(error => logger.error(error)); await this.#records.clear(); this.#recordLastSentAsMulticastAnswer.clear(); this.#recordLastSentAsUnicastAnswer.clear(); diff --git a/packages/protocol/src/mdns/MdnsService.ts b/packages/protocol/src/mdns/MdnsService.ts index 1c5c2fbfcb..b2a1c633eb 100644 --- a/packages/protocol/src/mdns/MdnsService.ts +++ b/packages/protocol/src/mdns/MdnsService.ts @@ -10,6 +10,7 @@ import { Environment, Environmental, Logger, + MatterAggregateError, MaybePromise, Network, VariableService, @@ -92,7 +93,10 @@ export class MdnsService { logger.error("Error disposing of MDNS scanner", e), ); - await Promise.all([broadcasterDisposal, scannerDisposal]); + await MatterAggregateError.allSettled( + [broadcasterDisposal, scannerDisposal], + "Error disposing MDNS services", + ).catch(error => logger.error(error)); this.#broadcaster = this.#scanner = undefined; }); diff --git a/packages/protocol/src/protocol/DeviceAdvertiser.ts b/packages/protocol/src/protocol/DeviceAdvertiser.ts index cb337aa06a..5508cc7e14 100644 --- a/packages/protocol/src/protocol/DeviceAdvertiser.ts +++ b/packages/protocol/src/protocol/DeviceAdvertiser.ts @@ -208,13 +208,11 @@ export class DeviceAdvertiser { async clearBroadcasters() { const broadcasters = [...this.#broadcasters]; - const closed = Promise.allSettled(broadcasters.map(b => b.close())); + const closed = MatterAggregateError.allSettled( + broadcasters.map(b => b.close()), + "Error closing broadcasters", + ).catch(error => logger.error(error)); this.#broadcasters.clear(); - const errors = (await closed) - .map(status => (status.status === "rejected" ? status.reason : undefined)) - .filter(reason => reason !== undefined); - if (errors.length) { - throw new MatterAggregateError(errors, "Error closing broadcasters"); - } + await closed; } } diff --git a/packages/protocol/src/protocol/ExchangeManager.ts b/packages/protocol/src/protocol/ExchangeManager.ts index 60be4b21ad..099b17ec46 100644 --- a/packages/protocol/src/protocol/ExchangeManager.ts +++ b/packages/protocol/src/protocol/ExchangeManager.ts @@ -11,6 +11,7 @@ import { Environmental, ImplementationError, Logger, + MatterAggregateError, MatterError, MatterFlowError, NotImplementedError, @@ -181,10 +182,12 @@ export class ExchangeManager { for (const listeners of this.#listeners.keys()) { this.#deleteListener(listeners); } - await Promise.allSettled(this.#closers); - for (const exchange of this.#exchanges.values()) { - await exchange.destroy(); - } + await MatterAggregateError.allSettled(this.#closers, "Error closing exchanges").catch(error => + logger.error(error), + ); + await MatterAggregateError.allSettled(this.#exchanges.values(), "Error closing exchanges").catch(error => + logger.error(error), + ); this.#exchanges.clear(); } diff --git a/packages/protocol/src/session/SessionManager.ts b/packages/protocol/src/session/SessionManager.ts index 9cb4aa8637..75a39353f1 100644 --- a/packages/protocol/src/session/SessionManager.ts +++ b/packages/protocol/src/session/SessionManager.ts @@ -14,6 +14,7 @@ import { Environmental, Lifecycle, Logger, + MatterAggregateError, MatterFlowError, Mutex, Observable, @@ -533,13 +534,16 @@ export class SessionManager { this.#observers.close(); await this.#storeResumptionRecords(); - for (const session of this.#sessions) { + const closePromises = this.#sessions.map(async session => { await session?.end(false); this.#sessions.delete(session); - } + }); for (const session of this.#insecureSessions.values()) { - await session?.end(); + closePromises.push(session?.end()); } + await MatterAggregateError.allSettled(closePromises, "Error closing sessions").catch(error => + logger.error(error), + ); } async clear() { @@ -558,7 +562,9 @@ export class SessionManager { }); } + /** Clears all subscriptions for a given node and returns how many were cleared. */ async clearSubscriptionsForNode(fabricIndex: FabricIndex, nodeId: NodeId, flushSubscriptions?: boolean) { + let clearedCount = 0; for (const session of this.#sessions) { if (session.fabric?.fabricIndex !== fabricIndex) { continue; @@ -567,7 +573,9 @@ export class SessionManager { continue; } await session.clearSubscriptions(flushSubscriptions); + clearedCount++; } + return clearedCount; } } diff --git a/packages/protocol/test/mdns/MdnsTest.ts b/packages/protocol/test/mdns/MdnsTest.ts index db6de9dab5..b78e3f5124 100644 --- a/packages/protocol/test/mdns/MdnsTest.ts +++ b/packages/protocol/test/mdns/MdnsTest.ts @@ -126,6 +126,7 @@ const NODE_ID = NodeId(BigInt(1)); await MockTime.advance(150); await MockTime.yield3(); await MockTime.yield3(); + await MockTime.advance(150); await promise; };