Skip to content

Commit

Permalink
fix(plugin-server): Remove Postgres-based plugin error logging in fav…
Browse files Browse the repository at this point in the history
…or of existing ClickHouse-based approaches (#18764)
  • Loading branch information
tkaemming authored Nov 27, 2023
1 parent f0d531b commit 9299aa0
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 255 deletions.
19 changes: 16 additions & 3 deletions plugin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Let's get you developing the plugin server in no time:

1. Prepare for running functional tests. See notes below.

## Functional tests
### Running Functional Tests

Functional tests are provided located in `functional_tests`. They provide tests
for high level functionality of the plugin-server, i.e. functionality that any
Expand All @@ -47,8 +47,21 @@ testing:

1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder)
1. setup the test DBs `pnpm setup:test`
1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev`
1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch`
1. start the plugin-server:
```bash
APP_METRICS_FLUSH_FREQUENCY_MS=0 \
CLICKHOUSE_DATABASE='default' \
DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \
PLUGINS_DEFAULT_LOG_LEVEL=0 \
RELOAD_PLUGIN_JITTER_MAX_MS=0 \
pnpm start:dev
```
1. run the tests:
```bash
CLICKHOUSE_DATABASE='default' \
DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \
pnpm functional_tests --watch
```

## CLI flags

Expand Down
1 change: 1 addition & 0 deletions plugin-server/bin/ci_functional_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export WORKER_CONCURRENCY=1
export CONVERSION_BUFFER_ENABLED=true
export BUFFER_CONVERSION_SECONDS=2 # Make sure we don't have to wait for the default 60 seconds
export KAFKA_MAX_MESSAGE_BATCH_SIZE=0
export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious errors in tests that wait for metrics
export APP_METRICS_GATHERED_FOR_ALL=true
export PLUGINS_DEFAULT_LOG_LEVEL=0 # All logs, as debug logs are used in synchronization barriers
export NODE_ENV=production-functional-tests
Expand Down
11 changes: 10 additions & 1 deletion plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
import { PostgresRouter, PostgresUse } from '../src/utils/db/postgres'
import { parseRawClickHouseEvent } from '../src/utils/event'
import { createPostgresPool, UUIDT } from '../src/utils/utils'
import { RawAppMetric } from '../src/worker/ingestion/app-metrics'
import { insertRow } from '../tests/helpers/sql'
import { waitForExpect } from './expectations'
import { produce } from './kafka'
Expand Down Expand Up @@ -151,7 +152,7 @@ export const createPlugin = async (plugin: Omit<Plugin, 'id'>) => {
}

export const createPluginConfig = async (
pluginConfig: Omit<PluginConfig, 'id' | 'created_at' | 'enabled' | 'order' | 'has_error'>,
pluginConfig: Omit<PluginConfig, 'id' | 'created_at' | 'enabled' | 'order'>,
enabled = true
) => {
return await insertRow(postgres, 'posthog_pluginconfig', {
Expand Down Expand Up @@ -330,6 +331,14 @@ export const fetchPluginLogEntries = async (pluginConfigId: number) => {
return logEntries
}

export const fetchPluginAppMetrics = async (pluginConfigId: number) => {
const { data: appMetrics } = (await clickHouseClient.querying(`
SELECT * FROM app_metrics
WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp
`)) as unknown as ClickHouse.ObjectQueryResult<RawAppMetric>
return appMetrics
}

export const createOrganization = async (organizationProperties = {}) => {
const organizationId = new UUIDT().toString()
await insertRow(postgres, 'posthog_organization', {
Expand Down
195 changes: 92 additions & 103 deletions plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { v4 as uuid4 } from 'uuid'

import { ONE_HOUR } from '../src/config/constants'
import { PluginLogEntryType } from '../src/types'
import { UUIDT } from '../src/utils/utils'
import { getCacheKey } from '../src/worker/vm/extensions/cache'
import {
Expand All @@ -13,7 +14,9 @@ import {
createTeam,
enablePluginConfig,
fetchEvents,
fetchPluginAppMetrics,
fetchPluginConsoleLogEntries,
fetchPluginLogEntries,
fetchPostgresPersons,
getPluginConfig,
redis,
Expand Down Expand Up @@ -90,133 +93,119 @@ test.concurrent(`plugin method tests: event captured, processed, ingested`, asyn
})
})

test.concurrent(`plugin method tests: creates error on unhandled throw`, async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
test.concurrent(
`plugin method tests: records error in app metrics and creates log entry on unhandled throw`,
async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function processEvent(event) {
throw new Error('error thrown in plugin')
}
`,
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

const event = {
event: 'custom event',
// NOTE: Before `sanitizeJsonbValue` was added, the null byte below would blow up the error
// UPDATE, breaking this test. It is now replaced with the Unicode replacement character,
// \uFFFD.
properties: { name: 'haha', other: '\u0000' },
}
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
const event = {
event: 'custom event',
properties: { name: 'haha', other: '\u0000' },
}

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})
await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

const error = await waitForExpect(async () => {
const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id)
expect(pluginConfigAgain.error).not.toBeNull()
return pluginConfigAgain.error
})
await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})

expect(error.message).toEqual('error thrown in plugin')
const errorProperties = error.event.properties
expect(errorProperties.name).toEqual('haha')
expect(errorProperties.other).toEqual('\uFFFD')
})
const appMetric = await waitForExpect(async () => {
const errorMetrics = await fetchPluginAppMetrics(pluginConfig.id)
expect(errorMetrics.length).toEqual(1)
return errorMetrics[0]
})

test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function processEvent(event) {
void new Promise((_, rejects) => { rejects(new Error('error thrown in plugin')) }).then(() => {})
return event
}
`,
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)
expect(appMetric.successes).toEqual(0)
expect(appMetric.failures).toEqual(1)
expect(appMetric.error_type).toEqual('Error')
expect(JSON.parse(appMetric.error_details!)).toMatchObject({
error: { message: 'error thrown in plugin' },
event: { properties: event.properties },
})

const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const errorLogEntry = await waitForExpect(async () => {
const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter(
(entry) => entry.type == PluginLogEntryType.Error
)
expect(errorLogEntries.length).toBe(1)
return errorLogEntries[0]
})

const event = {
event: 'custom event',
properties: { name: 'haha' },
expect(errorLogEntry.message).toContain('error thrown in plugin')
}
)

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})

const error = await waitForExpect(async () => {
const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id)
expect(pluginConfigAgain.error).not.toBeNull()
return pluginConfigAgain.error
})

expect(error.message).toEqual('error thrown in plugin')
})

