From b9c9336baecb64380e5a87bbd3936a251b1e91f6 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 16 Nov 2023 16:20:18 -0800 Subject: [PATCH 01/23] Check if we need to flush after queueing, rather than before -- this improves the chances we'll hit our delivery targets (and also makes this easier to test.) --- plugin-server/src/worker/ingestion/app-metrics.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index ffa139e250f7b..ea430237ca06b 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -88,9 +88,6 @@ export class AppMetrics { // However, we also don't want to wait too long, nor have the queue grow too big resulting in // the flush taking a long time. const now = Date.now() - if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) { - await this.flush() - } timestamp = timestamp || now const key = this._key(metric) @@ -123,6 +120,10 @@ export class AppMetrics { this.queuedData[key].failures += failures } this.queuedData[key].lastTimestamp = timestamp + + if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) { + await this.flush() + } } async queueError(metric: AppMetric, errorWithContext: ErrorWithContext, timestamp?: number) { From c4cbd65ca74e2f18f2959d043f181f73fda7e63f Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 16 Nov 2023 10:47:50 -0800 Subject: [PATCH 02/23] Remove UPDATEs from error callback. --- plugin-server/src/utils/db/sql.ts | 9 --------- plugin-server/tests/sql.test.ts | 22 ---------------------- 2 files changed, 31 deletions(-) diff --git a/plugin-server/src/utils/db/sql.ts b/plugin-server/src/utils/db/sql.ts index ceb37c9bf764c..9a4a9a9eaef6a 100644 --- a/plugin-server/src/utils/db/sql.ts +++ b/plugin-server/src/utils/db/sql.ts @@ -10,7 +10,6 @@ import { PluginLogEntryType, } from '../../types' import { PostgresUse } from './postgres' -import { sanitizeJsonbValue } from './utils' function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string { const fields = specificField @@ -118,14 +117,6 @@ export async function setPluginCapabilities( } export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise { - await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - 'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2', - // NOTE: In theory `onEvent` shouldn't be seeing events that still have the null byte, but - // it's better to be safe than sorry and sanitize the value here as well. - [sanitizeJsonbValue(pluginError), typeof pluginConfig === 'object' ? pluginConfig?.id : pluginConfig], - 'updatePluginConfigError' - ) if (pluginError) { await hub.db.queuePluginLogEntry({ pluginConfig, diff --git a/plugin-server/tests/sql.test.ts b/plugin-server/tests/sql.test.ts index 762da1e167fb3..1d72c6c7bbfc1 100644 --- a/plugin-server/tests/sql.test.ts +++ b/plugin-server/tests/sql.test.ts @@ -8,7 +8,6 @@ import { getPluginRows, setError, } from '../src/utils/db/sql' -import { sanitizeJsonbValue } from '../src/utils/db/utils' import { commonOrganizationId, pluginConfig39 } from './helpers/plugins' import { resetTestDatabase } from './helpers/sql' @@ -128,27 +127,6 @@ describe('sql', () => { expect(rows2).toEqual(rowsExpected) }) - test('setError', async () => { - hub.db.postgres.query = jest.fn() as any - - await setError(hub, null, pluginConfig39) - expect(hub.db.postgres.query).toHaveBeenCalledWith( - PostgresUse.COMMON_WRITE, - 'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2', - [null, pluginConfig39.id], - 'updatePluginConfigError' - ) - - const pluginError: PluginError = { message: 'error happened', time: 'now' } - await setError(hub, pluginError, pluginConfig39) - expect(hub.db.postgres.query).toHaveBeenCalledWith( - PostgresUse.COMMON_WRITE, - 'UPDATE posthog_pluginconfig SET error = $1 WHERE id = $2', - [sanitizeJsonbValue(pluginError), pluginConfig39.id], - 'updatePluginConfigError' - ) - }) - describe('disablePlugin', () => { test('disablePlugin query builds correctly', async () => { hub.db.postgres.query = jest.fn() as any From b71ea99dbaf6eb937e666347da8171f349c8131b Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 16 Nov 2023 16:45:03 -0800 Subject: [PATCH 03/23] Hack together an ugly test that passes, but requires running server with `APP_METRICS_FLUSH_FREQUENCY_MS=0`. --- plugin-server/functional_tests/api.ts | 8 +++++++ .../functional_tests/plugins.test.ts | 23 ++++++++++--------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index 5a924802ade77..b652d4e5c0529 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -330,6 +330,14 @@ export const fetchPluginLogEntries = async (pluginConfigId: number) => { return logEntries } +export const fetchPluginAppMetrics = async (pluginConfigId: number) => { + // TODO: clean up, better type handling + const { data: appMetrics } = await clickHouseClient.querying( + `SELECT * FROM app_metrics WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp` + ) + return appMetrics +} + export const createOrganization = async (organizationProperties = {}) => { const organizationId = new UUIDT().toString() await insertRow(postgres, 'posthog_organization', { diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 9542d59476831..a94e137a996a7 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -12,6 +12,7 @@ import { createTeam, enablePluginConfig, fetchEvents, + fetchPluginAppMetrics, fetchPluginConsoleLogEntries, fetchPostgresPersons, getPluginConfig, @@ -108,9 +109,6 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async ( const event = { event: 'custom event', - // NOTE: Before `sanitizeJsonbValue` was added, the null byte below would blow up the error - // UPDATE, breaking this test. It is now replaced with the Unicode replacement character, - // \uFFFD. properties: { name: 'haha', other: '\u0000' }, } @@ -122,16 +120,19 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async ( return events }) - const error = await waitForExpect(async () => { - const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id) - expect(pluginConfigAgain.error).not.toBeNull() - return pluginConfigAgain.error + const { error_details } = await waitForExpect(async () => { + // TODO: clean up, move parsing down to fetch + const errors = (await fetchPluginAppMetrics(pluginConfig.id)) + .filter((record) => record.error_type) + .map((record) => ({ ...record, error_details: JSON.parse(record.error_details) })) + expect(errors.length).toEqual(1) + return errors[0] }) - expect(error.message).toEqual('error thrown in plugin') - const errorProperties = error.event.properties - expect(errorProperties.name).toEqual('haha') - expect(errorProperties.other).toEqual('\uFFFD') + expect(error_details).toMatchObject({ + error: { message: 'error thrown in plugin' }, + event: { properties: event.properties }, + }) }) test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => { From 8d2a31a8716a43d41dab0b1b2307ca4a6e3631b7 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 16 Nov 2023 17:11:31 -0800 Subject: [PATCH 04/23] Move error parsing down into helper function. --- plugin-server/functional_tests/api.ts | 7 ++++++- plugin-server/functional_tests/plugins.test.ts | 5 +---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index b652d4e5c0529..806fff0d576ef 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -335,7 +335,12 @@ export const fetchPluginAppMetrics = async (pluginConfigId: number) => { const { data: appMetrics } = await clickHouseClient.querying( `SELECT * FROM app_metrics WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp` ) - return appMetrics + return appMetrics.map((row) => { + if (row.error_type) { + row.error_details = JSON.parse(row.error_details) + } + return row + }) } export const createOrganization = async (organizationProperties = {}) => { diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index a94e137a996a7..c9200815d4031 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -121,10 +121,7 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async ( }) const { error_details } = await waitForExpect(async () => { - // TODO: clean up, move parsing down to fetch - const errors = (await fetchPluginAppMetrics(pluginConfig.id)) - .filter((record) => record.error_type) - .map((record) => ({ ...record, error_details: JSON.parse(record.error_details) })) + const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) expect(errors.length).toEqual(1) return errors[0] }) From 82f4f163e846fba599bb5b0e1abd2014761ef0d6 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 16 Nov 2023 17:18:11 -0800 Subject: [PATCH 05/23] Update the remainder of the tests (...which error?) --- .../functional_tests/plugins.test.ts | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index c9200815d4031..cd1ab85fda898 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -164,13 +164,16 @@ test.concurrent(`plugin method tests: creates error on unhandled rejection`, asy return events }) - const error = await waitForExpect(async () => { - const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id) - expect(pluginConfigAgain.error).not.toBeNull() - return pluginConfigAgain.error + const { error_details } = await waitForExpect(async () => { + const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) + expect(errors.length).toEqual(1) + return errors[0] }) - expect(error.message).toEqual('error thrown in plugin') + expect(error_details).toMatchObject({ + error: { message: 'error thrown in plugin' }, + event: { properties: event.properties }, + }) }) test.concurrent(`plugin method tests: creates error on unhandled promise errors`, async () => { @@ -205,13 +208,16 @@ test.concurrent(`plugin method tests: creates error on unhandled promise errors` return events }) - const error = await waitForExpect(async () => { - const pluginConfigAgain = await getPluginConfig(teamId, pluginConfig.id) - expect(pluginConfigAgain.error).not.toBeNull() - return pluginConfigAgain.error + const { error_details } = await waitForExpect(async () => { + const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) + expect(errors.length).toEqual(1) + return errors[0] }) - expect(error.message).toEqual('error thrown in plugin') + expect(error_details).toMatchObject({ + error: { message: 'error thrown in plugin' }, + event: { properties: event.properties }, + }) }) test.concurrent(`plugin method tests: teardown is called on stateful plugin reload if they are updated`, async () => { From 8068ec34494948f81a2c9429f8b5ae11d4636227 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 17 Nov 2023 15:49:17 -0800 Subject: [PATCH 06/23] Remove a case so nice it was tested twice... --- .../functional_tests/plugins.test.ts | 44 ------------------- 1 file changed, 44 deletions(-) diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index cd1ab85fda898..6e4b4a3713280 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -132,50 +132,6 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async ( }) }) -test.concurrent(`plugin method tests: creates error on unhandled rejection`, async () => { - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'test plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` - export async function processEvent(event) { - void new Promise((_, rejects) => { rejects(new Error('error thrown in plugin')) }).then(() => {}) - return event - } - `, - }) - const teamId = await createTeam(organizationId) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() - - const event = { - event: 'custom event', - properties: { name: 'haha' }, - } - - await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) - - await waitForExpect(async () => { - const events = await fetchEvents(teamId) - expect(events.length).toBe(1) - return events - }) - - const { error_details } = await waitForExpect(async () => { - const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) - expect(errors.length).toEqual(1) - return errors[0] - }) - - expect(error_details).toMatchObject({ - error: { message: 'error thrown in plugin' }, - event: { properties: event.properties }, - }) -}) - test.concurrent(`plugin method tests: creates error on unhandled promise errors`, async () => { const plugin = await createPlugin({ organization_id: organizationId, From 684c76423d16a0c8fdbd66b22d1763ead494c796 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 17 Nov 2023 15:52:05 -0800 Subject: [PATCH 07/23] Also remove unnecessary chaining from unhandled promise rejection. --- plugin-server/functional_tests/plugins.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 6e4b4a3713280..8727c06d538db 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -132,7 +132,7 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async ( }) }) -test.concurrent(`plugin method tests: creates error on unhandled promise errors`, async () => { +test.concurrent(`plugin method tests: creates error on unhandled promise rejection`, async () => { const plugin = await createPlugin({ organization_id: organizationId, name: 'test plugin', @@ -140,7 +140,7 @@ test.concurrent(`plugin method tests: creates error on unhandled promise errors` is_global: false, source__index_ts: ` export async function processEvent(event) { - void new Promise(() => { throw new Error('error thrown in plugin') }).then(() => {}) + void new Promise(() => { throw new Error('error thrown in plugin') }) return event } `, From 29f5f999097f540cf6da61f569fc97a15e82be7b Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Fri, 17 Nov 2023 16:03:13 -0800 Subject: [PATCH 08/23] Check for log entry, not plugin error. --- plugin-server/functional_tests/plugins.test.ts | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 8727c06d538db..98b02c386797f 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -1,6 +1,7 @@ import { v4 as uuid4 } from 'uuid' import { ONE_HOUR } from '../src/config/constants' +import { PluginLogEntryType } from '../src/types' import { UUIDT } from '../src/utils/utils' import { capture, @@ -14,6 +15,7 @@ import { fetchEvents, fetchPluginAppMetrics, fetchPluginConsoleLogEntries, + fetchPluginLogEntries, fetchPostgresPersons, getPluginConfig, reloadPlugins, @@ -164,16 +166,15 @@ test.concurrent(`plugin method tests: creates error on unhandled promise rejecti return events }) - const { error_details } = await waitForExpect(async () => { - const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) - expect(errors.length).toEqual(1) - return errors[0] + const errorLogEntry = await waitForExpect(async () => { + const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter( + (entry) => entry.type == PluginLogEntryType.Error + ) + expect(errorLogEntries.length).toBe(1) + return errorLogEntries[0] }) - expect(error_details).toMatchObject({ - error: { message: 'error thrown in plugin' }, - event: { properties: event.properties }, - }) + expect(errorLogEntry.message).toContain('error thrown in plugin') }) test.concurrent(`plugin method tests: teardown is called on stateful plugin reload if they are updated`, async () => { From 182130c6a6bece86daa0f027caf5012b09ab5625 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 20 Nov 2023 13:49:12 -0800 Subject: [PATCH 09/23] Clean up some messy wording. --- plugin-server/functional_tests/plugins.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 98b02c386797f..5cc514789b2a8 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -91,7 +91,7 @@ test.concurrent(`plugin method tests: event captured, processed, ingested`, asyn }) }) -test.concurrent(`plugin method tests: creates error on unhandled throw`, async () => { +test.concurrent(`plugin method tests: records error in app metrics on unhandled throw`, async () => { const plugin = await createPlugin({ organization_id: organizationId, name: 'test plugin', @@ -122,19 +122,19 @@ test.concurrent(`plugin method tests: creates error on unhandled throw`, async ( return events }) - const { error_details } = await waitForExpect(async () => { + const errorDetails = await waitForExpect(async () => { const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) expect(errors.length).toEqual(1) return errors[0] - }) + }).error_details - expect(error_details).toMatchObject({ + expect(errorDetails).toMatchObject({ error: { message: 'error thrown in plugin' }, event: { properties: event.properties }, }) }) -test.concurrent(`plugin method tests: creates error on unhandled promise rejection`, async () => { +test.concurrent(`plugin method tests: creates log entry on unhandled promise rejection`, async () => { const plugin = await createPlugin({ organization_id: organizationId, name: 'test plugin', From ff5ce590611666a8f38ed8ad39f4d0acc85b05c5 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 20 Nov 2023 14:00:43 -0800 Subject: [PATCH 10/23] Set a reminder for what actually needs to be cleaned up with types here. --- plugin-server/functional_tests/api.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index 806fff0d576ef..cb2ad9ee67b74 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -331,10 +331,13 @@ export const fetchPluginLogEntries = async (pluginConfigId: number) => { } export const fetchPluginAppMetrics = async (pluginConfigId: number) => { - // TODO: clean up, better type handling - const { data: appMetrics } = await clickHouseClient.querying( - `SELECT * FROM app_metrics WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp` - ) + // TODO: Improve type handling here: the structure of ``AppMetric`` is + // inconsistent with that of the ClickHouse schema (seemingly for stylistic + // reasons), so that will need to be bridged somehow. + const { data: appMetrics } = await clickHouseClient.querying(` + SELECT * FROM app_metrics + WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp + `) return appMetrics.map((row) => { if (row.error_type) { row.error_details = JSON.parse(row.error_details) From 8f28d383b333a705e3539b3f77e4d9b0c7b5d07c Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 20 Nov 2023 16:04:17 -0800 Subject: [PATCH 11/23] Hide the error field from the model serializer. --- posthog/api/plugin.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/posthog/api/plugin.py b/posthog/api/plugin.py index 15671f251b199..46fa5f3c2a422 100644 --- a/posthog/api/plugin.py +++ b/posthog/api/plugin.py @@ -531,6 +531,7 @@ class PluginConfigSerializer(serializers.ModelSerializer): config = serializers.SerializerMethodField() plugin_info = serializers.SerializerMethodField() delivery_rate_24h = serializers.SerializerMethodField() + error = serializers.SerializerMethodField() class Meta: model = PluginConfig @@ -554,6 +555,7 @@ class Meta: "id", "team_id", "plugin_info", + "error", "delivery_rate_24h", "created_at", ] @@ -604,6 +606,12 @@ def get_delivery_rate_24h(self, plugin_config: PluginConfig): else: return None + def get_error(self, plugin_config: PluginConfig) -> None: + # Reporting the single latest error is no longer supported: use app + # metrics (for fatal errors) or plugin log entries (for all errors) for + # error details instead. + return None + def create(self, validated_data: Dict, *args: Any, **kwargs: Any) -> PluginConfig: if not can_configure_plugins(self.context["get_organization"]()): raise ValidationError("Plugin configuration is not available for the current organization!") From b30bb45eaf71d0d6f1e7887e823f89340ded8d94 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 20 Nov 2023 17:40:55 -0800 Subject: [PATCH 12/23] Clean up(?) types. --- plugin-server/functional_tests/api.ts | 15 ++++----------- plugin-server/functional_tests/plugins.test.ts | 8 ++++---- .../src/worker/ingestion/app-metrics.ts | 17 ++++++++++++++++- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index cb2ad9ee67b74..cba596aafddfa 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -19,6 +19,7 @@ import { import { PostgresRouter, PostgresUse } from '../src/utils/db/postgres' import { parseRawClickHouseEvent } from '../src/utils/event' import { createPostgresPool, UUIDT } from '../src/utils/utils' +import { RawAppMetric } from '../src/worker/ingestion/app-metrics' import { insertRow } from '../tests/helpers/sql' import { waitForExpect } from './expectations' import { produce } from './kafka' @@ -331,19 +332,11 @@ export const fetchPluginLogEntries = async (pluginConfigId: number) => { } export const fetchPluginAppMetrics = async (pluginConfigId: number) => { - // TODO: Improve type handling here: the structure of ``AppMetric`` is - // inconsistent with that of the ClickHouse schema (seemingly for stylistic - // reasons), so that will need to be bridged somehow. - const { data: appMetrics } = await clickHouseClient.querying(` + const { data: appMetrics } = (await clickHouseClient.querying(` SELECT * FROM app_metrics WHERE plugin_config_id = ${pluginConfigId} ORDER BY timestamp - `) - return appMetrics.map((row) => { - if (row.error_type) { - row.error_details = JSON.parse(row.error_details) - } - return row - }) + `)) as unknown as ClickHouse.ObjectQueryResult + return appMetrics } export const createOrganization = async (organizationProperties = {}) => { diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 5cc514789b2a8..de05a9bbda096 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -122,13 +122,13 @@ test.concurrent(`plugin method tests: records error in app metrics on unhandled return events }) - const errorDetails = await waitForExpect(async () => { - const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((record) => record.error_type) + const error = await waitForExpect(async () => { + const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((metric) => metric.error_type) expect(errors.length).toEqual(1) return errors[0] - }).error_details + }) - expect(errorDetails).toMatchObject({ + expect(JSON.parse(error.error_details as string)).toMatchObject({ error: { message: 'error thrown in plugin' }, event: { properties: event.properties }, }) diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index ea430237ca06b..36791e235b242 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -53,6 +53,21 @@ interface QueuedMetric { metric: AppMetricIdentifier } +/** An aggregated AppMetric, as written to/read from a ClickHouse row. */ +export interface RawAppMetric { + timestamp: string + team_id: number + plugin_config_id: number + job_id?: string + category: string + successes: number + successes_on_retry: number + failures: number + error_uuid?: string + error_type?: string + error_details?: string +} + const MAX_STRING_LENGTH = 1000 const safeJSONStringify = configure({ @@ -164,7 +179,7 @@ export class AppMetrics { error_uuid: value.errorUuid, error_type: value.errorType, error_details: value.errorDetails, - }), + } as RawAppMetric), })) await this.kafkaProducer.queueMessage({ From 92ab3e834490ea52f0b0626870033284b5be193c Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 20 Nov 2023 17:48:49 -0800 Subject: [PATCH 13/23] Update functional test script and docs to use `APP_METRICS_FLUSH_FREQUENCY_MS`. --- plugin-server/README.md | 2 +- plugin-server/bin/ci_functional_tests.sh | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin-server/README.md b/plugin-server/README.md index 67ec3042dca4f..e1607b78a1c3c 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -47,7 +47,7 @@ testing: 1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder) 1. setup the test DBs `pnpm setup:test` -1. start the plugin-server with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev` +1. start the plugin-server with `APP_METRICS_FLUSH_FREQUENCY_MS=0 CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev` 1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch` ## CLI flags diff --git a/plugin-server/bin/ci_functional_tests.sh b/plugin-server/bin/ci_functional_tests.sh index 9eff572c71251..a9a7910a14daf 100755 --- a/plugin-server/bin/ci_functional_tests.sh +++ b/plugin-server/bin/ci_functional_tests.sh @@ -15,6 +15,7 @@ export WORKER_CONCURRENCY=1 export CONVERSION_BUFFER_ENABLED=true export BUFFER_CONVERSION_SECONDS=2 # Make sure we don't have to wait for the default 60 seconds export KAFKA_MAX_MESSAGE_BATCH_SIZE=0 +export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious errors in tests that wait for metrics export APP_METRICS_GATHERED_FOR_ALL=true export NODE_ENV=production-functional-tests From b229260d58890934b45f8e7d84712e6578d25ca2 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 20 Nov 2023 18:01:50 -0800 Subject: [PATCH 14/23] Make tests better. --- .../functional_tests/plugins.test.ts | 154 +++++++++++------- 1 file changed, 91 insertions(+), 63 deletions(-) diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index de05a9bbda096..93d4152422497 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -91,91 +91,119 @@ test.concurrent(`plugin method tests: event captured, processed, ingested`, asyn }) }) -test.concurrent(`plugin method tests: records error in app metrics on unhandled throw`, async () => { - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'test plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` +test.concurrent( + `plugin method tests: records error in app metrics and creates log entry on unhandled throw`, + async () => { + const plugin = await createPlugin({ + organization_id: organizationId, + name: 'test plugin', + plugin_type: 'source', + is_global: false, + source__index_ts: ` export async function processEvent(event) { throw new Error('error thrown in plugin') } `, - }) - const teamId = await createTeam(organizationId) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) + }) + const teamId = await createTeam(organizationId) + const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() + const distinctId = new UUIDT().toString() + const uuid = new UUIDT().toString() - const event = { - event: 'custom event', - properties: { name: 'haha', other: '\u0000' }, - } + const event = { + event: 'custom event', + properties: { name: 'haha', other: '\u0000' }, + } - await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) + await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) - await waitForExpect(async () => { - const events = await fetchEvents(teamId) - expect(events.length).toBe(1) - return events - }) + await waitForExpect(async () => { + const events = await fetchEvents(teamId) + expect(events.length).toBe(1) + return events + }) - const error = await waitForExpect(async () => { - const errors = (await fetchPluginAppMetrics(pluginConfig.id)).filter((metric) => metric.error_type) - expect(errors.length).toEqual(1) - return errors[0] - }) + const appMetric = await waitForExpect(async () => { + const errorMetrics = await fetchPluginAppMetrics(pluginConfig.id) + expect(errorMetrics.length).toEqual(1) + return errorMetrics[0] + }) - expect(JSON.parse(error.error_details as string)).toMatchObject({ - error: { message: 'error thrown in plugin' }, - event: { properties: event.properties }, - }) -}) + expect(appMetric.successes).toEqual(0) + expect(appMetric.failures).toEqual(1) + expect(appMetric.error_type).toEqual('Error') + expect(JSON.parse(appMetric.error_details as string)).toMatchObject({ + error: { message: 'error thrown in plugin' }, + event: { properties: event.properties }, + }) -test.concurrent(`plugin method tests: creates log entry on unhandled promise rejection`, async () => { - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'test plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` + const errorLogEntry = await waitForExpect(async () => { + const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter( + (entry) => entry.type == PluginLogEntryType.Error + ) + expect(errorLogEntries.length).toBe(1) + return errorLogEntries[0] + }) + + expect(errorLogEntry.message).toContain('error thrown in plugin') + } +) + +test.concurrent( + `plugin method tests: records success in app metrics and creates error log entry on unawaited promise rejection`, + async () => { + const plugin = await createPlugin({ + organization_id: organizationId, + name: 'test plugin', + plugin_type: 'source', + is_global: false, + source__index_ts: ` export async function processEvent(event) { void new Promise(() => { throw new Error('error thrown in plugin') }) return event } `, - }) - const teamId = await createTeam(organizationId) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) + }) + const teamId = await createTeam(organizationId) + const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() + const distinctId = new UUIDT().toString() + const uuid = new UUIDT().toString() - const event = { - event: 'custom event', - properties: { name: 'haha' }, - } + const event = { + event: 'custom event', + properties: { name: 'haha' }, + } - await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) + await capture({ teamId, distinctId, uuid, event: event.event, properties: event.properties }) - await waitForExpect(async () => { - const events = await fetchEvents(teamId) - expect(events.length).toBe(1) - return events - }) + await waitForExpect(async () => { + const events = await fetchEvents(teamId) + expect(events.length).toBe(1) + return events + }) - const errorLogEntry = await waitForExpect(async () => { - const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter( - (entry) => entry.type == PluginLogEntryType.Error - ) - expect(errorLogEntries.length).toBe(1) - return errorLogEntries[0] - }) + const appMetric = await waitForExpect(async () => { + const appMetrics = await fetchPluginAppMetrics(pluginConfig.id) + expect(appMetrics.length).toEqual(1) + return appMetrics[0] + }) - expect(errorLogEntry.message).toContain('error thrown in plugin') -}) + expect(appMetric.successes).toEqual(1) + expect(appMetric.failures).toEqual(0) + + const errorLogEntry = await waitForExpect(async () => { + const errorLogEntries = (await fetchPluginLogEntries(pluginConfig.id)).filter( + (entry) => entry.type == PluginLogEntryType.Error + ) + expect(errorLogEntries.length).toBe(1) + return errorLogEntries[0] + }) + + expect(errorLogEntry.message).toContain('error thrown in plugin') + } +) test.concurrent(`plugin method tests: teardown is called on stateful plugin reload if they are updated`, async () => { const plugin = await createPlugin({ From 7e3d94dd24a38e5687ba550df2b693813a55275f Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 21 Nov 2023 08:48:13 -0800 Subject: [PATCH 15/23] Remove `has_error` attribute from `PluginConfig` (also remove `setError` to appease the ESLint gods.) --- plugin-server/functional_tests/api.ts | 2 +- .../functional_tests/plugins.test.ts | 2 +- plugin-server/src/types.ts | 1 - plugin-server/src/utils/db/error.ts | 7 ----- plugin-server/src/utils/db/sql.ts | 4 +-- plugin-server/src/worker/vm/lazy.ts | 4 +-- plugin-server/tests/helpers/sql.ts | 2 +- plugin-server/tests/sql.test.ts | 30 ++----------------- 8 files changed, 9 insertions(+), 43 deletions(-) diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index cba596aafddfa..cd6c4c99c325d 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -152,7 +152,7 @@ export const createPlugin = async (plugin: Omit) => { } export const createPluginConfig = async ( - pluginConfig: Omit, + pluginConfig: Omit, enabled = true ) => { return await insertRow(postgres, 'posthog_pluginconfig', { diff --git a/plugin-server/functional_tests/plugins.test.ts b/plugin-server/functional_tests/plugins.test.ts index 93d4152422497..cb8eb6bef53a2 100644 --- a/plugin-server/functional_tests/plugins.test.ts +++ b/plugin-server/functional_tests/plugins.test.ts @@ -133,7 +133,7 @@ test.concurrent( expect(appMetric.successes).toEqual(0) expect(appMetric.failures).toEqual(1) expect(appMetric.error_type).toEqual('Error') - expect(JSON.parse(appMetric.error_details as string)).toMatchObject({ + expect(JSON.parse(appMetric.error_details!)).toMatchObject({ error: { message: 'error thrown in plugin' }, event: { properties: event.properties }, }) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 700bf4f6cef89..c9975ae1aecf9 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -396,7 +396,6 @@ export interface PluginConfig { enabled: boolean order: number config: Record - has_error: boolean attachments?: Record vm?: LazyPluginVM | null created_at: string diff --git a/plugin-server/src/utils/db/error.ts b/plugin-server/src/utils/db/error.ts index f161fa289bc48..1d05a2ba0e7ee 100644 --- a/plugin-server/src/utils/db/error.ts +++ b/plugin-server/src/utils/db/error.ts @@ -64,13 +64,6 @@ export async function processError( await setError(server, errorJson, pluginConfig) } -export async function clearError(server: Hub, pluginConfig: PluginConfig): Promise { - // running this may causes weird deadlocks with piscina and vms, so avoiding if possible - if (pluginConfig.has_error) { - await setError(server, null, pluginConfig) - } -} - export function cleanErrorStackTrace(stack: string | undefined): string | undefined { if (!stack) { return stack diff --git a/plugin-server/src/utils/db/sql.ts b/plugin-server/src/utils/db/sql.ts index 9a4a9a9eaef6a..c6dfb5dd1f19a 100644 --- a/plugin-server/src/utils/db/sql.ts +++ b/plugin-server/src/utils/db/sql.ts @@ -22,8 +22,7 @@ function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string { posthog_pluginconfig.order, posthog_pluginconfig.config, posthog_pluginconfig.updated_at, - posthog_pluginconfig.created_at, - posthog_pluginconfig.error IS NOT NULL AS has_error + posthog_pluginconfig.created_at ` return `SELECT ${fields} @@ -117,6 +116,7 @@ export async function setPluginCapabilities( } export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise { + // TODO: pluginError can likely be required now, but need to check call sites more thoroughly to ensure that's the case. if (pluginError) { await hub.db.queuePluginLogEntry({ pluginConfig, diff --git a/plugin-server/src/worker/vm/lazy.ts b/plugin-server/src/worker/vm/lazy.ts index 301abb30e964b..0a6aec913f38a 100644 --- a/plugin-server/src/worker/vm/lazy.ts +++ b/plugin-server/src/worker/vm/lazy.ts @@ -12,7 +12,7 @@ import { PluginTaskType, VMMethods, } from '../../types' -import { clearError, processError } from '../../utils/db/error' +import { processError } from '../../utils/db/error' import { disablePlugin, setPluginCapabilities } from '../../utils/db/sql' import { instrument } from '../../utils/metrics' import { getNextRetryMs } from '../../utils/retries' @@ -242,8 +242,6 @@ export class LazyPluginVM { `setupPlugin succeeded (instance ID ${this.hub.instanceId}).`, PluginLogEntryType.Debug ) - - void clearError(this.hub, this.pluginConfig) } catch (error) { this.hub.statsd?.increment('plugin.setup.fail', { plugin: this.pluginConfig.plugin?.name ?? '?' }) this.hub.statsd?.timing('plugin.setup.fail_timing', timer, { diff --git a/plugin-server/tests/helpers/sql.ts b/plugin-server/tests/helpers/sql.ts index 39e2a6b166a8f..a42522f38c810 100644 --- a/plugin-server/tests/helpers/sql.ts +++ b/plugin-server/tests/helpers/sql.ts @@ -296,7 +296,7 @@ export const createPlugin = async (pg: PostgresRouter, plugin: Omit + pluginConfig: Omit ) => { return await insertRow(pg, 'posthog_pluginconfig', { ...pluginConfig, diff --git a/plugin-server/tests/sql.test.ts b/plugin-server/tests/sql.test.ts index 1d72c6c7bbfc1..f3e6dad94901f 100644 --- a/plugin-server/tests/sql.test.ts +++ b/plugin-server/tests/sql.test.ts @@ -1,14 +1,8 @@ -import { Hub, PluginError } from '../src/types' +import { Hub } from '../src/types' import { createHub } from '../src/utils/db/hub' import { PostgresUse } from '../src/utils/db/postgres' -import { - disablePlugin, - getPluginAttachmentRows, - getPluginConfigRows, - getPluginRows, - setError, -} from '../src/utils/db/sql' -import { commonOrganizationId, pluginConfig39 } from './helpers/plugins' +import { disablePlugin, getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from '../src/utils/db/sql' +import { commonOrganizationId } from './helpers/plugins' import { resetTestDatabase } from './helpers/sql' jest.setTimeout(20_000) @@ -59,7 +53,6 @@ describe('sql', () => { localhostIP: '94.224.212.175', }, enabled: true, - has_error: false, id: 39, order: 0, plugin_id: 60, @@ -70,23 +63,6 @@ describe('sql', () => { const rows1 = await getPluginConfigRows(hub) expect(rows1).toEqual([expectedRow]) - - await hub.db.postgres.query( - PostgresUse.COMMON_WRITE, - "update posthog_team set plugins_opt_in='f'", - undefined, - 'testTag' - ) - const pluginError: PluginError = { message: 'error happened', time: 'now' } - await setError(hub, pluginError, pluginConfig39) - - const rows2 = await getPluginConfigRows(hub) - expect(rows2).toEqual([ - { - ...expectedRow, - has_error: true, - }, - ]) }) test('getPluginRows', async () => { From 185d8340e5a209d9b7fae9f8a7871b8c5e3944b7 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 21 Nov 2023 09:53:13 -0800 Subject: [PATCH 16/23] One step forward, one step backward. --- .../ingestion-queues/run-async-handlers-event-pipeline.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts index 6f73fb042b9b4..a5be1f22ddc8d 100644 --- a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts @@ -68,7 +68,7 @@ describe('runAppsOnEventPipeline()', () => { afterEach(() => { jest.clearAllTimers() jest.useRealTimers() - jest.clearAllMocks() + jest.restoreAllMocks() }) afterAll(async () => { From afbe6b78e5ba84f5c01b9b25e5291c80813678b5 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 21 Nov 2023 10:29:15 -0800 Subject: [PATCH 17/23] Just reset everything each test run rather than trying to figure out what is happening here. --- .../run-async-handlers-event-pipeline.test.ts | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts index a5be1f22ddc8d..064d889ad62dc 100644 --- a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts @@ -53,28 +53,21 @@ describe('runAppsOnEventPipeline()', () => { let redis: Redis.Redis let closeHub: () => Promise - beforeEach(() => { + beforeEach(async () => { // Use fake timers to ensure that we don't need to wait on e.g. retry logic. jest.useFakeTimers({ advanceTimers: true }) - }) - - beforeAll(async () => { - jest.useFakeTimers({ advanceTimers: true }) ;[hub, closeHub] = await createHub() redis = await hub.redisPool.acquire() await hub.postgres.query(PostgresUse.COMMON_WRITE, POSTGRES_DELETE_TABLES_QUERY, null, 'deleteTables') // Need to clear the DB to avoid unique constraint violations on ids }) - afterEach(() => { - jest.clearAllTimers() - jest.useRealTimers() - jest.restoreAllMocks() - }) - - afterAll(async () => { + afterEach(async () => { await hub.redisPool.release(redis) await teardownPlugins(hub) await closeHub() + jest.clearAllTimers() + jest.useRealTimers() + jest.restoreAllMocks() }) test('throws on produce errors', async () => { From ff4d6e950dfb83828e3c5d14302aeb7bc36e6571 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 21 Nov 2023 13:55:12 -0800 Subject: [PATCH 18/23] Sidestep Postgres error column dependency in teardown tests. --- plugin-server/tests/helpers/sql.ts | 13 ----- plugin-server/tests/main/teardown.test.ts | 58 +++++++++++++++++------ 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/plugin-server/tests/helpers/sql.ts b/plugin-server/tests/helpers/sql.ts index a42522f38c810..eb167cccdb20f 100644 --- a/plugin-server/tests/helpers/sql.ts +++ b/plugin-server/tests/helpers/sql.ts @@ -1,5 +1,4 @@ import { DateTime } from 'luxon' -import { Pool } from 'pg' import { defaultConfig } from '../../src/config/config' import { @@ -269,18 +268,6 @@ export async function getFirstTeam(hub: Hub): Promise { return (await getTeams(hub))[0] } -export async function getErrorForPluginConfig(id: number): Promise { - const db = new Pool({ connectionString: defaultConfig.DATABASE_URL! }) - let error - try { - const response = await db.query('SELECT * FROM posthog_pluginconfig WHERE id = $1', [id]) - error = response.rows[0]['error'] - } catch {} - - await db.end() - return error -} - export const createPlugin = async (pg: PostgresRouter, plugin: Omit) => { return await insertRow(pg, 'posthog_plugin', { ...plugin, diff --git a/plugin-server/tests/main/teardown.test.ts b/plugin-server/tests/main/teardown.test.ts index 4e5e69371523b..11120b3a846bc 100644 --- a/plugin-server/tests/main/teardown.test.ts +++ b/plugin-server/tests/main/teardown.test.ts @@ -1,11 +1,12 @@ +import ClickHouse from '@posthog/clickhouse' import { PluginEvent } from '@posthog/plugin-scaffold' import { startPluginsServer } from '../../src/main/pluginsServer' -import { Hub, LogLevel } from '../../src/types' +import { Hub, LogLevel, PluginLogEntry, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' import { makePiscina } from '../../src/worker/piscina' import { pluginConfig39 } from '../helpers/plugins' -import { getErrorForPluginConfig, resetTestDatabase } from '../helpers/sql' +import { resetTestDatabase } from '../helpers/sql' jest.mock('../../src/utils/status') jest.setTimeout(60000) // 60 sec timeout @@ -21,6 +22,17 @@ const defaultEvent: PluginEvent = { properties: { key: 'value' }, } +async function getLogEntriesForPluginConfig(hub: Hub, pluginConfigId: number) { + const { data: logEntries } = (await hub.clickhouse.querying(` + SELECT * + FROM plugin_log_entries + WHERE + plugin_config_id = ${pluginConfigId} AND + instance_id = '${hub.instanceId}' + ORDER BY timestamp`)) as unknown as ClickHouse.ObjectQueryResult + return logEntries +} + describe('teardown', () => { const processEvent = async (hub: Hub, event: PluginEvent) => { const result = await runEventPipeline(hub, event) @@ -39,6 +51,7 @@ describe('teardown', () => { throw new Error('This Happened In The Teardown Palace') } `) + const { hub, stop } = await startPluginsServer( { WORKER_CONCURRENCY: 2, @@ -48,16 +61,25 @@ describe('teardown', () => { undefined ) - const error1 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error1).toBe(null) - await processEvent(hub!, defaultEvent) - await stop?.() + await stop!() // verify the teardownPlugin code runs - const error2 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error2.message).toBe('This Happened In The Teardown Palace') + // XXX: potential race condition here, should probably have grace period + const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) + + const systemErrors = logEntries.filter( + (logEntry) => logEntry.source == PluginLogEntrySource.System && logEntry.type == PluginLogEntryType.Error + ) + expect(systemErrors).toHaveLength(1) + expect(systemErrors[0].message).toContain('Plugin failed to unload') + + const pluginErrors = logEntries.filter( + (logEntry) => logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error + ) + expect(pluginErrors).toHaveLength(1) + expect(pluginErrors[0].message).toContain('This Happened In The Teardown Palace') }) test('no need to tear down if plugin was never setup', async () => { @@ -71,7 +93,7 @@ describe('teardown', () => { throw new Error('This Happened In The Teardown Palace') } `) - const { stop } = await startPluginsServer( + const { hub, stop } = await startPluginsServer( { WORKER_CONCURRENCY: 2, LOG_LEVEL: LogLevel.Log, @@ -80,14 +102,22 @@ describe('teardown', () => { undefined ) - const error1 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error1).toBe(null) + await stop!() - await stop?.() + // verify the teardownPlugin code runs + // XXX: potential race condition here, should probably have grace period + const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) + + const systemLogs = logEntries.filter((logEntry) => logEntry.source == PluginLogEntrySource.System) + expect(systemLogs).toHaveLength(2) + expect(systemLogs[0].message).toContain('Plugin loaded') + expect(systemLogs[1].message).toContain('Plugin unloaded') // verify the teardownPlugin code doesn't run, because processEvent was never called // and thus the plugin was never setup - see LazyVM - const error2 = await getErrorForPluginConfig(pluginConfig39.id) - expect(error2).toBe(null) + const pluginErrors = logEntries.filter( + (logEntry) => logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error + ) + expect(pluginErrors).toHaveLength(0) }) }) From 91248582d4d51b3339904cde241aa2a7564d9cc7 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 21 Nov 2023 14:40:22 -0800 Subject: [PATCH 19/23] Make tests simultaneously a little better and a little worse somehow. --- plugin-server/tests/main/teardown.test.ts | 70 +++++++++++++---------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/plugin-server/tests/main/teardown.test.ts b/plugin-server/tests/main/teardown.test.ts index 11120b3a846bc..50e3948e912cf 100644 --- a/plugin-server/tests/main/teardown.test.ts +++ b/plugin-server/tests/main/teardown.test.ts @@ -1,6 +1,7 @@ import ClickHouse from '@posthog/clickhouse' import { PluginEvent } from '@posthog/plugin-scaffold' +import { waitForExpect } from '../../functional_tests/expectations' import { startPluginsServer } from '../../src/main/pluginsServer' import { Hub, LogLevel, PluginLogEntry, PluginLogEntrySource, PluginLogEntryType } from '../../src/types' import { runEventPipeline } from '../../src/worker/ingestion/event-pipeline/runner' @@ -65,21 +66,26 @@ describe('teardown', () => { await stop!() - // verify the teardownPlugin code runs - // XXX: potential race condition here, should probably have grace period - const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) - - const systemErrors = logEntries.filter( - (logEntry) => logEntry.source == PluginLogEntrySource.System && logEntry.type == PluginLogEntryType.Error - ) - expect(systemErrors).toHaveLength(1) - expect(systemErrors[0].message).toContain('Plugin failed to unload') - - const pluginErrors = logEntries.filter( - (logEntry) => logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error - ) - expect(pluginErrors).toHaveLength(1) - expect(pluginErrors[0].message).toContain('This Happened In The Teardown Palace') + // verify the teardownPlugin code runs -- since we're reading from + // ClickHouse, we need to give it a bit of time to have consumed from + // the topic and written everything we're looking for to the table + await waitForExpect(async () => { + const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) + + const systemErrors = logEntries.filter( + (logEntry) => + logEntry.source == PluginLogEntrySource.System && logEntry.type == PluginLogEntryType.Error + ) + expect(systemErrors).toHaveLength(1) + expect(systemErrors[0].message).toContain('Plugin failed to unload') + + const pluginErrors = logEntries.filter( + (logEntry) => + logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error + ) + expect(pluginErrors).toHaveLength(1) + expect(pluginErrors[0].message).toContain('This Happened In The Teardown Palace') + }) }) test('no need to tear down if plugin was never setup', async () => { @@ -104,20 +110,24 @@ describe('teardown', () => { await stop!() - // verify the teardownPlugin code runs - // XXX: potential race condition here, should probably have grace period - const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) - - const systemLogs = logEntries.filter((logEntry) => logEntry.source == PluginLogEntrySource.System) - expect(systemLogs).toHaveLength(2) - expect(systemLogs[0].message).toContain('Plugin loaded') - expect(systemLogs[1].message).toContain('Plugin unloaded') - - // verify the teardownPlugin code doesn't run, because processEvent was never called - // and thus the plugin was never setup - see LazyVM - const pluginErrors = logEntries.filter( - (logEntry) => logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error - ) - expect(pluginErrors).toHaveLength(0) + // verify the teardownPlugin code runs -- since we're reading from + // ClickHouse, we need to give it a bit of time to have consumed from + // the topic and written everything we're looking for to the table + await waitForExpect(async () => { + const logEntries = await getLogEntriesForPluginConfig(hub!, pluginConfig39.id) + + const systemLogs = logEntries.filter((logEntry) => logEntry.source == PluginLogEntrySource.System) + expect(systemLogs).toHaveLength(2) + expect(systemLogs[0].message).toContain('Plugin loaded') + expect(systemLogs[1].message).toContain('Plugin unloaded') + + // verify the teardownPlugin code doesn't run, because processEvent was never called + // and thus the plugin was never setup - see LazyVM + const pluginErrors = logEntries.filter( + (logEntry) => + logEntry.source == PluginLogEntrySource.Plugin && logEntry.type == PluginLogEntryType.Error + ) + expect(pluginErrors).toHaveLength(0) + }) }) }) From e96da9d95d707dca261f71889772b74db3801adc Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Tue, 21 Nov 2023 14:47:39 -0800 Subject: [PATCH 20/23] Remove `setError` completely. --- plugin-server/src/utils/db/error.ts | 14 ++++++++---- plugin-server/src/utils/db/sql.ts | 26 +---------------------- plugin-server/tests/helpers/sqlMock.ts | 1 - plugin-server/tests/utils/retries.test.ts | 1 - 4 files changed, 11 insertions(+), 31 deletions(-) diff --git a/plugin-server/src/utils/db/error.ts b/plugin-server/src/utils/db/error.ts index 1d05a2ba0e7ee..96924a35a9c17 100644 --- a/plugin-server/src/utils/db/error.ts +++ b/plugin-server/src/utils/db/error.ts @@ -1,8 +1,7 @@ import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin-scaffold' import { captureException } from '@sentry/node' -import { Hub, PluginConfig, PluginError } from '../../types' -import { setError } from './sql' +import { Hub, PluginConfig, PluginError, PluginLogEntrySource, PluginLogEntryType } from '../../types' export class DependencyUnavailableError extends Error { constructor(message: string, dependencyName: string, error: Error) { @@ -47,7 +46,7 @@ export async function processError( throw error } - const errorJson: PluginError = + const pluginError: PluginError = typeof error === 'string' ? { message: error, @@ -61,7 +60,14 @@ export async function processError( event: event, } - await setError(server, errorJson, pluginConfig) + await server.db.queuePluginLogEntry({ + pluginConfig, + source: PluginLogEntrySource.Plugin, + type: PluginLogEntryType.Error, + message: pluginError.stack ?? pluginError.message, + instanceId: server.instanceId, + timestamp: pluginError.time, + }) } export function cleanErrorStackTrace(stack: string | undefined): string | undefined { diff --git a/plugin-server/src/utils/db/sql.ts b/plugin-server/src/utils/db/sql.ts index c6dfb5dd1f19a..202f7f4ece5eb 100644 --- a/plugin-server/src/utils/db/sql.ts +++ b/plugin-server/src/utils/db/sql.ts @@ -1,14 +1,4 @@ -import { - Hub, - Plugin, - PluginAttachmentDB, - PluginCapabilities, - PluginConfig, - PluginConfigId, - PluginError, - PluginLogEntrySource, - PluginLogEntryType, -} from '../../types' +import { Hub, Plugin, PluginAttachmentDB, PluginCapabilities, PluginConfig, PluginConfigId } from '../../types' import { PostgresUse } from './postgres' function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string { @@ -115,20 +105,6 @@ export async function setPluginCapabilities( ) } -export async function setError(hub: Hub, pluginError: PluginError | null, pluginConfig: PluginConfig): Promise { - // TODO: pluginError can likely be required now, but need to check call sites more thoroughly to ensure that's the case. - if (pluginError) { - await hub.db.queuePluginLogEntry({ - pluginConfig, - source: PluginLogEntrySource.Plugin, - type: PluginLogEntryType.Error, - message: pluginError.stack ?? pluginError.message, - instanceId: hub.instanceId, - timestamp: pluginError.time, - }) - } -} - export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): Promise { await hub.db.postgres.query( PostgresUse.COMMON_WRITE, diff --git a/plugin-server/tests/helpers/sqlMock.ts b/plugin-server/tests/helpers/sqlMock.ts index 1da87be42a698..378c6bf6273e9 100644 --- a/plugin-server/tests/helpers/sqlMock.ts +++ b/plugin-server/tests/helpers/sqlMock.ts @@ -12,5 +12,4 @@ export const getPluginConfigRows = s.getPluginConfigRows as unknown as jest.Mock export const setPluginCapabilities = s.setPluginCapabilities as unknown as jest.MockedFunction< UnPromisify > -export const setError = s.setError as unknown as jest.MockedFunction> export const disablePlugin = s.disablePlugin as unknown as jest.MockedFunction> diff --git a/plugin-server/tests/utils/retries.test.ts b/plugin-server/tests/utils/retries.test.ts index 927c7b7675ceb..15193e8ad825c 100644 --- a/plugin-server/tests/utils/retries.test.ts +++ b/plugin-server/tests/utils/retries.test.ts @@ -8,7 +8,6 @@ import { PromiseManager } from '../../src/worker/vm/promise-manager' jest.useFakeTimers() jest.spyOn(global, 'setTimeout') -jest.mock('../../src/utils/db/error') // Mocking setError which we don't need in tests const mockHub: Hub = { instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'), From 4156ebaa60b810c06e58b0333173b9c12fd5fb15 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 23 Nov 2023 09:48:59 -0800 Subject: [PATCH 21/23] Mark the `PluginConfig.error` column as unused. --- posthog/models/plugin.py | 1 + 1 file changed, 1 insertion(+) diff --git a/posthog/models/plugin.py b/posthog/models/plugin.py index ff81a8d09d0f4..34afc1918b1d8 100644 --- a/posthog/models/plugin.py +++ b/posthog/models/plugin.py @@ -228,6 +228,7 @@ class Meta: enabled: models.BooleanField = models.BooleanField(default=False) order: models.IntegerField = models.IntegerField() config: models.JSONField = models.JSONField(default=dict) + # DEPRECATED: use `plugin_log_entries` or `app_metrics` in ClickHouse instead # Error when running this plugin on an event (frontend: PluginErrorType) # - e.g: "undefined is not a function on index.js line 23" # - error = { message: "Exception in processEvent()", time: "iso-string", ...meta } From 3392e15826da376e61f84066f5b11088057c879b Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 27 Nov 2023 09:47:14 -0800 Subject: [PATCH 22/23] Add PLUGINS_DEFAULT_LOG_LEVEL=0 to instructions --- plugin-server/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-server/README.md b/plugin-server/README.md index e1607b78a1c3c..51a793c46bd7b 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -47,7 +47,7 @@ testing: 1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder) 1. setup the test DBs `pnpm setup:test` -1. start the plugin-server with `APP_METRICS_FLUSH_FREQUENCY_MS=0 CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev` +1. start the plugin-server with `APP_METRICS_FLUSH_FREQUENCY_MS=0 CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog PLUGINS_DEFAULT_LOG_LEVEL=0 RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev` 1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch` ## CLI flags From b6a4107ffcaa6ff54557ffe757e60470ec96c3f1 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Mon, 27 Nov 2023 09:50:24 -0800 Subject: [PATCH 23/23] Make the README a bit easier to read. --- plugin-server/README.md | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/plugin-server/README.md b/plugin-server/README.md index 51a793c46bd7b..b61cba750d9f3 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -24,7 +24,7 @@ Let's get you developing the plugin server in no time: 1. Prepare for running functional tests. See notes below. -## Functional tests +### Running Functional Tests Functional tests are provided located in `functional_tests`. They provide tests for high level functionality of the plugin-server, i.e. functionality that any @@ -47,8 +47,21 @@ testing: 1. run docker `docker compose -f docker-compose.dev.yml up` (in posthog folder) 1. setup the test DBs `pnpm setup:test` -1. start the plugin-server with `APP_METRICS_FLUSH_FREQUENCY_MS=0 CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog PLUGINS_DEFAULT_LOG_LEVEL=0 RELOAD_PLUGIN_JITTER_MAX_MS=0 pnpm start:dev` -1. run the tests with `CLICKHOUSE_DATABASE='default' DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog pnpm functional_tests --watch` +1. start the plugin-server: + ```bash + APP_METRICS_FLUSH_FREQUENCY_MS=0 \ + CLICKHOUSE_DATABASE='default' \ + DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \ + PLUGINS_DEFAULT_LOG_LEVEL=0 \ + RELOAD_PLUGIN_JITTER_MAX_MS=0 \ + pnpm start:dev + ``` +1. run the tests: + ```bash + CLICKHOUSE_DATABASE='default' \ + DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \ + pnpm functional_tests --watch + ``` ## CLI flags