Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: move UUID validation up in pipeline #18317

Merged
merged 18 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
70ef252
Move UUID validation from `process` step to `prepare` step
davemurphysf Oct 31, 2023
1d69564
Move uuid validation all the way up to `populateTeamData`
davemurphysf Nov 2, 2023
b3553dc
Update tests to use `processEvent` instead of `eventsProcessor.proces…
davemurphysf Nov 2, 2023
96e815d
Add temporary vscode config to just run relavent tests
davemurphysf Nov 2, 2023
097ea27
Tests are fixed; was looking for an event that won't be produced
davemurphysf Nov 2, 2023
58d6465
Remove temp vscode debug config
davemurphysf Nov 2, 2023
a0b8aa1
nit: remove added newline
davemurphysf Nov 2, 2023
26ab733
Fix another test that wasn't using a proper UUID
davemurphysf Nov 2, 2023
977ba01
Move UUID validation from `process` step to `prepare` step
davemurphysf Oct 31, 2023
197a5da
Move uuid validation all the way up to `populateTeamData`
davemurphysf Nov 2, 2023
f5492fc
Update tests to use `processEvent` instead of `eventsProcessor.proces…
davemurphysf Nov 2, 2023
45b94d2
Add temporary vscode config to just run relavent tests
davemurphysf Nov 2, 2023
c9c5449
Tests are fixed; was looking for an event that won't be produced
davemurphysf Nov 2, 2023
5d6f91f
Remove temp vscode debug config
davemurphysf Nov 2, 2023
591fa2b
nit: remove added newline
davemurphysf Nov 2, 2023
62766d6
Fix another test that wasn't using a proper UUID
davemurphysf Nov 2, 2023
dccd0f5
Merge branch 'dave/move-uuid-check' of https://github.com/PostHog/pos…
Nov 7, 2023
ac34983
Add missing import after git stupidity
Nov 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Counter } from 'prom-client'

import { eventDroppedCounter } from '../../../main/ingestion-queues/metrics'
import { PipelineEvent } from '../../../types'
import { UUID } from '../../../utils/utils'
import { captureIngestionWarning } from '../utils'
import { EventPipelineRunner } from './runner'

