feat: Finish up the telegram embedding!
Also, added a test case for the message extraction.
amindadgar committed Oct 21, 2024
1 parent d58eacd commit c563c19
Showing 5 changed files with 345 additions and 22 deletions.
98 changes: 97 additions & 1 deletion dags/hivemind_etl_helpers/ingestion_pipeline.py
@@ -1,4 +1,6 @@
import logging
from datetime import datetime
from dateutil.parser import parse

from hivemind_etl_helpers.src.utils.credentials import load_redis_credentials
from hivemind_etl_helpers.src.utils.mongo import get_mongo_uri
@@ -10,8 +12,11 @@
IngestionPipeline,
)
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.schema import BaseNode
from llama_index.storage.docstore.mongodb import MongoDocumentStore
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache
from qdrant_client.conversions import common_types as types
from qdrant_client.http import models
from tc_hivemind_backend.db.credentials import load_postgres_credentials
from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.db.utils.model_hyperparams import load_model_hyperparams
@@ -37,7 +42,23 @@ def __init__(self, community_id: str, collection_name: str, testing: bool = False):
)
self.redis_client = RedisSingleton.get_instance().get_client()

def run_pipeline(self, docs: list[Document]) -> list[BaseNode]:
"""
vectorize and ingest data into a qdrant collection
Note: This will handle duplicate documents by doing an upsert operation.
Parameters
------------
docs : list[llama_index.Document]
list of llama-index documents
Returns
---------
nodes : list[BaseNode]
The set of transformed and loaded Nodes/Documents
(transformation is chunking and embedding of data)
"""
# Qdrant is collection-based and has no notion of separate databases
logging.info(
f"{len(docs)} documents were extracted and are now being loaded into Qdrant!"
@@ -68,3 +89,78 @@ def run_pipeline(self, docs: list[Document]):

nodes = pipeline.run(documents=docs, show_progress=True)
return nodes

def _create_payload_index(
self,
field_name: str,
field_schema: types.PayloadSchemaType,
) -> types.UpdateResult:
"""
Creates an index on a field under the payload of points in qdrant db
Note: this could be used for payload fields that we want to scroll for after
Parameters
------------
field_name : str
the field name under points' payload to create the index for
field_schema : types.PayloadSchemaType
the schema type of the field
Returns
-----------
operation_result : qdrant_client.conversions.common_types.UpdateResult
the payload index creation type
"""
operation_result = self.qdrant_client.create_payload_index(
collection_name=self.collection_name,
field_name=field_name,
field_schema=field_schema,
)

return operation_result

def get_latest_document_date(self, field_name: str) -> datetime | None:
"""
get the latest date for the most recent available document
NOTE: the given `field_name` under the points' schema MUST CONTAIN A VALUE HAVING DATE FORMAT (or a string format). If not, errors might raise in result of this function
Parameters
------------
field_name : str
the datetime field name in qdrant points' payload
Returns
---------
latest_date : datetime.datetime | None
the datetime for the document containing the latest date
if no document or any errors raised, we would return `None`
"""
latest_date: datetime | None = None
try:
result = self._create_payload_index(
field_name=field_name,
field_schema=models.PayloadSchemaType.DATETIME,
)
if result.status.name == "COMPLETED":
latest_document = self.qdrant_client.scroll(
collection_name=self.collection_name,
limit=1,
with_payload=True,
order_by=models.OrderBy(
key=field_name,
direction=models.Direction.DESC,
),
)

latest_date = parse(latest_document[0][0].payload[field_name])
else:
raise ValueError(
f"Index not created successfully! index creation result: {result}"
)
except Exception as exp:
    logging.error(f"Error: {exp} while loading the latest point!")
    latest_date = None

return latest_date
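
A minimal usage sketch of this pipeline, for orientation. The enclosing class name is outside these hunks, so `CustomIngestionPipeline` is an assumption, as are the community id, collection name, and the `createdAt` payload field:

from datetime import datetime
from llama_index.core import Document

# hypothetical instantiation; arguments match the __init__ shown above
pipeline = CustomIngestionPipeline(
    community_id="1234", collection_name="telegram", testing=True
)

docs = [
    Document(text="hello world", metadata={"createdAt": "2024-10-21T00:00:00"})
]
nodes = pipeline.run_pipeline(docs)  # chunk, embed, and upsert into Qdrant

# resume point for the next incremental run; None on a fresh collection
latest_date: datetime | None = pipeline.get_latest_document_date(
    field_name="createdAt"
)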
32 changes: 26 additions & 6 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/messages.py
@@ -1,4 +1,6 @@
from datetime import datetime

from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel
from tc_neo4j_lib import Neo4jOps


@@ -7,25 +9,40 @@ def __init__(self, chat_id: str) -> None:
self.chat_id = chat_id
self._connection = Neo4jOps.get_instance()

def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesModel]:
"""
extract messages related to the given `chat_id`
Parameters
-----------
from_date : datetime | None
load from a specific date
if not given, load all data
Returns
---------
tg_messages : list[TelegramMessagesModel]
the telegram messages
"""
where_clause: str | None = None
from_date_timestamp: int | None = None
if from_date:
    # Neo4j stores TGMessage.date as epoch milliseconds
    from_date_timestamp = int(from_date.timestamp() * 1000)
    where_clause = "AND message.date >= $from_date_timestamp"

query = f"""
MATCH (c:TGChat {{id: $chat_id}})<-[:SENT_IN]-(message:TGMessage)
WHERE message.text IS NOT NULL
{where_clause if where_clause else ""}
WITH
message.id AS message_id,
MAX(message.updated_at) AS latest_msg_time,
MIN(message.updated_at) AS first_msg_time
MATCH (first_message:TGMessage {{id: message_id, updated_at: first_msg_time}})
MATCH (last_edit:TGMessage {{id: message_id, updated_at: latest_msg_time}})
WITH
first_message AS message,
@@ -49,7 +66,10 @@ def extract(self) -> list[TelegramMessagesModel]:
"""
tg_messages = []
with self._connection.neo4j_driver.session() as session:
result = session.run(
    query,
    {"chat_id": self.chat_id, "from_date_timestamp": from_date_timestamp},
)
messages = result.data()
tg_messages = [TelegramMessagesModel(**message) for message in messages]

195 changes: 195 additions & 0 deletions dags/hivemind_etl_helpers/tests/integration/test_extract_messages.py
@@ -0,0 +1,195 @@
from datetime import datetime
from unittest import TestCase

from dotenv import load_dotenv
from hivemind_etl_helpers.src.db.telegram.extract import ExtractMessages
from hivemind_etl_helpers.src.db.telegram.schema import TelegramMessagesModel


class TestExtractTelegramMessages(TestCase):
def setUp(self) -> None:
load_dotenv()
self.chat_id = "1234567890"
self.extractor = ExtractMessages(chat_id=self.chat_id)
self._delete_everything()

def tearDown(self) -> None:
self._delete_everything()

def _delete_everything(self):
"""remove everything on neo4j db"""
with self.extractor._connection.neo4j_driver.session() as session:
session.execute_write(lambda tx: tx.run("MATCH (n) DETACH DELETE (n)"))

def test_extract_empty_data(self):
data = self.extractor.extract()

self.assertEqual(data, [])

def test_extract_empty_data_with_from_date(self):
data = self.extractor.extract(from_date=datetime(2023, 1, 1))

self.assertEqual(data, [])

def test_extract_single_data(self):
with self.extractor._connection.neo4j_driver.session() as session:
session.run(
"""
CREATE (c:TGChat {id: $chat_id}),
(u1:TGUser {id: '927814807.0', username: 'User One'}),
(u2:TGUser {id: '203678862.0', username: 'User Two'}),
(m1:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline',
date: $created_at1,
updated_at: $created_at1
}
),
(m1)-[:SENT_IN]->(c),
(u1)-[:CREATED_MESSAGE]->(m1),
(u2)-[:REACTED_TO {new_reaction: '[{"type":"emoji","emoji":"🍓"}]', date: $reaction_date}]->(m1)
""",
{
"chat_id": self.chat_id,
"created_at1": 1672531200.0, # Sunday, January 1, 2023 12:00:00 AM
"reaction_date": 1672790400.0, # Wednesday, January 4, 2023 12:00:00 AM
},
)
data = self.extractor.extract()

self.assertEqual(
data,
[
TelegramMessagesModel(
message_id=3,
message_text="🎉️️️️️️ Welcome to the TC Ingestion Pipeline",
author_username="User One",
message_created_at=1672531200,
message_edited_at=1672531200,
mentions=[],
repliers=[],
reactors=["User Two"],
)
],
)

def test_extract_single_data_with_from_date(self):
with self.extractor._connection.neo4j_driver.session() as session:
session.run(
"""
CREATE (c:TGChat {id: $chat_id}),
(u1:TGUser {id: '927814807.0', username: 'User One'}),
(u2:TGUser {id: '203678862.0', username: 'User Two'}),
(m1:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline',
date: $created_at1,
updated_at: $created_at1
}
),
(m1)-[:SENT_IN]->(c),
(u1)-[:CREATED_MESSAGE]->(m1),
(u2)-[:REACTED_TO {new_reaction: '[{"type":"emoji","emoji":"🍓"}]', date: $reaction_date}]->(m1)
""",
{
"chat_id": self.chat_id,
"created_at1": 1672531200.0, # Sunday, January 1, 2023 12:00:00 AM
"reaction_date": 1672790400.0, # Wednesday, January 4, 2023 12:00:00 AM
},
)
data = self.extractor.extract(from_date=datetime(2024, 1, 1))

self.assertEqual(data, [])

def test_extract_multiple_data(self):
with self.extractor._connection.neo4j_driver.session() as session:
session.run(
"""
CREATE (c:TGChat {id: $chat_id}),
(u1:TGUser {id: '927814807.0', username: 'User One'}),
(u2:TGUser {id: '203678862.0', username: 'User Two'}),
(m1:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline',
date: $created_at1,
updated_at: $created_at1
}
),
(m4:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG',
date: $created_at4,
updated_at: $created_at4
}
),
(m2:TGMessage {
id: '4.0',
text: 'Hi',
date: $created_at2,
updated_at: $created_at2
}
),
(m3:TGMessage {
id: '5.0',
text: 'Reply🫡',
date: $created_at3,
updated_at: $created_at3
}
),
(m1)-[:SENT_IN]->(c),
(m2)-[:SENT_IN]->(c),
(m3)-[:SENT_IN]->(c),
(m4)-[:SENT_IN]->(c),
(u1)-[:CREATED_MESSAGE]->(m1),
(u2)-[:CREATED_MESSAGE]->(m2),
(u2)-[:CREATED_MESSAGE]->(m3),
(m1)-[:EDITED]->(m4),
(m3)-[:REPLIED]->(m1),
(u2)-[:REACTED_TO {new_reaction: '[{"type":"emoji","emoji":"🍓"}]', date: $reaction_date}]->(m1)
""",
{
"chat_id": self.chat_id,
"created_at1": 1672531200.0, # Sunday, January 1, 2023 12:00:00 AM
"created_at4": 1672531205.0, # Sunday, January 1, 2023 12:00:05 AM
"created_at2": 1672617600.0, # Monday, January 2, 2023 12:00:00 AM
"created_at3": 1672704000.0, # Tuesday, January 3, 2023 12:00:00 AM
"reaction_date": 1672790400.0, # Wednesday, January 4, 2023 12:00:00 AM
},
)
data = self.extractor.extract()

self.assertEqual(
data,
[
TelegramMessagesModel(
message_id=3,
message_text="🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG",
author_username="User One",
message_created_at=1672531200,
message_edited_at=1672531205,
mentions=[],
repliers=[],
reactors=["User Two"],
),
TelegramMessagesModel(
message_id=4,
message_text="Hi",
author_username="User Two",
message_created_at=1672617600,
message_edited_at=1672617600,
mentions=[],
repliers=[],
reactors=[],
),
TelegramMessagesModel(
message_id=5,
message_text="Reply🫡",
author_username="User Two",
message_created_at=1672704000,
message_edited_at=1672704000,
mentions=[],
repliers=[],
reactors=[],
),
],
)
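
These integration tests assume a reachable Neo4j instance whose credentials are loaded from the `.env` file; each test wipes the whole database in `setUp`/`tearDown`, so point it at a disposable instance. A typical invocation from the repository root (the path follows the file header above):

pytest dags/hivemind_etl_helpers/tests/integration/test_extract_messages.py -v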