diff --git a/services/kanao-gateway/.example.env b/services/kanao-gateway/.example.env index 8cd37830..578f18db 100644 --- a/services/kanao-gateway/.example.env +++ b/services/kanao-gateway/.example.env @@ -1,43 +1,43 @@ -GATEWAY_LARGE_TRESHOLD = 250 -GATEWAY_PRESENCE_NAME = Standalone Gateway -GATEWAY_PRESENCE_TYPE = 0 -GATEWAY_INTENTS = 0 -GATEWAY_SHARD_START = 0 -GATEWAY_SHARD_END = 1 -GATEWAY_SHARD_COUNT = 1 -GATEWAY_SHARD_COUNT_PER_REPLICA = 1 -GATEWAY_SHARDS_PERWORKERS = 14 -GATEWAY_HELLO_TIMEOUT = 600000 -GATEWAY_READY_TIMEOUT = 600000 -GATEWAY_HANDSHAKE_TIMEOUT = -GATEWAY_GUILDS_PER_SHARD = -GATEWAY_RESUME = true +GATEWAY_LARGE_TRESHOLD=250 +GATEWAY_PRESENCE_NAME=Standalone Gateway +GATEWAY_PRESENCE_TYPE=0 +GATEWAY_INTENTS=0 +GATEWAY_SHARD_START=0 +GATEWAY_SHARD_END=1 +GATEWAY_SHARD_COUNT=1 +GATEWAY_SHARD_COUNT_PER_REPLICA=1 +GATEWAY_SHARDS_PERWORKERS=14 +GATEWAY_HELLO_TIMEOUT=600000 +GATEWAY_READY_TIMEOUT=600000 +GATEWAY_HANDSHAKE_TIMEOUT= +GATEWAY_GUILDS_PER_SHARD= +GATEWAY_RESUME=true -DISCORD_TOKEN = -HTTP_PROXY = +DISCORD_TOKEN= +HTTP_PROXY= -DATABASE_URL = +DATABASE_URL= -AMQP_HOST = "amqp://guest:guest@localhost" +AMQP_HOST="amqp://guest:guest@localhost" -STATE_MESSAGE = -STATE_MEMBER = -STATE_VOICE = -STATE_CHANNEL = -STATE_PRESENCE = -STATE_STICKER = -STATE_REACTION = -STATE_EMOJI = -STATE_THREAD = -STATE_USER = -STATE_THREAD_MEMBER = -STATE_ROLE = +STATE_MESSAGE= +STATE_MEMBER= +STATE_VOICE= +STATE_CHANNEL= +STATE_PRESENCE= +STATE_STICKER= +STATE_REACTION= +STATE_EMOJI= +STATE_THREAD= +STATE_USER= +STATE_THREAD_MEMBER= +STATE_ROLE= -PROMETHEUS_PORT = 9090 -PROMETHEUS_ENABLED = false +PROMETHEUS_PORT=9090 +PROMETHEUS_ENABLED=false -STORE_LOGS = true -LOKI_HOST = http://localhost:3100 # (optional) +STORE_LOGS=true +LOKI_HOST=http://localhost:3100 # (optional) -REPLICA_ID = 0 -REPLICA_COUNT = 1 \ No newline at end of file +REPLICA_ID=0 +REPLICA_COUNT=1 \ No newline at end of file diff --git a/services/kanao-gateway/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts b/services/kanao-gateway/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts index 2cab979b..992dfbbd 100644 --- a/services/kanao-gateway/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts +++ b/services/kanao-gateway/src/Listeners/Caches/GuildMembers/GuildMembersChunkListener.ts @@ -2,8 +2,10 @@ import { Buffer } from "node:buffer"; import { RabbitMQ } from "@nezuchan/constants"; import { memberRoles, members, users } from "@nezuchan/kanao-schema"; import { RoutingKey } from "@nezuchan/utilities"; +import { chunk } from "@sapphire/utilities"; import type { GatewayGuildMembersChunkDispatch } from "discord-api-types/v10"; import { GatewayDispatchEvents } from "discord-api-types/v10"; +import { sql } from "drizzle-orm"; import type { ListenerContext } from "../../../Stores/Listener.js"; import { Listener } from "../../../Stores/Listener.js"; import { clientId, stateMembers, stateUsers } from "../../../config.js"; @@ -16,48 +18,87 @@ export class GuildMembersChunkListener extends Listener { } public async run(payload: { data: GatewayGuildMembersChunkDispatch; shardId: number; }): Promise { - if (stateMembers || stateUsers) { - for (const member of payload.data.d.members) { - if (stateUsers) { - await this.store.drizzle.insert(users).values({ - id: member.user!.id, - username: member.user!.username, - discriminator: member.user?.discriminator ?? null, - globalName: member.user?.global_name ?? null, - avatar: member.user?.avatar ?? null, - bot: member.user?.bot ?? false, - flags: member.user?.flags, - accentColor: member.user?.accent_color, - avatarDecoration: member.user?.avatar_decoration, - banner: member.user?.banner, - locale: member.user?.locale, - mfaEnabled: member.user?.mfa_enabled, - premiumType: member.user?.premium_type, - publicFlags: member.user?.public_flags - }).onConflictDoNothing({ target: users.id }); - } + const chunks = chunk(payload.data.d.members, 1_000); - if (stateMembers) { - await this.store.drizzle.insert(members).values({ - id: member.user!.id, - avatar: member.avatar, - communicationDisabledUntil: member.premium_since, - deaf: member.deaf, - flags: member.flags, - joinedAt: member.joined_at, - mute: member.mute, - nick: member.nick, - pending: member.pending, - premiumSince: member.premium_since - }).onConflictDoNothing({ target: members.id }); - } + for (const memberChunk of chunks) { + if (stateUsers) { + await this.store.drizzle.insert(users) + .values( + memberChunk.map(member => ({ + id: member.user!.id, + username: member.user!.username, + discriminator: member.user?.discriminator ?? null, + globalName: member.user?.global_name ?? null, + avatar: member.user?.avatar ?? null, + bot: member.user?.bot ?? false, + flags: member.user?.flags, + accentColor: member.user?.accent_color, + avatarDecoration: member.user?.avatar_decoration, + banner: member.user?.banner, + locale: member.user?.locale, + mfaEnabled: member.user?.mfa_enabled, + premiumType: member.user?.premium_type, + publicFlags: member.user?.public_flags + })) + ).onConflictDoUpdate({ + target: users.id, + set: { + username: sql`EXCLUDED.username`, + discriminator: sql`EXCLUDED.discriminator`, + globalName: sql`EXCLUDED.global_name`, + avatar: sql`EXCLUDED.avatar`, + bot: sql`EXCLUDED.bot`, + flags: sql`EXCLUDED.flags`, + accentColor: sql`EXCLUDED.accent_color`, + avatarDecoration: sql`EXCLUDED.avatar_decoration`, + banner: sql`EXCLUDED.banner`, + locale: sql`EXCLUDED.locale`, + mfaEnabled: sql`EXCLUDED.mfa_enabled`, + premiumType: sql`EXCLUDED.premium_type`, + publicFlags: sql`EXCLUDED.public_flags` + } + }); + } + + if (stateMembers) { + await this.store.drizzle.insert(members) + .values( + memberChunk.map(member => ({ + id: member.user!.id, + avatar: member.avatar, + communicationDisabledUntil: member.premium_since, + deaf: member.deaf, + flags: member.flags, + joinedAt: member.joined_at, + mute: member.mute, + nick: member.nick, + pending: member.pending, + premiumSince: member.premium_since + })) + ).onConflictDoUpdate({ + target: members.id, + set: { + avatar: sql`EXCLUDED.avatar`, + communicationDisabledUntil: sql`EXCLUDED.premium_since`, + deaf: sql`EXCLUDED.deaf`, + flags: sql`EXCLUDED.flags`, + joinedAt: sql`EXCLUDED.joined_at`, + mute: sql`EXCLUDED.mute`, + nick: sql`EXCLUDED.nick`, + pending: sql`EXCLUDED.pending`, + premiumSince: sql`EXCLUDED.premium_since` + } + }); - if (member.roles.length > 0) { - await this.store.drizzle.insert(memberRoles).values(member.roles.map(role => ({ - memberId: member.user!.id, - roleId: role, - guildId: payload.data.d.guild_id - }))).onConflictDoNothing({ target: [memberRoles.memberId, memberRoles.roleId] }); + for (const member of memberChunk) { + if (member.roles.length > 0) { + await this.store.drizzle.insert(memberRoles) + .values(member.roles.map(role => ({ + memberId: member.user!.id, + roleId: role, + guildId: payload.data.d.guild_id + }))).onConflictDoNothing({ target: [memberRoles.memberId, memberRoles.roleId] }); + } } } } diff --git a/services/kanao-gateway/src/Listeners/Caches/Guilds/GuildCreateListener.ts b/services/kanao-gateway/src/Listeners/Caches/Guilds/GuildCreateListener.ts index 71def62e..d92f6af0 100644 --- a/services/kanao-gateway/src/Listeners/Caches/Guilds/GuildCreateListener.ts +++ b/services/kanao-gateway/src/Listeners/Caches/Guilds/GuildCreateListener.ts @@ -1,7 +1,8 @@ import { Buffer } from "node:buffer"; import { RabbitMQ } from "@nezuchan/constants"; -import { channels, channelsOverwrite, guilds, memberRoles, members, roles, users, voiceStates } from "@nezuchan/kanao-schema"; +import { channels, channelsOverwrite, guilds, memberRoles, roles, users, voiceStates } from "@nezuchan/kanao-schema"; import { RoutingKey } from "@nezuchan/utilities"; +import { chunk } from "@sapphire/utilities"; import type { GatewayGuildCreateDispatch } from "discord-api-types/v10"; import { GatewayDispatchEvents } from "discord-api-types/v10"; import { sql } from "drizzle-orm"; @@ -17,49 +18,15 @@ export class GuildCreateListener extends Listener { } public async run(payload: { data: GatewayGuildCreateDispatch; shardId: number; }): Promise { - if (payload.data.d.unavailable !== undefined && payload.data.d.unavailable) return; + if ( + payload.data.d.unavailable !== undefined && + payload.data.d.unavailable + ) return; - await this.store.drizzle.insert(guilds).values({ - id: payload.data.d.id, - unavailable: payload.data.d.unavailable, - name: payload.data.d.name, - banner: payload.data.d.banner, - owner: payload.data.d.owner, - ownerId: payload.data.d.owner_id, - afkChannelId: payload.data.d.afk_channel_id, - afkTimeout: payload.data.d.afk_timeout, - defaultMessageNotifications: payload.data.d.default_message_notifications, - explicitContentFilter: payload.data.d.explicit_content_filter, - icon: payload.data.d.icon, - mfaLevel: payload.data.d.mfa_level, - region: payload.data.d.region, - systemChannelId: payload.data.d.system_channel_id, - verificationLevel: payload.data.d.verification_level, - widgetChannelId: payload.data.d.widget_channel_id, - widgetEnabled: payload.data.d.widget_enabled, - approximateMemberCount: payload.data.d.approximate_member_count, - approximatePresenceCount: payload.data.d.approximate_presence_count, - description: payload.data.d.description, - discoverySplash: payload.data.d.discovery_splash, - iconHash: payload.data.d.icon_hash, - maxMembers: payload.data.d.max_members, - maxPresences: payload.data.d.max_presences, - premiumSubscriptionCount: payload.data.d.premium_subscription_count, - premiumTier: payload.data.d.premium_tier, - vanityUrlCode: payload.data.d.vanity_url_code, - nsfwLevel: payload.data.d.nsfw_level, - rulesChannelId: payload.data.d.rules_channel_id, - publicUpdatesChannelId: payload.data.d.public_updates_channel_id, - preferredLocale: payload.data.d.preferred_locale, - maxVideoChannelUsers: payload.data.d.max_video_channel_users, - permissions: payload.data.d.permissions, - premiumProgressBarEnabled: payload.data.d.premium_progress_bar_enabled, - safetyAlertChannelId: payload.data.d.safety_alerts_channel_id, - splash: payload.data.d.splash, - systemChannelFlags: payload.data.d.system_channel_flags - }).onConflictDoUpdate({ - target: guilds.id, - set: { + await this.store.drizzle + .insert(guilds) + .values({ + id: payload.data.d.id, unavailable: payload.data.d.unavailable, name: payload.data.d.name, banner: payload.data.d.banner, @@ -96,20 +63,54 @@ export class GuildCreateListener extends Listener { safetyAlertChannelId: payload.data.d.safety_alerts_channel_id, splash: payload.data.d.splash, systemChannelFlags: payload.data.d.system_channel_flags - } - }); + }) + .onConflictDoUpdate({ + target: guilds.id, + set: { + unavailable: payload.data.d.unavailable, + name: payload.data.d.name, + banner: payload.data.d.banner, + owner: payload.data.d.owner, + ownerId: payload.data.d.owner_id, + afkChannelId: payload.data.d.afk_channel_id, + afkTimeout: payload.data.d.afk_timeout, + defaultMessageNotifications: payload.data.d.default_message_notifications, + explicitContentFilter: payload.data.d.explicit_content_filter, + icon: payload.data.d.icon, + mfaLevel: payload.data.d.mfa_level, + region: payload.data.d.region, + systemChannelId: payload.data.d.system_channel_id, + verificationLevel: payload.data.d.verification_level, + widgetChannelId: payload.data.d.widget_channel_id, + widgetEnabled: payload.data.d.widget_enabled, + approximateMemberCount: payload.data.d.approximate_member_count, + approximatePresenceCount: payload.data.d.approximate_presence_count, + description: payload.data.d.description, + discoverySplash: payload.data.d.discovery_splash, + iconHash: payload.data.d.icon_hash, + maxMembers: payload.data.d.max_members, + maxPresences: payload.data.d.max_presences, + premiumSubscriptionCount: payload.data.d.premium_subscription_count, + premiumTier: payload.data.d.premium_tier, + vanityUrlCode: payload.data.d.vanity_url_code, + nsfwLevel: payload.data.d.nsfw_level, + rulesChannelId: payload.data.d.rules_channel_id, + publicUpdatesChannelId: payload.data.d.public_updates_channel_id, + preferredLocale: payload.data.d.preferred_locale, + maxVideoChannelUsers: payload.data.d.max_video_channel_users, + permissions: payload.data.d.permissions, + premiumProgressBarEnabled: payload.data.d.premium_progress_bar_enabled, + safetyAlertChannelId: payload.data.d.safety_alerts_channel_id, + splash: payload.data.d.splash, + systemChannelFlags: payload.data.d.system_channel_flags + } + }); if (stateRoles) { - // TODO [2024-03-01]: Use Batch API Instead - // TODO [2024-03-01]: Set operation length to a constants var - let ops = []; - for (const role of payload.data.d.roles) { - if (ops.length > 500) { - await this.store.drizzle.execute(sql`${ops.join(";")}`); - ops = []; - } - ops.push( - this.store.drizzle.insert(roles).values({ + await this.store.drizzle + .insert(roles) + .values( + payload.data.d.roles.map(role => ({ id: role.id, name: role.name, permissions: role.permissions, @@ -117,175 +118,156 @@ export class GuildCreateListener extends Listener { color: role.color, hoist: role.hoist, guildId: payload.data.d.id - }).onConflictDoUpdate({ - target: roles.id, - set: { - name: role.name, - permissions: role.permissions, - position: role.position, - color: role.color, - hoist: role.hoist - } - }) - .toSQL() - ); - } + })) + ) + .onConflictDoUpdate({ + target: roles.id, + set: { + name: sql`EXCLUDED.name`, + permissions: sql`EXCLUDED.permissions`, + position: sql`EXCLUDED.position`, + color: sql`EXCLUDED.color`, + hoist: sql`EXCLUDED.hoist` + } + }); } + const membersChunk = chunk(payload.data.d.members.filter(member => member.user !== undefined), 1_000); + if (stateUsers) { - let ops = []; - for (const member of payload.data.d.members) { - if (member.user !== undefined) { - if (ops.length > 500) { - await this.store.drizzle.execute(sql`${ops.join(";")}`); - ops = []; - } - ops.push( - this.store.drizzle.insert(users).values({ - id: member.user.id, - username: member.user.username, - discriminator: member.user.discriminator ?? null, - globalName: member.user.global_name ?? null, - avatar: member.user.avatar ?? null, - bot: member.user.bot ?? false, - flags: member.user.flags, - premiumType: member.user.premium_type, - publicFlags: member.user.public_flags - }).onConflictDoUpdate({ - target: users.id, - set: { - username: member.user.username, - discriminator: member.user.discriminator ?? null, - globalName: member.user.global_name ?? null, - avatar: member.user.avatar ?? null, - bot: member.user.bot ?? false, - flags: member.user.flags, - premiumType: member.user.premium_type, - publicFlags: member.user.public_flags - } - }) - .toSQL() - ); - } + for (const members of membersChunk) { + await this.store.drizzle + .insert(users) + .values( + members + .map(member => ({ + id: member.user!.id, + username: member.user!.username, + discriminator: member.user!.discriminator ?? null, + globalName: member.user!.global_name ?? null, + avatar: member.user!.avatar ?? null, + bot: member.user!.bot ?? false, + flags: member.user!.flags, + premiumType: member.user!.premium_type, + publicFlags: member.user!.public_flags + })) + ) + .onConflictDoUpdate({ + target: users.id, + set: { + username: sql`EXCLUDED.username`, + discriminator: sql`EXCLUDED.discriminator`, + globalName: sql`EXCLUDED.global_name`, + avatar: sql`EXCLUDED.avatar`, + bot: sql`EXCLUDED.bot`, + flags: sql`EXCLUDED.flags`, + premiumType: sql`EXCLUDED.premium_type`, + publicFlags: sql`EXCLUDED.public_flags` + } + }); } } if (stateMembers) { - let ops = []; - for (const member of payload.data.d.members) { - if (member.user !== undefined) { - if (ops.length > 500) { - await this.store.drizzle.execute(sql`${ops.join(";")}`); - ops = []; - } - ops.push( - this.store.drizzle.insert(members).values({ - id: member.user.id, - guildId: payload.data.d.id, - avatar: member.avatar, - flags: member.flags, - communicationDisabledUntil: member.communication_disabled_until, - deaf: member.deaf, - joinedAt: member.joined_at, - mute: member.mute, - nick: member.nick, - pending: member.pending, - premiumSince: member.premium_since - }).onConflictDoUpdate({ - target: members.id, - set: { - avatar: member.avatar, - flags: member.flags, - communicationDisabledUntil: member.communication_disabled_until, - deaf: member.deaf, - joinedAt: member.joined_at, - mute: member.mute, - nick: member.nick, - pending: member.pending, - premiumSince: member.premium_since - } - }) - .toSQL() - ); - for (const role of member.roles) { - ops.push( - this.store.drizzle.insert(memberRoles).values({ - memberId: member.user.id, + for (const members of membersChunk) { + await this.store.drizzle + .insert(users) + .values( + members + .map(member => ({ + id: member.user!.id, + username: member.user!.username, + discriminator: member.user!.discriminator ?? null, + globalName: member.user!.global_name ?? null, + avatar: member.user!.avatar ?? null, + bot: member.user!.bot ?? false, + flags: member.user!.flags, + premiumType: member.user!.premium_type, + publicFlags: member.user!.public_flags + })) + ) + .onConflictDoUpdate({ + target: users.id, + set: { + username: sql`EXCLUDED.username`, + discriminator: sql`EXCLUDED.discriminator`, + globalName: sql`EXCLUDED.global_name`, + avatar: sql`EXCLUDED.avatar`, + bot: sql`EXCLUDED.bot`, + flags: sql`EXCLUDED.flags`, + premiumType: sql`EXCLUDED.premium_type`, + publicFlags: sql`EXCLUDED.public_flags` + } + }); + + for (const member of members) { + if (member.roles.length > 0) { + await this.store.drizzle.insert(memberRoles) + .values(member.roles.map(role => ({ + memberId: member.user!.id, roleId: role, guildId: payload.data.d.id - }).onConflictDoNothing({ target: [memberRoles.memberId, memberRoles.roleId] }) - .toSQL() - ); + }))).onConflictDoNothing({ target: [memberRoles.memberId, memberRoles.roleId] }); } } } } if (stateChannels && payload.data.d.channels.length > 0) { - let ops = []; + const chunks = chunk(payload.data.d.channels, 1_000); - for (const channel of payload.data.d.channels) { - if (ops.length > 500) { - await this.store.drizzle.execute(sql`${ops.join(";")}`); - ops = []; - } - ops.push( - this.store.drizzle.insert(channels).values({ - id: channel.id, - guildId: payload.data.d.id, - name: channel.name, - type: channel.type, - flags: channel.flags - }).onConflictDoNothing({ target: channels.id }) - .toSQL() - ); + for (const chChunk of chunks) { + await this.store.drizzle + .insert(channels) + .values( + chChunk.map(channel => ({ + id: channel.id, + guildId: payload.data.d.id, + name: channel.name, + type: channel.type, + flags: channel.flags + })) + ) + .onConflictDoNothing({ target: channels.id }); - if ("permission_overwrites" in channel && channel.permission_overwrites !== undefined && channel.permission_overwrites.length > 0) { - for (const overwrite of channel.permission_overwrites) { - ops.push( - this.store.drizzle.insert(channelsOverwrite).values({ - userOrRole: overwrite.id, - channelId: channel.id, - type: overwrite.type, - allow: overwrite.allow, - deny: overwrite.deny - }).onConflictDoNothing({ target: [channelsOverwrite.userOrRole, channelsOverwrite.channelId] }) - .toSQL() - ); + for (const ch of chChunk) { + if ( + "permission_overwrites" in ch && + (ch.permission_overwrites?.length ?? 0) > 0 + ) { + const overwritesChunk = chunk(ch.permission_overwrites ?? [], 1_000); + for (const overwrites of overwritesChunk) { + await this.store.drizzle + .insert(channelsOverwrite) + .values( + overwrites.map(overwrite => ({ + userOrRole: overwrite.id, + channelId: ch.id, + type: overwrite.type, + allow: overwrite.allow, + deny: overwrite.deny + })) + ) + .onConflictDoNothing({ + target: [channelsOverwrite.userOrRole, channelsOverwrite.channelId] + }); + } } } } } if (stateVoices) { - let ops = []; - for (const voice of payload.data.d.voice_states) { - if (voice.channel_id === null) continue; - - if (ops.length > 500) { - await this.store.drizzle.execute(sql`${ops.join(";")}`); - ops = []; - } - - ops.push( - this.store.drizzle.insert(voiceStates).values({ - channelId: voice.channel_id, - guildId: payload.data.d.id, - sessionId: voice.session_id, - memberId: voice.user_id, - deaf: voice.deaf, - mute: voice.mute, - requestToSpeakTimestamp: voice.request_to_speak_timestamp, - selfDeaf: voice.self_deaf, - selfMute: voice.self_mute, - selfStream: voice.self_stream, - selfVideo: voice.self_video, - suppress: voice.suppress - }).onConflictDoUpdate({ - target: voiceStates.memberId, - set: { - channelId: voice.channel_id, + const chunks = chunk(payload.data.d.voice_states.filter(x => x.channel_id !== null), 1_000); + for (const states of chunks) { + await this.store.drizzle + .insert(voiceStates) + .values( + states.map(voice => ({ + channelId: voice.channel_id!, + guildId: payload.data.d.id, sessionId: voice.session_id, + memberId: voice.user_id, deaf: voice.deaf, mute: voice.mute, requestToSpeakTimestamp: voice.request_to_speak_timestamp, @@ -294,13 +276,30 @@ export class GuildCreateListener extends Listener { selfStream: voice.self_stream, selfVideo: voice.self_video, suppress: voice.suppress + })) + ) + .onConflictDoUpdate({ + target: voiceStates.memberId, + set: { + channelId: sql`EXCLUDED.channel_id`, + sessionId: sql`EXCLUDED.session_id`, + deaf: sql`EXCLUDED.deaf`, + mute: sql`EXCLUDED.mute`, + requestToSpeakTimestamp: sql`EXCLUDED.request_to_speak_timestamp`, + selfDeaf: sql`EXCLUDED.self_deaf`, + selfMute: sql`EXCLUDED.self_mute`, + selfStream: sql`EXCLUDED.self_stream`, + selfVideo: sql`EXCLUDED.self_video`, + suppress: sql`EXCLUDED.suppress` } - }) - .toSQL() - ); + }); } } - await this.store.amqp.publish(RabbitMQ.GATEWAY_QUEUE_SEND, RoutingKey(clientId, payload.shardId), Buffer.from(JSON.stringify(payload.data))); + await this.store.amqp.publish( + RabbitMQ.GATEWAY_QUEUE_SEND, + RoutingKey(clientId, payload.shardId), + Buffer.from(JSON.stringify(payload.data)) + ); } }