Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
fix(ProcessBootstrapper):correctly send to the correct shard process
Browse files Browse the repository at this point in the history
  • Loading branch information
Hazmi35 authored Feb 24, 2024
1 parent 4e6f264 commit 54d5912
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ export class ProcessBootstrapper {
await this.database.pragma("journal_mode = WAL");
migrate(this.drizzle, { migrationsFolder: "dist/drizzle" });

this.setupAmqp(); await this.stores.load();
const bindQueueForShard = this.setupAmqp();
await this.stores.load();

// Start by initializing the shards
for (const shardId of this.data.shardIds) {
Expand All @@ -79,6 +80,7 @@ export class ProcessBootstrapper {
}

await options.shardCallback?.(shard);
await bindQueueForShard(shardId);
this.shards.set(shardId, shard);
}

Expand All @@ -95,19 +97,14 @@ export class ProcessBootstrapper {
}
}

public setupAmqp(): void {
public setupAmqp(): (shardId: number) => Promise<void> {
const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`);

const amqpChannel = createAmqpChannel(amqp, {
setup: async (channel: Channel) => {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}`);
const { queue } = await channel.assertQueue(routing.queue, { durable: false });

for (const shard of this.data.shardIds) {
await channel.bindQueue(queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shard).key);
}

await channel.consume(queue, async m => this.onConsumeMessage(channel, m));
await channel.assertQueue(routing.queue, { durable: false });
await channel.consume(routing.queue, async m => this.onConsumeMessage(channel, m));
}
});

Expand All @@ -123,18 +120,23 @@ export class ProcessBootstrapper {
amqp: amqpChannel
})
);

return async (shardId: number) => amqpChannel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key);
}

public async onConsumeMessage(channel: Channel, message: ConsumeMessage | null): Promise<void> {
if (!message) return;
const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; };
const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey);

this.logger.debug(content, `Received message from AMQP to shard ${shardId}`);
this.logger.debug(`Shard ${[...this.shards.keys()].join(", ")}`);

if (shardId === null) return;
if (!this.shards.has(shardId)) return;

channel.ack(message);
this.logger.debug(content, `Processing message from AMQP to shard ${shardId}`);

switch (content.op) {
case ShardOp.SEND: {
const shard = this.shards.get(shardId);
Expand Down

0 comments on commit 54d5912

Please sign in to comment.