Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown and promise optimizations #1586

Merged
merged 13 commits into from
Dec 31, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The main work (all changes without a GitHub username in brackets in the below li

- @matter/node
- Enhancement: Each new PASE session now automatically arms the failsafe timer for 60s as required by specs
- Enhancement: Optimizes Node shutdown logic to close sessions and subscriptions before shutting down the network
- Fix: Fixes withBehaviors() method on endpoints

- @matter/nodejs-ble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ export abstract class NetworkRuntime {
} finally {
this.#owner.behaviors.internalsOf(NetworkBehavior).runtime = undefined;
}
await this.owner.prepareRuntimeShutdown();
await this.#owner.act(agent => this.owner.lifecycle.offline.emit(agent.context));
}

Expand Down
17 changes: 12 additions & 5 deletions packages/node/src/behavior/system/network/ServerNetworkRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,17 +315,24 @@ export class ServerNetworkRuntime extends NetworkRuntime {
this.#observers.close();

await this.owner.env.close(DeviceCommissioner);
await this.owner.env.close(DeviceAdvertiser);
// Shutdown the Broadcaster if DeviceAdvertiser is not initialized
// We kick-off the Advertiser shutdown to prevent re-announces when removing sessions and wait a bit later
const advertisementShutdown = this.owner.env.has(DeviceAdvertiser)
? this.owner.env.close(DeviceAdvertiser)
: this.#mdnsBroadcaster?.close();
this.#mdnsBroadcaster = undefined;

await this.owner.prepareRuntimeShutdown();

// Now all sessions are closed, so we wait for Advertiser to be gone
await advertisementShutdown;

await this.owner.env.close(ExchangeManager);
await this.owner.env.close(SecureChannelProtocol);
await this.owner.env.close(TransportInterfaceSet);

await this.#interactionServer?.[Symbol.asyncDispose]();
this.#interactionServer = undefined;

// DeviceAdvertiser does this but we do so here just in case DeviceAdvertiser did not initialize for some reason
await this.#mdnsBroadcaster?.close();
this.#mdnsBroadcaster = undefined;
}

protected override blockNewActivity() {
Expand Down
25 changes: 5 additions & 20 deletions packages/nodejs/src/storage/StorageBackendDiskAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import {
createPromise,
fromJson,
Logger,
MaybeAsyncStorage,
StorageError,
SupportedStorageTypes,
toJson,
} from "#general";
import { fromJson, Logger, MaybeAsyncStorage, StorageError, SupportedStorageTypes, toJson } from "#general";
import { mkdir, readdir, readFile, rm, writeFile } from "fs/promises";
import { join } from "path";

Expand Down Expand Up @@ -133,18 +125,11 @@ export class StorageBackendDiskAsync extends MaybeAsyncStorage {
return this.#writeFile(fileName, value);
}

const { promise, rejecter, resolver } = createPromise<void>();

const promise = writeFile(this.filePath(fileName), value, "utf8").finally(() => {
this.#writeFileBlocker.delete(fileName);
});
this.#writeFileBlocker.set(fileName, promise);
writeFile(this.filePath(fileName), value, "utf8")
.then(() => {
this.#writeFileBlocker.delete(fileName);
resolver();
})
.catch(() => {
this.#writeFileBlocker.delete(fileName);
rejecter();
});

return promise;
}

Expand Down
12 changes: 9 additions & 3 deletions packages/protocol/src/interaction/InteractionServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -955,10 +955,16 @@ export class InteractionServer implements ProtocolHandler, InteractionRecipient
);

if (!keepSubscriptions) {
logger.debug(
`Clear subscriptions for Subscriber node ${session.peerNodeId} because keepSubscriptions=false`,
const clearedCount = await this.#context.sessions.clearSubscriptionsForNode(
fabric.fabricIndex,
session.peerNodeId,
true,
);
await this.#context.sessions.clearSubscriptionsForNode(fabric.fabricIndex, session.peerNodeId, true);
if (clearedCount > 0) {
logger.debug(
`Cleared ${clearedCount} subscriptions for Subscriber node ${session.peerNodeId} because keepSubscriptions=false`,
);
}
}

if (
Expand Down
32 changes: 17 additions & 15 deletions packages/protocol/src/interaction/ServerSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
NoResponseTimeoutError,
Time,
Timer,
createPromise,
isObject,
} from "#general";
import { Specification } from "#model";
Expand Down Expand Up @@ -151,7 +150,9 @@ export class ServerSubscription extends Subscription {

#lastUpdateTimeMs = 0;
#updateTimer: Timer;
readonly #sendDelayTimer = Time.getTimer("Subscription delay", 50, () => this.#triggerSendUpdate());
readonly #sendDelayTimer: Timer = Time.getTimer(`Subscription ${this.id} delay`, 50, () =>
this.#triggerSendUpdate(),
);
readonly #outstandingAttributeUpdates = new Map<string, AttributePathWithValueVersion<any>>();
readonly #outstandingEventUpdates = new Set<EventPathWithEventData<any>>();
readonly #attributeListeners = new Map<
Expand Down Expand Up @@ -206,7 +207,9 @@ export class ServerSubscription extends Subscription {
this.#maxIntervalMs = maxInterval;
this.#sendIntervalMs = sendInterval;

this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => this.#prepareDataUpdate()); // will be started later
this.#updateTimer = Time.getTimer(`Subscription ${this.id} update`, this.#sendIntervalMs, () =>
this.#prepareDataUpdate(),
); // will be started later
}

