Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: Support mappings for server-side CDP destinations #27168

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 82 additions & 62 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { HogExecutor, MAX_ASYNC_STEPS } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types'
import { createInvocation } from './utils'

export class CdpApi {
private hogExecutor: HogExecutor
Expand Down Expand Up @@ -116,82 +115,103 @@ export class CdpApi {

let lastResponse: HogFunctionInvocationResult | null = null
let logs: LogEntry[] = []
const errors: any[] = []

const triggerGlobals = {
...globals,
project: {
id: team.id,
name: team.name,
url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`,
...globals.project,
},
}

let count = 0

while (!lastResponse || !lastResponse.finished) {
if (count > MAX_ASYNC_STEPS * 2) {
throw new Error('Too many iterations')
const {
invocations,
logs: filterLogs,
metrics: filterMetrics,
} = this.hogExecutor.buildHogFunctionInvocations([compoundConfiguration], triggerGlobals)

// Add metrics to the logs
filterMetrics.forEach((metric) => {
if (metric.metric_name === 'filtered') {
logs.push({
level: 'info',
timestamp: DateTime.now(),
message: `Mapping trigger not matching filters was ignored.`,
})
}
count += 1

let response: HogFunctionInvocationResult

const invocation =
lastResponse?.invocation ||
createInvocation(
{
...globals,
project: {
id: team.id,
name: team.name,
url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`,
...globals.project,
},
},
compoundConfiguration,
// The "email" hog functions export a "sendEmail" function that we must explicitly call
hogFunction.type === 'email' ? ['sendEmail', [globals.email]] : undefined
)

if (invocation.queue === 'fetch') {
if (mock_async_functions) {
// Add the state, simulating what executeAsyncResponse would do

// Re-parse the fetch args for the logging
const fetchArgs = {
...invocation.queueParameters,
}
})

response = {
invocation: {
...invocation,
queue: 'hog',
queueParameters: { response: { status: 200, headers: {} }, body: '{}' },
},
finished: false,
logs: [
{
level: 'info',
timestamp: DateTime.now(),
message: `Async function 'fetch' was mocked with arguments:`,
},
{
level: 'info',
timestamp: DateTime.now(),
message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`,
filterLogs.forEach((log) => {
logs.push(log)
})

for (const _invocation of invocations) {
let count = 0
let invocation = _invocation

while (!lastResponse || !lastResponse.finished) {
if (count > MAX_ASYNC_STEPS * 2) {
throw new Error('Too many iterations')
}
count += 1

let response: HogFunctionInvocationResult

if (invocation.queue === 'fetch') {
if (mock_async_functions) {
// Add the state, simulating what executeAsyncResponse would do

// Re-parse the fetch args for the logging
const fetchArgs = {
...invocation.queueParameters,
}

response = {
invocation: {
...invocation,
queue: 'hog',
queueParameters: { response: { status: 200, headers: {} }, body: '{}' },
},
],
finished: false,
logs: [
{
level: 'info',
timestamp: DateTime.now(),
message: `Async function 'fetch' was mocked with arguments:`,
},
{
level: 'info',
timestamp: DateTime.now(),
message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`,
},
],
}
} else {
response = await this.fetchExecutor!.executeLocally(invocation)
}
} else {
response = await this.fetchExecutor!.executeLocally(invocation)
response = this.hogExecutor.execute(invocation)
}
} else {
response = this.hogExecutor.execute(invocation)
}

logs = logs.concat(response.logs)
lastResponse = response
logs = logs.concat(response.logs)
lastResponse = response
invocation = response.invocation
if (response.error) {
errors.push(response.error)
}
}
}

res.json({
status: lastResponse.finished ? 'success' : 'error',
error: lastResponse.error ? String(lastResponse.error) : null,
errors: errors.map((e) => String(e)),
logs: logs,
})
} catch (e) {
console.error(e)
res.status(500).json({ error: e.message })
res.status(500).json({ errors: [e.message] })
}
}
}
61 changes: 29 additions & 32 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import { HogWatcher, HogWatcherState } from './hog-watcher'
import { CdpRedis, createCdpRedisPool } from './redis'
import { CdpInternalEventSchema } from './schema'
import {
HogFunctionAppMetric,
HogFunctionInvocation,
HogFunctionInvocationGlobals,
HogFunctionInvocationResult,
Expand All @@ -55,11 +56,10 @@ import {
convertInternalEventToHogFunctionInvocationGlobals,
convertToCaptureEvent,
convertToHogFunctionInvocationGlobals,
createInvocation,
cyclotronJobToInvocation,
fixLogDeduplication,
gzipObject,
invocationToCyclotronJobUpdate,
prepareLogEntriesForClickhouse,
serializeHogFunctionInvocation,
unGzipObject,
} from './utils'
Expand Down Expand Up @@ -182,9 +182,7 @@ abstract class CdpConsumerBase {
)
}

protected produceAppMetric(
metric: Pick<AppMetric2Type, 'team_id' | 'app_source_id' | 'metric_kind' | 'metric_name' | 'count'>
) {
protected produceAppMetric(metric: HogFunctionAppMetric) {
const appMetric: AppMetric2Type = {
app_source: 'hog_function',
...metric,
Expand All @@ -201,7 +199,15 @@ abstract class CdpConsumerBase {
}

protected produceLogs(result: HogFunctionInvocationResult) {
const logs = prepareLogEntriesForClickhouse(result)
const logs = fixLogDeduplication(
result.logs.map((logEntry) => ({
...logEntry,
team_id: result.invocation.hogFunction.team_id,
log_source: 'hog_function',
log_source_id: result.invocation.hogFunction.id,
instance_id: result.invocation.id,
}))
)

logs.forEach((logEntry) => {
this.messagesToProduce.push({
Expand Down Expand Up @@ -292,6 +298,9 @@ abstract class CdpConsumerBase {

this.produceLogs(result)

// Clear the logs so we don't pass them on to the next invocation
result.logs = []

// PostHog capture events
const capturedEvents = result.capturedPostHogEvents
delete result.capturedPostHogEvents
Expand Down Expand Up @@ -491,40 +500,28 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.queueMatchingFunctions`,
func: async () => {
const possibleInvocations: HogFunctionInvocation[] = []

// TODO: Add a helper to hog functions to determine if they require groups or not and then only load those
await this.groupsManager.enrichGroups(invocationGlobals)

await this.runManyWithHeartbeat(invocationGlobals, (globals) => {
const { matchingFunctions, nonMatchingFunctions, erroredFunctions } =
this.hogExecutor.findMatchingFunctions(globals)
const possibleInvocations = (
await this.runManyWithHeartbeat(invocationGlobals, (globals) => {
const { invocations, metrics, logs } = this.hogExecutor.findHogFunctionInvocations(globals)

possibleInvocations.push(
...matchingFunctions.map((hogFunction) => createInvocation(globals, hogFunction))
)

nonMatchingFunctions.forEach((item) =>
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'other',
metric_name: 'filtered',
count: 1,
metrics.forEach((metric) => {
this.produceAppMetric(metric)
})
)

erroredFunctions.forEach(([item, error]) => {
this.produceAppMetric({
team_id: item.team_id,
app_source_id: item.id,
metric_kind: 'other',
metric_name: 'filtering_failed',
count: 1,
fixLogDeduplication(logs).forEach((logEntry) => {
this.messagesToProduce.push({
topic: KAFKA_LOG_ENTRIES,
value: logEntry,
key: logEntry.instance_id,
})
})
this.logFilteringError(item, error)

return invocations
})
})
).flat()

const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))
const validInvocations: HogFunctionInvocation[] = []
Expand Down
Loading