export const tokenOrTeamPresentCounter = new Counter({
Expand All @@ -23,13 +25,16 @@ export async function populateTeamDataStep(
* For these, we trust the team_id field value.
*/

const { db } = runner.hub

// Collect statistics on the shape of events we are ingesting.
tokenOrTeamPresentCounter
.labels({
team_id_present: event.team_id ? 'true' : 'false',
token_present: event.token ? 'true' : 'false',
})
.inc()

// statsd copy as prometheus is currently not supported in worker threads.
runner.hub.statsd?.increment('ingestion_event_hasauthinfo', {
team_id_present: event.team_id ? 'true' : 'false',
Expand All @@ -38,6 +43,14 @@ export async function populateTeamDataStep(

// If a team_id is present (event captured from an app), trust it and return the event as is.
if (event.team_id) {
// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, event.team_id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
}

return event as PluginEvent
}

Expand Down Expand Up @@ -69,6 +82,14 @@ export async function populateTeamDataStep(
return null
}

// Check for an invalid UUID, which should be blocked by capture, when team_id is present
if (!UUID.validateString(event.uuid, false)) {
await captureIngestionWarning(db, team.id, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(event.uuid),
})
throw new Error(`Not a valid UUID: "${event.uuid}"`)
}

event = {
...event,
team_id: team.id,
Expand Down
9 changes: 1 addition & 8 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ import { elementsToString, extractElements } from '../../utils/db/elements-chain
import { KafkaProducerWrapper } from '../../utils/db/kafka-producer-wrapper'
import { safeClickhouseString, sanitizeEventName, timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUID } from '../../utils/utils'
import { castTimestampOrNow } from '../../utils/utils'
import { GroupTypeManager } from './group-type-manager'
import { addGroupProperties } from './groups'
import { upsertGroup } from './properties-updater'
import { PropertyDefinitionsManager } from './property-definitions-manager'
import { TeamManager } from './team-manager'
import { captureIngestionWarning } from './utils'

export class EventsProcessor {
pluginsServer: Hub
Expand Down Expand Up @@ -66,12 +65,6 @@ export class EventsProcessor {
timestamp: DateTime,
eventUuid: string
): Promise<PreIngestionEvent> {
if (!UUID.validateString(eventUuid, false)) {
await captureIngestionWarning(this.db, teamId, 'skipping_event_invalid_uuid', {
eventUuid: JSON.stringify(eventUuid),
})
throw new Error(`Not a valid UUID: "${eventUuid}"`)
}
const singleSaveTimer = new Date()
const timeout = timeoutGuard('Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!', {
event: JSON.stringify(data),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Hub } from '../../../src/types'
import { DependencyUnavailableError } from '../../../src/utils/db/error'
import { createHub } from '../../../src/utils/db/hub'
import { PostgresUse } from '../../../src/utils/db/postgres'
import { UUIDT } from '../../../src/utils/utils'
import { runEventPipeline } from '../../../src/worker/ingestion/event-pipeline/runner'
import { createOrganization, createTeam, POSTGRES_DELETE_TABLES_QUERY } from '../../helpers/sql'

Expand Down Expand Up @@ -57,7 +58,7 @@ describe('workerTasks.runEventPipeline()', () => {
properties: {},
site_url: 'https://example.com',
now: new Date().toISOString(),
uuid: 'uuid',
uuid: new UUIDT().toString(),
})
).rejects.toEqual(new DependencyUnavailableError(errorMessage, 'Postgres', new Error(errorMessage)))
pgQueryMock.mockRestore()
Expand Down
57 changes: 38 additions & 19 deletions plugin-server/tests/main/process-event.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2459,26 +2459,45 @@ test('long event name substr', async () => {
expect(event.event?.length).toBe(200)
})

test('throws with bad uuid', async () => {
await expect(
eventsProcessor.processEvent(
'xxx',
{ event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
DateTime.utc(),
'this is not an uuid'
)
).rejects.toEqual(new Error('Not a valid UUID: "this is not an uuid"'))
describe('validates eventUuid', () => {
test('invalid uuid string returns an error', async () => {
const pluginEvent: PluginEvent = {
distinct_id: 'i_am_a_distinct_id',
site_url: '',
team_id: team.id,
timestamp: DateTime.utc().toISO(),
now: now.toUTC().toISO(),
ip: '',
uuid: 'i_am_not_a_uuid',
event: 'eVeNt',
properties: { price: 299.99, name: 'AirPods Pro' },
}

await expect(
eventsProcessor.processEvent(
'xxx',
{ event: 'E', properties: { price: 299.99, name: 'AirPods Pro' } } as any as PluginEvent,
team.id,
DateTime.utc(),
null as any
)
).rejects.toEqual(new Error('Not a valid UUID: "null"'))
const runner = new EventPipelineRunner(hub, pluginEvent)
const result = await runner.runEventPipeline(pluginEvent)

expect(result.error).toBeDefined()
expect(result.error).toEqual('Not a valid UUID: "i_am_not_a_uuid"')
})
test('null value in eventUUID returns an error', async () => {
const pluginEvent: PluginEvent = {
distinct_id: 'i_am_a_distinct_id',
site_url: '',
team_id: team.id,
timestamp: DateTime.utc().toISO(),
now: now.toUTC().toISO(),
ip: '',
uuid: null as any,
event: 'eVeNt',
properties: { price: 299.99, name: 'AirPods Pro' },
}

const runner = new EventPipelineRunner(hub, pluginEvent)
const result = await runner.runEventPipeline(pluginEvent)

expect(result.error).toBeDefined()
expect(result.error).toEqual('Not a valid UUID: "null"')
})
})

test('any event can do $set on props (user exists)', async () => {
Expand Down
Loading