Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Jul 26, 2024
1 parent d7e347e commit 1146de0
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 54 deletions.
26 changes: 13 additions & 13 deletions plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { addLog, HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher } from './hog-watcher/hog-watcher'
import { HogWatcherState } from './hog-watcher/types'
import { HogFunctionInvocation, HogFunctionType } from './types'
import { HogFunctionInvocation, HogFunctionInvocationAsyncRequest, HogFunctionType } from './types'

export class CdpApi {
private hogExecutor: HogExecutor
Expand Down Expand Up @@ -110,8 +110,6 @@ export class CdpApi {
globals: globals,
teamId: team.id,
hogFunctionId: id,
logs: [],
timings: [],
}

// We use the provided config if given, otherwise the function's config
Expand All @@ -127,6 +125,8 @@ export class CdpApi {
let response = this.hogExecutor.execute(compoundConfiguration, invocation)

while (response.asyncFunctionRequest) {
invocation.vmState = response.invocation.vmState

const asyncFunctionRequest = response.asyncFunctionRequest

if (mock_async_functions || asyncFunctionRequest.name !== 'fetch') {
Expand All @@ -140,25 +140,25 @@ export class CdpApi {
)

// Add the state, simulating what executeAsyncResponse would do
asyncFunctionRequest.vmState.stack.push(convertJSToHog({ status: 200, body: {} }))
invocation.vmState!.stack.push(convertJSToHog({ status: 200, body: {} }))
} else {
const asyncRes = await this.asyncFunctionExecutor!.execute(response, {
const asyncInvocationRequest: HogFunctionInvocationAsyncRequest = {
state: '', // WE don't care about the state for this level of testing
teamId: team.id,
hogFunctionId: hogFunction.id,
asyncFunctionRequest,
}
const asyncRes = await this.asyncFunctionExecutor!.execute(asyncInvocationRequest, {
sync: true,
})

if (!asyncRes || asyncRes.asyncFunctionResponse.error) {
addLog(response, 'error', 'Failed to execute async function')
}
asyncFunctionRequest.vmState.stack.push(
convertJSToHog(asyncRes?.asyncFunctionResponse.response ?? null)
)
response.timings.push(...(asyncRes?.asyncFunctionResponse.timings ?? []))
invocation.vmState!.stack.push(convertJSToHog(asyncRes?.asyncFunctionResponse.response ?? null))
}

// Clear it so we can't ever end up in a loop
delete response.asyncFunctionRequest

response = this.hogExecutor.execute(compoundConfiguration, response, asyncFunctionRequest.vmState)
response = this.hogExecutor.execute(compoundConfiguration, invocation)
}

res.json({
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/cdp/hog-watcher/hog-watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class HogWatcherActiveObservations {

observeResults(results: HogFunctionInvocationResult[]) {
results.forEach((result) =>
this.addObservations(result.hogFunctionId, {
this.addObservations(result.invocation.hogFunctionId, {
successes: result.finished ? 1 : 0,
failures: result.error ? 1 : 0,
})
Expand All @@ -75,8 +75,8 @@ export class HogWatcherActiveObservations {
// NOTE: This probably wants to be done using the response status instead :thinking:
responses.forEach((response) =>
this.addObservations(response.hogFunctionId, {
asyncFunctionSuccesses: response.error ? 0 : 1,
asyncFunctionFailures: response.error ? 1 : 0,
asyncFunctionSuccesses: response.asyncFunctionResponse.error ? 0 : 1,
asyncFunctionFailures: response.asyncFunctionResponse.error ? 1 : 0,
})
)
}
Expand Down
82 changes: 44 additions & 38 deletions plugin-server/tests/cdp/hog-executor.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { DateTime } from 'luxon'
import { finished } from 'stream'

Check failure on line 2 in plugin-server/tests/cdp/hog-executor.test.ts

View workflow job for this annotation

GitHub Actions / Code quality

'finished' is defined but never used. Allowed unused vars must match /^_/u

import { HogExecutor } from '../../src/cdp/hog-executor'
import { HogFunctionManager } from '../../src/cdp/hog-function-manager'
import {
HogFunctionInvocationAsyncResponse,
HogFunctionAsyncFunctionResponse,
HogFunctionInvocationResult,
HogFunctionLogEntry,
HogFunctionType,
Expand All @@ -13,20 +14,17 @@ import { castTimestampOrNow } from '../../src/utils/utils'
import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
import { createHogExecutionGlobals, createHogFunction, insertHogFunction as _insertHogFunction } from './fixtures'

const simulateMockFetchAsyncResponse = (result: HogFunctionInvocationResult): HogFunctionInvocationAsyncResponse => {
const createAsyncFunctionResponse = (): HogFunctionAsyncFunctionResponse => {
return {
...result,
asyncFunctionResponse: {
timings: [
{
kind: 'async_function',
duration_ms: 100,
},
],
response: {
status: 200,
body: 'success',
timings: [
{
kind: 'async_function',
duration_ms: 100,
},
],
response: {
status: 200,
body: 'success',
},
}
}
Expand Down Expand Up @@ -61,15 +59,19 @@ describe('Hog Executor', () => {
mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction)
})

it('can parse incoming messages correctly', () => {
it('can execute messages', () => {
const globals = createHogExecutionGlobals()
const results = executor
.findMatchingFunctions(createHogExecutionGlobals())
.matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult)
expect(results).toHaveLength(1)
expect(results[0]).toMatchObject({
id: expect.any(String),
hogFunctionId: hogFunction.id,
invocation: {
id: expect.any(String),
hogFunctionId: hogFunction.id,
},
finished: false,
asyncFunctionRequest: {},
})
})

Expand All @@ -83,7 +85,7 @@ describe('Hog Executor', () => {
team_id: 1,
log_source: 'hog_function',
log_source_id: hogFunction.id,
instance_id: results[0].id,
instance_id: results[0].invocation.id,
timestamp: expect.any(DateTime),
level: 'debug',
message: 'Executing function',
Expand All @@ -92,7 +94,7 @@ describe('Hog Executor', () => {
team_id: 1,
log_source: 'hog_function',
log_source_id: hogFunction.id,
instance_id: results[0].id,
instance_id: results[0].invocation.id,
timestamp: expect.any(DateTime),
level: 'debug',
message: "Suspending function due to async function call 'fetch'. Payload: 1299 bytes",
Expand Down Expand Up @@ -136,24 +138,29 @@ describe('Hog Executor', () => {
.findMatchingFunctions(createHogExecutionGlobals())
.matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult)
expect(results[0]).toMatchObject({
id: results[0].id,
globals: {
project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' },
event: {
uuid: 'uuid',
name: 'test',
distinct_id: 'distinct_id',
url: 'http://localhost:8000/events/1',
properties: { $lib_version: '1.2.3' },
timestamp: '2024-06-07T12:00:00.000Z',
},
source: {
name: 'Test hog function',
url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`,
invocation: {
id: results[0].invocation.id,
teamId: 1,
hogFunctionId: hogFunction.id,
vmState: expect.any(Object),

globals: {
project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' },
event: {
uuid: 'uuid',
name: 'test',
distinct_id: 'distinct_id',
url: 'http://localhost:8000/events/1',
properties: { $lib_version: '1.2.3' },
timestamp: '2024-06-07T12:00:00.000Z',
},
source: {
name: 'Test hog function',
url: `http://localhost:8000/projects/1/pipeline/destinations/hog-${hogFunction.id}/configuration/`,
},
},
},
teamId: 1,
hogFunctionId: hogFunction.id,

asyncFunctionRequest: {
name: 'fetch',
args: [
Expand All @@ -177,7 +184,6 @@ describe('Hog Executor', () => {
method: 'POST',
},
],
vmState: expect.any(Object),
},
timings: [
{
Expand All @@ -197,7 +203,7 @@ describe('Hog Executor', () => {
const splicedLogs = results[0].logs.splice(0, 100)
logs.push(...splicedLogs)

const asyncExecResult = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0]))
const asyncExecResult = executor.executeAsyncResponse(results[0].invocation, createAsyncFunctionResponse())

logs.push(...asyncExecResult.logs)
expect(asyncExecResult.error).toBeUndefined()
Expand Down Expand Up @@ -259,13 +265,13 @@ describe('Hog Executor', () => {
expect(results).toHaveLength(1)

// Run the result one time simulating a successful fetch
const asyncResult1 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0]))
const asyncResult1 = executor.executeAsyncResponse(results[0].invocation, createAsyncFunctionResponse())
expect(asyncResult1.finished).toBe(false)
expect(asyncResult1.error).toBe(undefined)
expect(asyncResult1.asyncFunctionRequest).toBeDefined()

// Run the result one more time simulating a second successful fetch
const asyncResult2 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(asyncResult1))
const asyncResult2 = executor.executeAsyncResponse(asyncResult1.invocation, createAsyncFunctionResponse())
// This time we should see an error for hitting the loop limit
expect(asyncResult2.finished).toBe(false)
expect(asyncResult2.error).toEqual('Exceeded maximum number of async steps: 2')
Expand Down

0 comments on commit 1146de0

Please sign in to comment.