Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Sep 5, 2024
1 parent af18caa commit bfbf8a0
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 14 deletions.
4 changes: 2 additions & 2 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types'
import { createInvocation } from './utils'
import { createInvocation, queueBlobToString } from './utils'

export class CdpApi {
private hogExecutor: HogExecutor
Expand Down Expand Up @@ -148,7 +148,7 @@ export class CdpApi {
// Re-parse the fetch args for the logging
const fetchArgs = {
...invocation.queueParameters,
body: invocation.queueBlob?.toString(),
body: queueBlobToString(invocation.queueBlob),
}

response = {
Expand Down
10 changes: 5 additions & 5 deletions plugin-server/src/cdp/fetch-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
HogFunctionQueueParametersFetchRequest,
HogFunctionQueueParametersFetchResponse,
} from './types'
import { gzipObject, serializeHogFunctionInvocation } from './utils'
import { gzipObject, queueBlobToString, serializeHogFunctionInvocation } from './utils'

export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity]

Expand Down Expand Up @@ -40,13 +40,13 @@ export class FetchExecutor {

async execute(invocation: HogFunctionInvocation): Promise<HogFunctionInvocationResult | undefined> {
if (invocation.queue !== 'fetch' || !invocation.queueParameters) {
throw new Error('Bad invocation')
// throw new Error('Bad invocation')
return
}

const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest
const blob = invocation.queueBlob

const body = blob ? blob.toString() : undefined
const body = queueBlobToString(invocation.queueBlob)
if (body) {
histogramFetchPayloadSize.observe(body.length / 1024)
}
Expand Down Expand Up @@ -92,7 +92,7 @@ export class FetchExecutor {
}

const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest
const body = invocation.queueBlob ? invocation.queueBlob.toString() : undefined
const body = queueBlobToString(invocation.queueBlob) || ''
let responseBody = ''

const resParams: HogFunctionQueueParametersFetchResponse = {
Expand Down
12 changes: 7 additions & 5 deletions plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
HogFunctionQueueParametersFetchResponse,
HogFunctionType,
} from './types'
import { convertToHogFunctionFilterGlobal } from './utils'
import { convertToHogFunctionFilterGlobal, queueBlobToString } from './utils'

const MAX_ASYNC_STEPS = 2
const MAX_HOG_LOGS = 10
Expand Down Expand Up @@ -163,7 +163,7 @@ export class HogExecutor {
let responseBody: any = undefined
if (response) {
// Convert from buffer to string
responseBody = invocation.queueBlob ? Buffer.from(invocation.queueBlob).toString() : undefined
responseBody = queueBlobToString(invocation.queueBlob)
}

// Reset the queue parameters to be sure
Expand Down Expand Up @@ -341,16 +341,18 @@ export class HogExecutor {
const headers = fetchOptions?.headers || {
'Content-Type': 'application/json',
}
let body = fetchOptions?.body
// Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple)
body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body
const body: string | undefined = fetchOptions?.body
? typeof fetchOptions.body === 'string'
? fetchOptions.body
: JSON.stringify(fetchOptions.body)
: fetchOptions?.body

result.invocation.queue = 'fetch'
result.invocation.queueParameters = {
url,
method,
headers,
// body,
return_queue: 'hog',
}
// The payload is always blob encoded
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/cdp/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,7 @@ export function serializeHogFunctionInvocation(invocation: HogFunctionInvocation

return serializedInvocation
}

export function queueBlobToString(blob?: HogFunctionInvocation["queueBlob"]): string | undefined {
return blob ? Buffer.from(blob).toString('utf-8') : undefined
}
4 changes: 2 additions & 2 deletions plugin-server/tests/cdp/hog-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe('Hog Executor', () => {
},
})

const body = JSON.parse(result.invocation.queueBlob!.toString())
const body = JSON.parse(Buffer.from(result.invocation.queueBlob!).toString())
expect(body).toEqual({
event: {
uuid: 'uuid',
Expand Down Expand Up @@ -256,7 +256,7 @@ describe('Hog Executor', () => {
// This time we should see an error for hitting the loop limit
setupFetchResponse(result2.invocation)
const result3 = executor.execute(result1.invocation)
expect(result3.finished).toBe(false)
expect(result3.finished).toBe(true)
expect(result3.error).toEqual('Exceeded maximum number of async steps: 2')
expect(result3.logs.map((log) => log.message)).toEqual([
'Resuming function',
Expand Down

0 comments on commit bfbf8a0

Please sign in to comment.