Skip to content

Commit

Permalink
Added ability to search by recent messages
Browse files Browse the repository at this point in the history
Ref: #30
  • Loading branch information
samuelpapineau committed Nov 24, 2023
1 parent d9eb748 commit b023f2c
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export const DEFAULT_PORT = 3000
export const DEFAULT_HOSTNAME = "0.0.0.0"
export const DEFAULT_VERBOSE = false
export const DEFAULT_SQLITE_FILENAME = "db.sqlite"
export const DEFAULT_RECENT_MESSAGES_LIMIT = 50

// parse command line options
const opts = new Command()
Expand All @@ -19,6 +20,7 @@ const opts = new Command()
.addOption(new Option("--hostname <string>", "Server listen on HTTP hostname").default(DEFAULT_HOSTNAME).env("HOSTNAME"))
.addOption(new Option("--sqlite-filename <string>", "SQLite database filename").default(DEFAULT_SQLITE_FILENAME).env("SQLITE_FILENAME"))
.addOption(new Option("--verbose <boolean>", "Enable verbose logging").default(DEFAULT_VERBOSE).env("VERBOSE"))
.addOption(new Option("--recent-messages-limit <int>", "Limit recent messages").default(DEFAULT_RECENT_MESSAGES_LIMIT).env("RECENT_MESSAGES_LIMIT"))
.version(pkg.version)
.parse(process.argv).opts();

Expand All @@ -28,6 +30,7 @@ export const PORT = Number(opts.port);
export const HOSTNAME: string = opts.hostname
export const SQLITE_FILENAME: string = opts.sqliteFilename;
export const VERBOSE: boolean = opts.verbose === "true" ? true : false;
export const RECENT_MESSAGES_LIMIT: number = Number(opts.recentMessagesLimit);

// validate required options
if (!PUBLIC_KEY) throw new Error("PUBLIC_KEY is required");
Expand Down
18 changes: 18 additions & 0 deletions src/fetch/GET.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import openapi from "./openapi.js";
import swaggerHtml from "../../swagger/index.html"
import swaggerFavicon from "../../swagger/favicon.png"
import { toFile, toJSON, toText } from "./cors.js";
import { recentMessagesEndpoint } from "../recentMessages.js";
import { DEFAULT_RECENT_MESSAGES_LIMIT } from "../config.js";

