Skip to content

Commit

Permalink
Merge branch 'main' into test-web-at-root
Browse files Browse the repository at this point in the history
  • Loading branch information
Apollon77 authored Dec 28, 2024
2 parents f506ac0 + 16c32ba commit 16b3962
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 33 deletions.
7 changes: 5 additions & 2 deletions packages/protocol/src/events/OccurrenceManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { asyncNew, Construction, ImplementationError, Logger, MaybePromise } from "#general";
import { asyncNew, Construction, ImplementationError, isObject, Logger, MaybePromise } from "#general";
import {
EventNumber,
EventPriority,
Expand Down Expand Up @@ -158,7 +158,10 @@ export class OccurrenceManager {
if (filterForFabricIndex !== undefined) {
result = MaybePromise.then(result, (occurrences: NumberedOccurrence[]) =>
occurrences.filter(({ payload }) => {
const { fabricIndex } = payload as any;
if (!isObject(payload)) {
return true;
}
const { fabricIndex } = payload;
return fabricIndex === undefined || fabricIndex === filterForFabricIndex;
}),
);
Expand Down
76 changes: 45 additions & 31 deletions packages/protocol/src/interaction/ServerSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
NoResponseTimeoutError,
Time,
Timer,
createPromise,
isObject,
} from "#general";
import { Specification } from "#model";
Expand Down Expand Up @@ -150,7 +151,7 @@ export class ServerSubscription extends Subscription {

#lastUpdateTimeMs = 0;
#updateTimer: Timer;
readonly #sendDelayTimer: Timer;
readonly #sendDelayTimer = Time.getTimer("Subscription delay", 50, () => this.#triggerSendUpdate());
readonly #outstandingAttributeUpdates = new Map<string, AttributePathWithValueVersion<any>>();
readonly #outstandingEventUpdates = new Set<EventPathWithEventData<any>>();
readonly #attributeListeners = new Map<
Expand All @@ -174,10 +175,10 @@ export class ServerSubscription extends Subscription {
private readonly maxIntervalCeilingMs: number;
private readonly peerAddress: PeerAddress;

private sendingUpdateInProgress = false;
private sendNextUpdateImmediately = false;
private sendUpdateErrorCounter = 0;
private attributeUpdatePromises = new Set<PromiseLike<void>>();
private currentUpdatePromise?: Promise<void>;

constructor(options: {
id: number;
Expand Down Expand Up @@ -205,10 +206,7 @@ export class ServerSubscription extends Subscription {
this.#maxIntervalMs = maxInterval;
this.#sendIntervalMs = sendInterval;

this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => this.prepareDataUpdate()); // will be started later
this.#sendDelayTimer = Time.getTimer("Subscription delay", 50, () =>
this.sendUpdate().catch(error => logger.warn("Sending subscription update failed:", error)),
); // will be started later
this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () => this.#prepareDataUpdate()); // will be started later
}

private determineSendingIntervals(
Expand Down Expand Up @@ -476,7 +474,7 @@ export class ServerSubscription extends Subscription {
this.#outstandingEventUpdates.add(occurrence);
}

this.prepareDataUpdate();
this.#prepareDataUpdate();
}

get maxInterval(): number {
Expand All @@ -494,12 +492,15 @@ export class ServerSubscription extends Subscription {

this.#sendUpdatesActivated = true;
if (this.#outstandingAttributeUpdates.size > 0 || this.#outstandingEventUpdates.size > 0) {
void this.sendUpdate();
this.#triggerSendUpdate();
}
this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () =>
this.prepareDataUpdate(),
this.#prepareDataUpdate(),
).start();
this.#structure.change.on(() => {
if (this.isClosed) {
return;
}
// TODO When change is AsyncObservable can be simplified
this.updateSubscription().catch(error =>
logger.error("Error updating subscription after structure change:", error),
Expand All @@ -511,9 +512,9 @@ export class ServerSubscription extends Subscription {
* Check if data should be sent straight away or delayed because the minimum interval is not reached. Delay real
* sending by 50ms in any case to mke sure to catch all updates.
*/
prepareDataUpdate() {
if (this.#sendDelayTimer.isRunning) {
// sending data is already scheduled, data updates go in there
#prepareDataUpdate() {
if (this.#sendDelayTimer.isRunning || this.isClosed) {
// sending data is already scheduled, data updates go in there ... or we close down already
return;
}

Expand All @@ -529,27 +530,36 @@ export class ServerSubscription extends Subscription {
this.#updateTimer = Time.getTimer(
"Subscription update",
this.minIntervalFloorMs - timeSinceLastUpdateMs,
() => this.prepareDataUpdate(),
() => this.#prepareDataUpdate(),
).start();
return;
}

this.#sendDelayTimer.start();
this.#updateTimer = Time.getTimer("Subscription update", this.#sendIntervalMs, () =>
this.prepareDataUpdate(),
this.#prepareDataUpdate(),
).start();
}

/**
* Determine all attributes that have changed since the last update and send them tout to the subscriber.
*/
async sendUpdate() {
if (this.sendingUpdateInProgress) {
#triggerSendUpdate() {
if (this.currentUpdatePromise !== undefined) {
logger.debug("Sending update already in progress, delaying update ...");
this.sendNextUpdateImmediately = true;
return;
}
const { promise, resolver } = createPromise<void>();
this.#sendUpdate()
.then(resolver)
.catch(error => logger.warn("Sending subscription update failed:", error))
.finally(() => (this.currentUpdatePromise = undefined));
this.currentUpdatePromise = promise;
}

/**
* Determine all attributes that have changed since the last update and send them tout to the subscriber.
* Important: This method MUST NOT be called directly. Use triggerSendUpdate() instead!
*/
async #sendUpdate() {
// Get all outstanding updates, make sure the order is correct per endpoint and cluster
const attributeUpdatesToSend = new Array<AttributePathWithValueVersion<any>>();
const attributeUpdates: Record<string, AttributePathWithValueVersion<any>[]> = {};
Expand All @@ -572,7 +582,6 @@ export class ServerSubscription extends Subscription {
this.#outstandingEventUpdates.clear();
this.#lastUpdateTimeMs = Time.nowMs();

this.sendingUpdateInProgress = true;
try {
await this.sendUpdateMessage(attributeUpdatesToSend, eventUpdatesToSend);
this.sendUpdateErrorCounter = 0;
Expand Down Expand Up @@ -616,12 +625,11 @@ export class ServerSubscription extends Subscription {
}
}
}
this.sendingUpdateInProgress = false;

if (this.sendNextUpdateImmediately) {
logger.debug("Sending delayed update immediately after last one was sent.");
this.sendNextUpdateImmediately = false;
await this.sendUpdate();
await this.#sendUpdate();
}
}

Expand Down Expand Up @@ -795,11 +803,11 @@ export class ServerSubscription extends Subscription {
version,
value,
});
this.prepareDataUpdate();
this.#prepareDataUpdate();
});
}
this.#outstandingAttributeUpdates.set(attributePathToId(path), { attribute, path, schema, version, value });
this.prepareDataUpdate();
this.#prepareDataUpdate();
}

eventChangeListener<T>(path: EventPath, schema: TlvSchema<T>, newEvent: NumberedOccurrence) {
Expand All @@ -820,35 +828,41 @@ export class ServerSubscription extends Subscription {
}
this.#outstandingEventUpdates.add({ event, path, schema, data: newEvent });
if (path.isUrgent) {
this.prepareDataUpdate();
this.#prepareDataUpdate();
}
}

async flush() {
this.#sendDelayTimer.stop();
logger.debug(
`Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events`,
);
if (this.#outstandingAttributeUpdates.size > 0 || this.#outstandingEventUpdates.size > 0) {
void this.sendUpdate();
logger.debug(
`Flushing subscription ${this.id} with ${this.#outstandingAttributeUpdates.size} attributes and ${this.#outstandingEventUpdates.size} events`,
);
this.#triggerSendUpdate();
if (this.currentUpdatePromise) {
await this.currentUpdatePromise;
}
}
}

override async close(graceful = false) {
this.isClosed = true;
this.#sendUpdatesActivated = false;
this.unregisterAttributeListeners(Array.from(this.#attributeListeners.keys()));
this.unregisterEventListeners(Array.from(this.#eventListeners.keys()));
if (this.attributeUpdatePromises.size) {
const resolvers = [...this.attributeUpdatePromises.values()];
this.attributeUpdatePromises.clear();
await Promise.all(resolvers);
}
this.#updateTimer.stop();
this.#sendDelayTimer.stop();
this.unregisterAttributeListeners(Array.from(this.#attributeListeners.keys()));
this.unregisterEventListeners(Array.from(this.#eventListeners.keys()));
if (graceful) {
await this.flush();
}
if (this.currentUpdatePromise) {
await this.currentUpdatePromise;
}
await super.close();
}

Expand Down

0 comments on commit 16b3962

Please sign in to comment.