From 4f309684cb7454a08c10c01bc3276a49194824a2 Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Thu, 8 Aug 2024 16:11:11 +0330 Subject: [PATCH] fix: violation detection fix extraction query! --- dags/violation_detection_etl.py | 7 ++++++- dags/violation_detection_helpers/extract.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/dags/violation_detection_etl.py b/dags/violation_detection_etl.py index e8ae6e73..aab89cde 100644 --- a/dags/violation_detection_etl.py +++ b/dags/violation_detection_etl.py @@ -49,6 +49,8 @@ def process_platforms(platform: dict[str, Any]): recompute = platform["recompute"] discord_users = platform["selected_discord_users"] + logging.info(f"Processing PLATFORM_ID: {platform_id}!") + # EXTRACT # TODO: Get resource_identifier from platform analyzer config @@ -72,6 +74,7 @@ def process_platforms(platform: dict[str, Any]): # Load if len(transformed_data) != 0: + logging.info(f"Loading {len(transformed_data)} documents into db!") loader = LoadPlatformLabeledData() loader.load(platform_id=platform, transformed_data=transformed_data) @@ -84,7 +87,9 @@ def process_platforms(platform: dict[str, Any]): reporter = SendReportDiscordUser(platform_id=platform_id) reporter.send(discord_user_id=discord_id, message=report) else: - logging.warning("No documents were transformed!") + logging.warning( + f"PLATFORM_ID: {platform_id}, No documents were transformed!" + ) platforms = get_violation_modules() process_platforms.expand(platform=platforms) diff --git a/dags/violation_detection_helpers/extract.py b/dags/violation_detection_helpers/extract.py index cc3864e1..7a4920c8 100644 --- a/dags/violation_detection_helpers/extract.py +++ b/dags/violation_detection_helpers/extract.py @@ -102,7 +102,7 @@ def extract( { **date_query, "text": {"$ne": None}, - "source_id": {"$in": resources}, + self.resource_name: {"$in": resources}, } ) return cursor, override_recompute