Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
feat: create independent channel (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hazmi35 authored May 19, 2024
1 parent c64b01c commit 4ec2fb9
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions packages/core/src/Structures/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ export class Client extends EventEmitter {

options.token ??= process.env.DISCORD_TOKEN;
this.clientId = Buffer.from(options.token!.split(".")[0], "base64").toString();
this.store = drizzle(async (sql, params, method) => this.query(sql, params, method), { schema });
this.store = drizzle(async (sql, params, method) => {
const channel = await this.setupQuery();
return this.query(channel, sql, params, method);
}, { schema });
}

public connect(): void {
Expand All @@ -79,6 +82,22 @@ export class Client extends EventEmitter {
});
}

public async setupQuery(): Promise<Channel> {
let createdChannel: Channel;
return new Promise((resolve, reject) => {
this.amqp.addSetup(async (channel: Channel) => {
createdChannel = channel;
return Promise.all(
[
channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false })
]
).catch(reject);
})
.then(() => resolve(createdChannel))
.catch(reject);
});
}

public async resolveMember({ force = false, fetch = true, cache, id, guildId }: { force?: boolean | undefined; fetch?: boolean; cache?: boolean | undefined; id: string; guildId: string; }): Promise<GuildMember | undefined> {
if (force) {
const result = await Result.fromAsync<APIGuildMember>(async () => this.rest.get(Routes.guildMember(guildId, id)) as unknown as Promise<APIGuildMember>);
Expand Down Expand Up @@ -521,15 +540,15 @@ export class Client extends EventEmitter {
return success ? Result.ok<T>({ success } as T) : Result.err(new Error("Failed to publish message"));
}

private async query(sql: string, params: any[], method: "all" | "execute"): Promise<any> {
const { queue: replyTo } = await this.amqp.assertQueue("", { exclusive: true, durable: false, autoDelete: true });
private async query(channel: Channel, sql: string, params: any[], method: "all" | "execute"): Promise<any> {
const { queue: replyTo } = await channel.assertQueue("", { exclusive: true, durable: false, autoDelete: true });
const query = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, this.clientId);
let tag = "";

const result = new Promise((resolve, reject) => {
const timeout = setTimeout(async () => {
// Cancel to delete the queue
await this.amqp.cancel(tag);
await channel.cancel(tag);

resolve({
route: "unknown",
Expand All @@ -538,20 +557,20 @@ export class Client extends EventEmitter {
});
}, this.options.queryTimeout ?? 15_000);

this.amqp.consume(replyTo, async message => {
channel.consume(replyTo, async message => {
if (message) {
clearTimeout(timeout);
const payload = JSON.parse(message.content.toString()) as QueryResult;
resolve(payload);
this.amqp.ack(message);
channel.ack(message);

// Cancel to delete the queue
await this.amqp.cancel(tag);
await channel.cancel(tag);
}
}).then(x => tag = x.consumerTag).catch(reject);
});

await this.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from(
channel.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from(
JSON.stringify({ sql, params, method })
), { replyTo });

Expand Down

0 comments on commit 4ec2fb9

Please sign in to comment.