Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

122 capture webhook messages #123

Merged
merged 5 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# env vars
src/config/*.env
*.env

# compiled output
/lib
Expand Down
2,759 changes: 1,568 additions & 1,191 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
"test:ci": "jest --ci --detectOpenHandles",
"lint": "eslint **/*.ts",
"lint-fix": "eslint --fix **/*.ts",
"format": "prettier --write src/**/*.ts"
"format": "prettier --write src/**/*.ts",
"migrate:create": "migrate create --template-file ./src/migrations/utils/template.ts --migrations-dir=\"./src/migrations/db\" --compiler=\"ts:./src/migrations/utils/ts-compiler.js\"",
"migrate:up": "migrate --migrations-dir=\"./src/migrations/db\" --compiler=\"ts:./src/migrations/utils/ts-compiler.js\" up",
"migrate:down": "migrate --migrations-dir=\"./src/migrations/db\" --compiler=\"ts:./src/migrations/utils/ts-compiler.js\" down"
},
"repository": {
"type": "git",
Expand All @@ -26,12 +29,13 @@
"homepage": "https://github.com/Behzad-rabiei/tc-discordBot#readme",
"dependencies": {
"@sentry/node": "^7.51.2",
"@togethercrew.dev/db": "^2.4.96",
"@togethercrew.dev/db": "^2.5.1",
"@togethercrew.dev/tc-messagebroker": "^0.0.40",
"babel-jest": "^29.5.0",
"bullmq": "^3.14.0",
"discord.js": "^14.12.1",
"joi": "^17.9.2",
"migrate": "^2.0.0",
"moment": "^2.29.4",
"mongodb": "^5.4.0",
"mongoose": "^6.11.1",
Expand Down
24 changes: 13 additions & 11 deletions src/functions/fetchMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ async function getNeedDataFromMessage(message: Message, threadInfo?: threadInfo)
channelName: threadInfo?.channelName ? threadInfo?.channelName : '',
threadId: threadInfo?.threadId ? threadInfo?.threadId : null,
threadName: threadInfo?.threadName ? threadInfo?.threadName : null,
isGeneratedByWebhook: message.webhookId ? true : false
};
} else {
return {
Expand All @@ -82,6 +83,7 @@ async function getNeedDataFromMessage(message: Message, threadInfo?: threadInfo)
channelName: message.channel instanceof TextChannel ? message.channel.name : null,
threadId: null,
threadName: null,
isGeneratedByWebhook: message.webhookId ? true : false
};
}
}
Expand Down Expand Up @@ -135,7 +137,7 @@ async function fetchMessages(
'Fetching channel messages is running'
);
const messagesToStore: IRawInfo[] = [];
const options: FetchOptions = { limit: 10 };
const options: FetchOptions = { limit: 100 };
if (rawInfo) {
options[fetchDirection] = rawInfo.messageId;
}
Expand All @@ -150,22 +152,22 @@ async function fetchMessages(
}
channel instanceof ThreadChannel
? await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()], {
threadId: channel.id,
threadName: channel.name,
channelId: channel.parent?.id,
channelName: channel.parent?.name,
})
threadId: channel.id,
threadName: channel.name,
channelId: channel.parent?.id,
channelName: channel.parent?.name,
})
: await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()]);
break;
}

channel instanceof ThreadChannel
? await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()], {
threadId: channel.id,
threadName: channel.name,
channelId: channel.parent?.id,
channelName: channel.parent?.name,
})
threadId: channel.id,
threadName: channel.name,
channelId: channel.parent?.id,
channelName: channel.parent?.name,
})
: await pushMessagesToArray(connection, messagesToStore, [...fetchedMessages.values()]);
options[fetchDirection] = boundaryMessage.id;
fetchedMessages = await channel.messages.fetch(options);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import 'dotenv/config';
import { Client, GatewayIntentBits, } from 'discord.js';
import { guildService } from '../../database/services';
import { connectDB } from '../../database';
import { databaseService } from '@togethercrew.dev/db';
import config from '../../config';
import { closeConnection } from '../../database/connection';
import webhookLogic from '../utils/webhookLogic';

