diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index 2d7309f9e739c..89e6c6c299ba9 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -2,20 +2,19 @@ import { Webhook } from '@posthog/plugin-scaffold' import { KAFKA_CDP_FUNCTION_CALLBACKS } from '../config/kafka-topics' import { PluginsServerConfig } from '../types' -import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { trackedFetch } from '../utils/fetch' import { status } from '../utils/status' import { RustyHook } from '../worker/rusty-hook' -import { HogFunctionInvocationAsyncRequest, HogFunctionInvocationAsyncResponse } from './types' +import { + HogFunctionInvocationAsyncRequest, + HogFunctionInvocationAsyncResponse, + HogFunctionMessageToQueue, +} from './types' export class AsyncFunctionExecutor { - constructor( - private serverConfig: PluginsServerConfig, - private rustyHook: RustyHook, - private kafkaProducer: KafkaProducerWrapper - ) {} + constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {} - async execute(request: HogFunctionInvocationAsyncRequest): Promise { + async execute(request: HogFunctionInvocationAsyncRequest): Promise<HogFunctionMessageToQueue | undefined> { const loggingContext = { hogFunctionId: request.hogFunctionId, invocationId: request.id, @@ -25,16 +24,15 @@ export class AsyncFunctionExecutor { switch (request.asyncFunctionName) { case 'fetch': - await this.asyncFunctionFetch(request) - break + return await this.asyncFunctionFetch(request) default: status.error('🦔', `[HogExecutor] Unknown async function: ${request.asyncFunctionName}`, loggingContext) } - - return request } - private async asyncFunctionFetch(request: HogFunctionInvocationAsyncRequest): Promise { + private async asyncFunctionFetch( + request: HogFunctionInvocationAsyncRequest + ): Promise<HogFunctionMessageToQueue | undefined> { // TODO: validate the args const args = request.asyncFunctionArgs ?? [] const url: string = args[0] @@ -95,13 +93,11 @@ export class AsyncFunctionExecutor { response.error = 'Something went wrong with the fetch request.' 
} - // NOTE: This feels like overkill but is basically simulating rusty hook's callback that will eventually be implemented - await this.kafkaProducer!.produce({ + return { topic: KAFKA_CDP_FUNCTION_CALLBACKS, - value: Buffer.from(JSON.stringify(response)), + value: response, key: response.id, - waitForAck: true, - }) + } } } } diff --git a/plugin-server/src/cdp/cdp-processed-events-consumer.ts b/plugin-server/src/cdp/cdp-processed-events-consumer.ts index 0ba2f3e246ada..04bef4f999772 100644 --- a/plugin-server/src/cdp/cdp-processed-events-consumer.ts +++ b/plugin-server/src/cdp/cdp-processed-events-consumer.ts @@ -23,7 +23,7 @@ import { HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationResult, - HogFunctionLogEntry, + HogFunctionMessageToQueue, } from './types' import { convertToHogFunctionInvocationGlobals } from './utils' @@ -57,7 +57,7 @@ abstract class CdpConsumerBase { groupTypeManager: GroupTypeManager hogFunctionManager: HogFunctionManager asyncFunctionExecutor?: AsyncFunctionExecutor - hogExecutor?: HogExecutor + hogExecutor: HogExecutor appMetrics?: AppMetrics isStopping = false @@ -73,10 +73,54 @@ abstract class CdpConsumerBase { this.organizationManager = new OrganizationManager(postgres, this.teamManager) this.groupTypeManager = new GroupTypeManager(postgres, this.teamManager) this.hogFunctionManager = new HogFunctionManager(postgres, config) + this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager) } public abstract handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> + protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise<void> { + // Processes any async functions and queues up produced messages + + // TODO: Follow up - process metrics from the invocationResults + await runInstrumentedFunction({ + statsKey: `cdpFunctionExecutor.handleEachBatch.produceResults`, + func: async () => { + const messagesToProduce: HogFunctionMessageToQueue[] = [] + + await Promise.all( + results.map(async (result) => { + result.logs.forEach((x) => { + messagesToProduce.push({ + topic: KAFKA_LOG_ENTRIES, + value: x, + key: x.instance_id, + }) + }) + + if (result.asyncFunction) { + const res = await this.asyncFunctionExecutor!.execute(result.asyncFunction) + + if (res) { + messagesToProduce.push(res) + } + } + }) + ) + + await Promise.all( + messagesToProduce.map((x) => + this.kafkaProducer!.produce({ + topic: x.topic, + value: Buffer.from(JSON.stringify(x.value)), + key: x.key, + waitForAck: true, + }) + ) + ) + }, + }) + } + public async start(): Promise<void> { status.info('🔁', `${this.name} - starting`, { librdKafkaVersion: librdkafkaVersion, @@ -94,8 +138,7 @@ abstract class CdpConsumerBase { ) const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config) - this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.config, rustyHook, this.kafkaProducer!) - this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager, this.asyncFunctionExecutor) + this.asyncFunctionExecutor = new AsyncFunctionExecutor(this.config, rustyHook) this.appMetrics = this.hub?.appMetrics ?? 
@@ -199,7 +242,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { statsKey: `cdpFunctionExecutor.handleEachBatch.consumeBatch`, func: async () => { const results = await Promise.all( - events.map((e) => this.hogExecutor!.executeMatchingFunctions(e)) + events.map((e) => this.hogExecutor.executeMatchingFunctions(e)) ) invocationResults.push(...results.flat()) }, @@ -207,32 +250,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { heartbeat() - // TODO: Follow up - process metrics from the invocationResults - await runInstrumentedFunction({ - statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`, - func: async () => { - const allLogs = invocationResults.reduce((acc, result) => { - return [...acc, ...result.logs] - }, [] as HogFunctionLogEntry[]) - - await Promise.all( - allLogs.map((x) => - this.kafkaProducer!.produce({ - topic: KAFKA_LOG_ENTRIES, - value: Buffer.from(JSON.stringify(x)), - key: x.instance_id, - waitForAck: true, - }) - ) - ) - - if (allLogs.length) { - status.info('🔁', `${this.name} - produced logs`, { - size: allLogs.length, - }) - } - }, - }) + await this.processInvocationResults(invocationResults) }, }) } @@ -310,39 +328,14 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { await runInstrumentedFunction({ statsKey: `cdpFunctionExecutor.handleEachBatch.consumeBatch`, func: async () => { - const results = await Promise.all(events.map((e) => this.hogExecutor!.executeAsyncResponse(e))) + const results = await Promise.all(events.map((e) => this.hogExecutor.executeAsyncResponse(e))) invocationResults.push(...results.flat()) }, }) heartbeat() - // TODO: Follow up - process metrics from the invocationResults - await runInstrumentedFunction({ - statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`, - func: async () => { - const allLogs = invocationResults.reduce((acc, result) => { - return [...acc, ...result.logs] - }, [] as HogFunctionLogEntry[]) - - await Promise.all( - allLogs.map((x) => - this.kafkaProducer!.produce({ - topic: KAFKA_LOG_ENTRIES, - value: Buffer.from(JSON.stringify(x)), - key: x.instance_id, - waitForAck: true, - }) - ) - ) - - if (allLogs.length) { - status.info('🔁', `${this.name} - produced logs`, { - size: allLogs.length, - }) - } - }, - }) + await this.processInvocationResults(invocationResults) }, }) } diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index bda63f316bfba..9de9051ee3a18 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -4,19 +4,19 @@ import { DateTime } from 'luxon' import { PluginsServerConfig, TimestampFormat } from '../types' import { status } from '../utils/status' import { castTimestampOrNow, UUIDT } from '../utils/utils' -import { AsyncFunctionExecutor } from './async-function-executor' import { HogFunctionManager } from './hog-function-manager' import { HogFunctionInvocation, HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationResult, - HogFunctionLogEntry, HogFunctionLogEntryLevel, HogFunctionType, } from './types' import { convertToHogFunctionFilterGlobal } from './utils' +const MAX_ASYNC_STEPS = 2 + export const formatInput = (bytecode: any, globals: HogFunctionInvocation['globals']): any => { // Similar to how we generate the bytecode by iterating over the values, // here we iterate over the object and replace the bytecode with the actual values @@ -46,16 +46,12 @@ export const formatInput = (bytecode: any, globals: 
HogFunctionInvocation['globa } export class HogExecutor { - constructor( - private serverConfig: PluginsServerConfig, - private hogFunctionManager: HogFunctionManager, - private asyncFunctionExecutor: AsyncFunctionExecutor - ) {} + constructor(private serverConfig: PluginsServerConfig, private hogFunctionManager: HogFunctionManager) {} /** * Intended to be invoked as a starting point from an event */ - async executeMatchingFunctions(event: HogFunctionInvocationGlobals): Promise { + executeMatchingFunctions(event: HogFunctionInvocationGlobals): HogFunctionInvocationResult[] { const allFunctionsForTeam = this.hogFunctionManager.getTeamHogFunctions(event.project.id) const filtersGlobals = convertToHogFunctionFilterGlobal(event) @@ -119,7 +115,7 @@ export class HogExecutor { }, } - const result = await this.execute(hogFunction, { + const result = this.execute(hogFunction, { id: new UUIDT().toString(), globals: modifiedGlobals, }) @@ -133,7 +129,7 @@ export class HogExecutor { /** * Intended to be invoked as a continuation from an async function */ - async executeAsyncResponse(invocation: HogFunctionInvocationAsyncResponse): Promise { + executeAsyncResponse(invocation: HogFunctionInvocationAsyncResponse): HogFunctionInvocationResult { if (!invocation.hogFunctionId) { throw new Error('No hog function id provided') } @@ -145,25 +141,38 @@ export class HogExecutor { invocation.hogFunctionId ] + const baseInvocation: HogFunctionInvocation = { + id: invocation.id, + globals: invocation.globals, + } + + const errorRes = (error = 'Something went wrong'): HogFunctionInvocationResult => ({ + ...baseInvocation, + hogFunctionId: invocation.hogFunctionId, + teamId: invocation.teamId, + success: false, + error, + // TODO: Probably useful to save a log as well? + logs: [], + }) + + if (!hogFunction) { + return errorRes(`Hog Function with ID ${invocation.hogFunctionId} not found`) + } + if (!invocation.vmState || invocation.error) { - // TODO: Maybe add a log as well? - return { - ...invocation, - success: false, - error: invocation.error ?? new Error('No VM state provided for async response'), - logs: [], - } + return errorRes(invocation.error ?? 'No VM state provided for async response') } invocation.vmState.stack.push(convertJSToHog(invocation.vmResponse ?? 
null)) - return await this.execute(hogFunction, invocation, invocation.vmState) + return this.execute(hogFunction, baseInvocation, invocation.vmState) } - async execute( + execute( hogFunction: HogFunctionType, invocation: HogFunctionInvocation, state?: VMState - ): Promise { + ): HogFunctionInvocationResult { const loggingContext = { hogFunctionId: hogFunction.id, hogFunctionName: hogFunction.name, @@ -172,10 +181,16 @@ export class HogExecutor { status.info('🦔', `[HogExecutor] Executing function`, loggingContext) - let error: any = null - const logs: HogFunctionLogEntry[] = [] let lastTimestamp = DateTime.now() + const result: HogFunctionInvocationResult = { + ...invocation, + teamId: hogFunction.team_id, + hogFunctionId: hogFunction.id, + success: false, + logs: [], + } + const log = (level: HogFunctionLogEntryLevel, message: string) => { // TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique // It is unclear how this affects parallel execution environments @@ -186,7 +201,7 @@ export class HogExecutor { } lastTimestamp = now - logs.push({ + result.logs.push({ team_id: hogFunction.team_id, log_source: 'hog_function', log_source_id: hogFunction.id, @@ -200,6 +215,12 @@ export class HogExecutor { if (!state) { log('debug', `Executing function`) } else { + // NOTE: We do our own check here for async steps as it saves executing Hog and is easier to handle + if (state.asyncSteps >= MAX_ASYNC_STEPS) { + log('error', `Function exceeded maximum async steps`) + result.error = 'Function exceeded maximum async steps' + return result + } log('debug', `Resuming function`) } @@ -209,7 +230,7 @@ export class HogExecutor { const res = exec(state ?? hogFunction.bytecode, { globals, timeout: 100, // NOTE: This will likely be configurable in the future - maxAsyncSteps: 5, // NOTE: This will likely be configurable in the future + maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future asyncFunctions: { // We need to pass these in but they don't actually do anything as it is a sync exec fetch: async () => Promise.resolve(), @@ -234,31 +255,27 @@ export class HogExecutor { const args = (res.asyncFunctionArgs ?? 
[]).map((arg) => convertHogToJS(arg)) if (res.asyncFunctionName) { - await this.asyncFunctionExecutor.execute({ + result.asyncFunction = { ...invocation, teamId: hogFunction.team_id, hogFunctionId: hogFunction.id, asyncFunctionName: res.asyncFunctionName, asyncFunctionArgs: args, vmState: res.state, - }) + } } else { log('warn', `Function was not finished but also had no async function to execute.`) } } else { log('debug', `Function completed`) } + result.success = true } catch (err) { - error = err - status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, error) + result.error = err + status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, err) } - return { - ...invocation, - success: !error, - error, - logs, - } + return result } buildHogFunctionGlobals(hogFunction: HogFunctionType, invocation: HogFunctionInvocation): Record { diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index 438dc88d9fe8f..b5ef75064abc8 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -117,9 +117,12 @@ export type HogFunctionInvocation = { } export type HogFunctionInvocationResult = HogFunctionInvocation & { + teamId: number + hogFunctionId: HogFunctionType['id'] success: boolean error?: any logs: HogFunctionLogEntry[] + asyncFunction?: HogFunctionInvocationAsyncRequest } export type HogFunctionInvocationAsyncRequest = HogFunctionInvocation & { @@ -137,6 +140,12 @@ export type HogFunctionInvocationAsyncResponse = HogFunctionInvocationAsyncReque vmResponse?: any } +export type HogFunctionMessageToQueue = { + topic: string + value: object + key: string +} + // Mostly copied from frontend types export type HogFunctionInputSchemaType = { type: 'string' | 'number' | 'boolean' | 'dictionary' | 'choice' | 'json' diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index c8a89acb2318f..8cd9709b74eb8 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -199,7 +199,31 @@ describe('CDP Processed Events Consuner', () => { // Once for the async callback, twice for the logs expect(mockProducer.produce).toHaveBeenCalledTimes(3) - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toEqual({ + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toMatchObject({ + key: expect.any(String), + topic: 'log_entries_test', + value: { + instance_id: expect.any(String), + level: 'debug', + log_source: 'hog_function', + log_source_id: expect.any(String), + message: 'Executing function', + team_id: 2, + timestamp: expect.any(String), + }, + waitForAck: true, + }) + + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({ + topic: 'log_entries_test', + value: { + log_source: 'hog_function', + message: "Suspending function due to async function call 'fetch'", + team_id: 2, + }, + }) + + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toEqual({ key: expect.any(String), topic: 'cdp_function_callbacks_test', value: { @@ -243,30 +267,6 @@ describe('CDP Processed Events Consuner', () => { }, waitForAck: true, }) - - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({ - key: expect.any(String), - topic: 'log_entries_test', - value: { - instance_id: expect.any(String), - level: 'debug', - log_source: 'hog_function', - 
log_source_id: expect.any(String), - message: 'Executing function', - team_id: 2, - timestamp: expect.any(String), - }, - waitForAck: true, - }) - - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toMatchObject({ - topic: 'log_entries_test', - value: { - log_source: 'hog_function', - message: "Suspending function due to async function call 'fetch'", - team_id: 2, - }, - }) }) }) }) diff --git a/plugin-server/tests/cdp/examples.ts b/plugin-server/tests/cdp/examples.ts index b99b86eb1d67b..d07be03b00db5 100644 --- a/plugin-server/tests/cdp/examples.ts +++ b/plugin-server/tests/cdp/examples.ts @@ -6,7 +6,7 @@ import { HogFunctionType } from '../../src/cdp/types' */ export const HOG_EXAMPLES: Record> = { simple_fetch: { - hog: "fetch(inputs.url, {\n 'headers': inputs.headers,\n 'body': inputs.payload,\n 'method': inputs.method\n});", + hog: "let res := fetch(inputs.url, {\n 'headers': inputs.headers,\n 'body': inputs.body,\n 'method': inputs.method\n});\n\nprint('Fetch response:', res)", bytecode: [ '_h', 32, @@ -20,7 +20,7 @@ export const HOG_EXAMPLES: Record { getTeamHogFunctions: jest.fn(), } - const mockAsyncFuntionExecutor = { - execute: jest.fn(), - } - beforeEach(() => { jest.useFakeTimers() jest.setSystemTime(new Date('2024-06-07T12:00:00.000Z').getTime()) - executor = new HogExecutor( - config, - mockFunctionManager as any as HogFunctionManager, - mockAsyncFuntionExecutor as any as AsyncFunctionExecutor - ) + executor = new HogExecutor(config, mockFunctionManager as any as HogFunctionManager) }) describe('general event processing', () => { - /** - * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios - */ - it('can parse incoming messages correctly', async () => { - const fn = createHogFunction({ + let hogFunction: HogFunctionType + beforeEach(() => { + hogFunction = createHogFunction({ name: 'Test hog function', ...HOG_EXAMPLES.simple_fetch, ...HOG_INPUTS_EXAMPLES.simple_fetch, @@ -46,58 +36,114 @@ describe('Hog Executor', () => { }) mockFunctionManager.getTeamHogFunctions.mockReturnValue({ - [1]: fn, + [hogFunction.id]: hogFunction, }) + }) - // Create a message that should be processed by this function - // Run the function and check that it was executed - await executor.executeMatchingFunctions(createHogExecutionGlobals()) - - expect(mockAsyncFuntionExecutor.execute).toHaveBeenCalledTimes(1) - expect(mockAsyncFuntionExecutor.execute.mock.calls[0][0]).toMatchObject({ + it('can parse incoming messages correctly', () => { + const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) + expect(results).toHaveLength(1) + expect(results[0]).toMatchObject({ id: expect.any(String), + hogFunctionId: hogFunction.id, + }) + }) + + it('collects logs from the function', () => { + const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) + expect(results[0].logs).toMatchObject([ + { + team_id: 1, + log_source: 'hog_function', + log_source_id: hogFunction.id, + instance_id: results[0].id, + timestamp: '2024-06-07 12:00:00.001', + level: 'debug', + message: 'Executing function', + }, + { + team_id: 1, + log_source: 'hog_function', + log_source_id: hogFunction.id, + instance_id: results[0].id, + timestamp: '2024-06-07 12:00:00.002', + level: 'debug', + message: "Suspending function due to async function call 'fetch'", + }, + ]) + }) + + it('queues up an async function call', () => { + const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) + 
expect(results[0].asyncFunction).toMatchObject({ + id: results[0].id, globals: { + project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' }, event: { uuid: 'uuid', + name: 'test', + distinct_id: 'distinct_id', + url: 'http://localhost:8000/events/1', + properties: { $lib_version: '1.2.3' }, + timestamp: '2024-06-07T12:00:00.000Z', + }, + source: { + name: 'Test hog function', + url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`, }, }, teamId: 1, - hogFunctionId: expect.any(String), + hogFunctionId: hogFunction.id, asyncFunctionName: 'fetch', - vmState: expect.any(Object), - }) - expect(mockAsyncFuntionExecutor.execute.mock.calls[0][0].asyncFunctionArgs).toMatchInlineSnapshot(` - Array [ - "https://example.com/posthog-webhook", - Object { - "body": Object { - "event": Object { - "distinct_id": "distinct_id", - "name": "test", - "properties": Object { - "$lib_version": "1.2.3", + asyncFunctionArgs: [ + 'https://example.com/posthog-webhook', + { + headers: { version: 'v=1.2.3' }, + body: { + event: { + uuid: 'uuid', + name: 'test', + distinct_id: 'distinct_id', + url: 'http://localhost:8000/events/1', + properties: { $lib_version: '1.2.3' }, + timestamp: '2024-06-07T12:00:00.000Z', + }, + groups: null, + nested: { foo: 'http://localhost:8000/events/1' }, + person: null, + event_url: 'http://localhost:8000/events/1-test', }, - "timestamp": "2024-06-07T12:00:00.000Z", - "url": "http://localhost:8000/events/1", - "uuid": "uuid", - }, - "event_url": "http://localhost:8000/events/1-test", - "groups": null, - "nested": Object { - "foo": "http://localhost:8000/events/1", - }, - "person": null, + method: 'POST', }, - "headers": Object { - "version": "v=1.2.3", - }, - "method": "POST", - }, - ] - `) + ], + vmState: expect.any(Object), + }) }) - // NOTE: Will be fixed in follow up - it('can filters incoming messages correctly', async () => { + + it('executes the full function in a loop', () => { + const logs: HogFunctionLogEntry[] = [] + const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) + logs.push(...results[0].logs) + const asyncExecResult = executor.executeAsyncResponse({ + ...results[0].asyncFunction!, + vmResponse: { status: 200, body: 'success' }, + }) + + logs.push(...asyncExecResult.logs) + expect(asyncExecResult.error).toBeUndefined() + expect(asyncExecResult.success).toBe(true) + expect(logs.map((log) => log.message)).toEqual([ + 'Executing function', + "Suspending function due to async function call 'fetch'", + 'Resuming function', + 'Fetch response:, {"status":200,"body":"success"}', + 'Function completed', + ]) + }) + }) + + describe('filtering', () => { + it('can filters incoming messages correctly', () => { const fn = createHogFunction({ ...HOG_EXAMPLES.simple_fetch, ...HOG_INPUTS_EXAMPLES.simple_fetch, @@ -105,13 +151,13 @@ describe('Hog Executor', () => { }) mockFunctionManager.getTeamHogFunctions.mockReturnValue({ - [1]: fn, + [fn.id]: fn, }) - const resultsShouldntMatch = await executor.executeMatchingFunctions(createHogExecutionGlobals()) + const resultsShouldntMatch = executor.executeMatchingFunctions(createHogExecutionGlobals()) expect(resultsShouldntMatch).toHaveLength(0) - const resultsShouldMatch = await executor.executeMatchingFunctions( + const resultsShouldMatch = executor.executeMatchingFunctions( createHogExecutionGlobals({ event: { name: '$pageview', @@ -124,4 +170,40 @@ describe('Hog Executor', () => { expect(resultsShouldMatch).toHaveLength(1) }) }) + + describe('async function 
responses', () => { + it('prevents large looped fetch calls', () => { + const fn = createHogFunction({ + ...HOG_EXAMPLES.recursive_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + + mockFunctionManager.getTeamHogFunctions.mockReturnValue({ + [fn.id]: fn, + }) + + // Simulate the recursive loop + const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) + expect(results).toHaveLength(1) + + // Run the result one time simulating a successful fetch + const asyncResult1 = executor.executeAsyncResponse({ + ...results[0].asyncFunction!, + vmResponse: { status: 200, body: 'success' }, + }) + expect(asyncResult1.success).toBe(true) + expect(asyncResult1.asyncFunction).toBeDefined() + + // Run the result one more time simulating a second successful fetch + const asyncResult2 = executor.executeAsyncResponse({ + ...asyncResult1.asyncFunction!, + vmResponse: { status: 200, body: 'success' }, + }) + // This time we should see an error for hitting the loop limit + expect(asyncResult2.success).toBe(false) + expect(asyncResult2.error).toEqual('Function exceeded maximum async steps') + expect(asyncResult2.logs.map((log) => log.message)).toEqual(['Function exceeded maximum async steps']) + }) + }) })