diff --git a/src/event-bus/EventBus.ts b/src/event-bus/EventBus.ts new file mode 100644 index 00000000..0f0a0d3d --- /dev/null +++ b/src/event-bus/EventBus.ts @@ -0,0 +1,40 @@ +export type Subscriber = (message: any) => void; +export enum Topic { + EVENT = 'event' +} + +/** A topic-based event bus to facilitate communication between plugins */ +export default class EventBus { + // map + private subscribers = new Map(); + + subscribe(topic: T, subscriber: Subscriber): void { + const list = this.subscribers.get(topic) ?? []; + if (list.length === 0) { + this.subscribers.set(topic, list); + } + list.push(subscriber); + } + + unsubscribe(topic: T, subscriber: Subscriber) { + const list = this.subscribers.get(topic); + if (list) { + for (let i = 0; i < list.length; i++) { + if (list[i] === subscriber) { + list.splice(i, 1); + return true; + } + } + } + return false; + } + + dispatch(topic: T, message: any): void { + const list = this.subscribers.get(topic); + if (list) { + for (const subscriber of list) { + subscriber(message); + } + } + } +} diff --git a/src/event-bus/__tests__/EventBus.test.ts b/src/event-bus/__tests__/EventBus.test.ts new file mode 100644 index 00000000..e11ac2ac --- /dev/null +++ b/src/event-bus/__tests__/EventBus.test.ts @@ -0,0 +1,53 @@ +import EventBus from '../EventBus'; + +export enum MockTopics { + FOOD = 'food', + BOOKS = 'books' +} +describe('EventBus tests', () => { + let eventBus: EventBus; + const l1 = jest.fn(); + const l2 = jest.fn(); + beforeEach(() => { + eventBus = new EventBus(); + jest.clearAllMocks(); + }); + test('when dispatch is invoked then all listeners are called', async () => { + // init + eventBus.subscribe(MockTopics.FOOD, l1); + eventBus.subscribe(MockTopics.FOOD, l2); + + // run + eventBus.dispatch(MockTopics.FOOD, 'burger'); + + // assert + expect(l1).toHaveBeenCalledWith('burger'); + expect(l2).toHaveBeenCalledWith('burger'); + }); + + test('when subscriber is removed then it is not called', async () => { + // init + eventBus.subscribe(MockTopics.FOOD, l1); + eventBus.subscribe(MockTopics.FOOD, l2); + const removed = eventBus.unsubscribe(MockTopics.FOOD, l2); + + // run + eventBus.dispatch(MockTopics.FOOD, 'burger'); + + // assert + expect(l1).toHaveBeenCalledWith('burger'); + expect(removed).toBe(true); + expect(l2).not.toHaveBeenCalled(); + }); + + test('when subscribed to topic A then does not hear topic B', async () => { + eventBus.subscribe(MockTopics.FOOD, l1); + eventBus.subscribe(MockTopics.BOOKS, l2); + + // run + eventBus.dispatch(MockTopics.FOOD, 'burger'); + + // assert + expect(l2).not.toHaveBeenCalled(); + }); +}); diff --git a/src/event-cache/EventCache.ts b/src/event-cache/EventCache.ts index 7a071e19..af8266bf 100644 --- a/src/event-cache/EventCache.ts +++ b/src/event-cache/EventCache.ts @@ -8,6 +8,7 @@ import { UserDetails, RumEvent } from '../dispatch/dataplane'; +import EventBus, { Topic } from '../event-bus/EventBus'; const webClientVersion = '1.14.0'; @@ -37,7 +38,11 @@ export class EventCache { * @param sessionManager The sessionManager returns user id, session id and handles session timeout. * @param pageManager The pageManager returns page id. */ - constructor(applicationDetails: AppMonitorDetails, config: Config) { + constructor( + applicationDetails: AppMonitorDetails, + config: Config, + private eventBus = new EventBus() + ) { this.appMonitorDetails = applicationDetails; this.config = config; this.enabled = true; @@ -220,12 +225,20 @@ export class EventCache { 'aws:clientVersion': webClientVersion }; - this.events.push({ - details: JSON.stringify(eventData), + const partialEvent = { id: v4(), - metadata: JSON.stringify(metaData), timestamp: new Date(), type + }; + this.eventBus.dispatch(Topic.EVENT, { + ...partialEvent, + details: eventData, + metadata: metaData + }); + this.events.push({ + ...partialEvent, + details: JSON.stringify(eventData), + metadata: JSON.stringify(metaData) }); }; diff --git a/src/event-cache/__tests__/EventCache.test.ts b/src/event-cache/__tests__/EventCache.test.ts index 9b009523..ad3d9fef 100644 --- a/src/event-cache/__tests__/EventCache.test.ts +++ b/src/event-cache/__tests__/EventCache.test.ts @@ -5,6 +5,8 @@ import { SessionManager } from '../../sessions/SessionManager'; import { RumEvent } from '../../dispatch/dataplane'; import { DEFAULT_CONFIG, mockFetch } from '../../test-utils/test-utils'; import { INSTALL_MODULE, INSTALL_SCRIPT } from '../../utils/constants'; +import EventBus, { Topic } from '../../event-bus/EventBus'; +jest.mock('../../event-bus/EventBus'); global.fetch = mockFetch; const getSession = jest.fn(() => ({ @@ -492,6 +494,60 @@ describe('EventCache tests', () => { expect(eventCache.getEventBatch().length).toEqual(1); }); + test('when event is recorded then events subscribers are notified with parsed rum event', async () => { + // Init + const EVENT1_SCHEMA = 'com.amazon.rum.event1'; + const bus = new EventBus(); + const eventCache: EventCache = Utils.createEventCache( + DEFAULT_CONFIG, + bus + ); + + const event = { + id: expect.stringMatching(/[0-9a-f\-]+/), + timestamp: new Date(), + type: EVENT1_SCHEMA, + metadata: `{"version":"1.0.0","aws:client":"${INSTALL_MODULE}","aws:clientVersion":"${WEB_CLIENT_VERSION}"}`, + details: '{}' + }; + + // Run + eventCache.recordEvent(EVENT1_SCHEMA, {}); + const eventBatch: RumEvent[] = eventCache.getEventBatch(); + expect(eventBatch).toEqual(expect.arrayContaining([event])); + // eslint-disable-next-line + expect(bus.dispatch).toHaveBeenCalledWith( + Topic.EVENT, + expect.objectContaining({ + id: expect.stringMatching(/[0-9a-f\-]+/), + timestamp: new Date(), + type: EVENT1_SCHEMA, + metadata: expect.objectContaining({ + version: '1.0.0', + 'aws:client': INSTALL_MODULE, + 'aws:clientVersion': WEB_CLIENT_VERSION + }), + details: expect.objectContaining({}) + }) + ); + }); + + test('when cache is disabled then subscribers are not notified', async () => { + // Init + const EVENT1_SCHEMA = 'com.amazon.rum.event1'; + const bus = new EventBus(); + const eventCache: EventCache = Utils.createEventCache( + DEFAULT_CONFIG, + bus + ); + // Run + eventCache.disable(); + eventCache.recordEvent(EVENT1_SCHEMA, {}); + const eventBatch: RumEvent[] = eventCache.getEventBatch(); + expect(eventBatch).toHaveLength(0); + expect(bus.dispatch).not.toHaveBeenCalled(); // eslint-disable-line + }); + test('when event limit is zero then recordEvent records all events', async () => { // Init const eventCount = 0; diff --git a/src/orchestration/Orchestration.ts b/src/orchestration/Orchestration.ts index 528480a3..4f363e41 100644 --- a/src/orchestration/Orchestration.ts +++ b/src/orchestration/Orchestration.ts @@ -24,6 +24,7 @@ import { FetchPlugin } from '../plugins/event-plugins/FetchPlugin'; import { PageViewPlugin } from '../plugins/event-plugins/PageViewPlugin'; import { PageAttributes } from '../sessions/PageManager'; import { INSTALL_MODULE } from '../utils/constants'; +import EventBus, { Topic } from '../event-bus/EventBus'; const DEFAULT_REGION = 'us-west-2'; const DEFAULT_ENDPOINT = `https://dataplane.rum.${DEFAULT_REGION}.amazonaws.com`; @@ -206,6 +207,7 @@ export class Orchestration { private eventCache: EventCache; private dispatchManager: Dispatch; private config: Config; + private eventBus = new EventBus(); /** * Instantiate the CloudWatch RUM web client and begin monitoring the @@ -444,7 +446,8 @@ export class Orchestration { config: this.config, record: this.eventCache.recordEvent, recordPageView: this.eventCache.recordPageView, - getSession: this.eventCache.getSession + getSession: this.eventCache.getSession, + eventBus: this.eventBus }; // Initialize PluginManager diff --git a/src/plugins/event-plugins/__tests__/FetchPlugin.test.ts b/src/plugins/event-plugins/__tests__/FetchPlugin.test.ts index 4d72ed8f..31cc68db 100644 --- a/src/plugins/event-plugins/__tests__/FetchPlugin.test.ts +++ b/src/plugins/event-plugins/__tests__/FetchPlugin.test.ts @@ -5,7 +5,9 @@ import { } from '../../utils/http-utils'; import { advanceTo } from 'jest-date-mock'; import { + context, DEFAULT_CONFIG, + getSession, record, recordPageView, xRayOffContext, @@ -73,6 +75,7 @@ describe('FetchPlugin tests', () => { mockFetchWithErrorObject.mockClear(); mockFetchWithErrorObjectAndStack.mockClear(); record.mockClear(); + getSession.mockClear(); }); test('when fetch is called then the plugin records the http request/response', async () => { @@ -537,17 +540,9 @@ describe('FetchPlugin tests', () => { logicalServiceName: 'sample.rum.aws.amazon.com', urlsToInclude: [/aws\.amazon\.com/] }; - const xRayOnContext: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } }, - record, - recordPageView, - getSession - }; - + const context = Object.assign({}, xRayOnContext, { getSession }); const plugin: FetchPlugin = new FetchPlugin(config); - plugin.load(xRayOnContext); + plugin.load(context); // Run await fetch(URL); @@ -566,17 +561,10 @@ describe('FetchPlugin tests', () => { logicalServiceName: 'sample.rum.aws.amazon.com', urlsToInclude: [/aws\.amazon\.com/] }; - const xRayOnContext: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } }, - record, - recordPageView, - getSession - }; + const context = Object.assign({}, xRayOnContext, { getSession }); const plugin: FetchPlugin = new FetchPlugin(config); - plugin.load(xRayOnContext); + plugin.load(context); // Run await fetch(URL); diff --git a/src/plugins/event-plugins/__tests__/ResourcePlugin.test.ts b/src/plugins/event-plugins/__tests__/ResourcePlugin.test.ts index 6bad5e8a..26fd8a91 100644 --- a/src/plugins/event-plugins/__tests__/ResourcePlugin.test.ts +++ b/src/plugins/event-plugins/__tests__/ResourcePlugin.test.ts @@ -70,18 +70,14 @@ describe('ResourcePlugin tests', () => { test('when recordResourceUrl is false then the resource name is not recorded', async () => { // Setup mockRandom(0); // Retain order in shuffle - const context: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: { ...DEFAULT_CONFIG, recordResourceUrl: false }, - record, - recordPageView, - getSession - }; + const plugin: ResourcePlugin = buildResourcePlugin(); + const mockContext = Object.assign({}, context, { + config: { ...DEFAULT_CONFIG, recordResourceUrl: false } + }); // Run - plugin.load(context); + plugin.load(mockContext); window.dispatchEvent(new Event('load')); plugin.disable(); diff --git a/src/plugins/event-plugins/__tests__/XhrPlugin.test.ts b/src/plugins/event-plugins/__tests__/XhrPlugin.test.ts index 1233715a..0fd49b50 100644 --- a/src/plugins/event-plugins/__tests__/XhrPlugin.test.ts +++ b/src/plugins/event-plugins/__tests__/XhrPlugin.test.ts @@ -2,6 +2,7 @@ import { PartialHttpPluginConfig } from '../../utils/http-utils'; import { advanceTo } from 'jest-date-mock'; import { XhrPlugin } from '../XhrPlugin'; import { + context as mockContext, xRayOffContext, xRayOnContext, record, @@ -537,14 +538,9 @@ describe('XhrPlugin tests', () => { record: false, eventCount: 0 })); - const context: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: DEFAULT_CONFIG, - record, - recordPageView, - getSession - }; + + const context = { ...mockContext, getSession }; + const config: PartialHttpPluginConfig = { logicalServiceName: 'sample.rum.aws.amazon.com', urlsToInclude: [/response\.json/] @@ -574,14 +570,7 @@ describe('XhrPlugin tests', () => { test('when getSession returns undefined then the plugin does not record a trace', async () => { // Init const getSession: jest.MockedFunction = jest.fn(); - const context: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } }, - record, - recordPageView, - getSession - }; + const context = { ...mockContext, getSession }; const config: PartialHttpPluginConfig = { logicalServiceName: 'sample.rum.aws.amazon.com', urlsToInclude: [/response\.json/], diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 2aec3745..670b52de 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -1,9 +1,9 @@ import { Config } from '../orchestration/Orchestration'; import { Session } from '../sessions/SessionManager'; +import EventBus, { Topic } from '../event-bus/EventBus'; export type RecordEvent = (type: string, eventData: object) => void; export type RecordPageView = (pageId: string) => void; - export type GetSession = () => Session | undefined; export type PluginContext = { @@ -13,4 +13,5 @@ export type PluginContext = { record: RecordEvent; recordPageView: RecordPageView; getSession: GetSession; + eventBus: EventBus; }; diff --git a/src/test-utils/test-utils.ts b/src/test-utils/test-utils.ts index 7ba16b3e..592c0136 100644 --- a/src/test-utils/test-utils.ts +++ b/src/test-utils/test-utils.ts @@ -17,6 +17,8 @@ import { UserDetails } from '../dispatch/dataplane'; import { ReadableStream } from 'web-streams-polyfill'; +import EventBus from '../event-bus/EventBus'; +jest.mock('../event-bus/EventBus'); export const AWS_RUM_ENDPOINT = new URL( 'https://rumservicelambda.us-west-2.amazonaws.com' @@ -64,8 +66,11 @@ export const createDefaultEventCache = (): EventCache => { return new EventCache(APP_MONITOR_DETAILS, DEFAULT_CONFIG); }; -export const createEventCache = (config: Config): EventCache => { - return new EventCache(APP_MONITOR_DETAILS, config); +export const createEventCache = ( + config: Config, + bus?: EventBus +): EventCache => { + return new EventCache(APP_MONITOR_DETAILS, config, bus); }; export const createDefaultEventCacheWithEvents = (): EventCache => { @@ -112,25 +117,18 @@ export const context: PluginContext = { config: DEFAULT_CONFIG, record, recordPageView, - getSession + getSession, + eventBus: new EventBus() }; export const xRayOffContext: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: { ...DEFAULT_CONFIG, ...{ enableXRay: false } }, - record, - recordPageView, - getSession + ...context, + config: { ...DEFAULT_CONFIG, ...{ enableXRay: false } } }; export const xRayOnContext: PluginContext = { - applicationId: 'b', - applicationVersion: '1.0', - config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } }, - record, - recordPageView, - getSession + ...context, + config: { ...DEFAULT_CONFIG, ...{ enableXRay: true } } }; export const stringToUtf16 = (inputString: string) => {