export default async function (req: Request, server: Server) {
const { pathname, searchParams} = new URL(req.url);
Expand All @@ -17,7 +19,21 @@ export default async function (req: Request, server: Server) {
const key = req.headers.get("sec-websocket-key")
const chain = searchParams.get("chain")
const moduleHash = searchParams.get("moduleHash");
const distinct = searchParams.get("distinct");
const success = server.upgrade(req, {data: {key, chain, moduleHash}});
let limit = Number(searchParams.get("limit"));
let sort = searchParams.get("sort");

//error handling

if (limit === null || limit === 0) limit = DEFAULT_RECENT_MESSAGES_LIMIT;
if (isNaN(Number(limit))) return toText("limit must be a number", 400 );
if (sort === null) sort = "desc";
console.log(distinct)
if (distinct !== "true" && distinct !== null) return toText("distinct must be set to true if declared", 400 );
if (distinct === "true" && chain) return toText("chain cannot be set if distinct is set to true", 400 );
if (sort !== "asc" && sort !== "desc") return toText("sort must be asc or desc", 400 );

if (success) {
logger.info('upgrade', {key, chain, moduleHash});
return;
Expand All @@ -32,5 +48,7 @@ export default async function (req: Request, server: Server) {
if ( pathname === "/traceId") return toJSON(sqlite.selectAll(db, "traceId"));
if ( pathname === "/chain") return toJSON(sqlite.selectAll(db, "chain"));
if ( pathname === "/openapi") return toJSON(openapi);
if ( pathname === "/recentMessages") return recentMessagesEndpoint(db, chain, moduleHash, limit, distinct, sort);

return toText("Not found", 400 );
}
4 changes: 4 additions & 0 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { verify } from "../verify.js";
import { PUBLIC_KEY } from "../config.js";
import { Server } from "bun";
import { toText } from "./cors.js";
import { recentMessages } from "../recentMessages.js";

export default async function (req: Request, server: Server) {
// get headers and body from POST request
Expand Down Expand Up @@ -84,6 +85,9 @@ export default async function (req: Request, server: Server) {
sqlite.replace(db, "moduleHash", moduleHash, timestamp);
sqlite.replace(db, "moduleHashByChain", `${chain}:${moduleHash}`, timestamp);
sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp);
//Set timestamp as key to filter recent messages

recentMessages( db, traceId, JSON.stringify(json), chain );

return toText("OK");
}
62 changes: 62 additions & 0 deletions src/recentMessages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import * as sqlite from "../src/sqlite.js";
import { RECENT_MESSAGES_LIMIT } from "./config.js";
import { toJSON, toText } from "./http.js";

export function recentMessages(db: any, traceId: string, timestamp: string, chain?: string) {
const dbLength = sqlite.count(db, "recentMessages");

if (dbLength >= RECENT_MESSAGES_LIMIT) {
let oldest = sqlite.selectAll(db, "recentMessages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0];
console.log("oldest", oldest)

//update recentMessages
sqlite.replaceRecent(db, "recentMessages", String(Date.now()), `${traceId}`, timestamp);
sqlite.deleteRow(db, "recentMessages", oldest.key);

//update recentMessagesByChain
if (chain) {
oldest = sqlite.selectAll(db, "recentMessagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0];
console.log(oldest)
sqlite.replaceRecent(db, "recentMessagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp );
sqlite.deleteRow(db, "recentMessagesByChain", `${oldest.key}`);
}
return;
}
//add messages if tables not full
sqlite.replaceRecent(db, "recentMessages", String(Date.now()), `${traceId}`, timestamp);

if (chain) sqlite.replaceRecent(db, "recentMessagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp );
return;
}


export function recentMessagesEndpoint(db: any, chain?: string, moduleHash?: string, limit?: number, distinct?: string, sortBy?: string) {

let messages = sqlite.selectAllRecent(db, "recentMessages", "*", sortBy, limit);

if (distinct) messages = fetchDistinct(distinct, messages, db, chain, sortBy, limit);
if (chain) messages = sqlite.selectAllRecent(db, "recentMessagesByChain", "*", sortBy, limit).filter((message: any) => message.value.includes(chain));
if (moduleHash) messages = messages.filter((message: any) => message.value.includes(moduleHash));

return toJSON(messages);
}

function fetchDistinct(distinct?: string, messages?: any, db?: any, chain?: string, sortBy?: string, limit?: number) {
let chainList = sqlite.selectAll(db, "chain");
if (distinct === "true") {

let distinctChain = [];
for (let i = 0; i < chainList.length; i++) {
let chainName = chainList[i].key;

messages = sqlite.selectAllRecent(db, "recentMessagesByChain", "*", sortBy, limit)
messages = messages.filter((message: any) => message.value.includes(chainName));

distinctChain.push(messages[0]);
chainList.slice(i, 1);
}
let result = distinctChain.filter((notNull) => notNull !== undefined)
return result;
}
return;
}
26 changes: 23 additions & 3 deletions src/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export function createDb(filename: string) {
create(db, "moduleHashByChain");
create(db, "traceId");
create(db, "chain");
createInc(db, "connection");
createTime(db, "connection");
createRecent(db, "recentMessages")
createRecent(db, "recentMessagesByChain")
return db;
}

Expand All @@ -28,23 +30,37 @@ export function create(db: Database, table: string) {
return db.run(`CREATE TABLE IF NOT EXISTS ${table} (key TEXT PRIMARY KEY, value TEXT)`);
}

export function createInc(db: Database, table: string) {
export function createTime(db: Database, table: string) {
if ( !table ) throw new Error("table is required");
return db.run(`CREATE TABLE IF NOT EXISTS ${table} (key TEXT PRIMARY KEY, value TEXT, timestamp INTEGER)`);
}

export function createRecent(db: Database, table: string) {
if ( !table ) throw new Error("table is required");
return db.run(`CREATE TABLE IF NOT EXISTS ${table} (key TEXT PRIMARY KEY, value TEXT, payload TEXT)`);
}

export function replace(db: Database, table: string, key: string, value: string|number) {
return db.prepare(`REPLACE INTO ${table} (key, value) VALUES (?, ?)`).run(key, value);
}

export function replaceInc(db: Database, table: string, key: string, value: string|number, timestamp: number) {
export function replaceTime(db: Database, table: string, key: string, value: string|number, timestamp: string|number) {
return db.prepare(`REPLACE INTO ${table} (key, value, timestamp) VALUES (?, ?, ?)`).run(key, value, timestamp);
}

export function replaceRecent(db: Database, table: string, key: string, value: string|number, payload: string|number) {
return db.prepare(`REPLACE INTO ${table} (key, value, payload) VALUES (?, ?, ?)`).run(key, value, payload);
}


export function selectAll(db: Database, table: string) {
return db.query(`SELECT * FROM ${table}`).all() as KV[];
}

export function selectAllRecent(db: Database, table: string, distinct: string, order: string, limit: number) {
return db.query(`SELECT ${distinct} FROM ${table} ORDER BY key ${order} LIMIT ${limit}`).all() as KV[];
}

export function count(db: Database, table: string) {
const result = db.query(`SELECT COUNT(key) FROM ${table}`).all();
return result[0]["COUNT(key)"];
Expand All @@ -65,6 +81,10 @@ export function increment(db: Database, table: string, key: string, value: numbe
return db.prepare(`REPLACE INTO ${table} (key, value, timestamp) VALUES (?, ?, ?)`).run(key, value, timestamp);
}

export function deleteRow(db: Database, table: string, key: string) {
return db.prepare(`DELETE FROM ${table} WHERE key = ?`).run(key);
}



// TO-DO: UPDATE (increment/decrement)
Expand Down
4 changes: 2 additions & 2 deletions src/websocket/open.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ServerWebSocket } from "bun";
import { logger } from "../logger.js";
import { ServerWebSocketData } from "../../index.js";
import * as prometheus from "../prometheus.js";
import { increment, select, replaceInc, replace } from "../sqlite.js";
import { increment, select, replaceTime, replace } from "../sqlite.js";
import { db } from "../../index.js";

function checkLimit(limitData: any, ws: ServerWebSocket<ServerWebSocketData>) {
Expand All @@ -21,7 +21,7 @@ function checkLimit(limitData: any, ws: ServerWebSocket<ServerWebSocketData>) {
return increment(db, "connection", ws.remoteAddress, 1, limitData[0].timestamp);
}
// If timestamp is more than 5 minutes ago, reset timestamp & counter to 1
return replaceInc(db, "connection", ws.remoteAddress, 1, Date.now());
return replaceTime(db, "connection", ws.remoteAddress, 1, Date.now());
}

export default function (ws: ServerWebSocket<ServerWebSocketData>) {
Expand Down

0 comments on commit b023f2c

Please sign in to comment.