From 520c86566ae8b3ff93b06a5ddea232f2812b4e20 Mon Sep 17 00:00:00 2001 From: hzmi Date: Sun, 25 Feb 2024 13:10:54 +0700 Subject: [PATCH] feat(KanaoCache): seperate channel for cache and rpc (#395) * feat(KanaoCache): seperate channel for cache and rpc This allows us to set different prefetch option. Cache uses prefetch with the value of 1 to allow fair queue consuming across multiple cache workers * fix: rename func --- .../src/Listeners/Caches/DispatchListener.ts | 2 +- .../kanao-cache/src/Structures/KanaoCache.ts | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts b/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts index 9e57afde..702338aa 100644 --- a/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts @@ -48,6 +48,6 @@ export class DispatchListener extends Listener { const routing = new RoutedQueue(GatewayExchangeRoutes.DISPATCH, clientId) .shard(payload.shardId); - await container.client.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, routing.key, Buffer.from(JSON.stringify(payload.data))); + await container.client.cacheQueue.publish(RabbitMQ.GATEWAY_EXCHANGE, routing.key, Buffer.from(JSON.stringify(payload.data))); } } diff --git a/services/kanao-cache/src/Structures/KanaoCache.ts b/services/kanao-cache/src/Structures/KanaoCache.ts index d843ccc9..b0586305 100644 --- a/services/kanao-cache/src/Structures/KanaoCache.ts +++ b/services/kanao-cache/src/Structures/KanaoCache.ts @@ -13,8 +13,12 @@ import { createLogger } from "../Utilities/Logger.js"; import { clientId, storeLogs, lokiHost, databaseUrl, amqp, databaseConnectionLimit } from "../config.js"; export class KanaoCache extends EventEmitter { - public amqp = createAmqpChannel(amqp, { - setup: async (channel: Channel) => this.setupAmqp(channel) + public cacheQueue = createAmqpChannel(amqp, { + setup: async (channel: Channel) => this.setupCacheQueue(channel) + }); + + public rpcQueue = createAmqpChannel(amqp, { + setup: async (channel: Channel) => this.setupRpc(channel) }); public logger = createLogger("kanao-cache", clientId, storeLogs, lokiHost); @@ -34,12 +38,14 @@ export class KanaoCache extends EventEmitter { await this.stores.load(); } - public async setupAmqp(channel: Channel): Promise { + public async setupCacheQueue(channel: Channel): Promise { await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); // Used for receiving receive events from the gateway const routingKey = new RoutedQueue(GatewayExchangeRoutes.RECEIVE, clientId, "cache"); const { queue } = await channel.assertQueue(routingKey.queue, { durable: false }); + + await channel.prefetch(1); await channel.bindQueue(queue, "kanao-gateway", routingKey.key); await channel.consume(queue, message => { if (message) { @@ -51,7 +57,9 @@ export class KanaoCache extends EventEmitter { }); this.logger.info(`Successfully bind queue ${queue} to exchange kanao-gateway with routing key ${routingKey.key}`); + } + public async setupRpc(channel: Channel): Promise { // Used for Counts RPC const rpc = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.counts`, clientId, "cache-rpc"); await channel.assertQueue(rpc.queue, { durable: false }); @@ -68,7 +76,7 @@ export class KanaoCache extends EventEmitter { .then(rows => rows[0].count); channel.ack(message); - await this.amqp.sendToQueue(content.replyTo, Buffer.from( + await this.cacheQueue.sendToQueue(content.replyTo, Buffer.from( JSON.stringify({ route: rpc.key, clientId, guilds, users, channels }) ), { correlationId: message.properties.correlationId as string }); }