diff --git a/app/bot/bot.py b/app/bot/bot.py index a586239..b5fbd08 100644 --- a/app/bot/bot.py +++ b/app/bot/bot.py @@ -1,6 +1,6 @@ """Configuration for bot instance.""" -from pybotx import Bot +from pybotx import Bot, CallbackRepoProto from pybotx_fsm import FSMMiddleware from app.bot.commands import common @@ -10,7 +10,7 @@ from app.settings import settings -def get_bot() -> Bot: +def get_bot(callback_repo: CallbackRepoProto) -> Bot: return Bot( collectors=[ common.collector, @@ -26,4 +26,5 @@ def get_bot() -> Bot: [create_task.fsm, get_tasks.fsm], state_repo_key="redis_repo" ), ], + callback_repo=callback_repo, ) diff --git a/app/caching/callback_redis_repo.py b/app/caching/callback_redis_repo.py new file mode 100644 index 0000000..33a355c --- /dev/null +++ b/app/caching/callback_redis_repo.py @@ -0,0 +1,77 @@ +"""Repository for work callbacks with redis.""" + +import asyncio +import pickle # noqa: S403 +from typing import Optional +from uuid import UUID + +# from redis import asyncio as aioredis # type: ignore +import aioredis +from pybotx import CallbackNotReceivedError, CallbackRepoProto +from pybotx.models.method_callbacks import BotXMethodCallback + +from app.resources import strings + + +class CallbackRedisRepo(CallbackRepoProto): + CACHE_KEY = f"{strings.BOT_PROJECT_NAME}:callbacks_stream" + CHECK_CALLBACKS_DELAY = 0.01 + + def __init__(self, redis: aioredis.Redis, prefix: Optional[str] = None): + self._redis = redis + self._prefix = prefix or "" + + async def create_botx_method_callback(self, sync_id: UUID) -> None: + """Unnecessary in streams implementation.""" + + async def set_botx_method_callback_result( + self, callback: BotXMethodCallback + ) -> None: + dump = pickle.dumps(callback) + await self._redis.xadd( + self.CACHE_KEY, + {f"{self._prefix}:{callback.sync_id}": dump}, + ) + + async def wait_botx_method_callback( + self, sync_id: UUID, timeout: float + ) -> BotXMethodCallback: + try: + callback = await asyncio.wait_for( + self._wait_callback(sync_id), timeout=timeout + ) + except asyncio.TimeoutError: + raise CallbackNotReceivedError(sync_id) from None + + return callback + + async def pop_botx_method_callback( + self, sync_id: UUID + ) -> "asyncio.Future[BotXMethodCallback]": + return await self._get_callback(self._get_callback_uid(sync_id)) # type: ignore + + async def stop_callbacks_waiting(self) -> None: + await self._redis.delete(self.CACHE_KEY) + + async def _wait_callback(self, sync_id: UUID) -> BotXMethodCallback: + callback_uid = self._get_callback_uid(sync_id) + while True: + callback = await self._get_callback(callback_uid) + + if callback: + return callback + + await asyncio.sleep(self.CHECK_CALLBACKS_DELAY) + + async def _get_callback(self, callback_uid: bytes) -> Optional[BotXMethodCallback]: + for cid, callback in await self._redis.xrange(self.CACHE_KEY): + if callback_uid not in callback: + continue + + await self._redis.xdel(self.CACHE_KEY, cid) + return pickle.loads(callback[callback_uid]) # noqa: S301 + + return None + + def _get_callback_uid(self, sync_id: UUID) -> bytes: + return f"{self._prefix}:{sync_id}".encode() diff --git a/app/main.py b/app/main.py index 3bd4916..0e1d653 100644 --- a/app/main.py +++ b/app/main.py @@ -2,32 +2,45 @@ from functools import partial +import aioredis from fastapi import FastAPI from pybotx import Bot from app.api.routers import router from app.bot.bot import get_bot +from app.caching.callback_redis_repo import CallbackRedisRepo from app.caching.redis_repo import RedisRepo from app.db.sqlalchemy import build_db_session_factory from app.resources import strings from app.settings import settings -async def startup(bot: Bot) -> None: - # -- Bot -- - await bot.startup() - +async def startup(application: FastAPI) -> None: # -- Database -- - bot.state.db_session_factory = await build_db_session_factory() + db_session_factory = await build_db_session_factory() # -- Redis -- - bot.state.redis_repo = await RedisRepo.init( + redis_repo = await RedisRepo.init( dsn=settings.REDIS_DSN, prefix=strings.BOT_PROJECT_NAME ) + redis_client = await aioredis.create_redis_pool(settings.REDIS_DSN) + + # -- Bot -- + callback_repo = CallbackRedisRepo(redis_client) + bot = get_bot(callback_repo) + + await bot.startup() + + bot.state.db_session_factory = db_session_factory + bot.state.redis_repo = redis_repo + + application.state.bot = bot + application.state.redis = redis_client -async def shutdown(bot: Bot) -> None: +async def shutdown(application: FastAPI) -> None: # -- Bot -- + bot: Bot = application.state.bot await bot.shutdown() # -- Redis -- @@ -36,13 +49,10 @@ async def shutdown(bot: Bot) -> None: def get_application() -> FastAPI: """Create configured server application instance.""" - bot = get_bot() - application = FastAPI(title=strings.BOT_PROJECT_NAME) - application.state.bot = bot - application.add_event_handler("startup", partial(startup, bot)) - application.add_event_handler("shutdown", partial(shutdown, bot)) + application.add_event_handler("startup", partial(startup, application)) + application.add_event_handler("shutdown", partial(shutdown, application)) application.include_router(router) diff --git a/pyproject.toml b/pyproject.toml index b229b17..0ed7fd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ [tool.poetry] name = "todo-bot" -version = "2.0.7" +version = "2.0.8" description = "TODO" authors = []