Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Telegram vectorize handling the banned users! #306

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesMod
first_message AS message,
last_edit.updated_at AS edited_at,
last_edit.text AS message_text
OPTIONAL MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message)
MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message)
WHERE NOT EXISTS {{
MATCH (author)-[banned_rel:BANNED]->(c:TGChat {{id: $chat_id}})
MATCH (author)-[joined_rel:JOINED|UNBANNED]->(c)
WITH author, MAX(banned_rel.date) AS banned_time, MAX(joined_rel.date) AS joined_time
WHERE banned_time > joined_time
}}
OPTIONAL MATCH (reacted_user:TGUser)-[react_rel:REACTED_TO]->(message)
OPTIONAL MATCH (reply_msg:TGMessage)-[:REPLIED]->(message)
OPTIONAL MATCH (replied_user:TGUser)-[:CREATED_MESSAGE]->(reply_msg)
Expand Down
8 changes: 3 additions & 5 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ class TelegramChats:
def __init__(self) -> None:
    """
    Set up the chat extractor with its Neo4j connection handle

    The handle is whatever `Neo4jOps.get_instance()` returns
    and it is stored on `self._connection`
    """
    neo4j_ops = Neo4jOps.get_instance()
    self._connection = neo4j_ops

def extract_chats(self) -> list[tuple[str, str]]:
def extract_chats(self) -> list[tuple[int, str]]:
"""
extract the chat id and chat names

Returns
---------
chat_info : list[tuple[str, str]]
chat_info : list[tuple[int, str]]
a list of Telegram chat id and chat name
"""
driver = self._connection.neo4j_driver
Expand All @@ -24,9 +24,7 @@ def extract_chats(self) -> list[tuple[str, str]]:
records = session.run(
"MATCH (c:TGChat) RETURN c.id as chat_id, c.title as name"
)
chat_info = [
(str(record["chat_id"]), str(record["name"])) for record in records
]
chat_info = [(record["chat_id"], record["name"]) for record in records]
except Exception as exp:
logging.error(f"Exception during extracting chat ids. exp: {exp}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def create_platform(self) -> ObjectId:
community_id = ObjectId()
self._client[self.database][self.collection].insert_one(
{
"name": "telegram",
"metadata": {
"id": self.chat_id,
"name": self.chat_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,82 @@ def test_extract_multiple_data_with_from_date(self):
data = self.extractor.extract(from_date=datetime(2024, 1, 1))

self.assertEqual(data, [])

def test_extract_single_banned_user(self):
"""
Messages authored by a user banned after joining should not be extracted

The fixture has one chat and two users. `User Two` has a BANNED
relation dated after their JOINED relation, while `User One` only has
a JOINED relation. The expected result holds only `User One`'s message
(id 3, with its edited text); the two messages authored by `User Two`
(ids 4 and 5) are expected to be absent.
"""
# Build the fixture graph: chat, users, messages (m4 is the edited
# version of m1), a reply (m3 -> m1), a reaction and the ban relation
with self.extractor._connection.neo4j_driver.session() as session:
session.run(
"""
CREATE (c:TGChat {id: $chat_id}),
(u1:TGUser {id: '927814807.0', username: 'User One'}),
(u2:TGUser {id: '203678862.0', username: 'User Two'}),
(u1)-[:JOINED {date: $joined_date1}]->(c),
(u2)-[:JOINED {date: $joined_date2}]->(c),
(m1:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline',
date: $created_at1,
updated_at: $created_at1
}
),
(m4:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG',
date: $created_at4,
updated_at: $created_at4
}
),
(m2:TGMessage {
id: '4.0',
text: 'Hi',
date: $created_at2,
updated_at: $created_at2
}
),
(m3:TGMessage {
id: '5.0',
text: 'Reply🫡',
date: $created_at3,
updated_at: $created_at3
}
),
(m1)-[:SENT_IN]->(c),
(m2)-[:SENT_IN]->(c),
(m3)-[:SENT_IN]->(c),
(m4)-[:SENT_IN]->(c),
(u1)-[:CREATED_MESSAGE]->(m1),
(u2)-[:CREATED_MESSAGE]->(m2),
(u2)-[:CREATED_MESSAGE]->(m3),
(m1)-[:EDITED]->(m4),
(m3)-[:REPLIED]->(m1),
(u2)-[:REACTED_TO {new_reaction: '[{"type":"emoji","emoji":"🍓"}]', date: $reaction_date}]->(m1),
(u2)-[:BANNED {date: $banned_date}]->(c)
""",
# Epoch-second timestamps; `banned_date` is the latest of all,
# so the ban of `User Two` comes after their join
{
"chat_id": self.chat_id,
"created_at1": 1672531200.0,  # Sunday, January 1, 2023 12:00:00 AM
"joined_date1": 1672531100.0,  # Saturday, December 31, 2022 11:58:20 PM
"joined_date2": 1672531105.0,  # Saturday, December 31, 2022 11:58:25 PM
"created_at4": 1672531205.0,  # Sunday, January 1, 2023 12:00:05 AM
"created_at2": 1672617600.0,  # Monday, January 2, 2023 12:00:00 AM
"created_at3": 1672704000.0,  # Tuesday, January 3, 2023 12:00:00 AM
"reaction_date": 1672790400.0,  # Wednesday, January 4, 2023 12:00:00 AM
"banned_date": 1673633100.0,  # Friday, January 13, 2023 6:05:00 PM
},
)
# Extract without a `from_date`
data = self.extractor.extract()

# Only the message authored by `User One` is expected back.
# Note the banned `User Two` is still expected in `repliers` and
# `reactors` of that message; only their authored messages are dropped
self.assertEqual(
data,
[
TelegramMessagesModel(
message_id=3,
message_text="🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG",
author_username="User One",
message_created_at=1672531200.0,
message_edited_at=1672531205.0,
mentions=[],
repliers=["User Two"],
reactors=["User Two"],
),
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_extract_chats_single_chat(self):
)

chat_infos = self.tc_chats.extract_chats()
self.assertEqual(chat_infos, [("100000", "test chat")])
self.assertEqual(chat_infos, [(100000, "test chat")])

def test_extract_chats_multiple_chats(self):
neo4j_driver = self.tc_chats._connection.neo4j_driver
Expand Down Expand Up @@ -109,8 +109,8 @@ def test_extract_chats_multiple_chats(self):
self.assertEqual(
chat_ids,
[
("100001", "test chat"),
("100002", "test chat 2"),
("100003", "test chat 3"),
(100001, "test chat"),
(100002, "test chat 2"),
(100003, "test chat 3"),
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def setUp(self) -> None:
self.telegram_platform.database = "TempPlatforms"
self.client.drop_database(self.telegram_platform.database)

def tearDown(self) -> None:
    """
    Drop the database configured on `self.telegram_platform` after each test
    """
    database_name = self.telegram_platform.database
    self.client.drop_database(database_name)

def test_check_no_platform_available(self):
result = self.telegram_platform.check_platform_existence()
self.assertFalse(result)
Expand All @@ -29,6 +32,7 @@ def test_single_platform_available(self):
self.telegram_platform.collection
].insert_one(
{
"name": "telegram",
"metadata": {
"id": self.chat_id,
"name": self.chat_name,
Expand All @@ -55,6 +59,7 @@ def test_telegram_multiple_platform_not_available(self):
].insert_many(
[
{
"name": "telegram",
"metadata": {
"id": chat_id,
"name": chat_name,
Expand All @@ -65,6 +70,7 @@ def test_telegram_multiple_platform_not_available(self):
"updatedAt": datetime.now(),
},
{
"name": "telegram",
"metadata": {
"id": chat_id2,
"name": chat_name2,
Expand All @@ -75,6 +81,7 @@ def test_telegram_multiple_platform_not_available(self):
"updatedAt": datetime.now(),
},
{
"name": "telegram",
"metadata": {
"id": chat_id3,
"name": chat_name3,
Expand Down
4 changes: 2 additions & 2 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
) as dag:

@task
def fetch_chat_ids() -> list[tuple[str, str]]:
def fetch_chat_ids() -> list[tuple[int, str]]:
"""
Getting all Telegram chats from the database

Returns
---------
chat_infos : list[tuple[str, str]]
chat_infos : list[tuple[int, str]]
a list of Telegram chat id and name
"""
load_dotenv()
Expand Down
Loading