Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: telegram vectorize - runtime bugs! #305

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ def fetch_chat_ids() -> list[tuple[str, str]]:
a list of Telegram chat id and name
"""
load_dotenv()
chat_infos = TelegramChats.extract_chats()
logging.info(f"Extracted chats: {chat_infos}")
chat_infos = TelegramChats().extract_chats()
return chat_infos

@task
def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:
def chat_existence(chat_info: tuple[str, str]) -> dict[str, tuple[str, str] | str]:
"""
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
check if the community & platform was created for the telegram or not
if not, create a community and platform and hivemind module for it
Expand All @@ -46,7 +45,9 @@ def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:

Returns
---------
chat_info : tuple[str, str]
details : dict[str, tuple[str, str] | str]
the chat details containing the chat_info
and a community id related to that
tuple containing telegram chat id and chat name
"""
chat_id = chat_info[0]
Expand All @@ -62,20 +63,33 @@ def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:

community_id = utils.create_platform()

return chat_info, community_id
return {
"chat_info": chat_info,
"community_id": str(community_id),
}

@task
def processor(chat_info: tuple[str, str], community_id: str) -> None:
def processor(
details: dict[str, tuple[str, str] | str],
) -> None:
"""
extract, transform, and load telegram data

Parameters
-----------
chat_id : str
a telegram chat id
community_id : str
the community id, related the created community
details : dict[str, tuple[str, str] | str]
the chat details containing the chat_info
and a community id related to that
tuple containing telegram chat id and chat name
"""
load_dotenv()
logging.info(f"received details: {details}!")
# unwrapping data
chat_info = details["chat_info"]
community_id = details["community_id"]

amindadgar marked this conversation as resolved.
Show resolved Hide resolved
logging.info(f"Started processing community: {community_id}")

chat_id = chat_info[0]
chat_name = chat_info[1]

Expand All @@ -92,12 +106,18 @@ def processor(chat_info: tuple[str, str], community_id: str) -> None:
if latest_date:
# this is to catch any edits for messages of 30 days ago
from_date = latest_date - timedelta(days=30)
logging.info(f"Started extracting from date: {from_date}!")
messages = extractor.extract(from_date=from_date)
else:
logging.info("Started extracting data from scratch!")
messages = extractor.extract()

logging.info(f"Extracted {len(messages)} messages!")
documents = transformer.transform(messages=messages)
logging.info(f"Messages transformed!")
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
ingestion_pipeline.run_pipeline(docs=documents)
logging.info("Finished loading into database!")

chat_infos = fetch_chat_ids()
chat_info, community_id = chat_existence.expand(chat_info=chat_infos)
processor(chat_info=chat_info, community_id=community_id)
details = chat_existence.expand(chat_info=chat_infos)
processor.expand(details=details)
Loading