private determineSendingIntervals(
Expand Down Expand Up @@ -536,7 +539,7 @@ export class ServerSubscription extends Subscription {
}

this.#sendDelayTimer.start();
this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () =>
this.#updateTimer = Time.getTimer(`Subscription update ${this.id}`, this.#sendIntervalMs, () =>
this.#prepareDataUpdate(),
).start();
}
Expand All @@ -547,19 +550,16 @@ export class ServerSubscription extends Subscription {
this.sendNextUpdateImmediately = true;
return;
}
const { promise, resolver } = createPromise<void>();
this.#sendUpdate()
.then(resolver)
this.currentUpdatePromise = this.#sendUpdate()
.catch(error => logger.warn("Sending subscription update failed:", error))
.finally(() => (this.currentUpdatePromise = undefined));
this.currentUpdatePromise = promise;
}

/**
* Determine all attributes that have changed since the last update and send them tout to the subscriber.
* Important: This method MUST NOT be called directly. Use triggerSendUpdate() instead!
*/
async #sendUpdate() {
async #sendUpdate(onlyWithData = false) {
// Get all outstanding updates, make sure the order is correct per endpoint and cluster
const attributeUpdatesToSend = new Array<AttributePathWithValueVersion<any>>();
const attributeUpdates: Record<string, AttributePathWithValueVersion<any>[]> = {};
Expand All @@ -580,6 +580,11 @@ export class ServerSubscription extends Subscription {

const eventUpdatesToSend = Array.from(this.#outstandingEventUpdates.values());
this.#outstandingEventUpdates.clear();

if (onlyWithData && attributeUpdatesToSend.length === 0 && eventUpdatesToSend.length === 0) {
return;
}

this.#lastUpdateTimeMs = Time.nowMs();

try {
Expand Down Expand Up @@ -629,7 +634,7 @@ export class ServerSubscription extends Subscription {
if (this.sendNextUpdateImmediately) {
logger.debug("Sending delayed update immediately after last one was sent.");
this.sendNextUpdateImmediately = false;
await this.#sendUpdate();
await this.#sendUpdate(true); // Send but only if non-empty
}
}

Expand Down Expand Up @@ -836,7 +841,7 @@ export class ServerSubscription extends Subscription {
this.#sendDelayTimer.stop();
if (this.#outstandingAttributeUpdates.size > 0 || this.#outstandingEventUpdates.size > 0) {
logger.debug(
`Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events`,
`Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events${this.isClosed ? " (for closing)" : ""}`,
);
this.#triggerSendUpdate();
if (this.currentUpdatePromise) {
Expand Down Expand Up @@ -870,9 +875,6 @@ export class ServerSubscription extends Subscription {
attributes: AttributePathWithValueVersion<any>[],
events: EventPathWithEventData<any>[],
) {
logger.debug(
`Sending subscription update message for ID ${this.id} with ${attributes.length} attributes and ${events.length} events`,
);
const exchange = this.#context.initiateExchange(this.peerAddress, INTERACTION_PROTOCOL_ID);
if (exchange === undefined) return;
if (attributes.length) {
Expand Down Expand Up @@ -901,7 +903,7 @@ export class ServerSubscription extends Subscription {
} else {
await messenger.sendDataReport(
{
suppressResponse: false, // Non empty data reports always need to send response
suppressResponse: false, // Non-empty data reports always need to send response
subscriptionId: this.id,
interactionModelRevision: Specification.INTERACTION_MODEL_REVISION,
attributeReportsPayload: attributes.map(({ path, schema, value, version, attribute }) => ({
Expand Down
4 changes: 2 additions & 2 deletions packages/protocol/src/mdns/MdnsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ export class MdnsServer {
}

async announce(announcedNetPort?: number) {
await Promise.all(
await Promise.allSettled(
Apollon77 marked this conversation as resolved.
Show resolved Hide resolved
(await this.#getMulticastInterfacesForAnnounce()).map(async ({ name: netInterface }) => {
const records = await this.#records.get(netInterface);
for (const [portType, portTypeRecords] of records) {
Expand All @@ -267,7 +267,7 @@ export class MdnsServer {
}

async expireAnnouncements(announcedNetPort?: number, type?: AnnouncementType) {
await Promise.all(
await Promise.allSettled(
Apollon77 marked this conversation as resolved.
Show resolved Hide resolved
this.#records.keys().map(async netInterface => {
const records = await this.#records.get(netInterface);
for (const [portType, portTypeRecords] of records) {
Expand Down
11 changes: 8 additions & 3 deletions packages/protocol/src/session/SessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,14 @@ export class SessionManager {

this.#observers.close();
await this.#storeResumptionRecords();
for (const session of this.#sessions) {
const closePromises = this.#sessions.map(async session => {
await session?.end(false);
this.#sessions.delete(session);
}
});
for (const session of this.#insecureSessions.values()) {
await session?.end();
closePromises.push(session?.end());
}
await Promise.allSettled(closePromises);
Apollon77 marked this conversation as resolved.
Show resolved Hide resolved
}

async clear() {
Expand All @@ -558,7 +559,9 @@ export class SessionManager {
});
}

/** Clears all subscriptions for a given node and returns how many were cleared. */
async clearSubscriptionsForNode(fabricIndex: FabricIndex, nodeId: NodeId, flushSubscriptions?: boolean) {
let clearedCount = 0;
for (const session of this.#sessions) {
if (session.fabric?.fabricIndex !== fabricIndex) {
continue;
Expand All @@ -567,7 +570,9 @@ export class SessionManager {
continue;
}
await session.clearSubscriptions(flushSubscriptions);
clearedCount++;
}
return clearedCount;
}
}

Expand Down
Loading