From c09b1de6440648e63719d82694d7f5ccaa083b51 Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Fri, 25 Oct 2024 14:21:17 +0200 Subject: [PATCH] feat(hog): importing modules in hog (#25796) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- hogvm/python/execute.py | 110 +++++++++--- hogvm/python/test/test_execute.py | 25 +++ hogvm/stl/src/arrayCount.hog | 2 +- hogvm/stl/src/arrayExists.hog | 2 +- hogvm/stl/src/arrayFilter.hog | 2 +- hogvm/stl/src/arrayMap.hog | 2 +- hogvm/typescript/package.json | 2 +- .../typescript/src/__tests__/execute.test.ts | 67 +++++-- hogvm/typescript/src/constants.ts | 1 + hogvm/typescript/src/execute.ts | 164 +++++++++++++----- hogvm/typescript/src/types.ts | 16 +- package.json | 2 +- plugin-server/package.json | 2 +- plugin-server/pnpm-lock.yaml | 8 +- plugin-server/tests/cdp/cdp-api.test.ts | 4 +- .../cdp/cdp-processed-events-consumer.test.ts | 2 +- plugin-server/tests/cdp/hog-executor.test.ts | 6 +- pnpm-lock.yaml | 11 +- posthog/api/services/query.py | 5 +- 19 files changed, 320 insertions(+), 113 deletions(-) diff --git a/hogvm/python/execute.py b/hogvm/python/execute.py index 77b34de135a89..2c6a363f629e2 100644 --- a/hogvm/python/execute.py +++ b/hogvm/python/execute.py @@ -27,29 +27,36 @@ MAX_MEMORY = 64 * 1024 * 1024 # 64 MB MAX_FUNCTION_ARGS_LENGTH = 300 +CALLSTACK_LENGTH = 1000 @dataclass class BytecodeResult: result: Any - bytecode: list[Any] + bytecodes: dict[str, list[Any]] stdout: list[str] def execute_bytecode( - bytecode: list[Any], + input: list[Any] | dict, globals: Optional[dict[str, Any]] = None, functions: Optional[dict[str, Callable[..., Any]]] = None, timeout=timedelta(seconds=5), team: Optional["Team"] = None, debug=False, ) -> BytecodeResult: - if len(bytecode) == 0 or (bytecode[0] != HOGQL_BYTECODE_IDENTIFIER and bytecode[0] != HOGQL_BYTECODE_IDENTIFIER_V0): + bytecodes = input if isinstance(input, dict) else {"root": {"bytecode": input}} + root_bytecode = bytecodes.get("root", {}).get("bytecode", []) or [] + + if ( + not root_bytecode + or len(root_bytecode) == 0 + or (root_bytecode[0] != HOGQL_BYTECODE_IDENTIFIER and root_bytecode[0] != HOGQL_BYTECODE_IDENTIFIER_V0) + ): raise HogVMException(f"Invalid bytecode. Must start with '{HOGQL_BYTECODE_IDENTIFIER}'") - version = bytecode[1] if len(bytecode) >= 2 and bytecode[0] == HOGQL_BYTECODE_IDENTIFIER else 0 - result = None + version = root_bytecode[1] if len(root_bytecode) >= 2 and root_bytecode[0] == HOGQL_BYTECODE_IDENTIFIER else 0 start_time = time.time() - last_op = len(bytecode) - 1 + last_op = len(root_bytecode) - 1 stack: list = [] upvalues: list[dict] = [] upvalues_by_id: dict[int, dict] = {} @@ -61,23 +68,23 @@ def execute_bytecode( max_mem_used = 0 ops = 0 stdout: list[str] = [] - colored_bytecode = color_bytecode(bytecode) if debug else [] + debug_bytecode = [] if isinstance(timeout, int): timeout = timedelta(seconds=timeout) if len(call_stack) == 0: call_stack.append( CallFrame( - ip=2 if bytecode[0] == HOGQL_BYTECODE_IDENTIFIER else 1, + ip=0, chunk="root", stack_start=0, arg_len=0, closure=new_hog_closure( new_hog_callable( - type="main", + type="local", arg_count=0, upvalue_count=0, - ip=2 if bytecode[0] == HOGQL_BYTECODE_IDENTIFIER else 1, + ip=0, chunk="root", name="", ) @@ -85,18 +92,30 @@ def execute_bytecode( ) ) frame = call_stack[-1] - chunk_bytecode: list[Any] = bytecode + chunk_bytecode: list[Any] = root_bytecode + chunk_globals = globals def set_chunk_bytecode(): - nonlocal chunk_bytecode, last_op + nonlocal chunk_bytecode, chunk_globals, last_op, debug_bytecode if not frame.chunk or frame.chunk == "root": - chunk_bytecode = bytecode - last_op = len(bytecode) - 1 + chunk_bytecode = root_bytecode + chunk_globals = globals elif frame.chunk.startswith("stl/") and frame.chunk[4:] in BYTECODE_STL: chunk_bytecode = BYTECODE_STL[frame.chunk[4:]][1] - last_op = len(bytecode) - 1 + chunk_globals = {} + elif bytecodes.get(frame.chunk): + chunk_bytecode = bytecodes[frame.chunk].get("bytecode", []) + chunk_globals = bytecodes[frame.chunk].get("globals", {}) else: raise HogVMException(f"Unknown chunk: {frame.chunk}") + last_op = len(chunk_bytecode) - 1 + if debug: + debug_bytecode = color_bytecode(chunk_bytecode) + if frame.ip == 0 and (chunk_bytecode[0] == "_H" or chunk_bytecode[0] == "_h"): + # TODO: store chunk version + frame.ip += 2 if chunk_bytecode[0] == "_H" else 1 + + set_chunk_bytecode() def stack_keep_first_elements(count: int) -> list[Any]: nonlocal stack, mem_stack, mem_used @@ -163,13 +182,28 @@ def capture_upvalue(index) -> dict: return created_upvalue symbol: Any = None - while frame.ip <= last_op: + while True: + # Return or jump back to the previous call frame if ran out of bytecode to execute in this one, and return null + if frame.ip > last_op: + last_call_frame = call_stack.pop() + if len(call_stack) == 0 or last_call_frame is None: + if len(stack) > 1: + raise HogVMException("Invalid bytecode. More than one value left on stack") + return BytecodeResult( + result=pop_stack() if len(stack) > 0 else None, stdout=stdout, bytecodes=bytecodes + ) + stack_start = last_call_frame.stack_start + stack_keep_first_elements(stack_start) + push_stack(None) + frame = call_stack[-1] + set_chunk_bytecode() + ops += 1 symbol = chunk_bytecode[frame.ip] if (ops & 127) == 0: # every 128th operation check_timeout() elif debug: - debugger(symbol, bytecode, colored_bytecode, frame.ip, stack, call_stack, throw_stack) + debugger(symbol, chunk_bytecode, debug_bytecode, frame.ip, stack, call_stack, throw_stack) match symbol: case None: break @@ -247,8 +281,8 @@ def capture_upvalue(index) -> dict: push_stack(not bool(re.search(re.compile(args[1], re.RegexFlag.IGNORECASE), args[0]))) case Operation.GET_GLOBAL: chain = [pop_stack() for _ in range(next_token())] - if globals and chain[0] in globals: - push_stack(deepcopy(get_nested_value(globals, chain, True))) + if chunk_globals and chain[0] in chunk_globals: + push_stack(deepcopy(get_nested_value(chunk_globals, chain, True))) elif functions and chain[0] in functions: push_stack( new_hog_closure( @@ -298,7 +332,7 @@ def capture_upvalue(index) -> dict: response = pop_stack() last_call_frame = call_stack.pop() if len(call_stack) == 0 or last_call_frame is None: - return BytecodeResult(result=response, stdout=stdout, bytecode=bytecode) + return BytecodeResult(result=response, stdout=stdout, bytecodes=bytecodes) stack_start = last_call_frame.stack_start stack_keep_first_elements(stack_start) push_stack(response) @@ -459,10 +493,35 @@ def capture_upvalue(index) -> dict: ) ), ) + set_chunk_bytecode() call_stack.append(frame) continue # resume the loop without incrementing frame.ip else: - if functions is not None and name in functions: + if name == "import": + if arg_count != 1: + raise HogVMException("Function import requires exactly 1 argument") + module_name = pop_stack() + frame.ip += 1 # advance for when we return + frame = CallFrame( + ip=0, + chunk=module_name, + stack_start=len(stack), + arg_len=0, + closure=new_hog_closure( + new_hog_callable( + type="local", + name=module_name, + arg_count=0, + upvalue_count=0, + ip=0, + chunk=module_name, + ) + ), + ) + set_chunk_bytecode() + call_stack.append(frame) + continue + elif functions is not None and name in functions: if version == 0: args = [pop_stack() for _ in range(arg_count)] else: @@ -598,10 +657,5 @@ def capture_upvalue(index) -> dict: ) frame.ip += 1 - if debug: - debugger(symbol, bytecode, colored_bytecode, frame.ip, stack, call_stack, throw_stack) - if len(stack) > 1: - raise HogVMException("Invalid bytecode. More than one value left on stack") - if len(stack) == 1: - result = pop_stack() - return BytecodeResult(result=result, stdout=stdout, bytecode=bytecode) + + return BytecodeResult(result=pop_stack() if len(stack) > 0 else None, stdout=stdout, bytecodes=bytecodes) diff --git a/hogvm/python/test/test_execute.py b/hogvm/python/test/test_execute.py index 1a0664ebb9178..c7fae2a73051c 100644 --- a/hogvm/python/test/test_execute.py +++ b/hogvm/python/test/test_execute.py @@ -1008,3 +1008,28 @@ def test_bytecode_uncaught_errors(self): assert e.payload == {"key": "value"} else: raise AssertionError("Expected Exception not raised") + + def test_multiple_bytecodes(self): + ret = lambda string: {"bytecode": ["_H", 1, op.STRING, string, op.RETURN]} + call = lambda chunk: {"bytecode": ["_H", 1, op.STRING, chunk, op.CALL_GLOBAL, "import", 1, op.RETURN]} + res = execute_bytecode( + { + "root": call("code2"), + "code2": ret("banana"), + } + ) + assert res.result == "banana" + + def test_multiple_bytecodes_callback(self): + ret = lambda string: {"bytecode": ["_H", 1, op.STRING, string, op.RETURN]} + call = lambda chunk: {"bytecode": ["_H", 1, op.STRING, chunk, op.CALL_GLOBAL, "import", 1, op.RETURN]} + res = execute_bytecode( + { + "root": call("code2"), + "code2": call("code3"), + "code3": call("code4"), + "code4": call("code5"), + "code5": ret("tomato"), + } + ) + assert res.result == "tomato" diff --git a/hogvm/stl/src/arrayCount.hog b/hogvm/stl/src/arrayCount.hog index da2cac89c27fa..831206a116d27 100644 --- a/hogvm/stl/src/arrayCount.hog +++ b/hogvm/stl/src/arrayCount.hog @@ -1,4 +1,4 @@ -fn arrayCount(func, arr) { +fun arrayCount(func, arr) { let count := 0 for (let i in arr) { if (func(i)) { diff --git a/hogvm/stl/src/arrayExists.hog b/hogvm/stl/src/arrayExists.hog index d3340d724cb12..018f34cd9ef1b 100644 --- a/hogvm/stl/src/arrayExists.hog +++ b/hogvm/stl/src/arrayExists.hog @@ -1,4 +1,4 @@ -fn arrayExists(func, arr) { +fun arrayExists(func, arr) { for (let i in arr) { if (func(i)) { return true diff --git a/hogvm/stl/src/arrayFilter.hog b/hogvm/stl/src/arrayFilter.hog index 3a2f8240f1d28..fc2ba9a43c178 100644 --- a/hogvm/stl/src/arrayFilter.hog +++ b/hogvm/stl/src/arrayFilter.hog @@ -1,4 +1,4 @@ -fn arrayFilter(func, arr) { +fun arrayFilter(func, arr) { let result := [] for (let i in arr) { if (func(i)) { diff --git a/hogvm/stl/src/arrayMap.hog b/hogvm/stl/src/arrayMap.hog index 98b75552f79b7..7586a90c22438 100644 --- a/hogvm/stl/src/arrayMap.hog +++ b/hogvm/stl/src/arrayMap.hog @@ -1,4 +1,4 @@ -fn arrayMap(func, arr) { +fun arrayMap(func, arr) { let result := [] for (let i in arr) { result := arrayPushBack(result, func(i)) diff --git a/hogvm/typescript/package.json b/hogvm/typescript/package.json index 5b45ec2170904..cdb4b64b03240 100644 --- a/hogvm/typescript/package.json +++ b/hogvm/typescript/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/hogvm", - "version": "1.0.54", + "version": "1.0.55", "description": "PostHog Hog Virtual Machine", "types": "dist/index.d.ts", "source": "src/index.ts", diff --git a/hogvm/typescript/src/__tests__/execute.test.ts b/hogvm/typescript/src/__tests__/execute.test.ts index 430198e726bac..6d28c8ea06004 100644 --- a/hogvm/typescript/src/__tests__/execute.test.ts +++ b/hogvm/typescript/src/__tests__/execute.test.ts @@ -2,6 +2,7 @@ import RE2 from 're2' import { exec, execAsync, execSync } from '../execute' import { Operation as op } from '../operation' import { UncaughtHogVMException } from '../utils' +import { BytecodeEntry } from '../types' export function delay(ms: number): Promise { return new Promise((resolve) => { @@ -556,7 +557,7 @@ describe('hogvm execute', () => { finished: false, result: undefined, state: { - bytecode, + bytecodes: { root: { bytecode } }, asyncSteps: 1, callStack: [ { @@ -567,7 +568,7 @@ describe('hogvm execute', () => { closure: { __hogClosure__: true, callable: { - __hogCallable__: 'main', + __hogCallable__: 'local', name: '', argCount: 0, chunk: 'root', @@ -602,7 +603,7 @@ describe('hogvm execute', () => { result: '0.002', state: { asyncSteps: 0, - bytecode: [], + bytecodes: {}, callStack: [], declaredFunctions: {}, maxMemUsed: 13, @@ -629,7 +630,7 @@ describe('hogvm execute', () => { result: '0.002', state: { asyncSteps: 0, - bytecode: [], + bytecodes: {}, callStack: [], declaredFunctions: {}, maxMemUsed: 13, @@ -1955,7 +1956,7 @@ describe('hogvm execute', () => { result: undefined, state: { asyncSteps: 1, - bytecode: bytecode, + bytecodes: { root: { bytecode } }, callStack: [ { ip: 12, @@ -1965,7 +1966,7 @@ describe('hogvm execute', () => { closure: { __hogClosure__: true, callable: { - __hogCallable__: 'main', + __hogCallable__: 'local', name: '', argCount: 0, chunk: 'root', @@ -2051,7 +2052,7 @@ describe('hogvm execute', () => { asyncFunctionName: 'sleep', asyncFunctionArgs: [2], state: { - bytecode, + bytecodes: { root: { bytecode } }, stack: [ 2, { @@ -2085,7 +2086,7 @@ describe('hogvm execute', () => { closure: { __hogClosure__: true, callable: { - __hogCallable__: 'main', + __hogCallable__: 'local', name: '', argCount: 0, upvalueCount: 0, @@ -2110,7 +2111,7 @@ describe('hogvm execute', () => { result: 17, finished: true, state: { - bytecode: [], + bytecodes: {}, stack: expect.any(Array), telemetry: undefined, upvalues: [], @@ -2198,7 +2199,7 @@ describe('hogvm execute', () => { asyncFunctionName: 'sleep', asyncFunctionArgs: [2], state: { - bytecode, + bytecodes: { root: { bytecode } }, stack: [ { __hogClosure__: true, @@ -2243,7 +2244,7 @@ describe('hogvm execute', () => { closure: { __hogClosure__: true, callable: { - __hogCallable__: 'main', + __hogCallable__: 'local', name: '', argCount: 0, upvalueCount: 0, @@ -2268,7 +2269,7 @@ describe('hogvm execute', () => { result: 'outside', finished: true, state: { - bytecode: [], + bytecodes: {}, stack: [], upvalues: [], callStack: [], @@ -2361,7 +2362,7 @@ describe('hogvm execute', () => { asyncFunctionName: 'sleep', asyncFunctionArgs: [2], state: { - bytecode: bytecode, + bytecodes: { root: { bytecode } }, stack: [ { __hogClosure__: true, @@ -2406,7 +2407,7 @@ describe('hogvm execute', () => { closure: { __hogClosure__: true, callable: { - __hogCallable__: 'main', + __hogCallable__: 'local', name: '', argCount: 0, upvalueCount: 0, @@ -2449,7 +2450,7 @@ describe('hogvm execute', () => { result: 'outside', finished: true, state: { - bytecode: [], + bytecodes: {}, stack: expect.any(Array), upvalues: [], callStack: [], @@ -2470,7 +2471,7 @@ describe('hogvm execute', () => { result: 3, finished: true, state: { - bytecode: [], + bytecodes: {}, stack: [], upvalues: [], callStack: [], @@ -2498,7 +2499,7 @@ describe('hogvm execute', () => { result: 'truefalse', finished: true, state: { - bytecode: [], + bytecodes: {}, stack: [], upvalues: [], callStack: [], @@ -2517,4 +2518,36 @@ describe('hogvm execute', () => { }, }) }) + + test('multiple bytecodes', () => { + const ret = (string: string): BytecodeEntry => ({ bytecode: ['_H', 1, op.STRING, string, op.RETURN] }) + const call = (chunk: string): BytecodeEntry => ({ + bytecode: ['_H', 1, op.STRING, chunk, op.CALL_GLOBAL, 'import', 1, op.RETURN], + }) + + const bytecodes: Record = { + root: call('code2'), + code2: ret('banana'), + } + const res = exec({ bytecodes }) + expect(res.result).toEqual('banana') + }) + + test('multiple bytecodes via callback', () => { + const ret = (string: string): BytecodeEntry => ({ bytecode: ['_H', 1, op.STRING, string, op.RETURN] }) + const call = (chunk: string): BytecodeEntry => ({ + // bytecode: ['_H', 1, op.STRING, chunk, op.CALL_GLOBAL, '__importCallable', 1, op.CALL_LOCAL, 0, op.RETURN], + bytecode: ['_H', 1, op.STRING, chunk, op.CALL_GLOBAL, 'import', 1, op.RETURN], + }) + const res = exec(call('code2').bytecode, { + importBytecode: (chunk: string) => + ({ + code2: call('code3'), + code3: call('code4'), + code4: call('code5'), + code5: ret('tomato'), + }[chunk]), + }) + expect(res.result).toEqual('tomato') + }) }) diff --git a/hogvm/typescript/src/constants.ts b/hogvm/typescript/src/constants.ts index 70033476cb497..7e0ed7e940c19 100644 --- a/hogvm/typescript/src/constants.ts +++ b/hogvm/typescript/src/constants.ts @@ -2,3 +2,4 @@ export const DEFAULT_MAX_ASYNC_STEPS = 100 export const DEFAULT_MAX_MEMORY = 64 * 1024 * 1024 // 64 MB export const DEFAULT_TIMEOUT_MS = 5000 // ms export const MAX_FUNCTION_ARGS_LENGTH = 300 +export const CALLSTACK_LENGTH = 1000 diff --git a/hogvm/typescript/src/execute.ts b/hogvm/typescript/src/execute.ts index 1644eebe05e69..2083de268e723 100644 --- a/hogvm/typescript/src/execute.ts +++ b/hogvm/typescript/src/execute.ts @@ -1,9 +1,25 @@ -import { DEFAULT_MAX_ASYNC_STEPS, DEFAULT_MAX_MEMORY, DEFAULT_TIMEOUT_MS, MAX_FUNCTION_ARGS_LENGTH } from './constants' +import { + CALLSTACK_LENGTH, + DEFAULT_MAX_ASYNC_STEPS, + DEFAULT_MAX_MEMORY, + DEFAULT_TIMEOUT_MS, + MAX_FUNCTION_ARGS_LENGTH, +} from './constants' import { isHogCallable, isHogClosure, isHogError, isHogUpValue, newHogCallable, newHogClosure } from './objects' import { Operation, operations } from './operation' import { BYTECODE_STL } from './stl/bytecode' import { ASYNC_STL, STL } from './stl/stl' -import { CallFrame, ExecOptions, ExecResult, HogUpValue, Telemetry, ThrowFrame, VMState } from './types' +import { + BytecodeEntry, + Bytecodes, + CallFrame, + ExecOptions, + ExecResult, + HogUpValue, + Telemetry, + ThrowFrame, + VMState, +} from './types' import { calculateCost, convertHogToJS, @@ -16,7 +32,7 @@ import { unifyComparisonTypes, } from './utils' -export function execSync(bytecode: any[] | VMState, options?: ExecOptions): any { +export function execSync(bytecode: any[] | VMState | Bytecodes, options?: ExecOptions): any { const response = exec(bytecode, options) if (response.finished) { return response.result @@ -27,7 +43,7 @@ export function execSync(bytecode: any[] | VMState, options?: ExecOptions): any throw new HogVMException('Unexpected async function call: ' + response.asyncFunctionName) } -export async function execAsync(bytecode: any[] | VMState, options?: ExecOptions): Promise { +export async function execAsync(bytecode: any[] | VMState | Bytecodes, options?: ExecOptions): Promise { let vmState: VMState | undefined = undefined while (true) { const response = exec(vmState ?? bytecode, options) @@ -58,20 +74,26 @@ export async function execAsync(bytecode: any[] | VMState, options?: ExecOptions } } -export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { +export function exec(input: any[] | VMState | Bytecodes, options?: ExecOptions): ExecResult { const startTime = Date.now() let vmState: VMState | undefined = undefined - let bytecode: any[] - if (!Array.isArray(code)) { - vmState = code - bytecode = vmState.bytecode + + let bytecodes: Record + if (!Array.isArray(input)) { + if ('stack' in input) { + vmState = input + } + bytecodes = (input as VMState).bytecode + ? { root: { bytecode: (input as VMState).bytecode as any[] } } + : input.bytecodes } else { - bytecode = code + bytecodes = { root: { bytecode: input } } } - if (!bytecode || bytecode.length === 0 || (bytecode[0] !== '_h' && bytecode[0] !== '_H')) { + const rootBytecode = bytecodes.root.bytecode + if (!rootBytecode || rootBytecode.length === 0 || (rootBytecode[0] !== '_h' && rootBytecode[0] !== '_H')) { throw new HogVMException("Invalid HogQL bytecode, must start with '_H'") } - const version = bytecode[0] === '_H' ? bytecode[1] ?? 0 : 0 + const version = rootBytecode[0] === '_H' ? rootBytecode[1] ?? 0 : 0 let temp: any let temp2: any @@ -100,15 +122,16 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { let ops = vmState ? vmState.ops : 0 const timeout = options?.timeout ?? DEFAULT_TIMEOUT_MS const maxAsyncSteps = options?.maxAsyncSteps ?? DEFAULT_MAX_ASYNC_STEPS + const rootGlobals: Record = options?.globals ?? {} if (callStack.length === 0) { callStack.push({ - ip: bytecode[0] === '_H' ? 2 : 1, + ip: 0, chunk: 'root', stackStart: 0, argCount: 0, closure: newHogClosure( - newHogCallable('main', { + newHogCallable('local', { name: '', argCount: 0, upvalueCount: 0, @@ -119,7 +142,8 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { } satisfies CallFrame) } let frame: CallFrame = callStack[callStack.length - 1] - let chunkBytecode: any[] = bytecode + let chunkBytecode: any[] = rootBytecode + let chunkGlobals = rootGlobals let lastChunk = frame.chunk let lastTime = startTime @@ -127,13 +151,32 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { const setChunkBytecode = (): void => { if (!frame.chunk || frame.chunk === 'root') { - chunkBytecode = bytecode + chunkBytecode = rootBytecode + chunkGlobals = rootGlobals } else if (frame.chunk.startsWith('stl/') && frame.chunk.substring(4) in BYTECODE_STL) { chunkBytecode = BYTECODE_STL[frame.chunk.substring(4)][1] + chunkGlobals = {} + } else if (bytecodes[frame.chunk]) { + chunkBytecode = bytecodes[frame.chunk].bytecode + chunkGlobals = bytecodes[frame.chunk].globals ?? {} + } else if (options?.importBytecode) { + const chunk = options.importBytecode(frame.chunk) + if (chunk) { + bytecodes[frame.chunk] = chunk // cache for later + chunkBytecode = chunk.bytecode + chunkGlobals = chunk.globals ?? {} + } else { + throw new HogVMException(`Unknown chunk: ${frame.chunk}`) + } } else { throw new HogVMException(`Unknown chunk: ${frame.chunk}`) } + if (frame.ip === 0 && (chunkBytecode[0] === '_H' || chunkBytecode[0] === '_h')) { + // TODO: store chunkVersion + frame.ip += chunkBytecode[0] === '_H' ? 2 : 1 + } } + setChunkBytecode() function popStack(): any { if (stack.length === 0) { @@ -193,7 +236,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { function getVMState(): VMState { return { - bytecode, + bytecodes: bytecodes, stack: stack.map((v) => convertHogToJS(v)), upvalues: sortedUpValues.map((v) => ({ ...v, value: convertHogToJS(v.value) })), callStack: callStack.map((v) => ({ @@ -284,7 +327,25 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { } try { - while (frame.ip < chunkBytecode.length) { + while (true) { + // Return or jump back to the previous call frame if ran out of bytecode to execute in this one + if (frame.ip >= chunkBytecode.length) { + const lastCallFrame = callStack.pop() + if (!lastCallFrame || callStack.length === 0) { + if (stack.length > 1) { + throw new HogVMException('Invalid bytecode. More than one value left on stack') + } + return { + result: stack.length > 0 ? popStack() : null, + finished: true, + state: { ...getVMState(), bytecodes: {}, stack: [], callStack: [], upvalues: [] }, + } satisfies ExecResult + } + stackKeepFirstElements(lastCallFrame.stackStart) + pushStack(null) + frame = callStack[callStack.length - 1] + setChunkBytecode() + } nextOp() switch (chunkBytecode[frame.ip]) { case null: @@ -407,8 +468,8 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { for (let i = 0; i < count; i++) { chain.push(popStack()) } - if (options?.globals && chain[0] in options.globals && Object.hasOwn(options.globals, chain[0])) { - pushStack(convertJSToHog(getNestedValue(options.globals, chain, true))) + if (chunkGlobals && chain[0] in chunkGlobals && Object.hasOwn(chunkGlobals, chain[0])) { + pushStack(convertJSToHog(getNestedValue(chunkGlobals, chain, true))) } else if ( options?.asyncFunctions && chain.length == 1 && @@ -480,7 +541,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { return { result, finished: true, - state: { ...getVMState(), bytecode: [], stack: [], callStack: [], upvalues: [] }, + state: { ...getVMState(), bytecodes: {}, stack: [], callStack: [], upvalues: [] }, } satisfies ExecResult } const stackStart = lastCallFrame.stackStart @@ -671,7 +732,40 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { throw new HogVMException('Too many arguments') } - if (options?.functions && Object.hasOwn(options.functions, name) && options.functions[name]) { + if (name === 'import') { + const args = + version === 0 + ? Array(temp) + .fill(null) + .map(() => popStack()) + : stackKeepFirstElements(stack.length - temp) + if (args.length !== 1) { + throw new HogVMException(`Function ${name} requires exactly 1 argument`) + } + frame.ip += 1 // advance for when we return + frame = { + ip: 0, + chunk: args[0], + stackStart: stack.length, + argCount: 0, + closure: newHogClosure( + newHogCallable('local', { + name: args[0], + argCount: 0, + upvalueCount: 0, + ip: 0, + chunk: args[0], + }) + ), + } satisfies CallFrame + setChunkBytecode() + callStack.push(frame) + continue // resume the loop without incrementing frame.ip + } else if ( + options?.functions && + Object.hasOwn(options.functions, name) && + options.functions[name] + ) { const args = version === 0 ? Array(temp) @@ -742,6 +836,9 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { } satisfies CallFrame setChunkBytecode() callStack.push(frame) + if (callStack.length > CALLSTACK_LENGTH) { + throw new HogVMException(`Call stack exceeded maximum length of ${CALLSTACK_LENGTH}`) + } continue // resume the loop without incrementing frame.ip } else { throw new HogVMException(`Unsupported function call: ${name}`) @@ -785,6 +882,9 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { } satisfies CallFrame setChunkBytecode() callStack.push(frame) + if (callStack.length > CALLSTACK_LENGTH) { + throw new HogVMException(`Call stack exceeded maximum length of ${CALLSTACK_LENGTH}`) + } continue // resume the loop without incrementing frame.ip } else if (closure.callable.__hogCallable__ === 'stl') { if (!closure.callable.name || !(closure.callable.name in STL)) { @@ -871,28 +971,10 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { ) } - // use "continue" to skip incrementing frame.ip each iteration + // use "continue" to skip this frame.ip auto-increment frame.ip++ } - - if (stack.length > 1) { - throw new HogVMException('Invalid bytecode. More than one value left on stack') - } } catch (e) { return { result: null, finished: false, error: e, state: getVMState() } satisfies ExecResult } - - if (stack.length === 0) { - return { - result: null, - finished: true, - state: { ...getVMState(), bytecode: [], stack: [], callStack: [], upvalues: [] }, - } satisfies ExecResult - } - - return { - result: popStack() ?? null, - finished: true, - state: { ...getVMState(), bytecode: [], stack: [], callStack: [], upvalues: [] }, - } satisfies ExecResult } diff --git a/hogvm/typescript/src/types.ts b/hogvm/typescript/src/types.ts index 8cde60ce9cd79..707c97d6e9633 100644 --- a/hogvm/typescript/src/types.ts +++ b/hogvm/typescript/src/types.ts @@ -1,8 +1,15 @@ import type crypto from 'crypto' +export interface BytecodeEntry { + bytecode: any[] + globals?: Record +} + export interface VMState { /** Bytecode running in the VM */ - bytecode: any[] + bytecodes: Record + /** TODO: Legacy bytecode running in the VM (kept around for inflight jobs) */ + bytecode?: any[] /** Stack of the VM */ stack: any[] /** Values hoisted from the stack */ @@ -25,11 +32,16 @@ export interface VMState { telemetry?: Telemetry[] } +export interface Bytecodes { + bytecodes: Record +} + export interface ExecOptions { /** Global variables to be passed into the function */ globals?: Record functions?: Record any> asyncFunctions?: Record Promise> + importBytecode?: (module: string) => BytecodeEntry | undefined /** Timeout in milliseconds */ timeout?: number /** Max number of async function that can happen. When reached the function will throw */ @@ -108,7 +120,7 @@ export interface HogError { } export interface HogCallable { - __hogCallable__: 'local' | 'stl' | 'async' | 'main' + __hogCallable__: 'local' | 'stl' | 'async' name?: string argCount: number upvalueCount: number diff --git a/package.json b/package.json index ddbed7ae6cec6..65dd94c06455a 100644 --- a/package.json +++ b/package.json @@ -76,7 +76,7 @@ "@medv/finder": "^3.1.0", "@microlink/react-json-view": "^1.21.3", "@monaco-editor/react": "4.6.0", - "@posthog/hogvm": "^1.0.54", + "@posthog/hogvm": "^1.0.55", "@posthog/icons": "0.8.5", "@posthog/plugin-scaffold": "^1.4.4", "@react-hook/size": "^2.1.2", diff --git a/plugin-server/package.json b/plugin-server/package.json index 671a6941b8329..d3b9711b6f963 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -54,7 +54,7 @@ "@maxmind/geoip2-node": "^3.4.0", "@posthog/clickhouse": "^1.7.0", "@posthog/cyclotron": "file:../rust/cyclotron-node", - "@posthog/hogvm": "^1.0.54", + "@posthog/hogvm": "^1.0.55", "@posthog/plugin-scaffold": "1.4.4", "@sentry/node": "^7.49.0", "@sentry/profiling-node": "^0.3.0", diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index e1838d53a6732..d6150f2962bd9 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -47,8 +47,8 @@ dependencies: specifier: file:../rust/cyclotron-node version: file:../rust/cyclotron-node '@posthog/hogvm': - specifier: ^1.0.54 - version: 1.0.54(luxon@3.4.4) + specifier: ^1.0.55 + version: 1.0.55(luxon@3.4.4) '@posthog/plugin-scaffold': specifier: 1.4.4 version: 1.4.4 @@ -3119,8 +3119,8 @@ packages: engines: {node: '>=12'} dev: false - /@posthog/hogvm@1.0.54(luxon@3.4.4): - resolution: {integrity: sha512-vHYdiFcIvJ3CxYd/8rSZoUk8ZkeHETGb1dbBhURLejxp9dMyWC1qJnmcIlkH/vsbed/4hVUtFMqtp4bnQSmi2g==} + /@posthog/hogvm@1.0.55(luxon@3.4.4): + resolution: {integrity: sha512-cjF3lrA62aaqoERiVJHpkkRBS6QJ1rH4xYwiOMOs8ZQKNNRofeqth4NJzhJReXA0Wlf99l8hDt2lgFgLaDBI1w==} peerDependencies: luxon: ^3.4.4 dependencies: diff --git a/plugin-server/tests/cdp/cdp-api.test.ts b/plugin-server/tests/cdp/cdp-api.test.ts index 3808657e6ba29..aff207f099a4b 100644 --- a/plugin-server/tests/cdp/cdp-api.test.ts +++ b/plugin-server/tests/cdp/cdp-api.test.ts @@ -174,7 +174,7 @@ describe('CDP API', () => { }, { level: 'debug', - message: "Suspending function due to async function call 'fetch'. Payload: 2064 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 2110 bytes", }, { level: 'info', @@ -222,7 +222,7 @@ describe('CDP API', () => { }, { level: 'debug', - message: "Suspending function due to async function call 'fetch'. Payload: 2064 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 2110 bytes", }, { level: 'debug', diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 71ccf4f5b8d5f..633f408101802 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -172,7 +172,7 @@ describe('CDP Processed Events Consumer', () => { { topic: 'log_entries_test', value: { - message: "Suspending function due to async function call 'fetch'. Payload: 1956 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 2002 bytes", log_source_id: fnFetchNoFilters.id, }, }, diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index f13ebd0e9c0a4..7f10b5a9103be 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -106,7 +106,7 @@ describe('Hog Executor', () => { { timestamp: expect.any(DateTime), level: 'debug', - message: "Suspending function due to async function call 'fetch'. Payload: 1872 bytes", + message: "Suspending function due to async function call 'fetch'. Payload: 1918 bytes", }, ]) }) @@ -187,7 +187,7 @@ describe('Hog Executor', () => { expect(logs.map((log) => log.message)).toMatchInlineSnapshot(` Array [ "Executing function", - "Suspending function due to async function call 'fetch'. Payload: 1872 bytes", + "Suspending function due to async function call 'fetch'. Payload: 1918 bytes", "Resuming function", "Fetch response:, {\\"status\\":200,\\"body\\":\\"success\\"}", "Function completed in 100ms. Sync: 0ms. Mem: 779 bytes. Ops: 22. Event: 'http://localhost:8000/events/1'", @@ -206,7 +206,7 @@ describe('Hog Executor', () => { expect(logs.map((log) => log.message)).toMatchInlineSnapshot(` Array [ "Executing function", - "Suspending function due to async function call 'fetch'. Payload: 1872 bytes", + "Suspending function due to async function call 'fetch'. Payload: 1918 bytes", "Resuming function", "Fetch response:, {\\"status\\":200,\\"body\\":{\\"foo\\":\\"bar\\"}}", "Function completed in 100ms. Sync: 0ms. Mem: 779 bytes. Ops: 22. Event: 'http://localhost:8000/events/1'", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6143118ede8fd..7ccdb6be2beaf 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -50,8 +50,8 @@ dependencies: specifier: 4.6.0 version: 4.6.0(monaco-editor@0.49.0)(react-dom@18.2.0)(react@18.2.0) '@posthog/hogvm': - specifier: ^1.0.54 - version: 1.0.54(luxon@3.5.0) + specifier: ^1.0.55 + version: 1.0.55(luxon@3.5.0) '@posthog/icons': specifier: 0.8.5 version: 0.8.5(react-dom@18.2.0)(react@18.2.0) @@ -386,7 +386,7 @@ dependencies: optionalDependencies: fsevents: specifier: ^2.3.2 - version: 2.3.3 + version: 2.3.2 devDependencies: '@babel/core': @@ -5412,8 +5412,8 @@ packages: resolution: {integrity: sha512-50/17A98tWUfQ176raKiOGXuYpLyyVMkxxG6oylzL3BPOlA6ADGdK7EYunSa4I064xerltq9TGXs8HmOk5E+vw==} dev: false - /@posthog/hogvm@1.0.54(luxon@3.5.0): - resolution: {integrity: sha512-vHYdiFcIvJ3CxYd/8rSZoUk8ZkeHETGb1dbBhURLejxp9dMyWC1qJnmcIlkH/vsbed/4hVUtFMqtp4bnQSmi2g==} + /@posthog/hogvm@1.0.55(luxon@3.5.0): + resolution: {integrity: sha512-cjF3lrA62aaqoERiVJHpkkRBS6QJ1rH4xYwiOMOs8ZQKNNRofeqth4NJzhJReXA0Wlf99l8hDt2lgFgLaDBI1w==} peerDependencies: luxon: ^3.4.4 dependencies: @@ -13126,7 +13126,6 @@ packages: engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} os: [darwin] requiresBuild: true - dev: true optional: true /fsevents@2.3.3: diff --git a/posthog/api/services/query.py b/posthog/api/services/query.py index c3ca91cb88ab5..b98bb979bfc63 100644 --- a/posthog/api/services/query.py +++ b/posthog/api/services/query.py @@ -106,10 +106,11 @@ def process_query_model( try: hog_result = execute_hog(query.code or "", team=team) + bytecode = hog_result.bytecodes.get("root", None) result = HogQueryResponse( results=hog_result.result, - bytecode=hog_result.bytecode, - coloredBytecode=color_bytecode(hog_result.bytecode), + bytecode=bytecode, + coloredBytecode=color_bytecode(bytecode) if bytecode else None, stdout="\n".join(hog_result.stdout), ) except Exception as e: