From 839d97707b6f3acfc053fb2beda6516d1ec6211e Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 7 Nov 2023 23:21:02 +0000 Subject: [PATCH] feat(plugin-server): send only elementsChain to webhooks --- .../batch-processing/each-batch-webhooks.ts | 9 +-------- plugin-server/src/types.ts | 3 ++- plugin-server/src/utils/db/elements-chain.ts | 6 +++--- plugin-server/src/utils/event.ts | 10 ++++------ plugin-server/src/worker/ingestion/action-matcher.ts | 9 +++------ .../ingestion/event-pipeline/runAsyncHandlersStep.ts | 3 +-- .../worker/vm/upgrades/utils/fetchEventsForInterval.ts | 3 ++- plugin-server/tests/utils/db/elements-chain.test.ts | 10 +++++----- 8 files changed, 21 insertions(+), 32 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts index 7237f0d4a7b68..d358b471f26c3 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-webhooks.ts @@ -6,7 +6,7 @@ import { ActionMatcher } from 'worker/ingestion/action-matcher' import { PostIngestionEvent, RawClickHouseEvent } from '../../../types' import { DependencyUnavailableError } from '../../../utils/db/error' -import { convertToIngestionEvent, convertToProcessedPluginEvent } from '../../../utils/event' +import { convertToIngestionEvent } from '../../../utils/event' import { status } from '../../../utils/status' import { processWebhooksStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep' import { HookCommander } from '../../../worker/ingestion/hooks' @@ -151,13 +151,6 @@ export async function eachMessageWebhooksHandlers( } const event = convertToIngestionEvent(clickHouseEvent) - // TODO: previously onEvent and Webhooks were executed in the same process, - // and onEvent would call convertToProcessedPluginEvent, which ends up - // mutating the `event` that is passed in. To ensure that we have the same - // behaviour we run this here, but we should probably refactor this to - // ensure that we don't mutate the event. - convertToProcessedPluginEvent(event) - await runInstrumentedFunction({ func: () => runWebhooks(statsd, actionMatcher, hookCannon, event), statsKey: `kafka_queue.process_async_handlers_webhooks`, diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index e295f5414e05a..7f973b3657414 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -665,7 +665,8 @@ export interface PreIngestionEvent { } /** Ingestion event after saving, currently just an alias of BaseIngestionEvent */ -export interface PostIngestionEvent extends BaseIngestionEvent { +export interface PostIngestionEvent extends Omit { + elementsChain?: string person_id?: string // This is not optional, but BaseEvent needs to be fixed first person_created_at: ISOTimestamp | null person_properties: Properties diff --git a/plugin-server/src/utils/db/elements-chain.ts b/plugin-server/src/utils/db/elements-chain.ts index 0d086d5b0eb52..3512b980049a3 100644 --- a/plugin-server/src/utils/db/elements-chain.ts +++ b/plugin-server/src/utils/db/elements-chain.ts @@ -103,12 +103,13 @@ export function chainToElements(chain: string, teamId: number, options: { throwO return elements } +/** Clean up a user provided elements list, so it could be inserted into the database */ export function extractElements(elements: Array>): Element[] { return elements.map((el) => ({ text: el['$el_text']?.slice(0, 400), tag_name: el['tag_name'], href: el['attr__href']?.slice(0, 2048), - attr_class: extractAttrClass(el), + attr_class: extractAttrClass(el['attr__class']), attr_id: el['attr__id'], nth_child: el['nth_child'], nth_of_type: el['nth_of_type'], @@ -116,8 +117,7 @@ export function extractElements(elements: Array>): Element[] })) } -function extractAttrClass(el: Record): Element['attr_class'] { - const attr_class = el['attr__class'] +export function extractAttrClass(attr_class: string | undefined | any[]): Element['attr_class'] { if (!attr_class) { return undefined } else if (Array.isArray(attr_class)) { diff --git a/plugin-server/src/utils/event.ts b/plugin-server/src/utils/event.ts index 3c05b7894974b..d179316442dcb 100644 --- a/plugin-server/src/utils/event.ts +++ b/plugin-server/src/utils/event.ts @@ -23,7 +23,9 @@ export function convertToProcessedPluginEvent(event: PostIngestionEvent): Proces $set: event.properties.$set, $set_once: event.properties.$set_once, uuid: event.eventUuid, - elements: convertDatabaseElementsToRawElements(event.elementsList ?? []), + elements: event.elementsChain + ? convertDatabaseElementsToRawElements(chainToElements(event.elementsChain, event.teamId) ?? []) + : undefined, } } @@ -71,11 +73,7 @@ export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsC distinctId: event.distinct_id, properties, timestamp: clickHouseTimestampToISO(event.timestamp), - elementsList: skipElementsChain - ? [] - : event.elements_chain - ? chainToElements(event.elements_chain, event.team_id) - : [], + elementsChain: skipElementsChain ? undefined : event.elements_chain, person_id: event.person_id, person_created_at: event.person_created_at ? clickHouseTimestampSecondPrecisionToISO(event.person_created_at) diff --git a/plugin-server/src/worker/ingestion/action-matcher.ts b/plugin-server/src/worker/ingestion/action-matcher.ts index 13d5dc898ba80..1d336c67671af 100644 --- a/plugin-server/src/worker/ingestion/action-matcher.ts +++ b/plugin-server/src/worker/ingestion/action-matcher.ts @@ -19,7 +19,7 @@ import { PropertyOperator, StringMatching, } from '../../types' -import { extractElements } from '../../utils/db/elements-chain' +import { chainToElements } from '../../utils/db/elements-chain' import { PostgresRouter, PostgresUse } from '../../utils/db/postgres' import { stringToBoolean } from '../../utils/env-utils' import { stringify } from '../../utils/utils' @@ -141,13 +141,10 @@ export class ActionMatcher { } /** Get all actions matched to the event. */ - public async match(event: PostIngestionEvent, elements?: Element[]): Promise { + public async match(event: PostIngestionEvent): Promise { const matchingStart = new Date() const teamActions: Action[] = Object.values(this.actionManager.getTeamActions(event.teamId)) - if (!elements) { - const rawElements: Record[] | undefined = event.properties?.['$elements'] - elements = rawElements ? extractElements(rawElements) : [] - } + const elements: Element[] = event.elementsChain ? chainToElements(event.elementsChain, event.teamId) : [] const teamActionsMatching: boolean[] = await Promise.all( teamActions.map((action) => this.checkAction(event, elements, action)) ) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts index 4fcc4d1bd2a49..a01e0cc86d8fc 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/runAsyncHandlersStep.ts @@ -27,8 +27,7 @@ export async function processWebhooksStep( hookCannon: HookCommander ) { const actionMatches = await instrumentWebhookStep('actionMatching', async () => { - const elements = event.elementsList - return await actionMatcher.match(event, elements) + return await actionMatcher.match(event) }) await instrumentWebhookStep('findAndfireHooks', async () => { await hookCannon.findAndFireHooks(event, actionMatches) diff --git a/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts b/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts index 16353c424a5c6..7fd632d5f1bfa 100644 --- a/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts +++ b/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts @@ -3,6 +3,7 @@ import { DateTime } from 'luxon' import { Element, RawClickHouseEvent, TimestampFormat } from '../../../../types' import { DB } from '../../../../utils/db/db' +import { extractAttrClass } from '../../../../utils/db/elements-chain' import { parseRawClickHouseEvent } from '../../../../utils/event' import { status } from '../../../../utils/status' import { castTimestampToClickhouseFormat } from '../../../../utils/utils' @@ -95,7 +96,7 @@ const addHistoricalExportEventProperties = (event: HistoricalExportEvent): Histo export const convertDatabaseElementsToRawElements = (elements: RawElement[]): RawElement[] => { for (const element of elements) { if (element.attributes && element.attributes.attr__class) { - element.attr_class = element.attributes.attr__class + element.attr_class = extractAttrClass(element.attributes.attr__class) } if (element.text) { element.$el_text = element.text diff --git a/plugin-server/tests/utils/db/elements-chain.test.ts b/plugin-server/tests/utils/db/elements-chain.test.ts index ab480e9c37da2..b90c572584242 100644 --- a/plugin-server/tests/utils/db/elements-chain.test.ts +++ b/plugin-server/tests/utils/db/elements-chain.test.ts @@ -31,7 +31,7 @@ describe('elementsToString and chainToElements', () => { ].join(';') ) - const elements = chainToElements(elementsString, { throwOnError: true }) + const elements = chainToElements(elementsString, 33, { throwOnError: true }) expect(elements.length).toBe(4) expect(elements[0].tag_name).toEqual('a') expect(elements[0].href).toEqual('/a-url') @@ -51,12 +51,12 @@ describe('elementsToString and chainToElements', () => { }) it('handles empty strings', () => { - const elements = chainToElements('', { throwOnError: true }) + const elements = chainToElements('', 33, { throwOnError: true }) expect(elements).toEqual([]) }) it('handles broken class names', () => { - const elements = chainToElements('"a........small', { throwOnError: true }) + const elements = chainToElements('"a........small', 33, { throwOnError: true }) expect(elements).not.toEqual([]) expect(elements[0]).toEqual( expect.objectContaining({ @@ -82,7 +82,7 @@ describe('elementsToString and chainToElements', () => { 'a.small.xy:z:attr_class="xyz small\\""href="/a-url"nth-child="0"nth-of-type="0"' ) - const elements = chainToElements(elementsString, { throwOnError: true }) + const elements = chainToElements(elementsString, 33, { throwOnError: true }) expect(elements.length).toEqual(1) expect(elements[0]).toEqual( expect.objectContaining({ @@ -107,7 +107,7 @@ describe('elementsToString and chainToElements', () => { const elementsString = elementsToString([element]) expect(elementsString).toEqual('.another.something:attr__class="something another"nth-child="0"nth-of-type="0"') - expect(chainToElements(elementsString)).toEqual([expect.objectContaining(element)]) + expect(chainToElements(elementsString, 33)).toEqual([expect.objectContaining(element)]) }) })