Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
fix: properly bind queue
Browse files Browse the repository at this point in the history
  • Loading branch information
KagChi committed Feb 24, 2024
1 parent eabf974 commit 738080c
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ export class ProcessBootstrapper {
await this.database.pragma("journal_mode = WAL");
migrate(this.drizzle, { migrationsFolder: "dist/drizzle" });

const bindQueueForShard = this.setupAmqp();
await this.stores.load();
this.setupAmqp(); await this.stores.load();

// Start by initializing the shards
for (const shardId of this.data.shardIds) {
Expand All @@ -80,7 +79,6 @@ export class ProcessBootstrapper {
}

await options.shardCallback?.(shard);
await bindQueueForShard(shardId);
this.shards.set(shardId, shard);
}

Expand All @@ -97,14 +95,18 @@ export class ProcessBootstrapper {
}
}

public setupAmqp(): (shardId: number) => Promise<void> {
public setupAmqp(): void {
const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}-${this.data.processId}`);

const amqpChannel = createAmqpChannel(amqp, {
setup: async (channel: Channel) => {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });
await channel.assertQueue(routing.queue, { durable: false, autoDelete: true });
await channel.consume(routing.queue, async m => this.onConsumeMessage(channel, m));

await Promise.all(
this.data.shardIds.map(async shardId => channel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key))
);
}
});

Expand All @@ -120,8 +122,6 @@ export class ProcessBootstrapper {
amqp: amqpChannel
})
);

return async (shardId: number) => amqpChannel.bindQueue(routing.queue, RabbitMQ.GATEWAY_EXCHANGE, routing.shard(shardId).key);
}

public async onConsumeMessage(channel: Channel, message: ConsumeMessage | null): Promise<void> {
Expand Down

0 comments on commit 738080c

Please sign in to comment.