From a4b7df61422d6344dee1e3f19c7690d7681ab359 Mon Sep 17 00:00:00 2001 From: Franklin Waller Date: Mon, 2 Oct 2023 15:01:03 +0200 Subject: [PATCH] chore: Trying to fix the pipeline --- .../as-sdk-integration-tests/src/http.test.ts | 3 +- libs/vm/src/index.ts | 31 +++++++++++++------ libs/vm/src/vm-imports.ts | 4 +++ libs/vm/src/vm.test.ts | 2 ++ libs/vm/src/vm.ts | 3 ++ libs/vm/src/worker-host-communication.ts | 5 +++ libs/vm/src/worker.ts | 20 +++++++----- 7 files changed, 50 insertions(+), 18 deletions(-) diff --git a/libs/as-sdk-integration-tests/src/http.test.ts b/libs/as-sdk-integration-tests/src/http.test.ts index c85b012..4063a1b 100644 --- a/libs/as-sdk-integration-tests/src/http.test.ts +++ b/libs/as-sdk-integration-tests/src/http.test.ts @@ -6,13 +6,14 @@ import { PromiseStatus } from '../../../dist/libs/vm/src/types/vm-promise'; const mockHttpFetch = jest.fn(); +jest.setTimeout(30_000); + const TestVmAdapter = jest.fn().mockImplementation(() => { return { httpFetch: mockHttpFetch }; }); describe('Http', () => { it('Test SDK HTTP Rejection', async () => { - const wasmBinary = await readFile('dist/libs/as-sdk-integration-tests/debug.wasm'); const result = await callVm({ args: ['testHttpRejection'], diff --git a/libs/vm/src/index.ts b/libs/vm/src/index.ts index 69d92d4..e416403 100644 --- a/libs/vm/src/index.ts +++ b/libs/vm/src/index.ts @@ -25,18 +25,31 @@ export function callVm(callData: VmCallData, workerUrl = DEFAULT_WORKER_PATH, vm type: WorkerMessageType.VmCall, }; - worker.on('message', (message: WorkerMessage) => { - if (message.type === WorkerMessageType.VmResult) { - resolve(message.result); - } else if (message.type === WorkerMessageType.VmActionExecute) { - hostToWorker.executeAction(message.action); - } else if (message.type === WorkerMessageType.VmActionResultBuffer) { - hostToWorker.sendActionResultToWorker(message.buffer); - } else { - console.warn('Unknown message', message); + worker.on('message', async (message: WorkerMessage) => { + try { + console.error('@message: ', message); + if (message.type === WorkerMessageType.VmResult) { + resolve(message.result); + } else if (message.type === WorkerMessageType.VmActionExecute) { + await hostToWorker.executeAction(message.action); + } else if (message.type === WorkerMessageType.VmActionResultBuffer) { + await hostToWorker.sendActionResultToWorker(message.buffer); + } else { + console.warn('Unknown message', message); + } + } catch (error) { + console.error('@callVm-onMessage: ', error); } }); + worker.on('error', (error) => { + resolve({ + exitCode: 1, + stderr: 'Worker threw an uncaught error: ' + error, + stdout: '', + }); + }); + worker.postMessage(workerMessage); }); } diff --git a/libs/vm/src/vm-imports.ts b/libs/vm/src/vm-imports.ts index b5141c6..b618427 100644 --- a/libs/vm/src/vm-imports.ts +++ b/libs/vm/src/vm-imports.ts @@ -16,14 +16,18 @@ export default class VmImports { } httpFetch(action: number, actionLength: number) { + console.log("HTTP Action called"); const rawAction = new Uint8Array( this.memory?.buffer.slice(action, action + actionLength) ?? [] ); const messageRaw = Buffer.from(rawAction).toString('utf-8'); + console.log('HTTP Action called with ', messageRaw); + try { const message: HttpFetchAction = JSON.parse(messageRaw); this.callResult = this.workerToHost.callActionOnHost(message); + console.log('We got a result: ', this.callResult); return this.callResult.length; } catch (error) { console.error(`@httpFetch: ${messageRaw}`, error); diff --git a/libs/vm/src/vm.test.ts b/libs/vm/src/vm.test.ts index a8a2395..34d1068 100644 --- a/libs/vm/src/vm.test.ts +++ b/libs/vm/src/vm.test.ts @@ -7,6 +7,8 @@ import { PromiseStatus } from "../../../dist/libs/vm/src/types/vm-promise"; const mockHttpFetch = jest.fn(); +jest.setTimeout(10_000); + const TestVmAdapter = jest.fn().mockImplementation(() => { return { httpFetch: mockHttpFetch }; }); diff --git a/libs/vm/src/vm.ts b/libs/vm/src/vm.ts index 77a87ae..1798fbe 100644 --- a/libs/vm/src/vm.ts +++ b/libs/vm/src/vm.ts @@ -34,7 +34,9 @@ export async function executeVm(callData: VmCallData, notifierBuffer: SharedArra const memory = instance.exports.memory; vmImports.setMemory(memory as WebAssembly.Memory); + console.log("Starting WASI"); const exitCode = wasi.start(instance); + console.log("exit: ", exitCode); return { exitCode, @@ -44,6 +46,7 @@ export async function executeVm(callData: VmCallData, notifierBuffer: SharedArra resultAsString: new TextDecoder().decode(vmImports.result), } } catch (err) { + console.log('Exception thrown', err); console.error(` @executeWasm Exception threw: ${err} diff --git a/libs/vm/src/worker-host-communication.ts b/libs/vm/src/worker-host-communication.ts index 26488ea..2d24bdc 100644 --- a/libs/vm/src/worker-host-communication.ts +++ b/libs/vm/src/worker-host-communication.ts @@ -69,6 +69,8 @@ export class HostToWorker { const notifierBufferi32 = new Int32Array(this.notifierBuffer); notifierBufferi32.set([this.actionResult.length], 1); + console.error("setted buffer, ready to send"); + updateNotifierState(notifierBufferi32, AtomicState.ResponseResultLength); } @@ -112,6 +114,8 @@ export class WorkerToHost { const length = notifierBufferi32[1]; + console.error("The length is: ", length); + // No need to do a full roundtrip if there are no bytes if (length === 0) { return Buffer.from([]); @@ -123,6 +127,7 @@ export class WorkerToHost { type: WorkerMessageType.VmActionResultBuffer, }; + console.error('Requesting message: ', message); parentPort?.postMessage(message); waitForNotifierStateChange(notifierBufferi32, AtomicState.RequestResult); diff --git a/libs/vm/src/worker.ts b/libs/vm/src/worker.ts index 7ac751d..01a9795 100644 --- a/libs/vm/src/worker.ts +++ b/libs/vm/src/worker.ts @@ -7,15 +7,19 @@ import { } from './types/worker-messages.js'; parentPort?.on('message', async function (event) { - const message: WorkerMessage = event; + try { + const message: WorkerMessage = event; - if (message.type === WorkerMessageType.VmCall) { - const result = await executeVm(message.callData, message.notifierBuffer); - const response: VmResultWorkerMessage = { - result, - type: WorkerMessageType.VmResult, - }; + if (message.type === WorkerMessageType.VmCall) { + const result = await executeVm(message.callData, message.notifierBuffer); + const response: VmResultWorkerMessage = { + result, + type: WorkerMessageType.VmResult, + }; - parentPort?.postMessage(response); + parentPort?.postMessage(response); + } + } catch (error) { + console.error("@worker:message, error thrown: ", error); } });