Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(plugin-server): Remove Postgres-based plugin error logging in favor of existing ClickHouse-based approaches #18764

Merged
merged 25 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b9c9336
Check if we need to flush after queueing, rather than before -- this …
tkaemming Nov 17, 2023
c4cbd65
Remove UPDATEs from error callback.
tkaemming Nov 16, 2023
b71ea99
Hack together an ugly test that passes, but requires running server w…
tkaemming Nov 17, 2023
8d2a31a
Move error parsing down into helper function.
tkaemming Nov 17, 2023
82f4f16
Update the remainder of the tests (...which error?)
tkaemming Nov 17, 2023
8068ec3
Remove a case so nice it was tested twice...
tkaemming Nov 17, 2023
684c764
Also remove unnecessary chaining from unhandled promise rejection.
tkaemming Nov 17, 2023
29f5f99
Check for log entry, not plugin error.
tkaemming Nov 18, 2023
182130c
Clean up some messy wording.
tkaemming Nov 20, 2023
ff5ce59
Set a reminder for what actually needs to be cleaned up with types here.
tkaemming Nov 20, 2023
8f28d38
Hide the error field from the model serializer.
tkaemming Nov 21, 2023
b30bb45
Clean up(?) types.
tkaemming Nov 21, 2023
92ab3e8
Update functional test script and docs to use `APP_METRICS_FLUSH_FREQ…
tkaemming Nov 21, 2023
b229260
Make tests better.
tkaemming Nov 21, 2023
7e3d94d
Remove `has_error` attribute from `PluginConfig` (also remove `setErr…
tkaemming Nov 21, 2023
185d834
One step forward, one step backward.
tkaemming Nov 21, 2023
afbe6b7
Just reset everything each test run rather than trying to figure out …
tkaemming Nov 21, 2023
ff4d6e9
Sidestep Postgres error column dependency in teardown tests.
tkaemming Nov 21, 2023
9124858
Make tests simultaneously a little better and a little worse somehow.
tkaemming Nov 21, 2023
e96da9d
Remove `setError` completely.
tkaemming Nov 21, 2023
b355cfe
Merge branch 'master' into remove-plugin-error-update
tkaemming Nov 23, 2023
4156eba
Mark the `PluginConfig.error` column as unused.
tkaemming Nov 23, 2023
2a54885
Merge branch 'master' into remove-plugin-error-update
tkaemming Nov 27, 2023
3392e15
Add PLUGINS_DEFAULT_LOG_LEVEL=0 to instructions
tkaemming Nov 27, 2023
b6a4107
Make the README a bit easier to read.
tkaemming Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ testing:

1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder)
1. setup the test DBs `pnpm setup:test`
1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev`
1. start the plugin-server with `APP_METRICS_FLUSH_FREQUENCY_MS=0 CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having this here (and in plugin-server/bin/ci_functional_tests.sh) feels like a hack.

I think the ideal approach would be to have the functional test setup/teardown control the server itself and provide any ad hoc test-specific configurations like this — but without taking the time to better increase test isolation here, I'm not sure that's worth doing. Not sure if there is a better compromise approach here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you, this smells but we can live with it a bit more, then do a spring cleaning.

BTW PLUGINS_DEFAULT_LOG_LEVEL=0 should also be added here, as it's in the script, and needed by waitForPluginToLoad, could you please add it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this in 3392e15.