const {
Guilds,
GuildMembers,
GuildMessages,
GuildPresences,
DirectMessages
} = GatewayIntentBits;


export const up = async () => {
const client = new Client({
intents: [Guilds, GuildMembers, GuildMessages, GuildPresences, DirectMessages],
});

await client.login(config.discord.botToken);
await connectDB();
const guilds = await guildService.getGuilds({});
for (let i = 0; i < guilds.length; i++) {
const connection = databaseService.connectionFactory(guilds[i].guildId, config.mongoose.dbURL);
await webhookLogic(connection, client, guilds[i].guildId);
await closeConnection(connection);
}
};

export const down = async () => {
// TODO: Implement rollback logic if needed
};
15 changes: 15 additions & 0 deletions src/migrations/utils/template.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { connectDB } from '../../database';
import { databaseService } from '@togethercrew.dev/db';
import 'dotenv/config';
import config from '../../config';

export const up = async () => {
await connectDB();
const connection = databaseService.connectionFactory("681946187490000803", config.mongoose.dbURL);
await connection.createCollection('my_collection');
};

export const down = async () => {
await connectDB()

};
3 changes: 3 additions & 0 deletions src/migrations/utils/ts-compiler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// eslint-disable-next-line @typescript-eslint/no-var-requires
const tsNode = require('ts-node');
module.exports = tsNode.register;
194 changes: 194 additions & 0 deletions src/migrations/utils/webhookLogic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import { TextChannel, Message, ThreadChannel, Snowflake, Client } from 'discord.js';
import { IRawInfo } from '@togethercrew.dev/db';
import { Connection } from 'mongoose';
import parentLogger from '../../config/logger';
import { rawInfoService, channelService } from '../../database/services';

const logger = parentLogger.child({ module: 'Migration' });

interface FetchOptions {
limit: number;
before?: Snowflake;
after?: Snowflake;
}

async function fetchMessagesBetweenOldestAndNewest(
connection: Connection,
channel: TextChannel | ThreadChannel,
oldestRawInfo: IRawInfo,
newestRawInfo: IRawInfo
) {
try {
let allMessages: Message[] = [];
logger.info(
{ guild_id: connection.name, channel_id: channel.id },
'Fetching channel messages is running'
);
const options: FetchOptions = { limit: 100 };
options.after = oldestRawInfo.messageId;
let fetchedMessages = await channel.messages.fetch(options);
while (fetchedMessages.size > 0) {
allMessages = allMessages.concat(Array.from(fetchedMessages.values()));
if (fetchedMessages.has(newestRawInfo.messageId)) {
break;
}
options.after = fetchedMessages.first()?.id;
fetchedMessages = await channel.messages.fetch(options);
}
return allMessages;
} catch (err) {
logger.error(
{ guild_id: connection.name, channel_id: channel.id, err },
'Fetching channel messages failed'
);
}
logger.info(
{ guild_id: connection.name, channel_id: channel.id },
'Fetching channel messages is done'
);
}

async function migrateIsGeneratedByWebhook(connection: Connection, channel: TextChannel) {
try {
logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is running');

// Fetch oldest rawInfo from DB
const oldestChannelRawInfo = await rawInfoService.getOldestRawInfo(connection, {
channelId: channel?.id,
threadId: null,
});

// Fetch newest rawInfo from DB
const newestChannelRawInfo = await rawInfoService.getNewestRawInfo(connection, {
channelId: channel?.id,
threadId: null,
});

if (!oldestChannelRawInfo || !newestChannelRawInfo) {
logger.info({ guild_id: connection.name, channel_id: channel.id }, 'No oldest rawInfo found, skipping migration');
return;
}



const fetchedMessages = await fetchMessagesBetweenOldestAndNewest(connection, channel, oldestChannelRawInfo, newestChannelRawInfo);
const messagesToUpdateTrue = [];
const messagesToUpdateFalse = [];

const oldestMessage = await channel.messages.fetch(oldestChannelRawInfo.messageId);
const newestMessage = await channel.messages.fetch(newestChannelRawInfo.messageId);


if (oldestMessage.webhookId) messagesToUpdateTrue.push(oldestMessage.id);
else messagesToUpdateFalse.push(oldestMessage.id);

if (newestMessage.webhookId) messagesToUpdateTrue.push(newestMessage.id);
else messagesToUpdateFalse.push(newestMessage.id);

if (fetchedMessages) {
for (const message of fetchedMessages) {
if (message.webhookId) {
messagesToUpdateTrue.push(message.id);
} else {
messagesToUpdateFalse.push(message.id);
}
}

}

if (messagesToUpdateTrue.length > 0) {
await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: messagesToUpdateTrue } }, { isGeneratedByWebhook: true });
}

