Skip to content

Commit

Permalink
Remove $Changed from logical flow of emitter
Browse files Browse the repository at this point in the history
Previously we were providing the ActionContext of the emitter to $Changed handlers.  The transaction was destroyed in this case so was not really useful.  Now we instead just provide metadata about the subject that triggered the change.

Also, the emitter was awaiting $Changed handlers if they were async.  There are advantages and disadvantages to doing this:

* Advantage is that the logical flow is more deterministic and we track the promise as a normal part of node activity
* The disadvantage is that the emitter blocks until an independent process completes

This commit makes it so $Changed handlers operate independently of the emitter.  Tests pass but effects will be subtle so will need to keep an eye on this.
  • Loading branch information
lauckhart committed Nov 13, 2024
1 parent eb222f7 commit 68025a1
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 59 deletions.
36 changes: 21 additions & 15 deletions packages/node/src/behavior/AccessControl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,9 @@ export namespace AccessControl {
}

/**
* Authorization metadata that varies with session.
* Information about the subject that triggered a change.
*/
export interface Session {
/**
* Checks if the authorized client has a certain Access Privilege granted.
*/
authorizedFor(desiredAccessLevel: AccessLevel, location?: Location): boolean;

export interface Subject {
/**
* The fabric of the authorized client.
*/
Expand All @@ -140,6 +135,25 @@ export namespace AccessControl {
*/
readonly subject?: SubjectId;

/**
* Indicates the action was triggered locally, not by a remote subject.
*
* If true, access levels are not enforced and all values are read/write. Datatypes are still enforced.
*
* Tracks "offline" rather than "online" because this makes the safer mode (full enforcement) the default.
*/
offline?: boolean;
}

/**
* Authorization metadata that varies with session.
*/
export interface Session extends Subject {
/**
* Checks if the authorized client has a certain Access Privilege granted.
*/
authorizedFor(desiredAccessLevel: AccessLevel, location?: Location): boolean;

/**
* If this is true, fabric-scoped lists are filtered to the accessing
* fabric.
Expand All @@ -156,14 +170,6 @@ export namespace AccessControl {
* active.
*/
readonly command?: boolean;

/**
* If this is true then access levels are not enforced and all values are read/write. Datatypes are still
* enforced.
*
* Tracks "offline" rather than "online" because this makes the safer mode (full enforcement) the default.
*/
offline?: boolean;
}
}

Expand Down
18 changes: 10 additions & 8 deletions packages/node/src/behavior/cluster/ClusterEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { AccessControl } from "#behavior/AccessControl.js";
import type { AsyncObservable, Observable } from "#general";
import type { ClusterType, TypeFromSchema } from "#types";
import type { Behavior } from "../Behavior.js";
Expand All @@ -27,31 +28,32 @@ export namespace ClusterEvents {
/**
* Properties the cluster contributes to Events.
*/
export type Properties<C> = AttributeObservables<ClusterType.AttributesOf<C>, "Changing"> &
AttributeObservables<ClusterType.AttributesOf<C>, "Changed"> &
export type Properties<C> = AttributeObservables<ClusterType.AttributesOf<C>, "Changing", ActionContext> &
AttributeObservables<ClusterType.AttributesOf<C>, "Changed", AccessControl.Subject> &
EventObservables<ClusterType.EventsOf<C>>;

export type AttributeObservables<A extends Record<string, ClusterType.Attribute>, N extends string> = {
export type AttributeObservables<A extends Record<string, ClusterType.Attribute>, N extends string, C> = {
[K in keyof A as string extends K
? never
: K extends string
? A[K] extends { optional: true }
? never
: `${K}$${N}`
: never]: AttributeObservable<A[K]>;
: never]: AttributeObservable<A[K], C>;
} & {
[K in keyof A as string extends K
? never
: K extends string
? A[K] extends { optional: true }
? `${K}$${N}`
: never
: never]?: AttributeObservable<A[K]>;
: never]?: AttributeObservable<A[K], C>;
};

export type AttributeObservable<A extends ClusterType.Attribute = ClusterType.Attribute> = AsyncObservable<
[value: TypeFromSchema<A["schema"]>, oldValue: TypeFromSchema<A["schema"]>, context: ActionContext]
>;
export type AttributeObservable<
A extends ClusterType.Attribute = ClusterType.Attribute,
C = unknown,
> = AsyncObservable<[value: TypeFromSchema<A["schema"]>, oldValue: TypeFromSchema<A["schema"]>, context: C]>;

export type EventObservables<E extends Record<string, ClusterType.Event>> = {
[K in keyof E as string extends K
Expand Down
18 changes: 10 additions & 8 deletions packages/node/src/behavior/internal/ClusterServerBacking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
EventServer,
FabricManager,
Message,
SecureSession,
} from "#protocol";
import { Attribute, Command, Event, TlvNoResponse } from "#types";
import { AccessControl } from "../AccessControl.js";
Expand Down Expand Up @@ -274,13 +273,16 @@ function createAttributeServer(
// Wire events (FixedAttributeServer is not an AttributeServer so we skip that)
if (server instanceof AttributeServer) {
const observable = (backing.events as any)[`${name}$Changed`] as ClusterEvents.AttributeObservable | undefined;
observable?.on((_value, _oldValue, context) => {
const session = context.session;
if (session instanceof SecureSession) {
server.updated(session);
} else {
server.updatedLocal();
}
observable?.on(() => {
// We no longer have a session in $Changed handlers. I think it's OK to use updatedLocal instead of
// updated as our use of listeners is limited. And updated() will actually notify listeners of fabric
// filtered result which probably isn't correct anyway
// const session = context.session;
// if (session instanceof SecureSession) {
// server.updated(session);
// } else {
server.updatedLocal();
// }
});
}

Expand Down
39 changes: 22 additions & 17 deletions packages/node/src/behavior/state/managed/Datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export namespace Datasource {
}

export interface ValueObserver {
(value: Val, oldValue: Val, context?: ValueSupervisor.Session): void;
(value: Val, oldValue: Val, context?: unknown): void;
}
}

Expand Down Expand Up @@ -215,6 +215,7 @@ interface Internals extends Datasource.Options {
interface CommitChanges {
persistent?: Val.Struct;
notifications: Array<{
name: string;
event: Observable<any[], MaybePromise>;
params: Parameters<Datasource.ValueObserver>;
}>;
Expand Down Expand Up @@ -541,8 +542,13 @@ function createSessionContext(resource: Resource, internals: Internals, session:
const event = internals.events?.[`${name}$Changed`];
if (event?.isObserved) {
changes.notifications.push({
name,
event,
params: [values[name], internals.values[name], session],
params: [
values[name],
internals.values[name],
{ fabric: session.fabric, subject: session.subject, offline: session.offline },
],
});
}
}
Expand Down Expand Up @@ -606,25 +612,24 @@ function createSessionContext(resource: Resource, internals: Internals, session:
return;
}

// Emit is optionally async so we must iterate manually
const iterator = changes.notifications[Symbol.iterator]();

function emitChanged(): MaybePromise<void> {
while (true) {
const n = iterator.next();
if (n.done) {
return;
}

const { event, params } = n.value;
const result = event.emit(...params);
if (MaybePromise.is(result)) {
return Promise.resolve(result).then(emitChanged);
// Changed events do not participate in logical flow of the original change. We allow them to run in parallel
// and log errors independently
for (const { name, event, params } of changes.notifications) {
try {
const promise = event.emit(...params);
if (MaybePromise.is(promise)) {
// Currently we do not track this promise, so it is up to higher-level logic (such as for reactors)
// to track. Could add a registration system if necessary
promise.then(undefined, e => unhandled(name, e));
}
} catch (e) {
unhandled(name, e);
}
}

return emitChanged();
function unhandled(fieldName: string, error: unknown) {
logger.error(`Unhandled error in ${fieldName}$Changed handler of ${internals.path}:`, error);
}
}

/**
Expand Down
5 changes: 4 additions & 1 deletion packages/node/test/behavior/cluster/ClusterBehaviorTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
MaybePromise,
Observable,
} from "#general";
import { AccessControl } from "#index.js";
import { AttributeElement, ClusterModel } from "#model";
import {
Attribute,
Expand Down Expand Up @@ -106,7 +107,9 @@ describe("ClusterBehavior", () => {

({}) as MyBehavior satisfies {
events: EventEmitter & {
reqAttr$Changed: AsyncObservable<[value: string, oldValue: string, context?: ActionContext]>;
reqAttr$Changed: AsyncObservable<
[value: string, oldValue: string, context?: AccessControl.Subject]
>;
};
};

Expand Down
11 changes: 9 additions & 2 deletions packages/node/test/behavior/cluster/ClusterEventsTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { ActionContext } from "#behavior/context/ActionContext.js";
import { BasicInformationBehavior, BasicInformationServer } from "#behaviors/basic-information";
import { BasicInformation } from "#clusters/basic-information";
import { AsyncObservable, EventEmitter, MaybePromise, Observable } from "#general";
import { AccessControl } from "#index.js";
import { ClusterType } from "#types";
import { MyCluster, MySchema } from "./cluster-behavior-test-util.js";

Expand Down Expand Up @@ -45,7 +46,10 @@ describe("ClusterEvents", () => {

it("includes required", () => {
({}) as Ep satisfies EventEmitter & {
reqAttr$Changed: Observable<[value: string, oldValue: string, context?: ActionContext], MaybePromise>;
reqAttr$Changed: Observable<
[value: string, oldValue: string, context?: AccessControl.Subject],
MaybePromise
>;

reqEv: Observable<[payload: string, context?: ActionContext]>;
};
Expand Down Expand Up @@ -103,7 +107,10 @@ describe("ClusterEvents", () => {

it("requires mandatory", () => {
({}) as Ei satisfies {
reqAttr$Changed: Observable<[value: string, oldValue: string, context: ActionContext], MaybePromise>;
reqAttr$Changed: Observable<
[value: string, oldValue: string, context: AccessControl.Subject],
MaybePromise
>;

reqEv: Observable<[payload: string, context: ActionContext]>;
};
Expand Down
51 changes: 45 additions & 6 deletions packages/node/test/behavior/state/managed/DatasourceTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import { FinalizationError } from "#behavior/state/transaction/Errors.js";
import { BehaviorSupervisor } from "#behavior/supervision/BehaviorSupervisor.js";
import { RootSupervisor } from "#behavior/supervision/RootSupervisor.js";
import { ValueSupervisor } from "#behavior/supervision/ValueSupervisor.js";
import { AsyncObservable, MaybePromise, Observable } from "#general";
import { AsyncObservable, createPromise, MaybePromise, Observable } from "#general";
import { AccessControl } from "#index.js";
import { DataModelPath, DatatypeModel, FieldElement, FieldModel } from "#model";

class MyState {
Expand Down Expand Up @@ -326,11 +327,7 @@ describe("Datasource", () => {
}),
);

let actualContext: ActionContext | undefined;

await withDatasourceAndReference({ events }, async ({ context, state }) => {
actualContext = context;

await context.transaction.commit();

expect(changed).false;
Expand All @@ -344,7 +341,49 @@ describe("Datasource", () => {

expect(changed).true;

await expect(result).eventually.deep.equal(["BAR", "bar", actualContext]);
await expect(result).eventually.deep.equal([
"BAR",
"bar",
{ fabric: undefined, subject: undefined, offline: true },
]);
});

it("support chained commit", async () => {
const events = {
foo$Changed: Observable<[newValue: boolean, oldValue: boolean, cx: ActionContext]>(),
};
const ds1 = createDatasource({ type: MyState, events });

class State2 {
bar = false;
}
const ds2 = Datasource({
path: DataModelPath("TestDatasource2"),
type: State2,
supervisor: BehaviorSupervisor({ id: "state2", State: State2 }),
});

const { promise, resolver } = createPromise<void>();

let subject: AccessControl.Subject | undefined;
events.foo$Changed.on(async (_v1, _v2, subj) => {
subject = subj;

await withReference(ds2, ref => {
ref.state.bar = true;
});

resolver();
});

await withReference(ds1, ({ state }) => {
state.foo = "hello";
});

await promise;

expect(ds2.view.bar).true;
expect(subject).deep.equals({ fabric: undefined, subject: undefined, offline: true });
});
});

Expand Down
12 changes: 10 additions & 2 deletions packages/node/test/behaviors/switch/SwitchServerTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { SwitchServer } from "#behaviors/switch";
import { Switch } from "#clusters/switch";
import { createPromise } from "#general";
import { MockEndpoint } from "../../endpoint/mock-endpoint.js";

function createEventCatcher(device: MockEndpoint<any>) {
Expand Down Expand Up @@ -138,7 +139,7 @@ describe("SwitchServer", () => {
await device.set({ switch: { debounceDelay: 50 } });
});

it("set currentState is immediately", async () => {
it("set currentState is immediate", async () => {
const events = createEventCatcher(device);

await device.set({
Expand All @@ -160,9 +161,13 @@ describe("SwitchServer", () => {
]);
});

it("set rawPosition with debounceDelay=0 is immediately", async () => {
it("set rawPosition with debounceDelay=0 is immediate", async () => {
await device.set({ switch: { debounceDelay: 0 } });

const { promise, resolver } = createPromise();

(device.events as any).switch.currentPosition$Changed.on(resolver);

const events = createEventCatcher(device);

await device.set({
Expand All @@ -171,6 +176,9 @@ describe("SwitchServer", () => {
},
});

// Actual event is deferred but should trigger without advancing time
await promise;

expect(events).deep.equals([
{
name: "rawPosition$Changed",
Expand Down

0 comments on commit 68025a1

Please sign in to comment.