Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
fix(ProcessBootstrapper): must send to the correct process
Browse files Browse the repository at this point in the history
  • Loading branch information
Hazmi35 authored Feb 24, 2024
1 parent 7f7a073 commit ba41b72
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class ProcessBootstrapper {
await this.database.pragma("journal_mode = WAL");
migrate(this.drizzle, { migrationsFolder: "dist/drizzle" });

this.setupAmqp(); await this.stores.load();
await this.stores.load();

// Start by initializing the shards
for (const shardId of this.data.shardIds) {
Expand Down Expand Up @@ -82,6 +82,8 @@ export class ProcessBootstrapper {
this.shards.set(shardId, shard);
}

this.setupAmqp();

// Lastly, start listening to messages from the parent thread
this.setupThreadEvents();

Expand All @@ -100,10 +102,10 @@ export class ProcessBootstrapper {
setup: async (channel: Channel) => {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}`);
const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`);
const { queue } = await channel.assertQueue(routing.queue, { durable: false });

for (const shard of this.data.shardIds) {
for (const shard of this.shards.keys()) {
await channel.bindQueue(queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shard).key);
}

Expand All @@ -130,7 +132,6 @@ export class ProcessBootstrapper {
const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; };
const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey);
this.logger.debug(content, `Received message from AMQP to shard ${shardId}`);
this.logger.debug(`Shard ${[...this.shards.keys()].join(", ")}`);
if (shardId === null) return;
if (!this.shards.has(shardId)) return;
channel.ack(message);
Expand Down

0 comments on commit ba41b72

Please sign in to comment.