diff --git a/services/node-services/src/app.ts b/services/node-services/src/app.ts index 82b70c85..74f0a71f 100644 --- a/services/node-services/src/app.ts +++ b/services/node-services/src/app.ts @@ -7,7 +7,12 @@ // directory of this repository or package, or at // https://github.com/restatedev/e2e/blob/main/LICENSE +/* eslint-disable @typescript-eslint/no-explicit-any */ + import * as restate from "@restatedev/restate-sdk"; +import { endpoint as fetchEndpoint } from "@restatedev/restate-sdk/fetch"; +import { endpoint as lambdaEndpoint } from "@restatedev/restate-sdk/lambda"; +import process from "node:process"; import "./awakeable_holder"; import "./counter"; @@ -26,19 +31,47 @@ import "./interpreter/entry_point"; import "./workflow"; import { REGISTRY } from "./services"; +import { serve as http1Server } from "./h1server"; + +class EndpointWrapper { + static fromEnvironment() { + if (process.env.E2E_USE_FETCH) { + return new EndpointWrapper("fetch", fetchEndpoint()); + } else if (process.env.AWS_LAMBDA_FUNCTION_NAME) { + return new EndpointWrapper("lambda", lambdaEndpoint()); + } else { + return new EndpointWrapper("node", restate.endpoint()); + } + } -if (!process.env.SERVICES) { - throw new Error("Cannot find SERVICES env"); + constructor(readonly kind: K, readonly endpoint: T) {} } -const fqdns = new Set(process.env.SERVICES.split(",")); -const endpoint = restate.endpoint(); -REGISTRY.register(fqdns, endpoint); + +const wrapper = EndpointWrapper.fromEnvironment(); +REGISTRY.registerFromEnvironment(wrapper.endpoint); if (process.env.E2E_REQUEST_SIGNING) { - endpoint.withIdentityV1(...process.env.E2E_REQUEST_SIGNING.split(",")); + const signing = process.env.E2E_REQUEST_SIGNING; + wrapper.endpoint.withIdentityV1(...signing.split(",")); } -if (!process.env.AWS_LAMBDA_FUNCTION_NAME) { - endpoint.listen(); + +switch (wrapper.kind) { + case "node": { + wrapper.endpoint.listen(); + break; + } + case "fetch": { + http1Server(wrapper.endpoint.handler()); + break; + } + case "lambda": { + // do nothing, handler is exported + break; + } + default: + throw new Error("Unknown endpoint type"); } -export const handler = endpoint.lambdaHandler(); +// export for lambda. it will be only set for a lambda deployment +export const handler = + wrapper.kind == "lambda" ? wrapper.endpoint.handler() : undefined; diff --git a/services/node-services/src/interpreter/test_containers.ts b/services/node-services/src/interpreter/test_containers.ts index 79c1dd61..f31f5dbd 100644 --- a/services/node-services/src/interpreter/test_containers.ts +++ b/services/node-services/src/interpreter/test_containers.ts @@ -10,11 +10,30 @@ import { GenericContainer, Network, - PullPolicy, StartedNetwork, StartedTestContainer, } from "testcontainers"; +export interface EnvironmentSpec { + restate: { + image: string; + env: Record; + pull?: boolean; + }; + + interpreters: { + image: string; + env: Record; + pull?: boolean; + }; + + service: { + image: string; + env: Record; + pull?: boolean; + }; +} + export interface TestEnvironment { ingressUrl: string; adminUrl: string; @@ -31,17 +50,26 @@ export interface Containers { servicesContainer: StartedTestContainer; } -export async function setupContainers(): Promise { +export async function setupContainers( + env: EnvironmentSpec +): Promise { + console.log(env); + const network = await new Network().start(); - const restate = new GenericContainer("ghcr.io/restatedev/restate:main") + const restate = new GenericContainer(env.restate.image) .withExposedPorts(8080, 9070) .withNetwork(network) .withNetworkAliases("restate") - .withPullPolicy(PullPolicy.alwaysPull()) + .withPullPolicy({ + shouldPull() { + return env.restate.pull ?? true; + }, + }) .withEnvironment({ RESTATE_LOG_FILTER: "restate=warn", RESTATE_LOG_FORMAT: "json", + ...(env.restate?.env ?? {}), }) .withUlimits({ nproc: { soft: 65535, hard: 65535 }, @@ -49,64 +77,64 @@ export async function setupContainers(): Promise { }) .start(); - const zero = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main") - .withNetwork(network) - .withNetworkAliases("interpreter_zero") - .withPullPolicy(PullPolicy.alwaysPull()) - .withEnvironment({ - PORT: "9000", - RESTATE_LOGGING: "ERROR", - NODE_ENV: "production", - SERVICES: "ObjectInterpreterL0", - }) - .start(); - - const one = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main") - .withNetwork(network) - .withNetworkAliases("interpreter_one") - .withPullPolicy(PullPolicy.alwaysPull()) - - .withExposedPorts(9001) - .withEnvironment({ - PORT: "9001", + const names = ["interpreter_zero", "interpreter_one", "interpreter_two"]; + const interpreters = []; + for (let i = 0; i < 3; i++) { + const port = 9000 + i; + const auxEnv = { + PORT: `${port}`, RESTATE_LOGGING: "ERROR", NODE_ENV: "production", - SERVICES: "ObjectInterpreterL1", - }) - .start(); + NODE_OPTIONS: "--max-old-space-size=4096", + SERVICES: `ObjectInterpreterL${i}`, + ...(env.interpreters?.env ?? {}), + }; + const interpreter = new GenericContainer(env.interpreters.image) + .withNetwork(network) + .withNetworkAliases(names[i]) + .withExposedPorts(port) + .withPullPolicy({ + shouldPull() { + return env.interpreters.pull ?? true; + }, + }) + .withEnvironment(auxEnv) + .withUlimits({ + nproc: { soft: 65535, hard: 65535 }, + nofile: { soft: 65535, hard: 65535 }, + }) + .start(); - const two = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main") - .withNetwork(network) - .withNetworkAliases("interpreter_two") - .withPullPolicy(PullPolicy.alwaysPull()) - .withExposedPorts(9002) - .withEnvironment({ - PORT: "9002", - RESTATE_LOGGING: "ERROR", - NODE_ENV: "production", - SERVICES: "ObjectInterpreterL2", - }) - .start(); + interpreters.push(interpreter); + } - const services = new GenericContainer( - "ghcr.io/restatedev/e2e-node-services:main" - ) + const services = new GenericContainer(env.service.image) .withNetwork(network) .withNetworkAliases("services") - .withPullPolicy(PullPolicy.alwaysPull()) + .withPullPolicy({ + shouldPull() { + return env.service.pull ?? true; + }, + }) .withExposedPorts(9003) .withEnvironment({ PORT: "9003", RESTATE_LOGGING: "ERROR", NODE_ENV: "production", + NODE_OPTIONS: "--max-old-space-size=4096", SERVICES: "ServiceInterpreterHelper", + ...(env.service?.env ?? {}), + }) + .withUlimits({ + nproc: { soft: 65535, hard: 65535 }, + nofile: { soft: 65535, hard: 65535 }, }) .start(); const restateContainer = await restate; - const interpreterZeroContainer = await zero; - const interpreterOneContainer = await one; - const interpreterTwoContainer = await two; + const interpreterZeroContainer = await interpreters[0]; + const interpreterOneContainer = await interpreters[1]; + const interpreterTwoContainer = await interpreters[2]; const servicesContainer = await services; const ingressUrl = `http://${restateContainer.getHost()}:${restateContainer.getMappedPort( diff --git a/services/node-services/src/interpreter/test_driver.ts b/services/node-services/src/interpreter/test_driver.ts index 36526c9f..4ed4b64b 100644 --- a/services/node-services/src/interpreter/test_driver.ts +++ b/services/node-services/src/interpreter/test_driver.ts @@ -13,7 +13,12 @@ import { interpreterObjectForLayer, } from "./interpreter"; import { Random } from "./random"; -import { setupContainers, tearDown, TestEnvironment } from "./test_containers"; +import { + EnvironmentSpec, + setupContainers, + tearDown, + TestEnvironment, +} from "./test_containers"; import { ProgramGenerator } from "./test_generator"; import * as restate from "@restatedev/restate-sdk-clients"; @@ -35,6 +40,7 @@ export interface TestConfiguration { readonly register?: TestConfigurationDeployments; // auto register the following endpoints readonly bootstrap?: boolean; readonly crashInterval?: number; + readonly bootstrapEnv?: EnvironmentSpec; } export enum TestStatus { @@ -136,7 +142,11 @@ export class Test { console.log(`Ingress is ready.`); } - async registerEndpoints(adminUrl?: string, deployments?: string[]) { + async registerEndpoints( + useHttp11?: boolean, + adminUrl?: string, + deployments?: string[] + ) { if (!adminUrl) { throw new Error("Missing adminUrl"); } @@ -156,11 +166,13 @@ export class Test { await sleep(2000); } console.log("Admin is ready."); + for (const uri of deployments) { const res = await fetch(`${adminUrl}/deployments`, { method: "POST", body: JSON.stringify({ uri, + use_http_11: useHttp11 ?? false ? true : false, }), headers: { "Content-Type": "application/json", @@ -191,8 +203,25 @@ export class Test { console.log(this.conf); this.status = TestStatus.RUNNING; + const useHttp11 = process.env.E2E_USE_FETCH?.toLocaleLowerCase() == "true"; if (this.conf.bootstrap) { - this.containers = await setupContainers(); + const env: EnvironmentSpec = this.conf.bootstrapEnv ?? { + restate: { + image: "ghcr.io/restatedev/restate:main", + env: {}, + }, + + interpreters: { + image: "ghcr.io/restatedev/e2e-node-services:main", + env: {}, + }, + + service: { + image: "ghcr.io/restatedev/e2e-node-services:main", + env: {}, + }, + }; + this.containers = await setupContainers(env); console.log(this.containers); } const ingressUrl = this.containers?.ingressUrl ?? this.conf.ingress; @@ -203,7 +232,10 @@ export class Test { let ingress = restate.connect({ url: ingressUrl }); if (deployments) { - await this.registerEndpoints(adminUrl, deployments); + console.log(useHttp11); + console.log(adminUrl); + console.log(deployments); + await this.registerEndpoints(useHttp11, adminUrl, deployments); } await this.ingressReady(ingressUrl); diff --git a/services/node-services/src/services.ts b/services/node-services/src/services.ts index 019e42bc..86f5994a 100644 --- a/services/node-services/src/services.ts +++ b/services/node-services/src/services.ts @@ -8,7 +8,6 @@ // https://github.com/restatedev/e2e/blob/main/LICENSE import { - RestateEndpoint, ServiceDefinition, VirtualObjectDefinition, WorkflowDefinition, @@ -16,7 +15,7 @@ import { export type IComponent = { fqdn: string; - binder: (endpoint: RestateEndpoint) => void; + binder: (endpoint: { bind: (what: unknown) => void }) => void; }; export class ComponentRegistry { @@ -47,7 +46,12 @@ export class ComponentRegistry { }); } - register(fqdns: Set, e: RestateEndpoint) { + /* eslint-disable-next-line @typescript-eslint/no-explicit-any */ + registerFromEnvironment(e: { bind: (what: any) => any }) { + if (!process.env.SERVICES) { + throw new Error("Cannot find SERVICES env"); + } + const fqdns = new Set(process.env.SERVICES.split(",")); fqdns.forEach((fqdn) => { const c = this.components.get(fqdn); if (!c) {