Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
feat(KanaoCache): seperate channel for cache and rpc (#395)
Browse files Browse the repository at this point in the history
* feat(KanaoCache): seperate channel for cache and rpc

This allows us to set different prefetch option.
Cache uses prefetch with the value of 1 to allow fair queue consuming across multiple cache workers

* fix: rename func
  • Loading branch information
Hazmi35 authored Feb 25, 2024
1 parent fb3da2f commit 520c865
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ export class DispatchListener extends Listener {
const routing = new RoutedQueue(GatewayExchangeRoutes.DISPATCH, clientId)
.shard(payload.shardId);

await container.client.amqp.publish(RabbitMQ.GATEWAY_EXCHANGE, routing.key, Buffer.from(JSON.stringify(payload.data)));
await container.client.cacheQueue.publish(RabbitMQ.GATEWAY_EXCHANGE, routing.key, Buffer.from(JSON.stringify(payload.data)));
}
}
16 changes: 12 additions & 4 deletions services/kanao-cache/src/Structures/KanaoCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import { createLogger } from "../Utilities/Logger.js";
import { clientId, storeLogs, lokiHost, databaseUrl, amqp, databaseConnectionLimit } from "../config.js";

export class KanaoCache extends EventEmitter {
public amqp = createAmqpChannel(amqp, {
setup: async (channel: Channel) => this.setupAmqp(channel)
public cacheQueue = createAmqpChannel(amqp, {
setup: async (channel: Channel) => this.setupCacheQueue(channel)
});

public rpcQueue = createAmqpChannel(amqp, {
setup: async (channel: Channel) => this.setupRpc(channel)
});

public logger = createLogger("kanao-cache", clientId, storeLogs, lokiHost);
Expand All @@ -34,12 +38,14 @@ export class KanaoCache extends EventEmitter {
await this.stores.load();
}

public async setupAmqp(channel: Channel): Promise<void> {
public async setupCacheQueue(channel: Channel): Promise<void> {
await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false });

// Used for receiving receive events from the gateway
const routingKey = new RoutedQueue(GatewayExchangeRoutes.RECEIVE, clientId, "cache");
const { queue } = await channel.assertQueue(routingKey.queue, { durable: false });

await channel.prefetch(1);
await channel.bindQueue(queue, "kanao-gateway", routingKey.key);
await channel.consume(queue, message => {
if (message) {
Expand All @@ -51,7 +57,9 @@ export class KanaoCache extends EventEmitter {
});

this.logger.info(`Successfully bind queue ${queue} to exchange kanao-gateway with routing key ${routingKey.key}`);
}

public async setupRpc(channel: Channel): Promise<void> {
// Used for Counts RPC
const rpc = new RoutedQueue(`${GatewayExchangeRoutes.REQUEST}.counts`, clientId, "cache-rpc");
await channel.assertQueue(rpc.queue, { durable: false });
Expand All @@ -68,7 +76,7 @@ export class KanaoCache extends EventEmitter {
.then(rows => rows[0].count);

channel.ack(message);
await this.amqp.sendToQueue(content.replyTo, Buffer.from(
await this.cacheQueue.sendToQueue(content.replyTo, Buffer.from(
JSON.stringify({ route: rpc.key, clientId, guilds, users, channels })
), { correlationId: message.properties.correlationId as string });
}
Expand Down

0 comments on commit 520c865

Please sign in to comment.