diff --git a/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py b/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py index 5cb3268d..d39d4441 100644 --- a/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py +++ b/dags/hivemind_etl_helpers/src/db/discord/fetch_raw_messages.py @@ -22,7 +22,7 @@ def fetch_raw_messages(guild_id: str, from_date: datetime | None = None) -> list """ client = MongoSingleton.get_instance().get_client() - channels = fetch_channels(guild_id=guild_id) + channels, from_date_modules = fetch_channels_and_from_date(guild_id=guild_id) raw_messages: list[dict] if from_date is not None: @@ -45,6 +45,7 @@ def fetch_raw_messages(guild_id: str, from_date: datetime | None = None) -> list { "isGeneratedByWebhook": False, "channelId": {"$in": channels}, + "createdDate": {"$gte": from_date_modules}, } ) .sort("createdDate", 1) @@ -88,7 +89,7 @@ def fetch_raw_msg_grouped( """ client = MongoSingleton.get_instance().client - channels = fetch_channels(guild_id) + channels, from_date_modules = fetch_channels_and_from_date(guild_id=guild_id) # the pipeline to apply through mongodb pipeline: list[dict] = [] @@ -110,7 +111,18 @@ def fetch_raw_msg_grouped( ) else: pipeline.append( - {"$match": {"isGeneratedByWebhook": False, "channelId": {"$in": channels}}}, + { + "$match": { + "createdDate": { + "$gte": from_date_modules, + "$lt": datetime.now().replace( + hour=0, minute=0, second=0, microsecond=0 + ), + }, + "isGeneratedByWebhook": False, + "channelId": {"$in": channels}, + } + }, ) # sorting @@ -138,9 +150,10 @@ def fetch_raw_msg_grouped( return raw_messages_grouped -def fetch_channels(guild_id: str): +def fetch_channels_and_from_date(guild_id: str) -> tuple[list[str], datetime | None]: """ - fetch the channels from modules that we wanted to process + fetch the channels and the `fromDate` to process + from Module that we wanted to process Parameters ----------- @@ -151,6 +164,8 @@ def fetch_channels(guild_id: str): --------- channels : list[str] the channels to fetch data from + from_date : datetime | None + the processing from_date """ client = MongoSingleton.get_instance().client platform = client["Core"]["platforms"].find_one( @@ -173,9 +188,11 @@ def fetch_channels(guild_id: str): ) channels: list[str] + from_date: datetime | None = None if result is not None: channels = result["options"]["platforms"][0]["options"]["channels"] + from_date = result["options"]["platforms"][0]["fromDate"] else: raise ValueError("No modules set for this community!") - return channels + return channels, from_date diff --git a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py index bb3a7a5b..a3190685 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_discord_fetch_modules_channels.py @@ -2,7 +2,9 @@ from unittest import TestCase from bson import ObjectId -from hivemind_etl_helpers.src.db.discord.fetch_raw_messages import fetch_channels +from hivemind_etl_helpers.src.db.discord.fetch_raw_messages import ( + fetch_channels_and_from_date, +) from hivemind_etl_helpers.src.utils.mongo import MongoSingleton @@ -30,6 +32,7 @@ def setup_db( "platforms": [ { "platformId": platform_id, + "fromDate": datetime(2024, 1, 1), "options": { "channels": channels, "roles": ["role_id"], @@ -79,7 +82,7 @@ def setup_db( ) def test_fetch_channels(self): - guild_id = "1234" + guild_id = "12345" channels = ["111111", "22222"] self.setup_db( create_modules=True, @@ -87,12 +90,14 @@ def test_fetch_channels(self): guild_id=guild_id, channels=channels, ) - channels = fetch_channels(guild_id="1234") + channels, from_date = fetch_channels_and_from_date(guild_id="1234") self.assertEqual(channels, channels) + self.assertIsInstance(from_date, datetime) + self.assertEqual(from_date, datetime(2024, 1, 1)) def test_fetch_channels_no_modules_available(self): - guild_id = "1234" + guild_id = "12345" channels = ["111111", "22222"] self.setup_db( create_modules=False, @@ -101,10 +106,10 @@ def test_fetch_channels_no_modules_available(self): channels=channels, ) with self.assertRaises(ValueError): - _ = fetch_channels(guild_id="1234") + _ = fetch_channels_and_from_date(guild_id="1234") def test_fetch_channels_no_platform_available(self): - guild_id = "1234" + guild_id = "12345" channels = ["111111", "22222"] self.setup_db( create_modules=True, @@ -114,4 +119,4 @@ def test_fetch_channels_no_platform_available(self): ) with self.assertRaises(ValueError): - _ = fetch_channels(guild_id="1234") + _ = fetch_channels_and_from_date(guild_id="1234")