diff --git a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts index 49f9ea15..a4f25901 100644 --- a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts +++ b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts @@ -53,7 +53,7 @@ export class ProcessBootstrapper { await this.database.pragma("journal_mode = WAL"); migrate(this.drizzle, { migrationsFolder: "dist/drizzle" }); - this.setupAmqp(); await this.stores.load(); + await this.stores.load(); // Start by initializing the shards for (const shardId of this.data.shardIds) { @@ -82,6 +82,8 @@ export class ProcessBootstrapper { this.shards.set(shardId, shard); } + this.setupAmqp(); + // Lastly, start listening to messages from the parent thread this.setupThreadEvents(); @@ -100,10 +102,10 @@ export class ProcessBootstrapper { setup: async (channel: Channel) => { await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); - const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}`); + const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`); const { queue } = await channel.assertQueue(routing.queue, { durable: false }); - for (const shard of this.data.shardIds) { + for (const shard of this.shards.keys()) { await channel.bindQueue(queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shard).key); } @@ -130,7 +132,6 @@ export class ProcessBootstrapper { const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; }; const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey); this.logger.debug(content, `Received message from AMQP to shard ${shardId}`); - this.logger.debug(`Shard ${[...this.shards.keys()].join(", ")}`); if (shardId === null) return; if (!this.shards.has(shardId)) return; channel.ack(message);