Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Jul 26, 2024
1 parent 1146de0 commit 18ccc34
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 65 deletions.
9 changes: 7 additions & 2 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { addLog, HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
import { HogWatcherState } from './hog-watcher/types'
import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType } from './types'
import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionLogEntry, HogFunctionType } from './types'

export class CdpApi {
private hogExecutor: HogExecutor
Expand Down Expand Up @@ -110,6 +110,7 @@ export class CdpApi {
globals: globals,
teamId: team.id,
hogFunctionId: id,
timings: [],
}

// We use the provided config if given, otherwise the function's config
Expand All @@ -123,6 +124,7 @@ export class CdpApi {
await this.hogFunctionManager.enrichWithIntegrations([compoundConfiguration])

let response = this.hogExecutor.execute(compoundConfiguration, invocation)
const logs: HogFunctionLogEntry[] = []

while (response.asyncFunctionRequest) {
invocation.vmState = response.invocation.vmState
Expand Down Expand Up @@ -158,13 +160,16 @@ export class CdpApi {
invocation.vmState!.stack.push(convertJSToHog(asyncRes?.asyncFunctionResponse.response ?? null))
}

logs.push(...response.logs)
response = this.hogExecutor.execute(compoundConfiguration, invocation)
}

logs.push(...response.logs)

res.json({
status: response.finished ? 'success' : 'error',
error: String(response.error),
logs: response.logs,
logs: logs,
})
} catch (e) {
console.error(e)
Expand Down
15 changes: 10 additions & 5 deletions plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ export class HogExecutor {
globals: modifiedGlobals,
teamId: hogFunction.team_id,
hogFunctionId: hogFunction.id,
timings: [],
})
}

Expand All @@ -196,7 +197,6 @@ export class HogExecutor {
finished: false,
error,
logs: [],
timings: [],
})

const hogFunction = this.hogFunctionManager.getTeamHogFunction(
Expand All @@ -214,8 +214,14 @@ export class HogExecutor {

// Add the response to the stack to continue execution
invocation.vmState.stack.push(convertJSToHog(asyncFunctionResponse.response ?? null))
invocation.timings.push(...(asyncFunctionResponse.timings ?? []))

const res = this.execute(hogFunction, invocation)

return this.execute(hogFunction, invocation)
// Add any timings and logs from the async function
res.logs = [...(asyncFunctionResponse.logs ?? []), ...res.logs]

return res
}

execute(hogFunction: HogFunctionType, invocation: HogFunctionInvocation): HogFunctionInvocationResult {
Expand All @@ -233,7 +239,6 @@ export class HogExecutor {
finished: false,
capturedPostHogEvents: [],
logs: [],
timings: [],
}

if (!invocation.vmState) {
Expand Down Expand Up @@ -327,7 +332,7 @@ export class HogExecutor {
hogExecutionDuration.observe(duration)

result.finished = execRes.finished
result.timings.push({
invocation.timings.push({
kind: 'hog',
duration_ms: duration,
})
Expand Down Expand Up @@ -356,7 +361,7 @@ export class HogExecutor {
addLog(result, 'warn', `Function was not finished but also had no async function to execute.`)
}
} else {
const totalDuration = result.timings.reduce((acc, timing) => acc + timing.duration_ms, 0)
const totalDuration = invocation.timings.reduce((acc, timing) => acc + timing.duration_ms, 0)
const messages = [`Function completed in ${totalDuration}ms.`]
if (execRes.state) {
messages.push(`Sync: ${execRes.state.syncDuration}ms.`)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export type HogFunctionInvocation = {
hogFunctionId: HogFunctionType['id']
// The current vmstate (set if the invocation is paused)
vmState?: VMState
timings: HogFunctionTiming[]
}

export type HogFunctionAsyncFunctionRequest = {
Expand All @@ -180,7 +181,6 @@ export type HogFunctionInvocationResult = {
error?: any
asyncFunctionRequest?: HogFunctionAsyncFunctionRequest
logs: HogFunctionLogEntry[]
timings: HogFunctionTiming[]
capturedPostHogEvents?: HogFunctionCapturedEvent[]
}

Expand Down
47 changes: 3 additions & 44 deletions plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,55 +217,14 @@ describe('CDP Processed Events Consuner', () => {
})

const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[3][0])
// Parse body so it can match by object equality rather than exact string equality
msg.value.asyncFunctionRequest.args[1].body = JSON.parse(msg.value.asyncFunctionRequest.args[1].body)

expect(msg).toEqual({
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
value: {
id: expect.any(String),
globals: expect.objectContaining({
project: { id: 2, name: 'TEST PROJECT', url: 'http://localhost:8000/project/2' },
// We assume the rest is correct
}),
state: expect.any(String),
hogFunctionId: hogFunction.id,
teamId: 2,
hogFunctionId: expect.any(String),
finished: false,
logs: [],
timings: [
{
kind: 'hog',
duration_ms: expect.any(Number),
},
],
asyncFunctionRequest: {
name: 'fetch',
args: [
'https://example.com/posthog-webhook',
{
headers: { version: 'v=1.0.0' },
body: {
event: {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
name: '$pageview',
distinct_id: 'distinct_id_1',
properties: { $lib_version: '1.0.0', $elements_chain: '[]' },
timestamp: null,
url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null',
},
groups: {},
nested: {
foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null',
},
person: null,
event_url:
'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test',
},
method: 'POST',
},
],
vmState: expect.any(Object),
},
asyncFunctionResponse: {
response: {
status: 200,
Expand Down
14 changes: 6 additions & 8 deletions plugin-server/tests/cdp/hog-executor.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { DateTime } from 'luxon'
import { finished } from 'stream'

import { HogExecutor } from '../../src/cdp/hog-executor'
import { HogFunctionManager } from '../../src/cdp/hog-function-manager'
Expand Down Expand Up @@ -143,7 +142,6 @@ describe('Hog Executor', () => {
teamId: 1,
hogFunctionId: hogFunction.id,
vmState: expect.any(Object),

globals: {
project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' },
event: {
Expand All @@ -159,6 +157,12 @@ describe('Hog Executor', () => {
url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`,
},
},
timings: [
{
kind: 'hog',
duration_ms: 0,
},
],
},

asyncFunctionRequest: {
Expand All @@ -185,12 +189,6 @@ describe('Hog Executor', () => {
},
],
},
timings: [
{
kind: 'hog',
duration_ms: 0,
},
],
})
})

Expand Down
21 changes: 16 additions & 5 deletions plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@ const mockNow: jest.Mock = require('../../../src/utils/now').now as any

const createResult = (id: string, finished = true, error?: string): HogFunctionInvocationResult => {
return {
hogFunctionId: id,
invocation: {
id: 'invocation-id',
teamId: 2,
hogFunctionId: id,
globals: {} as any,
timings: [],
},
finished,
error,
} as HogFunctionInvocationResult
logs: [],
}
}

const createAsyncResponse = (id: string, success = true): HogFunctionInvocationAsyncResponse => {
return {
state: '',
teamId: 2,
hogFunctionId: id,
error: success ? null : 'error',
} as HogFunctionInvocationAsyncResponse
asyncFunctionResponse: {
error: !success ? 'error' : null,
response: {},
},
}
}

const config = defaultConfig
Expand Down Expand Up @@ -104,7 +116,6 @@ describe('HogWatcher', () => {

const advanceTime = (ms: number) => {
now += ms
console.log(`[TEST] Advancing time by ${ms}ms to ${now}`)
mockNow.mockReturnValue(now)
}

Expand Down

0 comments on commit 18ccc34

Please sign in to comment.