if (messagesToUpdateFalse.length > 0) {

await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: messagesToUpdateFalse } }, { isGeneratedByWebhook: false });
}

const threads = channel.threads.cache.values();

// Handle threads of the channel
for (const thread of threads) {
const oldestThreadRawInfo = await rawInfoService.getOldestRawInfo(connection, {
channelId: channel?.id,
threadId: thread.id,
});

const newestThreadRawInfo = await rawInfoService.getNewestRawInfo(connection, {
channelId: channel?.id,
threadId: thread.id,
});

if (!oldestThreadRawInfo || !newestThreadRawInfo) {
continue; // No data to migrate for this thread
}

const fetchedThreadMessages = await fetchMessagesBetweenOldestAndNewest(connection, thread, oldestThreadRawInfo, newestThreadRawInfo);

const threadMessagesToUpdateTrue = [];
const threadMessagesToUpdateFalse = [];


const oldestThreadMessage = await thread.messages.fetch(oldestThreadRawInfo.messageId);
const newestThreadMessage = await thread.messages.fetch(newestThreadRawInfo.messageId);

if (oldestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(oldestThreadMessage.id);
else threadMessagesToUpdateFalse.push(oldestThreadMessage.id);

if (newestThreadMessage.webhookId) threadMessagesToUpdateTrue.push(newestThreadMessage.id);
else threadMessagesToUpdateFalse.push(newestThreadMessage.id);



if (fetchedThreadMessages) {
for (const message of fetchedThreadMessages) {
if (message.webhookId) {
threadMessagesToUpdateTrue.push(message.id);
} else {
threadMessagesToUpdateFalse.push(message.id);
}
}
}

if (threadMessagesToUpdateTrue.length > 0) {
await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: threadMessagesToUpdateTrue } }, { isGeneratedByWebhook: true });
}

if (threadMessagesToUpdateFalse.length > 0) {
await rawInfoService.updateManyRawInfo(connection, { messageId: { $in: threadMessagesToUpdateFalse } }, { isGeneratedByWebhook: false });
}
}

logger.info({ guild_id: connection.name, channel_id: channel.id }, 'Migration for isGeneratedByWebhook is done');

} catch (err) {
logger.error({ guild_id: connection.name, channel_id: channel.id, err }, 'Migration for isGeneratedByWebhook failed');
}
}


/**
*
* @param {Connection} connection - Mongoose connection object for the database.
* @param {Client} client - The discord.js client object used to fetch the guild.
* @param {Snowflake} guildId - The identifier of the guild to extract information from.
*/
async function runRawInfoMigration(connection: Connection, client: Client, guildId: Snowflake) {
logger.info({ guild_id: guildId }, 'Migration is running');
try {
const guild = await client.guilds.fetch(guildId);
const channels = await channelService.getChannels(connection, {});
for (let i = 0; i < channels.length; i++) {
const channel = await guild.channels.fetch(channels[i].channelId);
if (channel) {
if (channel.type !== 0) continue;
await migrateIsGeneratedByWebhook(connection, channel);
}
}
} catch (err) {
logger.error({ guild_id: guildId, err }, 'Migration is failed');
}
logger.info({ guild_id: guildId }, 'Migration is done');
}

export default runRawInfoMigration;
Loading