From 756ab3f243400eb825106b60608a6fd2c801e03c Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 21 Oct 2024 15:06:31 +0330 Subject: [PATCH 1/3] fix: runtime bugs! wrong passing the variables from methods and wrong method calls. --- dags/hivemind_telegram_etl.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/dags/hivemind_telegram_etl.py b/dags/hivemind_telegram_etl.py index 44513c33..f09b7d1e 100644 --- a/dags/hivemind_telegram_etl.py +++ b/dags/hivemind_telegram_etl.py @@ -28,12 +28,11 @@ def fetch_chat_ids() -> list[tuple[str, str]]: a list of Telegram chat id and name """ load_dotenv() - chat_infos = TelegramChats.extract_chats() - logging.info(f"Extracted chats: {chat_infos}") + chat_infos = TelegramChats().extract_chats() return chat_infos @task - def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]: + def chat_existence(chat_info: tuple[str, str]) -> dict[str, tuple[str, str] | str]: """ check if the community & platform was created for the telegram or not if not, create a community and platform and hivemind module for it @@ -46,7 +45,9 @@ def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]: Returns --------- - chat_info : tuple[str, str] + details : dict[str, tuple[str, str] | str] + the chat details containing the chat_info + and a community id related to that tuple containing telegram chat id and chat name """ chat_id = chat_info[0] @@ -62,20 +63,29 @@ def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]: community_id = utils.create_platform() - return chat_info, community_id + return { + "chat_info": chat_info, + "community_id": str(community_id), + } @task - def processor(chat_info: tuple[str, str], community_id: str) -> None: + def processor( + details: dict[str, tuple[str, str] | str], + ) -> None: """ extract, transform, and load telegram data Parameters ----------- - chat_id : str - a telegram chat id - community_id : str - the community id, related the created community + details : dict[str, tuple[str, str] | str] + the chat details containing the chat_info + and a community id related to that + tuple containing telegram chat id and chat name """ + # unwrapping data + chat_info = details["chat_info"] + community_id = details["community_id"] + chat_id = chat_info[0] chat_name = chat_info[1] @@ -99,5 +109,5 @@ def processor(chat_info: tuple[str, str], community_id: str) -> None: ingestion_pipeline.run_pipeline(docs=documents) chat_infos = fetch_chat_ids() - chat_info, community_id = chat_existence.expand(chat_info=chat_infos) - processor(chat_info=chat_info, community_id=community_id) + details = chat_existence.expand(chat_info=chat_infos) + processor(details=details) From db3c172e242c57710a2331c1eaa571909357723b Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 21 Oct 2024 15:31:42 +0330 Subject: [PATCH 2/3] fix: more logs & runtime bug fixing! --- dags/hivemind_telegram_etl.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dags/hivemind_telegram_etl.py b/dags/hivemind_telegram_etl.py index f09b7d1e..a3f3a200 100644 --- a/dags/hivemind_telegram_etl.py +++ b/dags/hivemind_telegram_etl.py @@ -82,10 +82,14 @@ def processor( and a community id related to that tuple containing telegram chat id and chat name """ + load_dotenv() + logging.info(f"received details: {details}!") # unwrapping data chat_info = details["chat_info"] community_id = details["community_id"] + logging.info(f"Started processing community: {community_id}") + chat_id = chat_info[0] chat_name = chat_info[1] @@ -102,12 +106,18 @@ def processor( if latest_date: # this is to catch any edits for messages of 30 days ago from_date = latest_date - timedelta(days=30) + logging.info(f"Started extracting from date: {from_date}!") messages = extractor.extract(from_date=from_date) else: + logging.info("Started extracting data from scratch!") messages = extractor.extract() + + logging.info(f"Extracted {len(messages)} messages!") documents = transformer.transform(messages=messages) + logging.info(f"Messages transformed!") ingestion_pipeline.run_pipeline(docs=documents) + logging.info("Finished loading into database!") chat_infos = fetch_chat_ids() details = chat_existence.expand(chat_info=chat_infos) - processor(details=details) + processor.expand(details=details) From 8a2a3c680dc2190762c358daa8c1b37143066623 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 21 Oct 2024 15:36:53 +0330 Subject: [PATCH 3/3] fix: the usage of fstring without variable! --- dags/hivemind_telegram_etl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/hivemind_telegram_etl.py b/dags/hivemind_telegram_etl.py index a3f3a200..8b001937 100644 --- a/dags/hivemind_telegram_etl.py +++ b/dags/hivemind_telegram_etl.py @@ -114,7 +114,7 @@ def processor( logging.info(f"Extracted {len(messages)} messages!") documents = transformer.transform(messages=messages) - logging.info(f"Messages transformed!") + logging.info(f"{len(messages)} Messages transformed!") ingestion_pipeline.run_pipeline(docs=documents) logging.info("Finished loading into database!")