Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Telegram vectorize, added automated module creation! #310

Merged
merged 6 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions dags/hivemind_etl_helpers/src/db/telegram/utility.py

This file was deleted.

3 changes: 2 additions & 1 deletion dags/hivemind_etl_helpers/src/db/telegram/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .platform import TelegramPlatform
from .module import TelegramModules
from .platform import TelegramPlatform
119 changes: 119 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/utils/module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging
from datetime import datetime, timezone

from bson import ObjectId
from hivemind_etl_helpers.src.utils.mongo import MongoSingleton


class TelegramModules:
    """
    Ensure a `hivemind` module document exists for a community and that the
    telegram platform is registered under its `options.platforms` list.

    All reads and writes go to the `modules` collection of the `Core` database.
    """

    def __init__(self, community_id: str, platform_id: str) -> None:
        """
        Parameters
        -----------
        community_id : str
            the community id related to telegram platform
            (wrapped in `ObjectId(...)` before every query)
        platform_id : str
            The platform id related to telegram
            (wrapped in `ObjectId(...)` before every query)
        """
        self._client = MongoSingleton.get_instance().get_client()
        self.platform_id = platform_id
        self.community_id = community_id

        self.database = "Core"
        self.collection = "modules"

    @staticmethod
    def _utc_now() -> datetime:
        """
        get the current time as a timezone-aware UTC datetime

        Notes
        ------
        `datetime.now().replace(tzinfo=timezone.utc)` would take the *local*
        wall-clock time and merely label it as UTC, which is wrong on any host
        whose timezone is not UTC. Asking for the time in UTC directly
        avoids that skew.

        Returns
        --------
        now : datetime
            the current instant, with `tzinfo` set to UTC
        """
        return datetime.now(tz=timezone.utc)

    def create(self) -> None:
        """
        create a module if not exists for community_id
        else, add a platform into the module if not exist and else do nothing
        """
        if not self._check_module_existence():
            logging.info(
                f"Module doesn't exist for community: {self.community_id}. Creating one."
            )
            self._create_module()
            return

        logging.info(f"Module already exists for community: {self.community_id}")
        if self._check_platform_existence():
            logging.info("Platform was already added to modules!")
            return

        logging.info(
            f"Adding platform the already exists community with id: {self.community_id}"
        )
        self._add_platform_to_community()

    def _check_module_existence(self) -> bool:
        """
        check if there's any module exists for the community id

        Returns
        --------
        existence : bool
            True, if a community module is already set
            False, if there's no module related to the community
        """
        document = self._client[self.database][self.collection].find_one(
            {"community": ObjectId(self.community_id)},
            # only the `_id` is projected; we just need to know a document matched
            {"_id": 1},
        )
        return document is not None

    def _check_platform_existence(self) -> bool:
        """
        check if the platform exist in a module holding the community id

        Returns
        --------
        existence : bool
            True, if the community's module already lists the platform
            False, otherwise
        """
        document = self._client[self.database][self.collection].find_one(
            {
                "community": ObjectId(self.community_id),
                "options.platforms.platform": ObjectId(self.platform_id),
            },
            {"_id": 1},
        )
        return document is not None

    def _add_platform_to_community(self) -> bool:
        """
        Having the community_id modules insert the platform into it

        Returns
        --------
        modified : bool
            True, if a module document was updated
            False, if no document matched (no module for the community,
            or the platform was already listed)
        """
        platform_oid = ObjectId(self.platform_id)
        result = self._client[self.database][self.collection].update_one(
            {
                "community": ObjectId(self.community_id),
                # guard against pushing a duplicate entry if this is called
                # twice (or concurrently) for the same platform
                "options.platforms.platform": {"$ne": platform_oid},
            },
            {
                "$push": {
                    "options.platforms": {
                        "platform": platform_oid,
                        "name": "telegram",
                        "_id": ObjectId(),
                    }
                },
                "$set": {"updatedAt": self._utc_now()},
                "$inc": {"__v": 1},
            },
        )
        return result.modified_count > 0

    def _create_module(self) -> None:
        """
        create a module for the community holding platform
        """
        # one clock read so `createdAt` and `updatedAt` are identical on creation
        now = self._utc_now()
        self._client[self.database][self.collection].insert_one(
            {
                "name": "hivemind",
                "community": ObjectId(self.community_id),
                "options": {
                    "platforms": [
                        {
                            "platform": ObjectId(self.platform_id),
                            "name": "telegram",
                            "_id": ObjectId(),
                        }
                    ]
                },
                "createdAt": now,
                "updatedAt": now,
            }
        )

