Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: dispatch all events to EventBus #445

Merged
merged 29 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2ff2a9a
chore: create event bus
williazz Sep 5, 2023
b765501
chore: add EventBus to Orchestration
williazz Sep 9, 2023
59ad827
fix linting errors
williazz Sep 5, 2023
8975c04
fix rebase error
williazz Sep 5, 2023
a3c7c78
refactor: EventCache publishes raw RumEvent
williazz Sep 5, 2023
6bb67e8
fix: EventCache.record returns ParsedRumEvent without publishing
williazz Sep 6, 2023
9523320
chore: publish resource events
williazz Sep 6, 2023
751fc6e
chore: publish navigation events
williazz Sep 6, 2023
22a604a
chore: resolve nits
williazz Sep 6, 2023
15eb2f3
chore: only publish image resource events
williazz Sep 6, 2023
ebe415e
chore: only publish the first navigation event
williazz Sep 6, 2023
269eb35
Revert "chore: only publish the first navigation event"
williazz Sep 8, 2023
75a5f68
Revert "chore: only publish image resource events"
williazz Sep 8, 2023
16f9cdf
Revert "chore: resolve nits"
williazz Sep 8, 2023
a6be092
Revert "chore: publish navigation events"
williazz Sep 8, 2023
1bdd317
Revert "chore: publish resource events"
williazz Sep 8, 2023
df7aeed
Revert "fix: EventCache.record returns ParsedRumEvent without publish…
williazz Sep 8, 2023
d6a9497
refactor: rename notify to dispatch
williazz Sep 8, 2023
202a919
chore: rename context.bus to eventBus
williazz Sep 8, 2023
90fab11
chore: add topic enum to eventbus
williazz Sep 8, 2023
aa7513f
chore: event bus dispatches with optional key
williazz Sep 8, 2023
7548383
chore: dispatch resource events with keys
williazz Sep 8, 2023
8cda240
chore: dispatch lvl 2 navigation events with key
williazz Sep 8, 2023
0c0d73c
nit: rename topic.events to event
williazz Sep 8, 2023
a4132cc
chore: fix merge conflict
williazz Sep 9, 2023
a54c76c
Merge branch 'EventBusv1.15' into EventBus
williazz Sep 9, 2023
8400db0
nit: remove unnecessary import
williazz Sep 9, 2023
b17af20
chore: remove key
williazz Sep 11, 2023
bf194e7
chore: remove remaining key usages
williazz Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/event-bus/EventBus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
export type Subscriber = (message: any) => void;
export enum Topic {
EVENT = 'event'
}

/** A topic-based event bus to facilitate communication between plugins */
export default class EventBus<T = Topic> {
// map<topic, subscriber>
private subscribers = new Map<T, Subscriber[]>();

subscribe(topic: T, subscriber: Subscriber): void {
const list = this.subscribers.get(topic) ?? [];
if (list.length === 0) {
this.subscribers.set(topic, list);
}
list.push(subscriber);
}

unsubscribe(topic: T, subscriber: Subscriber) {
const list = this.subscribers.get(topic);
if (list) {
for (let i = 0; i < list.length; i++) {
if (list[i] === subscriber) {
list.splice(i, 1);
return true;
}
}
}
return false;
}

dispatch(topic: T, message: any): void {
const list = this.subscribers.get(topic);
if (list) {
for (const subscriber of list) {
subscriber(message);
}
}
}
}
53 changes: 53 additions & 0 deletions src/event-bus/__tests__/EventBus.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import EventBus from '../EventBus';

export enum MockTopics {
FOOD = 'food',
BOOKS = 'books'
}
describe('EventBus tests', () => {
let eventBus: EventBus<MockTopics>;
const l1 = jest.fn();
const l2 = jest.fn();
beforeEach(() => {
eventBus = new EventBus();
jest.clearAllMocks();
});
test('when dispatch is invoked then all listeners are called', async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider adding test to subscribe to different topics

Copy link
Contributor Author

@williazz williazz Sep 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an extra test "when subscribed to topic A then does not hear topic B"

// init
eventBus.subscribe(MockTopics.FOOD, l1);
eventBus.subscribe(MockTopics.FOOD, l2);

// run
eventBus.dispatch(MockTopics.FOOD, 'burger');

// assert
expect(l1).toHaveBeenCalledWith('burger');
expect(l2).toHaveBeenCalledWith('burger');
});

test('when subscriber is removed then it is not called', async () => {
// init
eventBus.subscribe(MockTopics.FOOD, l1);
eventBus.subscribe(MockTopics.FOOD, l2);
const removed = eventBus.unsubscribe(MockTopics.FOOD, l2);

// run
eventBus.dispatch(MockTopics.FOOD, 'burger');

// assert
expect(l1).toHaveBeenCalledWith('burger');
expect(removed).toBe(true);
expect(l2).not.toHaveBeenCalled();
});

test('when subscribed to topic A then does not hear topic B', async () => {
eventBus.subscribe(MockTopics.FOOD, l1);
eventBus.subscribe(MockTopics.BOOKS, l2);

// run
eventBus.dispatch(MockTopics.FOOD, 'burger');

// assert
expect(l2).not.toHaveBeenCalled();
});
});
21 changes: 17 additions & 4 deletions src/event-cache/EventCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
UserDetails,
RumEvent
} from '../dispatch/dataplane';
import EventBus, { Topic } from '../event-bus/EventBus';

