From 80a7f37b6583dfbaec6c023bbaba23c684281a9a Mon Sep 17 00:00:00 2001 From: hzmi Date: Thu, 22 Feb 2024 23:54:31 +0700 Subject: [PATCH] feat: cache optimization and guildCreate throttling (#372) --- .../Caches/Channels/ChannelCreateListener.ts | 24 ++++++++++-------- .../Caches/Channels/ChannelUpdateListener.ts | 25 +++++++++++-------- .../GuildMembers/GuildMemberUpdateListener.ts | 1 + .../GuildMembers/GuildMembersChunkListener.ts | 1 + .../Caches/Guilds/GuildCreateListener.ts | 9 +++---- .../kanao-cache/src/Structures/KanaoCache.ts | 18 ++++++++++--- services/kanao-cache/src/config.ts | 3 ++- 7 files changed, 51 insertions(+), 30 deletions(-) diff --git a/services/kanao-cache/src/Listeners/Caches/Channels/ChannelCreateListener.ts b/services/kanao-cache/src/Listeners/Caches/Channels/ChannelCreateListener.ts index 21a4e364..ac814b2b 100644 --- a/services/kanao-cache/src/Listeners/Caches/Channels/ChannelCreateListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/Channels/ChannelCreateListener.ts @@ -18,7 +18,7 @@ export class ChannelCreateListener extends Listener { public async run(payload: { data: GatewayChannelCreateDispatch; shardId: number; }): Promise { if (stateChannels) { - const channel = await this.container.client.drizzle.insert(channels).values({ + await this.container.client.drizzle.insert(channels).values({ id: payload.data.d.id, guildId: "guild_id" in payload.data.d ? payload.data.d.guild_id : null, name: payload.data.d.name, @@ -42,15 +42,19 @@ export class ChannelCreateListener extends Listener { .then(c => c[0]); if ("permission_overwrites" in payload.data.d && payload.data.d.permission_overwrites !== undefined && payload.data.d.permission_overwrites.length > 0) { - await this.container.client.drizzle.insert(channelsOverwrite).values(payload.data.d.permission_overwrites.map(overwrite => ({ - userOrRole: overwrite.id, - channelId: channel.id, - type: overwrite.type, - allow: overwrite.allow, - deny: overwrite.deny - }))).onConflictDoNothing({ - target: [channelsOverwrite.userOrRole, channelsOverwrite.channelId] - }); + for (const overwrite of payload.data.d.permission_overwrites) { + // @ts-expect-error Intended to avoid .map + overwrite.channelId = payload.data.d.id; + + // @ts-expect-error Intended to avoid .map + overwrite.userOrRole = overwrite.id; + } + + await this.container.client.drizzle.insert(channelsOverwrite) + .values(payload.data.d.permission_overwrites) + .onConflictDoNothing({ + target: [channelsOverwrite.userOrRole, channelsOverwrite.channelId] + }); } } diff --git a/services/kanao-cache/src/Listeners/Caches/Channels/ChannelUpdateListener.ts b/services/kanao-cache/src/Listeners/Caches/Channels/ChannelUpdateListener.ts index cd893996..88332c43 100644 --- a/services/kanao-cache/src/Listeners/Caches/Channels/ChannelUpdateListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/Channels/ChannelUpdateListener.ts @@ -18,7 +18,7 @@ export class ChannelUpdateListener extends Listener { } public async run(payload: { data: GatewayChannelUpdateDispatch; shardId: number; }): Promise { - const channel = await this.container.client.drizzle.insert(channels).values({ + await this.container.client.drizzle.insert(channels).values({ id: payload.data.d.id, guildId: "guild_id" in payload.data.d ? payload.data.d.guild_id : null, name: payload.data.d.name, @@ -41,18 +41,23 @@ export class ChannelUpdateListener extends Listener { .returning({ id: channels.id }) .then(c => c[0]); + // TODO [2024-03-01]: Avoid delete all, intelligently delete only the ones that are not in the new payload await this.container.client.drizzle.delete(channelsOverwrite).where(eq(channelsOverwrite.channelId, payload.data.d.id)); if ("permission_overwrites" in payload.data.d && payload.data.d.permission_overwrites !== undefined && payload.data.d.permission_overwrites.length > 0) { - await this.container.client.drizzle.insert(channelsOverwrite).values(payload.data.d.permission_overwrites.map(overwrite => ({ - userOrRole: overwrite.id, - channelId: channel.id, - type: overwrite.type, - allow: overwrite.allow, - deny: overwrite.deny - }))).onConflictDoNothing({ - target: [channelsOverwrite.userOrRole, channelsOverwrite.channelId] - }); + for (const overwrite of payload.data.d.permission_overwrites) { + // @ts-expect-error Intended to avoid .map + overwrite.channelId = payload.data.d.id; + + // @ts-expect-error Intended to avoid .map + overwrite.userOrRole = overwrite.id; + } + + await this.container.client.drizzle.insert(channelsOverwrite) + .values(payload.data.d.permission_overwrites) + .onConflictDoNothing({ + target: [channelsOverwrite.userOrRole, channelsOverwrite.channelId] + }); } await this.container.client.amqp.publish(RabbitMQ.GATEWAY_QUEUE_SEND, RoutingKey(clientId, payload.shardId), Buffer.from(JSON.stringify(payload.data))); diff --git a/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMemberUpdateListener.ts b/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMemberUpdateListener.ts index cfb95acb..5edfb331 100644 --- a/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMemberUpdateListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMemberUpdateListener.ts @@ -70,6 +70,7 @@ export class GuildMemberUpdateListener extends Listener { } }); + // TODO [2024-03-01]: Avoid delete all, intelligently delete only the ones that are not in the new payload await this.container.client.drizzle.delete(memberRoles).where(and(eq(memberRoles.memberId, payload.data.d.user.id), eq(memberRoles.guildId, payload.data.d.guild_id))); if (payload.data.d.roles.length > 0) { diff --git a/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts b/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts index 86a44c30..dadff4b7 100644 --- a/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts @@ -18,6 +18,7 @@ export class GuildMembersChunkListener extends Listener { } public async run(payload: { data: GatewayGuildMembersChunkDispatch; shardId: number; }): Promise { + // TODO [2024-03-01]: Avoid .map for large arrays (unless if it's just string[]) const chunks = chunk(payload.data.d.members, 1_000); for (const memberChunk of chunks) { diff --git a/services/kanao-cache/src/Listeners/Caches/Guilds/GuildCreateListener.ts b/services/kanao-cache/src/Listeners/Caches/Guilds/GuildCreateListener.ts index 152cd6d7..00c1f395 100644 --- a/services/kanao-cache/src/Listeners/Caches/Guilds/GuildCreateListener.ts +++ b/services/kanao-cache/src/Listeners/Caches/Guilds/GuildCreateListener.ts @@ -10,8 +10,6 @@ import { Listener } from "../../../Stores/Listener.js"; import { clientId, guildCreateGcEvery, stateChannels, stateRoles } from "../../../config.js"; export class GuildCreateListener extends Listener { - public count = 0; - public gcEvery = guildCreateGcEvery; public constructor(context: ListenerContext) { super(context, { event: GatewayDispatchEvents.GuildCreate @@ -304,11 +302,10 @@ export class GuildCreateListener extends Listener { Buffer.from(JSON.stringify(payload.data)) ); - this.count++; - - if (global.gc && this.count % this.gcEvery === 0) { - this.logger.info(`Running garbage collection in ${payload.shardId}, ${this.count} Guilds flushed to the database so far`); + if (global.gc && this.container.client.guildsCreateThrottle % guildCreateGcEvery === 0) { + this.logger.info(`Running garbage collection in ${payload.shardId}, ${this.container.client.guildsCreateThrottle} Guilds flushed to the database so far`); global.gc(); + this.container.client.guildsCreateThrottle = 0; } } } diff --git a/services/kanao-cache/src/Structures/KanaoCache.ts b/services/kanao-cache/src/Structures/KanaoCache.ts index 6a0e3330..932066c8 100644 --- a/services/kanao-cache/src/Structures/KanaoCache.ts +++ b/services/kanao-cache/src/Structures/KanaoCache.ts @@ -4,11 +4,13 @@ import * as schema from "@nezuchan/kanao-schema"; import { createAmqpChannel } from "@nezuchan/utilities"; import { StoreRegistry, container } from "@sapphire/pieces"; import type { Channel } from "amqplib"; +import type { GatewayDispatchPayload } from "discord-api-types/v10.js"; +import { GatewayDispatchEvents } from "discord-api-types/v10.js"; import { drizzle } from "drizzle-orm/node-postgres"; import pg from "pg"; import { ListenerStore } from "../Stores/ListenerStore.js"; import { createLogger } from "../Utilities/Logger.js"; -import { clientId, storeLogs, lokiHost, databaseUrl, amqp, databaseConnectionLimit } from "../config.js"; +import { clientId, storeLogs, lokiHost, databaseUrl, amqp, databaseConnectionLimit, guildCreateGcEvery, disableGuildCreateGcThrottle } from "../config.js"; export class KanaoCache extends EventEmitter { public amqp = createAmqpChannel(amqp, { @@ -22,6 +24,8 @@ export class KanaoCache extends EventEmitter { public stores = new StoreRegistry(); + public guildsCreateThrottle = 0; + public async connect(): Promise { container.client = this; await this.pgClient.connect(); @@ -43,8 +47,16 @@ export class KanaoCache extends EventEmitter { await channel.consume(queue, message => { if (message && message.properties.replyTo === clientId) { - channel.ack(message); - this.emit("dispatch", JSON.parse(message.content.toString())); + const payload = JSON.parse(message.content.toString()) as { shardId: number; data: { data: GatewayDispatchPayload; }; }; + + if (payload.data.data.t === GatewayDispatchEvents.GuildCreate && !disableGuildCreateGcThrottle) { + if (this.guildsCreateThrottle++ % guildCreateGcEvery === 0) return; + channel.ack(message); + this.emit("dispatch", payload); + } else { + channel.ack(message); + this.emit("dispatch", payload); + } } }); } diff --git a/services/kanao-cache/src/config.ts b/services/kanao-cache/src/config.ts index 0eb748b8..802c14aa 100644 --- a/services/kanao-cache/src/config.ts +++ b/services/kanao-cache/src/config.ts @@ -19,4 +19,5 @@ export const stateRoles = process.env.STATE_ROLE === "true"; export const stateChannels = process.env.STATE_CHANNEL === "true"; export const stateMessages = process.env.STATE_MESSAGE === "true"; -export const guildCreateGcEvery = Number(process.env.GUILD_CREATE_GC_EVERY ?? 50); +export const guildCreateGcEvery = Number(process.env.GUILD_CREATE_GC_EVERY ?? 150); +export const disableGuildCreateGcThrottle = process.env.DISABLE_GUILD_CREATE_GC_THROTTLE === "true";