feat: Support mappings for server-side CDP destinations (#27168)
benjackwhite authored Jan 15, 2025
1 parent dc867b0 commit f59dc5d
Showing 11 changed files with 533 additions and 277 deletions.
144 changes: 82 additions & 62 deletions plugin-server/src/cdp/cdp-api.ts
@@ -9,7 +9,6 @@ import { HogExecutor, MAX_ASYNC_STEPS } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types'
import { createInvocation } from './utils'

export class CdpApi {
private hogExecutor: HogExecutor
@@ -116,82 +115,103 @@ export class CdpApi {

let lastResponse: HogFunctionInvocationResult | null = null
let logs: LogEntry[] = []
const errors: any[] = []

const triggerGlobals = {
...globals,
project: {
id: team.id,
name: team.name,
url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`,
...globals.project,
},
}

let count = 0

while (!lastResponse || !lastResponse.finished) {
if (count > MAX_ASYNC_STEPS * 2) {
throw new Error('Too many iterations')
const {
invocations,
logs: filterLogs,
metrics: filterMetrics,
} = this.hogExecutor.buildHogFunctionInvocations([compoundConfiguration], triggerGlobals)

// Add metrics to the logs
filterMetrics.forEach((metric) => {
if (metric.metric_name === 'filtered') {
logs.push({
level: 'info',
timestamp: DateTime.now(),
message: `Mapping trigger not matching filters was ignored.`,
})
}
count += 1

let response: HogFunctionInvocationResult

const invocation =
lastResponse?.invocation ||
createInvocation(
{
...globals,
project: {
id: team.id,
name: team.name,
url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`,
...globals.project,
},
},
compoundConfiguration,
// The "email" hog functions export a "sendEmail" function that we must explicitly call
hogFunction.type === 'email' ? ['sendEmail', [globals.email]] : undefined
)

if (invocation.queue === 'fetch') {
if (mock_async_functions) {
// Add the state, simulating what executeAsyncResponse would do

// Re-parse the fetch args for the logging
const fetchArgs = {
...invocation.queueParameters,
}
})

response = {
invocation: {
...invocation,
queue: 'hog',
queueParameters: { response: { status: 200, headers: {} }, body: '{}' },
},
finished: false,
logs: [
{
level: 'info',
timestamp: DateTime.now(),
message: `Async function 'fetch' was mocked with arguments:`,
},
{
level: 'info',
timestamp: DateTime.now(),
message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`,
filterLogs.forEach((log) => {
logs.push(log)
})

for (const _invocation of invocations) {
let count = 0
let invocation = _invocation

while (!lastResponse || !lastResponse.finished) {
if (count > MAX_ASYNC_STEPS * 2) {
throw new Error('Too many iterations')
}
count += 1

let response: HogFunctionInvocationResult

if (invocation.queue === 'fetch') {
if (mock_async_functions) {
// Add the state, simulating what executeAsyncResponse would do

// Re-parse the fetch args for the logging
const fetchArgs = {
...invocation.queueParameters,
}

response = {
invocation: {
...invocation,
queue: 'hog',
queueParameters: { response: { status: 200, headers: {} }, body: '{}' },
},
],
finished: false,
logs: [
{
level: 'info',
timestamp: DateTime.now(),
message: `Async function 'fetch' was mocked with arguments:`,
},
{
level: 'info',
timestamp: DateTime.now(),
message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`,
},
],
}
} else {
response = await this.fetchExecutor!.executeLocally(invocation)
}
} else {
response = await this.fetchExecutor!.executeLocally(invocation)
response = this.hogExecutor.execute(invocation)
}
} else {
response = this.hogExecutor.execute(invocation)
}

logs = logs.concat(response.logs)
lastResponse = response
logs = logs.concat(response.logs)
lastResponse = response
invocation = response.invocation
if (response.error) {
errors.push(response.error)
}
}
}

res.json({
status: lastResponse.finished ? 'success' : 'error',
error: lastResponse.error ? String(lastResponse.error) : null,
errors: errors.map((e) => String(e)),
logs: logs,
})
} catch (e) {
console.error(e)
res.status(500).json({ error: e.message })
res.status(500).json({ errors: [e.message] })
}
}
}
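
For readers skimming the cdp-api.ts hunk above: the endpoint now builds one invocation per matching mapping up front, then drives each invocation to completion while accumulating filter logs, execution logs, and errors. The following is a minimal, self-contained sketch of that control flow only; the interfaces, the MAX_ASYNC_STEPS constant, and the build/execute callbacks are simplified stand-ins assumed for illustration, not the real plugin-server types.

// Simplified stand-ins for the plugin-server types (assumptions for illustration only).
interface Invocation {
    queue: 'hog' | 'fetch'
}

interface InvocationResult {
    invocation: Invocation
    finished: boolean
    error?: unknown
    logs: { level: string; message: string }[]
}

interface BuildResult {
    invocations: Invocation[]
    logs: InvocationResult['logs']
    metrics: { metric_name: string }[]
}

// Assumption: stands in for the MAX_ASYNC_STEPS constant imported from hog-executor.
const MAX_ASYNC_STEPS = 5

export async function runTestInvocations(
    build: () => BuildResult,
    execute: (invocation: Invocation) => Promise<InvocationResult>
) {
    // One invocation per matching mapping, plus logs/metrics produced while filtering.
    const { invocations, logs: filterLogs, metrics } = build()

    const logs = [...filterLogs]
    const errors: unknown[] = []

    // Surface "filtered" metrics as human-readable log lines, as the endpoint now does.
    for (const metric of metrics) {
        if (metric.metric_name === 'filtered') {
            logs.push({ level: 'info', message: 'Mapping trigger not matching filters was ignored.' })
        }
    }

    let lastFinished = false

    // Drive each invocation until it reports finished, bounding the number of async hops.
    for (let invocation of invocations) {
        let count = 0
        let finished = false
        while (!finished) {
            if (count++ > MAX_ASYNC_STEPS * 2) {
                throw new Error('Too many iterations')
            }
            const response = await execute(invocation)
            logs.push(...response.logs)
            if (response.error) {
                errors.push(response.error)
            }
            invocation = response.invocation
            finished = response.finished
        }
        lastFinished = finished
    }

    return {
        status: lastFinished ? 'success' : 'error',
        errors: errors.map((e) => String(e)),
        logs,
    }
}
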
61 changes: 29 additions & 32 deletions plugin-server/src/cdp/cdp-consumers.ts
@@ -40,6 +40,7 @@ import { HogWatcher, HogWatcherState } from './hog-watcher'
import { CdpRedis, createCdpRedisPool } from './redis'
import { CdpInternalEventSchema } from './schema'
import {
HogFunctionAppMetric,
HogFunctionInvocation,
HogFunctionInvocationGlobals,
HogFunctionInvocationResult,
@@ -55,11 +56,10 @@
convertInternalEventToHogFunctionInvocationGlobals,
convertToCaptureEvent,
convertToHogFunctionInvocationGlobals,
createInvocation,
cyclotronJobToInvocation,
fixLogDeduplication,
gzipObject,
invocationToCyclotronJobUpdate,
prepareLogEntriesForClickhouse,
serializeHogFunctionInvocation,
unGzipObject,
} from './utils'
@@ -182,9 +182,7 @@
)
}

protected produceAppMetric(
metric: Pick<AppMetric2Type, 'team_id' | 'app_source_id' | 'metric_kind' | 'metric_name' | 'count'>
) {
protected produceAppMetric(metric: HogFunctionAppMetric) {
const appMetric: AppMetric2Type = {
app_source: 'hog_function',
...metric,
Expand All @@ -201,7 +199,15 @@ abstract class CdpConsumerBase {
}

protected produceLogs(result: HogFunctionInvocationResult) {
const logs = prepareLogEntriesForClickhouse(result)
const logs = fixLogDeduplication(
result.logs.map((logEntry) => ({
...logEntry,
team_id: result.invocation.hogFunction.team_id,
log_source: 'hog_function',
log_source_id: result.invocation.hogFunction.id,
instance_id: result.invocation.id,
}))
)

logs.forEach((logEntry) => {
this.messagesToProduce.push({
@@ -292,6 +298,9 @@

this.produceLogs(result)

// Clear the logs so we don't pass them on to the next invocation
result.logs = []

// PostHog capture events
const capturedEvents = result.capturedPostHogEvents
delete result.capturedPostHogEvents
@@ -497,40 +506,28 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.queueMatchingFunctions`,
func: async () => {
const possibleInvocations: HogFunctionInvocation[] = []

// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals)

await this.runManyWithHeartbeat(invocationGlobals, (globals) => {
const { matchingFunctions, nonMatchingFunctions, erroredFunctions } =
this.hogExecutor.findMatchingFunctions(globals)
const possibleInvocations = (
await this.runManyWithHeartbeat(invocationGlobals, (globals) => {
const { invocations, metrics, logs } = this.hogExecutor.findHogFunctionInvocations(globals)

possibleInvocations.push(
...matchingFunctions.map((hogFunction) => createInvocation(globals, hogFunction))
)

nonMatchingFunctions.forEach((item) =>
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'other',
metric_name: 'filtered',
count: 1,
metrics.forEach((metric) => {
this.produceAppMetric(metric)
})
)

erroredFunctions.forEach(([item, error]) => {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'other',
metric_name: 'filtering_failed',
count: 1,
fixLogDeduplication(logs).forEach((logEntry) => {
this.messagesToProduce.push({
topic: KAFKA_LOG_ENTRIES,
value: logEntry,
key: logEntry.instance_id,
})
})
this.logFilteringError(item, error)

return invocations
})
})
).flat()

const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))
const validInvocations: HogFunctionInvocation[] = []
(Diffs for the remaining 9 changed files are not shown.)

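
The cdp-consumers.ts hunks above replace the per-function filter bookkeeping with what findHogFunctionInvocations already returns: prebuilt invocations plus ready-made app metrics and log entries. Below is a condensed sketch of that consumer-side pattern; the types and the produce/dedup helpers are simplified stand-ins assumed for illustration, not the real plugin-server implementations.

// Simplified stand-ins for the plugin-server types (assumptions for illustration only).
interface AppMetricLike {
    team_id: number
    app_source_id: string
    metric_kind: string
    metric_name: string
    count: number
}

interface LogEntryLike {
    team_id: number
    log_source: string
    log_source_id: string
    instance_id: string
    message: string
}

interface MatchResult {
    invocations: unknown[]
    metrics: AppMetricLike[]
    logs: LogEntryLike[]
}

export function queueMatchingInvocations(
    findInvocations: () => MatchResult,
    produceAppMetric: (metric: AppMetricLike) => void,
    produceLogEntry: (log: LogEntryLike) => void,
    fixLogDeduplication: (logs: LogEntryLike[]) => LogEntryLike[]
): unknown[] {
    const { invocations, metrics, logs } = findInvocations()

    // Filtering outcomes now arrive as ready-made metrics; the consumer simply forwards them.
    metrics.forEach(produceAppMetric)

    // Log entries are deduplicated before being produced to the log-entries topic.
    fixLogDeduplication(logs).forEach(produceLogEntry)

    // The surviving invocations continue on to the HogWatcher state checks.
    return invocations
}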