From 981d58f42b94585a6a6b2592d8a7186c0a5c7bbd Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 29 Apr 2024 15:44:22 -0600 Subject: [PATCH] fix(plugin-server): move null replacement and add test (#21954) --- .../batch-processing/each-batch-ingestion.ts | 1 - plugin-server/src/utils/event.ts | 2 +- .../src/worker/ingestion/process-event.ts | 1 - .../event-pipeline/normalizeEventStep.test.ts | 42 ++++++++++++++++++- 4 files changed, 41 insertions(+), 5 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index e952daa72f23c..313e0cf083f6d 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -163,7 +163,6 @@ export async function eachBatchParallelIngestion( // Process every message sequentially, stash promises to await on later for (const { message, pluginEvent } of currentBatch) { try { - pluginEvent.distinct_id = pluginEvent.distinct_id.replaceAll('\u0000', '') const result = (await retryIfRetriable(async () => { const runner = new EventPipelineRunner(queue.pluginsServer, pluginEvent) return await runner.runEventPipeline(pluginEvent) diff --git a/plugin-server/src/utils/event.ts b/plugin-server/src/utils/event.ts index 748500afc13e1..91d045011370a 100644 --- a/plugin-server/src/utils/event.ts +++ b/plugin-server/src/utils/event.ts @@ -144,7 +144,7 @@ export function normalizeProcessPerson(event: PluginEvent, processPerson: boolea } export function normalizeEvent(event: PluginEvent): PluginEvent { - event.distinct_id = event.distinct_id?.toString() + event.distinct_id = event.distinct_id?.toString().replace(/\u0000/g, '\uFFFD') let properties = event.properties ?? {} if (event['$set']) { diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 600ecbe2b6a62..d3f49b85cfd71 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -80,7 +80,6 @@ export class EventsProcessor { 'Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', () => ({ event: JSON.stringify(data) }) ) - distinctId = distinctId.replace('\u0000', '') let result: PreIngestionEvent | null = null try { diff --git a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts index 8fda4db2f2e93..3d14f463cbfa5 100644 --- a/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts +++ b/plugin-server/tests/worker/ingestion/event-pipeline/normalizeEventStep.test.ts @@ -5,6 +5,11 @@ import { UUIDT } from '../../../../src/utils/utils' import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipeline/normalizeEventStep' import { createOrganization, createTeam, resetTestDatabase } from '../../../helpers/sql' +// A simple deep copy to ensure we aren't comparing an event object with itself below. +function copy(a: any) { + return JSON.parse(JSON.stringify(a)) +} + describe('normalizeEventStep()', () => { it('normalizes the event with properties set by plugins', async () => { await resetTestDatabase() @@ -34,7 +39,7 @@ describe('normalizeEventStep()', () => { } const processPerson = true - const [resEvent, timestamp] = await normalizeEventStep(event, processPerson) + const [resEvent, timestamp] = await normalizeEventStep(copy(event), processPerson) expect(resEvent).toEqual({ ...event, @@ -54,6 +59,35 @@ describe('normalizeEventStep()', () => { expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' })) }) + it('replaces null byte with unicode replacement character in distinct_id', async () => { + await resetTestDatabase() + const [hub, _] = await createHub() + const organizationId = await createOrganization(hub.db.postgres) + const teamId = await createTeam(hub.db.postgres, organizationId) + const uuid = new UUIDT().toString() + const event = { + distinct_id: '\u0000foo', + ip: null, + site_url: 'http://localhost', + team_id: teamId, + now: '2020-02-23T02:15:00Z', + timestamp: '2020-02-23T02:15:00Z', + event: 'default event', + uuid: uuid, + } + + const processPerson = true + const [resEvent, timestamp] = await normalizeEventStep(copy(event), processPerson) + + expect(resEvent).toEqual({ + ...event, + distinct_id: '\uFFFDfoo', + properties: {}, + }) + + expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' })) + }) + it('normalizes $process_person_profile=false events by dropping $set and related', async () => { await resetTestDatabase() const [hub, _] = await createHub() @@ -88,7 +122,11 @@ describe('normalizeEventStep()', () => { } const processPerson = false - const [resEvent, timestamp] = await normalizeEventStep(event, processPerson) + const [resEvent, timestamp] = await normalizeEventStep(copy(event), processPerson) + + // These should be gone in the comparison below. + delete (event as any)['$set'] + delete (event as any)['$set_once'] expect(resEvent).toEqual({ ...event,