From 1146de08a44777bd10b487ddcd273ad75ab99f46 Mon Sep 17 00:00:00 2001 From: Ben White Date: Fri, 26 Jul 2024 10:55:42 +0200 Subject: [PATCH] Fixes --- plugin-server/src/cdp/cdp-api.ts | 26 +++--- .../src/cdp/hog-watcher/hog-watcher.ts | 6 +- plugin-server/tests/cdp/hog-executor.test.ts | 82 ++++++++++--------- 3 files changed, 60 insertions(+), 54 deletions(-) diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index 642061adef622..8e556cf0f0565 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -9,7 +9,7 @@ import { addLog, HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { HogWatcher } from './hog-watcher/hog-watcher' import { HogWatcherState } from './hog-watcher/types' -import { HogFunctionInvocation, HogFunctionType } from './types' +import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType } from './types' export class CdpApi { private hogExecutor: HogExecutor @@ -110,8 +110,6 @@ export class CdpApi { globals: globals, teamId: team.id, hogFunctionId: id, - logs: [], - timings: [], } // We use the provided config if given, otherwise the function's config @@ -127,6 +125,8 @@ export class CdpApi { let response = this.hogExecutor.execute(compoundConfiguration, invocation) while (response.asyncFunctionRequest) { + invocation.vmState = response.invocation.vmState + const asyncFunctionRequest = response.asyncFunctionRequest if (mock_async_functions || asyncFunctionRequest.name !== 'fetch') { @@ -140,25 +140,25 @@ export class CdpApi { ) // Add the state, simulating what executeAsyncResponse would do - asyncFunctionRequest.vmState.stack.push(convertJSToHog({ status: 200, body: {} })) + invocation.vmState!.stack.push(convertJSToHog({ status: 200, body: {} })) } else { - const asyncRes = await this.asyncFunctionExecutor!.execute(response, { + const asyncInvocationRequest: HogFunctionInvocationAsyncRequest = { + state: '', // WE don't care about the state for this level of testing + teamId: team.id, + hogFunctionId: hogFunction.id, + asyncFunctionRequest, + } + const asyncRes = await this.asyncFunctionExecutor!.execute(asyncInvocationRequest, { sync: true, }) if (!asyncRes || asyncRes.asyncFunctionResponse.error) { addLog(response, 'error', 'Failed to execute async function') } - asyncFunctionRequest.vmState.stack.push( - convertJSToHog(asyncRes?.asyncFunctionResponse.response ?? null) - ) - response.timings.push(...(asyncRes?.asyncFunctionResponse.timings ?? [])) + invocation.vmState!.stack.push(convertJSToHog(asyncRes?.asyncFunctionResponse.response ?? null)) } - // Clear it so we can't ever end up in a loop - delete response.asyncFunctionRequest - - response = this.hogExecutor.execute(compoundConfiguration, response, asyncFunctionRequest.vmState) + response = this.hogExecutor.execute(compoundConfiguration, invocation) } res.json({ diff --git a/plugin-server/src/cdp/hog-watcher/hog-watcher.ts b/plugin-server/src/cdp/hog-watcher/hog-watcher.ts index 8d531d8b36410..d8fd248d7bc44 100644 --- a/plugin-server/src/cdp/hog-watcher/hog-watcher.ts +++ b/plugin-server/src/cdp/hog-watcher/hog-watcher.ts @@ -64,7 +64,7 @@ export class HogWatcherActiveObservations { observeResults(results: HogFunctionInvocationResult[]) { results.forEach((result) => - this.addObservations(result.hogFunctionId, { + this.addObservations(result.invocation.hogFunctionId, { successes: result.finished ? 1 : 0, failures: result.error ? 1 : 0, }) @@ -75,8 +75,8 @@ export class HogWatcherActiveObservations { // NOTE: This probably wants to be done using the response status instead :thinking: responses.forEach((response) => this.addObservations(response.hogFunctionId, { - asyncFunctionSuccesses: response.error ? 0 : 1, - asyncFunctionFailures: response.error ? 1 : 0, + asyncFunctionSuccesses: response.asyncFunctionResponse.error ? 0 : 1, + asyncFunctionFailures: response.asyncFunctionResponse.error ? 1 : 0, }) ) } diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 123541cc12238..727a60f657e6a 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -1,9 +1,10 @@ import { DateTime } from 'luxon' +import { finished } from 'stream' import { HogExecutor } from '../../src/cdp/hog-executor' import { HogFunctionManager } from '../../src/cdp/hog-function-manager' import { - HogFunctionInvocationAsyncResponse, + HogFunctionAsyncFunctionResponse, HogFunctionInvocationResult, HogFunctionLogEntry, HogFunctionType, @@ -13,20 +14,17 @@ import { castTimestampOrNow } from '../../src/utils/utils' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' import { createHogExecutionGlobals, createHogFunction, insertHogFunction as _insertHogFunction } from './fixtures' -const simulateMockFetchAsyncResponse = (result: HogFunctionInvocationResult): HogFunctionInvocationAsyncResponse => { +const createAsyncFunctionResponse = (): HogFunctionAsyncFunctionResponse => { return { - ...result, - asyncFunctionResponse: { - timings: [ - { - kind: 'async_function', - duration_ms: 100, - }, - ], - response: { - status: 200, - body: 'success', + timings: [ + { + kind: 'async_function', + duration_ms: 100, }, + ], + response: { + status: 200, + body: 'success', }, } } @@ -61,15 +59,19 @@ describe('Hog Executor', () => { mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction) }) - it('can parse incoming messages correctly', () => { + it('can execute messages', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results).toHaveLength(1) expect(results[0]).toMatchObject({ - id: expect.any(String), - hogFunctionId: hogFunction.id, + invocation: { + id: expect.any(String), + hogFunctionId: hogFunction.id, + }, + finished: false, + asyncFunctionRequest: {}, }) }) @@ -83,7 +85,7 @@ describe('Hog Executor', () => { team_id: 1, log_source: 'hog_function', log_source_id: hogFunction.id, - instance_id: results[0].id, + instance_id: results[0].invocation.id, timestamp: expect.any(DateTime), level: 'debug', message: 'Executing function', @@ -92,7 +94,7 @@ describe('Hog Executor', () => { team_id: 1, log_source: 'hog_function', log_source_id: hogFunction.id, - instance_id: results[0].id, + instance_id: results[0].invocation.id, timestamp: expect.any(DateTime), level: 'debug', message: "Suspending function due to async function call 'fetch'. Payload: 1299 bytes", @@ -136,24 +138,29 @@ describe('Hog Executor', () => { .findMatchingFunctions(createHogExecutionGlobals()) .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results[0]).toMatchObject({ - id: results[0].id, - globals: { - project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' }, - event: { - uuid: 'uuid', - name: 'test', - distinct_id: 'distinct_id', - url: 'http://localhost:8000/events/1', - properties: { $lib_version: '1.2.3' }, - timestamp: '2024-06-07T12:00:00.000Z', - }, - source: { - name: 'Test hog function', - url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + invocation: { + id: results[0].invocation.id, + teamId: 1, + hogFunctionId: hogFunction.id, + vmState: expect.any(Object), + + globals: { + project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' }, + event: { + uuid: 'uuid', + name: 'test', + distinct_id: 'distinct_id', + url: 'http://localhost:8000/events/1', + properties: { $lib_version: '1.2.3' }, + timestamp: '2024-06-07T12:00:00.000Z', + }, + source: { + name: 'Test hog function', + url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + }, }, }, - teamId: 1, - hogFunctionId: hogFunction.id, + asyncFunctionRequest: { name: 'fetch', args: [ @@ -177,7 +184,6 @@ describe('Hog Executor', () => { method: 'POST', }, ], - vmState: expect.any(Object), }, timings: [ { @@ -197,7 +203,7 @@ describe('Hog Executor', () => { const splicedLogs = results[0].logs.splice(0, 100) logs.push(...splicedLogs) - const asyncExecResult = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0])) + const asyncExecResult = executor.executeAsyncResponse(results[0].invocation, createAsyncFunctionResponse()) logs.push(...asyncExecResult.logs) expect(asyncExecResult.error).toBeUndefined() @@ -259,13 +265,13 @@ describe('Hog Executor', () => { expect(results).toHaveLength(1) // Run the result one time simulating a successful fetch - const asyncResult1 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0])) + const asyncResult1 = executor.executeAsyncResponse(results[0].invocation, createAsyncFunctionResponse()) expect(asyncResult1.finished).toBe(false) expect(asyncResult1.error).toBe(undefined) expect(asyncResult1.asyncFunctionRequest).toBeDefined() // Run the result one more time simulating a second successful fetch - const asyncResult2 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(asyncResult1)) + const asyncResult2 = executor.executeAsyncResponse(asyncResult1.invocation, createAsyncFunctionResponse()) // This time we should see an error for hitting the loop limit expect(asyncResult2.finished).toBe(false) expect(asyncResult2.error).toEqual('Exceeded maximum number of async steps: 2')