diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index dbbd163c72ae8..f738b559a0523 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -28,7 +28,7 @@ import { createKafkaProducerWrapper } from '../utils/db/hub' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { safeClickhouseString } from '../utils/db/utils' import { status } from '../utils/status' -import { castTimestampOrNow } from '../utils/utils' +import { castTimestampOrNow, UUIDT } from '../utils/utils' import { RustyHook } from '../worker/rusty-hook' import { FetchExecutor } from './fetch-executor' import { GroupsManager } from './groups-manager' @@ -43,7 +43,9 @@ import { HogFunctionInvocationResult, HogFunctionInvocationSerialized, HogFunctionInvocationSerializedCompressed, + HogFunctionLogEntrySerialized, HogFunctionMessageToProduce, + HogFunctionType, HogHooksFetchResponse, } from './types' import { @@ -199,6 +201,24 @@ abstract class CdpConsumerBase { }) } + protected logFilteringError(item: HogFunctionType, error: string) { + const logEntry: HogFunctionLogEntrySerialized = { + team_id: item.team_id, + log_source: 'hog_function', + log_source_id: item.id, + instance_id: new UUIDT().toString(), // random UUID, like it would be for an invocation + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + level: 'error', + message: error, + } + + this.messagesToProduce.push({ + topic: KAFKA_LOG_ENTRIES, + value: logEntry, + key: logEntry.instance_id, + }) + } + // NOTE: These will be removed once we are only on Cyclotron protected async queueInvocationsToKafka(invocation: HogFunctionInvocation[]) { await Promise.all( @@ -479,7 +499,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { }) ) - erroredFunctions.forEach((item) => + erroredFunctions.forEach(([item, error]) => { this.produceAppMetric({ team_id: item.team_id, app_source_id: item.id, @@ -487,7 +507,8 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { metric_name: 'filtering_failed', count: 1, }) - ) + this.logFilteringError(item, error) + }) }) const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id)) diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 15e147f022b7f..631f8c8438444 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -110,17 +110,17 @@ export class HogExecutor { this.telemetryMatcher = buildIntegerMatcher(this.hub.CDP_HOG_FILTERS_TELEMETRY_TEAMS, true) } - findMatchingFunctions(event: HogFunctionInvocationGlobals): { + findMatchingFunctions(globals: HogFunctionInvocationGlobals): { matchingFunctions: HogFunctionType[] nonMatchingFunctions: HogFunctionType[] - erroredFunctions: HogFunctionType[] + erroredFunctions: [HogFunctionType, string][] } { - const allFunctionsForTeam = this.hogFunctionManager.getTeamHogDestinations(event.project.id) - const filtersGlobals = convertToHogFunctionFilterGlobal(event) + const allFunctionsForTeam = this.hogFunctionManager.getTeamHogDestinations(globals.project.id) + const filtersGlobals = convertToHogFunctionFilterGlobal(globals) const nonMatchingFunctions: HogFunctionType[] = [] const matchingFunctions: HogFunctionType[] = [] - const erroredFunctions: HogFunctionType[] = [] + const erroredFunctions: [HogFunctionType, string][] = [] // Filter all functions based on the invocation allFunctionsForTeam.forEach((hogFunction) => { @@ -143,7 +143,10 @@ export class HogExecutor { error: filterResult.error.message, result: filterResult, }) - erroredFunctions.push(hogFunction) + erroredFunctions.push([ + hogFunction, + `Error filtering event ${globals.event.uuid}: ${filterResult.error.message}`, + ]) return } } catch (error) { @@ -153,7 +156,10 @@ export class HogExecutor { teamId: hogFunction.team_id, error: error.message, }) - erroredFunctions.push(hogFunction) + erroredFunctions.push([ + hogFunction, + `Error filtering event ${globals.event.uuid}: ${error.message}`, + ]) return } finally { const duration = performance.now() - start @@ -165,7 +171,7 @@ export class HogExecutor { hogFunctionName: hogFunction.name, teamId: hogFunction.team_id, duration, - eventId: event.event.uuid, + eventId: globals.event.uuid, }) } } diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 4bd6eb339c5cf..c559a4240fca4 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -333,5 +333,56 @@ describe('CDP Processed Events Consumer', () => { ]) }) }) + + describe('filtering errors', () => { + let globals: HogFunctionInvocationGlobals + + beforeEach(() => { + globals = createHogExecutionGlobals({ + project: { + id: team.id, + } as any, + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: { + $current_url: 'https://posthog.com', + $lib_version: '1.0.0', + }, + } as any, + }) + }) + + it('should filter out functions that error while filtering', async () => { + const erroringFunction = await insertHogFunction({ + ...HOG_EXAMPLES.input_printer, + ...HOG_INPUTS_EXAMPLES.secret_inputs, + ...HOG_FILTERS_EXAMPLES.broken_filters, + }) + await processor.processBatch([globals]) + expect(decodeAllKafkaMessages()).toMatchObject([ + { + key: expect.any(String), + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + app_source_id: erroringFunction.id, + count: 1, + metric_kind: 'other', + metric_name: 'filtering_failed', + team_id: 2, + timestamp: expect.any(String), + }, + }, + { + topic: 'log_entries_test', + value: { + message: + 'Error filtering event b3a1fe86-b10c-43cc-acaf-d208977608d0: Invalid HogQL bytecode, stack is empty, can not pop', + }, + }, + ]) + }) + }) }) })