diff --git a/packages/core/src/Structures/Client.ts b/packages/core/src/Structures/Client.ts index 210dffff..c57d4d1a 100644 --- a/packages/core/src/Structures/Client.ts +++ b/packages/core/src/Structures/Client.ts @@ -52,7 +52,10 @@ export class Client extends EventEmitter { options.token ??= process.env.DISCORD_TOKEN; this.clientId = Buffer.from(options.token!.split(".")[0], "base64").toString(); - this.store = drizzle(async (sql, params, method) => this.query(sql, params, method), { schema }); + this.store = drizzle(async (sql, params, method) => { + const channel = await this.setupQuery(); + return this.query(channel, sql, params, method); + }, { schema }); } public connect(): void { @@ -79,6 +82,22 @@ export class Client extends EventEmitter { }); } + public async setupQuery(): Promise { + let createdChannel: Channel; + return new Promise((resolve, reject) => { + this.amqp.addSetup(async (channel: Channel) => { + createdChannel = channel; + return Promise.all( + [ + channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }) + ] + ).catch(reject); + }) + .then(() => resolve(createdChannel)) + .catch(reject); + }); + } + public async resolveMember({ force = false, fetch = true, cache, id, guildId }: { force?: boolean | undefined; fetch?: boolean; cache?: boolean | undefined; id: string; guildId: string; }): Promise { if (force) { const result = await Result.fromAsync(async () => this.rest.get(Routes.guildMember(guildId, id)) as unknown as Promise); @@ -521,15 +540,15 @@ export class Client extends EventEmitter { return success ? Result.ok({ success } as T) : Result.err(new Error("Failed to publish message")); } - private async query(sql: string, params: any[], method: "all" | "execute"): Promise { - const { queue: replyTo } = await this.amqp.assertQueue("", { exclusive: true, durable: false, autoDelete: true }); + private async query(channel: Channel, sql: string, params: any[], method: "all" | "execute"): Promise { + const { queue: replyTo } = await channel.assertQueue("", { exclusive: true, durable: false, autoDelete: true }); const query = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.query`, this.clientId); let tag = ""; const result = new Promise((resolve, reject) => { const timeout = setTimeout(async () => { // Cancel to delete the queue - await this.amqp.cancel(tag); + await channel.cancel(tag); resolve({ route: "unknown", @@ -538,20 +557,20 @@ export class Client extends EventEmitter { }); }, this.options.queryTimeout ?? 15_000); - this.amqp.consume(replyTo, async message => { + channel.consume(replyTo, async message => { if (message) { clearTimeout(timeout); const payload = JSON.parse(message.content.toString()) as QueryResult; resolve(payload); - this.amqp.ack(message); + channel.ack(message); // Cancel to delete the queue - await this.amqp.cancel(tag); + await channel.cancel(tag); } }).then(x => tag = x.consumerTag).catch(reject); }); - await this.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from( + channel.publish(RabbitMQ.GATEWAY_EXCHANGE, query.key, Buffer.from( JSON.stringify({ sql, params, method }) ), { replyTo });