diff --git a/.gitignore b/.gitignore index f2818846..13d0db31 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ src/config/*.env # compiled output /dist /node_modules +/lib #migration migrate.json diff --git a/src/functions/fetchMessages.ts b/src/functions/fetchMessages.ts index 49804d19..ddb3af5c 100644 --- a/src/functions/fetchMessages.ts +++ b/src/functions/fetchMessages.ts @@ -66,7 +66,7 @@ async function getNeedDataFromMessage(message: Message, threadInfo?: threadInfo) channelName: threadInfo?.channelName ? threadInfo?.channelName : '', threadId: threadInfo?.threadId ? threadInfo?.threadId : null, threadName: threadInfo?.threadName ? threadInfo?.threadName : null, - isGeneratedByWebhook: message.webhookId ? true : false + isGeneratedByWebhook: message.webhookId ? true : false, }; } else { return { @@ -83,7 +83,7 @@ async function getNeedDataFromMessage(message: Message, threadInfo?: threadInfo) channelName: message.channel instanceof TextChannel ? message.channel.name : null, threadId: null, threadName: null, - isGeneratedByWebhook: message.webhookId ? true : false + isGeneratedByWebhook: message.webhookId ? true : false, }; } } @@ -152,22 +152,22 @@ async function fetchMessages( } channel instanceof ThreadChannel ? await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()], { - threadId: channel.id, - threadName: channel.name, - channelId: channel.parent?.id, - channelName: channel.parent?.name, - }) + threadId: channel.id, + threadName: channel.name, + channelId: channel.parent?.id, + channelName: channel.parent?.name, + }) : await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()]); break; } channel instanceof ThreadChannel ? await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()], { - threadId: channel.id, - threadName: channel.name, - channelId: channel.parent?.id, - channelName: channel.parent?.name, - }) + threadId: channel.id, + threadName: channel.name, + channelId: channel.parent?.id, + channelName: channel.parent?.name, + }) : await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()]); options[fetchDirection] = boundaryMessage.id; fetchedMessages = await channel.messages.fetch(options); diff --git a/src/index.ts b/src/index.ts index 8b2e6c1b..af79d3bb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -37,8 +37,8 @@ const client = new Client({ const partial = (func: any, ...args: any) => - (...rest: any) => - func(...args, ...rest); + (...rest: any) => + func(...args, ...rest); const fetchMethod = async (msg: any) => { logger.info({ msg }, 'fetchMethod is running'); @@ -179,7 +179,7 @@ async function app() { // every: 10000 }, jobId: 'cronJob', // Optional: Provide a unique ID for the job - attempts: 1, // Number of times to retry the job if it fails + attempts: 0, // Number of times to retry the job if it fails backoff: { type: 'exponential', delay: 1000, // Initial delay between retries in milliseconds diff --git a/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts b/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts index f54c567e..a6ef1c09 100644 --- a/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts +++ b/src/migrations/db/1695210587863-add-isgeneratedbyweebhook-to-rawinfo-schema.ts @@ -1,5 +1,5 @@ import 'dotenv/config'; -import { Client, GatewayIntentBits, } from 'discord.js'; +import { Client, GatewayIntentBits } from 'discord.js'; import { guildService } from '../../database/services'; import { connectDB } from '../../database'; import { databaseService } from '@togethercrew.dev/db'; @@ -7,32 +7,23 @@ import config from '../../config'; import { closeConnection } from '../../database/connection'; import webhookLogic from '../utils/webhookLogic'; -const { - Guilds, - GuildMembers, - GuildMessages, - GuildPresences, - DirectMessages -} = GatewayIntentBits; - +const { Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages } = GatewayIntentBits; export const up = async () => { - const client = new Client({ - intents: [Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages], - }); + const client = new Client({ + intents: [Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages], + }); - await client.login(config.discord.botToken); - await connectDB(); - const guilds = await guildService.getGuilds({}); - for (let i = 0; i < guilds.length; i++) { - const connection = databaseService.connectionFactory(guilds[i].guildId, config.mongoose.dbURL); - await webhookLogic(connection, client, guilds[i].guildId); - await closeConnection(connection); - } + await client.login(config.discord.botToken); + await connectDB(); + const guilds = await guildService.getGuilds({}); + for (let i = 0; i < guilds.length; i++) { + const connection = databaseService.connectionFactory(guilds[i].guildId, config.mongoose.dbURL); + await webhookLogic(connection, client, guilds[i].guildId); + await closeConnection(connection); + } }; export const down = async () => { - // TODO: Implement rollback logic if needed + // TODO: Implement rollback logic if needed }; - - diff --git a/src/migrations/utils/template.ts b/src/migrations/utils/template.ts index 1bb0f4e2..080196cd 100644 --- a/src/migrations/utils/template.ts +++ b/src/migrations/utils/template.ts @@ -4,12 +4,11 @@ import 'dotenv/config'; import config from '../../config'; export const up = async () => { - await connectDB(); - const connection = databaseService.connectionFactory("681946187490000803", config.mongoose.dbURL); - await connection.createCollection('my_collection'); + await connectDB(); + const connection = databaseService.connectionFactory('681946187490000803', config.mongoose.dbURL); + await connection.createCollection('my_collection'); }; export const down = async () => { - await connectDB() - -}; \ No newline at end of file + await connectDB(); +}; diff --git a/src/migrations/utils/webhookLogic.ts b/src/migrations/utils/webhookLogic.ts index 0e8e1784..c2621b8a 100644 --- a/src/migrations/utils/webhookLogic.ts +++ b/src/migrations/utils/webhookLogic.ts @@ -7,188 +7,198 @@ import { rawInfoService, channelService } from '../../database/services'; const logger = parentLogger.child({ module: 'Migration' }); interface FetchOptions { - limit: number; - before?: Snowflake; - after?: Snowflake; + limit: number; + before?: Snowflake; + after?: Snowflake; } async function fetchMessagesBetweenOldestAndNewest( - connection: Connection, - channel: TextChannel | ThreadChannel, - oldestRawInfo: IRawInfo, - newestRawInfo: IRawInfo + connection: Connection, + channel: TextChannel | ThreadChannel, + oldestRawInfo: IRawInfo, + newestRawInfo: IRawInfo ) { - try { - let allMessages: Message[] = []; - logger.info( - { guild_id: connection.name, channel_id: channel.id }, - 'Fetching channel messages is running' - ); - const options: FetchOptions = { limit: 100 }; - options.after = oldestRawInfo.messageId; - let fetchedMessages = await channel.messages.fetch(options); - while (fetchedMessages.size > 0) { - allMessages = allMessages.concat(Array.from(fetchedMessages.values())); - if (fetchedMessages.has(newestRawInfo.messageId)) { - break; - } - options.after = fetchedMessages.first()?.id; - fetchedMessages = await channel.messages.fetch(options); - } - return allMessages; - } catch (err) { - logger.error( - { guild_id: connection.name, channel_id: channel.id, err }, - 'Fetching channel messages failed' - ); + try { + let allMessages: Message[] = []; + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Fetching channel messages is running'); + const options: FetchOptions = { limit: 100 }; + options.after = oldestRawInfo.messageId; + let fetchedMessages = await channel.messages.fetch(options); + while (fetchedMessages.size > 0) { + allMessages = allMessages.concat(Array.from(fetchedMessages.values())); + if (fetchedMessages.has(newestRawInfo.messageId)) { + break; + } + options.after = fetchedMessages.first()?.id; + fetchedMessages = await channel.messages.fetch(options); } - logger.info( - { guild_id: connection.name, channel_id: channel.id }, - 'Fetching channel messages is done' - ); + return allMessages; + } catch (err) { + logger.error({ guild_id: connection.name, channel_id: channel.id, err }, 'Fetching channel messages failed'); + } + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Fetching channel messages is done'); } async function migrateIsGeneratedByWebhook(connection: Connection, channel: TextChannel) { - try { - logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is running'); - - // Fetch oldest rawInfo from DB - const oldestChannelRawInfo = await rawInfoService.getOldestRawInfo(connection, { - channelId: channel?.id, - threadId: null, - }); - - // Fetch newest rawInfo from DB - const newestChannelRawInfo = await rawInfoService.getNewestRawInfo(connection, { - channelId: channel?.id, - threadId: null, - }); - - if (!oldestChannelRawInfo || !newestChannelRawInfo) { - logger.info({ guild_id: connection.name, channel_id: channel.id }, 'No oldest rawInfo found, skipping migration'); - return; - } - - - - const fetchedMessages = await fetchMessagesBetweenOldestAndNewest(connection, channel, oldestChannelRawInfo, newestChannelRawInfo); - const messagesToUpdateTrue = []; - const messagesToUpdateFalse = []; - - const oldestMessage = await channel.messages.fetch(oldestChannelRawInfo.messageId); - const newestMessage = await channel.messages.fetch(newestChannelRawInfo.messageId); - + try { + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is running'); + + // Fetch oldest rawInfo from DB + const oldestChannelRawInfo = await rawInfoService.getOldestRawInfo(connection, { + channelId: channel?.id, + threadId: null, + }); + + // Fetch newest rawInfo from DB + const newestChannelRawInfo = await rawInfoService.getNewestRawInfo(connection, { + channelId: channel?.id, + threadId: null, + }); + + if (!oldestChannelRawInfo || !newestChannelRawInfo) { + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'No oldest rawInfo found, skipping migration'); + return; + } - if (oldestMessage.webhookId) messagesToUpdateTrue.push(oldestMessage.id); - else messagesToUpdateFalse.push(oldestMessage.id); + const fetchedMessages = await fetchMessagesBetweenOldestAndNewest( + connection, + channel, + oldestChannelRawInfo, + newestChannelRawInfo + ); + const messagesToUpdateTrue = []; + const messagesToUpdateFalse = []; - if (newestMessage.webhookId) messagesToUpdateTrue.push(newestMessage.id); - else messagesToUpdateFalse.push(newestMessage.id); + const oldestMessage = await channel.messages.fetch(oldestChannelRawInfo.messageId); + const newestMessage = await channel.messages.fetch(newestChannelRawInfo.messageId); - if (fetchedMessages) { - for (const message of fetchedMessages) { - if (message.webhookId) { - messagesToUpdateTrue.push(message.id); - } else { - messagesToUpdateFalse.push(message.id); - } - } + if (oldestMessage.webhookId) messagesToUpdateTrue.push(oldestMessage.id); + else messagesToUpdateFalse.push(oldestMessage.id); - } + if (newestMessage.webhookId) messagesToUpdateTrue.push(newestMessage.id); + else messagesToUpdateFalse.push(newestMessage.id); - if (messagesToUpdateTrue.length > 0) { - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: messagesToUpdateTrue } }, { isGeneratedByWebhook: true }); + if (fetchedMessages) { + for (const message of fetchedMessages) { + if (message.webhookId) { + messagesToUpdateTrue.push(message.id); + } else { + messagesToUpdateFalse.push(message.id); } + } + } - if (messagesToUpdateFalse.length > 0) { - - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: messagesToUpdateFalse } }, { isGeneratedByWebhook: false }); - } - - const threads = channel.threads.cache.values(); - - // Handle threads of the channel - for (const thread of threads) { - const oldestThreadRawInfo = await rawInfoService.getOldestRawInfo(connection, { - channelId: channel?.id, - threadId: thread.id, - }); - - const newestThreadRawInfo = await rawInfoService.getNewestRawInfo(connection, { - channelId: channel?.id, - threadId: thread.id, - }); - - if (!oldestThreadRawInfo || !newestThreadRawInfo) { - continue; // No data to migrate for this thread - } - - const fetchedThreadMessages = await fetchMessagesBetweenOldestAndNewest(connection, thread, oldestThreadRawInfo, newestThreadRawInfo); - - const threadMessagesToUpdateTrue = []; - const threadMessagesToUpdateFalse = []; - - - const oldestThreadMessage = await thread.messages.fetch(oldestThreadRawInfo.messageId); - const newestThreadMessage = await thread.messages.fetch(newestThreadRawInfo.messageId); - - if (oldestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(oldestThreadMessage.id); - else threadMessagesToUpdateFalse.push(oldestThreadMessage.id); - - if (newestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(newestThreadMessage.id); - else threadMessagesToUpdateFalse.push(newestThreadMessage.id); - - - - if (fetchedThreadMessages) { - for (const message of fetchedThreadMessages) { - if (message.webhookId) { - threadMessagesToUpdateTrue.push(message.id); - } else { - threadMessagesToUpdateFalse.push(message.id); - } - } - } + if (messagesToUpdateTrue.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: messagesToUpdateTrue } }, + { isGeneratedByWebhook: true } + ); + } - if (threadMessagesToUpdateTrue.length > 0) { - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: threadMessagesToUpdateTrue } }, { isGeneratedByWebhook: true }); - } + if (messagesToUpdateFalse.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: messagesToUpdateFalse } }, + { isGeneratedByWebhook: false } + ); + } - if (threadMessagesToUpdateFalse.length > 0) { - await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: threadMessagesToUpdateFalse } }, { isGeneratedByWebhook: false }); - } + const threads = channel.threads.cache.values(); + + // Handle threads of the channel + for (const thread of threads) { + const oldestThreadRawInfo = await rawInfoService.getOldestRawInfo(connection, { + channelId: channel?.id, + threadId: thread.id, + }); + + const newestThreadRawInfo = await rawInfoService.getNewestRawInfo(connection, { + channelId: channel?.id, + threadId: thread.id, + }); + + if (!oldestThreadRawInfo || !newestThreadRawInfo) { + continue; // No data to migrate for this thread + } + + const fetchedThreadMessages = await fetchMessagesBetweenOldestAndNewest( + connection, + thread, + oldestThreadRawInfo, + newestThreadRawInfo + ); + + const threadMessagesToUpdateTrue = []; + const threadMessagesToUpdateFalse = []; + + const oldestThreadMessage = await thread.messages.fetch(oldestThreadRawInfo.messageId); + const newestThreadMessage = await thread.messages.fetch(newestThreadRawInfo.messageId); + + if (oldestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(oldestThreadMessage.id); + else threadMessagesToUpdateFalse.push(oldestThreadMessage.id); + + if (newestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(newestThreadMessage.id); + else threadMessagesToUpdateFalse.push(newestThreadMessage.id); + + if (fetchedThreadMessages) { + for (const message of fetchedThreadMessages) { + if (message.webhookId) { + threadMessagesToUpdateTrue.push(message.id); + } else { + threadMessagesToUpdateFalse.push(message.id); + } } + } - logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is done'); + if (threadMessagesToUpdateTrue.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: threadMessagesToUpdateTrue } }, + { isGeneratedByWebhook: true } + ); + } - } catch (err) { - logger.error({ guild_id: connection.name, channel_id: channel.id, err }, 'Migration for isGeneratedByWebhook failed'); + if (threadMessagesToUpdateFalse.length > 0) { + await rawInfoService.updateManyRawInfo( + connection, + { messageId: { $in: threadMessagesToUpdateFalse } }, + { isGeneratedByWebhook: false } + ); + } } -} + logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is done'); + } catch (err) { + logger.error( + { guild_id: connection.name, channel_id: channel.id, err }, + 'Migration for isGeneratedByWebhook failed' + ); + } +} /** - * + * * @param {Connection} connection - Mongoose connection object for the database. * @param {Client} client - The discord.js client object used to fetch the guild. * @param {Snowflake} guildId - The identifier of the guild to extract information from. */ async function runRawInfoMigration(connection: Connection, client: Client, guildId: Snowflake) { - logger.info({ guild_id: guildId }, 'Migration is running'); - try { - const guild = await client.guilds.fetch(guildId); - const channels = await channelService.getChannels(connection, {}); - for (let i = 0; i < channels.length; i++) { - const channel = await guild.channels.fetch(channels[i].channelId); - if (channel) { - if (channel.type !== 0) continue; - await migrateIsGeneratedByWebhook(connection, channel); - } - } - } catch (err) { - logger.error({ guild_id: guildId, err }, 'Migration is failed'); + logger.info({ guild_id: guildId }, 'Migration is running'); + try { + const guild = await client.guilds.fetch(guildId); + const channels = await channelService.getChannels(connection, {}); + for (let i = 0; i < channels.length; i++) { + const channel = await guild.channels.fetch(channels[i].channelId); + if (channel) { + if (channel.type !== 0) continue; + await migrateIsGeneratedByWebhook(connection, channel); + } } - logger.info({ guild_id: guildId }, 'Migration is done'); + } catch (err) { + logger.error({ guild_id: guildId, err }, 'Migration is failed'); + } + logger.info({ guild_id: guildId }, 'Migration is done'); } -export default runRawInfoMigration; \ No newline at end of file +export default runRawInfoMigration;