Skip to content

Commit

Permalink
fix: dag dependency and more logs!
Browse files Browse the repository at this point in the history
  • Loading branch information
amindadgar committed Oct 7, 2024
1 parent 8f1ee38 commit d678d03
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions dags/telegram_analyzer_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,21 @@ def telegram_etl_raw_data(
platform_id=platform_id,
)
extracted_data = extractor.extract(period=period, recompute=recompute)
logging.info(
f"CHAT_ID: {chat_id}. {len(extracted_data)} data extracted! Transforming them . . ."
)
transformer = TransformRawInfo(chat_id=chat_id)
transformed_data = transformer.transform(
raw_data=extracted_data,
)
logging.info(
f"CHAT_ID: {chat_id}. Loading {len(transformed_data)} to database!"
)
loader = LoadTransformedData(platform_id=platform_id)
loader.load(processed_data=transformed_data, recompute=recompute)
if len(extracted_data) != 0:
logging.info(
f"CHAT_ID: {chat_id}. {len(extracted_data)} data extracted! Transforming them . . ."
)
transformer = TransformRawInfo(chat_id=chat_id)
transformed_data = transformer.transform(
raw_data=extracted_data,
)
logging.info(
f"CHAT_ID: {chat_id}. Loading {len(transformed_data)} to database!"
)
loader = LoadTransformedData(platform_id=platform_id)
loader.load(processed_data=transformed_data, recompute=recompute)
else:
logging.info(f"CHAT_ID: {chat_id}. No raw data extracted!")

@task
def telegram_etl_raw_members(
Expand Down Expand Up @@ -148,12 +151,23 @@ def telegram_etl_raw_members(
# period = platform_info["period"]
recompute = platform_info["recompute"]

logging.info(f"CHAT_ID: {chat_id}. Extracting raw members!")

extractor = ExtractRawMembers(chat_id=chat_id, platform_id=platform_id)
extracted_data = extractor.extract(recompute=recompute)
transformer = TransformRawMembers()
transformed_data = transformer.transform(raw_members=extracted_data)
loader = LoadTransformedMembers(platform_id=platform_id)
loader.load(processed_data=transformed_data, recompute=recompute)
if len(extracted_data) != 0:
logging.info(
f"CHAT_ID: {chat_id}. {len(extracted_data)} data extracted! Transforming them . . ."
)
transformer = TransformRawMembers()
transformed_data = transformer.transform(raw_members=extracted_data)
logging.info(
f"CHAT_ID: {chat_id}. Loading {len(transformed_data)} to database!"
)
loader = LoadTransformedMembers(platform_id=platform_id)
loader.load(processed_data=transformed_data, recompute=recompute)
else:
logging.info(f"CHAT_ID: {chat_id}. No raw members extracted!")

@task
def analyze_telegram(platform_processed: dict[str, str | bool]) -> None:
Expand Down Expand Up @@ -200,4 +214,9 @@ def analyze_telegram(platform_processed: dict[str, str | bool]) -> None:

raw_data_etl = telegram_etl_raw_data.expand(platform_info=platform_modules)
raw_members_etl = telegram_etl_raw_members.expand(platform_info=platform_modules)
raw_members_etl >> analyze_telegram(platform_processed=raw_data_etl)

analyze_discourse_task = analyze_telegram.expand(
platform_processed=platform_modules
)

[raw_data_etl, raw_members_etl] >> analyze_discourse_task

0 comments on commit d678d03

Please sign in to comment.