Skip to content

Commit

Permalink
fix(plugin-server): move null replacement and add test (#21954)
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Apr 29, 2024
1 parent d839a75 commit 981d58f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ export async function eachBatchParallelIngestion(
// Process every message sequentially, stash promises to await on later
for (const { message, pluginEvent } of currentBatch) {
try {
pluginEvent.distinct_id = pluginEvent.distinct_id.replaceAll('\u0000', '')
const result = (await retryIfRetriable(async () => {
const runner = new EventPipelineRunner(queue.pluginsServer, pluginEvent)
return await runner.runEventPipeline(pluginEvent)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export function normalizeProcessPerson(event: PluginEvent, processPerson: boolea
}

export function normalizeEvent(event: PluginEvent): PluginEvent {
event.distinct_id = event.distinct_id?.toString()
event.distinct_id = event.distinct_id?.toString().replace(/\u0000/g, '\uFFFD')

let properties = event.properties ?? {}
if (event['$set']) {
Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ export class EventsProcessor {
'Still inside "EventsProcessor.processEvent". Timeout warning after 30 sec!',
() => ({ event: JSON.stringify(data) })
)
distinctId = distinctId.replace('\u0000', '')

let result: PreIngestionEvent | null = null
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { UUIDT } from '../../../../src/utils/utils'
import { normalizeEventStep } from '../../../../src/worker/ingestion/event-pipeline/normalizeEventStep'
import { createOrganization, createTeam, resetTestDatabase } from '../../../helpers/sql'

// A simple deep copy to ensure we aren't comparing an event object with itself below.
function copy(a: any) {
return JSON.parse(JSON.stringify(a))
}

describe('normalizeEventStep()', () => {
it('normalizes the event with properties set by plugins', async () => {
await resetTestDatabase()
Expand Down Expand Up @@ -34,7 +39,7 @@ describe('normalizeEventStep()', () => {
}

const processPerson = true
const [resEvent, timestamp] = await normalizeEventStep(event, processPerson)
const [resEvent, timestamp] = await normalizeEventStep(copy(event), processPerson)

expect(resEvent).toEqual({
...event,
Expand All @@ -54,6 +59,35 @@ describe('normalizeEventStep()', () => {
expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' }))
})

it('replaces null byte with unicode replacement character in distinct_id', async () => {
await resetTestDatabase()
const [hub, _] = await createHub()
const organizationId = await createOrganization(hub.db.postgres)
const teamId = await createTeam(hub.db.postgres, organizationId)
const uuid = new UUIDT().toString()
const event = {
distinct_id: '\u0000foo',
ip: null,
site_url: 'http://localhost',
team_id: teamId,
now: '2020-02-23T02:15:00Z',
timestamp: '2020-02-23T02:15:00Z',
event: 'default event',
uuid: uuid,
}

const processPerson = true
const [resEvent, timestamp] = await normalizeEventStep(copy(event), processPerson)

expect(resEvent).toEqual({
...event,
distinct_id: '\uFFFDfoo',
properties: {},
})

expect(timestamp).toEqual(DateTime.fromISO(event.timestamp!, { zone: 'utc' }))
})

it('normalizes $process_person_profile=false events by dropping $set and related', async () => {
await resetTestDatabase()
const [hub, _] = await createHub()
Expand Down Expand Up @@ -88,7 +122,11 @@ describe('normalizeEventStep()', () => {
}

const processPerson = false
const [resEvent, timestamp] = await normalizeEventStep(event, processPerson)
const [resEvent, timestamp] = await normalizeEventStep(copy(event), processPerson)

// These should be gone in the comparison below.
delete (event as any)['$set']
delete (event as any)['$set_once']

expect(resEvent).toEqual({
...event,
Expand Down

0 comments on commit 981d58f

Please sign in to comment.