1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch`

## CLI flags
Expand Down
1 change: 1 addition & 0 deletions plugin-server/bin/ci_functional_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export WORKER_CONCURRENCY=1
export CONVERSION_BUFFER_ENABLED=true
export BUFFER_CONVERSION_SECONDS=2 # Make sure we don't have to wait for the default 60 seconds
export KAFKA_MAX_MESSAGE_BATCH_SIZE=0
export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious errors in tests that wait for metrics
export APP_METRICS_GATHERED_FOR_ALL=true
export PLUGINS_DEFAULT_LOG_LEVEL=0 # All logs, as debug logs are used in synchronization barriers
export NODE_ENV=production-functional-tests
Expand Down
11 changes: 10 additions & 1 deletion plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { PostgresRouter, PostgresUse } from '../src/utils/db/postgres'
import { parseRawClickHouseEvent } from '../src/utils/event'
import { createPostgresPool, UUIDT } from '../src/utils/utils'
import { RawAppMetric } from '../src/worker/ingestion/app-metrics'
import { insertRow } from '../tests/helpers/sql'
import { waitForExpect } from './expectations'
import { produce } from './kafka'
Expand Down Expand Up @@ -151,7 +152,7 @@ export const createPlugin = async (plugin: Omit<Plugin, 'id'>) => {
}

export const createPluginConfig = async (
pluginConfig: Omit<PluginConfig, 'id' | 'created_at' | 'enabled' | 'order' | 'has_error'>,
pluginConfig: Omit<PluginConfig, 'id' | 'created_at' | 'enabled' | 'order'>,
enabled = true
) => {
return await insertRow(postgres, 'posthog_pluginconfig', {
Expand Down Expand Up @@ -330,6 +331,14 @@ export const fetchPluginLogEntries = async (pluginConfigId: number) => {
return logEntries
}

export const fetchPluginAppMetrics = async (pluginConfigId: number) => {
const { data: appMetrics } = (await clickHouseClient.querying(`
SELECT * FROM app_metrics
WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp
`)) as unknown as ClickHouse.ObjectQueryResult<RawAppMetric>
return appMetrics
}

export const createOrganization = async (organizationProperties = {}) => {
const organizationId = new UUIDT().toString()
await insertRow(postgres, 'posthog_organization', {
Expand Down
195 changes: 92 additions & 103 deletions plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { v4 as uuid4 } from 'uuid'

import { ONE_HOUR } from '../src/config/constants'
import { PluginLogEntryType } from '../src/types'
import { UUIDT } from '../src/utils/utils'
import { getCacheKey } from '../src/worker/vm/extensions/cache'
import {
Expand All @@ -13,7 +14,9 @@ import {
createTeam,
enablePluginConfig,
fetchEvents,
fetchPluginAppMetrics,
fetchPluginConsoleLogEntries,
fetchPluginLogEntries,
fetchPostgresPersons,
getPluginConfig,
redis,
Expand Down Expand Up @@ -90,133 +93,119 @@ test.concurrent(`plugin method tests: event captured, processed, ingested`, asyn
})
})

test.concurrent(`plugin method tests: creates error on unhandled throw`, async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
test.concurrent(
`plugin method tests: records error in app metrics and creates log entry on unhandled throw`,
async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function processEvent(event) {
throw new Error('error thrown in plugin')
}
`,
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

const event = {
event: 'custom event',
// NOTE: Before `sanitizeJsonbValue` was added, the null byte below would blow up the error
// UPDATE, breaking this test. It is now replaced with the Unicode replacement character,
// \uFFFD.
properties: { name: 'haha', other: '\u0000' },
}
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
const event = {
event: 'custom event',
properties: { name: 'haha', other: '\u0000' },
}

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})
await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

const error = await waitForExpect(async () => {
const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id)
expect(pluginConfigAgain.error).not.toBeNull()
return pluginConfigAgain.error
})
await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})

expect(error.message).toEqual('error thrown in plugin')
const errorProperties = error.event.properties
expect(errorProperties.name).toEqual('haha')
expect(errorProperties.other).toEqual('\uFFFD')
})
const appMetric = await waitForExpect(async () => {
const errorMetrics = await fetchPluginAppMetrics(pluginConfig.id)
expect(errorMetrics.length).toEqual(1)
return errorMetrics[0]
})

test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function processEvent(event) {
void new Promise((_, rejects) => { rejects(new Error('error thrown in plugin')) }).then(() => {})
return event
}
Comment on lines -146 to -149
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of the Promise API is that this test and the one below it ("plugin method tests: creates error on unhandled promise errors") are functionally identical, so I consolidated them into a single test.

`,
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)
expect(appMetric.successes).toEqual(0)
expect(appMetric.failures).toEqual(1)
expect(appMetric.error_type).toEqual('Error')
expect(JSON.parse(appMetric.error_details!)).toMatchObject({
error: { message: 'error thrown in plugin' },
event: { properties: event.properties },
})
Comment on lines +129 to +141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've learned to really like doing a single check, because then if the test fails I see directly what it was instead and it's quicker for me to know what broke, i.e.

       await waitForExpect(async () => {
            const errorMetrics = await fetchPluginAppMetrics(pluginConfig.id)
            expect(errorMetrics).toBe([
            	successes: 0
            	failures: 1
            	error_type: 'Error'
				error_details: ...
            ])
        })

Does this make sense? Not expecting you to change an already merged PR more for maybe let's do this in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that makes sense! (I do need to spend some time getting more familiar with the different expect matchers so that this comes to mind more naturally.)

Copy link
Contributor

@tiina303 tiina303 Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure my code there is valid btw - I let copilot write it for me normally 😅


const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const errorLogEntry = await waitForExpect(async () => {
const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter(
(entry) => entry.type == PluginLogEntryType.Error
)
expect(errorLogEntries.length).toBe(1)
return errorLogEntries[0]
})

const event = {
event: 'custom event',
properties: { name: 'haha' },
expect(errorLogEntry.message).toContain('error thrown in plugin')
}
)

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})

const error = await waitForExpect(async () => {
const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id)
expect(pluginConfigAgain.error).not.toBeNull()
return pluginConfigAgain.error
})

expect(error.message).toEqual('error thrown in plugin')
})

test.concurrent(`plugin method tests: creates error on unhandled promise errors`, async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
test.concurrent(
`plugin method tests: records success in app metrics and creates error log entry on unawaited promise rejection`,
async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function processEvent(event) {
void new Promise(() => { throw new Error('error thrown in plugin') }).then(() => {})
void new Promise(() => { throw new Error('error thrown in plugin') })
return event
}
`,
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()

