From bb7e829ad2ab679f0147505228f5d1e067127b43 Mon Sep 17 00:00:00 2001 From: Ingo Fischer Date: Sat, 28 Dec 2024 10:33:28 +0100 Subject: [PATCH 1/2] Correctly check object for events (#1580) stumbled ober an destruction error for "undefined" payload --- packages/protocol/src/events/OccurrenceManager.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/protocol/src/events/OccurrenceManager.ts b/packages/protocol/src/events/OccurrenceManager.ts index 7f765cdbc..c7f86a3c6 100644 --- a/packages/protocol/src/events/OccurrenceManager.ts +++ b/packages/protocol/src/events/OccurrenceManager.ts @@ -4,7 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { asyncNew, Construction, ImplementationError, Logger, MaybePromise } from "#general"; +import { asyncNew, Construction, ImplementationError, isObject, Logger, MaybePromise } from "#general"; import { EventNumber, EventPriority, @@ -158,7 +158,10 @@ export class OccurrenceManager { if (filterForFabricIndex !== undefined) { result = MaybePromise.then(result, (occurrences: NumberedOccurrence[]) => occurrences.filter(({ payload }) => { - const { fabricIndex } = payload as any; + if (!isObject(payload)) { + return true; + } + const { fabricIndex } = payload; return fabricIndex === undefined || fabricIndex === filterForFabricIndex; }), ); From 16c32bab1d6256b44c683b6a21da3e494938d438 Mon Sep 17 00:00:00 2001 From: Ingo Fischer Date: Sat, 28 Dec 2024 13:25:33 +0100 Subject: [PATCH 2/2] Optimize Subscription handling and keep promise whe updating to wait on close --- .../src/interaction/ServerSubscription.ts | 76 +++++++++++-------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/packages/protocol/src/interaction/ServerSubscription.ts b/packages/protocol/src/interaction/ServerSubscription.ts index 05e6dca8f..6314a5d32 100644 --- a/packages/protocol/src/interaction/ServerSubscription.ts +++ b/packages/protocol/src/interaction/ServerSubscription.ts @@ -14,6 +14,7 @@ import { NoResponseTimeoutError, Time, Timer, + createPromise, isObject, } from "#general"; import { Specification } from "#model"; @@ -150,7 +151,7 @@ export class ServerSubscription extends Subscription { #lastUpdateTimeMs = 0; #updateTimer: Timer; - readonly #sendDelayTimer: Timer; + readonly #sendDelayTimer = Time.getTimer("Subscription delay", 50, () => this.#triggerSendUpdate()); readonly #outstandingAttributeUpdates = new Map>(); readonly #outstandingEventUpdates = new Set>(); readonly #attributeListeners = new Map< @@ -174,10 +175,10 @@ export class ServerSubscription extends Subscription { private readonly maxIntervalCeilingMs: number; private readonly peerAddress: PeerAddress; - private sendingUpdateInProgress = false; private sendNextUpdateImmediately = false; private sendUpdateErrorCounter = 0; private attributeUpdatePromises = new Set>(); + private currentUpdatePromise?: Promise; constructor(options: { id: number; @@ -205,10 +206,7 @@ export class ServerSubscription extends Subscription { this.#maxIntervalMs = maxInterval; this.#sendIntervalMs = sendInterval; - this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => this.prepareDataUpdate()); // will be started later - this.#sendDelayTimer = Time.getTimer("Subscription delay", 50, () => - this.sendUpdate().catch(error => logger.warn("Sending subscription update failed:", error)), - ); // will be started later + this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => this.#prepareDataUpdate()); // will be started later } private determineSendingIntervals( @@ -476,7 +474,7 @@ export class ServerSubscription extends Subscription { this.#outstandingEventUpdates.add(occurrence); } - this.prepareDataUpdate(); + this.#prepareDataUpdate(); } get maxInterval(): number { @@ -494,12 +492,15 @@ export class ServerSubscription extends Subscription { this.#sendUpdatesActivated = true; if (this.#outstandingAttributeUpdates.size > 0 || this.#outstandingEventUpdates.size > 0) { - void this.sendUpdate(); + this.#triggerSendUpdate(); } this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => - this.prepareDataUpdate(), + this.#prepareDataUpdate(), ).start(); this.#structure.change.on(() => { + if (this.isClosed) { + return; + } // TODO When change is AsyncObservable can be simplified this.updateSubscription().catch(error => logger.error("Error updating subscription after structure change:", error), @@ -511,9 +512,9 @@ export class ServerSubscription extends Subscription { * Check if data should be sent straight away or delayed because the minimum interval is not reached. Delay real * sending by 50ms in any case to mke sure to catch all updates. */ - prepareDataUpdate() { - if (this.#sendDelayTimer.isRunning) { - // sending data is already scheduled, data updates go in there + #prepareDataUpdate() { + if (this.#sendDelayTimer.isRunning || this.isClosed) { + // sending data is already scheduled, data updates go in there ... or we close down already return; } @@ -529,27 +530,36 @@ export class ServerSubscription extends Subscription { this.#updateTimer = Time.getTimer( "Subscription update", this.minIntervalFloorMs - timeSinceLastUpdateMs, - () => this.prepareDataUpdate(), + () => this.#prepareDataUpdate(), ).start(); return; } this.#sendDelayTimer.start(); this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => - this.prepareDataUpdate(), + this.#prepareDataUpdate(), ).start(); } - /** - * Determine all attributes that have changed since the last update and send them tout to the subscriber. - */ - async sendUpdate() { - if (this.sendingUpdateInProgress) { + #triggerSendUpdate() { + if (this.currentUpdatePromise !== undefined) { logger.debug("Sending update already in progress, delaying update ..."); this.sendNextUpdateImmediately = true; return; } + const { promise, resolver } = createPromise(); + this.#sendUpdate() + .then(resolver) + .catch(error => logger.warn("Sending subscription update failed:", error)) + .finally(() => (this.currentUpdatePromise = undefined)); + this.currentUpdatePromise = promise; + } + /** + * Determine all attributes that have changed since the last update and send them tout to the subscriber. + * Important: This method MUST NOT be called directly. Use triggerSendUpdate() instead! + */ + async #sendUpdate() { // Get all outstanding updates, make sure the order is correct per endpoint and cluster const attributeUpdatesToSend = new Array>(); const attributeUpdates: Record[]> = {}; @@ -572,7 +582,6 @@ export class ServerSubscription extends Subscription { this.#outstandingEventUpdates.clear(); this.#lastUpdateTimeMs = Time.nowMs(); - this.sendingUpdateInProgress = true; try { await this.sendUpdateMessage(attributeUpdatesToSend, eventUpdatesToSend); this.sendUpdateErrorCounter = 0; @@ -616,12 +625,11 @@ export class ServerSubscription extends Subscription { } } } - this.sendingUpdateInProgress = false; if (this.sendNextUpdateImmediately) { logger.debug("Sending delayed update immediately after last one was sent."); this.sendNextUpdateImmediately = false; - await this.sendUpdate(); + await this.#sendUpdate(); } } @@ -795,11 +803,11 @@ export class ServerSubscription extends Subscription { version, value, }); - this.prepareDataUpdate(); + this.#prepareDataUpdate(); }); } this.#outstandingAttributeUpdates.set(attributePathToId(path), { attribute, path, schema, version, value }); - this.prepareDataUpdate(); + this.#prepareDataUpdate(); } eventChangeListener(path: EventPath, schema: TlvSchema, newEvent: NumberedOccurrence) { @@ -820,23 +828,28 @@ export class ServerSubscription extends Subscription { } this.#outstandingEventUpdates.add({ event, path, schema, data: newEvent }); if (path.isUrgent) { - this.prepareDataUpdate(); + this.#prepareDataUpdate(); } } async flush() { this.#sendDelayTimer.stop(); - logger.debug( - `Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events`, - ); if (this.#outstandingAttributeUpdates.size > 0 || this.#outstandingEventUpdates.size > 0) { - void this.sendUpdate(); + logger.debug( + `Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events`, + ); + this.#triggerSendUpdate(); + if (this.currentUpdatePromise) { + await this.currentUpdatePromise; + } } } override async close(graceful = false) { this.isClosed = true; this.#sendUpdatesActivated = false; + this.unregisterAttributeListeners(Array.from(this.#attributeListeners.keys())); + this.unregisterEventListeners(Array.from(this.#eventListeners.keys())); if (this.attributeUpdatePromises.size) { const resolvers = [...this.attributeUpdatePromises.values()]; this.attributeUpdatePromises.clear(); @@ -844,11 +857,12 @@ export class ServerSubscription extends Subscription { } this.#updateTimer.stop(); this.#sendDelayTimer.stop(); - this.unregisterAttributeListeners(Array.from(this.#attributeListeners.keys())); - this.unregisterEventListeners(Array.from(this.#eventListeners.keys())); if (graceful) { await this.flush(); } + if (this.currentUpdatePromise) { + await this.currentUpdatePromise; + } await super.close(); }