diff --git a/dags/analyzer_helper/telegram/extract_raw_data.py b/dags/analyzer_helper/telegram/extract_raw_data.py index 8a074b33..1c6a43bd 100644 --- a/dags/analyzer_helper/telegram/extract_raw_data.py +++ b/dags/analyzer_helper/telegram/extract_raw_data.py @@ -9,7 +9,7 @@ class ExtractRawInfo: - def __init__(self, chat_id: str, platform_id: str): + def __init__(self, chat_id: int, platform_id: str): """ Initialize the ExtractRawInfo with the forum endpoint, platform id and set up Neo4j and MongoDB connection. """ diff --git a/dags/analyzer_helper/telegram/extract_raw_members.py b/dags/analyzer_helper/telegram/extract_raw_members.py index af37e14a..0ec2f382 100644 --- a/dags/analyzer_helper/telegram/extract_raw_members.py +++ b/dags/analyzer_helper/telegram/extract_raw_members.py @@ -7,7 +7,7 @@ class ExtractRawMembers: - def __init__(self, chat_id: str, platform_id: str): + def __init__(self, chat_id: int, platform_id: str): """ Initialize the ExtractRawMembers with the Neo4j connection parameters. """ diff --git a/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_data.py b/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_data.py index 6ed85aa8..7e07e000 100644 --- a/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_data.py +++ b/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_data.py @@ -12,7 +12,7 @@ def setUpClass(cls): cls.neo4jConnection = Neo4jConnection() cls.client = MongoSingleton.get_instance().client cls.driver = cls.neo4jConnection.connect_neo4j() - cls.chat_id = "test_group" + cls.chat_id = 1234554321 cls.platform_id = "platform_db" cls.platform_db = cls.client[cls.platform_id] cls.extractor = ExtractRawInfo(cls.chat_id, cls.platform_id) diff --git a/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_members.py b/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_members.py index 9be1cc87..40983241 100644 --- a/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_members.py +++ b/dags/analyzer_helper/telegram/tests/integration/test_telegram_extract_raw_members.py @@ -9,7 +9,7 @@ class TestExtractRawMembers(unittest.TestCase): def setUp(self): self.neo4jConnection = Neo4jConnection() self.driver = self.neo4jConnection.connect_neo4j() - self.chat_id = "telegram_test_group" + self.chat_id = 12345665432 self.platform_id = "telegram_test_platform" self.extractor = ExtractRawMembers(self.chat_id, self.platform_id) self.rawmembers_collection = self.extractor.rawmembers_collection diff --git a/dags/analyzer_helper/telegram/transform_raw_data.py b/dags/analyzer_helper/telegram/transform_raw_data.py index 26f94866..4bcf987a 100644 --- a/dags/analyzer_helper/telegram/transform_raw_data.py +++ b/dags/analyzer_helper/telegram/transform_raw_data.py @@ -5,7 +5,7 @@ class TransformRawInfo: - def __init__(self, chat_id: str): + def __init__(self, chat_id: int): self.converter = DateTimeFormatConverter self.user_bot_checker = UserBotChecker() self.chat_id = chat_id diff --git a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py index da288297..04c77cfb 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py @@ -5,7 +5,7 @@ class ExtractMessages: - def __init__(self, chat_id: str) -> None: + def __init__(self, chat_id: int) -> None: self.chat_id = chat_id self._connection = Neo4jOps.get_instance() diff --git a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages_daily.py b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages_daily.py index f1da3cba..7c0b60a9 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages_daily.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages_daily.py @@ -7,7 +7,7 @@ class ExtractMessagesDaily: - def __init__(self, chat_id: str) -> None: + def __init__(self, chat_id: int) -> None: self.extractor = ExtractMessages(chat_id=chat_id) def extract( @@ -31,7 +31,7 @@ def extract( daily_tg_messages: dict[date, list[TelegramMessagesModel]] = defaultdict(list) for msg in messages: - msg_date = datetime.fromtimestamp(msg.message_created_at / 1000).date() + msg_date = datetime.fromtimestamp(msg.message_created_at).date() daily_tg_messages[msg_date].append(msg) return daily_tg_messages diff --git a/dags/hivemind_etl_helpers/src/db/telegram/transform/messages.py b/dags/hivemind_etl_helpers/src/db/telegram/transform/messages.py index 70642093..fe2423de 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/transform/messages.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/transform/messages.py @@ -3,7 +3,7 @@ class TransformMessages: - def __init__(self, chat_id: str, chat_name: str) -> None: + def __init__(self, chat_id: int, chat_name: str) -> None: self.chat_id = chat_id self.chat_name = chat_name diff --git a/dags/hivemind_etl_helpers/src/db/telegram/transform/summarizer.py b/dags/hivemind_etl_helpers/src/db/telegram/transform/summarizer.py index d93516cf..6657b0ae 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/transform/summarizer.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/transform/summarizer.py @@ -10,7 +10,7 @@ class SummarizeMessages(SummaryBase): def __init__( self, - chat_id: str, + chat_id: int, chat_name: str, response_synthesizer: BaseSynthesizer | None = None, verbose: bool = False, diff --git a/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py b/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py index 1891878c..a9b21aaf 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py @@ -5,11 +5,11 @@ class TelegramPlatform: - def __init__(self, chat_id: str, chat_name: str) -> None: + def __init__(self, chat_id: int, chat_name: str) -> None: """ Parameters ----------- - chat_id : str + chat_id : int check if there's any platform exists chat_name : str the chat name to create later (if not already exists) diff --git a/dags/hivemind_etl_helpers/tests/unit/test_telegram_extract_daily_messages.py b/dags/hivemind_etl_helpers/tests/unit/test_telegram_extract_daily_messages.py index 7f4216d0..564fb2ac 100644 --- a/dags/hivemind_etl_helpers/tests/unit/test_telegram_extract_daily_messages.py +++ b/dags/hivemind_etl_helpers/tests/unit/test_telegram_extract_daily_messages.py @@ -10,7 +10,7 @@ class TestExtractMessagesDaily(unittest.TestCase): def setUp(self): load_dotenv() - self.chat_id = "test_chat" + self.chat_id = 12344321 self.extractor = ExtractMessagesDaily(chat_id=self.chat_id) @patch( @@ -32,8 +32,8 @@ def test_single_message(self, mock_extract): message_id=1, message_text="Hello, World!", author_username="user1", - message_created_at=1633046400000.0, # Equivalent to 2021-10-01T00:00:00 UTC - message_edited_at=1633046500000.0, + message_created_at=1633046400.0, # Equivalent to 2021-10-01T00:00:00 UTC + message_edited_at=1633046500.0, mentions=["user2"], repliers=["user3"], reactors=["user4"], @@ -41,7 +41,7 @@ def test_single_message(self, mock_extract): mock_extract.return_value = [msg] result = self.extractor.extract(from_date=None) - expected_date = datetime.fromtimestamp(msg.message_created_at / 1000).date() + expected_date = datetime.fromtimestamp(msg.message_created_at).date() self.assertIn(expected_date, result, "Expected date key in result") self.assertEqual( @@ -59,8 +59,8 @@ def test_multiple_messages_same_date(self, mock_extract): message_id=1, message_text="Message 1", author_username="user1", - message_created_at=1633046400000.0, # Equivalent to 2021-10-01 - message_edited_at=1633046500000.0, + message_created_at=1633046400.0, # Equivalent to 2021-10-01 + message_edited_at=1633046500.0, mentions=["user2"], repliers=["user3"], reactors=["user4"], @@ -69,8 +69,8 @@ def test_multiple_messages_same_date(self, mock_extract): message_id=2, message_text="Message 2", author_username="user2", - message_created_at=1633050000000.0, # Same date as msg1 - message_edited_at=1633050500000.0, + message_created_at=1633050000.0, # Same date as msg1 + message_edited_at=1633050500.0, mentions=["user1"], repliers=["user3"], reactors=["user4"], @@ -78,7 +78,7 @@ def test_multiple_messages_same_date(self, mock_extract): mock_extract.return_value = [msg1, msg2] result = self.extractor.extract(from_date=None) - expected_date = datetime.fromtimestamp(msg1.message_created_at / 1000).date() + expected_date = datetime.fromtimestamp(msg1.message_created_at).date() self.assertIn(expected_date, result, "Expected date key in result") self.assertEqual( @@ -94,8 +94,8 @@ def test_multiple_messages_different_dates(self, mock_extract): message_id=1, message_text="Message 1", author_username="user1", - message_created_at=1633046400000.0, # 2021-10-01 - message_edited_at=1633046500000.0, + message_created_at=1633046400.0, # 2021-10-01 + message_edited_at=1633046500.0, mentions=["user2"], repliers=["user3"], reactors=["user4"], @@ -104,8 +104,8 @@ def test_multiple_messages_different_dates(self, mock_extract): message_id=2, message_text="Message 2", author_username="user2", - message_created_at=1633132800000.0, # 2021-10-02 - message_edited_at=1633132900000.0, + message_created_at=1633132800.0, # 2021-10-02 + message_edited_at=1633132900.0, mentions=["user1"], repliers=["user3"], reactors=["user4"], @@ -113,8 +113,8 @@ def test_multiple_messages_different_dates(self, mock_extract): mock_extract.return_value = [msg1, msg2] result = self.extractor.extract(from_date=None) - date1 = datetime.fromtimestamp(msg1.message_created_at / 1000).date() - date2 = datetime.fromtimestamp(msg2.message_created_at / 1000).date() + date1 = datetime.fromtimestamp(msg1.message_created_at).date() + date2 = datetime.fromtimestamp(msg2.message_created_at).date() self.assertIn(date1, result, "Expected first date key in result") self.assertIn(date2, result, "Expected second date key in result") diff --git a/dags/hivemind_telegram_etl.py b/dags/hivemind_telegram_etl.py index 5826f992..8d0d7101 100644 --- a/dags/hivemind_telegram_etl.py +++ b/dags/hivemind_telegram_etl.py @@ -135,11 +135,16 @@ def process_data(messages): logging.info(f"Started extracting from date: {latest_date}!") messages = extractor.extract(from_date=latest_date) - logging.info(f"Extracted {len(messages)} messages!") + if dag_type == "messages": + msg_count = len(messages) + else: + msg_count = len(sum(messages.values(), [])) + + logging.info(f"Extracted {msg_count} messages!") # Process and load data documents = process_data(messages) - logging.info(f"Transformed {len(messages)} messages!") + logging.info(f"Transformed {len(documents)} messages!") ingestion_pipeline.run_pipeline(docs=documents) logging.info("Finished loading into database!")