From b767a6ba949f5a39f3388e4fef08322c2445593a Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 17 Dec 2024 10:50:11 +0100 Subject: [PATCH] feat(cdp): log filtering errors --- plugin-server/src/cdp/cdp-consumers.ts | 27 ++++++++-- plugin-server/src/cdp/hog-executor.ts | 8 +-- .../cdp/cdp-processed-events-consumer.test.ts | 50 +++++++++++++++++++ 3 files changed, 78 insertions(+), 7 deletions(-) diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index dbbd163c72ae8..425cd69698c35 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -28,7 +28,7 @@ import { createKafkaProducerWrapper } from '../utils/db/hub' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { safeClickhouseString } from '../utils/db/utils' import { status } from '../utils/status' -import { castTimestampOrNow } from '../utils/utils' +import { castTimestampOrNow, UUIDT } from '../utils/utils' import { RustyHook } from '../worker/rusty-hook' import { FetchExecutor } from './fetch-executor' import { GroupsManager } from './groups-manager' @@ -43,7 +43,9 @@ import { HogFunctionInvocationResult, HogFunctionInvocationSerialized, HogFunctionInvocationSerializedCompressed, + HogFunctionLogEntrySerialized, HogFunctionMessageToProduce, + HogFunctionType, HogHooksFetchResponse, } from './types' import { @@ -199,6 +201,24 @@ abstract class CdpConsumerBase { }) } + protected logFilteringError(item: HogFunctionType, error: string) { + const logEntry: HogFunctionLogEntrySerialized = { + team_id: item.team_id, + log_source: 'hog_function', + log_source_id: item.id, + instance_id: new UUIDT().toString(), // random UUID, like it would be for an invocation + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + level: 'error', + message: `Error filtering: ${error}`, + } + + this.messagesToProduce.push({ + topic: KAFKA_LOG_ENTRIES, + value: logEntry, + key: logEntry.instance_id, + }) + } + // NOTE: These will be removed once we are only on Cyclotron protected async queueInvocationsToKafka(invocation: HogFunctionInvocation[]) { await Promise.all( @@ -479,7 +499,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { }) ) - erroredFunctions.forEach((item) => + erroredFunctions.forEach(([item, error]) => { this.produceAppMetric({ team_id: item.team_id, app_source_id: item.id, @@ -487,7 +507,8 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { metric_name: 'filtering_failed', count: 1, }) - ) + this.logFilteringError(item, error) + }) }) const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id)) diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 15e147f022b7f..b1af237213b08 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -113,14 +113,14 @@ export class HogExecutor { findMatchingFunctions(event: HogFunctionInvocationGlobals): { matchingFunctions: HogFunctionType[] nonMatchingFunctions: HogFunctionType[] - erroredFunctions: HogFunctionType[] + erroredFunctions: [HogFunctionType, string][] } { const allFunctionsForTeam = this.hogFunctionManager.getTeamHogDestinations(event.project.id) const filtersGlobals = convertToHogFunctionFilterGlobal(event) const nonMatchingFunctions: HogFunctionType[] = [] const matchingFunctions: HogFunctionType[] = [] - const erroredFunctions: HogFunctionType[] = [] + const erroredFunctions: [HogFunctionType, string][] = [] // Filter all functions based on the invocation allFunctionsForTeam.forEach((hogFunction) => { @@ -143,7 +143,7 @@ export class HogExecutor { error: filterResult.error.message, result: filterResult, }) - erroredFunctions.push(hogFunction) + erroredFunctions.push([hogFunction, filterResult.error.message]) return } } catch (error) { @@ -153,7 +153,7 @@ export class HogExecutor { teamId: hogFunction.team_id, error: error.message, }) - erroredFunctions.push(hogFunction) + erroredFunctions.push([hogFunction, error.message]) return } finally { const duration = performance.now() - start diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 4bd6eb339c5cf..66487f202f374 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -333,5 +333,55 @@ describe('CDP Processed Events Consumer', () => { ]) }) }) + + describe('filtering errors', () => { + let globals: HogFunctionInvocationGlobals + + beforeEach(() => { + globals = createHogExecutionGlobals({ + project: { + id: team.id, + } as any, + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: { + $current_url: 'https://posthog.com', + $lib_version: '1.0.0', + }, + } as any, + }) + }) + + it('should filter out functions that error while filtering', async () => { + const erroringFunction = await insertHogFunction({ + ...HOG_EXAMPLES.input_printer, + ...HOG_INPUTS_EXAMPLES.secret_inputs, + ...HOG_FILTERS_EXAMPLES.broken_filters, + }) + await processor.processBatch([globals]) + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + app_source_id: erroringFunction.id, + count: 1, + metric_kind: 'other', + metric_name: 'filtering_failed', + team_id: 2, + timestamp: expect.any(String), + }, + }, + { + topic: 'log_entries_test', + value: { + message: 'Error filtering: Invalid HogQL bytecode, stack is empty, can not pop', + }, + }, + ]) + }) + }) }) })