From 9299aa09e5afdd53d2e53b6cdfea2f96ccf4c7ae Mon Sep 17 00:00:00 2001 From: ted kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:41:36 -0800 Subject: [PATCH] fix(plugin-server): Remove Postgres-based plugin error logging in favor of existing ClickHouse-based approaches (#18764) --- plugin-server/README.md | 19 +- plugin-server/bin/ci_functional_tests.sh | 1 + plugin-server/functional_tests/api.ts | 11 +- .../functional_tests/plugins.test.ts | 195 +++++++++--------- plugin-server/src/types.ts | 1 - plugin-server/src/utils/db/error.ts | 21 +- plugin-server/src/utils/db/sql.ts | 37 +--- .../src/worker/ingestion/app-metrics.ts | 24 ++- plugin-server/src/worker/vm/lazy.ts | 4 +- plugin-server/tests/helpers/sql.ts | 15 +- plugin-server/tests/helpers/sqlMock.ts | 1 - .../run-async-handlers-event-pipeline.test.ts | 17 +- plugin-server/tests/main/teardown.test.ts | 74 +++++-- plugin-server/tests/sql.test.ts | 52 +---- plugin-server/tests/utils/retries.test.ts | 1 - posthog/api/plugin.py | 8 + posthog/models/plugin.py | 1 + 17 files changed, 227 insertions(+), 255 deletions(-) diff --git a/plugin-server/README.md b/plugin-server/README.md index 67ec3042dca4f..b61cba750d9f3 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -24,7 +24,7 @@ Let's get you developing the plugin server in no time: 1. Prepare for running functional tests. See notes below. -## Functional tests +### Running Functional Tests Functional tests are provided located in `functional_tests`. They provide tests for high level functionality of the plugin-server, i.e. functionality that any @@ -47,8 +47,21 @@ testing: 1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder) 1. setup the test DBs `pnpm setup:test` -1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev` -1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch` +1. start the plugin-server: + ```bash + APP_METRICS_FLUSH_FREQUENCY_MS=0 \ + CLICKHOUSE_DATABASE='default' \ + DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \ + PLUGINS_DEFAULT_LOG_LEVEL=0 \ + RELOAD_PLUGIN_JITTER_MAX_MS=0 \ + pnpm start:dev + ``` +1. run the tests: + ```bash + CLICKHOUSE_DATABASE='default' \ + DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \ + pnpm functional_tests --watch + ``` ## CLI flags diff --git a/plugin-server/bin/ci_functional_tests.sh b/plugin-server/bin/ci_functional_tests.sh index 9014a4a249a57..905dc2caadf72 100755 --- a/plugin-server/bin/ci_functional_tests.sh +++ b/plugin-server/bin/ci_functional_tests.sh @@ -15,6 +15,7 @@ export WORKER_CONCURRENCY=1 export CONVERSION_BUFFER_ENABLED=true export BUFFER_CONVERSION_SECONDS=2 # Make sure we don't have to wait for the default 60 seconds export KAFKA_MAX_MESSAGE_BATCH_SIZE=0 +export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious errors in tests that wait for metrics export APP_METRICS_GATHERED_FOR_ALL=true export PLUGINS_DEFAULT_LOG_LEVEL=0 # All logs, as debug logs are used in synchronization barriers export NODE_ENV=production-functional-tests diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index fe6ee479be3ed..9818c37421286 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -19,6 +19,7 @@ import { import { PostgresRouter, PostgresUse } from '../src/utils/db/postgres' import { parseRawClickHouseEvent } from '../src/utils/event' import { createPostgresPool, UUIDT } from '../src/utils/utils' +import { RawAppMetric } from '../src/worker/ingestion/app-metrics' import { insertRow } from '../tests/helpers/sql' import { waitForExpect } from './expectations' import { produce } from './kafka' @@ -151,7 +152,7 @@ export const createPlugin = async (plugin: Omit) => { } export const createPluginConfig = async ( - pluginConfig: Omit, + pluginConfig: Omit, enabled = true ) => { return await insertRow(postgres, 'posthog_pluginconfig', { @@ -330,6 +331,14 @@ export const fetchPluginLogEntries = async (pluginConfigId: number) => { return logEntries } +export const fetchPluginAppMetrics = async (pluginConfigId: number) => { + const { data: appMetrics } = (await clickHouseClient.querying(` + SELECT * FROM app_metrics + WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp + `)) as unknown as ClickHouse.ObjectQueryResult + return appMetrics +} + export const createOrganization = async (organizationProperties = {}) => { const organizationId = new UUIDT().toString() await insertRow(postgres, 'posthog_organization', { diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 695d7c705ae40..db56c3f2cefdb 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -1,6 +1,7 @@ import { v4 as uuid4 } from 'uuid' import { ONE_HOUR } from '../src/config/constants' +import { PluginLogEntryType } from '../src/types' import { UUIDT } from '../src/utils/utils' import { getCacheKey } from '../src/worker/vm/extensions/cache' import { @@ -13,7 +14,9 @@ import { createTeam, enablePluginConfig, fetchEvents, + fetchPluginAppMetrics, fetchPluginConsoleLogEntries, + fetchPluginLogEntries, fetchPostgresPersons, getPluginConfig, redis, @@ -90,133 +93,119 @@ test.concurrent(`plugin method tests: event captured, processed, ingested`, asyn }) }) -test.concurrent(`plugin method tests: creates error on unhandled throw`, async () => { - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'test plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` +test.concurrent( + `plugin method tests: records error in app metrics and creates log entry on unhandled throw`, + async () => { + const plugin = await createPlugin({ + organization_id: organizationId, + name: 'test plugin', + plugin_type: 'source', + is_global: false, + source__index_ts: ` export async function processEvent(event) { throw new Error('error thrown in plugin') } `, - }) - const teamId = await createTeam(organizationId) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() + }) + const teamId = await createTeam(organizationId) + const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - const event = { - event: 'custom event', - // NOTE: Before `sanitizeJsonbValue` was added, the null byte below would blow up the error - // UPDATE, breaking this test. It is now replaced with the Unicode replacement character, - // \uFFFD. - properties: { name: 'haha', other: '\u0000' }, - } + const distinctId = new UUIDT().toString() + const uuid = new UUIDT().toString() - await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) + const event = { + event: 'custom event', + properties: { name: 'haha', other: '\u0000' }, + } - await waitForExpect(async () => { - const events = await fetchEvents(teamId) - expect(events.length).toBe(1) - return events - }) + await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) - const error = await waitForExpect(async () => { - const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id) - expect(pluginConfigAgain.error).not.toBeNull() - return pluginConfigAgain.error - }) + await waitForExpect(async () => { + const events = await fetchEvents(teamId) + expect(events.length).toBe(1) + return events + }) - expect(error.message).toEqual('error thrown in plugin') - const errorProperties = error.event.properties - expect(errorProperties.name).toEqual('haha') - expect(errorProperties.other).toEqual('\uFFFD') -}) + const appMetric = await waitForExpect(async () => { + const errorMetrics = await fetchPluginAppMetrics(pluginConfig.id) + expect(errorMetrics.length).toEqual(1) + return errorMetrics[0] + }) -test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => { - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'test plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` - export async function processEvent(event) { - void new Promise((_, rejects) => { rejects(new Error('error thrown in plugin')) }).then(() => {}) - return event - } - `, - }) - const teamId = await createTeam(organizationId) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) + expect(appMetric.successes).toEqual(0) + expect(appMetric.failures).toEqual(1) + expect(appMetric.error_type).toEqual('Error') + expect(JSON.parse(appMetric.error_details!)).toMatchObject({ + error: { message: 'error thrown in plugin' }, + event: { properties: event.properties }, + }) - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() + const errorLogEntry = await waitForExpect(async () => { + const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter( + (entry) => entry.type == PluginLogEntryType.Error + ) + expect(errorLogEntries.length).toBe(1) + return errorLogEntries[0] + }) - const event = { - event: 'custom event', - properties: { name: 'haha' }, + expect(errorLogEntry.message).toContain('error thrown in plugin') } +) - await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) - - await waitForExpect(async () => { - const events = await fetchEvents(teamId) - expect(events.length).toBe(1) - return events - }) - - const error = await waitForExpect(async () => { - const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id) - expect(pluginConfigAgain.error).not.toBeNull() - return pluginConfigAgain.error - }) - - expect(error.message).toEqual('error thrown in plugin') -}) - -test.concurrent(`plugin method tests: creates error on unhandled promise errors`, async () => { - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'test plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` +test.concurrent( + `plugin method tests: records success in app metrics and creates error log entry on unawaited promise rejection`, + async () => { + const plugin = await createPlugin({ + organization_id: organizationId, + name: 'test plugin', + plugin_type: 'source', + is_global: false, + source__index_ts: ` export async function processEvent(event) { - void new Promise(() => { throw new Error('error thrown in plugin') }).then(() => {}) + void new Promise(() => { throw new Error('error thrown in plugin') }) return event } `, - }) - const teamId = await createTeam(organizationId) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) + }) + const teamId = await createTeam(organizationId) + const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() + const distinctId = new UUIDT().toString() + const uuid = new UUIDT().toString() - const event = { - event: 'custom event', - properties: { name: 'haha' }, - } + const event = { + event: 'custom event', + properties: { name: 'haha' }, + } - await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) + await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) - await waitForExpect(async () => { - const events = await fetchEvents(teamId) - expect(events.length).toBe(1) - return events - }) + await waitForExpect(async () => { + const events = await fetchEvents(teamId) + expect(events.length).toBe(1) + return events + }) - const error = await waitForExpect(async () => { - const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id) - expect(pluginConfigAgain.error).not.toBeNull() - return pluginConfigAgain.error - }) + const appMetric = await waitForExpect(async () => { + const appMetrics = await fetchPluginAppMetrics(pluginConfig.id) + expect(appMetrics.length).toEqual(1) + return appMetrics[0] + }) - expect(error.message).toEqual('error thrown in plugin') -}) + expect(appMetric.successes).toEqual(1) + expect(appMetric.failures).toEqual(0) + + const errorLogEntry = await waitForExpect(async () => { + const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter( + (entry) => entry.type == PluginLogEntryType.Error + ) + expect(errorLogEntries.length).toBe(1) + return errorLogEntries[0] + }) + + expect(errorLogEntry.message).toContain('error thrown in plugin') + } +) test.concurrent(`plugin method tests: teardown is called on stateful plugin reload if they are updated`, async () => { const plugin = await createPlugin({ diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index ecd7d6e8c8182..148293ba71856 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -400,7 +400,6 @@ export interface PluginConfig { enabled: boolean order: number config: Record - has_error: boolean attachments?: Record vm?: LazyPluginVM | null created_at: string diff --git a/plugin-server/src/utils/db/error.ts b/plugin-server/src/utils/db/error.ts index f161fa289bc48..96924a35a9c17 100644 --- a/plugin-server/src/utils/db/error.ts +++ b/plugin-server/src/utils/db/error.ts @@ -1,8 +1,7 @@ import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' import { captureException } from '@sentry/node' -import { Hub, PluginConfig, PluginError } from '../../types' -import { setError } from './sql' +import { Hub, PluginConfig, PluginError, PluginLogEntrySource, PluginLogEntryType } from '../../types' export class DependencyUnavailableError extends Error { constructor(message: string, dependencyName: string, error: Error) { @@ -47,7 +46,7 @@ export async function processError( throw error } - const errorJson: PluginError = + const pluginError: PluginError = typeof error === 'string' ? { message: error, @@ -61,14 +60,14 @@ export async function processError( event: event, } - await setError(server, errorJson, pluginConfig) -} - -export async function clearError(server: Hub, pluginConfig: PluginConfig): Promise { - // running this may causes weird deadlocks with piscina and vms, so avoiding if possible - if (pluginConfig.has_error) { - await setError(server, null, pluginConfig) - } + await server.db.queuePluginLogEntry({ + pluginConfig, + source: PluginLogEntrySource.Plugin, + type: PluginLogEntryType.Error, + message: pluginError.stack ?? pluginError.message, + instanceId: server.instanceId, + timestamp: pluginError.time, + }) } export function cleanErrorStackTrace(stack: string | undefined): string | undefined { diff --git a/plugin-server/src/utils/db/sql.ts b/plugin-server/src/utils/db/sql.ts index ceb37c9bf764c..202f7f4ece5eb 100644 --- a/plugin-server/src/utils/db/sql.ts +++ b/plugin-server/src/utils/db/sql.ts @@ -1,16 +1,5 @@ -import { - Hub, - Plugin, - PluginAttachmentDB, - PluginCapabilities, - PluginConfig, - PluginConfigId, - PluginError, - PluginLogEntrySource, - PluginLogEntryType, -} from '../../types' +import { Hub, Plugin, PluginAttachmentDB, PluginCapabilities, PluginConfig, PluginConfigId } from '../../types' import { PostgresUse } from './postgres' -import { sanitizeJsonbValue } from './utils' function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string { const fields = specificField @@ -23,8 +12,7 @@ function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string { posthog_pluginconfig.order, posthog_pluginconfig.config, posthog_pluginconfig.updated_at, - posthog_pluginconfig.created_at, - posthog_pluginconfig.error IS NOT NULL AS has_error + posthog_pluginconfig.created_at ` return `SELECT ${fields} @@ -117,27 +105,6 @@ export async function setPluginCapabilities( ) } -export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise { - await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - 'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2', - // NOTE: In theory `onEvent` shouldn't be seeing events that still have the null byte, but - // it's better to be safe than sorry and sanitize the value here as well. - [sanitizeJsonbValue(pluginError), typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig], - 'updatePluginConfigError' - ) - if (pluginError) { - await hub.db.queuePluginLogEntry({ - pluginConfig, - source: PluginLogEntrySource.Plugin, - type: PluginLogEntryType.Error, - message: pluginError.stack ?? pluginError.message, - instanceId: hub.instanceId, - timestamp: pluginError.time, - }) - } -} - export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): Promise { await hub.db.postgres.query( PostgresUse.COMMON_WRITE, diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index ffa139e250f7b..36791e235b242 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -53,6 +53,21 @@ interface QueuedMetric { metric: AppMetricIdentifier } +/** An aggregated AppMetric, as written to/read from a ClickHouse row. */ +export interface RawAppMetric { + timestamp: string + team_id: number + plugin_config_id: number + job_id?: string + category: string + successes: number + successes_on_retry: number + failures: number + error_uuid?: string + error_type?: string + error_details?: string +} + const MAX_STRING_LENGTH = 1000 const safeJSONStringify = configure({ @@ -88,9 +103,6 @@ export class AppMetrics { // However, we also don't want to wait too long, nor have the queue grow too big resulting in // the flush taking a long time. const now = Date.now() - if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) { - await this.flush() - } timestamp = timestamp || now const key = this._key(metric) @@ -123,6 +135,10 @@ export class AppMetrics { this.queuedData[key].failures += failures } this.queuedData[key].lastTimestamp = timestamp + + if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) { + await this.flush() + } } async queueError(metric: AppMetric, errorWithContext: ErrorWithContext, timestamp?: number) { @@ -163,7 +179,7 @@ export class AppMetrics { error_uuid: value.errorUuid, error_type: value.errorType, error_details: value.errorDetails, - }), + } as RawAppMetric), })) await this.kafkaProducer.queueMessage({ diff --git a/plugin-server/src/worker/vm/lazy.ts b/plugin-server/src/worker/vm/lazy.ts index 301abb30e964b..0a6aec913f38a 100644 --- a/plugin-server/src/worker/vm/lazy.ts +++ b/plugin-server/src/worker/vm/lazy.ts @@ -12,7 +12,7 @@ import { PluginTaskType, VMMethods, } from '../../types' -import { clearError, processError } from '../../utils/db/error' +import { processError } from '../../utils/db/error' import { disablePlugin, setPluginCapabilities } from '../../utils/db/sql' import { instrument } from '../../utils/metrics' import { getNextRetryMs } from '../../utils/retries' @@ -242,8 +242,6 @@ export class LazyPluginVM { `setupPlugin succeeded (instance ID ${this.hub.instanceId}).`, PluginLogEntryType.Debug ) - - void clearError(this.hub, this.pluginConfig) } catch (error) { this.hub.statsd?.increment('plugin.setup.fail', { plugin: this.pluginConfig.plugin?.name ?? '?' }) this.hub.statsd?.timing('plugin.setup.fail_timing', timer, { diff --git a/plugin-server/tests/helpers/sql.ts b/plugin-server/tests/helpers/sql.ts index 39e2a6b166a8f..eb167cccdb20f 100644 --- a/plugin-server/tests/helpers/sql.ts +++ b/plugin-server/tests/helpers/sql.ts @@ -1,5 +1,4 @@ import { DateTime } from 'luxon' -import { Pool } from 'pg' import { defaultConfig } from '../../src/config/config' import { @@ -269,18 +268,6 @@ export async function getFirstTeam(hub: Hub): Promise { return (await getTeams(hub))[0] } -export async function getErrorForPluginConfig(id: number): Promise { - const db = new Pool({ connectionString: defaultConfig.DATABASE_URL! }) - let error - try { - const response = await db.query('SELECT * FROM posthog_pluginconfig WHERE id = $1', [id]) - error = response.rows[0]['error'] - } catch {} - - await db.end() - return error -} - export const createPlugin = async (pg: PostgresRouter, plugin: Omit) => { return await insertRow(pg, 'posthog_plugin', { ...plugin, @@ -296,7 +283,7 @@ export const createPlugin = async (pg: PostgresRouter, plugin: Omit + pluginConfig: Omit ) => { return await insertRow(pg, 'posthog_pluginconfig', { ...pluginConfig, diff --git a/plugin-server/tests/helpers/sqlMock.ts b/plugin-server/tests/helpers/sqlMock.ts index 1da87be42a698..378c6bf6273e9 100644 --- a/plugin-server/tests/helpers/sqlMock.ts +++ b/plugin-server/tests/helpers/sqlMock.ts @@ -12,5 +12,4 @@ export const getPluginConfigRows = s.getPluginConfigRows as unknown as jest.Mock export const setPluginCapabilities = s.setPluginCapabilities as unknown as jest.MockedFunction< UnPromisify > -export const setError = s.setError as unknown as jest.MockedFunction> export const disablePlugin = s.disablePlugin as unknown as jest.MockedFunction> diff --git a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts index 6f73fb042b9b4..064d889ad62dc 100644 --- a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts @@ -53,28 +53,21 @@ describe('runAppsOnEventPipeline()', () => { let redis: Redis.Redis let closeHub: () => Promise - beforeEach(() => { + beforeEach(async () => { // Use fake timers to ensure that we don't need to wait on e.g. retry logic. jest.useFakeTimers({ advanceTimers: true }) - }) - - beforeAll(async () => { - jest.useFakeTimers({ advanceTimers: true }) ;[hub, closeHub] = await createHub() redis = await hub.redisPool.acquire() await hub.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, null, 'deleteTables') // Need to clear the DB to avoid unique constraint violations on ids }) - afterEach(() => { - jest.clearAllTimers() - jest.useRealTimers() - jest.clearAllMocks() - }) - - afterAll(async () => { + afterEach(async () => { await hub.redisPool.release(redis) await teardownPlugins(hub) await closeHub() + jest.clearAllTimers() + jest.useRealTimers() + jest.restoreAllMocks() }) test('throws on produce errors', async () => { diff --git a/plugin-server/tests/main/teardown.test.ts b/plugin-server/tests/main/teardown.test.ts index 4e5e69371523b..50e3948e912cf 100644 --- a/plugin-server/tests/main/teardown.test.ts +++ b/plugin-server/tests/main/teardown.test.ts @@ -1,11 +1,13 @@ +import ClickHouse from '@posthog/clickhouse' import { PluginEvent } from '@posthog/plugin-scaffold' +import { waitForExpect } from '../../functional_tests/expectations' import { startPluginsServer } from '../../src/main/pluginsServer' -import { Hub, LogLevel } from '../../src/types' +import { Hub, LogLevel, PluginLogEntry, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' -import { getErrorForPluginConfig, resetTestDatabase } from '../helpers/sql' +import { resetTestDatabase } from '../helpers/sql' jest.mock('../../src/utils/status') jest.setTimeout(60000) // 60 sec timeout @@ -21,6 +23,17 @@ const defaultEvent: PluginEvent = { properties: { key: 'value' }, } +async function getLogEntriesForPluginConfig(hub: Hub, pluginConfigId: number) { + const { data: logEntries } = (await hub.clickhouse.querying(` + SELECT * + FROM plugin_log_entries + WHERE + plugin_config_id = ${pluginConfigId} AND + instance_id = '${hub.instanceId}' + ORDER BY timestamp`)) as unknown as ClickHouse.ObjectQueryResult + return logEntries +} + describe('teardown', () => { const processEvent = async (hub: Hub, event: PluginEvent) => { const result = await runEventPipeline(hub, event) @@ -39,6 +52,7 @@ describe('teardown', () => { throw new Error('This Happened In The Teardown Palace') } `) + const { hub, stop } = await startPluginsServer( { WORKER_CONCURRENCY: 2, @@ -48,16 +62,30 @@ describe('teardown', () => { undefined ) - const error1 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error1).toBe(null) - await processEvent(hub!, defaultEvent) - await stop?.() + await stop!() - // verify the teardownPlugin code runs - const error2 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error2.message).toBe('This Happened In The Teardown Palace') + // verify the teardownPlugin code runs -- since we're reading from + // ClickHouse, we need to give it a bit of time to have consumed from + // the topic and written everything we're looking for to the table + await waitForExpect(async () => { + const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) + + const systemErrors = logEntries.filter( + (logEntry) => + logEntry.source == PluginLogEntrySource.System && logEntry.type == PluginLogEntryType.Error + ) + expect(systemErrors).toHaveLength(1) + expect(systemErrors[0].message).toContain('Plugin failed to unload') + + const pluginErrors = logEntries.filter( + (logEntry) => + logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error + ) + expect(pluginErrors).toHaveLength(1) + expect(pluginErrors[0].message).toContain('This Happened In The Teardown Palace') + }) }) test('no need to tear down if plugin was never setup', async () => { @@ -71,7 +99,7 @@ describe('teardown', () => { throw new Error('This Happened In The Teardown Palace') } `) - const { stop } = await startPluginsServer( + const { hub, stop } = await startPluginsServer( { WORKER_CONCURRENCY: 2, LOG_LEVEL: LogLevel.Log, @@ -80,14 +108,26 @@ describe('teardown', () => { undefined ) - const error1 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error1).toBe(null) + await stop!() + + // verify the teardownPlugin code runs -- since we're reading from + // ClickHouse, we need to give it a bit of time to have consumed from + // the topic and written everything we're looking for to the table + await waitForExpect(async () => { + const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) - await stop?.() + const systemLogs = logEntries.filter((logEntry) => logEntry.source == PluginLogEntrySource.System) + expect(systemLogs).toHaveLength(2) + expect(systemLogs[0].message).toContain('Plugin loaded') + expect(systemLogs[1].message).toContain('Plugin unloaded') - // verify the teardownPlugin code doesn't run, because processEvent was never called - // and thus the plugin was never setup - see LazyVM - const error2 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error2).toBe(null) + // verify the teardownPlugin code doesn't run, because processEvent was never called + // and thus the plugin was never setup - see LazyVM + const pluginErrors = logEntries.filter( + (logEntry) => + logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error + ) + expect(pluginErrors).toHaveLength(0) + }) }) }) diff --git a/plugin-server/tests/sql.test.ts b/plugin-server/tests/sql.test.ts index 762da1e167fb3..f3e6dad94901f 100644 --- a/plugin-server/tests/sql.test.ts +++ b/plugin-server/tests/sql.test.ts @@ -1,15 +1,8 @@ -import { Hub, PluginError } from '../src/types' +import { Hub } from '../src/types' import { createHub } from '../src/utils/db/hub' import { PostgresUse } from '../src/utils/db/postgres' -import { - disablePlugin, - getPluginAttachmentRows, - getPluginConfigRows, - getPluginRows, - setError, -} from '../src/utils/db/sql' -import { sanitizeJsonbValue } from '../src/utils/db/utils' -import { commonOrganizationId, pluginConfig39 } from './helpers/plugins' +import { disablePlugin, getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from '../src/utils/db/sql' +import { commonOrganizationId } from './helpers/plugins' import { resetTestDatabase } from './helpers/sql' jest.setTimeout(20_000) @@ -60,7 +53,6 @@ describe('sql', () => { localhostIP: '94.224.212.175', }, enabled: true, - has_error: false, id: 39, order: 0, plugin_id: 60, @@ -71,23 +63,6 @@ describe('sql', () => { const rows1 = await getPluginConfigRows(hub) expect(rows1).toEqual([expectedRow]) - - await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - "update posthog_team set plugins_opt_in='f'", - undefined, - 'testTag' - ) - const pluginError: PluginError = { message: 'error happened', time: 'now' } - await setError(hub, pluginError, pluginConfig39) - - const rows2 = await getPluginConfigRows(hub) - expect(rows2).toEqual([ - { - ...expectedRow, - has_error: true, - }, - ]) }) test('getPluginRows', async () => { @@ -128,27 +103,6 @@ describe('sql', () => { expect(rows2).toEqual(rowsExpected) }) - test('setError', async () => { - hub.db.postgres.query = jest.fn() as any - - await setError(hub, null, pluginConfig39) - expect(hub.db.postgres.query).toHaveBeenCalledWith( - PostgresUse.COMMON_WRITE, - 'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2', - [null, pluginConfig39.id], - 'updatePluginConfigError' - ) - - const pluginError: PluginError = { message: 'error happened', time: 'now' } - await setError(hub, pluginError, pluginConfig39) - expect(hub.db.postgres.query).toHaveBeenCalledWith( - PostgresUse.COMMON_WRITE, - 'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2', - [sanitizeJsonbValue(pluginError), pluginConfig39.id], - 'updatePluginConfigError' - ) - }) - describe('disablePlugin', () => { test('disablePlugin query builds correctly', async () => { hub.db.postgres.query = jest.fn() as any diff --git a/plugin-server/tests/utils/retries.test.ts b/plugin-server/tests/utils/retries.test.ts index 927c7b7675ceb..15193e8ad825c 100644 --- a/plugin-server/tests/utils/retries.test.ts +++ b/plugin-server/tests/utils/retries.test.ts @@ -8,7 +8,6 @@ import { PromiseManager } from '../../src/worker/vm/promise-manager' jest.useFakeTimers() jest.spyOn(global, 'setTimeout') -jest.mock('../../src/utils/db/error') // Mocking setError which we don't need in tests const mockHub: Hub = { instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'), diff --git a/posthog/api/plugin.py b/posthog/api/plugin.py index 15671f251b199..46fa5f3c2a422 100644 --- a/posthog/api/plugin.py +++ b/posthog/api/plugin.py @@ -531,6 +531,7 @@ class PluginConfigSerializer(serializers.ModelSerializer): config = serializers.SerializerMethodField() plugin_info = serializers.SerializerMethodField() delivery_rate_24h = serializers.SerializerMethodField() + error = serializers.SerializerMethodField() class Meta: model = PluginConfig @@ -554,6 +555,7 @@ class Meta: "id", "team_id", "plugin_info", + "error", "delivery_rate_24h", "created_at", ] @@ -604,6 +606,12 @@ def get_delivery_rate_24h(self, plugin_config: PluginConfig): else: return None + def get_error(self, plugin_config: PluginConfig) -> None: + # Reporting the single latest error is no longer supported: use app + # metrics (for fatal errors) or plugin log entries (for all errors) for + # error details instead. + return None + def create(self, validated_data: Dict, *args: Any, **kwargs: Any) -> PluginConfig: if not can_configure_plugins(self.context["get_organization"]()): raise ValidationError("Plugin configuration is not available for the current organization!") diff --git a/posthog/models/plugin.py b/posthog/models/plugin.py index ff81a8d09d0f4..34afc1918b1d8 100644 --- a/posthog/models/plugin.py +++ b/posthog/models/plugin.py @@ -228,6 +228,7 @@ class Meta: enabled: models.BooleanField = models.BooleanField(default=False) order: models.IntegerField = models.IntegerField() config: models.JSONField = models.JSONField(default=dict) + # DEPRECATED: use `plugin_log_entries` or `app_metrics` in ClickHouse instead # Error when running this plugin on an event (frontend: PluginErrorType) # - e.g: "undefined is not a function on index.js line 23" # - error = { message: "Exception in processEvent()", time: "iso-string", ...meta }