From 738080cb14c3c8ad0493c92a15438ec9b77bc7b6 Mon Sep 17 00:00:00 2001 From: KagChi Date: Sat, 24 Feb 2024 17:56:21 +0700 Subject: [PATCH] fix: properly bind queue --- .../src/Utilities/WebSockets/ProcessBootstrapper.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts index 94fdc467..0f6afab0 100644 --- a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts +++ b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts @@ -53,8 +53,7 @@ export class ProcessBootstrapper { await this.database.pragma("journal_mode = WAL"); migrate(this.drizzle, { migrationsFolder: "dist/drizzle" }); - const bindQueueForShard = this.setupAmqp(); - await this.stores.load(); + this.setupAmqp(); await this.stores.load(); // Start by initializing the shards for (const shardId of this.data.shardIds) { @@ -80,7 +79,6 @@ export class ProcessBootstrapper { } await options.shardCallback?.(shard); - await bindQueueForShard(shardId); this.shards.set(shardId, shard); } @@ -97,7 +95,7 @@ export class ProcessBootstrapper { } } - public setupAmqp(): (shardId: number) => Promise { + public setupAmqp(): void { const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`); const amqpChannel = createAmqpChannel(amqp, { @@ -105,6 +103,10 @@ export class ProcessBootstrapper { await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); await channel.assertQueue(routing.queue, { durable: false, autoDelete: true }); await channel.consume(routing.queue, async m => this.onConsumeMessage(channel, m)); + + await Promise.all( + this.data.shardIds.map(async shardId => channel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key)) + ); } }); @@ -120,8 +122,6 @@ export class ProcessBootstrapper { amqp: amqpChannel }) ); - - return async (shardId: number) => amqpChannel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key); } public async onConsumeMessage(channel: Channel, message: ConsumeMessage | null): Promise {