Skip to content

Commit

Permalink
Merge pull request #317 from TogetherCrew/feat/313-telegram-summarizer
Browse files Browse the repository at this point in the history
fix: hivemind telegram ETL, data types and timestamp conversion!
  • Loading branch information
amindadgar authored Nov 6, 2024
2 parents a321837 + 07d6ca7 commit e4f58ee
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 29 deletions.
2 changes: 1 addition & 1 deletion dags/analyzer_helper/telegram/extract_raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


class ExtractRawInfo:
def __init__(self, chat_id: str, platform_id: str):
def __init__(self, chat_id: int, platform_id: str):
"""
Initialize ExtractRawInfo with the chat id and platform id, and set up the Neo4j and MongoDB connections.
"""
Expand Down
2 changes: 1 addition & 1 deletion dags/analyzer_helper/telegram/extract_raw_members.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class ExtractRawMembers:
def __init__(self, chat_id: str, platform_id: str):
def __init__(self, chat_id: int, platform_id: str):
"""
Initialize the ExtractRawMembers with the Neo4j connection parameters.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def setUpClass(cls):
cls.neo4jConnection = Neo4jConnection()
cls.client = MongoSingleton.get_instance().client
cls.driver = cls.neo4jConnection.connect_neo4j()
cls.chat_id = "test_group"
cls.chat_id = 1234554321
cls.platform_id = "platform_db"
cls.platform_db = cls.client[cls.platform_id]
cls.extractor = ExtractRawInfo(cls.chat_id, cls.platform_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class TestExtractRawMembers(unittest.TestCase):
def setUp(self):
self.neo4jConnection = Neo4jConnection()
self.driver = self.neo4jConnection.connect_neo4j()
self.chat_id = "telegram_test_group"
self.chat_id = 12345665432
self.platform_id = "telegram_test_platform"
self.extractor = ExtractRawMembers(self.chat_id, self.platform_id)
self.rawmembers_collection = self.extractor.rawmembers_collection
Expand Down
2 changes: 1 addition & 1 deletion dags/analyzer_helper/telegram/transform_raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class TransformRawInfo:
def __init__(self, chat_id: str):
def __init__(self, chat_id: int):
self.converter = DateTimeFormatConverter
self.user_bot_checker = UserBotChecker()
self.chat_id = chat_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class ExtractMessages:
def __init__(self, chat_id: str) -> None:
def __init__(self, chat_id: int) -> None:
self.chat_id = chat_id
self._connection = Neo4jOps.get_instance()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class ExtractMessagesDaily:
def __init__(self, chat_id: str) -> None:
def __init__(self, chat_id: int) -> None:
self.extractor = ExtractMessages(chat_id=chat_id)

def extract(
Expand All @@ -31,7 +31,7 @@ def extract(

daily_tg_messages: dict[date, list[TelegramMessagesModel]] = defaultdict(list)
for msg in messages:
msg_date = datetime.fromtimestamp(msg.message_created_at / 1000).date()
msg_date = datetime.fromtimestamp(msg.message_created_at).date()
daily_tg_messages[msg_date].append(msg)

return daily_tg_messages
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


class TransformMessages:
def __init__(self, chat_id: str, chat_name: str) -> None:
def __init__(self, chat_id: int, chat_name: str) -> None:
self.chat_id = chat_id
self.chat_name = chat_name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class SummarizeMessages(SummaryBase):
def __init__(
self,
chat_id: str,
chat_id: int,
chat_name: str,
response_synthesizer: BaseSynthesizer | None = None,
verbose: bool = False,
Expand Down
4 changes: 2 additions & 2 deletions dags/hivemind_etl_helpers/src/db/telegram/utils/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@


class TelegramPlatform:
def __init__(self, chat_id: str, chat_name: str) -> None:
def __init__(self, chat_id: int, chat_name: str) -> None:
"""
Parameters
-----------
chat_id : str
chat_id : int
the chat id used to check whether a platform already exists
chat_name : str
the chat name to create later (if not already exists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class TestExtractMessagesDaily(unittest.TestCase):
def setUp(self):
load_dotenv()
self.chat_id = "test_chat"
self.chat_id = 12344321
self.extractor = ExtractMessagesDaily(chat_id=self.chat_id)

@patch(
Expand All @@ -32,16 +32,16 @@ def test_single_message(self, mock_extract):
message_id=1,
message_text="Hello, World!",
author_username="user1",
message_created_at=1633046400000.0, # Equivalent to 2021-10-01T00:00:00 UTC
message_edited_at=1633046500000.0,
message_created_at=1633046400.0, # Equivalent to 2021-10-01T00:00:00 UTC
message_edited_at=1633046500.0,
mentions=["user2"],
repliers=["user3"],
reactors=["user4"],
)
mock_extract.return_value = [msg]

result = self.extractor.extract(from_date=None)
expected_date = datetime.fromtimestamp(msg.message_created_at / 1000).date()
expected_date = datetime.fromtimestamp(msg.message_created_at).date()

self.assertIn(expected_date, result, "Expected date key in result")
self.assertEqual(
Expand All @@ -59,8 +59,8 @@ def test_multiple_messages_same_date(self, mock_extract):
message_id=1,
message_text="Message 1",
author_username="user1",
message_created_at=1633046400000.0, # Equivalent to 2021-10-01
message_edited_at=1633046500000.0,
message_created_at=1633046400.0, # Equivalent to 2021-10-01
message_edited_at=1633046500.0,
mentions=["user2"],
repliers=["user3"],
reactors=["user4"],
Expand All @@ -69,16 +69,16 @@ def test_multiple_messages_same_date(self, mock_extract):
message_id=2,
message_text="Message 2",
author_username="user2",
message_created_at=1633050000000.0, # Same date as msg1
message_edited_at=1633050500000.0,
message_created_at=1633050000.0, # Same date as msg1
message_edited_at=1633050500.0,
mentions=["user1"],
repliers=["user3"],
reactors=["user4"],
)
mock_extract.return_value = [msg1, msg2]

result = self.extractor.extract(from_date=None)
expected_date = datetime.fromtimestamp(msg1.message_created_at / 1000).date()
expected_date = datetime.fromtimestamp(msg1.message_created_at).date()

self.assertIn(expected_date, result, "Expected date key in result")
self.assertEqual(
Expand All @@ -94,8 +94,8 @@ def test_multiple_messages_different_dates(self, mock_extract):
message_id=1,
message_text="Message 1",
author_username="user1",
message_created_at=1633046400000.0, # 2021-10-01
message_edited_at=1633046500000.0,
message_created_at=1633046400.0, # 2021-10-01
message_edited_at=1633046500.0,
mentions=["user2"],
repliers=["user3"],
reactors=["user4"],
Expand All @@ -104,17 +104,17 @@ def test_multiple_messages_different_dates(self, mock_extract):
message_id=2,
message_text="Message 2",
author_username="user2",
message_created_at=1633132800000.0, # 2021-10-02
message_edited_at=1633132900000.0,
message_created_at=1633132800.0, # 2021-10-02
message_edited_at=1633132900.0,
mentions=["user1"],
repliers=["user3"],
reactors=["user4"],
)
mock_extract.return_value = [msg1, msg2]

result = self.extractor.extract(from_date=None)
date1 = datetime.fromtimestamp(msg1.message_created_at / 1000).date()
date2 = datetime.fromtimestamp(msg2.message_created_at / 1000).date()
date1 = datetime.fromtimestamp(msg1.message_created_at).date()
date2 = datetime.fromtimestamp(msg2.message_created_at).date()

self.assertIn(date1, result, "Expected first date key in result")
self.assertIn(date2, result, "Expected second date key in result")
Expand Down
9 changes: 7 additions & 2 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,16 @@ def process_data(messages):
logging.info(f"Started extracting from date: {latest_date}!")
messages = extractor.extract(from_date=latest_date)

logging.info(f"Extracted {len(messages)} messages!")
if dag_type == "messages":
msg_count = len(messages)
else:
msg_count = len(sum(messages.values(), []))

logging.info(f"Extracted {msg_count} messages!")

# Process and load data
documents = process_data(messages)
logging.info(f"Transformed {len(messages)} messages!")
logging.info(f"Transformed {len(documents)} messages!")

ingestion_pipeline.run_pipeline(docs=documents)
logging.info("Finished loading into database!")
Expand Down

0 comments on commit e4f58ee

Please sign in to comment.