Skip to content

Commit

Permalink
feat(vm): Implemented HTTP fetch + VM Adapters using Atomics and Shar…
Browse files Browse the repository at this point in the history
…edArrayBuffers
  • Loading branch information
FranklinWaller committed Aug 9, 2023
1 parent 13162a7 commit c074579
Show file tree
Hide file tree
Showing 14 changed files with 1,512 additions and 3,819 deletions.
42 changes: 37 additions & 5 deletions libs/vm/src/default-vm-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,42 @@
import { HttpFetchAction, VmAdapter } from "./types/vm-adapter";
import fetch from "node-fetch";
import type { HttpFetchAction } from './types/vm-actions';
import { HttpFetchResponse } from './types/vm-actions.js';
import type { VmAdapter } from './types/vm-adapter';
import fetch from 'node-fetch';
import { PromiseStatus } from './types/vm-promise.js';

export default class DefaultVmAdapter implements VmAdapter {
async http_fetch(action: HttpFetchAction): Promise<Response> {
console.log(action);
async httpFetch(action: HttpFetchAction): Promise<PromiseStatus<HttpFetchResponse>> {
try {
const response = await fetch(new URL(action.url), {
method: action.options.method.toUpperCase(),
headers: action.options.headers,
body: action.options.body
? Buffer.from(action.options.body)
: undefined,
});

return new Response();
const bufferResponse = await response.arrayBuffer();
const httpResponse = new HttpFetchResponse({
bytes: Array.from(new Uint8Array(bufferResponse)),
content_length: response.size,
headers: Object.fromEntries(response.headers.entries()),
status: response.status,
url: response.url,
});

return PromiseStatus.fulfilled(httpResponse);
} catch (error) {
const stringifiedError = '' + error;

console.error('@default-vm-adapter: ', stringifiedError);

return PromiseStatus.rejected(new HttpFetchResponse({
bytes: Array.from(new TextEncoder().encode(stringifiedError)),
content_length: stringifiedError.length,
headers: {},
status: 0,
url: '',
}));
}
}
}
32 changes: 14 additions & 18 deletions libs/vm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { VmCallWorkerMessage, WorkerMessage, WorkerMessageType } from "./types/w
import type { VmAdapter } from "./types/vm-adapter.js";
import DefaultVmAdapter from "./default-vm-adapter.js";
import { parse, format } from "node:path";
import { HostToWorker } from "./worker-host-communication.js";

const CURRENT_FILE_PATH = parse(import.meta.url);
CURRENT_FILE_PATH.base = 'worker.js';
Expand All @@ -12,35 +13,30 @@ const DEFAULT_WORKER_PATH = format(CURRENT_FILE_PATH);
export function callVm(callData: VmCallData, workerUrl = DEFAULT_WORKER_PATH, vmAdapter: VmAdapter = new DefaultVmAdapter()): Promise<VmResult> {
return new Promise((resolve) => {
const worker = new Worker(new URL(workerUrl));
const notifierBuffer = new SharedArrayBuffer(8); // 4 bytes for notifying, 4 bytes for storing i32 numbers

const hostToWorker = new HostToWorker(vmAdapter, notifierBuffer);
const workerMessage: VmCallWorkerMessage = {
processId: Math.random().toString(),
callData: {
...callData,
binary: Array.from(callData.binary),
},
notifierBuffer,
type: WorkerMessageType.VmCall,
};

worker.on("message", (event) => {
const message: WorkerMessage = JSON.parse(event);

if (message.type === WorkerMessageType.VmResult && message.processId === workerMessage.processId) {
worker.on('message', (message: WorkerMessage) => {
if (message.type === WorkerMessageType.VmResult) {
resolve(message.result);
} else if (message.type === WorkerMessageType.VmActionExecute) {
hostToWorker.executeAction(message.action);
} else if (message.type === WorkerMessageType.VmActionResultBuffer) {
hostToWorker.sendActionResultToWorker(message.buffer);
} else {
console.warn('Unknown message', message);
}
});

worker.postMessage(JSON.stringify(workerMessage));
worker.postMessage(workerMessage);
});
}

async function main() {
const result = await callVm({
args: [],
binary: new Uint8Array(),
envs: {},
});

console.log(result);
}

main();
51 changes: 51 additions & 0 deletions libs/vm/src/types/vm-actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import type { ToBuffer } from "./vm-promise";

enum HttpFetchMethod {
Options = 'Options',
Get = 'Get',
Post = 'Post',
Put = 'Put',
Delete = 'Delete',
Head = 'Head',
Trace = 'Trace',
Connect = 'Connect',
Patch = 'Patch',
}

export interface HttpFetchOptions {
method: HttpFetchMethod;
headers: { [key: string]: string };
body?: Uint8Array;
}

export interface HttpFetchAction {
url: string;
options: HttpFetchOptions;
}

export interface HttpFetchResponseData {
/** HTTP Status code */
status: number;

/** Response headers */
headers: { [key: string]: string };

/** Response body in bytes */
bytes: number[];

/** The final URL that was resolved */
url: string;

/** The byte length of the response */
content_length: number;
}

export class HttpFetchResponse implements ToBuffer {
constructor(public data: HttpFetchResponseData) {}

toBuffer(): Uint8Array {
return new TextEncoder().encode(JSON.stringify(this.data));
}
}

export type VmAction = HttpFetchAction;
25 changes: 3 additions & 22 deletions libs/vm/src/types/vm-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,7 @@
enum HttpFetchMethod {
Options,
Get,
Post,
Put,
Delete,
Head,
Trace,
Connect,
Patch,
}

export interface HttpFetchOptions {
method: HttpFetchMethod,
headers: Map<string, string>,
body?: Uint8Array,
}
import type { HttpFetchAction, HttpFetchResponse } from "./vm-actions";
import { PromiseStatus } from "./vm-promise.js";

export interface HttpFetchAction {
url: string,
options: HttpFetchOptions,
}

export interface VmAdapter {
http_fetch(action: HttpFetchAction): Promise<Response>;
httpFetch(action: HttpFetchAction): Promise<PromiseStatus<HttpFetchResponse>>;
}
38 changes: 38 additions & 0 deletions libs/vm/src/types/vm-promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
interface PromiseStatusResult {
/** Actually a Uint8[] */
Fulfilled?: number[];
/** Actually a Uint8[] */
Rejected?: number[];
}

export interface ToBuffer {
toBuffer(): Uint8Array;
}

export class PromiseStatus<T> {
private constructor(private value: PromiseStatusResult) {}

static rejected<T extends ToBuffer>(value: T): PromiseStatus<T> {
return new PromiseStatus({
Rejected: Array.from(value.toBuffer()),
});
}

static fulfilled<T extends ToBuffer>(value?: T): PromiseStatus<T> {
return new PromiseStatus({
Fulfilled: Array.from(value?.toBuffer() ?? []),
});
}

get length() {
return this.toJSON().length;
}

toJSON(): string {
return JSON.stringify(this.value);
}

toBuffer(): Uint8Array {
return new TextEncoder().encode(this.toJSON());
}
}
25 changes: 21 additions & 4 deletions libs/vm/src/types/worker-messages.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
import { VmCallData, VmResult } from "../vm";
import type { VmCallData, VmResult } from "../vm";
import type { VmAction } from "./vm-actions";

export enum WorkerMessageType {
VmCall = "VmCall",
VmResult = "VmResult",
VmActionResultBuffer = "VmActionResultBuffer",
VmActionExecute = "VmActionExecute",
}

export interface VmCallWorkerMessage {
processId: string,
callData: VmCallData,
notifierBuffer: SharedArrayBuffer,
type: WorkerMessageType.VmCall,
}

export interface VmResultWorkerMessage {
processId: string,
result: VmResult,
type: WorkerMessageType.VmResult,
}

export type WorkerMessage = VmCallWorkerMessage | VmResultWorkerMessage;
export interface VmActionResultBufferMessage {
buffer: SharedArrayBuffer,
type: WorkerMessageType.VmActionResultBuffer,
}

export interface VmActionExecuteMessage {
action: VmAction;
type: WorkerMessageType.VmActionExecute;
}


export type WorkerMessage =
| VmCallWorkerMessage
| VmResultWorkerMessage
| VmActionResultBufferMessage
| VmActionExecuteMessage;
66 changes: 36 additions & 30 deletions libs/vm/src/vm-imports.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,55 @@

function httpFetch(memory: WebAssembly.Memory) {
return (action: number, actionLength: number) => {
console.log('http args', action, actionLength, memory);
};
}

function callResultWrite(result: number, resultLength: number) {
console.log('call result args', result, resultLength);
}

function executionResult(result: number, resultLength: number) {
console.log('execution result args', result, resultLength);
}

export function createVmImports(wasiImports: object): WebAssembly.Imports {
return {
...wasiImports,
'env': {
'http_fetch': httpFetch,
'call_result_write': callResultWrite,
'execution_result': executionResult,
},
};
}
import type { HttpFetchAction } from './types/vm-actions';
import { WorkerToHost } from './worker-host-communication.js';

export default class VmImports {
memory?: WebAssembly.Memory;
workerToHost: WorkerToHost;
callResult: Uint8Array = new Uint8Array();
result: Uint8Array = new Uint8Array();

constructor(notifierBuffer: SharedArrayBuffer) {
this.workerToHost = new WorkerToHost(notifierBuffer);
}

setMemory(memory: WebAssembly.Memory) {
this.memory = memory;
}

httpFetch(action: number, actionLength: number) {
console.log(action, actionLength, this.memory?.buffer.byteLength);
const rawAction = new Uint8Array(this.memory?.buffer.slice(action, action + actionLength) ?? []);
console.log(Buffer.from(rawAction).toString('utf-8'));
const rawAction = new Uint8Array(
this.memory?.buffer.slice(action, action + actionLength) ?? []
);
const messageRaw = Buffer.from(rawAction).toString('utf-8');
const message: HttpFetchAction = JSON.parse(messageRaw);
this.callResult = this.workerToHost.callActionOnHost(message);

return this.callResult.length;
}

callResultWrite(ptr: number, length: number) {
try {
const memory = new Uint8Array(this.memory?.buffer ?? []);
memory.set(this.callResult.slice(0, length), ptr);
} catch (err) {
console.error('@callResultWrite: ', err);
}
}

executionResult(ptr: number, length: number) {
this.result = new Uint8Array(
this.memory?.buffer.slice(ptr, ptr + length) ?? []
);
}

getImports(wasiImports: object): WebAssembly.Imports {
return {
// TODO: Data requests should not have this many imports
// we should restrict it to only a few
...wasiImports,
env: {
http_fetch: this.httpFetch.bind(this),
call_result_write: callResultWrite,
execution_result: executionResult,
call_result_write: this.callResultWrite.bind(this),
execution_result: this.executionResult.bind(this),
},
};
}
Expand Down
Loading

0 comments on commit c074579

Please sign in to comment.