Skip to content

Commit

Permalink
chore: fetch telegram channel msg concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
naaive committed Aug 26, 2024
1 parent 0082cd4 commit 8cdb19a
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 42 deletions.
82 changes: 40 additions & 42 deletions openagent/executors/tg_news_executor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import json
from typing import Optional, Type
import asyncio
from typing import Any, Dict, List, Optional, Type

import aiohttp
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
from langchain.tools import BaseTool
from loguru import logger
from pydantic import BaseModel, Field

from openagent.conf.env import settings
from openagent.executors.tg_util import fetch_tg_msgs


class ParamSchema(BaseModel):
Expand All @@ -26,63 +23,64 @@ class TelegramNewsExecutor(BaseTool):
A tool for fetching recent news from specific Telegram channels using RSS3 DATA API.
"""

def _run(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError

name = "TelegramNewsExecutor"
description = """Use this tool to get recent news and updates in the blockchain \
and cryptocurrency space."""
args_schema: Type[ParamSchema] = ParamSchema

def _run(
self,
limit: int = 10,
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
raise NotImplementedError

async def _arun(
self,
limit: int = 10,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
):
) -> str:
"""
Asynchronously run the Telegram news fetching process.
:param limit: Number of recent news items to fetch
:param run_manager: Optional callback manager for async operations
:return: A string containing the fetched news items
"""
return await fetch_telegram_news(limit)
return await fetch_telegram_news(["ChannelPANews", "chainfeedsxyz"], limit)


async def fetch_telegram_news(limit: int = 10):
async def fetch_telegram_news(channels: List[str], limit: int = 10) -> str:
"""
Fetch recent news from specific Telegram channels using RSS3 DATA API.
:param channels: List of Telegram channels to fetch news from
:param limit: Number of recent news items to fetch
:return: A string containing the fetched news items
"""
channels = ["ChannelPANews", "chainfeedsxyz"]
all_news = []

async with aiohttp.ClientSession() as session:
for channel in channels:
url = f"{settings.RSS3_DATA_API}/rss/telegram/channel/{channel}"
logger.info(f"Fetching news from {url}")

async with session.get(url) as resp:
if resp.status == 200:
content = await resp.text()
data = json.loads(content)
entries = data["data"][:limit]
all_news.extend(entries)
else:
logger.error(f"Failed to fetch from {url}. Status: {resp.status}")

formatted_news = []
for entry in all_news:
metadata = entry["actions"][0]["metadata"]
formatted_entry = f"Title: {metadata['title']}\nDate: {metadata['pub_date']}\nSummary: {metadata['description']}\n\n"
formatted_news.append(formatted_entry)

result = "Recent news from Telegram channels:\n\n" + "\n".join(formatted_news)

return result
results = await asyncio.gather(*[fetch_tg_msgs(channel, limit) for channel in channels])
return format_news(list(results))


def format_news(results: List[List[Dict]]) -> str:
    """
    Format the fetched news results into a readable string.

    Each element of ``results`` holds the entries fetched for one channel.
    A channel whose fetch produced nothing (``None`` or an empty list) is
    skipped, so one failed channel does not prevent the remaining news from
    being reported.

    :param results: A list of per-channel lists containing news entries
    :return: A formatted string of news items (header only when there are none)
    """
    # `if item` filters out both None and empty per-channel results before
    # the inner loop tries to iterate them.
    formatted_news = [format_entry(entry) for item in results if item for entry in item]
    return "Recent news from Telegram channels:\n\n" + "\n".join(formatted_news)


def format_entry(entry: Dict) -> str:
    """
    Render one news entry as a "Title / Date / Summary" text block.

    Only the metadata of the entry's first action is used.

    :param entry: A dictionary containing news entry data
    :return: The formatted entry, terminated by a blank line
    """
    first_action = entry["actions"][0]
    meta = first_action["metadata"]
    title, pub_date, summary = meta["title"], meta["pub_date"], meta["description"]
    return f"Title: {title}\nDate: {pub_date}\nSummary: {summary}\n\n"


if __name__ == "__main__":
    # Manual smoke test: fetch the latest items from both channels and print
    # the formatted report. asyncio.run() creates a fresh event loop and
    # closes it afterwards, replacing the deprecated pattern of calling
    # asyncio.get_event_loop() when no loop is running.
    entries = asyncio.run(fetch_telegram_news(["ChannelPANews", "chainfeedsxyz"], 10))
    print(entries)
36 changes: 36 additions & 0 deletions openagent/executors/tg_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import json

import aiohttp
from loguru import logger

from openagent.conf.env import settings


async def fetch_tg_msgs(channel: str, limit: int = 10) -> list:
    """
    Fetch recent content from a specific Telegram channel using RSS3 DATA API.

    :param channel: The Telegram channel to fetch content from
    :param limit: Number of recent items to fetch
    :return: A list with at most ``limit`` items taken from the ``data`` field
        of the API response, or an empty list when the API does not answer
        with HTTP 200
    """

    url = f"{settings.RSS3_DATA_API}/rss/telegram/channel/{channel}"
    logger.info(f"Fetching content from {url}")

    async with aiohttp.ClientSession() as session:  # noqa
        async with session.get(url) as resp:
            if resp.status != 200:
                logger.error(f"Failed to fetch from {url}. Status: {resp.status}")
                # Return an empty list rather than falling through to an
                # implicit None, so callers can always iterate the result.
                return []

            content = await resp.text()
            data = json.loads(content)
            return data["data"][:limit]


if __name__ == "__main__":
    import asyncio

    # Manual smoke test: fetch five items from one channel and print them.
    # asyncio.run() manages the event loop lifecycle itself, replacing the
    # deprecated pattern of calling asyncio.get_event_loop() when no loop is
    # running.
    entries = asyncio.run(fetch_tg_msgs("ChannelPANews", 5))
    print(entries)

0 comments on commit 8cdb19a

Please sign in to comment.