diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index 4c40a8376c983..cfc70e7f1b8fc 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -9,7 +9,7 @@ import { HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { HogWatcher, HogWatcherState } from './hog-watcher' import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types' -import { createInvocation } from './utils' +import { createInvocation, queueBlobToString } from './utils' export class CdpApi { private hogExecutor: HogExecutor @@ -148,7 +148,7 @@ export class CdpApi { // Re-parse the fetch args for the logging const fetchArgs = { ...invocation.queueParameters, - body: invocation.queueBlob?.toString(), + body: queueBlobToString(invocation.queueBlob), } response = { diff --git a/plugin-server/src/cdp/fetch-executor.ts b/plugin-server/src/cdp/fetch-executor.ts index 6ab71f613f198..e9fa2a4ffbb8c 100644 --- a/plugin-server/src/cdp/fetch-executor.ts +++ b/plugin-server/src/cdp/fetch-executor.ts @@ -12,7 +12,7 @@ import { HogFunctionQueueParametersFetchRequest, HogFunctionQueueParametersFetchResponse, } from './types' -import { gzipObject, serializeHogFunctionInvocation } from './utils' +import { gzipObject, queueBlobToString, serializeHogFunctionInvocation } from './utils' export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity] @@ -40,13 +40,13 @@ export class FetchExecutor { async execute(invocation: HogFunctionInvocation): Promise { if (invocation.queue !== 'fetch' || !invocation.queueParameters) { - throw new Error('Bad invocation') + // throw new Error('Bad invocation') + return } const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest - const blob = invocation.queueBlob - const body = blob ? blob.toString() : undefined + const body = queueBlobToString(invocation.queueBlob) if (body) { histogramFetchPayloadSize.observe(body.length / 1024) } @@ -92,7 +92,7 @@ export class FetchExecutor { } const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest - const body = invocation.queueBlob ? invocation.queueBlob.toString() : undefined + const body = queueBlobToString(invocation.queueBlob) || '' let responseBody = '' const resParams: HogFunctionQueueParametersFetchResponse = { diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 2d8dd2e3b145e..28bad8e38099a 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -14,7 +14,7 @@ import { HogFunctionQueueParametersFetchResponse, HogFunctionType, } from './types' -import { convertToHogFunctionFilterGlobal } from './utils' +import { convertToHogFunctionFilterGlobal, queueBlobToString } from './utils' const MAX_ASYNC_STEPS = 2 const MAX_HOG_LOGS = 10 @@ -163,7 +163,7 @@ export class HogExecutor { let responseBody: any = undefined if (response) { // Convert from buffer to string - responseBody = invocation.queueBlob ? Buffer.from(invocation.queueBlob).toString() : undefined + responseBody = queueBlobToString(invocation.queueBlob) } // Reset the queue parameters to be sure @@ -341,16 +341,18 @@ export class HogExecutor { const headers = fetchOptions?.headers || { 'Content-Type': 'application/json', } - let body = fetchOptions?.body // Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple) - body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body + const body: string | undefined = fetchOptions?.body + ? typeof fetchOptions.body === 'string' + ? fetchOptions.body + : JSON.stringify(fetchOptions.body) + : fetchOptions?.body result.invocation.queue = 'fetch' result.invocation.queueParameters = { url, method, headers, - // body, return_queue: 'hog', } // The payload is always blob encoded diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index b6ad78d732efb..db1884c8c8a69 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -240,3 +240,7 @@ export function serializeHogFunctionInvocation(invocation: HogFunctionInvocation return serializedInvocation } + +export function queueBlobToString(blob?: HogFunctionInvocation["queueBlob"]): string | undefined { + return blob ? Buffer.from(blob).toString('utf-8') : undefined +} \ No newline at end of file diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 5d22f63bea51a..2fc0f9f78cc4b 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -134,7 +134,7 @@ describe('Hog Executor', () => { }, }) - const body = JSON.parse(result.invocation.queueBlob!.toString()) + const body = JSON.parse(Buffer.from(result.invocation.queueBlob!).toString()) expect(body).toEqual({ event: { uuid: 'uuid', @@ -256,7 +256,7 @@ describe('Hog Executor', () => { // This time we should see an error for hitting the loop limit setupFetchResponse(result2.invocation) const result3 = executor.execute(result1.invocation) - expect(result3.finished).toBe(false) + expect(result3.finished).toBe(true) expect(result3.error).toEqual('Exceeded maximum number of async steps: 2') expect(result3.logs.map((log) => log.message)).toEqual([ 'Resuming function',