Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
revert: "fix(ProcessBootstrapper): must send to the correct process"
Browse files Browse the repository at this point in the history
This reverts commit ba41b72.

Testing, idk after that commit we don't receive any message from the gateway
  • Loading branch information
Hazmi35 authored Feb 24, 2024
1 parent ba41b72 commit 4e6f264
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class ProcessBootstrapper {
await this.database.pragma("journal_mode = WAL");
migrate(this.drizzle, { migrationsFolder: "dist/drizzle" });

await this.stores.load();
this.setupAmqp(); await this.stores.load();

// Start by initializing the shards
for (const shardId of this.data.shardIds) {
Expand Down Expand Up @@ -82,8 +82,6 @@ export class ProcessBootstrapper {
this.shards.set(shardId, shard);
}

this.setupAmqp();

// Lastly, start listening to messages from the parent thread
this.setupThreadEvents();

Expand All @@ -102,10 +100,10 @@ export class ProcessBootstrapper {
setup: async (channel: Channel) => {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`);
const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}`);
const { queue } = await channel.assertQueue(routing.queue, { durable: false });

for (const shard of this.shards.keys()) {
for (const shard of this.data.shardIds) {
await channel.bindQueue(queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shard).key);
}

Expand All @@ -132,6 +130,7 @@ export class ProcessBootstrapper {
const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; };
const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey);
this.logger.debug(content, `Received message from AMQP to shard ${shardId}`);
this.logger.debug(`Shard ${[...this.shards.keys()].join(", ")}`);
if (shardId === null) return;
if (!this.shards.has(shardId)) return;
channel.ack(message);
Expand Down

0 comments on commit 4e6f264

Please sign in to comment.