Skip to content

Commit

Permalink
fix(plugin-server): fix unicode null byte blowing up the pipeline (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Oct 30, 2023
1 parent 3962724 commit d164087
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 11 deletions.
2 changes: 1 addition & 1 deletion plugin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ testing:

1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder)
1. setup the test DBs `pnpm setup:test`
1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm start:dev`
1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev`
1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch`

## CLI flags
Expand Down
8 changes: 7 additions & 1 deletion plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async (

const event = {
event: 'custom event',
properties: { name: 'haha' },
// NOTE: Before `sanitizeJsonbValue` was added, the null byte below would blow up the error
// UPDATE, breaking this test. It is now replaced with the Unicode replacement character,
// \uFFFD.
properties: { name: 'haha', other: '\u0000' },
}

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
Expand All @@ -125,6 +128,9 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async (
})

expect(error.message).toEqual('error thrown in plugin')
const errorProperties = error.event.properties
expect(errorProperties.name).toEqual('haha')
expect(errorProperties.other).toEqual('\uFFFD')
})

test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => {
Expand Down
9 changes: 5 additions & 4 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import { PostgresRouter, PostgresUse, TransactionClient } from './postgres'
import {
generateKafkaPersonUpdateMessage,
safeClickhouseString,
sanitizeJsonbValue,
shouldStoreLog,
timeoutGuard,
unparsePersonPartial,
Expand Down Expand Up @@ -677,9 +678,9 @@ export class DB {
`SELECT * FROM inserted_person;`,
[
createdAt.toISO(),
JSON.stringify(properties),
JSON.stringify(propertiesLastUpdatedAt),
JSON.stringify(propertiesLastOperation),
sanitizeJsonbValue(properties),
sanitizeJsonbValue(propertiesLastUpdatedAt),
sanitizeJsonbValue(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
Expand Down Expand Up @@ -739,7 +740,7 @@ export class DB {
return [person, []]
}

const values = [...updateValues, person.id]
const values = [...updateValues, person.id].map(sanitizeJsonbValue)

// Potentially overriding values badly if there was an update to the person after computing updateValues above
const queryString = `UPDATE posthog_person SET version = COALESCE(version, 0)::numeric + 1, ${Object.keys(
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/src/utils/db/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
PluginLogEntryType,
} from '../../types'
import { PostgresUse } from './postgres'
import { sanitizeJsonbValue } from './utils'

function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
const fields = specificField
Expand Down Expand Up @@ -120,7 +121,9 @@ export async function setError(hub: Hub, pluginError: PluginError | null, plugin
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
[pluginError, typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
// NOTE: In theory `onEvent` shouldn't be seeing events that still have the null byte, but
// it's better to be safe than sorry and sanitize the value here as well.
[sanitizeJsonbValue(pluginError), typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
'updatePluginConfigError'
)
if (pluginError) {
Expand Down
19 changes: 19 additions & 0 deletions plugin-server/src/utils/db/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,22 @@ export function safeClickhouseString(str: string): string {
return res.slice(1, res.length - 1) + `\\`
})
}

// JSONB columns may not contain null bytes, so we replace them with the Unicode replacement
// character. This should be called before passing a parameter to a parameterized query. It is
// designed to safely ignore other types, since we have some functions that operate on generic
// parameter arrays.
//
// Objects are JSON serialized to make the replacement safer and less expensive, since we don't have
// to recursively walk the object once its a string. They need to be JSON serialized before sending
// to Postgres anyway.
export function sanitizeJsonbValue(value: any): any {
if (value === null) {
// typeof null is 'object', but we don't want to serialize it into a string below
return value
} else if (typeof value === 'object') {
return JSON.stringify(value).replace(/\\u0000/g, '\\uFFFD')
} else {
return value
}
}
3 changes: 2 additions & 1 deletion plugin-server/tests/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
getPluginRows,
setError,
} from '../src/utils/db/sql'
import { sanitizeJsonbValue } from '../src/utils/db/utils'
import { commonOrganizationId, pluginConfig39 } from './helpers/plugins'
import { resetTestDatabase } from './helpers/sql'

Expand Down Expand Up @@ -143,7 +144,7 @@ describe('sql', () => {
expect(hub.db.postgres.query).toHaveBeenCalledWith(
PostgresUse.COMMON_WRITE,
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
[pluginError, pluginConfig39.id],
[sanitizeJsonbValue(pluginError), pluginConfig39.id],
'updatePluginConfigError'
)
})
Expand Down
9 changes: 6 additions & 3 deletions plugin-server/tests/worker/ingestion/person-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,16 @@ describe('PersonState.update()', () => {
event: '$pageview',
distinct_id: 'new-user',
uuid: event_uuid,
// `null_byte` validates that `sanitizeJsonbValue` is working as expected
properties: { $set: { null_byte: '\u0000' } },
}).updateProperties()
await hub.db.kafkaProducer.flush()

expect(person).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: { $creator_event_uuid: event_uuid },
properties: { $creator_event_uuid: event_uuid, null_byte: '\uFFFD' },
created_at: timestamp,
version: 0,
is_identified: false,
Expand Down Expand Up @@ -261,7 +263,7 @@ describe('PersonState.update()', () => {
distinct_id: 'new-user',
properties: {
$set_once: { c: 3, e: 4 },
$set: { b: 4, toString: 1 },
$set: { b: 4, toString: 1, null_byte: '\u0000' },
},
}).updateProperties()
await hub.db.kafkaProducer.flush()
Expand All @@ -270,7 +272,8 @@ describe('PersonState.update()', () => {
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: { b: 4, c: 4, e: 4, toString: 1 },
// `null_byte` validates that `sanitizeJsonbValue` is working as expected
properties: { b: 4, c: 4, e: 4, toString: 1, null_byte: '\uFFFD' },
created_at: timestamp,
version: 1,
is_identified: false,
Expand Down

0 comments on commit d164087

Please sign in to comment.