From 70ef252b7303ea7889b939288c26c32f99fbbce1 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Tue, 31 Oct 2023 15:42:36 -0700 Subject: [PATCH 01/17] Move UUID validation from `process` step to `prepare` step --- .../ingestion/event-pipeline/prepareEventStep.ts | 10 ++++++++++ plugin-server/src/worker/ingestion/process-event.ts | 9 +-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index e51fa5df4a477..ad6ef5e9d0a69 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -1,12 +1,22 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { PreIngestionEvent } from 'types' +import { UUID } from '../../../utils/utils' import { parseEventTimestamp } from '../timestamps' import { captureIngestionWarning } from '../utils' import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event + const { db } = runner.hub + + if (!UUID.validateString(uuid, false)) { + await captureIngestionWarning(db, team_id, 'skipping_event_invalid_uuid', { + eventUuid: JSON.stringify(uuid), + }) + throw new Error(`Not a valid UUID: "${uuid}"`) + } + const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { // TODO: make that metric name more generic when transitionning to prometheus diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 92b4e3f7b4d26..6d0951f8fa717 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -26,13 +26,12 @@ import { elementsToString, extractElements } from '../../utils/db/elements-chain import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper' import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils' import { status } from '../../utils/status' -import { castTimestampOrNow, UUID } from '../../utils/utils' +import { castTimestampOrNow } from '../../utils/utils' import { GroupTypeManager } from './group-type-manager' import { addGroupProperties } from './groups' import { upsertGroup } from './properties-updater' import { PropertyDefinitionsManager } from './property-definitions-manager' import { TeamManager } from './team-manager' -import { captureIngestionWarning } from './utils' export class EventsProcessor { pluginsServer: Hub @@ -66,12 +65,6 @@ export class EventsProcessor { timestamp: DateTime, eventUuid: string ): Promise { - if (!UUID.validateString(eventUuid, false)) { - await captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', { - eventUuid: JSON.stringify(eventUuid), - }) - throw new Error(`Not a valid UUID: "${eventUuid}"`) - } const singleSaveTimer = new Date() const timeout = timeoutGuard('Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', { event: JSON.stringify(data), From 1d69564e5a82245dea34919bdb7d48d621ced505 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 11:30:16 -0700 Subject: [PATCH 02/17] Move uuid validation all the way up to `populateTeamData` --- .../event-pipeline/populateTeamDataStep.ts | 21 +++++++++++++++++++ .../event-pipeline/prepareEventStep.ts | 9 -------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts index 740177e2c4f61..23f1e2c418a64 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts @@ -3,6 +3,8 @@ import { Counter } from 'prom-client' import { eventDroppedCounter } from '../../../main/ingestion-queues/metrics' import { PipelineEvent } from '../../../types' +import { UUID } from '../../../utils/utils' +import { captureIngestionWarning } from '../utils' import { EventPipelineRunner } from './runner' export const tokenOrTeamPresentCounter = new Counter({ @@ -23,6 +25,8 @@ export async function populateTeamDataStep( * For these, we trust the team_id field value. */ + const { db } = runner.hub + // Collect statistics on the shape of events we are ingesting. tokenOrTeamPresentCounter .labels({ @@ -30,6 +34,7 @@ export async function populateTeamDataStep( token_present: event.token ? 'true' : 'false', }) .inc() + // statsd copy as prometheus is currently not supported in worker threads. runner.hub.statsd?.increment('ingestion_event_hasauthinfo', { team_id_present: event.team_id ? 'true' : 'false', @@ -38,6 +43,14 @@ export async function populateTeamDataStep( // If a team_id is present (event captured from an app), trust it and return the event as is. if (event.team_id) { + // Check for an invalid UUID, which should be blocked by capture, when team_id is present + if (!UUID.validateString(event.uuid, false)) { + await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', { + eventUuid: JSON.stringify(event.uuid), + }) + throw new Error(`Not a valid UUID: "${event.uuid}"`) + } + return event as PluginEvent } @@ -69,6 +82,14 @@ export async function populateTeamDataStep( return null } + // Check for an invalid UUID, which should be blocked by capture, when team_id is present + if (!UUID.validateString(event.uuid, false)) { + await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', { + eventUuid: JSON.stringify(event.uuid), + }) + throw new Error(`Not a valid UUID: "${event.uuid}"`) + } + event = { ...event, team_id: team.id, diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index ad6ef5e9d0a69..29bbcc9586c77 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -1,21 +1,12 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { PreIngestionEvent } from 'types' -import { UUID } from '../../../utils/utils' import { parseEventTimestamp } from '../timestamps' import { captureIngestionWarning } from '../utils' import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event - const { db } = runner.hub - - if (!UUID.validateString(uuid, false)) { - await captureIngestionWarning(db, team_id, 'skipping_event_invalid_uuid', { - eventUuid: JSON.stringify(uuid), - }) - throw new Error(`Not a valid UUID: "${uuid}"`) - } const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { From b3553dcebc4fd85b51b915c12afd6ac4845c131e Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 11:32:07 -0700 Subject: [PATCH 03/17] Update tests to use `processEvent` instead of `eventsProcessor.processEvent` since we moved the logic --- .../tests/main/process-event.test.ts | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 786acbbd7698e..5afc0e334752b 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -2459,26 +2459,40 @@ test('long event name substr', async () => { expect(event.event?.length).toBe(200) }) -test('throws with bad uuid', async () => { - await expect( - eventsProcessor.processEvent( - 'xxx', - { event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent, +describe('validates UUID passed in eventUuid', () => { + test('throws with invalid uuid string', async () => { + await processEvent( + 'i_am_a_distinct_id', + '', + '', + { + event: 'E', + properties: { price: 299.99, name: 'AirPods Pro', distinct_id: 'distinct_id1' }, + } as any as PluginEvent, team.id, - DateTime.utc(), + now, 'this is not an uuid' ) - ).rejects.toEqual(new Error('Not a valid UUID: "this is not an uuid"')) - await expect( - eventsProcessor.processEvent( + // expect((await hub.db.fetchEvents()).length).toBe(1) + const [event] = await hub.db.fetchEvents() + expect(event.event).toEqual('Not a valid UUID: "this is not an uuid"') + }) + test('throws with null value in eventUUID', async () => { + await processEvent( 'xxx', + '', + '', { event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent, team.id, DateTime.utc(), null as any ) - ).rejects.toEqual(new Error('Not a valid UUID: "null"')) + + expect((await hub.db.fetchEvents()).length).toBe(1) + const [event] = await hub.db.fetchEvents() + expect(event.event).toEqual('Not a valid UUID: "null"') + }) }) test('any event can do $set on props (user exists)', async () => { From 96e815dc0160bbbe2fa1416109dbac8fa09a2b71 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 11:32:46 -0700 Subject: [PATCH 04/17] Add temporary vscode config to just run relavent tests --- plugin-server/.vscode/launch.json | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/plugin-server/.vscode/launch.json b/plugin-server/.vscode/launch.json index 0bcb92aa08b47..690dae9c6a3a3 100644 --- a/plugin-server/.vscode/launch.json +++ b/plugin-server/.vscode/launch.json @@ -40,13 +40,15 @@ "request": "launch", "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", - "disableOptimisticBPs": true, - "program": "${workspaceFolder}/node_modules/.bin/jest", + "program": "${workspaceFolder}/node_modules/jest/bin/jest.js", "cwd": "${workspaceFolder}", - "args": ["--runInBand", "--forceExit", "--watchAll=false"], - "env": { - "DATABASE_URL": "postgres://localhost:5432/test_posthog" - } + "args": [ + "${workspaceRoot}/node_modules/.bin/jest", + "--runInBand", + "--watchAll=false", + "./tests/main/process-event.test.ts", + "--testNamePattern='validates UUID passed in eventUUID'" + ] } ] } From 097ea27ed5b329acfedf0e029c41ec96bc14fc7e Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 13:56:09 -0700 Subject: [PATCH 05/17] Tests are fixed; was looking for an event that won't be produced --- .../tests/main/process-event.test.ts | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 5afc0e334752b..d284ea3c4d9d0 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -2459,39 +2459,44 @@ test('long event name substr', async () => { expect(event.event?.length).toBe(200) }) -describe('validates UUID passed in eventUuid', () => { - test('throws with invalid uuid string', async () => { - await processEvent( - 'i_am_a_distinct_id', - '', - '', - { - event: 'E', - properties: { price: 299.99, name: 'AirPods Pro', distinct_id: 'distinct_id1' }, - } as any as PluginEvent, - team.id, - now, - 'this is not an uuid' - ) +describe('validates eventUuid', () => { + test('invalid uuid string returns an error', async () => { + const pluginEvent: PluginEvent = { + distinct_id: 'i_am_a_distinct_id', + site_url: '', + team_id: team.id, + timestamp: DateTime.utc().toISO(), + now: now.toUTC().toISO(), + ip: '', + uuid: 'i_am_not_a_uuid', + event: 'eVeNt', + properties: { price: 299.99, name: 'AirPods Pro' }, + } - // expect((await hub.db.fetchEvents()).length).toBe(1) - const [event] = await hub.db.fetchEvents() - expect(event.event).toEqual('Not a valid UUID: "this is not an uuid"') + const runner = new EventPipelineRunner(hub, pluginEvent) + const result = await runner.runEventPipeline(pluginEvent) + + expect(result.error).toBeDefined() + expect(result.error).toEqual('Not a valid UUID: "i_am_not_a_uuid"') }) - test('throws with null value in eventUUID', async () => { - await processEvent( - 'xxx', - '', - '', - { event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent, - team.id, - DateTime.utc(), - null as any - ) + test('null value in eventUUID returns an error', async () => { + const pluginEvent: PluginEvent = { + distinct_id: 'i_am_a_distinct_id', + site_url: '', + team_id: team.id, + timestamp: DateTime.utc().toISO(), + now: now.toUTC().toISO(), + ip: '', + uuid: null as any, + event: 'eVeNt', + properties: { price: 299.99, name: 'AirPods Pro' }, + } + + const runner = new EventPipelineRunner(hub, pluginEvent) + const result = await runner.runEventPipeline(pluginEvent) - expect((await hub.db.fetchEvents()).length).toBe(1) - const [event] = await hub.db.fetchEvents() - expect(event.event).toEqual('Not a valid UUID: "null"') + expect(result.error).toBeDefined() + expect(result.error).toEqual('Not a valid UUID: "null"') }) }) From 58d64656cbb286b646b23c3b0a0757bea85746bb Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 13:57:49 -0700 Subject: [PATCH 06/17] Remove temp vscode debug config --- plugin-server/.vscode/launch.json | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/plugin-server/.vscode/launch.json b/plugin-server/.vscode/launch.json index 690dae9c6a3a3..0bcb92aa08b47 100644 --- a/plugin-server/.vscode/launch.json +++ b/plugin-server/.vscode/launch.json @@ -40,15 +40,13 @@ "request": "launch", "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", - "program": "${workspaceFolder}/node_modules/jest/bin/jest.js", + "disableOptimisticBPs": true, + "program": "${workspaceFolder}/node_modules/.bin/jest", "cwd": "${workspaceFolder}", - "args": [ - "${workspaceRoot}/node_modules/.bin/jest", - "--runInBand", - "--watchAll=false", - "./tests/main/process-event.test.ts", - "--testNamePattern='validates UUID passed in eventUUID'" - ] + "args": ["--runInBand", "--forceExit", "--watchAll=false"], + "env": { + "DATABASE_URL": "postgres://localhost:5432/test_posthog" + } } ] } From a0b8aa116d16cc5814157be423092facbacb288d Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 14:00:59 -0700 Subject: [PATCH 07/17] nit: remove added newline --- .../src/worker/ingestion/event-pipeline/prepareEventStep.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index 29bbcc9586c77..e51fa5df4a477 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -7,7 +7,6 @@ import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event - const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { // TODO: make that metric name more generic when transitionning to prometheus From 26ab733e1d6abb61be116a909f536743689bdc1c Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 14:27:26 -0700 Subject: [PATCH 08/17] Fix another test that wasn't using a proper UUID --- .../tests/main/ingestion-queues/run-ingestion-pipeline.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts index d938b95078c68..4ea08342eda2b 100644 --- a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts @@ -5,6 +5,7 @@ import { Hub } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' +import { UUIDT } from '../../../src/utils/utils' import { runEventPipeline } from '../../../src/worker/ingestion/event-pipeline/runner' import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql' @@ -57,7 +58,7 @@ describe('workerTasks.runEventPipeline()', () => { properties: {}, site_url: 'https://example.com', now: new Date().toISOString(), - uuid: 'uuid', + uuid: new UUIDT().toString(), }) ).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage))) pgQueryMock.mockRestore() From 977ba016186a4890f32d83d2d3a6cc2d873bb417 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Tue, 31 Oct 2023 15:42:36 -0700 Subject: [PATCH 09/17] Move UUID validation from `process` step to `prepare` step --- .../ingestion/event-pipeline/prepareEventStep.ts | 10 ++++++++++ plugin-server/src/worker/ingestion/process-event.ts | 9 +-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index e51fa5df4a477..ad6ef5e9d0a69 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -1,12 +1,22 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { PreIngestionEvent } from 'types' +import { UUID } from '../../../utils/utils' import { parseEventTimestamp } from '../timestamps' import { captureIngestionWarning } from '../utils' import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event + const { db } = runner.hub + + if (!UUID.validateString(uuid, false)) { + await captureIngestionWarning(db, team_id, 'skipping_event_invalid_uuid', { + eventUuid: JSON.stringify(uuid), + }) + throw new Error(`Not a valid UUID: "${uuid}"`) + } + const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { // TODO: make that metric name more generic when transitionning to prometheus diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 22997a7f857a1..8ca1ea76988dd 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -28,13 +28,12 @@ import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper' import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils' import { status } from '../../utils/status' import { MessageSizeTooLargeWarningLimiter } from '../../utils/token-bucket' -import { castTimestampOrNow, UUID } from '../../utils/utils' +import { castTimestampOrNow } from '../../utils/utils' import { GroupTypeManager } from './group-type-manager' import { addGroupProperties } from './groups' import { upsertGroup } from './properties-updater' import { PropertyDefinitionsManager } from './property-definitions-manager' import { TeamManager } from './team-manager' -import { captureIngestionWarning } from './utils' export class EventsProcessor { pluginsServer: Hub @@ -68,12 +67,6 @@ export class EventsProcessor { timestamp: DateTime, eventUuid: string ): Promise { - if (!UUID.validateString(eventUuid, false)) { - await captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', { - eventUuid: JSON.stringify(eventUuid), - }) - throw new Error(`Not a valid UUID: "${eventUuid}"`) - } const singleSaveTimer = new Date() const timeout = timeoutGuard('Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', { event: JSON.stringify(data), From 197a5dae956af9af024c5417cb93071b627f518e Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 11:30:16 -0700 Subject: [PATCH 10/17] Move uuid validation all the way up to `populateTeamData` --- .../event-pipeline/populateTeamDataStep.ts | 21 +++++++++++++++++++ .../event-pipeline/prepareEventStep.ts | 9 -------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts index 740177e2c4f61..23f1e2c418a64 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/populateTeamDataStep.ts @@ -3,6 +3,8 @@ import { Counter } from 'prom-client' import { eventDroppedCounter } from '../../../main/ingestion-queues/metrics' import { PipelineEvent } from '../../../types' +import { UUID } from '../../../utils/utils' +import { captureIngestionWarning } from '../utils' import { EventPipelineRunner } from './runner' export const tokenOrTeamPresentCounter = new Counter({ @@ -23,6 +25,8 @@ export async function populateTeamDataStep( * For these, we trust the team_id field value. */ + const { db } = runner.hub + // Collect statistics on the shape of events we are ingesting. tokenOrTeamPresentCounter .labels({ @@ -30,6 +34,7 @@ export async function populateTeamDataStep( token_present: event.token ? 'true' : 'false', }) .inc() + // statsd copy as prometheus is currently not supported in worker threads. runner.hub.statsd?.increment('ingestion_event_hasauthinfo', { team_id_present: event.team_id ? 'true' : 'false', @@ -38,6 +43,14 @@ export async function populateTeamDataStep( // If a team_id is present (event captured from an app), trust it and return the event as is. if (event.team_id) { + // Check for an invalid UUID, which should be blocked by capture, when team_id is present + if (!UUID.validateString(event.uuid, false)) { + await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', { + eventUuid: JSON.stringify(event.uuid), + }) + throw new Error(`Not a valid UUID: "${event.uuid}"`) + } + return event as PluginEvent } @@ -69,6 +82,14 @@ export async function populateTeamDataStep( return null } + // Check for an invalid UUID, which should be blocked by capture, when team_id is present + if (!UUID.validateString(event.uuid, false)) { + await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', { + eventUuid: JSON.stringify(event.uuid), + }) + throw new Error(`Not a valid UUID: "${event.uuid}"`) + } + event = { ...event, team_id: team.id, diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index ad6ef5e9d0a69..29bbcc9586c77 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -1,21 +1,12 @@ import { PluginEvent } from '@posthog/plugin-scaffold' import { PreIngestionEvent } from 'types' -import { UUID } from '../../../utils/utils' import { parseEventTimestamp } from '../timestamps' import { captureIngestionWarning } from '../utils' import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event - const { db } = runner.hub - - if (!UUID.validateString(uuid, false)) { - await captureIngestionWarning(db, team_id, 'skipping_event_invalid_uuid', { - eventUuid: JSON.stringify(uuid), - }) - throw new Error(`Not a valid UUID: "${uuid}"`) - } const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { From f5492fc1bd0fca2f4c929f2524b0f6470efb1068 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 11:32:07 -0700 Subject: [PATCH 11/17] Update tests to use `processEvent` instead of `eventsProcessor.processEvent` since we moved the logic --- .../tests/main/process-event.test.ts | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 786acbbd7698e..5afc0e334752b 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -2459,26 +2459,40 @@ test('long event name substr', async () => { expect(event.event?.length).toBe(200) }) -test('throws with bad uuid', async () => { - await expect( - eventsProcessor.processEvent( - 'xxx', - { event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent, +describe('validates UUID passed in eventUuid', () => { + test('throws with invalid uuid string', async () => { + await processEvent( + 'i_am_a_distinct_id', + '', + '', + { + event: 'E', + properties: { price: 299.99, name: 'AirPods Pro', distinct_id: 'distinct_id1' }, + } as any as PluginEvent, team.id, - DateTime.utc(), + now, 'this is not an uuid' ) - ).rejects.toEqual(new Error('Not a valid UUID: "this is not an uuid"')) - await expect( - eventsProcessor.processEvent( + // expect((await hub.db.fetchEvents()).length).toBe(1) + const [event] = await hub.db.fetchEvents() + expect(event.event).toEqual('Not a valid UUID: "this is not an uuid"') + }) + test('throws with null value in eventUUID', async () => { + await processEvent( 'xxx', + '', + '', { event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent, team.id, DateTime.utc(), null as any ) - ).rejects.toEqual(new Error('Not a valid UUID: "null"')) + + expect((await hub.db.fetchEvents()).length).toBe(1) + const [event] = await hub.db.fetchEvents() + expect(event.event).toEqual('Not a valid UUID: "null"') + }) }) test('any event can do $set on props (user exists)', async () => { From 45b94d2fed57ae3a9c44c2b3ba5c85a2afc49301 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 11:32:46 -0700 Subject: [PATCH 12/17] Add temporary vscode config to just run relavent tests --- plugin-server/.vscode/launch.json | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/plugin-server/.vscode/launch.json b/plugin-server/.vscode/launch.json index 0bcb92aa08b47..690dae9c6a3a3 100644 --- a/plugin-server/.vscode/launch.json +++ b/plugin-server/.vscode/launch.json @@ -40,13 +40,15 @@ "request": "launch", "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", - "disableOptimisticBPs": true, - "program": "${workspaceFolder}/node_modules/.bin/jest", + "program": "${workspaceFolder}/node_modules/jest/bin/jest.js", "cwd": "${workspaceFolder}", - "args": ["--runInBand", "--forceExit", "--watchAll=false"], - "env": { - "DATABASE_URL": "postgres://localhost:5432/test_posthog" - } + "args": [ + "${workspaceRoot}/node_modules/.bin/jest", + "--runInBand", + "--watchAll=false", + "./tests/main/process-event.test.ts", + "--testNamePattern='validates UUID passed in eventUUID'" + ] } ] } From c9c54492c293cf113572c7619a54da5b067f89b7 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 13:56:09 -0700 Subject: [PATCH 13/17] Tests are fixed; was looking for an event that won't be produced --- .../tests/main/process-event.test.ts | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/plugin-server/tests/main/process-event.test.ts b/plugin-server/tests/main/process-event.test.ts index 5afc0e334752b..d284ea3c4d9d0 100644 --- a/plugin-server/tests/main/process-event.test.ts +++ b/plugin-server/tests/main/process-event.test.ts @@ -2459,39 +2459,44 @@ test('long event name substr', async () => { expect(event.event?.length).toBe(200) }) -describe('validates UUID passed in eventUuid', () => { - test('throws with invalid uuid string', async () => { - await processEvent( - 'i_am_a_distinct_id', - '', - '', - { - event: 'E', - properties: { price: 299.99, name: 'AirPods Pro', distinct_id: 'distinct_id1' }, - } as any as PluginEvent, - team.id, - now, - 'this is not an uuid' - ) +describe('validates eventUuid', () => { + test('invalid uuid string returns an error', async () => { + const pluginEvent: PluginEvent = { + distinct_id: 'i_am_a_distinct_id', + site_url: '', + team_id: team.id, + timestamp: DateTime.utc().toISO(), + now: now.toUTC().toISO(), + ip: '', + uuid: 'i_am_not_a_uuid', + event: 'eVeNt', + properties: { price: 299.99, name: 'AirPods Pro' }, + } - // expect((await hub.db.fetchEvents()).length).toBe(1) - const [event] = await hub.db.fetchEvents() - expect(event.event).toEqual('Not a valid UUID: "this is not an uuid"') + const runner = new EventPipelineRunner(hub, pluginEvent) + const result = await runner.runEventPipeline(pluginEvent) + + expect(result.error).toBeDefined() + expect(result.error).toEqual('Not a valid UUID: "i_am_not_a_uuid"') }) - test('throws with null value in eventUUID', async () => { - await processEvent( - 'xxx', - '', - '', - { event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent, - team.id, - DateTime.utc(), - null as any - ) + test('null value in eventUUID returns an error', async () => { + const pluginEvent: PluginEvent = { + distinct_id: 'i_am_a_distinct_id', + site_url: '', + team_id: team.id, + timestamp: DateTime.utc().toISO(), + now: now.toUTC().toISO(), + ip: '', + uuid: null as any, + event: 'eVeNt', + properties: { price: 299.99, name: 'AirPods Pro' }, + } + + const runner = new EventPipelineRunner(hub, pluginEvent) + const result = await runner.runEventPipeline(pluginEvent) - expect((await hub.db.fetchEvents()).length).toBe(1) - const [event] = await hub.db.fetchEvents() - expect(event.event).toEqual('Not a valid UUID: "null"') + expect(result.error).toBeDefined() + expect(result.error).toEqual('Not a valid UUID: "null"') }) }) From 5d6f91f5c48f6ec585b9222ea7f05864862f8160 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 13:57:49 -0700 Subject: [PATCH 14/17] Remove temp vscode debug config --- plugin-server/.vscode/launch.json | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/plugin-server/.vscode/launch.json b/plugin-server/.vscode/launch.json index 690dae9c6a3a3..0bcb92aa08b47 100644 --- a/plugin-server/.vscode/launch.json +++ b/plugin-server/.vscode/launch.json @@ -40,15 +40,13 @@ "request": "launch", "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", - "program": "${workspaceFolder}/node_modules/jest/bin/jest.js", + "disableOptimisticBPs": true, + "program": "${workspaceFolder}/node_modules/.bin/jest", "cwd": "${workspaceFolder}", - "args": [ - "${workspaceRoot}/node_modules/.bin/jest", - "--runInBand", - "--watchAll=false", - "./tests/main/process-event.test.ts", - "--testNamePattern='validates UUID passed in eventUUID'" - ] + "args": ["--runInBand", "--forceExit", "--watchAll=false"], + "env": { + "DATABASE_URL": "postgres://localhost:5432/test_posthog" + } } ] } From 591fa2b7385059949f78241c4470933f988dd2e8 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 14:00:59 -0700 Subject: [PATCH 15/17] nit: remove added newline --- .../src/worker/ingestion/event-pipeline/prepareEventStep.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts index 29bbcc9586c77..e51fa5df4a477 100644 --- a/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts +++ b/plugin-server/src/worker/ingestion/event-pipeline/prepareEventStep.ts @@ -7,7 +7,6 @@ import { EventPipelineRunner } from './runner' export async function prepareEventStep(runner: EventPipelineRunner, event: PluginEvent): Promise { const { team_id, uuid } = event - const tsParsingIngestionWarnings: Promise[] = [] const invalidTimestampCallback = function (type: string, details: Record) { // TODO: make that metric name more generic when transitionning to prometheus From 62766d6773252c7467cc774fa9829de8d2d72fe3 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Thu, 2 Nov 2023 14:27:26 -0700 Subject: [PATCH 16/17] Fix another test that wasn't using a proper UUID --- .../tests/main/ingestion-queues/run-ingestion-pipeline.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts index d938b95078c68..4ea08342eda2b 100644 --- a/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-ingestion-pipeline.test.ts @@ -5,6 +5,7 @@ import { Hub } from '../../../src/types' import { DependencyUnavailableError } from '../../../src/utils/db/error' import { createHub } from '../../../src/utils/db/hub' import { PostgresUse } from '../../../src/utils/db/postgres' +import { UUIDT } from '../../../src/utils/utils' import { runEventPipeline } from '../../../src/worker/ingestion/event-pipeline/runner' import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql' @@ -57,7 +58,7 @@ describe('workerTasks.runEventPipeline()', () => { properties: {}, site_url: 'https://example.com', now: new Date().toISOString(), - uuid: 'uuid', + uuid: new UUIDT().toString(), }) ).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage))) pgQueryMock.mockRestore() From ac34983306847b9824e2b9c9167544e34b79b987 Mon Sep 17 00:00:00 2001 From: Dave Murphy Date: Tue, 7 Nov 2023 11:29:37 -0800 Subject: [PATCH 17/17] Add missing import after git stupidity --- plugin-server/src/worker/ingestion/process-event.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 8ca1ea76988dd..b85f13dd6e53a 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -34,6 +34,7 @@ import { addGroupProperties } from './groups' import { upsertGroup } from './properties-updater' import { PropertyDefinitionsManager } from './property-definitions-manager' import { TeamManager } from './team-manager' +import { captureIngestionWarning } from './utils' export class EventsProcessor { pluginsServer: Hub