From a2cc71355886c21f381b12c0938357b170050cfa Mon Sep 17 00:00:00 2001 From: hzmi Date: Sat, 24 Feb 2024 09:27:31 +0000 Subject: [PATCH] fix(ProcessBootstrapper): only ack if it's a owned shard and use replicaId as appId --- services/kanao-gateway/.example.env | 47 ------------------- .../WebSockets/ProcessBootstrapper.ts | 8 ++-- 2 files changed, 5 insertions(+), 50 deletions(-) delete mode 100644 services/kanao-gateway/.example.env diff --git a/services/kanao-gateway/.example.env b/services/kanao-gateway/.example.env deleted file mode 100644 index 28e04978..00000000 --- a/services/kanao-gateway/.example.env +++ /dev/null @@ -1,47 +0,0 @@ -GATEWAY_LARGE_TRESHOLD=250 -GATEWAY_PRESENCE_NAME=Standalone Gateway -GATEWAY_PRESENCE_TYPE=0 -GATEWAY_INTENTS=0 -GATEWAY_SHARD_START=0 -GATEWAY_SHARD_END=1 -GATEWAY_SHARD_COUNT=1 -GATEWAY_SHARD_COUNT_PER_REPLICA=1 -GATEWAY_SHARDS_PERWORKERS=14 -GATEWAY_HELLO_TIMEOUT=600000 -GATEWAY_READY_TIMEOUT=600000 -GATEWAY_HANDSHAKE_TIMEOUT= -GATEWAY_GUILDS_PER_SHARD= -GATEWAY_RESUME=true - -DISCORD_TOKEN= -HTTP_PROXY= - -DATABASE_URL= - -AMQP_HOST="amqp://guest:guest@localhost" - -STATE_MESSAGE= -STATE_MEMBER= -STATE_VOICE= -STATE_CHANNEL= -STATE_PRESENCE= -STATE_STICKER= -STATE_REACTION= -STATE_EMOJI= -STATE_THREAD= -STATE_USER= -STATE_THREAD_MEMBER= -STATE_ROLE= - -PROMETHEUS_PORT=9090 -PROMETHEUS_ENABLED=false - -STORE_LOGS=true -LOKI_HOST=http://localhost:3100 # (optional) - -REPLICA_ID=0 -REPLICA_COUNT=1 - -# Do a garbage collection on guildCreate after x times -# Useful on startups -GUILD_CREATE_GC_EVERY= \ No newline at end of file diff --git a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts index 3f2df932..5a901f10 100644 --- a/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts +++ b/services/kanao-gateway/src/Utilities/WebSockets/ProcessBootstrapper.ts @@ -23,7 +23,7 @@ import { migrate } from "drizzle-orm/better-sqlite3/migrator"; import type { Listener } from "../../Stores/Listener.js"; import { ListenerStore } from "../../Stores/ListenerStore.js"; import * as schema from "../../Structures/DatabaseSchema.js"; -import { discordToken, storeLogs, lokiHost, amqp, clientId } from "../../config.js"; +import { discordToken, storeLogs, lokiHost, amqp, clientId, replicaId } from "../../config.js"; import { createLogger } from "../Logger.js"; import { ProcessContextFetchingStrategy } from "./ProcessContextFetchingStrategy.js"; @@ -100,7 +100,7 @@ export class ProcessBootstrapper { setup: async (channel: Channel) => { await channel.assertExchange(RabbitMQ.GATEWAY_EXCHANGE, "topic", { durable: false }); - const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${this.data.processId}`); + const routing = new RoutedQueue(GatewayExchangeRoutes.SEND, clientId, `gateway-${replicaId}}`); const { queue } = await channel.assertQueue(routing.queue, { durable: false }); for (const shard of this.data.shardIds) { @@ -127,9 +127,11 @@ export class ProcessBootstrapper { public async onConsumeMessage(channel: Channel, message: ConsumeMessage | null): Promise { if (!message) return; - channel.ack(message); const content = JSON.parse(message.content.toString()) as { op: ShardOp; data: unknown; }; const shardId = ShardedRoutedQueue.routingKeyToShardId(message.fields.routingKey); + if (shardId === null) return; + if (!this.shards.has(shardId)) return; + channel.ack(message); switch (content.op) { case ShardOp.SEND: { const shard = this.shards.get(shardId);