diff --git a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py index d10d867b..02a862a2 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py @@ -49,7 +49,13 @@ def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesMod first_message AS message, last_edit.updated_at AS edited_at, last_edit.text AS message_text - OPTIONAL MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message) + MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message) + WHERE NOT EXISTS {{ + MATCH (author)-[banned_rel:BANNED]->(c:TGChat {{id: $chat_id}}) + MATCH (author)-[joined_rel:JOINED|UNBANNED]->(c) + WITH author, MAX(banned_rel.date) AS banned_time, MAX(joined_rel.date) AS joined_time + WHERE banned_time > joined_time + }} OPTIONAL MATCH (reacted_user:TGUser)-[react_rel:REACTED_TO]->(message) OPTIONAL MATCH (reply_msg:TGMessage)-[:REPLIED]->(message) OPTIONAL MATCH (replied_user:TGUser)-[:CREATED_MESSAGE]->(reply_msg) diff --git a/dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py b/dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py index 61b7f46d..a42219f4 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py @@ -7,13 +7,13 @@ class TelegramChats: def __init__(self) -> None: self._connection = Neo4jOps.get_instance() - def extract_chats(self) -> list[tuple[str, str]]: + def extract_chats(self) -> list[tuple[int, str]]: """ extract the chat id and chat names Returns --------- - chat_info : list[tuple[str, str]] + chat_info : list[tuple[int, str]] a list of Telegram chat id and chat name """ driver = self._connection.neo4j_driver @@ -24,9 +24,7 @@ def extract_chats(self) -> list[tuple[str, str]]: records = session.run( "MATCH (c:TGChat) RETURN c.id as chat_id, c.title as name" ) - chat_info = [ - (str(record["chat_id"]), str(record["name"])) for record in records - ] + chat_info = [(record["chat_id"], record["name"]) for record in records] except Exception as exp: logging.error(f"Exception during extracting chat ids. exp: {exp}") diff --git a/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py b/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py index 5dea1da1..775b0413 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py @@ -52,6 +52,7 @@ def create_platform(self) -> ObjectId: community_id = ObjectId() self._client[self.database][self.collection].insert_one( { + "name": "telegram", "metadata": { "id": self.chat_id, "name": self.chat_name, diff --git a/dags/hivemind_etl_helpers/tests/integration/test_extract_messages.py b/dags/hivemind_etl_helpers/tests/integration/test_extract_messages.py index e33b61ba..506db19d 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_extract_messages.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_extract_messages.py @@ -254,3 +254,82 @@ def test_extract_multiple_data_with_from_date(self): data = self.extractor.extract(from_date=datetime(2024, 1, 1)) self.assertEqual(data, []) + + def test_extract_single_banned_user(self): + with self.extractor._connection.neo4j_driver.session() as session: + session.run( + """ + CREATE (c:TGChat {id: $chat_id}), + (u1:TGUser {id: '927814807.0', username: 'User One'}), + (u2:TGUser {id: '203678862.0', username: 'User Two'}), + (u1)-[:JOINED {date: $joined_date1}]->(c), + (u2)-[:JOINED {date: $joined_date2}]->(c), + (m1:TGMessage { + id: '3.0', + text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline', + date: $created_at1, + updated_at: $created_at1 + } + ), + (m4:TGMessage { + id: '3.0', + text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG', + date: $created_at4, + updated_at: $created_at4 + } + ), + (m2:TGMessage { + id: '4.0', + text: 'Hi', + date: $created_at2, + updated_at: $created_at2 + } + ), + (m3:TGMessage { + id: '5.0', + text: 'Reply🫡', + date: $created_at3, + updated_at: $created_at3 + } + ), + (m1)-[:SENT_IN]->(c), + (m2)-[:SENT_IN]->(c), + (m3)-[:SENT_IN]->(c), + (m4)-[:SENT_IN]->(c), + (u1)-[:CREATED_MESSAGE]->(m1), + (u2)-[:CREATED_MESSAGE]->(m2), + (u2)-[:CREATED_MESSAGE]->(m3), + (m1)-[:EDITED]->(m4), + (m3)-[:REPLIED]->(m1), + (u2)-[:REACTED_TO {new_reaction: '[{"type":"emoji","emoji":"🍓"}]', date: $reaction_date}]->(m1), + (u2)-[:BANNED {date: $banned_date}]->(c) + """, + { + "chat_id": self.chat_id, + "created_at1": 1672531200.0, # Sunday, January 1, 2023 12:00:00 AM + "joined_date1": 1672531100.0, # Saturday, December 31, 2022 11:58:20 PM + "joined_date2": 1672531105.0, # Saturday, December 31, 2022 11:58:25 PM + "created_at4": 1672531205.0, # Sunday, January 1, 2023 12:00:05 AM + "created_at2": 1672617600.0, # Monday, January 2, 2023 12:00:00 AM + "created_at3": 1672704000.0, # Tuesday, January 3, 2023 12:00:00 AM + "reaction_date": 1672790400.0, # Wednesday, January 4, 2023 12:00:00 AM + "banned_date": 1673633100.0, # Friday, January 13, 2023 6:05:00 PM + }, + ) + data = self.extractor.extract() + + self.assertEqual( + data, + [ + TelegramMessagesModel( + message_id=3, + message_text="🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG", + author_username="User One", + message_created_at=1672531200.0, + message_edited_at=1672531205.0, + mentions=[], + repliers=["User Two"], + reactors=["User Two"], + ), + ], + ) diff --git a/dags/hivemind_etl_helpers/tests/integration/test_telegram_chats.py b/dags/hivemind_etl_helpers/tests/integration/test_telegram_chats.py index 69a983c1..6f179653 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_telegram_chats.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_telegram_chats.py @@ -52,7 +52,7 @@ def test_extract_chats_single_chat(self): ) chat_infos = self.tc_chats.extract_chats() - self.assertEqual(chat_infos, [("100000", "test chat")]) + self.assertEqual(chat_infos, [(100000, "test chat")]) def test_extract_chats_multiple_chats(self): neo4j_driver = self.tc_chats._connection.neo4j_driver @@ -109,8 +109,8 @@ def test_extract_chats_multiple_chats(self): self.assertEqual( chat_ids, [ - ("100001", "test chat"), - ("100002", "test chat 2"), - ("100003", "test chat 3"), + (100001, "test chat"), + (100002, "test chat 2"), + (100003, "test chat 3"), ], ) diff --git a/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py b/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py index 733f0cc7..33b8c7b4 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py @@ -18,6 +18,9 @@ def setUp(self) -> None: self.telegram_platform.database = "TempPlatforms" self.client.drop_database(self.telegram_platform.database) + def tearDown(self) -> None: + self.client.drop_database(self.telegram_platform.database) + def test_check_no_platform_available(self): result = self.telegram_platform.check_platform_existence() self.assertFalse(result) @@ -29,6 +32,7 @@ def test_single_platform_available(self): self.telegram_platform.collection ].insert_one( { + "name": "telegram", "metadata": { "id": self.chat_id, "name": self.chat_name, @@ -55,6 +59,7 @@ def test_telegram_multiple_platform_not_available(self): ].insert_many( [ { + "name": "telegram", "metadata": { "id": chat_id, "name": chat_name, @@ -65,6 +70,7 @@ def test_telegram_multiple_platform_not_available(self): "updatedAt": datetime.now(), }, { + "name": "telegram", "metadata": { "id": chat_id2, "name": chat_name2, @@ -75,6 +81,7 @@ def test_telegram_multiple_platform_not_available(self): "updatedAt": datetime.now(), }, { + "name": "telegram", "metadata": { "id": chat_id3, "name": chat_name3, diff --git a/dags/hivemind_telegram_etl.py b/dags/hivemind_telegram_etl.py index 8b001937..f0b9d6e2 100644 --- a/dags/hivemind_telegram_etl.py +++ b/dags/hivemind_telegram_etl.py @@ -18,13 +18,13 @@ ) as dag: @task - def fetch_chat_ids() -> list[tuple[str, str]]: + def fetch_chat_ids() -> list[tuple[int, str]]: """ Getting all Telegram chats from the database Returns --------- - chat_infos : list[tuple[str, str]] + chat_infos : list[tuple[int, str]] a list of Telegram chat id and name """ load_dotenv()