Feat/302 telegram raw vectorize #303

Merged
merged 13 commits on Oct 21, 2024
103 changes: 102 additions & 1 deletion dags/hivemind_etl_helpers/ingestion_pipeline.py
@@ -1,5 +1,7 @@
import logging
from datetime import datetime

from dateutil.parser import parse
from hivemind_etl_helpers.src.utils.credentials import load_redis_credentials
from hivemind_etl_helpers.src.utils.mongo import get_mongo_uri
from hivemind_etl_helpers.src.utils.redis import RedisSingleton
@@ -10,8 +12,11 @@
IngestionPipeline,
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.schema import BaseNode
from llama_index.storage.docstore.mongodb import MongoDocumentStore
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from qdrant_client.conversions import common_types as qdrant_types
from qdrant_client.http import models
from tc_hivemind_backend.db.credentials import load_postgres_credentials
from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.db.utils.model_hyperparams import load_model_hyperparams
@@ -37,7 +42,23 @@ def __init__(self, community_id: str, collection_name: str, testing: bool = Fals
)
self.redis_client = RedisSingleton.get_instance().get_client()

def run_pipeline(self, docs: list[Document]):
def run_pipeline(self, docs: list[Document]) -> list[BaseNode]:
"""
vectorize and ingest data into a Qdrant collection

Note: duplicate documents are handled via an upsert operation.

Parameters
------------
docs : list[llama_index.Document]
list of llama-index documents

Returns
---------
nodes : list[BaseNode]
the set of transformed and loaded nodes/documents
(the transformation is the chunking and embedding of the data)
"""
# Qdrant is collection-based and has no notion of separate databases
logging.info(
f"{len(docs)} documents were extracted and are now being loaded into Qdrant DB!"
@@ -68,3 +89,83 @@ def run_pipeline(self, docs: list[Document]):

nodes = pipeline.run(documents=docs, show_progress=True)
return nodes

def _create_payload_index(
self,
field_name: str,
field_schema: qdrant_types.PayloadSchemaType,
) -> qdrant_types.UpdateResult:
"""
Creates an index on a payload field of points in the Qdrant collection

Note: this is meant for payload fields we later want to scroll or order by

Parameters
------------
field_name : str
the field name under points' payload to create the index for
field_schema : qdrant_client.conversions.common_types.PayloadSchemaType
the schema type of the field

Returns
-----------
operation_result : qdrant_client.conversions.common_types.UpdateResult
the result of the payload index creation operation
"""
operation_result = self.qdrant_client.create_payload_index(
collection_name=self.collection_name,
field_name=field_name,
field_schema=field_schema,
)

return operation_result

def get_latest_document_date(self, field_name: str) -> datetime | None:
"""
get the date of the most recent available document

NOTE: the given `field_name` under the points' payload MUST contain a value in date format (or a date-formatted string). Otherwise, this function may raise errors.

Parameters
------------
field_name : str
the datetime field name in qdrant points' payload

Returns
---------
latest_date : datetime.datetime | None
the datetime for the document containing the latest date
if no document is found or an error is raised, `None` is returned
"""
latest_date: datetime | None = None
try:
result = self._create_payload_index(
field_name=field_name,
field_schema=models.PayloadSchemaType.DATETIME,
)
if result.status.name == "COMPLETED":
latest_document = self.qdrant_client.scroll(
collection_name=self.collection_name,
limit=1,
with_payload=True,
order_by=models.OrderBy(
key=field_name,
direction=models.Direction.DESC,
),
)

if not latest_document[0]:
logging.info("No documents found in the collection.")
latest_date = None
else:
latest_date = parse(latest_document[0][0].payload[field_name])

else:
raise ValueError(
f"Index not created successfully! index creation result: {result}"
)
except Exception as exp:
logging.error(f"Error: {exp} while loading latest point!")
latest_date = None

return latest_date
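A minimal usage sketch of the methods above. The enclosing class definition is collapsed out of this diff, so the class name `CustomIngestionPipeline` and the `createdAt` payload field below are assumptions for illustration:

from llama_index.core import Document

# class name assumed; the class definition is collapsed in this diff
pipeline = CustomIngestionPipeline(
    community_id="1234", collection_name="telegram", testing=False
)

# creates a DATETIME payload index (if missing), then scrolls for the
# newest point; returns None when the collection is empty or errors occur
latest_date = pipeline.get_latest_document_date(field_name="createdAt")

# chunk, embed, and upsert the documents into the Qdrant collection
docs = [Document(text="hello world", doc_id="1")]
nodes = pipeline.run_pipeline(docs)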
Empty file.
2 changes: 2 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/__init__.py
@@ -0,0 +1,2 @@
from .messages import ExtractMessages
from .tc_chats import TelegramChats
82 changes: 82 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py
@@ -0,0 +1,82 @@
from datetime import datetime

from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel
from tc_neo4j_lib import Neo4jOps


class ExtractMessages:
def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id
self._connection = Neo4jOps.get_instance()

def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesModel]:
"""
extract messages related to the given `chat_id`

Parameters
-----------
from_date : datetime | None
load from a specific date
if not given, load all data

Returns
---------
tg_messages : list[TelegramMessagesModel]
the telegram messages
"""
# initialize
where_clause: str | None = None
from_date_timestamp: int | None = None

if from_date:
from_date_timestamp = int(from_date.timestamp() * 1000)
where_clause = """
AND message.date >= $from_date_timestamp
"""
query = f"""
MATCH (c:TGChat {{id: $chat_id}})<-[:SENT_IN]-(message:TGMessage)
WHERE message.text IS NOT NULL
{where_clause if where_clause else ""}
WITH
message.id AS message_id,
MAX(message.updated_at) AS latest_msg_time,
MIN(message.updated_at) AS first_msg_time

MATCH (first_message:TGMessage {{id: message_id, updated_at: first_msg_time}})
MATCH (last_edit:TGMessage {{id: message_id, updated_at: latest_msg_time}})

WITH
first_message AS message,
last_edit.updated_at AS edited_at,
last_edit.text AS message_text
OPTIONAL MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message)
OPTIONAL MATCH (reacted_user:TGUser)-[react_rel:REACTED_TO]->(message)
OPTIONAL MATCH (reply_msg:TGMessage)-[:REPLIED]->(message)
OPTIONAL MATCH (replied_user:TGUser)-[:CREATED_MESSAGE]->(reply_msg)
OPTIONAL MATCH (message)-[:MENTIONED]->(mentioned_user:TGUser)
RETURN
message.id AS message_id,
message_text,
author.username AS author_username,
message.date AS message_created_at,
edited_at AS message_edited_at,
COLLECT(DISTINCT mentioned_user.username) AS mentions,
COLLECT(DISTINCT replied_user.username) AS repliers,
COLLECT(DISTINCT reacted_user.username) AS reactors
ORDER BY message_created_at DESC
"""

parameters = {"chat_id": self.chat_id}
if from_date_timestamp:
parameters["from_date_timestamp"] = from_date_timestamp

tg_messages = []
with self._connection.neo4j_driver.session() as session:
result = session.run(
query,
parameters=parameters,
)
messages = result.data()
tg_messages = [TelegramMessagesModel(**message) for message in messages]

return tg_messages
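For context, a hedged call-site sketch, assuming a reachable Neo4j instance behind `Neo4jOps` (the chat id value is illustrative). Note that the MIN/MAX pairing over `updated_at` in the Cypher query resolves message edits: the earliest node per message id supplies the creation metadata, while the latest supplies the current text.

from datetime import datetime

from hivemind_etl_helpers.src.db.telegram.extract import ExtractMessages

extractor = ExtractMessages(chat_id="-100123456789")  # illustrative chat id

all_messages = extractor.extract()  # full load
new_messages = extractor.extract(from_date=datetime(2024, 10, 1))  # incremental load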
33 changes: 33 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py
@@ -0,0 +1,33 @@
import logging

from tc_neo4j_lib import Neo4jOps


class TelegramChats:
def __init__(self) -> None:
self._connection = Neo4jOps.get_instance()

def extract_chats(self) -> list[tuple[str, str]]:
"""
extract the chat ids and chat names

Returns
---------
chat_info : list[tuple[str, str]]
a list of Telegram chat id and chat name
"""
driver = self._connection.neo4j_driver

chat_info: list[tuple[str, str]] = []
try:
with driver.session() as session:
records = session.run(
"MATCH (c:TGChat) RETURN c.id as chat_id, c.title as name"
)
chat_info = [
(str(record["chat_id"]), str(record["name"])) for record in records
]
except Exception as exp:
logging.error(f"Exception while extracting chat ids. exp: {exp}")

return chat_info
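A small usage sketch (output values are hypothetical):

from hivemind_etl_helpers.src.db.telegram.extract import TelegramChats

chats = TelegramChats().extract_chats()
for chat_id, chat_name in chats:
    print(chat_id, chat_name)  # e.g. -100123456789, community chat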
16 changes: 16 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/schema.py
@@ -0,0 +1,16 @@
from pydantic import BaseModel


class TelegramMessagesModel(BaseModel):
"""
Represents a Telegram message with its associated metadata.
"""

message_id: int
message_text: str
author_username: str
message_created_at: float
message_edited_at: float
mentions: list[str]
repliers: list[str]
reactors: list[str]
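For reference, constructing the model directly; the millisecond-epoch timestamps are an assumption inferred from the extractor multiplying `from_date.timestamp()` by 1000:

from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel

message = TelegramMessagesModel(
    message_id=1,
    message_text="sample message",
    author_username="user1",
    message_created_at=1729468800000.0,  # assumed epoch milliseconds
    message_edited_at=1729468800000.0,
    mentions=["user2"],
    repliers=[],
    reactors=["user3"],
)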
1 change: 1 addition & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/transform/__init__.py
@@ -0,0 +1 @@
from .messages import TransformMessages
59 changes: 59 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/transform/messages.py
@@ -0,0 +1,59 @@
from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel
from llama_index.core import Document


class TransformMessages:
def __init__(self, chat_id: str, chat_name: str) -> None:
self.chat_id = chat_id
self.chat_name = chat_name

def transform(self, messages: list[TelegramMessagesModel]) -> list[Document]:
"""
transform the given Telegram messages into llama-index documents

Parameters
----------
messages : list[TelegramMessagesModel]
the extracted telegram messages

Returns
---------
transformed_docs : list[llama_index.core.Document]
a list of llama-index documents to be embedded & loaded into db
"""
transformed_docs: list[Document] = []

for message in messages:
document = Document(
text=message.message_text,
doc_id=str(message.message_id),  # llama-index document ids are strings
metadata={
"author": message.author_username,
"createdAt": message.message_created_at,
"updatedAt": message.message_edited_at,
"mentions": message.mentions,
"replies": message.repliers,
"reactors": message.reactors,
"chat_name": self.chat_name,
},
excluded_embed_metadata_keys=[
"author",
"createdAt",
"updatedAt",
"mentions",
"replies",
"reactors",
"chat_name",
],
excluded_llm_metadata_keys=[
"createdAt",
"updatedAt",
"mentions",
"replies",
"reactors",
"chat_name",
],
)
transformed_docs.append(document)

return transformed_docs
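A sketch of chaining the transformer after extraction (chat values illustrative):

from hivemind_etl_helpers.src.db.telegram.extract import ExtractMessages
from hivemind_etl_helpers.src.db.telegram.transform import TransformMessages

messages = ExtractMessages(chat_id="-100123456789").extract()
documents = TransformMessages(
    chat_id="-100123456789", chat_name="community chat"
).transform(messages=messages)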
6 changes: 6 additions & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/utility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from hivemind_etl_helpers.src.db.telegram.utils import TelegramPlatform


class TelegramUtils(TelegramPlatform):
def __init__(self, chat_id: str, chat_name: str) -> None:
super().__init__(chat_id, chat_name)
1 change: 1 addition & 0 deletions dags/hivemind_etl_helpers/src/db/telegram/utils/__init__.py
@@ -0,0 +1 @@
from .platform import TelegramPlatform
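Putting the pieces together, the intended flow is: discover chats, extract messages, transform them into llama-index documents, and ingest into Qdrant. A hedged end-to-end sketch; the ingestion class name, `community_id`, and the per-chat `collection_name` are assumptions, since the wiring DAG is not part of this section:

from hivemind_etl_helpers.ingestion_pipeline import CustomIngestionPipeline  # name assumed
from hivemind_etl_helpers.src.db.telegram.extract import ExtractMessages, TelegramChats
from hivemind_etl_helpers.src.db.telegram.transform import TransformMessages

for chat_id, chat_name in TelegramChats().extract_chats():
    messages = ExtractMessages(chat_id=chat_id).extract()
    documents = TransformMessages(chat_id=chat_id, chat_name=chat_name).transform(messages)

    # community_id and collection_name are illustrative assumptions
    pipeline = CustomIngestionPipeline(community_id="1234", collection_name=chat_id)
    pipeline.run_pipeline(documents)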