Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(plugin-server): fix unicode null byte blowing up the pipeline #18282

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ testing:

1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder)
1. setup the test DBs `pnpm setup:test`
1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm start:dev`
1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev`
1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch`

## CLI flags
Expand Down
8 changes: 7 additions & 1 deletion plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async (

const event = {
event: 'custom event',
properties: { name: 'haha' },
// NOTE: Before `sanitizeJsonbValue` was added, the null byte below would blow up the error
// UPDATE, breaking this test. It is now replaced with the Unicode replacement character,
// \uFFFD.
properties: { name: 'haha', other: '\u0000' },
}

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
Expand All @@ -125,6 +128,9 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async (
})

expect(error.message).toEqual('error thrown in plugin')
const errorProperties = error.event.properties
expect(errorProperties.name).toEqual('haha')
expect(errorProperties.other).toEqual('\uFFFD')
})

test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => {
Expand Down
9 changes: 5 additions & 4 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import { PostgresRouter, PostgresUse, TransactionClient } from './postgres'
import {
generateKafkaPersonUpdateMessage,
safeClickhouseString,
sanitizeJsonbValue,
shouldStoreLog,
timeoutGuard,
unparsePersonPartial,
Expand Down Expand Up @@ -677,9 +678,9 @@ export class DB {
`SELECT * FROM inserted_person;`,
[
createdAt.toISO(),
JSON.stringify(properties),
JSON.stringify(propertiesLastUpdatedAt),
JSON.stringify(propertiesLastOperation),
sanitizeJsonbValue(properties),
sanitizeJsonbValue(propertiesLastUpdatedAt),
sanitizeJsonbValue(propertiesLastOperation),
teamId,
isUserId,
isIdentified,
Expand Down Expand Up @@ -739,7 +740,7 @@ export class DB {
return [person, []]
}

const values = [...updateValues, person.id]
const values = [...updateValues, person.id].map(sanitizeJsonbValue)

// Potentially overriding values badly if there was an update to the person after computing updateValues above
const queryString = `UPDATE posthog_person SET version = COALESCE(version, 0)::numeric + 1, ${Object.keys(
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/src/utils/db/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
PluginLogEntryType,
} from '../../types'
import { PostgresUse } from './postgres'
import { sanitizeJsonbValue } from './utils'

function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
const fields = specificField
Expand Down Expand Up @@ -120,7 +121,9 @@ export async function setError(hub: Hub, pluginError: PluginError | null, plugin
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
[pluginError, typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
// NOTE: In theory `onEvent` shouldn't be seeing events that still have the null byte, but
// it's better to be safe than sorry and sanitize the value here as well.
[sanitizeJsonbValue(pluginError), typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
'updatePluginConfigError'
)
if (pluginError) {
Expand Down
19 changes: 19 additions & 0 deletions plugin-server/src/utils/db/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,22 @@ export function safeClickhouseString(str: string): string {
return res.slice(1, res.length - 1) + `\\`
})
}

// JSONB columns may not contain null bytes, so we replace them with the Unicode replacement
// character. This should be called before passing a parameter to a parameterized query. It is
// designed to safely ignore other types, since we have some functions that operate on generic
// parameter arrays.
//
// Objects are JSON serialized to make the replacement safer and less expensive, since we don't have
// to recursively walk the object once its a string. They need to be JSON serialized before sending
// to Postgres anyway.
export function sanitizeJsonbValue(value: any): any {
if (value === null) {
// typeof null is 'object', but we don't want to serialize it into a string below
return value
} else if (typeof value === 'object') {
return JSON.stringify(value).replace(/\\u0000/g, '\\uFFFD')
} else {
return value
}
}
3 changes: 2 additions & 1 deletion plugin-server/tests/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
getPluginRows,
setError,
} from '../src/utils/db/sql'
import { sanitizeJsonbValue } from '../src/utils/db/utils'
import { commonOrganizationId, pluginConfig39 } from './helpers/plugins'
import { resetTestDatabase } from './helpers/sql'

Expand Down Expand Up @@ -143,7 +144,7 @@ describe('sql', () => {
expect(hub.db.postgres.query).toHaveBeenCalledWith(
PostgresUse.COMMON_WRITE,
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
[pluginError, pluginConfig39.id],
[sanitizeJsonbValue(pluginError), pluginConfig39.id],
'updatePluginConfigError'
)
})
Expand Down
9 changes: 6 additions & 3 deletions plugin-server/tests/worker/ingestion/person-state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,16 @@ describe('PersonState.update()', () => {
event: '$pageview',
distinct_id: 'new-user',
uuid: event_uuid,
// `null_byte` validates that `sanitizeJsonbValue` is working as expected
properties: { $set: { null_byte: '\u0000' } },
}).updateProperties()
await hub.db.kafkaProducer.flush()

expect(person).toEqual(
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: { $creator_event_uuid: event_uuid },
properties: { $creator_event_uuid: event_uuid, null_byte: '\uFFFD' },
created_at: timestamp,
version: 0,
is_identified: false,
Expand Down Expand Up @@ -261,7 +263,7 @@ describe('PersonState.update()', () => {
distinct_id: 'new-user',
properties: {
$set_once: { c: 3, e: 4 },
$set: { b: 4, toString: 1 },
$set: { b: 4, toString: 1, null_byte: '\u0000' },
},
}).updateProperties()
await hub.db.kafkaProducer.flush()
Expand All @@ -270,7 +272,8 @@ describe('PersonState.update()', () => {
expect.objectContaining({
id: expect.any(Number),
uuid: uuid.toString(),
properties: { b: 4, c: 4, e: 4, toString: 1 },
// `null_byte` validates that `sanitizeJsonbValue` is working as expected
properties: { b: 4, c: 4, e: 4, toString: 1, null_byte: '\uFFFD' },
created_at: timestamp,
version: 1,
is_identified: false,
Expand Down
Loading