From 18ccc3490040c6c19528dbe61ea2bda70b204518 Mon Sep 17 00:00:00 2001 From: Ben White Date: Fri, 26 Jul 2024 11:23:23 +0200 Subject: [PATCH] Fixes --- plugin-server/src/cdp/cdp-api.ts | 9 +++- plugin-server/src/cdp/hog-executor.ts | 15 ++++-- plugin-server/src/cdp/types.ts | 2 +- .../cdp/cdp-processed-events-consumer.test.ts | 47 ++----------------- plugin-server/tests/cdp/hog-executor.test.ts | 14 +++--- .../tests/cdp/hog-watcher/hog-watcher.test.ts | 21 +++++++-- 6 files changed, 43 insertions(+), 65 deletions(-) diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index 8e556cf0f0565..b0e3e2242c0aa 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -9,7 +9,7 @@ import { addLog, HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { HogWatcher } from './hog-watcher/hog-watcher' import { HogWatcherState } from './hog-watcher/types' -import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType } from './types' +import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionLogEntry, HogFunctionType } from './types' export class CdpApi { private hogExecutor: HogExecutor @@ -110,6 +110,7 @@ export class CdpApi { globals: globals, teamId: team.id, hogFunctionId: id, + timings: [], } // We use the provided config if given, otherwise the function's config @@ -123,6 +124,7 @@ export class CdpApi { await this.hogFunctionManager.enrichWithIntegrations([compoundConfiguration]) let response = this.hogExecutor.execute(compoundConfiguration, invocation) + const logs: HogFunctionLogEntry[] = [] while (response.asyncFunctionRequest) { invocation.vmState = response.invocation.vmState @@ -158,13 +160,16 @@ export class CdpApi { invocation.vmState!.stack.push(convertJSToHog(asyncRes?.asyncFunctionResponse.response ?? null)) } + logs.push(...response.logs) response = this.hogExecutor.execute(compoundConfiguration, invocation) } + logs.push(...response.logs) + res.json({ status: response.finished ? 'success' : 'error', error: String(response.error), - logs: response.logs, + logs: logs, }) } catch (e) { console.error(e) diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 51b8240e7c6d6..bc0274d871a6c 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -175,6 +175,7 @@ export class HogExecutor { globals: modifiedGlobals, teamId: hogFunction.team_id, hogFunctionId: hogFunction.id, + timings: [], }) } @@ -196,7 +197,6 @@ export class HogExecutor { finished: false, error, logs: [], - timings: [], }) const hogFunction = this.hogFunctionManager.getTeamHogFunction( @@ -214,8 +214,14 @@ export class HogExecutor { // Add the response to the stack to continue execution invocation.vmState.stack.push(convertJSToHog(asyncFunctionResponse.response ?? null)) + invocation.timings.push(...(asyncFunctionResponse.timings ?? [])) + + const res = this.execute(hogFunction, invocation) - return this.execute(hogFunction, invocation) + // Add any timings and logs from the async function + res.logs = [...(asyncFunctionResponse.logs ?? []), ...res.logs] + + return res } execute(hogFunction: HogFunctionType, invocation: HogFunctionInvocation): HogFunctionInvocationResult { @@ -233,7 +239,6 @@ export class HogExecutor { finished: false, capturedPostHogEvents: [], logs: [], - timings: [], } if (!invocation.vmState) { @@ -327,7 +332,7 @@ export class HogExecutor { hogExecutionDuration.observe(duration) result.finished = execRes.finished - result.timings.push({ + invocation.timings.push({ kind: 'hog', duration_ms: duration, }) @@ -356,7 +361,7 @@ export class HogExecutor { addLog(result, 'warn', `Function was not finished but also had no async function to execute.`) } } else { - const totalDuration = result.timings.reduce((acc, timing) => acc + timing.duration_ms, 0) + const totalDuration = invocation.timings.reduce((acc, timing) => acc + timing.duration_ms, 0) const messages = [`Function completed in ${totalDuration}ms.`] if (execRes.state) { messages.push(`Sync: ${execRes.state.syncDuration}ms.`) diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index 134b08b76d0db..9273f5a7210c3 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -157,6 +157,7 @@ export type HogFunctionInvocation = { hogFunctionId: HogFunctionType['id'] // The current vmstate (set if the invocation is paused) vmState?: VMState + timings: HogFunctionTiming[] } export type HogFunctionAsyncFunctionRequest = { @@ -180,7 +181,6 @@ export type HogFunctionInvocationResult = { error?: any asyncFunctionRequest?: HogFunctionAsyncFunctionRequest logs: HogFunctionLogEntry[] - timings: HogFunctionTiming[] capturedPostHogEvents?: HogFunctionCapturedEvent[] } diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 2013d21d48b6c..4de26bd05fca0 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -217,55 +217,14 @@ describe('CDP Processed Events Consuner', () => { }) const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[3][0]) - // Parse body so it can match by object equality rather than exact string equality - msg.value.asyncFunctionRequest.args[1].body = JSON.parse(msg.value.asyncFunctionRequest.args[1].body) + expect(msg).toEqual({ key: expect.any(String), topic: 'cdp_function_callbacks_test', value: { - id: expect.any(String), - globals: expect.objectContaining({ - project: { id: 2, name: 'TEST PROJECT', url: 'http://localhost:8000/project/2' }, - // We assume the rest is correct - }), + state: expect.any(String), + hogFunctionId: hogFunction.id, teamId: 2, - hogFunctionId: expect.any(String), - finished: false, - logs: [], - timings: [ - { - kind: 'hog', - duration_ms: expect.any(Number), - }, - ], - asyncFunctionRequest: { - name: 'fetch', - args: [ - 'https://example.com/posthog-webhook', - { - headers: { version: 'v=1.0.0' }, - body: { - event: { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - name: '$pageview', - distinct_id: 'distinct_id_1', - properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, - timestamp: null, - url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', - }, - groups: {}, - nested: { - foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', - }, - person: null, - event_url: - 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', - }, - method: 'POST', - }, - ], - vmState: expect.any(Object), - }, asyncFunctionResponse: { response: { status: 200, diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 727a60f657e6a..ba336799b87e4 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -1,5 +1,4 @@ import { DateTime } from 'luxon' -import { finished } from 'stream' import { HogExecutor } from '../../src/cdp/hog-executor' import { HogFunctionManager } from '../../src/cdp/hog-function-manager' @@ -143,7 +142,6 @@ describe('Hog Executor', () => { teamId: 1, hogFunctionId: hogFunction.id, vmState: expect.any(Object), - globals: { project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' }, event: { @@ -159,6 +157,12 @@ describe('Hog Executor', () => { url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`, }, }, + timings: [ + { + kind: 'hog', + duration_ms: 0, + }, + ], }, asyncFunctionRequest: { @@ -185,12 +189,6 @@ describe('Hog Executor', () => { }, ], }, - timings: [ - { - kind: 'hog', - duration_ms: 0, - }, - ], }) }) diff --git a/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts b/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts index 52577df97bede..a36d72794ce99 100644 --- a/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts +++ b/plugin-server/tests/cdp/hog-watcher/hog-watcher.test.ts @@ -17,17 +17,29 @@ const mockNow: jest.Mock = require('../../../src/utils/now').now as any const createResult = (id: string, finished = true, error?: string): HogFunctionInvocationResult => { return { - hogFunctionId: id, + invocation: { + id: 'invocation-id', + teamId: 2, + hogFunctionId: id, + globals: {} as any, + timings: [], + }, finished, error, - } as HogFunctionInvocationResult + logs: [], + } } const createAsyncResponse = (id: string, success = true): HogFunctionInvocationAsyncResponse => { return { + state: '', + teamId: 2, hogFunctionId: id, - error: success ? null : 'error', - } as HogFunctionInvocationAsyncResponse + asyncFunctionResponse: { + error: !success ? 'error' : null, + response: {}, + }, + } } const config = defaultConfig @@ -104,7 +116,6 @@ describe('HogWatcher', () => { const advanceTime = (ms: number) => { now += ms - console.log(`[TEST] Advancing time by ${ms}ms to ${now}`) mockNow.mockReturnValue(now) }