diff --git a/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py b/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py index 2bd09f52..5cb3268d 100644 --- a/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py +++ b/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py @@ -22,6 +22,8 @@ def fetch_raw_messages(guild_id: str, from_date: datetime | None = None) -> list """ client = MongoSingleton.get_instance().get_client() + channels = fetch_channels(guild_id=guild_id) + raw_messages: list[dict] if from_date is not None: cursor = ( @@ -30,6 +32,7 @@ def fetch_raw_messages(guild_id: str, from_date: datetime | None = None) -> list { "createdDate": {"$gte": from_date}, "isGeneratedByWebhook": False, + "channelId": {"$in": channels}, } ) .sort("createdDate", 1) @@ -38,7 +41,12 @@ def fetch_raw_messages(guild_id: str, from_date: datetime | None = None) -> list else: cursor = ( client[guild_id]["rawinfos"] - .find({"isGeneratedByWebhook": False}) + .find( + { + "isGeneratedByWebhook": False, + "channelId": {"$in": channels}, + } + ) .sort("createdDate", 1) ) raw_messages = list(cursor) @@ -80,6 +88,8 @@ def fetch_raw_msg_grouped( """ client = MongoSingleton.get_instance().client + channels = fetch_channels(guild_id) + # the pipeline to apply through mongodb pipeline: list[dict] = [] @@ -87,27 +97,20 @@ def fetch_raw_msg_grouped( pipeline.append( { "$match": { - "$and": [ - { - "createdDate": { - "$gte": from_date, - "$lt": datetime.now().replace( - hour=0, minute=0, second=0, microsecond=0 - ), - } - }, - {"isGeneratedByWebhook": False}, - ] + "createdDate": { + "$gte": from_date, + "$lt": datetime.now().replace( + hour=0, minute=0, second=0, microsecond=0 + ), + }, + "isGeneratedByWebhook": False, + "channelId": {"$in": channels}, } }, ) else: pipeline.append( - { - "$match": { - "isGeneratedByWebhook": False, - } - }, + {"$match": {"isGeneratedByWebhook": False, "channelId": {"$in": channels}}}, ) # sorting @@ -134,6 +137,7 @@ def fetch_raw_msg_grouped( return raw_messages_grouped + def fetch_channels(guild_id: str): """ fetch the channels from modules that we wanted to process @@ -148,4 +152,30 @@ def fetch_channels(guild_id: str): channels : list[str] the channels to fetch data from """ - pass \ No newline at end of file + client = MongoSingleton.get_instance().client + platform = client["Core"]["platforms"].find_one( + {"name": "discord", "metadata.id": guild_id}, + { + "_id": 1, + "community": 1, + }, + ) + + if platform is None: + raise ValueError(f"No platform with given guild_id: {guild_id} available!") + + result = client["Module"]["modules"].find_one( + { + "communityId": platform["community"], + "options.platforms.platformId": platform["_id"], + }, + {"_id": 0, "options.platforms.$": 1}, + ) + + channels: list[str] + if result is not None: + channels = result["options"]["platforms"][0]["options"]["channels"] + else: + raise ValueError("No modules set for this community!") + + return channels diff --git a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py new file mode 100644 index 00000000..0f861471 --- /dev/null +++ b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py @@ -0,0 +1,117 @@ +from unittest import TestCase +from bson import ObjectId + +from datetime import datetime, timedelta +from hivemind_etl_helpers.src.utils.mongo import MongoSingleton +from hivemind_etl_helpers.src.db.discord.fetch_raw_messages import fetch_channels + + +class TestDiscordFetchModulesChannels(TestCase): + def setup_db( + self, + channels: list[str], + create_modules: bool = True, + create_platform: bool = True, + guild_id: str = "1234", + ): + client = MongoSingleton.get_instance().client + + community_id = ObjectId("9f59dd4f38f3474accdc8f24") + platform_id = ObjectId("063a2a74282db2c00fbc2428") + + client["Module"].drop_collection("modules") + client["Core"].drop_collection("platforms") + + if create_modules: + data = { + "name": "hivemind", + "communityId": community_id, + "options": { + "platforms": [ + { + "platformId": platform_id, + "options": { + "channels": channels, + "roles": ["role_id"], + "users": ["user_id"], + }, + } + ] + }, + } + client["Module"]["modules"].insert_one(data) + + if create_platform: + client["Core"]["platforms"].insert_one( + { + "_id": platform_id, + "name": "discord", + "metadata": { + "action": { + "INT_THR": 1, + "UW_DEG_THR": 1, + "PAUSED_T_THR": 1, + "CON_T_THR": 4, + "CON_O_THR": 3, + "EDGE_STR_THR": 5, + "UW_THR_DEG_THR": 5, + "VITAL_T_THR": 4, + "VITAL_O_THR": 3, + "STILL_T_THR": 2, + "STILL_O_THR": 2, + "DROP_H_THR": 2, + "DROP_I_THR": 1, + }, + "window": {"period_size": 7, "step_size": 1}, + "id": guild_id, + "isInProgress": False, + "period": datetime.now() - timedelta(days=35), + "icon": "some_icon_hash", + "selectedChannels": channels, + "name": "GuildName", + }, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + + def test_fetch_channels(self): + guild_id = "1234" + channels = ["111111", "22222"] + self.setup_db( + create_modules=True, + create_platform=True, + guild_id=guild_id, + channels=channels, + ) + channels = fetch_channels(guild_id="1234") + + self.assertEqual(channels, channels) + + def test_fetch_channels_no_modules_available(self): + guild_id = "1234" + channels = ["111111", "22222"] + self.setup_db( + create_modules=False, + create_platform=True, + guild_id=guild_id, + channels=channels, + ) + with self.assertRaises(ValueError): + _ = fetch_channels(guild_id="1234") + + def test_fetch_channels_no_platform_available(self): + guild_id = "1234" + channels = ["111111", "22222"] + self.setup_db( + create_modules=True, + create_platform=False, + guild_id=guild_id, + channels=channels, + ) + + with self.assertRaises(ValueError): + _ = fetch_channels(guild_id="1234") diff --git a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages.py b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages.py index 8cba8b5e..a70b5aaf 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages.py @@ -1,5 +1,6 @@ import unittest -from datetime import datetime +from datetime import datetime, timedelta +from bson import ObjectId import numpy as np from hivemind_etl_helpers.src.db.discord.fetch_raw_messages import fetch_raw_messages @@ -7,10 +8,85 @@ class TestFetchRawMessages(unittest.TestCase): - def test_fetch_raw_messages_fetch_all(self): + def setup_db( + self, + channels: list[str], + create_modules: bool = True, + create_platform: bool = True, + guild_id: str = "1234", + ): client = MongoSingleton.get_instance().client + community_id = ObjectId("9f59dd4f38f3474accdc8f24") + platform_id = ObjectId("063a2a74282db2c00fbc2428") + + client["Module"].drop_collection("modules") + client["Core"].drop_collection("platforms") + + if create_modules: + data = { + "name": "hivemind", + "communityId": community_id, + "options": { + "platforms": [ + { + "platformId": platform_id, + "options": { + "channels": channels, + "roles": ["role_id"], + "users": ["user_id"], + }, + } + ] + }, + } + client["Module"]["modules"].insert_one(data) + + if create_platform: + client["Core"]["platforms"].insert_one( + { + "_id": platform_id, + "name": "discord", + "metadata": { + "action": { + "INT_THR": 1, + "UW_DEG_THR": 1, + "PAUSED_T_THR": 1, + "CON_T_THR": 4, + "CON_O_THR": 3, + "EDGE_STR_THR": 5, + "UW_THR_DEG_THR": 5, + "VITAL_T_THR": 4, + "VITAL_O_THR": 3, + "STILL_T_THR": 2, + "STILL_O_THR": 2, + "DROP_H_THR": 2, + "DROP_I_THR": 1, + }, + "window": {"period_size": 7, "step_size": 1}, + "id": guild_id, + "isInProgress": False, + "period": datetime.now() - timedelta(days=35), + "icon": "some_icon_hash", + "selectedChannels": channels, + "name": "GuildName", + }, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + + def test_fetch_raw_messages_fetch_all(self): + client = MongoSingleton.get_instance().client + channels = ["111111", "22222"] guild_id = "1234" + self.setup_db( + channels=channels, + guild_id=guild_id, + ) # droping any previous data client[guild_id].drop_collection("rawinfos") @@ -18,7 +94,7 @@ def test_fetch_raw_messages_fetch_all(self): message_count = 2 raw_data = [] - for _ in range(message_count): + for i in range(message_count): data = { "type": 0, "author": str(np.random.randint(100000, 999999)), @@ -29,7 +105,7 @@ def test_fetch_raw_messages_fetch_all(self): "replied_user": None, "createdDate": datetime.now(), "messageId": str(np.random.randint(1000000, 9999999)), - "channelId": str(np.random.randint(10000000, 99999999)), + "channelId": channels[i % len(channels)], "channelName": "general", "threadId": None, "threadName": None, @@ -59,6 +135,13 @@ def test_fetch_raw_messages_fetch_all_no_data_available(self): client = MongoSingleton.get_instance().client guild_id = "1234" + + channels = ["111111", "22222"] + guild_id = "1234" + self.setup_db( + channels=channels, + guild_id=guild_id, + ) # droping any previous data client[guild_id].drop_collection("rawinfos") @@ -71,6 +154,12 @@ def test_fetch_raw_messages_fetch_from_date(self): client = MongoSingleton.get_instance().client guild_id = "1234" + channels = ["111111", "22222"] + guild_id = "1234" + self.setup_db( + channels=channels, + guild_id=guild_id, + ) # Dropping any previous data client[guild_id].drop_collection("rawinfos") @@ -90,8 +179,8 @@ def test_fetch_raw_messages_fetch_from_date(self): 2023, 10, i + 1 ), # Different dates in October 2023 "messageId": str(np.random.randint(1000000, 9999999)), - "channelId": str(np.random.randint(10000000, 99999999)), - "channelName": "general", + "channelId": channels[i % len(channels)], + "channelName": f"general {channels[i % len(channels)]}", "threadId": None, "threadName": None, "isGeneratedByWebhook": False, diff --git a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages_grouped.py b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages_grouped.py index cfb3580f..50089f60 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages_grouped.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_raw_messages_grouped.py @@ -1,4 +1,5 @@ -from datetime import datetime +from bson import ObjectId +from datetime import datetime, timedelta from unittest import TestCase from hivemind_etl_helpers.src.db.discord.fetch_raw_messages import fetch_raw_msg_grouped @@ -6,8 +7,85 @@ class TestFetchRawMessagesGrouped(TestCase): + def setup_db( + self, + channels: list[str], + create_modules: bool = True, + create_platform: bool = True, + guild_id: str = "1234", + ): + client = MongoSingleton.get_instance().client + + community_id = ObjectId("9f59dd4f38f3474accdc8f24") + platform_id = ObjectId("063a2a74282db2c00fbc2428") + + client["Module"].drop_collection("modules") + client["Core"].drop_collection("platforms") + + if create_modules: + data = { + "name": "hivemind", + "communityId": community_id, + "options": { + "platforms": [ + { + "platformId": platform_id, + "options": { + "channels": channels, + "roles": ["role_id"], + "users": ["user_id"], + }, + } + ] + }, + } + client["Module"]["modules"].insert_one(data) + + if create_platform: + client["Core"]["platforms"].insert_one( + { + "_id": platform_id, + "name": "discord", + "metadata": { + "action": { + "INT_THR": 1, + "UW_DEG_THR": 1, + "PAUSED_T_THR": 1, + "CON_T_THR": 4, + "CON_O_THR": 3, + "EDGE_STR_THR": 5, + "UW_THR_DEG_THR": 5, + "VITAL_T_THR": 4, + "VITAL_O_THR": 3, + "STILL_T_THR": 2, + "STILL_O_THR": 2, + "DROP_H_THR": 2, + "DROP_I_THR": 1, + }, + "window": {"period_size": 7, "step_size": 1}, + "id": guild_id, + "isInProgress": False, + "period": datetime.now() - timedelta(days=35), + "icon": "some_icon_hash", + "selectedChannels": channels, + "name": "GuildName", + }, + "community": community_id, + "disconnectedAt": None, + "connectedAt": datetime.now(), + "createdAt": datetime.now(), + "updatedAt": datetime.now(), + } + ) + def test_empty_data_empty_fromdate(self): + channels = ["111111", "22222"] guild_id = "1234" + self.setup_db( + channels=channels, + guild_id=guild_id, + ) + client = MongoSingleton.get_instance().client client[guild_id].drop_collection("rawinfos") @@ -26,7 +104,13 @@ def test_empty_data_non_empty_fromdate(self): self.assertEqual(messages, []) def test_some_data_available_empty_fromdate_single_channel_single_thread(self): + channels = ["111111"] guild_id = "1234" + self.setup_db( + channels=channels, + guild_id=guild_id, + ) + client = MongoSingleton.get_instance().client client[guild_id].drop_collection("rawinfos") from_date = datetime(2023, 9, 29) @@ -45,7 +129,7 @@ def test_some_data_available_empty_fromdate_single_channel_single_thread(self): 2023, 10, i + 1 ), # Different dates in October 2023 "messageId": f"11111{i}", - "channelId": "12454123", + "channelId": channels[0], "channelName": "general", "threadId": None, "threadName": None, @@ -73,7 +157,7 @@ def test_some_data_available_empty_fromdate_single_channel_single_thread(self): self.assertEqual(messages[0]["replied_user"], None) self.assertEqual(messages[0]["createdDate"], datetime(2023, 10, 1)) self.assertEqual(messages[0]["messageId"], "111110") - self.assertEqual(messages[0]["channelId"], "12454123") + self.assertEqual(messages[0]["channelId"], channels[0]) self.assertEqual(messages[0]["channelName"], "general") self.assertEqual(messages[0]["threadId"], None) self.assertEqual(messages[0]["threadName"], None) @@ -89,7 +173,7 @@ def test_some_data_available_empty_fromdate_single_channel_single_thread(self): self.assertEqual(messages[0]["replied_user"], None) self.assertEqual(messages[0]["createdDate"], datetime(2023, 10, 2)) self.assertEqual(messages[0]["messageId"], "111111") - self.assertEqual(messages[0]["channelId"], "12454123") + self.assertEqual(messages[0]["channelId"], channels[0]) self.assertEqual(messages[0]["channelName"], "general") self.assertEqual(messages[0]["threadId"], None) self.assertEqual(messages[0]["threadName"], None) @@ -105,7 +189,7 @@ def test_some_data_available_empty_fromdate_single_channel_single_thread(self): self.assertEqual(messages[0]["replied_user"], None) self.assertEqual(messages[0]["createdDate"], datetime(2023, 10, 3)) self.assertEqual(messages[0]["messageId"], "111112") - self.assertEqual(messages[0]["channelId"], "12454123") + self.assertEqual(messages[0]["channelId"], channels[0]) self.assertEqual(messages[0]["channelName"], "general") self.assertEqual(messages[0]["threadId"], None) self.assertEqual(messages[0]["threadName"], None) @@ -116,7 +200,13 @@ def test_some_data_available_empty_fromdate_single_channel_single_thread(self): def test_count_with_some_data_available_empty_fromdate_two_channel_single_thread( self, ): + channels = ["111111", "22222"] guild_id = "1234" + self.setup_db( + channels=channels, + guild_id=guild_id, + ) + client = MongoSingleton.get_instance().client client[guild_id].drop_collection("rawinfos") from_date = datetime(2023, 9, 29) @@ -127,11 +217,11 @@ def test_count_with_some_data_available_empty_fromdate_two_channel_single_thread ch_id: str if i == 2: ch = "channel#2" - ch_id = "112" + ch_id = channels[1] day = datetime(2023, 10, 2) else: ch = "channel#1" - ch_id = "111" + ch_id = channels[0] day = datetime(2023, 10, 1) data = {