Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
fix: send to random gateway cache replica
Browse files Browse the repository at this point in the history
Co-authored-by: hzmi <[email protected]>
  • Loading branch information
KagChi and Hazmi35 committed Feb 22, 2024
1 parent b1b2245 commit e79b9a0
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 7 deletions.
6 changes: 6 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions services/kanao-cache/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"amqp-connection-manager": "^4.1.14",
"amqplib": "^0.10.3",
"discord-api-types": "^0.37.70",
"dockerode": "^4.0.2",
"dotenv": "^16.4.5",
"drizzle-orm": "^0.29.3",
"gradient-string": "^2.0.2",
Expand All @@ -42,6 +43,7 @@
"@swc/cli": "^0.3.9",
"@swc/core": "^1.4.2",
"@types/amqplib": "^0.10.4",
"@types/dockerode": "^3.3.23",
"@types/gradient-string": "^1.1.5",
"@types/node": "^20.11.19",
"@types/pg": "^8.11.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class ReadyListener extends Listener {
}

public async run(payload: { shardId: number; data: { data: GatewayDispatchPayload; }; }): Promise<void> {
console.log(payload.data.data);
switch (payload.data.data.t) {
case GatewayDispatchEvents.GuildCreate:
case GatewayDispatchEvents.ChannelCreate:
Expand Down
10 changes: 4 additions & 6 deletions services/kanao-cache/src/Structures/KanaoCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ export class KanaoCache extends EventEmitter {
}

public async setupAmqp(channel: Channel): Promise<void> {
await channel.assertExchange("nezu-gateway.cache", "direct", { durable: false });
const { queue } = await channel.assertQueue("", { exclusive: true });
await channel.prefetch(1);
const { queue } = await channel.assertQueue("kanao-cache.receive", { durable: true });

await channel.bindQueue(queue, "nezu-gateway.cache", clientId);

this.logger.info("Successfully bind to gateway exchange !");
this.logger.info("Successfully bind to cache queue!");

await channel.consume(queue, message => {
if (message) {
if (message && message.properties.replyTo === clientId) {
channel.ack(message);
this.emit("dispatch", JSON.parse(message.content.toString()));
}
Expand Down
36 changes: 36 additions & 0 deletions services/kanao-cache/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,42 @@
/* eslint-disable consistent-return */
import { Buffer } from "node:buffer";
import process from "node:process";
import { URL } from "node:url";
import { Result } from "@sapphire/result";
import { chunk, range } from "@sapphire/utilities";
import Dockerode from "dockerode";

export const getShardCount = async (): Promise<{ end: number | undefined; start: number; } | null | undefined> => {
const replicaCount = process.env.GATEWAY_REPLICA_COUNT === undefined ? null : Number(process.env.GATEWAY_REPLICA_COUNT);
if (replicaCount !== undefined && (replicaCount !== null) && replicaCount > 1) {
const result = await Result.fromAsync(async (): Promise<{ end: number | undefined; start: number; } | null | undefined> => {
const docker = new Dockerode();
const container = docker.getContainer(process.env.HOSTNAME!);
const inspect = await container.inspect();
const gatewayShardCount = process.env.GATEWAY_SHARD_COUNT === undefined ? null : Number(process.env.GATEWAY_SHARD_COUNT);
const gatewayShardCountPerReplica = process.env.GATEWAY_SHARD_COUNT_PER_REPLICA === undefined ? null : Number(process.env.GATEWAY_SHARD_COUNT_PER_REPLICA);
if (gatewayShardCount !== null && gatewayShardCountPerReplica !== null) {
const shards = gatewayShardCount >= 2 ? range(0, gatewayShardCount, 1) : [0];
const chunks = chunk(shards, gatewayShardCountPerReplica);
const parts = inspect.Name.split("-");
const replicaId = Number(parts.at(-1) ?? 1) - 1;
const shardIds = chunks[replicaId];
return {
end: shardIds.at(-1),
start: shardIds[0]
};
}
});
if (result.isOk()) return result.unwrap();
}

return process.env.GATEWAY_SHARD_START !== undefined && process.env.GATEWAY_SHARD_END !== undefined
? {
start: Number(process.env.GATEWAY_SHARD_START),
end: Number(process.env.GATEWAY_SHARD_END)
}
: null;
};

export const storeLogs = process.env.STORE_LOGS === "true";
export const lokiHost = process.env.LOKI_HOST === undefined ? undefined : new URL(process.env.LOKI_HOST);
Expand Down
2 changes: 1 addition & 1 deletion services/kanao-gateway/src/Listeners/DispatchListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ export class DispatchListener extends Listener {
}

public async run(payload: { shardId: number; data: { data: GatewayDispatchPayload; }; }): Promise<void> {
await this.store.amqp.publish("nezu-gateway.cache", clientId, Buffer.from(JSON.stringify({ data: payload.data, shardId: payload.shardId })));
await this.store.amqp.publish("nezu-gateway.cache", clientId, Buffer.from(JSON.stringify({ data: payload.data, shardId: payload.shardId })), { replyTo: clientId });
}
}
4 changes: 4 additions & 0 deletions services/kanao-gateway/src/Structures/KanaoGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ export class NezuGateway extends EventEmitter {
public setupAmqp(): void {
const amqpChannel = createAmqpChannel(amqp, {
setup: async (channel: Channel) => {
await channel.assertExchange("nezu-gateway.cache", "direct", { durable: true });
await channel.assertQueue("kanao-cache.receive", { durable: true });
await channel.bindQueue("kanao-cache.receive", "nezu-gateway.cache", clientId);

await channel.assertExchange(RabbitMQ.GATEWAY_QUEUE_STATS, "topic", { durable: false });

const { queue } = await channel.assertQueue("", { exclusive: true });
Expand Down

0 comments on commit e79b9a0

Please sign in to comment.