19 changes: 14 additions & 5 deletions dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, chat_id: str, chat_name: str) -> None:
self.database = "Core"
self.collection = "platforms"

def check_platform_existence(self) -> ObjectId | None:
def check_platform_existence(self) -> tuple[ObjectId | None, ObjectId | None]:
"""
check if there's any platform exist for a chat_id

Expand All @@ -30,27 +30,35 @@ def check_platform_existence(self) -> ObjectId | None:
community_id : ObjectId | None
the community id if available
else will be None
platform_id : ObjectId | None
the platform id if available
else will be None
"""
document = self._client[self.database][self.collection].find_one(
{"metadata.id": self.chat_id},
{
"community": 1,
"_id": 1,
},
)
community_id = document["community"] if document else None
platform_id = document["_id"] if document else None

return document["community"] if document else None
return community_id, platform_id

def create_platform(self) -> ObjectId:
def create_platform(self) -> tuple[ObjectId, ObjectId]:
"""
create a platform for the chat_id having the community id

Returns
---------
community_id : ObjectId
the community ID that was assigned to a platform
platform_id : ObjectId
the created platform ID
"""
community_id = ObjectId()
self._client[self.database][self.collection].insert_one(
result = self._client[self.database][self.collection].insert_one(
{
"name": "telegram",
"metadata": {
Expand All @@ -63,4 +71,5 @@ def create_platform(self) -> ObjectId:
"updatedAt": datetime.now().replace(tzinfo=timezone.utc),
}
)
return community_id
platform_id = result.inserted_id
return community_id, platform_id
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ def tearDown(self) -> None:

def test_check_no_platform_available(self):
result = self.telegram_platform.check_platform_existence()
self.assertFalse(result)
self.assertIsNone(result)

def test_single_platform_available(self):
community_id = ObjectId()

self.client[self.telegram_platform.database][
result = self.client[self.telegram_platform.database][
self.telegram_platform.collection
].insert_one(
{
Expand All @@ -43,8 +43,10 @@ def test_single_platform_available(self):
"updatedAt": datetime.now(),
}
)
created_community_id = self.telegram_platform.check_platform_existence()
created_community_id, created_platform_id = self.telegram_platform.check_platform_existence()
self.assertEqual(community_id, created_community_id)
self.assertEqual(result.inserted_id, created_platform_id)


def test_telegram_multiple_platform_not_available(self):
chat_id = "111111"
Expand Down Expand Up @@ -94,12 +96,15 @@ def test_telegram_multiple_platform_not_available(self):
]
)

result = self.telegram_platform.check_platform_existence()
self.assertIsNone(result)
community_id, platform_id = self.telegram_platform.check_platform_existence()
self.assertIsNone(community_id)
self.assertIsNone(platform_id)

def test_create_platform(self):
community_id = self.telegram_platform.create_platform()
community_id, platform_id = self.telegram_platform.create_platform()

self.assertIsNotNone(community_id)
fetched_community_id = self.telegram_platform.check_platform_existence()
self.assertIsNotNone(platform_id)
fetched_community_id, fetched_platform_id = self.telegram_platform.check_platform_existence()
self.assertEqual(fetched_community_id, community_id)
self.assertEqual(fetched_platform_id, platform_id)
11 changes: 7 additions & 4 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline
from hivemind_etl_helpers.src.db.telegram.extract import ExtractMessages, TelegramChats
from hivemind_etl_helpers.src.db.telegram.transform import TransformMessages
from hivemind_etl_helpers.src.db.telegram.utility import TelegramUtils
from hivemind_etl_helpers.src.db.telegram.utils import TelegramModules, TelegramPlatform

with DAG(
dag_id="telegram_vector_store",
Expand Down Expand Up @@ -53,15 +53,18 @@ def chat_existence(chat_info: tuple[str, str]) -> dict[str, tuple[str, str] | st
chat_id = chat_info[0]
chat_name = chat_info[1]

utils = TelegramUtils(chat_id=chat_id, chat_name=chat_name)
community_id = utils.check_platform_existence()
platform_utils = TelegramPlatform(chat_id=chat_id, chat_name=chat_name)
community_id, platform_id = platform_utils.check_platform_existence()
if community_id is None:
logging.info(
f"Platform with chat_id: {chat_id} doesn't exist. "
"Creating one instead!"
)

community_id = utils.create_platform()
community_id, platform_id = platform_utils.create_platform()

modules = TelegramModules(community_id, platform_id)
modules.create()
amindadgar marked this conversation as resolved.
Show resolved Hide resolved

return {
"chat_info": chat_info,
Expand Down
Loading