
Commit

Merge pull request #303 from TogetherCrew/feat/302-telegram-raw-vectorize

Feat/302 telegram raw vectorize
amindadgar authored Oct 21, 2024
2 parents c4586cf + e1c6a89 commit ab5136f
Showing 15 changed files with 939 additions and 1 deletion.
103 changes: 102 additions & 1 deletion dags/hivemind_etl_helpers/ingestion_pipeline.py
@@ -1,5 +1,7 @@
import logging
from datetime import datetime

from dateutil.parser import parse
from hivemind_etl_helpers.src.utils.credentials import load_redis_credentials
from hivemind_etl_helpers.src.utils.mongo import get_mongo_uri
from hivemind_etl_helpers.src.utils.redis import RedisSingleton
@@ -10,8 +12,11 @@
IngestionPipeline,
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.schema import BaseNode
from llama_index.storage.docstore.mongodb import MongoDocumentStore
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from qdrant_client.conversions import common_types as qdrant_types
from qdrant_client.http import models
from tc_hivemind_backend.db.credentials import load_postgres_credentials
from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.db.utils.model_hyperparams import load_model_hyperparams
@@ -37,7 +42,23 @@ def __init__(self, community_id: str, collection_name: str, testing: bool = Fals
)
self.redis_client = RedisSingleton.get_instance().get_client()

def run_pipeline(self, docs: list[Document]):
def run_pipeline(self, docs: list[Document]) -> list[BaseNode]:
"""
vectorize and ingest data into a qdrant collection
Note: This will handle duplicate documents by doing an upsert operation.
Parameters
------------
docs : list[llama_index.Document]
list of llama-index documents
Returns
---------
nodes : list[BaseNode]
The set of transformed and loaded Nodes/Documents
(transformation is chunking and embedding of data)
"""
        # qdrant is collection-based and doesn't have a separate database concept
logging.info(
f"{len(docs)} docuemnts was extracted and now loading into QDrant DB!"
Expand Down Expand Up @@ -68,3 +89,83 @@ def run_pipeline(self, docs: list[Document]):

nodes = pipeline.run(documents=docs, show_progress=True)
return nodes

def _create_payload_index(
self,
field_name: str,
field_schema: qdrant_types.PayloadSchemaType,
) -> qdrant_types.UpdateResult:
"""
Creates an index on a field under the payload of points in qdrant db
Note: this could be used for payload fields that we want to scroll for after
Parameters
------------
field_name : str
the field name under points' payload to create the index for
field_schema : qdrant_client.conversions.common_types.PayloadSchemaType
the schema type of the field
Returns
-----------
operation_result : qdrant_client.conversions.common_types.UpdateResult
the payload index creation type
"""
operation_result = self.qdrant_client.create_payload_index(
collection_name=self.collection_name,
field_name=field_name,
field_schema=field_schema,
)

return operation_result

def get_latest_document_date(self, field_name: str) -> datetime | None:
"""
get the latest date for the most recent available document
NOTE: the given `field_name` under the points' schema MUST CONTAIN A VALUE HAVING DATE FORMAT (or a string format). If not, errors might raise in result of this function
Parameters
------------
field_name : str
the datetime field name in qdrant points' payload
Returns
---------
latest_date : datetime.datetime | None
the datetime for the document containing the latest date
if no document or any errors raised, we would return `None`
"""
latest_date: datetime | None = None
try:
result = self._create_payload_index(
field_name=field_name,
field_schema=models.PayloadSchemaType.DATETIME,
)
if result.status.name == "COMPLETED":
latest_document = self.qdrant_client.scroll(
collection_name=self.collection_name,
limit=1,
with_payload=True,
order_by=models.OrderBy(
key=field_name,
direction=models.Direction.DESC,
),
)

if not latest_document[0]:
logging.info("No documents found in the collection.")
latest_date = None
else:
latest_date = parse(latest_document[0][0].payload[field_name])

else:
raise ValueError(
f"Index not created successfully! index creation result: {result}"
)
except Exception as exp:
logging.error(f"Error: {exp} while loading latest point!")
latest_date = None

return latest_date
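For orientation, a minimal usage sketch of the two methods above, assuming the enclosing class is this module's CustomIngestionPipeline (the name is taken from context) with Qdrant/Redis credentials already configured; the ids, texts, and the "createdAt" field name are illustrative:

from llama_index.core import Document

pipeline = CustomIngestionPipeline(community_id="1234", collection_name="telegram")

# chunk, embed, and upsert a couple of documents into the Qdrant collection
docs = [
    Document(text="hello world", doc_id="1", metadata={"createdAt": "2024-10-20"}),
    Document(text="another message", doc_id="2", metadata={"createdAt": "2024-10-21"}),
]
nodes = pipeline.run_pipeline(docs)

# resume incremental loads from the newest stored point, ordering by the
# same payload field that was written into the documents' metadata
latest_date = pipeline.get_latest_document_date(field_name="createdAt")
if latest_date is not None:
    print(f"extract messages newer than {latest_date.isoformat()}")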
Empty file.
2 changes: 2 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/__init__.py
@@ -0,0 +1,2 @@
from .messages import ExtractMessages
from .tc_chats import TelegramChats
82 changes: 82 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py
@@ -0,0 +1,82 @@
from datetime import datetime

from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel
from tc_neo4j_lib import Neo4jOps


class ExtractMessages:
def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id
self._connection = Neo4jOps.get_instance()

def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesModel]:
"""
extract messages related to the given `chat_id`
Parameters
-----------
from_date : datetime | None
load from a specific date
if not given, load all data
Returns
---------
tg_messages : list[TelegramMessagesModel]
the telegram messages
"""
# initialize
where_clause: str | None = None
from_date_timestamp: int | None = None

if from_date:
from_date_timestamp = int(from_date.timestamp() * 1000)
where_clause = """
AND message.date >= $from_date_timestamp
"""
query = f"""
MATCH (c:TGChat {{id: $chat_id}})<-[:SENT_IN]-(message:TGMessage)
WHERE message.text IS NOT NULL
{where_clause if where_clause else ""}
WITH
message.id AS message_id,
MAX(message.updated_at) AS latest_msg_time,
MIN(message.updated_at) AS first_msg_time
MATCH (first_message:TGMessage {{id: message_id, updated_at: first_msg_time}})
MATCH (last_edit:TGMessage {{id: message_id, updated_at: latest_msg_time}})
WITH
first_message AS message,
last_edit.updated_at AS edited_at,
last_edit.text AS message_text
OPTIONAL MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message)
OPTIONAL MATCH (reacted_user:TGUser)-[react_rel:REACTED_TO]->(message)
OPTIONAL MATCH (reply_msg:TGMessage)-[:REPLIED]->(message)
OPTIONAL MATCH (replied_user:TGUser)-[:CREATED_MESSAGE]->(reply_msg)
OPTIONAL MATCH (message)-[:MENTIONED]->(mentioned_user:TGUser)
RETURN
message.id AS message_id,
message_text,
author.username AS author_username,
message.date AS message_created_at,
edited_at AS message_edited_at,
COLLECT(DISTINCT mentioned_user.username) AS mentions,
COLLECT(DISTINCT replied_user.username) AS repliers,
COLLECT(DISTINCT reacted_user.username) AS reactors
ORDER BY message_created_at DESC
"""

parameters = {"chat_id": self.chat_id}
        if from_date_timestamp is not None:
parameters["from_date_timestamp"] = from_date_timestamp

tg_messages = []
with self._connection.neo4j_driver.session() as session:
result = session.run(
query,
parameters=parameters,
)
messages = result.data()
tg_messages = [TelegramMessagesModel(**message) for message in messages]

return tg_messages
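A short, hedged usage sketch of the extractor (the chat id is illustrative; a reachable Neo4j instance populated with TGChat/TGMessage nodes is assumed):

from datetime import datetime

extractor = ExtractMessages(chat_id="-1001234567890")  # hypothetical chat id
messages = extractor.extract(from_date=datetime(2024, 1, 1))
for message in messages[:3]:
    print(message.message_id, message.author_username, message.message_text)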
33 changes: 33 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py
@@ -0,0 +1,33 @@
import logging

from tc_neo4j_lib import Neo4jOps


class TelegramChats:
def __init__(self) -> None:
self._connection = Neo4jOps.get_instance()

def extract_chats(self) -> list[tuple[str, str]]:
"""
extract the chat id and chat names
Returns
---------
chat_info : list[tuple[str, str]]
a list of Telegram chat id and chat name
"""
driver = self._connection.neo4j_driver

        chat_info: list[tuple[str, str]] = []
try:
with driver.session() as session:
records = session.run(
"MATCH (c:TGChat) RETURN c.id as chat_id, c.title as name"
)
chat_info = [
(str(record["chat_id"]), str(record["name"])) for record in records
]
except Exception as exp:
logging.error(f"Exception during extracting chat ids. exp: {exp}")

return chat_info
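And a sketch of enumerating the available chats (assumes Neo4jOps credentials are configured; an empty list comes back on errors, as handled above):

chats = TelegramChats().extract_chats()
for chat_id, chat_name in chats:
    print(f"chat {chat_id}: {chat_name}")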
16 changes: 16 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/schema.py
@@ -0,0 +1,16 @@
from pydantic import BaseModel


class TelegramMessagesModel(BaseModel):
"""
Represents a Telegram message with its associated metadata.
"""

message_id: int
message_text: str
author_username: str
message_created_at: float
message_edited_at: float
mentions: list[str]
repliers: list[str]
reactors: list[str]
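For reference, an illustrative instantiation of the model; all values are made up, and the timestamps are Unix epoch milliseconds to match the `message.date`/`updated_at` values handled by the extraction query:

sample = TelegramMessagesModel(
    message_id=1,
    message_text="hello world",
    author_username="alice",
    message_created_at=1713700000000.0,  # epoch ms, as stored in Neo4j
    message_edited_at=1713700300000.0,
    mentions=["bob"],
    repliers=[],
    reactors=["carol"],
)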
1 change: 1 addition & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/transform/__init__.py
@@ -0,0 +1 @@
from .messages import TransformMessages
59 changes: 59 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/transform/messages.py
@@ -0,0 +1,59 @@
from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel
from llama_index.core import Document


class TransformMessages:
def __init__(self, chat_id: str, chat_name: str) -> None:
self.chat_id = chat_id
self.chat_name = chat_name

def transform(self, messages: list[TelegramMessagesModel]) -> list[Document]:
"""
transform the given telegram messages to llama-index documents
Parameters
----------
messages : list[TelegramMessagesModel]
the extracted telegram messages
Returns
---------
transformed_docs : list[llama_index.core.Document]
a list of llama-index documents to be embedded & loaded into db
"""
transformed_docs: list[Document] = []

for message in messages:
document = Document(
text=message.message_text,
                doc_id=str(message.message_id),  # llama-index document ids are strings
metadata={
"author": message.author_username,
"createdAt": message.message_created_at,
"updatedAt": message.message_edited_at,
"mentions": message.mentions,
"replies": message.repliers,
"reactors": message.reactors,
"chat_name": self.chat_name,
},
excluded_embed_metadata_keys=[
"author",
"createdAt",
"updatedAt",
"mentions",
"replies",
"reactors",
"chat_name",
],
excluded_llm_metadata_keys=[
"createdAt",
"updatedAt",
"mentions",
"replies",
"reactors",
"chat_name",
],
)
transformed_docs.append(document)

return transformed_docs
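Putting the pieces together, a hedged end-to-end sketch (ids and names are hypothetical; `pipeline` is the ingestion pipeline instance shown earlier):

chat_id, chat_name = "-1001234567890", "Community Chat"  # hypothetical

messages = ExtractMessages(chat_id=chat_id).extract()
documents = TransformMessages(chat_id=chat_id, chat_name=chat_name).transform(messages)
nodes = pipeline.run_pipeline(documents)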
6 changes: 6 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/utility.py
@@ -0,0 +1,6 @@
from hivemind_etl_helpers.src.db.telegram.utils import TelegramPlatform


class TelegramUtils(TelegramPlatform):
def __init__(self, chat_id: str, chat_name: str) -> None:
super().__init__(chat_id, chat_name)
1 change: 1 addition & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/utils/__init__.py
@@ -0,0 +1 @@
from .platform import TelegramPlatform
