Skip to content

Commit

Permalink
feat(plugin-server): send only elementsChain to webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra committed Nov 7, 2023
1 parent 06e9938 commit 839d977
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ActionMatcher } from 'worker/ingestion/action-matcher'

import { PostIngestionEvent, RawClickHouseEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { convertToIngestionEvent, convertToProcessedPluginEvent } from '../../../utils/event'
import { convertToIngestionEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { processWebhooksStep } from '../../../worker/ingestion/event-pipeline/runAsyncHandlersStep'
import { HookCommander } from '../../../worker/ingestion/hooks'
Expand Down Expand Up @@ -151,13 +151,6 @@ export async function eachMessageWebhooksHandlers(
}
const event = convertToIngestionEvent(clickHouseEvent)

// TODO: previously onEvent and Webhooks were executed in the same process,
// and onEvent would call convertToProcessedPluginEvent, which ends up
// mutating the `event` that is passed in. To ensure that we have the same
// behaviour we run this here, but we should probably refactor this to
// ensure that we don't mutate the event.
convertToProcessedPluginEvent(event)

await runInstrumentedFunction({
func: () => runWebhooks(statsd, actionMatcher, hookCannon, event),
statsKey: `kafka_queue.process_async_handlers_webhooks`,
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ export interface PreIngestionEvent {
}

/** Ingestion event after saving, currently just an alias of BaseIngestionEvent */
export interface PostIngestionEvent extends BaseIngestionEvent {
export interface PostIngestionEvent extends Omit<BaseIngestionEvent, 'elementsList'> {
elementsChain?: string
person_id?: string // This is not optional, but BaseEvent needs to be fixed first
person_created_at: ISOTimestamp | null
person_properties: Properties
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/utils/db/elements-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,21 @@ export function chainToElements(chain: string, teamId: number, options: { throwO
return elements
}

/** Clean up a user provided elements list, so it could be inserted into the database */
export function extractElements(elements: Array<Record<string, any>>): Element[] {
return elements.map((el) => ({
text: el['$el_text']?.slice(0, 400),
tag_name: el['tag_name'],
href: el['attr__href']?.slice(0, 2048),
attr_class: extractAttrClass(el),
attr_class: extractAttrClass(el['attr__class']),
attr_id: el['attr__id'],
nth_child: el['nth_child'],
nth_of_type: el['nth_of_type'],
attributes: Object.fromEntries(Object.entries(el).filter(([key]) => key.startsWith('attr__'))),
}))
}

function extractAttrClass(el: Record<string, any>): Element['attr_class'] {
const attr_class = el['attr__class']
export function extractAttrClass(attr_class: string | undefined | any[]): Element['attr_class'] {
if (!attr_class) {
return undefined
} else if (Array.isArray(attr_class)) {
Expand Down
10 changes: 4 additions & 6 deletions plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ export function convertToProcessedPluginEvent(event: PostIngestionEvent): Proces
$set: event.properties.$set,
$set_once: event.properties.$set_once,
uuid: event.eventUuid,
elements: convertDatabaseElementsToRawElements(event.elementsList ?? []),
elements: event.elementsChain
? convertDatabaseElementsToRawElements(chainToElements(event.elementsChain, event.teamId) ?? [])
: undefined,
}
}

Expand Down Expand Up @@ -71,11 +73,7 @@ export function convertToIngestionEvent(event: RawClickHouseEvent, skipElementsC
distinctId: event.distinct_id,
properties,
timestamp: clickHouseTimestampToISO(event.timestamp),
elementsList: skipElementsChain
? []
: event.elements_chain
? chainToElements(event.elements_chain, event.team_id)
: [],
elementsChain: skipElementsChain ? undefined : event.elements_chain,
person_id: event.person_id,
person_created_at: event.person_created_at
? clickHouseTimestampSecondPrecisionToISO(event.person_created_at)
Expand Down
9 changes: 3 additions & 6 deletions plugin-server/src/worker/ingestion/action-matcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
PropertyOperator,
StringMatching,
} from '../../types'
import { extractElements } from '../../utils/db/elements-chain'
import { chainToElements } from '../../utils/db/elements-chain'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
import { stringToBoolean } from '../../utils/env-utils'
import { stringify } from '../../utils/utils'
Expand Down Expand Up @@ -141,13 +141,10 @@ export class ActionMatcher {
}

/** Get all actions matched to the event. */
public async match(event: PostIngestionEvent, elements?: Element[]): Promise<Action[]> {
public async match(event: PostIngestionEvent): Promise<Action[]> {
const matchingStart = new Date()
const teamActions: Action[] = Object.values(this.actionManager.getTeamActions(event.teamId))
if (!elements) {
const rawElements: Record<string, any>[] | undefined = event.properties?.['$elements']
elements = rawElements ? extractElements(rawElements) : []
}
const elements: Element[] = event.elementsChain ? chainToElements(event.elementsChain, event.teamId) : []
const teamActionsMatching: boolean[] = await Promise.all(
teamActions.map((action) => this.checkAction(event, elements, action))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ export async function processWebhooksStep(
hookCannon: HookCommander
) {
const actionMatches = await instrumentWebhookStep('actionMatching', async () => {
const elements = event.elementsList
return await actionMatcher.match(event, elements)
return await actionMatcher.match(event)
})
await instrumentWebhookStep('findAndfireHooks', async () => {
await hookCannon.findAndFireHooks(event, actionMatches)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { DateTime } from 'luxon'

import { Element, RawClickHouseEvent, TimestampFormat } from '../../../../types'
import { DB } from '../../../../utils/db/db'
import { extractAttrClass } from '../../../../utils/db/elements-chain'
import { parseRawClickHouseEvent } from '../../../../utils/event'
import { status } from '../../../../utils/status'
import { castTimestampToClickhouseFormat } from '../../../../utils/utils'
Expand Down Expand Up @@ -95,7 +96,7 @@ const addHistoricalExportEventProperties = (event: HistoricalExportEvent): Histo
export const convertDatabaseElementsToRawElements = (elements: RawElement[]): RawElement[] => {
for (const element of elements) {
if (element.attributes && element.attributes.attr__class) {
element.attr_class = element.attributes.attr__class
element.attr_class = extractAttrClass(element.attributes.attr__class)
}
if (element.text) {
element.$el_text = element.text
Expand Down
10 changes: 5 additions & 5 deletions plugin-server/tests/utils/db/elements-chain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('elementsToString and chainToElements', () => {
].join(';')
)

const elements = chainToElements(elementsString, { throwOnError: true })
const elements = chainToElements(elementsString, 33, { throwOnError: true })
expect(elements.length).toBe(4)
expect(elements[0].tag_name).toEqual('a')
expect(elements[0].href).toEqual('/a-url')
Expand All @@ -51,12 +51,12 @@ describe('elementsToString and chainToElements', () => {
})

it('handles empty strings', () => {
const elements = chainToElements('', { throwOnError: true })
const elements = chainToElements('', 33, { throwOnError: true })
expect(elements).toEqual([])
})

it('handles broken class names', () => {
const elements = chainToElements('"a........small', { throwOnError: true })
const elements = chainToElements('"a........small', 33, { throwOnError: true })
expect(elements).not.toEqual([])
expect(elements[0]).toEqual(
expect.objectContaining({
Expand All @@ -82,7 +82,7 @@ describe('elementsToString and chainToElements', () => {
'a.small.xy:z:attr_class="xyz small\\""href="/a-url"nth-child="0"nth-of-type="0"'
)

const elements = chainToElements(elementsString, { throwOnError: true })
const elements = chainToElements(elementsString, 33, { throwOnError: true })
expect(elements.length).toEqual(1)
expect(elements[0]).toEqual(
expect.objectContaining({
Expand All @@ -107,7 +107,7 @@ describe('elementsToString and chainToElements', () => {
const elementsString = elementsToString([element])

expect(elementsString).toEqual('.another.something:attr__class="something another"nth-child="0"nth-of-type="0"')
expect(chainToElements(elementsString)).toEqual([expect.objectContaining(element)])
expect(chainToElements(elementsString, 33)).toEqual([expect.objectContaining(element)])
})
})

Expand Down

0 comments on commit 839d977

Please sign in to comment.