const webClientVersion = '1.14.0';

Expand Down Expand Up @@ -37,7 +38,11 @@ export class EventCache {
* @param sessionManager The sessionManager returns user id, session id and handles session timeout.
* @param pageManager The pageManager returns page id.
*/
constructor(applicationDetails: AppMonitorDetails, config: Config) {
constructor(
applicationDetails: AppMonitorDetails,
config: Config,
private eventBus = new EventBus<Topic>()
) {
this.appMonitorDetails = applicationDetails;
this.config = config;
this.enabled = true;
Expand Down Expand Up @@ -220,12 +225,20 @@ export class EventCache {
'aws:clientVersion': webClientVersion
};

this.events.push({
details: JSON.stringify(eventData),
const partialEvent = {
id: v4(),
metadata: JSON.stringify(metaData),
timestamp: new Date(),
type
};
this.eventBus.dispatch(Topic.EVENT, {
...partialEvent,
details: eventData,
metadata: metaData
});
this.events.push({
...partialEvent,
details: JSON.stringify(eventData),
metadata: JSON.stringify(metaData)
});
};

Expand Down
56 changes: 56 additions & 0 deletions src/event-cache/__tests__/EventCache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { SessionManager } from '../../sessions/SessionManager';
import { RumEvent } from '../../dispatch/dataplane';
import { DEFAULT_CONFIG, mockFetch } from '../../test-utils/test-utils';
import { INSTALL_MODULE, INSTALL_SCRIPT } from '../../utils/constants';
import EventBus, { Topic } from '../../event-bus/EventBus';
jest.mock('../../event-bus/EventBus');

global.fetch = mockFetch;
const getSession = jest.fn(() => ({
Expand Down Expand Up @@ -492,6 +494,60 @@ describe('EventCache tests', () => {
expect(eventCache.getEventBatch().length).toEqual(1);
});

test('when event is recorded then events subscribers are notified with parsed rum event', async () => {
// Init
const EVENT1_SCHEMA = 'com.amazon.rum.event1';
const bus = new EventBus();
const eventCache: EventCache = Utils.createEventCache(
DEFAULT_CONFIG,
bus
);

const event = {
id: expect.stringMatching(/[0-9a-f\-]+/),
timestamp: new Date(),
type: EVENT1_SCHEMA,
metadata: `{"version":"1.0.0","aws:client":"${INSTALL_MODULE}","aws:clientVersion":"${WEB_CLIENT_VERSION}"}`,
details: '{}'
};

// Run
eventCache.recordEvent(EVENT1_SCHEMA, {});
const eventBatch: RumEvent[] = eventCache.getEventBatch();
expect(eventBatch).toEqual(expect.arrayContaining([event]));
// eslint-disable-next-line
expect(bus.dispatch).toHaveBeenCalledWith(
Topic.EVENT,
expect.objectContaining({
id: expect.stringMatching(/[0-9a-f\-]+/),
timestamp: new Date(),
type: EVENT1_SCHEMA,
metadata: expect.objectContaining({
version: '1.0.0',
'aws:client': INSTALL_MODULE,
'aws:clientVersion': WEB_CLIENT_VERSION
}),
details: expect.objectContaining({})
})
);
});

test('when cache is disabled then subscribers are not notified', async () => {
// Init
const EVENT1_SCHEMA = 'com.amazon.rum.event1';
const bus = new EventBus();
const eventCache: EventCache = Utils.createEventCache(
DEFAULT_CONFIG,
bus
);
// Run
eventCache.disable();
eventCache.recordEvent(EVENT1_SCHEMA, {});
const eventBatch: RumEvent[] = eventCache.getEventBatch();
expect(eventBatch).toHaveLength(0);
expect(bus.dispatch).not.toHaveBeenCalled(); // eslint-disable-line
});

test('when event limit is zero then recordEvent records all events', async () => {
// Init
const eventCount = 0;
Expand Down
5 changes: 4 additions & 1 deletion src/orchestration/Orchestration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { FetchPlugin } from '../plugins/event-plugins/FetchPlugin';
import { PageViewPlugin } from '../plugins/event-plugins/PageViewPlugin';
import { PageAttributes } from '../sessions/PageManager';
import { INSTALL_MODULE } from '../utils/constants';
import EventBus, { Topic } from '../event-bus/EventBus';

const DEFAULT_REGION = 'us-west-2';
const DEFAULT_ENDPOINT = `https://dataplane.rum.${DEFAULT_REGION}.amazonaws.com`;
Expand Down Expand Up @@ -206,6 +207,7 @@ export class Orchestration {
private eventCache: EventCache;
private dispatchManager: Dispatch;
private config: Config;
private eventBus = new EventBus<Topic>();

/**
* Instantiate the CloudWatch RUM web client and begin monitoring the
Expand Down Expand Up @@ -444,7 +446,8 @@ export class Orchestration {
config: this.config,
record: this.eventCache.recordEvent,
recordPageView: this.eventCache.recordPageView,
getSession: this.eventCache.getSession
getSession: this.eventCache.getSession,
eventBus: this.eventBus
};

// Initialize PluginManager
Expand Down
26 changes: 7 additions & 19 deletions src/plugins/event-plugins/__tests__/FetchPlugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import {
} from '../../utils/http-utils';
import { advanceTo } from 'jest-date-mock';
import {
context,
DEFAULT_CONFIG,
getSession,
record,
recordPageView,
xRayOffContext,
Expand Down Expand Up @@ -73,6 +75,7 @@ describe('FetchPlugin tests', () => {
mockFetchWithErrorObject.mockClear();
mockFetchWithErrorObjectAndStack.mockClear();
record.mockClear();
getSession.mockClear();
});

test('when fetch is called then the plugin records the http request/response', async () => {
Expand Down Expand Up @@ -537,17 +540,9 @@ describe('FetchPlugin tests', () => {
logicalServiceName: 'sample.rum.aws.amazon.com',
urlsToInclude: [/aws\.amazon\.com/]
};
const xRayOnContext: PluginContext = {
applicationId: 'b',
applicationVersion: '1.0',
config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } },
record,
recordPageView,
getSession
};

const context = Object.assign({}, xRayOnContext, { getSession });
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix unrelated breaking changes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the problem here?

const plugin: FetchPlugin = new FetchPlugin(config);
plugin.load(xRayOnContext);
plugin.load(context);

// Run
await fetch(URL);
Expand All @@ -566,17 +561,10 @@ describe('FetchPlugin tests', () => {
logicalServiceName: 'sample.rum.aws.amazon.com',
urlsToInclude: [/aws\.amazon\.com/]
};
const xRayOnContext: PluginContext = {
applicationId: 'b',
applicationVersion: '1.0',
config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } },
record,
recordPageView,
getSession
};
const context = Object.assign({}, xRayOnContext, { getSession });

const plugin: FetchPlugin = new FetchPlugin(config);
plugin.load(xRayOnContext);
plugin.load(context);

// Run
await fetch(URL);
Expand Down
14 changes: 5 additions & 9 deletions src/plugins/event-plugins/__tests__/ResourcePlugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,14 @@ describe('ResourcePlugin tests', () => {
test('when recordResourceUrl is false then the resource name is not recorded', async () => {
// Setup
mockRandom(0); // Retain order in shuffle
const context: PluginContext = {
applicationId: 'b',
applicationVersion: '1.0',
config: { ...DEFAULT_CONFIG, recordResourceUrl: false },
record,
recordPageView,
getSession
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix unrelated breaking changes

const plugin: ResourcePlugin = buildResourcePlugin();
const mockContext = Object.assign({}, context, {
config: { ...DEFAULT_CONFIG, recordResourceUrl: false }
});

// Run
plugin.load(context);
plugin.load(mockContext);
window.dispatchEvent(new Event('load'));
plugin.disable();

Expand Down
21 changes: 5 additions & 16 deletions src/plugins/event-plugins/__tests__/XhrPlugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { PartialHttpPluginConfig } from '../../utils/http-utils';
import { advanceTo } from 'jest-date-mock';
import { XhrPlugin } from '../XhrPlugin';
import {
context as mockContext,
xRayOffContext,
xRayOnContext,
record,
Expand Down Expand Up @@ -537,14 +538,9 @@ describe('XhrPlugin tests', () => {
record: false,
eventCount: 0
}));
const context: PluginContext = {
applicationId: 'b',
applicationVersion: '1.0',
config: DEFAULT_CONFIG,
record,
recordPageView,
getSession
};

const context = { ...mockContext, getSession };

const config: PartialHttpPluginConfig = {
logicalServiceName: 'sample.rum.aws.amazon.com',
urlsToInclude: [/response\.json/]
Expand Down Expand Up @@ -574,14 +570,7 @@ describe('XhrPlugin tests', () => {
test('when getSession returns undefined then the plugin does not record a trace', async () => {
// Init
const getSession: jest.MockedFunction<GetSession> = jest.fn();
const context: PluginContext = {
applicationId: 'b',
applicationVersion: '1.0',
config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } },
record,
recordPageView,
getSession
};
const context = { ...mockContext, getSession };
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix unrelated breaking changes

const config: PartialHttpPluginConfig = {
logicalServiceName: 'sample.rum.aws.amazon.com',
urlsToInclude: [/response\.json/],
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/types.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Config } from '../orchestration/Orchestration';
import { Session } from '../sessions/SessionManager';
import EventBus, { Topic } from '../event-bus/EventBus';

export type RecordEvent = (type: string, eventData: object) => void;
export type RecordPageView = (pageId: string) => void;

export type GetSession = () => Session | undefined;

export type PluginContext = {
Expand All @@ -13,4 +13,5 @@ export type PluginContext = {
record: RecordEvent;
recordPageView: RecordPageView;
getSession: GetSession;
eventBus: EventBus<Topic>;
};
Loading