test.concurrent(`plugin method tests: creates error on unhandled promise errors`, async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
test.concurrent(
`plugin method tests: records success in app metrics and creates error log entry on unawaited promise rejection`,
async () => {
const plugin = await createPlugin({
organization_id: organizationId,
name: 'test plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function processEvent(event) {
void new Promise(() => { throw new Error('error thrown in plugin') }).then(() => {})
void new Promise(() => { throw new Error('error thrown in plugin') })
return event
}
`,
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)
})
const teamId = await createTeam(organizationId)
const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id)

const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()
const distinctId = new UUIDT().toString()
const uuid = new UUIDT().toString()

const event = {
event: 'custom event',
properties: { name: 'haha' },
}
const event = {
event: 'custom event',
properties: { name: 'haha' },
}

await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })
await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties })

await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})
await waitForExpect(async () => {
const events = await fetchEvents(teamId)
expect(events.length).toBe(1)
return events
})

const error = await waitForExpect(async () => {
const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id)
expect(pluginConfigAgain.error).not.toBeNull()
return pluginConfigAgain.error
})
const appMetric = await waitForExpect(async () => {
const appMetrics = await fetchPluginAppMetrics(pluginConfig.id)
expect(appMetrics.length).toEqual(1)
return appMetrics[0]
})

expect(error.message).toEqual('error thrown in plugin')
})
expect(appMetric.successes).toEqual(1)
expect(appMetric.failures).toEqual(0)

const errorLogEntry = await waitForExpect(async () => {
const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter(
(entry) => entry.type == PluginLogEntryType.Error
)
expect(errorLogEntries.length).toBe(1)
return errorLogEntries[0]
})

expect(errorLogEntry.message).toContain('error thrown in plugin')
}
)

test.concurrent(`plugin method tests: teardown is called on stateful plugin reload if they are updated`, async () => {
const plugin = await createPlugin({
Expand Down
1 change: 0 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ export interface PluginConfig {
enabled: boolean
order: number
config: Record<string, unknown>
has_error: boolean
attachments?: Record<string, PluginAttachment>
vm?: LazyPluginVM | null
created_at: string
Expand Down
21 changes: 10 additions & 11 deletions plugin-server/src/utils/db/error.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold'
import { captureException } from '@sentry/node'

import { Hub, PluginConfig, PluginError } from '../../types'
import { setError } from './sql'
import { Hub, PluginConfig, PluginError, PluginLogEntrySource, PluginLogEntryType } from '../../types'

export class DependencyUnavailableError extends Error {
constructor(message: string, dependencyName: string, error: Error) {
Expand Down Expand Up @@ -47,7 +46,7 @@ export async function processError(
throw error
}

const errorJson: PluginError =
const pluginError: PluginError =
typeof error === 'string'
? {
message: error,
Expand All @@ -61,14 +60,14 @@ export async function processError(
event: event,
}

await setError(server, errorJson, pluginConfig)
}

export async function clearError(server: Hub, pluginConfig: PluginConfig): Promise<void> {
// running this may causes weird deadlocks with piscina and vms, so avoiding if possible
if (pluginConfig.has_error) {
await setError(server, null, pluginConfig)
}
await server.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.Plugin,
type: PluginLogEntryType.Error,
message: pluginError.stack ?? pluginError.message,
instanceId: server.instanceId,
timestamp: pluginError.time,
})
}

export function cleanErrorStackTrace(stack: string | undefined): string | undefined {
Expand Down
Loading

0 comments on commit 9299aa0

Please sign in to comment.