Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
feat: rework querying using pg proxy (#402)
Browse files Browse the repository at this point in the history
* feat: rework querying using pg proxy

* chore: remove unused deps

* fix: commit lock files

* chore: regenerate lockfiles
  • Loading branch information
KagChi authored Apr 24, 2024
1 parent edfb3e2 commit 85d52ab
Show file tree
Hide file tree
Showing 5 changed files with 4,374 additions and 3,412 deletions.
5 changes: 1 addition & 4 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
"author": "KagChi",
"license": "GPL-3.0",
"devDependencies": {
"@types/amqplib": "^0.10.5",
"@types/pg": "^8.11.5"
"@types/amqplib": "^0.10.5"
},
"dependencies": {
"@cordis/bitfield": "^1.2.0",
Expand All @@ -47,8 +46,6 @@
"amqp-connection-manager": "^4.1.14",
"discord-api-types": "^0.37.81",
"drizzle-orm": "^0.30.9",
"pg": "^8.11.5",
"postgres": "^3.4.4",
"tslib": "^2.6.2"
}
}
66 changes: 57 additions & 9 deletions packages/core/src/Structures/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
/* eslint-disable stylistic/max-len */

import { Buffer } from "node:buffer";
import { randomUUID } from "node:crypto";
import EventEmitter from "node:events";
import process from "node:process";
import { clearTimeout, setTimeout } from "node:timers";
import { URLSearchParams } from "node:url";
import { REST } from "@discordjs/rest";
import { RabbitMQ, GatewayExchangeRoutes } from "@nezuchan/constants";
Expand All @@ -16,11 +18,10 @@ import type { Channel } from "amqplib";
import type { APIChannel, APIGuild, APIGuildMember, APIMessage, APIUser, RESTPostAPIChannelMessageJSONBody } from "discord-api-types/v10";
import { ChannelType, Routes } from "discord-api-types/v10";
import { and, eq, notInArray, sql } from "drizzle-orm";
import type { NodePgDatabase } from "drizzle-orm/node-postgres";
import { drizzle } from "drizzle-orm/node-postgres";
import pg from "pg";
import type { PgRemoteDatabase } from "drizzle-orm/pg-proxy";
import { drizzle } from "drizzle-orm/pg-proxy";
import { Events } from "../Enums/Events.js";
import type { ClientOptions } from "../Typings/index.js";
import type { ClientOptions, QueryResult } from "../Typings/index.js";
import type { BaseChannel } from "./Channels/BaseChannel.js";
import { TextChannel } from "./Channels/TextChannel.js";
import { VoiceChannel } from "./Channels/VoiceChannel.js";
Expand All @@ -32,8 +33,7 @@ import { User } from "./User.js";
import { VoiceState } from "./VoiceState.js";

export class Client extends EventEmitter {
public store: NodePgDatabase<typeof schema>;
public storeBackend: pg.Pool;
public store: PgRemoteDatabase<typeof schema>;
public clientId: string;
public rest = new REST({
api: process.env.HTTP_PROXY ?? process.env.PROXY ?? process.env.NIRN_PROXY ?? "https://discord.com/api",
Expand All @@ -51,11 +51,10 @@ export class Client extends EventEmitter {
this.rest.options.api = options.rest;
}

this.storeBackend = new pg.Pool({ connectionString: options.databaseUrl, max: options.databaseConnectionLimit ?? 10 });
this.store = drizzle(this.storeBackend, { schema });

options.token ??= process.env.DISCORD_TOKEN;
this.clientId = Buffer.from(options.token!.split(".")[0], "base64").toString();

this.store = this.setupPostgresProxy();
}

public connect(): void {
Expand All @@ -66,6 +65,55 @@ export class Client extends EventEmitter {
this.rest.setToken(this.options.token!);
}

public setupPostgresProxy(clientId?: string): PgRemoteDatabase<typeof schema> {
return drizzle(async (sql, params, method) => {
try {
const queueName = `bot:query:${randomUUID()}`;
await this.amqp.addSetup(async (channel: Channel) => channel.assertQueue(queueName, { exclusive: true, durable: false, autoDelete: true }));

const query = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, clientId ?? this.clientId);
const correlationId = randomUUID();

const promise = await new Promise<QueryResult>(resolve => {
let tag = "";
const timeout = setTimeout(async () => {
// Cancel to delete the queue
await this.amqp.cancel(tag);

resolve({
route: "unknown",
rows: [],
message: `Query timeout, ${(this.options.queryTimeout ?? 15_000) / 1_000} seconds exceed`
});
}, this.options.queryTimeout ?? 15_000);

void this.amqp.consume(queueName, async message => {
if (message) {
clearTimeout(timeout);
const payload = JSON.parse(message.content.toString()) as QueryResult;
resolve(payload);
this.amqp.ack(message);

// Cancel to delete the queue
await this.amqp.cancel(tag);
}
}).then(consumer => {
tag = consumer.consumerTag;
return consumer;
});
});

await this.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from(
JSON.stringify({ sql, params, method })
), { correlationId, replyTo: queueName });

return { rows: promise.rows };
} catch {
return { rows: [] };
}
}, { schema });
}

public async setupAmqp(channel: Channel): Promise<void> {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

Expand Down
9 changes: 7 additions & 2 deletions packages/core/src/Typings/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ export interface ClientOptions {
amqpUrl: string;
shardIds?: number[] | { start: number; end: number; };
shardCount: number;
databaseConnectionLimit?: number;
rest?: string;
databaseUrl: string;
queryTimeout?: number;
instanceName: string;
}

export interface QueryResult {
route: string;
rows: unknown[];
message: string;
}
Loading

0 comments on commit 85d52ab

Please sign in to comment.