Skip to content

Commit

Permalink
fix: codeRabbitAI suggestions!
Browse files Browse the repository at this point in the history
  • Loading branch information
amindadgar committed Oct 21, 2024
1 parent dd5101b commit 1160935
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 34 deletions.
12 changes: 6 additions & 6 deletions dags/hivemind_etl_helpers/ingestion_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from llama_index.core.schema import BaseNode
from llama_index.storage.docstore.mongodb import MongoDocumentStore
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from qdrant_client.conversions import common_types as types
from qdrant_client.conversions import common_types as qdrant_types
from qdrant_client.http import models
from tc_hivemind_backend.db.credentials import load_postgres_credentials
from tc_hivemind_backend.db.qdrant import QdrantSingleton
Expand Down Expand Up @@ -93,8 +93,8 @@ def run_pipeline(self, docs: list[Document]) -> list[BaseNode]:
def _create_payload_index(
self,
field_name: str,
field_schema: types.PayloadSchemaType,
) -> types.UpdateResult:
field_schema: qdrant_types.PayloadSchemaType,
) -> qdrant_types.UpdateResult:
"""
Creates an index on a field under the payload of points in qdrant db
Expand All @@ -104,7 +104,7 @@ def _create_payload_index(
------------
field_name : str
the field name under points' payload to create the index for
field_schema : types.PayloadSchemaType
field_schema : qdrant_client.conversions.common_types.PayloadSchemaType
the schema type of the field
Returns
Expand Down Expand Up @@ -162,5 +162,5 @@ def get_latest_document_date(self, field_name: str) -> datetime | None:
except Exception as exp:
logging.error(f"Error: {exp} while loading latest point!")
latest_date = None
finally:
return latest_date

return latest_date
6 changes: 2 additions & 4 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesMod
tg_messages : list[TelegramMessagesModel]
the telegram messages
"""
query = "MATCH (c:TGChat {id: $chat_id})<-[:SENT_IN]-(message:TGMessage)"

# initialize
where_clause: str | None = None
from_date_timestamp: int | None = None

if from_date:
from_date_timestamp = int(from_date.timestamp() * 1000)
where_clause = f"""
where_clause = """
AND message.date >= $from_date_timestamp
"""
query += f"""
query = f"""
MATCH (c:TGChat {{id: $chat_id}})<-[:SENT_IN]-(message:TGMessage)
WHERE message.text IS NOT NULL
{where_clause if where_clause else ""}
Expand Down
10 changes: 7 additions & 3 deletions dags/hivemind_etl_helpers/src/db/telegram/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@


class TelegramMessagesModel(BaseModel):
"""
Represents a Telegram message with its associated metadata.
"""

message_id: int
message_text: str
author_username: str
message_created_at: float
message_edited_at: float
mentions: list
repliers: list
reactors: list
mentions: list[str]
repliers: list[str]
reactors: list[str]
8 changes: 4 additions & 4 deletions dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from bson import ObjectId

from datetime import datetime
from datetime import datetime, timezone
from hivemind_etl_helpers.src.utils.mongo import MongoSingleton


Expand All @@ -21,7 +21,7 @@ def __init__(self, chat_id: str, chat_name: str) -> None:
self.database = "Core"
self.collection = "platforms"

def check_platform_existance(self) -> ObjectId | None:
def check_platform_existence(self) -> ObjectId | None:
"""
check if there's any platform exist for a chat_id
Expand Down Expand Up @@ -58,8 +58,8 @@ def create_platform(self) -> ObjectId:
},
"community": community_id,
"disconnectedAt": None,
"createdAt": datetime.now(),
"updatedAt": datetime.now(),
"createdAt": datetime.now().replace(tzinfo=timezone.utc),
"updatedAt": datetime.now().replace(tzinfo=timezone.utc),
}
)
return community_id
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def setUp(self) -> None:
self.client.drop_database(self.telegram_platform.database)

def test_check_no_platform_available(self):
result = self.telegram_platform.check_platform_existance()
result = self.telegram_platform.check_platform_existence()
self.assertFalse(result)

def test_single_platform_available(self):
Expand All @@ -40,10 +40,10 @@ def test_single_platform_available(self):
"updatedAt": datetime.now(),
}
)
created_community_id = self.telegram_platform.check_platform_existance()
created_community_id = self.telegram_platform.check_platform_existence()
self.assertEqual(community_id, created_community_id)

def telegram_multiple_platform_not_available(self):
def test_telegram_multiple_platform_not_available(self):
chat_id = "111111"
chat_name = "sample_chat1"
chat_id2 = "222222"
Expand Down Expand Up @@ -88,12 +88,12 @@ def telegram_multiple_platform_not_available(self):
]
)

result = self.telegram_platform.check_platform_existance()
result = self.telegram_platform.check_platform_existence()
self.assertIsNone(result)

def test_create_platform(self):
community_id = self.telegram_platform.create_platform()

self.assertIsNotNone(community_id)
fetched_community_id = self.telegram_platform.check_platform_existance()
fetched_community_id = self.telegram_platform.check_platform_existence()
self.assertEqual(fetched_community_id, community_id)
22 changes: 10 additions & 12 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from hivemind_etl_helpers.src.db.telegram.extract import TelegramChats, ExtractMessages
from hivemind_etl_helpers.src.db.telegram.transform import TransformMessages
from hivemind_etl_helpers.src.db.telegram.utility import TelegramUtils
from qdrant_client.http import models

with DAG(
dag_id="telegram_vector_store",
Expand All @@ -21,18 +20,19 @@
@task
def fetch_chat_ids() -> list[tuple[str, str]]:
"""
Getting all communities having discord from database
Getting all Telegram chats from the database
Returns
---------
chat_info : list[tuple[str, str]]
a list of Telegram chat name and id
a list of Telegram chat id and name
"""
load_dotenv()
chat_info = TelegramChats.extract_chats()
return chat_info

@task
def chat_existance(chat_info: tuple[str, str]) -> tuple[str, str]:
def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:
"""
check if the community & platform was created for the telegram or not
if not, create a community and platform and hivemind module for it
Expand All @@ -45,16 +45,14 @@ def chat_existance(chat_info: tuple[str, str]) -> tuple[str, str]:
Returns
---------
chat_id : str
a telegram chat id
community_id : str
the community id, related the created community
chat_info : tuple[str, str]
tuple containing telegram chat id and chat name
"""
chat_id = chat_info[0]
chat_name = chat_info[1]

utils = TelegramUtils(chat_id=chat_id, chat_name=chat_name)
community_id = utils.check_platform_existance()
community_id = utils.check_platform_existence()
if community_id is None:
logging.info(
f"Platform with chat_id: {chat_id} doesn't exist. "
Expand All @@ -63,7 +61,7 @@ def chat_existance(chat_info: tuple[str, str]) -> tuple[str, str]:

community_id = utils.create_platform()

return chat_id, community_id
return chat_info, community_id

@task
def processor(chat_info: tuple[str, str], community_id: str) -> None:
Expand Down Expand Up @@ -100,5 +98,5 @@ def processor(chat_info: tuple[str, str], community_id: str) -> None:
ingestion_pipeline.run_pipeline(docs=documents)

chat_infos = fetch_chat_ids()
chat_id, community_id = chat_existance.expand(chat_info=chat_infos)
processor(chat_id=chat_id, community_id=community_id)
chat_info, community_id = chat_existence.expand(chat_info=chat_infos)
processor(chat_info=chat_info, community_id=community_id)

0 comments on commit 1160935

Please sign in to comment.