Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove $Changed from logical flow of emitter #1394

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions packages/node/src/behavior/AccessControl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,9 @@ export namespace AccessControl {
}

/**
* Authorization metadata that varies with session.
* Information about the subject that triggered a change.
*/
export interface Session {
/**
* Checks if the authorized client has a certain Access Privilege granted.
*/
authorizedFor(desiredAccessLevel: AccessLevel, location?: Location): boolean;

export interface Actor {
/**
* The fabric of the authorized client.
*/
Expand All @@ -140,6 +135,25 @@ export namespace AccessControl {
*/
readonly subject?: SubjectId;

/**
* Indicates the action was triggered locally, not by a remote subject.
*
* If true, access levels are not enforced and all values are read/write. Datatypes are still enforced.
*
* Tracks "offline" rather than "online" because this makes the safer mode (full enforcement) the default.
*/
offline?: boolean;
}

/**
* Authorization metadata that varies with session.
*/
export interface Session extends Actor {
/**
* Checks if the authorized client has a certain Access Privilege granted.
*/
authorizedFor(desiredAccessLevel: AccessLevel, location?: Location): boolean;

/**
* If this is true, fabric-scoped lists are filtered to the accessing
* fabric.
Expand All @@ -156,14 +170,6 @@ export namespace AccessControl {
* active.
*/
readonly command?: boolean;

/**
* If this is true then access levels are not enforced and all values are read/write. Datatypes are still
* enforced.
*
* Tracks "offline" rather than "online" because this makes the safer mode (full enforcement) the default.
*/
offline?: boolean;
}
}

Expand Down
18 changes: 10 additions & 8 deletions packages/node/src/behavior/cluster/ClusterEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { AccessControl } from "#behavior/AccessControl.js";
import type { AsyncObservable, Observable } from "#general";
import type { ClusterType, TypeFromSchema } from "#types";
import type { Behavior } from "../Behavior.js";
Expand All @@ -27,31 +28,32 @@ export namespace ClusterEvents {
/**
* Properties the cluster contributes to Events.
*/
export type Properties<C> = AttributeObservables<ClusterType.AttributesOf<C>, "Changing"> &
AttributeObservables<ClusterType.AttributesOf<C>, "Changed"> &
export type Properties<C> = AttributeObservables<ClusterType.AttributesOf<C>, "Changing", ActionContext> &
AttributeObservables<ClusterType.AttributesOf<C>, "Changed", AccessControl.Actor> &
EventObservables<ClusterType.EventsOf<C>>;

export type AttributeObservables<A extends Record<string, ClusterType.Attribute>, N extends string> = {
export type AttributeObservables<A extends Record<string, ClusterType.Attribute>, N extends string, C> = {
[K in keyof A as string extends K
? never
: K extends string
? A[K] extends { optional: true }
? never
: `${K}$${N}`
: never]: AttributeObservable<A[K]>;
: never]: AttributeObservable<A[K], C>;
} & {
[K in keyof A as string extends K
? never
: K extends string
? A[K] extends { optional: true }
? `${K}$${N}`
: never
: never]?: AttributeObservable<A[K]>;
: never]?: AttributeObservable<A[K], C>;
};

export type AttributeObservable<A extends ClusterType.Attribute = ClusterType.Attribute> = AsyncObservable<
[value: TypeFromSchema<A["schema"]>, oldValue: TypeFromSchema<A["schema"]>, context: ActionContext]
>;
export type AttributeObservable<
A extends ClusterType.Attribute = ClusterType.Attribute,
C = unknown,
> = AsyncObservable<[value: TypeFromSchema<A["schema"]>, oldValue: TypeFromSchema<A["schema"]>, context: C]>;

export type EventObservables<E extends Record<string, ClusterType.Event>> = {
[K in keyof E as string extends K
Expand Down
18 changes: 10 additions & 8 deletions packages/node/src/behavior/internal/ClusterServerBacking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
EventServer,
FabricManager,
Message,
SecureSession,
} from "#protocol";
import { Attribute, Command, Event, TlvNoResponse } from "#types";
import { AccessControl } from "../AccessControl.js";
Expand Down Expand Up @@ -274,13 +273,16 @@ function createAttributeServer(
// Wire events (FixedAttributeServer is not an AttributeServer so we skip that)
if (server instanceof AttributeServer) {
const observable = (backing.events as any)[`${name}$Changed`] as ClusterEvents.AttributeObservable | undefined;
observable?.on((_value, _oldValue, context) => {
const session = context.session;
if (session instanceof SecureSession) {
server.updated(session);
} else {
server.updatedLocal();
}
observable?.on(() => {
// We no longer have a session in $Changed handlers. I think it's OK to use updatedLocal instead of
// updated as our use of listeners is limited. And updated() will actually notify listeners of fabric
// filtered result which probably isn't correct anyway
// const session = context.session;
// if (session instanceof SecureSession) {
// server.updated(session);
// } else {
server.updatedLocal();
// }
});
}

Expand Down
39 changes: 22 additions & 17 deletions packages/node/src/behavior/state/managed/Datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export namespace Datasource {
}

export interface ValueObserver {
(value: Val, oldValue: Val, context?: ValueSupervisor.Session): void;
(value: Val, oldValue: Val, context?: unknown): void;
}
}

Expand Down Expand Up @@ -218,6 +218,7 @@ interface Internals extends Datasource.Options {
interface CommitChanges {
persistent?: Val.Struct;
notifications: Array<{
name: string;
event: Observable<any[], MaybePromise>;
params: Parameters<Datasource.ValueObserver>;
}>;
Expand Down Expand Up @@ -563,8 +564,13 @@ function createSessionContext(resource: Resource, internals: Internals, session:
const event = internals.events?.[`${name}$Changed`];
if (event?.isObserved) {
changes.notifications.push({
name,
event,
params: [values[name], internals.values[name], session],
params: [
values[name],
internals.values[name],
{ fabric: session.fabric, subject: session.subject, offline: session.offline },
],
});
}
}
Expand Down Expand Up @@ -632,25 +638,24 @@ function createSessionContext(resource: Resource, internals: Internals, session:
return;
}

// Emit is optionally async so we must iterate manually
const iterator = changes.notifications[Symbol.iterator]();

function emitChanged(): MaybePromise<void> {
while (true) {
const n = iterator.next();
if (n.done) {
return;
}

const { event, params } = n.value;
const result = event.emit(...params);
if (MaybePromise.is(result)) {
return Promise.resolve(result).then(emitChanged);
// Changed events do not participate in logical flow of the original change. We allow them to run in parallel
// and log errors independently
for (const { name, event, params } of changes.notifications) {
try {
const promise = event.emit(...params);
if (MaybePromise.is(promise)) {
// Currently we do not track this promise, so it is up to higher-level logic (such as for reactors)
// to track. Could add a registration system if necessary
promise.then(undefined, e => unhandled(name, e));
}
} catch (e) {
unhandled(name, e);
}
}

return emitChanged();
function unhandled(fieldName: string, error: unknown) {
logger.error(`Unhandled error in ${fieldName}$Changed handler of ${internals.path}:`, error);
}
}

/**
Expand Down
Loading
Loading