Skip to content

Commit

Permalink
fix: add trpc adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
rotorsoft committed Dec 9, 2023
1 parent ae6c665 commit 160968d
Show file tree
Hide file tree
Showing 18 changed files with 1,143 additions and 648 deletions.
4 changes: 2 additions & 2 deletions libs/eventually-aws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
"build": "npx tsc --build"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.433.0",
"@aws-sdk/client-dynamodb": "^3.470.0",
"@rotorsoft/eventually": "workspace:^5",
"zod": "^3.22.4"
},
"devDependencies": {
"@rotorsoft/calculator-artifacts": "workspace:^1",
"@types/aws-lambda": "^8.10.125"
"@types/aws-lambda": "^8.10.130"
}
}
4 changes: 2 additions & 2 deletions libs/eventually-broker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
"@rotorsoft/eventually": "workspace:^5",
"@rotorsoft/eventually-openapi": "workspace:^0",
"@rotorsoft/eventually-pg": "workspace:^5",
"axios": "^1.5.1",
"axios": "^1.6.2",
"cron": "^2.4.4",
"cron-parser": "^4.9.0",
"cron-validator": "^1.3.1",
"express": "^4.18.2",
"express-handlebars": "^7.1.2",
"helmet": "^7.0.0",
"helmet": "^7.1.0",
"joi": "^17.11.0",
"pg": "^8.11.3",
"pg-listen": "^1.7.0"
Expand Down
4 changes: 2 additions & 2 deletions libs/eventually-express/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@rotorsoft/eventually-express",
"version": "5.7.11",
"description": "Eventually Framework Express Services",
"description": "Eventually ExpressJs Application Adapter",
"author": {
"name": "rotorsoft",
"email": "[email protected]"
Expand All @@ -27,6 +27,6 @@
"express": "^4.18.2"
},
"devDependencies": {
"@types/express": "^4.17.20"
"@types/express": "^4.17.21"
}
}
4 changes: 2 additions & 2 deletions libs/eventually-openapi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
},
"dependencies": {
"@rotorsoft/eventually": "workspace:^5",
"axios": "^1.5.1",
"axios": "^1.6.2",
"openapi3-ts": "^4.1.2",
"ts-deepmerge": "^6.2.0",
"validator": "^13.11.0",
"zod": "^3.22.4"
},
"devDependencies": {
"@types/validator": "^13.11.5"
"@types/validator": "^13.11.7"
}
}
2 changes: 1 addition & 1 deletion libs/eventually-pg/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@rotorsoft/eventually-pg",
"version": "5.5.19",
"description": "Eventually Framework Postgres Services",
"description": "Eventually Postgres Store Adapters",
"author": {
"name": "rotorsoft",
"email": "[email protected]"
Expand Down
4 changes: 2 additions & 2 deletions libs/eventually-pg/src/PostgresProjectorStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ export const PostgresProjectorStore = <S extends State>(
deleted = 0;
for (const { sql, vals } of deletes) {
log().green().data("sql:", sql, vals);
deleted += (await client.query(sql, vals)).rowCount;
deleted += (await client.query(sql, vals)).rowCount ?? 0;
}
for (const { sql, vals } of upserts) {
log().green().data("sql:", sql, vals);
upserted += (await client.query(sql, vals)).rowCount;
upserted += (await client.query(sql, vals)).rowCount ?? 0;
}
await client.query("COMMIT");
return { upserted, deleted, watermark };
Expand Down
2 changes: 1 addition & 1 deletion libs/eventually-pg/src/PostgresStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ export const PostgresStore = (table: string): Store => {
for (const row of result.rows)
callback(row as unknown as CommittedEvent<E>);

return result.rowCount;
return result.rowCount ?? 0;
},

commit: async <E extends Messages>(
Expand Down
2 changes: 1 addition & 1 deletion libs/eventually-pg/src/PostgresSubscriptionStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export const PostgresSubscriptionStore = (table: string): SubscriptionStore => {
) {
const sql = `UPDATE "${table}" SET watermark=$2, lease=null, expires=null WHERE "${table}".consumer=$1`;
const vals = [lease.consumer, watermark];
acked = (await client.query(sql, vals)).rowCount > 0;
acked = ((await client.query(sql, vals)).rowCount ?? 0) > 0;
log().silver().data(sql, vals, { acked });
}
await client.query("COMMIT");
Expand Down
5 changes: 5 additions & 0 deletions libs/eventually-trpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# [eventually-trpc](https://rotorsoft.github.io/eventually-monorepo/modules/eventually_trpc.html)

[![NPM Version](https://img.shields.io/npm/v/@rotorsoft/eventually-trpc.svg)](https://www.npmjs.com/package/@rotorsoft/eventually-trpc)

[Eventually](../../README.md) library implementing a [tRPC](https://trpc.io/) based application
28 changes: 28 additions & 0 deletions libs/eventually-trpc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"name": "@rotorsoft/eventually-trpc",
"version": "5.7.11",
"description": "Eventually tRPC Application Adapter",
"author": {
"name": "rotorsoft",
"email": "[email protected]"
},
"license": "MIT",
"repository": {
"type": "git",
"url": "git+https://github.com/rotorsoft/eventually-monorepo.git",
"directory": "libs/eventually-trpc"
},
"main": "dist/index",
"types": "dist/index",
"files": [
"dist"
],
"scripts": {
"build": "npx tsc --build"
},
"dependencies": {
"@rotorsoft/eventually": "workspace:^5",
"@rotorsoft/eventually-openapi": "workspace:^0",
"@trpc/server": "^10.44.1"
}
}
170 changes: 170 additions & 0 deletions libs/eventually-trpc/src/TrpcApp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import {
Builder,
broker,
log,
type AggregateFactory
// type CommandAdapterFactory,
// type EventHandlerFactory,
// type ProjectorFactory,
// CommandHandlerFactory
} from "@rotorsoft/eventually";
import { httpGetPath, httpPostPath } from "@rotorsoft/eventually-openapi";
import { ProcedureRouterRecord, initTRPC } from "@trpc/server";
import { createHTTPServer } from "@trpc/server/adapters/standalone";
import { Server } from "http";
import { config } from "./config";

const trpc = initTRPC.create();

/**
* tRPC app builder
*
* @remarks Exposes public interface as tRPC endpoints
*/
export class TrpcApp extends Builder {
private _procedures: ProcedureRouterRecord = {};
private _server?: Server;

constructor() {
super();
}

private _withStreams(): void {
// this._router.get("/all", queryHandler);
log()
.green()
.info(
"GET ",
"/all?[stream=...][&names=...][&after=-1][&limit=1][&before=...][&created_after=...][&created_before=...]"
);
// this._router.get("/_stats", statsHandler);
log().green().info("GET ", "/_stats");
// this._router.get("/_subscriptions", subscriptionsHandler);
log().green().info("GET ", "/_subscriptions");
}

private _withGets(factory: AggregateFactory): void {
const path = httpGetPath(factory.name);
// this._router.get(path, getHandler(factory));
log().green().info("GET ", path);
const streamPath = path.concat("/stream");
// this._router.get(streamPath, getStreamHandler(factory));
log().green().info("GET ", streamPath);
}

private _withPosts(): void {
this.artifacts.forEach(({ type, factory, inputs }) => {
const endpoints = inputs
.filter((input) => input.scope === "public")
.map((input) => input.name);
type === "aggregate" && this._withGets(factory as AggregateFactory);
if (type === "policy" || type === "process-manager") {
if (endpoints.length) {
const path = httpPostPath(factory.name, type);
//this._router.post(path, eventHandler(factory as EventHandlerFactory));
log().magenta().info("POST", path, endpoints);
}
} else if (type === "projector") {
//const projector_factory = factory as ProjectorFactory;
//const projector = projector_factory();
const path = httpPostPath(factory.name, type);
if (endpoints.length) {
// projectors expose a route to handle an array of events
//this._router.post(path, projectHandler(projector_factory));
log().magenta().info("POST", path, inputs);
}
//this._router.get(
// path,
// readHandler(projector_factory, projector.schemas.state)
//);
log().green().info("GET ", path);
} else
endpoints.forEach((name) => {
const path = httpPostPath(factory.name, type, name);
// if (type === "command-adapter")
// this._router.post(
// path,
// invokeHandler(factory as CommandAdapterFactory)
// );
// else
// this._router.post(
// path,
// commandHandler(
// factory as CommandHandlerFactory,
// name,
// type === "aggregate"
// )
// );
log().blue().info("POST", path);
});
});
}

//TODO: pass options
build(): void {
super.build();

// route artifacts
this._withPosts();
this.hasStreams && this._withStreams();

// add middleware

// use artifact routes
//this._app.use(this._router);

// ensure catch-all is last handler
//this._app.use(errorHandler);

// log sanitized config
const { service, version, env, logLevel } = config;
log().info("config", service, { env, logLevel, version });
}

/**
* Starts listening for requests
*
* WARNING!
* - Serverless environments provide their own listening framework
* - Use wrappers like serverless-http instead
*
* @param port to override port in config
*/
async listen(port?: number): Promise<void> {
port = port || config.port;
this._server = await new Promise((resolve, reject) => {
try {
const server = createHTTPServer({
router: trpc.router(this._procedures)
});
server.listen(port);
log()
.yellow()
.underlined()
.info(`tRPC server is listening on port ${port}`);

// sync pending subscriptions
void broker().drain();

resolve(server.server);
} catch (error) {
reject(error);
}
});
}

get name(): string {
return "TrcpApp";
}

async dispose(): Promise<void> {
await super.dispose();
if (this._server) {
await new Promise((resolve, reject) => {
this._server && this._server.once("close", resolve);
this._server && this._server.close(reject);
});
this._server = undefined;
}
}
}
53 changes: 53 additions & 0 deletions libs/eventually-trpc/src/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { extend, config as target } from "@rotorsoft/eventually";
import * as fs from "node:fs";
import { z } from "zod";

type Package = {
name: string;
version: string;
description: string;
author: {
name: string;
email: string;
};
license: string;
dependencies: Record<string, string>;
};

const getPackage = (): Package => {
const pkg = fs.readFileSync("package.json");
return JSON.parse(pkg.toString()) as Package;
};

/**
* Configuration zod schema
*/
const Schema = z.object({
service: z.string().min(1),
version: z.string().min(1),
description: z.string().min(1),
author: z.object({ name: z.string().min(1), email: z.string() }),
license: z.string().min(1),
dependencies: z.record(z.string()),
port: z.number().int().min(1000).max(65535)
});
export type Config = z.infer<typeof Schema>;

const { PORT } = process.env;
const pkg = getPackage();
const parts = pkg.name.split("/");
const service = parts.at(-1) || "";

export const config = extend(
{
service,
version: pkg.version,
description: pkg.description,
author: { name: pkg.author?.name, email: pkg.author?.email },
license: pkg.license,
dependencies: pkg.dependencies,
port: parseInt(PORT || "3000") || 3000
},
Schema,
target()
);
2 changes: 2 additions & 0 deletions libs/eventually-trpc/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/** @module eventually-trpc */
export * from "./TrpcApp";
14 changes: 14 additions & 0 deletions libs/eventually-trpc/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./dist",
"tsBuildInfoFile": "./dist/.tsbuildinfo",
"composite": true
},

"references": [
{ "path": "../eventually" },
{ "path": "../eventually-openapi" }
]
}
Loading

0 comments on commit 160968d

Please sign in to comment.