From 8f19a75220a89c584ef43db461d0a011cf8beb5d Mon Sep 17 00:00:00 2001 From: igalshilman Date: Tue, 25 Jun 2024 13:29:59 +0200 Subject: [PATCH 1/3] [interpreter] Support Http1 based tests --- services/node-services/src/app.ts | 51 ++++++-- .../src/interpreter/test_containers.ts | 120 +++++++++++------- .../src/interpreter/test_driver.ts | 40 +++++- services/node-services/src/services.ts | 10 +- verification/run-request-response.sh | 60 +++++++++ 5 files changed, 219 insertions(+), 62 deletions(-) create mode 100755 verification/run-request-response.sh diff --git a/services/node-services/src/app.ts b/services/node-services/src/app.ts index 82b70c85..74f0a71f 100644 --- a/services/node-services/src/app.ts +++ b/services/node-services/src/app.ts @@ -7,7 +7,12 @@ // directory of this repository or package, or at // https://github.com/restatedev/e2e/blob/main/LICENSE +/* eslint-disable @typescript-eslint/no-explicit-any */ + import * as restate from "@restatedev/restate-sdk"; +import { endpoint as fetchEndpoint } from "@restatedev/restate-sdk/fetch"; +import { endpoint as lambdaEndpoint } from "@restatedev/restate-sdk/lambda"; +import process from "node:process"; import "./awakeable_holder"; import "./counter"; @@ -26,19 +31,47 @@ import "./interpreter/entry_point"; import "./workflow"; import { REGISTRY } from "./services"; +import { serve as http1Server } from "./h1server"; + +class EndpointWrapper { + static fromEnvironment() { + if (process.env.E2E_USE_FETCH) { + return new EndpointWrapper("fetch", fetchEndpoint()); + } else if (process.env.AWS_LAMBDA_FUNCTION_NAME) { + return new EndpointWrapper("lambda", lambdaEndpoint()); + } else { + return new EndpointWrapper("node", restate.endpoint()); + } + } -if (!process.env.SERVICES) { - throw new Error("Cannot find SERVICES env"); + constructor(readonly kind: K, readonly endpoint: T) {} } -const fqdns = new Set(process.env.SERVICES.split(",")); -const endpoint = restate.endpoint(); -REGISTRY.register(fqdns, endpoint); + +const wrapper = EndpointWrapper.fromEnvironment(); +REGISTRY.registerFromEnvironment(wrapper.endpoint); if (process.env.E2E_REQUEST_SIGNING) { - endpoint.withIdentityV1(...process.env.E2E_REQUEST_SIGNING.split(",")); + const signing = process.env.E2E_REQUEST_SIGNING; + wrapper.endpoint.withIdentityV1(...signing.split(",")); } -if (!process.env.AWS_LAMBDA_FUNCTION_NAME) { - endpoint.listen(); + +switch (wrapper.kind) { + case "node": { + wrapper.endpoint.listen(); + break; + } + case "fetch": { + http1Server(wrapper.endpoint.handler()); + break; + } + case "lambda": { + // do nothing, handler is exported + break; + } + default: + throw new Error("Unknown endpoint type"); } -export const handler = endpoint.lambdaHandler(); +// export for lambda. it will be only set for a lambda deployment +export const handler = + wrapper.kind == "lambda" ? wrapper.endpoint.handler() : undefined; diff --git a/services/node-services/src/interpreter/test_containers.ts b/services/node-services/src/interpreter/test_containers.ts index 79c1dd61..f31f5dbd 100644 --- a/services/node-services/src/interpreter/test_containers.ts +++ b/services/node-services/src/interpreter/test_containers.ts @@ -10,11 +10,30 @@ import { GenericContainer, Network, - PullPolicy, StartedNetwork, StartedTestContainer, } from "testcontainers"; +export interface EnvironmentSpec { + restate: { + image: string; + env: Record; + pull?: boolean; + }; + + interpreters: { + image: string; + env: Record; + pull?: boolean; + }; + + service: { + image: string; + env: Record; + pull?: boolean; + }; +} + export interface TestEnvironment { ingressUrl: string; adminUrl: string; @@ -31,17 +50,26 @@ export interface Containers { servicesContainer: StartedTestContainer; } -export async function setupContainers(): Promise { +export async function setupContainers( + env: EnvironmentSpec +): Promise { + console.log(env); + const network = await new Network().start(); - const restate = new GenericContainer("ghcr.io/restatedev/restate:main") + const restate = new GenericContainer(env.restate.image) .withExposedPorts(8080, 9070) .withNetwork(network) .withNetworkAliases("restate") - .withPullPolicy(PullPolicy.alwaysPull()) + .withPullPolicy({ + shouldPull() { + return env.restate.pull ?? true; + }, + }) .withEnvironment({ RESTATE_LOG_FILTER: "restate=warn", RESTATE_LOG_FORMAT: "json", + ...(env.restate?.env ?? {}), }) .withUlimits({ nproc: { soft: 65535, hard: 65535 }, @@ -49,64 +77,64 @@ export async function setupContainers(): Promise { }) .start(); - const zero = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main") - .withNetwork(network) - .withNetworkAliases("interpreter_zero") - .withPullPolicy(PullPolicy.alwaysPull()) - .withEnvironment({ - PORT: "9000", - RESTATE_LOGGING: "ERROR", - NODE_ENV: "production", - SERVICES: "ObjectInterpreterL0", - }) - .start(); - - const one = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main") - .withNetwork(network) - .withNetworkAliases("interpreter_one") - .withPullPolicy(PullPolicy.alwaysPull()) - - .withExposedPorts(9001) - .withEnvironment({ - PORT: "9001", + const names = ["interpreter_zero", "interpreter_one", "interpreter_two"]; + const interpreters = []; + for (let i = 0; i < 3; i++) { + const port = 9000 + i; + const auxEnv = { + PORT: `${port}`, RESTATE_LOGGING: "ERROR", NODE_ENV: "production", - SERVICES: "ObjectInterpreterL1", - }) - .start(); + NODE_OPTIONS: "--max-old-space-size=4096", + SERVICES: `ObjectInterpreterL${i}`, + ...(env.interpreters?.env ?? {}), + }; + const interpreter = new GenericContainer(env.interpreters.image) + .withNetwork(network) + .withNetworkAliases(names[i]) + .withExposedPorts(port) + .withPullPolicy({ + shouldPull() { + return env.interpreters.pull ?? true; + }, + }) + .withEnvironment(auxEnv) + .withUlimits({ + nproc: { soft: 65535, hard: 65535 }, + nofile: { soft: 65535, hard: 65535 }, + }) + .start(); - const two = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main") - .withNetwork(network) - .withNetworkAliases("interpreter_two") - .withPullPolicy(PullPolicy.alwaysPull()) - .withExposedPorts(9002) - .withEnvironment({ - PORT: "9002", - RESTATE_LOGGING: "ERROR", - NODE_ENV: "production", - SERVICES: "ObjectInterpreterL2", - }) - .start(); + interpreters.push(interpreter); + } - const services = new GenericContainer( - "ghcr.io/restatedev/e2e-node-services:main" - ) + const services = new GenericContainer(env.service.image) .withNetwork(network) .withNetworkAliases("services") - .withPullPolicy(PullPolicy.alwaysPull()) + .withPullPolicy({ + shouldPull() { + return env.service.pull ?? true; + }, + }) .withExposedPorts(9003) .withEnvironment({ PORT: "9003", RESTATE_LOGGING: "ERROR", NODE_ENV: "production", + NODE_OPTIONS: "--max-old-space-size=4096", SERVICES: "ServiceInterpreterHelper", + ...(env.service?.env ?? {}), + }) + .withUlimits({ + nproc: { soft: 65535, hard: 65535 }, + nofile: { soft: 65535, hard: 65535 }, }) .start(); const restateContainer = await restate; - const interpreterZeroContainer = await zero; - const interpreterOneContainer = await one; - const interpreterTwoContainer = await two; + const interpreterZeroContainer = await interpreters[0]; + const interpreterOneContainer = await interpreters[1]; + const interpreterTwoContainer = await interpreters[2]; const servicesContainer = await services; const ingressUrl = `http://${restateContainer.getHost()}:${restateContainer.getMappedPort( diff --git a/services/node-services/src/interpreter/test_driver.ts b/services/node-services/src/interpreter/test_driver.ts index 36526c9f..4ed4b64b 100644 --- a/services/node-services/src/interpreter/test_driver.ts +++ b/services/node-services/src/interpreter/test_driver.ts @@ -13,7 +13,12 @@ import { interpreterObjectForLayer, } from "./interpreter"; import { Random } from "./random"; -import { setupContainers, tearDown, TestEnvironment } from "./test_containers"; +import { + EnvironmentSpec, + setupContainers, + tearDown, + TestEnvironment, +} from "./test_containers"; import { ProgramGenerator } from "./test_generator"; import * as restate from "@restatedev/restate-sdk-clients"; @@ -35,6 +40,7 @@ export interface TestConfiguration { readonly register?: TestConfigurationDeployments; // auto register the following endpoints readonly bootstrap?: boolean; readonly crashInterval?: number; + readonly bootstrapEnv?: EnvironmentSpec; } export enum TestStatus { @@ -136,7 +142,11 @@ export class Test { console.log(`Ingress is ready.`); } - async registerEndpoints(adminUrl?: string, deployments?: string[]) { + async registerEndpoints( + useHttp11?: boolean, + adminUrl?: string, + deployments?: string[] + ) { if (!adminUrl) { throw new Error("Missing adminUrl"); } @@ -156,11 +166,13 @@ export class Test { await sleep(2000); } console.log("Admin is ready."); + for (const uri of deployments) { const res = await fetch(`${adminUrl}/deployments`, { method: "POST", body: JSON.stringify({ uri, + use_http_11: useHttp11 ?? false ? true : false, }), headers: { "Content-Type": "application/json", @@ -191,8 +203,25 @@ export class Test { console.log(this.conf); this.status = TestStatus.RUNNING; + const useHttp11 = process.env.E2E_USE_FETCH?.toLocaleLowerCase() == "true"; if (this.conf.bootstrap) { - this.containers = await setupContainers(); + const env: EnvironmentSpec = this.conf.bootstrapEnv ?? { + restate: { + image: "ghcr.io/restatedev/restate:main", + env: {}, + }, + + interpreters: { + image: "ghcr.io/restatedev/e2e-node-services:main", + env: {}, + }, + + service: { + image: "ghcr.io/restatedev/e2e-node-services:main", + env: {}, + }, + }; + this.containers = await setupContainers(env); console.log(this.containers); } const ingressUrl = this.containers?.ingressUrl ?? this.conf.ingress; @@ -203,7 +232,10 @@ export class Test { let ingress = restate.connect({ url: ingressUrl }); if (deployments) { - await this.registerEndpoints(adminUrl, deployments); + console.log(useHttp11); + console.log(adminUrl); + console.log(deployments); + await this.registerEndpoints(useHttp11, adminUrl, deployments); } await this.ingressReady(ingressUrl); diff --git a/services/node-services/src/services.ts b/services/node-services/src/services.ts index 019e42bc..86f5994a 100644 --- a/services/node-services/src/services.ts +++ b/services/node-services/src/services.ts @@ -8,7 +8,6 @@ // https://github.com/restatedev/e2e/blob/main/LICENSE import { - RestateEndpoint, ServiceDefinition, VirtualObjectDefinition, WorkflowDefinition, @@ -16,7 +15,7 @@ import { export type IComponent = { fqdn: string; - binder: (endpoint: RestateEndpoint) => void; + binder: (endpoint: { bind: (what: unknown) => void }) => void; }; export class ComponentRegistry { @@ -47,7 +46,12 @@ export class ComponentRegistry { }); } - register(fqdns: Set, e: RestateEndpoint) { + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + registerFromEnvironment(e: { bind: (what: any) => any }) { + if (!process.env.SERVICES) { + throw new Error("Cannot find SERVICES env"); + } + const fqdns = new Set(process.env.SERVICES.split(",")); fqdns.forEach((fqdn) => { const c = this.components.get(fqdn); if (!c) { diff --git a/verification/run-request-response.sh b/verification/run-request-response.sh new file mode 100755 index 00000000..db14bc50 --- /dev/null +++ b/verification/run-request-response.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +export SERVICES=InterpreterDriverJob +export NODE_ENV=production +export NODE_OPTIONS="--max-old-space-size=4096" +export E2E_USE_FETCH=true +export DEBUG=testcontainers:containers + +SEED=$(date --iso-8601=seconds) + + +# "image": "ghcr.io/restatedev/e2e-node-services:main", + +export INTERPRETER_DRIVER_CONF=$(cat <<-EOF +{ + "seed" : "${SEED}", + "keys" : 100000, + "tests" : 1000000, + "maxProgramSize" : 50, + "crashInterval" : 900000, + "bootstrap" : true, + "bootstrapEnv" : { + "restate": { + "image": "ghcr.io/restatedev/restate:main", + "env": {} + }, + "interpreters" : { + "image": "e2enode", + "pull" : false, + "env": { + "E2E_USE_FETCH" : ${E2E_USE_FETCH} + } + }, + "service": { + "image": "e2enode", + "pull" : false, + "env": { + "E2E_USE_FETCH" : ${E2E_USE_FETCH} + } + } + } +} +EOF +) + +#docker pull ghcr.io/restatedev/e2e-node-services:main + +docker run \ + --net host\ + -v /var/run/docker.sock:/var/run/docker.sock \ + --env SERVICES \ + --env E2E_USE_FETCH \ + --env NODE_ENV \ + --env NODE_OPTIONS \ + --env DEBUG \ + --env INTERPRETER_DRIVER_CONF \ + e2enode 2>&1 | grep -v "undefined is not a number, but it still has feelings" + +# ghcr.io/restatedev/e2e-node-services:main 2>&1 | grep -v "undefined is not a number, but it still has feelings" + From 2c932384ea2fee736b5ef02323f2c1d6915f7b80 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Tue, 25 Jun 2024 13:40:19 +0200 Subject: [PATCH 2/3] wip --- services/node-services/src/app.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/node-services/src/app.ts b/services/node-services/src/app.ts index 74f0a71f..32b9e40e 100644 --- a/services/node-services/src/app.ts +++ b/services/node-services/src/app.ts @@ -31,7 +31,7 @@ import "./interpreter/entry_point"; import "./workflow"; import { REGISTRY } from "./services"; -import { serve as http1Server } from "./h1server"; +import http1Server from "./h1server"; class EndpointWrapper { static fromEnvironment() { From 7d04dddf8e8290e40fb0b9e234668e6da6bc5b45 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Tue, 25 Jun 2024 13:46:30 +0200 Subject: [PATCH 3/3] wip --- services/node-services/src/h1server.ts | 87 ++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 services/node-services/src/h1server.ts diff --git a/services/node-services/src/h1server.ts b/services/node-services/src/h1server.ts new file mode 100644 index 00000000..1df4f05c --- /dev/null +++ b/services/node-services/src/h1server.ts @@ -0,0 +1,87 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate e2e tests, +// which are released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/e2e/blob/main/LICENSE + +import http from "node:http"; +import { Buffer } from "node:buffer"; + +/* eslint-disable @typescript-eslint/no-explicit-any */ + +export default function h1server(handler: { + fetch: (request: Request) => Promise; +}) { + let port = 9080; + if (process.env.PORT) { + try { + port = parseInt(process.env.PORT); + } catch (e) { + console.log(`Failed parsing ${process.env.PORT} with: `, e); + throw e; + } + } + const nodeHandler = nodeHandlerFromFetchHandler(handler.fetch); + http + .createServer( + { + keepAlive: true, + }, + nodeHandler + ) + .listen({ port, host: "0.0.0.0", backlog: 1024 * 1024 }); +} + +function nodeHandlerFromFetchHandler( + handler: (request: Request) => Promise +): (req: http.IncomingMessage, res: http.ServerResponse) => void { + return (r, res) => { + toWeb(r) + .then((request) => handler(request)) + .then(async (response: any) => { + const body = await response.arrayBuffer(); + const headers = Object.fromEntries(response.headers.entries()); + res.writeHead(response.status, headers); + res.end(Buffer.from(body)); + }); + }; +} + +function toWeb(req: http.IncomingMessage): Promise { + const { headers, method, url } = req; + + return new Promise((resolve, reject) => { + const body: Uint8Array[] = []; + + req.on("data", (chunk: Uint8Array) => { + body.push(chunk); + }); + + req.on("end", () => { + const buf = Buffer.concat(body); + + const h = new Headers(); + for (const [k, v] of Object.entries(headers)) { + // FIXME v can be something other than string. + h.append(k, v as string); + } + + // Create a new Request object + const fullURL = new URL(`http://localhost${url}`); + const request = new Request(fullURL, { + method, + headers: h, + body: method !== "GET" && method !== "HEAD" ? buf : undefined, + }); + + resolve(request); + }); + + req.on("error", (err) => { + reject(err); + }); + }); +}