diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5b0a48a7..7bbc324d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -205,6 +205,9 @@ importers: discord-api-types: specifier: ^0.37.70 version: 0.37.70 + dockerode: + specifier: ^4.0.2 + version: 4.0.2 dotenv: specifier: ^16.4.5 version: 16.4.5 @@ -243,6 +246,9 @@ importers: '@types/amqplib': specifier: ^0.10.4 version: 0.10.4 + '@types/dockerode': + specifier: ^3.3.23 + version: 3.3.23 '@types/gradient-string': specifier: ^1.1.5 version: 1.1.5 diff --git a/services/kanao-cache/package.json b/services/kanao-cache/package.json index 858dd449..e7439350 100644 --- a/services/kanao-cache/package.json +++ b/services/kanao-cache/package.json @@ -30,6 +30,7 @@ "amqp-connection-manager": "^4.1.14", "amqplib": "^0.10.3", "discord-api-types": "^0.37.70", + "dockerode": "^4.0.2", "dotenv": "^16.4.5", "drizzle-orm": "^0.29.3", "gradient-string": "^2.0.2", @@ -42,6 +43,7 @@ "@swc/cli": "^0.3.9", "@swc/core": "^1.4.2", "@types/amqplib": "^0.10.4", + "@types/dockerode": "^3.3.23", "@types/gradient-string": "^1.1.5", "@types/node": "^20.11.19", "@types/pg": "^8.11.0", diff --git a/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts b/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts index 6ceeffe5..217c8fe9 100644 --- a/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/DispatchListener.ts @@ -15,6 +15,7 @@ export class ReadyListener extends Listener { } public async run(payload: { shardId: number; data: { data: GatewayDispatchPayload; }; }): Promise { + console.log(payload.data.data); switch (payload.data.data.t) { case GatewayDispatchEvents.GuildCreate: case GatewayDispatchEvents.ChannelCreate: diff --git a/services/kanao-cache/src/Structures/KanaoCache.ts b/services/kanao-cache/src/Structures/KanaoCache.ts index 665b9e73..a8b37546 100644 --- a/services/kanao-cache/src/Structures/KanaoCache.ts +++ b/services/kanao-cache/src/Structures/KanaoCache.ts @@ -34,15 +34,13 @@ export class KanaoCache extends EventEmitter { } public async setupAmqp(channel: Channel): Promise { - await channel.assertExchange("nezu-gateway.cache", "direct", { durable: false }); - const { queue } = await channel.assertQueue("", { exclusive: true }); + await channel.prefetch(1); + const { queue } = await channel.assertQueue("kanao-cache.receive", { durable: true }); - await channel.bindQueue(queue, "nezu-gateway.cache", clientId); - - this.logger.info("Successfully bind to gateway exchange !"); + this.logger.info("Successfully bind to cache queue!"); await channel.consume(queue, message => { - if (message) { + if (message && message.properties.replyTo === clientId) { channel.ack(message); this.emit("dispatch", JSON.parse(message.content.toString())); } diff --git a/services/kanao-cache/src/config.ts b/services/kanao-cache/src/config.ts index 962ebffa..0df16cea 100644 --- a/services/kanao-cache/src/config.ts +++ b/services/kanao-cache/src/config.ts @@ -1,6 +1,42 @@ +/* eslint-disable consistent-return */ import { Buffer } from "node:buffer"; import process from "node:process"; import { URL } from "node:url"; +import { Result } from "@sapphire/result"; +import { chunk, range } from "@sapphire/utilities"; +import Dockerode from "dockerode"; + +export const getShardCount = async (): Promise<{ end: number | undefined; start: number; } | null | undefined> => { + const replicaCount = process.env.GATEWAY_REPLICA_COUNT === undefined ? null : Number(process.env.GATEWAY_REPLICA_COUNT); + if (replicaCount !== undefined && (replicaCount !== null) && replicaCount > 1) { + const result = await Result.fromAsync(async (): Promise<{ end: number | undefined; start: number; } | null | undefined> => { + const docker = new Dockerode(); + const container = docker.getContainer(process.env.HOSTNAME!); + const inspect = await container.inspect(); + const gatewayShardCount = process.env.GATEWAY_SHARD_COUNT === undefined ? null : Number(process.env.GATEWAY_SHARD_COUNT); + const gatewayShardCountPerReplica = process.env.GATEWAY_SHARD_COUNT_PER_REPLICA === undefined ? null : Number(process.env.GATEWAY_SHARD_COUNT_PER_REPLICA); + if (gatewayShardCount !== null && gatewayShardCountPerReplica !== null) { + const shards = gatewayShardCount >= 2 ? range(0, gatewayShardCount, 1) : [0]; + const chunks = chunk(shards, gatewayShardCountPerReplica); + const parts = inspect.Name.split("-"); + const replicaId = Number(parts.at(-1) ?? 1) - 1; + const shardIds = chunks[replicaId]; + return { + end: shardIds.at(-1), + start: shardIds[0] + }; + } + }); + if (result.isOk()) return result.unwrap(); + } + + return process.env.GATEWAY_SHARD_START !== undefined && process.env.GATEWAY_SHARD_END !== undefined + ? { + start: Number(process.env.GATEWAY_SHARD_START), + end: Number(process.env.GATEWAY_SHARD_END) + } + : null; +}; export const storeLogs = process.env.STORE_LOGS === "true"; export const lokiHost = process.env.LOKI_HOST === undefined ? undefined : new URL(process.env.LOKI_HOST); diff --git a/services/kanao-gateway/src/Listeners/DispatchListener.ts b/services/kanao-gateway/src/Listeners/DispatchListener.ts index df249651..55b4fdde 100644 --- a/services/kanao-gateway/src/Listeners/DispatchListener.ts +++ b/services/kanao-gateway/src/Listeners/DispatchListener.ts @@ -13,6 +13,6 @@ export class DispatchListener extends Listener { } public async run(payload: { shardId: number; data: { data: GatewayDispatchPayload; }; }): Promise { - await this.store.amqp.publish("nezu-gateway.cache", clientId, Buffer.from(JSON.stringify({ data: payload.data, shardId: payload.shardId }))); + await this.store.amqp.publish("nezu-gateway.cache", clientId, Buffer.from(JSON.stringify({ data: payload.data, shardId: payload.shardId })), { replyTo: clientId }); } } diff --git a/services/kanao-gateway/src/Structures/KanaoGateway.ts b/services/kanao-gateway/src/Structures/KanaoGateway.ts index 51510d99..88617ff8 100644 --- a/services/kanao-gateway/src/Structures/KanaoGateway.ts +++ b/services/kanao-gateway/src/Structures/KanaoGateway.ts @@ -145,6 +145,10 @@ export class NezuGateway extends EventEmitter { public setupAmqp(): void { const amqpChannel = createAmqpChannel(amqp, { setup: async (channel: Channel) => { + await channel.assertExchange("nezu-gateway.cache", "direct", { durable: true }); + await channel.assertQueue("kanao-cache.receive", { durable: true }); + await channel.bindQueue("kanao-cache.receive", "nezu-gateway.cache", clientId); + await channel.assertExchange(RabbitMQ.GATEWAY_QUEUE_STATS, "topic", { durable: false }); const { queue } = await channel.assertQueue("", { exclusive: true });