const event = {
event: 'custom event',
properties: { name: 'haha' },
}
const event = {
event: 'custom event',
properties: { name: 'haha' },
}

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})
await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})

const error = await waitForExpect(async () => {
const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id)
expect(pluginConfigAgain.error).not.toBeNull()
return pluginConfigAgain.error
})
const appMetric = await waitForExpect(async () => {
const appMetrics = await fetchPluginAppMetrics(pluginConfig.id)
expect(appMetrics.length).toEqual(1)
return appMetrics[0]
})

expect(error.message).toEqual('error thrown in plugin')
})
expect(appMetric.successes).toEqual(1)
expect(appMetric.failures).toEqual(0)

const errorLogEntry = await waitForExpect(async () => {
const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter(
(entry) => entry.type == PluginLogEntryType.Error
)
expect(errorLogEntries.length).toBe(1)
return errorLogEntries[0]
})

expect(errorLogEntry.message).toContain('error thrown in plugin')
}
Comment on lines +189 to +207
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a subtle distinction here that I hadn't originally considered until making this change: it's possible for a plugin execution to succeed (and be marked as such in app metrics), but also log an error at some later time in a case similar to this one where an un-awaited promise is later rejected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All plugins will be non-async in the future ... that's my current plan at least.

)

test.concurrent(`plugin method tests: teardown is called on stateful plugin reload if they are updated`, async () => {
const plugin = await createPlugin({
Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ export interface PluginConfig {
enabled: boolean
order: number
config: Record<string, unknown>
has_error: boolean
attachments?: Record<string, PluginAttachment>
vm?: LazyPluginVM | null
created_at: string
Expand Down
21 changes: 10 additions & 11 deletions plugin-server/src/utils/db/error.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { captureException } from '@sentry/node'

import { Hub, PluginConfig, PluginError } from '../../types'
import { setError } from './sql'
import { Hub, PluginConfig, PluginError, PluginLogEntrySource, PluginLogEntryType } from '../../types'

export class DependencyUnavailableError extends Error {
constructor(message: string, dependencyName: string, error: Error) {
Expand Down Expand Up @@ -47,7 +46,7 @@ export async function processError(
throw error
}

const errorJson: PluginError =
const pluginError: PluginError =
typeof error === 'string'
? {
message: error,
Expand All @@ -61,14 +60,14 @@ export async function processError(
event: event,
}

await setError(server, errorJson, pluginConfig)
}

export async function clearError(server: Hub, pluginConfig: PluginConfig): Promise<void> {
// running this may causes weird deadlocks with piscina and vms, so avoiding if possible
if (pluginConfig.has_error) {
await setError(server, null, pluginConfig)
}
await server.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.Plugin,
type: PluginLogEntryType.Error,
message: pluginError.stack ?? pluginError.message,
instanceId: server.instanceId,
timestamp: pluginError.time,
})
}

export function cleanErrorStackTrace(stack: string | undefined): string | undefined {
Expand Down
37 changes: 2 additions & 35 deletions plugin-server/src/utils/db/sql.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
import {
Hub,
Plugin,
PluginAttachmentDB,
PluginCapabilities,
PluginConfig,
PluginConfigId,
PluginError,
PluginLogEntrySource,
PluginLogEntryType,
} from '../../types'
import { Hub, Plugin, PluginAttachmentDB, PluginCapabilities, PluginConfig, PluginConfigId } from '../../types'
import { PostgresUse } from './postgres'
import { sanitizeJsonbValue } from './utils'

function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
const fields = specificField
Expand All @@ -23,8 +12,7 @@ function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
posthog_pluginconfig.order,
posthog_pluginconfig.config,
posthog_pluginconfig.updated_at,
posthog_pluginconfig.created_at,
posthog_pluginconfig.error IS NOT NULL AS has_error
posthog_pluginconfig.created_at
`

return `SELECT ${fields}
Expand Down Expand Up @@ -117,27 +105,6 @@ export async function setPluginCapabilities(
)
}

export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise<void> {
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2',
// NOTE: In theory `onEvent` shouldn't be seeing events that still have the null byte, but
// it's better to be safe than sorry and sanitize the value here as well.
[sanitizeJsonbValue(pluginError), typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig],
'updatePluginConfigError'
)
if (pluginError) {
Comment on lines -121 to -128
Copy link
Contributor Author

@tkaemming tkaemming Nov 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the important part that was removed.

(The queuePluginLogEntry call below was moved up to the original call site for setError in plugin-server/src/utils/db/error.ts.)

await hub.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.Plugin,
type: PluginLogEntryType.Error,
message: pluginError.stack ?? pluginError.message,
instanceId: hub.instanceId,
timestamp: pluginError.time,
})
}
}

export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): Promise<void> {
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
Expand Down
Loading
Loading