diff --git a/dags/hivemind_etl_helpers/ingestion_pipeline.py b/dags/hivemind_etl_helpers/ingestion_pipeline.py index 68266336..2448d86d 100644 --- a/dags/hivemind_etl_helpers/ingestion_pipeline.py +++ b/dags/hivemind_etl_helpers/ingestion_pipeline.py @@ -15,7 +15,7 @@ from llama_index.core.schema import BaseNode from llama_index.storage.docstore.mongodb import MongoDocumentStore from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache -from qdrant_client.conversions import common_types as types +from qdrant_client.conversions import common_types as qdrant_types from qdrant_client.http import models from tc_hivemind_backend.db.credentials import load_postgres_credentials from tc_hivemind_backend.db.qdrant import QdrantSingleton @@ -93,8 +93,8 @@ def run_pipeline(self, docs: list[Document]) -> list[BaseNode]: def _create_payload_index( self, field_name: str, - field_schema: types.PayloadSchemaType, - ) -> types.UpdateResult: + field_schema: qdrant_types.PayloadSchemaType, + ) -> qdrant_types.UpdateResult: """ Creates an index on a field under the payload of points in qdrant db @@ -104,7 +104,7 @@ def _create_payload_index( ------------ field_name : str the field name under points' payload to create the index for - field_schema : types.PayloadSchemaType + field_schema : qdrant_client.conversions.common_types.PayloadSchemaType the schema type of the field Returns @@ -162,5 +162,5 @@ def get_latest_document_date(self, field_name: str) -> datetime | None: except Exception as exp: logging.error(f"Error: {exp} while loading latest point!") latest_date = None - finally: - return latest_date + + return latest_date diff --git a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py index 65d5c14c..af6aa232 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py @@ -24,18 +24,16 @@ def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesMod tg_messages : list[TelegramMessagesModel] the telegram messages """ - query = "MATCH (c:TGChat {id: $chat_id})<-[:SENT_IN]-(message:TGMessage)" - # initialize where_clause: str | None = None from_date_timestamp: int | None = None if from_date: from_date_timestamp = int(from_date.timestamp() * 1000) - where_clause = f""" + where_clause = """ AND message.date >= $from_date_timestamp """ - query += f""" + query = f""" MATCH (c:TGChat {{id: $chat_id}})<-[:SENT_IN]-(message:TGMessage) WHERE message.text IS NOT NULL {where_clause if where_clause else ""} diff --git a/dags/hivemind_etl_helpers/src/db/telegram/schema.py b/dags/hivemind_etl_helpers/src/db/telegram/schema.py index 67ac50b5..16b48e15 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/schema.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/schema.py @@ -2,11 +2,15 @@ class TelegramMessagesModel(BaseModel): + """ + Represents a Telegram message with its associated metadata. + """ + message_id: int message_text: str author_username: str message_created_at: float message_edited_at: float - mentions: list - repliers: list - reactors: list + mentions: list[str] + repliers: list[str] + reactors: list[str] diff --git a/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py b/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py index be66ad4c..1fa6e95d 100644 --- a/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py +++ b/dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py @@ -1,6 +1,6 @@ from bson import ObjectId -from datetime import datetime +from datetime import datetime, timezone from hivemind_etl_helpers.src.utils.mongo import MongoSingleton @@ -21,7 +21,7 @@ def __init__(self, chat_id: str, chat_name: str) -> None: self.database = "Core" self.collection = "platforms" - def check_platform_existance(self) -> ObjectId | None: + def check_platform_existence(self) -> ObjectId | None: """ check if there's any platform exist for a chat_id @@ -58,8 +58,8 @@ def create_platform(self) -> ObjectId: }, "community": community_id, "disconnectedAt": None, - "createdAt": datetime.now(), - "updatedAt": datetime.now(), + "createdAt": datetime.now().replace(tzinfo=timezone.utc), + "updatedAt": datetime.now().replace(tzinfo=timezone.utc), } ) return community_id diff --git a/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py b/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py index d3d2b7b9..623430ea 100644 --- a/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py +++ b/dags/hivemind_etl_helpers/tests/integration/test_telegram_comminity.py @@ -20,7 +20,7 @@ def setUp(self) -> None: self.client.drop_database(self.telegram_platform.database) def test_check_no_platform_available(self): - result = self.telegram_platform.check_platform_existance() + result = self.telegram_platform.check_platform_existence() self.assertFalse(result) def test_single_platform_available(self): @@ -40,10 +40,10 @@ def test_single_platform_available(self): "updatedAt": datetime.now(), } ) - created_community_id = self.telegram_platform.check_platform_existance() + created_community_id = self.telegram_platform.check_platform_existence() self.assertEqual(community_id, created_community_id) - def telegram_multiple_platform_not_available(self): + def test_telegram_multiple_platform_not_available(self): chat_id = "111111" chat_name = "sample_chat1" chat_id2 = "222222" @@ -88,12 +88,12 @@ def telegram_multiple_platform_not_available(self): ] ) - result = self.telegram_platform.check_platform_existance() + result = self.telegram_platform.check_platform_existence() self.assertIsNone(result) def test_create_platform(self): community_id = self.telegram_platform.create_platform() self.assertIsNotNone(community_id) - fetched_community_id = self.telegram_platform.check_platform_existance() + fetched_community_id = self.telegram_platform.check_platform_existence() self.assertEqual(fetched_community_id, community_id) diff --git a/dags/hivemind_telegram_etl.py b/dags/hivemind_telegram_etl.py index aae3ebf6..c7674e44 100644 --- a/dags/hivemind_telegram_etl.py +++ b/dags/hivemind_telegram_etl.py @@ -8,7 +8,6 @@ from hivemind_etl_helpers.src.db.telegram.extract import TelegramChats, ExtractMessages from hivemind_etl_helpers.src.db.telegram.transform import TransformMessages from hivemind_etl_helpers.src.db.telegram.utility import TelegramUtils -from qdrant_client.http import models with DAG( dag_id="telegram_vector_store", @@ -21,18 +20,19 @@ @task def fetch_chat_ids() -> list[tuple[str, str]]: """ - Getting all communities having discord from database + Getting all Telegram chats from the database Returns --------- chat_info : list[tuple[str, str]] - a list of Telegram chat name and id + a list of Telegram chat id and name """ + load_dotenv() chat_info = TelegramChats.extract_chats() return chat_info @task - def chat_existance(chat_info: tuple[str, str]) -> tuple[str, str]: + def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]: """ check if the community & platform was created for the telegram or not if not, create a community and platform and hivemind module for it @@ -45,16 +45,14 @@ def chat_existance(chat_info: tuple[str, str]) -> tuple[str, str]: Returns --------- - chat_id : str - a telegram chat id - community_id : str - the community id, related the created community + chat_info : tuple[str, str] + tuple containing telegram chat id and chat name """ chat_id = chat_info[0] chat_name = chat_info[1] utils = TelegramUtils(chat_id=chat_id, chat_name=chat_name) - community_id = utils.check_platform_existance() + community_id = utils.check_platform_existence() if community_id is None: logging.info( f"Platform with chat_id: {chat_id} doesn't exist. " @@ -63,7 +61,7 @@ def chat_existance(chat_info: tuple[str, str]) -> tuple[str, str]: community_id = utils.create_platform() - return chat_id, community_id + return chat_info, community_id @task def processor(chat_info: tuple[str, str], community_id: str) -> None: @@ -100,5 +98,5 @@ def processor(chat_info: tuple[str, str], community_id: str) -> None: ingestion_pipeline.run_pipeline(docs=documents) chat_infos = fetch_chat_ids() - chat_id, community_id = chat_existance.expand(chat_info=chat_infos) - processor(chat_id=chat_id, community_id=community_id) + chat_info, community_id = chat_existence.expand(chat_info=chat_infos) + processor(chat_info=chat_info, community_id=community_id)