From d678d039d840a0e5c42d55f19627b23681d1ed2e Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Mon, 7 Oct 2024 09:31:52 +0330 Subject: [PATCH] fix: dag dependency and more logs! --- dags/telegram_analyzer_etl.py | 53 ++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/dags/telegram_analyzer_etl.py b/dags/telegram_analyzer_etl.py index bac1da32..8ef40470 100644 --- a/dags/telegram_analyzer_etl.py +++ b/dags/telegram_analyzer_etl.py @@ -109,18 +109,21 @@ def telegram_etl_raw_data( platform_id=platform_id, ) extracted_data = extractor.extract(period=period, recompute=recompute) - logging.info( - f"CHAT_ID: {chat_id}. {len(extracted_data)} data extracted! Transforming them . . ." - ) - transformer = TransformRawInfo(chat_id=chat_id) - transformed_data = transformer.transform( - raw_data=extracted_data, - ) - logging.info( - f"CHAT_ID: {chat_id}. Loading {len(transformed_data)} to database!" - ) - loader = LoadTransformedData(platform_id=platform_id) - loader.load(processed_data=transformed_data, recompute=recompute) + if len(extracted_data) != 0: + logging.info( + f"CHAT_ID: {chat_id}. {len(extracted_data)} data extracted! Transforming them . . ." + ) + transformer = TransformRawInfo(chat_id=chat_id) + transformed_data = transformer.transform( + raw_data=extracted_data, + ) + logging.info( + f"CHAT_ID: {chat_id}. Loading {len(transformed_data)} to database!" + ) + loader = LoadTransformedData(platform_id=platform_id) + loader.load(processed_data=transformed_data, recompute=recompute) + else: + logging.info(f"CHAT_ID: {chat_id}. No raw data extracted!") @task def telegram_etl_raw_members( @@ -148,12 +151,23 @@ def telegram_etl_raw_members( # period = platform_info["period"] recompute = platform_info["recompute"] + logging.info(f"CHAT_ID: {chat_id}. Extracting raw members!") + extractor = ExtractRawMembers(chat_id=chat_id, platform_id=platform_id) extracted_data = extractor.extract(recompute=recompute) - transformer = TransformRawMembers() - transformed_data = transformer.transform(raw_members=extracted_data) - loader = LoadTransformedMembers(platform_id=platform_id) - loader.load(processed_data=transformed_data, recompute=recompute) + if len(extracted_data) != 0: + logging.info( + f"CHAT_ID: {chat_id}. {len(extracted_data)} data extracted! Transforming them . . ." + ) + transformer = TransformRawMembers() + transformed_data = transformer.transform(raw_members=extracted_data) + logging.info( + f"CHAT_ID: {chat_id}. Loading {len(transformed_data)} to database!" + ) + loader = LoadTransformedMembers(platform_id=platform_id) + loader.load(processed_data=transformed_data, recompute=recompute) + else: + logging.info(f"CHAT_ID: {chat_id}. No raw members extracted!") @task def analyze_telegram(platform_processed: dict[str, str | bool]) -> None: @@ -200,4 +214,9 @@ def analyze_telegram(platform_processed: dict[str, str | bool]) -> None: raw_data_etl = telegram_etl_raw_data.expand(platform_info=platform_modules) raw_members_etl = telegram_etl_raw_members.expand(platform_info=platform_modules) - raw_members_etl >> analyze_telegram(platform_processed=raw_data_etl) + + analyze_discourse_task = analyze_telegram.expand( + platform_processed=platform_modules + ) + + [raw_data_etl, raw_members_etl] >> analyze_discourse_task