From 68025a1e6199697cc24469b2ae990173d2930508 Mon Sep 17 00:00:00 2001 From: Greg Lauckhart Date: Wed, 13 Nov 2024 10:53:26 -0800 Subject: [PATCH] Remove $Changed from logical flow of emitter Previously we were providing the ActionContext of the emitter to $Changed handlers. The transaction was destroyed in this case so was not really useful. Now we instead just provide metadata about the subject that triggered the change. Also, the emitter was awaiting $Changed handlers if they were async. There are advantages and disadvantages to doing this: * Advantage is that the logical flow is more deterministic and we track the promise as a normal part of node activity * The disadvantage is that the emitter blocks until an independent process completes This commit makes it so $Changed handlers operate independently of the emitter. Tests pass but effects will be subtle so will need to keep an eye on this. --- packages/node/src/behavior/AccessControl.ts | 36 +++++++------ .../src/behavior/cluster/ClusterEvents.ts | 18 ++++--- .../behavior/internal/ClusterServerBacking.ts | 18 ++++--- .../src/behavior/state/managed/Datasource.ts | 39 +++++++------- .../behavior/cluster/ClusterBehaviorTest.ts | 5 +- .../behavior/cluster/ClusterEventsTest.ts | 11 +++- .../behavior/state/managed/DatasourceTest.ts | 51 ++++++++++++++++--- .../test/behaviors/switch/SwitchServerTest.ts | 12 ++++- 8 files changed, 131 insertions(+), 59 deletions(-) diff --git a/packages/node/src/behavior/AccessControl.ts b/packages/node/src/behavior/AccessControl.ts index 9c7ce16017..d12e9d8f88 100644 --- a/packages/node/src/behavior/AccessControl.ts +++ b/packages/node/src/behavior/AccessControl.ts @@ -122,14 +122,9 @@ export namespace AccessControl { } /** - * Authorization metadata that varies with session. + * Information about the subject that triggered a change. */ - export interface Session { - /** - * Checks if the authorized client has a certain Access Privilege granted. - */ - authorizedFor(desiredAccessLevel: AccessLevel, location?: Location): boolean; - + export interface Subject { /** * The fabric of the authorized client. */ @@ -140,6 +135,25 @@ export namespace AccessControl { */ readonly subject?: SubjectId; + /** + * Indicates the action was triggered locally, not by a remote subject. + * + * If true, access levels are not enforced and all values are read/write. Datatypes are still enforced. + * + * Tracks "offline" rather than "online" because this makes the safer mode (full enforcement) the default. + */ + offline?: boolean; + } + + /** + * Authorization metadata that varies with session. + */ + export interface Session extends Subject { + /** + * Checks if the authorized client has a certain Access Privilege granted. + */ + authorizedFor(desiredAccessLevel: AccessLevel, location?: Location): boolean; + /** * If this is true, fabric-scoped lists are filtered to the accessing * fabric. @@ -156,14 +170,6 @@ export namespace AccessControl { * active. */ readonly command?: boolean; - - /** - * If this is true then access levels are not enforced and all values are read/write. Datatypes are still - * enforced. - * - * Tracks "offline" rather than "online" because this makes the safer mode (full enforcement) the default. - */ - offline?: boolean; } } diff --git a/packages/node/src/behavior/cluster/ClusterEvents.ts b/packages/node/src/behavior/cluster/ClusterEvents.ts index 130541545d..27dc1904f4 100644 --- a/packages/node/src/behavior/cluster/ClusterEvents.ts +++ b/packages/node/src/behavior/cluster/ClusterEvents.ts @@ -4,6 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { AccessControl } from "#behavior/AccessControl.js"; import type { AsyncObservable, Observable } from "#general"; import type { ClusterType, TypeFromSchema } from "#types"; import type { Behavior } from "../Behavior.js"; @@ -27,18 +28,18 @@ export namespace ClusterEvents { /** * Properties the cluster contributes to Events. */ - export type Properties = AttributeObservables, "Changing"> & - AttributeObservables, "Changed"> & + export type Properties = AttributeObservables, "Changing", ActionContext> & + AttributeObservables, "Changed", AccessControl.Subject> & EventObservables>; - export type AttributeObservables, N extends string> = { + export type AttributeObservables, N extends string, C> = { [K in keyof A as string extends K ? never : K extends string ? A[K] extends { optional: true } ? never : `${K}$${N}` - : never]: AttributeObservable; + : never]: AttributeObservable; } & { [K in keyof A as string extends K ? never @@ -46,12 +47,13 @@ export namespace ClusterEvents { ? A[K] extends { optional: true } ? `${K}$${N}` : never - : never]?: AttributeObservable; + : never]?: AttributeObservable; }; - export type AttributeObservable = AsyncObservable< - [value: TypeFromSchema, oldValue: TypeFromSchema, context: ActionContext] - >; + export type AttributeObservable< + A extends ClusterType.Attribute = ClusterType.Attribute, + C = unknown, + > = AsyncObservable<[value: TypeFromSchema, oldValue: TypeFromSchema, context: C]>; export type EventObservables> = { [K in keyof E as string extends K diff --git a/packages/node/src/behavior/internal/ClusterServerBacking.ts b/packages/node/src/behavior/internal/ClusterServerBacking.ts index 25452e98c6..c35b120254 100644 --- a/packages/node/src/behavior/internal/ClusterServerBacking.ts +++ b/packages/node/src/behavior/internal/ClusterServerBacking.ts @@ -18,7 +18,6 @@ import { EventServer, FabricManager, Message, - SecureSession, } from "#protocol"; import { Attribute, Command, Event, TlvNoResponse } from "#types"; import { AccessControl } from "../AccessControl.js"; @@ -274,13 +273,16 @@ function createAttributeServer( // Wire events (FixedAttributeServer is not an AttributeServer so we skip that) if (server instanceof AttributeServer) { const observable = (backing.events as any)[`${name}$Changed`] as ClusterEvents.AttributeObservable | undefined; - observable?.on((_value, _oldValue, context) => { - const session = context.session; - if (session instanceof SecureSession) { - server.updated(session); - } else { - server.updatedLocal(); - } + observable?.on(() => { + // We no longer have a session in $Changed handlers. I think it's OK to use updatedLocal instead of + // updated as our use of listeners is limited. And updated() will actually notify listeners of fabric + // filtered result which probably isn't correct anyway + // const session = context.session; + // if (session instanceof SecureSession) { + // server.updated(session); + // } else { + server.updatedLocal(); + // } }); } diff --git a/packages/node/src/behavior/state/managed/Datasource.ts b/packages/node/src/behavior/state/managed/Datasource.ts index f1d99a9975..814e8c38b5 100644 --- a/packages/node/src/behavior/state/managed/Datasource.ts +++ b/packages/node/src/behavior/state/managed/Datasource.ts @@ -185,7 +185,7 @@ export namespace Datasource { } export interface ValueObserver { - (value: Val, oldValue: Val, context?: ValueSupervisor.Session): void; + (value: Val, oldValue: Val, context?: unknown): void; } } @@ -215,6 +215,7 @@ interface Internals extends Datasource.Options { interface CommitChanges { persistent?: Val.Struct; notifications: Array<{ + name: string; event: Observable; params: Parameters; }>; @@ -541,8 +542,13 @@ function createSessionContext(resource: Resource, internals: Internals, session: const event = internals.events?.[`${name}$Changed`]; if (event?.isObserved) { changes.notifications.push({ + name, event, - params: [values[name], internals.values[name], session], + params: [ + values[name], + internals.values[name], + { fabric: session.fabric, subject: session.subject, offline: session.offline }, + ], }); } } @@ -606,25 +612,24 @@ function createSessionContext(resource: Resource, internals: Internals, session: return; } - // Emit is optionally async so we must iterate manually - const iterator = changes.notifications[Symbol.iterator](); - - function emitChanged(): MaybePromise { - while (true) { - const n = iterator.next(); - if (n.done) { - return; - } - - const { event, params } = n.value; - const result = event.emit(...params); - if (MaybePromise.is(result)) { - return Promise.resolve(result).then(emitChanged); + // Changed events do not participate in logical flow of the original change. We allow them to run in parallel + // and log errors independently + for (const { name, event, params } of changes.notifications) { + try { + const promise = event.emit(...params); + if (MaybePromise.is(promise)) { + // Currently we do not track this promise, so it is up to higher-level logic (such as for reactors) + // to track. Could add a registration system if necessary + promise.then(undefined, e => unhandled(name, e)); } + } catch (e) { + unhandled(name, e); } } - return emitChanged(); + function unhandled(fieldName: string, error: unknown) { + logger.error(`Unhandled error in ${fieldName}$Changed handler of ${internals.path}:`, error); + } } /** diff --git a/packages/node/test/behavior/cluster/ClusterBehaviorTest.ts b/packages/node/test/behavior/cluster/ClusterBehaviorTest.ts index b6ac6f5841..cbd9351edd 100644 --- a/packages/node/test/behavior/cluster/ClusterBehaviorTest.ts +++ b/packages/node/test/behavior/cluster/ClusterBehaviorTest.ts @@ -20,6 +20,7 @@ import { MaybePromise, Observable, } from "#general"; +import { AccessControl } from "#index.js"; import { AttributeElement, ClusterModel } from "#model"; import { Attribute, @@ -106,7 +107,9 @@ describe("ClusterBehavior", () => { ({}) as MyBehavior satisfies { events: EventEmitter & { - reqAttr$Changed: AsyncObservable<[value: string, oldValue: string, context?: ActionContext]>; + reqAttr$Changed: AsyncObservable< + [value: string, oldValue: string, context?: AccessControl.Subject] + >; }; }; diff --git a/packages/node/test/behavior/cluster/ClusterEventsTest.ts b/packages/node/test/behavior/cluster/ClusterEventsTest.ts index c5fcfc109e..c00f95d987 100644 --- a/packages/node/test/behavior/cluster/ClusterEventsTest.ts +++ b/packages/node/test/behavior/cluster/ClusterEventsTest.ts @@ -12,6 +12,7 @@ import { ActionContext } from "#behavior/context/ActionContext.js"; import { BasicInformationBehavior, BasicInformationServer } from "#behaviors/basic-information"; import { BasicInformation } from "#clusters/basic-information"; import { AsyncObservable, EventEmitter, MaybePromise, Observable } from "#general"; +import { AccessControl } from "#index.js"; import { ClusterType } from "#types"; import { MyCluster, MySchema } from "./cluster-behavior-test-util.js"; @@ -45,7 +46,10 @@ describe("ClusterEvents", () => { it("includes required", () => { ({}) as Ep satisfies EventEmitter & { - reqAttr$Changed: Observable<[value: string, oldValue: string, context?: ActionContext], MaybePromise>; + reqAttr$Changed: Observable< + [value: string, oldValue: string, context?: AccessControl.Subject], + MaybePromise + >; reqEv: Observable<[payload: string, context?: ActionContext]>; }; @@ -103,7 +107,10 @@ describe("ClusterEvents", () => { it("requires mandatory", () => { ({}) as Ei satisfies { - reqAttr$Changed: Observable<[value: string, oldValue: string, context: ActionContext], MaybePromise>; + reqAttr$Changed: Observable< + [value: string, oldValue: string, context: AccessControl.Subject], + MaybePromise + >; reqEv: Observable<[payload: string, context: ActionContext]>; }; diff --git a/packages/node/test/behavior/state/managed/DatasourceTest.ts b/packages/node/test/behavior/state/managed/DatasourceTest.ts index 60e7367682..1aa02453df 100644 --- a/packages/node/test/behavior/state/managed/DatasourceTest.ts +++ b/packages/node/test/behavior/state/managed/DatasourceTest.ts @@ -14,7 +14,8 @@ import { FinalizationError } from "#behavior/state/transaction/Errors.js"; import { BehaviorSupervisor } from "#behavior/supervision/BehaviorSupervisor.js"; import { RootSupervisor } from "#behavior/supervision/RootSupervisor.js"; import { ValueSupervisor } from "#behavior/supervision/ValueSupervisor.js"; -import { AsyncObservable, MaybePromise, Observable } from "#general"; +import { AsyncObservable, createPromise, MaybePromise, Observable } from "#general"; +import { AccessControl } from "#index.js"; import { DataModelPath, DatatypeModel, FieldElement, FieldModel } from "#model"; class MyState { @@ -326,11 +327,7 @@ describe("Datasource", () => { }), ); - let actualContext: ActionContext | undefined; - await withDatasourceAndReference({ events }, async ({ context, state }) => { - actualContext = context; - await context.transaction.commit(); expect(changed).false; @@ -344,7 +341,49 @@ describe("Datasource", () => { expect(changed).true; - await expect(result).eventually.deep.equal(["BAR", "bar", actualContext]); + await expect(result).eventually.deep.equal([ + "BAR", + "bar", + { fabric: undefined, subject: undefined, offline: true }, + ]); + }); + + it("support chained commit", async () => { + const events = { + foo$Changed: Observable<[newValue: boolean, oldValue: boolean, cx: ActionContext]>(), + }; + const ds1 = createDatasource({ type: MyState, events }); + + class State2 { + bar = false; + } + const ds2 = Datasource({ + path: DataModelPath("TestDatasource2"), + type: State2, + supervisor: BehaviorSupervisor({ id: "state2", State: State2 }), + }); + + const { promise, resolver } = createPromise(); + + let subject: AccessControl.Subject | undefined; + events.foo$Changed.on(async (_v1, _v2, subj) => { + subject = subj; + + await withReference(ds2, ref => { + ref.state.bar = true; + }); + + resolver(); + }); + + await withReference(ds1, ({ state }) => { + state.foo = "hello"; + }); + + await promise; + + expect(ds2.view.bar).true; + expect(subject).deep.equals({ fabric: undefined, subject: undefined, offline: true }); }); }); diff --git a/packages/node/test/behaviors/switch/SwitchServerTest.ts b/packages/node/test/behaviors/switch/SwitchServerTest.ts index 3c0ba755b0..39ef37718a 100644 --- a/packages/node/test/behaviors/switch/SwitchServerTest.ts +++ b/packages/node/test/behaviors/switch/SwitchServerTest.ts @@ -6,6 +6,7 @@ import { SwitchServer } from "#behaviors/switch"; import { Switch } from "#clusters/switch"; +import { createPromise } from "#general"; import { MockEndpoint } from "../../endpoint/mock-endpoint.js"; function createEventCatcher(device: MockEndpoint) { @@ -138,7 +139,7 @@ describe("SwitchServer", () => { await device.set({ switch: { debounceDelay: 50 } }); }); - it("set currentState is immediately", async () => { + it("set currentState is immediate", async () => { const events = createEventCatcher(device); await device.set({ @@ -160,9 +161,13 @@ describe("SwitchServer", () => { ]); }); - it("set rawPosition with debounceDelay=0 is immediately", async () => { + it("set rawPosition with debounceDelay=0 is immediate", async () => { await device.set({ switch: { debounceDelay: 0 } }); + const { promise, resolver } = createPromise(); + + (device.events as any).switch.currentPosition$Changed.on(resolver); + const events = createEventCatcher(device); await device.set({ @@ -171,6 +176,9 @@ describe("SwitchServer", () => { }, }); + // Actual event is deferred but should trigger without advancing time + await promise; + expect(events).deep.equals([ { name: "rawPosition$Changed",