From 4e6f2644f5311d6764deaba70c1a3b3a178c06bf Mon Sep 17 00:00:00 2001 From: hzmi Date: Sat, 24 Feb 2024 10:06:10 +0000 Subject: [PATCH] revert: "fix(ProcessBootstrapper): must send to the correct process" This reverts commit ba41b72d438f472179f4526e628b99dc773b1352. Testing, idk after that commit we don't receive any message from the gateway --- .../src/Utilities/WebSockets/ProcessBootstrapper.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts index a4f25901..49f9ea15 100644 --- a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts +++ b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts @@ -53,7 +53,7 @@ export class ProcessBootstrapper { await this.database.pragma("journal_mode = WAL"); migrate(this.drizzle, { migrationsFolder: "dist/drizzle" }); - await this.stores.load(); + this.setupAmqp(); await this.stores.load(); // Start by initializing the shards for (const shardId of this.data.shardIds) { @@ -82,8 +82,6 @@ export class ProcessBootstrapper { this.shards.set(shardId, shard); } - this.setupAmqp(); - // Lastly, start listening to messages from the parent thread this.setupThreadEvents(); @@ -102,10 +100,10 @@ export class ProcessBootstrapper { setup: async (channel: Channel) => { await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); - const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`); + const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}`); const { queue } = await channel.assertQueue(routing.queue, { durable: false }); - for (const shard of this.shards.keys()) { + for (const shard of this.data.shardIds) { await channel.bindQueue(queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shard).key); } @@ -132,6 +130,7 @@ export class ProcessBootstrapper { const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; }; const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey); this.logger.debug(content, `Received message from AMQP to shard ${shardId}`); + this.logger.debug(`Shard ${[...this.shards.keys()].join(", ")}`); if (shardId === null) return; if (!this.shards.has(shardId)) return; channel.ack(message);