Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
fix(Client): fix drizzle queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Hazmi35 authored May 6, 2024
1 parent 8cf463e commit 99b2d38
Showing 1 changed file with 38 additions and 52 deletions.
90 changes: 38 additions & 52 deletions packages/core/src/Structures/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/* eslint-disable stylistic/max-len */

import { Buffer } from "node:buffer";
import { randomUUID } from "node:crypto";
import EventEmitter from "node:events";
import process from "node:process";
import { clearTimeout, setTimeout } from "node:timers";
Expand Down Expand Up @@ -53,8 +52,7 @@ export class Client extends EventEmitter {

options.token ??= process.env.DISCORD_TOKEN;
this.clientId = Buffer.from(options.token!.split(".")[0], "base64").toString();

this.store = this.setupPostgresProxy();
this.store = drizzle(async (sql, params, method) => this.query(sql, params, method), { schema });
}

public connect(): void {
Expand All @@ -65,55 +63,6 @@ export class Client extends EventEmitter {
this.rest.setToken(this.options.token!);
}

public setupPostgresProxy(clientId?: string): PgRemoteDatabase<typeof schema> {
return drizzle(async (sql, params, method) => {
try {
const queueName = `bot:query:${randomUUID()}`;
await this.amqp.addSetup(async (channel: Channel) => channel.assertQueue(queueName, { exclusive: true, durable: false, autoDelete: true }));

const query = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, clientId ?? this.clientId);
const correlationId = randomUUID();

const promise = await new Promise<QueryResult>(resolve => {
let tag = "";
const timeout = setTimeout(async () => {
// Cancel to delete the queue
await this.amqp.cancel(tag);

resolve({
route: "unknown",
rows: [],
message: `Query timeout, ${(this.options.queryTimeout ?? 15_000) / 1_000} seconds exceed`
});
}, this.options.queryTimeout ?? 15_000);

void this.amqp.consume(queueName, async message => {
if (message) {
clearTimeout(timeout);
const payload = JSON.parse(message.content.toString()) as QueryResult;
resolve(payload);
this.amqp.ack(message);

// Cancel to delete the queue
await this.amqp.cancel(tag);
}
}).then(consumer => {
tag = consumer.consumerTag;
return consumer;
});
});

await this.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from(
JSON.stringify({ sql, params, method })
), { correlationId, replyTo: queueName });

return { rows: promise.rows };
} catch {
return { rows: [] };
}
}, { schema });
}

public async setupAmqp(channel: Channel): Promise<void> {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

Expand Down Expand Up @@ -571,4 +520,41 @@ export class Client extends EventEmitter {

return success ? Result.ok<T>({ success } as T) : Result.err(new Error("Failed to publish message"));
}

private async query(sql: string, params: any[], method: "all" | "execute"): Promise<any> {
const { queue: replyTo } = await this.amqp.assertQueue("", { exclusive: true, durable: false, autoDelete: true });
const query = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, this.clientId);
let tag = "";

const result = new Promise((resolve, reject) => {
const timeout = setTimeout(async () => {
// Cancel to delete the queue
await this.amqp.cancel(tag);

resolve({
route: "unknown",
rows: [],
message: `Query timeout, ${(this.options.queryTimeout ?? 15_000) / 1_000} seconds exceed`
});
}, this.options.queryTimeout ?? 15_000);

this.amqp.consume(replyTo, async message => {
if (message) {
clearTimeout(timeout);
const payload = JSON.parse(message.content.toString()) as QueryResult;
resolve(payload);
this.amqp.ack(message);

// Cancel to delete the queue
await this.amqp.cancel(tag);
}
}).then(x => tag = x.consumerTag).catch(reject);
});

await this.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from(
JSON.stringify({ sql, params, method })
), { replyTo });

return result;
}
}

0 comments on commit 99b2d38

Please sign in to comment.