Skip to content

Commit

Permalink
[interpreter] Support Http1 based tests
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Jun 25, 2024
1 parent c69673b commit 3826843
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 62 deletions.
51 changes: 42 additions & 9 deletions services/node-services/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

/* eslint-disable @typescript-eslint/no-explicit-any */

import * as restate from "@restatedev/restate-sdk";
import { endpoint as fetchEndpoint } from "@restatedev/restate-sdk/fetch";
import { endpoint as lambdaEndpoint } from "@restatedev/restate-sdk/lambda";
import process from "node:process";

import "./awakeable_holder";
import "./counter";
Expand All @@ -26,19 +31,47 @@ import "./interpreter/entry_point";
import "./workflow";

import { REGISTRY } from "./services";
import { serve as http1Server } from "./h1server";

class EndpointWrapper<K extends "fetch" | "lambda" | "node", T> {
static fromEnvironment() {
if (process.env.E2E_USE_FETCH) {
return new EndpointWrapper("fetch", fetchEndpoint());
} else if (process.env.AWS_LAMBDA_FUNCTION_NAME) {
return new EndpointWrapper("lambda", lambdaEndpoint());
} else {
return new EndpointWrapper("node", restate.endpoint());
}
}

if (!process.env.SERVICES) {
throw new Error("Cannot find SERVICES env");
constructor(readonly kind: K, readonly endpoint: T) {}
}
const fqdns = new Set(process.env.SERVICES.split(","));
const endpoint = restate.endpoint();
REGISTRY.register(fqdns, endpoint);

const wrapper = EndpointWrapper.fromEnvironment();
REGISTRY.registerFromEnvironment(wrapper.endpoint);

if (process.env.E2E_REQUEST_SIGNING) {
endpoint.withIdentityV1(...process.env.E2E_REQUEST_SIGNING.split(","));
const signing = process.env.E2E_REQUEST_SIGNING;
wrapper.endpoint.withIdentityV1(...signing.split(","));
}
if (!process.env.AWS_LAMBDA_FUNCTION_NAME) {
endpoint.listen();

switch (wrapper.kind) {
case "node": {
wrapper.endpoint.listen();
break;
}
case "fetch": {
http1Server(wrapper.endpoint.handler());
break;
}
case "lambda": {
// do nothing, handler is exported
break;
}
default:
throw new Error("Unknown endpoint type");
}

export const handler = endpoint.lambdaHandler();
// export for lambda. it will be only set for a lambda deployment
export const handler =
wrapper.kind == "lambda" ? wrapper.endpoint.handler() : undefined;
120 changes: 74 additions & 46 deletions services/node-services/src/interpreter/test_containers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,30 @@
import {
GenericContainer,
Network,
PullPolicy,
StartedNetwork,
StartedTestContainer,
} from "testcontainers";

export interface EnvironmentSpec {
restate: {
image: string;
env: Record<string, string>;
pull?: boolean;
};

interpreters: {
image: string;
env: Record<string, string>;
pull?: boolean;
};

service: {
image: string;
env: Record<string, string>;
pull?: boolean;
};
}

export interface TestEnvironment {
ingressUrl: string;
adminUrl: string;
Expand All @@ -31,82 +50,91 @@ export interface Containers {
servicesContainer: StartedTestContainer;
}

export async function setupContainers(): Promise<TestEnvironment> {
export async function setupContainers(
env: EnvironmentSpec
): Promise<TestEnvironment> {
console.log(env);

const network = await new Network().start();

const restate = new GenericContainer("ghcr.io/restatedev/restate:main")
const restate = new GenericContainer(env.restate.image)
.withExposedPorts(8080, 9070)
.withNetwork(network)
.withNetworkAliases("restate")
.withPullPolicy(PullPolicy.alwaysPull())
.withPullPolicy({
shouldPull() {
return env.restate.pull ?? true;
},
})
.withEnvironment({
RESTATE_LOG_FILTER: "restate=warn",
RESTATE_LOG_FORMAT: "json",
...(env.restate?.env ?? {}),
})
.withUlimits({
nproc: { soft: 65535, hard: 65535 },
nofile: { soft: 65535, hard: 65535 },
})
.start();

const zero = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main")
.withNetwork(network)
.withNetworkAliases("interpreter_zero")
.withPullPolicy(PullPolicy.alwaysPull())
.withEnvironment({
PORT: "9000",
RESTATE_LOGGING: "ERROR",
NODE_ENV: "production",
SERVICES: "ObjectInterpreterL0",
})
.start();

const one = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main")
.withNetwork(network)
.withNetworkAliases("interpreter_one")
.withPullPolicy(PullPolicy.alwaysPull())

.withExposedPorts(9001)
.withEnvironment({
PORT: "9001",
const names = ["interpreter_zero", "interpreter_one", "interpreter_two"];
const interpreters = [];
for (let i = 0; i < 3; i++) {
const port = 9000 + i;
const auxEnv = {
PORT: `${port}`,
RESTATE_LOGGING: "ERROR",
NODE_ENV: "production",
SERVICES: "ObjectInterpreterL1",
})
.start();
NODE_OPTIONS: "--max-old-space-size=4096",
SERVICES: `ObjectInterpreterL${i}`,
...(env.interpreters?.env ?? {}),
};
const interpreter = new GenericContainer(env.interpreters.image)
.withNetwork(network)
.withNetworkAliases(names[i])
.withExposedPorts(port)
.withPullPolicy({
shouldPull() {
return env.interpreters.pull ?? true;
},
})
.withEnvironment(auxEnv)
.withUlimits({
nproc: { soft: 65535, hard: 65535 },
nofile: { soft: 65535, hard: 65535 },
})
.start();

const two = new GenericContainer("ghcr.io/restatedev/e2e-node-services:main")
.withNetwork(network)
.withNetworkAliases("interpreter_two")
.withPullPolicy(PullPolicy.alwaysPull())
.withExposedPorts(9002)
.withEnvironment({
PORT: "9002",
RESTATE_LOGGING: "ERROR",
NODE_ENV: "production",
SERVICES: "ObjectInterpreterL2",
})
.start();
interpreters.push(interpreter);
}

const services = new GenericContainer(
"ghcr.io/restatedev/e2e-node-services:main"
)
const services = new GenericContainer(env.service.image)
.withNetwork(network)
.withNetworkAliases("services")
.withPullPolicy(PullPolicy.alwaysPull())
.withPullPolicy({
shouldPull() {
return env.service.pull ?? true;
},
})
.withExposedPorts(9003)
.withEnvironment({
PORT: "9003",
RESTATE_LOGGING: "ERROR",
NODE_ENV: "production",
NODE_OPTIONS: "--max-old-space-size=4096",
SERVICES: "ServiceInterpreterHelper",
...(env.service?.env ?? {}),
})
.withUlimits({
nproc: { soft: 65535, hard: 65535 },
nofile: { soft: 65535, hard: 65535 },
})
.start();

const restateContainer = await restate;
const interpreterZeroContainer = await zero;
const interpreterOneContainer = await one;
const interpreterTwoContainer = await two;
const interpreterZeroContainer = await interpreters[0];
const interpreterOneContainer = await interpreters[1];
const interpreterTwoContainer = await interpreters[2];
const servicesContainer = await services;

const ingressUrl = `http://${restateContainer.getHost()}:${restateContainer.getMappedPort(
Expand Down
40 changes: 36 additions & 4 deletions services/node-services/src/interpreter/test_driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ import {
interpreterObjectForLayer,
} from "./interpreter";
import { Random } from "./random";
import { setupContainers, tearDown, TestEnvironment } from "./test_containers";
import {
EnvironmentSpec,
setupContainers,
tearDown,
TestEnvironment,
} from "./test_containers";
import { ProgramGenerator } from "./test_generator";

import * as restate from "@restatedev/restate-sdk-clients";
Expand All @@ -35,6 +40,7 @@ export interface TestConfiguration {
readonly register?: TestConfigurationDeployments; // auto register the following endpoints
readonly bootstrap?: boolean;
readonly crashInterval?: number;
readonly bootstrapEnv?: EnvironmentSpec;
}

export enum TestStatus {
Expand Down Expand Up @@ -136,7 +142,11 @@ export class Test {
console.log(`Ingress is ready.`);
}

async registerEndpoints(adminUrl?: string, deployments?: string[]) {
async registerEndpoints(
useHttp11?: boolean,
adminUrl?: string,
deployments?: string[]
) {
if (!adminUrl) {
throw new Error("Missing adminUrl");
}
Expand All @@ -156,11 +166,13 @@ export class Test {
await sleep(2000);
}
console.log("Admin is ready.");

for (const uri of deployments) {
const res = await fetch(`${adminUrl}/deployments`, {
method: "POST",
body: JSON.stringify({
uri,
use_http_11: useHttp11 ?? false ? true : false,
}),
headers: {
"Content-Type": "application/json",
Expand Down Expand Up @@ -191,8 +203,25 @@ export class Test {
console.log(this.conf);
this.status = TestStatus.RUNNING;

const useHttp11 = process.env.E2E_USE_FETCH?.toLocaleLowerCase() == "true";
if (this.conf.bootstrap) {
this.containers = await setupContainers();
const env: EnvironmentSpec = this.conf.bootstrapEnv ?? {
restate: {
image: "ghcr.io/restatedev/restate:main",
env: {},
},

interpreters: {
image: "ghcr.io/restatedev/e2e-node-services:main",
env: {},
},

service: {
image: "ghcr.io/restatedev/e2e-node-services:main",
env: {},
},
};
this.containers = await setupContainers(env);
console.log(this.containers);
}
const ingressUrl = this.containers?.ingressUrl ?? this.conf.ingress;
Expand All @@ -203,7 +232,10 @@ export class Test {
let ingress = restate.connect({ url: ingressUrl });

if (deployments) {
await this.registerEndpoints(adminUrl, deployments);
console.log(useHttp11);
console.log(adminUrl);
console.log(deployments);
await this.registerEndpoints(useHttp11, adminUrl, deployments);
}
await this.ingressReady(ingressUrl);

Expand Down
10 changes: 7 additions & 3 deletions services/node-services/src/services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
// https://github.com/restatedev/e2e/blob/main/LICENSE

import {
RestateEndpoint,
ServiceDefinition,
VirtualObjectDefinition,
WorkflowDefinition,
} from "@restatedev/restate-sdk";

export type IComponent = {
fqdn: string;
binder: (endpoint: RestateEndpoint) => void;
binder: (endpoint: { bind: (what: unknown) => void }) => void;
};

export class ComponentRegistry {
Expand Down Expand Up @@ -47,7 +46,12 @@ export class ComponentRegistry {
});
}

register(fqdns: Set<string>, e: RestateEndpoint) {
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
registerFromEnvironment(e: { bind: (what: any) => any }) {
if (!process.env.SERVICES) {
throw new Error("Cannot find SERVICES env");
}
const fqdns = new Set(process.env.SERVICES.split(","));
fqdns.forEach((fqdn) => {
const c = this.components.get(fqdn);
if (!c) {
Expand Down

0 comments on commit 3826843

Please sign in to comment.