Skip to content

Commit

Permalink
feat(cdp): log filtering errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra committed Dec 17, 2024
1 parent daa0a27 commit b767a6b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
27 changes: 24 additions & 3 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import { createKafkaProducerWrapper } from '../utils/db/hub'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { safeClickhouseString } from '../utils/db/utils'
import { status } from '../utils/status'
import { castTimestampOrNow } from '../utils/utils'
import { castTimestampOrNow, UUIDT } from '../utils/utils'
import { RustyHook } from '../worker/rusty-hook'
import { FetchExecutor } from './fetch-executor'
import { GroupsManager } from './groups-manager'
Expand All @@ -43,7 +43,9 @@ import {
HogFunctionInvocationResult,
HogFunctionInvocationSerialized,
HogFunctionInvocationSerializedCompressed,
HogFunctionLogEntrySerialized,
HogFunctionMessageToProduce,
HogFunctionType,
HogHooksFetchResponse,
} from './types'
import {
Expand Down Expand Up @@ -199,6 +201,24 @@ abstract class CdpConsumerBase {
})
}

protected logFilteringError(item: HogFunctionType, error: string) {
const logEntry: HogFunctionLogEntrySerialized = {
team_id: item.team_id,
log_source: 'hog_function',
log_source_id: item.id,
instance_id: new UUIDT().toString(), // random UUID, like it would be for an invocation
timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse),
level: 'error',
message: `Error filtering: ${error}`,
}

this.messagesToProduce.push({
topic: KAFKA_LOG_ENTRIES,
value: logEntry,
key: logEntry.instance_id,
})
}

// NOTE: These will be removed once we are only on Cyclotron
protected async queueInvocationsToKafka(invocation: HogFunctionInvocation[]) {
await Promise.all(
Expand Down Expand Up @@ -479,15 +499,16 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
})
)

erroredFunctions.forEach((item) =>
erroredFunctions.forEach(([item, error]) => {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'other',
metric_name: 'filtering_failed',
count: 1,
})
)
this.logFilteringError(item, error)
})
})

const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))
Expand Down
8 changes: 4 additions & 4 deletions plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ export class HogExecutor {
findMatchingFunctions(event: HogFunctionInvocationGlobals): {
matchingFunctions: HogFunctionType[]
nonMatchingFunctions: HogFunctionType[]
erroredFunctions: HogFunctionType[]
erroredFunctions: [HogFunctionType, string][]
} {
const allFunctionsForTeam = this.hogFunctionManager.getTeamHogDestinations(event.project.id)
const filtersGlobals = convertToHogFunctionFilterGlobal(event)

const nonMatchingFunctions: HogFunctionType[] = []
const matchingFunctions: HogFunctionType[] = []
const erroredFunctions: HogFunctionType[] = []
const erroredFunctions: [HogFunctionType, string][] = []

// Filter all functions based on the invocation
allFunctionsForTeam.forEach((hogFunction) => {
Expand All @@ -143,7 +143,7 @@ export class HogExecutor {
error: filterResult.error.message,
result: filterResult,
})
erroredFunctions.push(hogFunction)
erroredFunctions.push([hogFunction, filterResult.error.message])
return
}
} catch (error) {
Expand All @@ -153,7 +153,7 @@ export class HogExecutor {
teamId: hogFunction.team_id,
error: error.message,
})
erroredFunctions.push(hogFunction)
erroredFunctions.push([hogFunction, error.message])
return
} finally {
const duration = performance.now() - start
Expand Down
50 changes: 50 additions & 0 deletions plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,5 +333,55 @@ describe('CDP Processed Events Consumer', () => {
])
})
})

describe('filtering errors', () => {
let globals: HogFunctionInvocationGlobals

beforeEach(() => {
globals = createHogExecutionGlobals({
project: {
id: team.id,
} as any,
event: {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
event: '$pageview',
properties: {
$current_url: 'https://posthog.com',
$lib_version: '1.0.0',
},
} as any,
})
})

it('should filter out functions that error while filtering', async () => {
const erroringFunction = await insertHogFunction({
...HOG_EXAMPLES.input_printer,
...HOG_INPUTS_EXAMPLES.secret_inputs,
...HOG_FILTERS_EXAMPLES.broken_filters,
})
await processor.processBatch([globals])
expect(decodeAllKafkaMessages()).toMatchObject([
{
key: expect.any(String),
topic: 'clickhouse_app_metrics2_test',
value: {
app_source: 'hog_function',
app_source_id: erroringFunction.id,
count: 1,
metric_kind: 'other',
metric_name: 'filtering_failed',
team_id: 2,
timestamp: expect.any(String),
},
},
{
topic: 'log_entries_test',
value: {
message: 'Error filtering: Invalid HogQL bytecode, stack is empty, can not pop',
},
},
])
})
})
})
})

0 comments on commit b767a6b

Please sign in to comment.