Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add redis callback repo #43

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app/bot/bot.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Configuration for bot instance."""

from pybotx import Bot
from pybotx import Bot, CallbackRepoProto
from pybotx_fsm import FSMMiddleware

from app.bot.commands import common
Expand All @@ -10,7 +10,7 @@
from app.settings import settings


def get_bot() -> Bot:
def get_bot(callback_repo: CallbackRepoProto) -> Bot:
return Bot(
collectors=[
common.collector,
Expand All @@ -26,4 +26,5 @@ def get_bot() -> Bot:
[create_task.fsm, get_tasks.fsm], state_repo_key="redis_repo"
),
],
callback_repo=callback_repo,
)
77 changes: 77 additions & 0 deletions app/caching/callback_redis_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Repository for work callbacks with redis."""

import asyncio
import pickle # noqa: S403
from typing import Optional
from uuid import UUID

# from redis import asyncio as aioredis # type: ignore
import aioredis
from pybotx import CallbackNotReceivedError, CallbackRepoProto
from pybotx.models.method_callbacks import BotXMethodCallback

from app.resources import strings


class CallbackRedisRepo(CallbackRepoProto):
CACHE_KEY = f"{strings.BOT_PROJECT_NAME}:callbacks_stream"
CHECK_CALLBACKS_DELAY = 0.01

def __init__(self, redis: aioredis.Redis, prefix: Optional[str] = None):
self._redis = redis
self._prefix = prefix or ""

async def create_botx_method_callback(self, sync_id: UUID) -> None:
"""Unnecessary in streams implementation."""

async def set_botx_method_callback_result(
self, callback: BotXMethodCallback
) -> None:
dump = pickle.dumps(callback)
await self._redis.xadd(
self.CACHE_KEY,
{f"{self._prefix}:{callback.sync_id}": dump},
)

async def wait_botx_method_callback(
self, sync_id: UUID, timeout: float
) -> BotXMethodCallback:
try:
callback = await asyncio.wait_for(
self._wait_callback(sync_id), timeout=timeout
)
except asyncio.TimeoutError:
raise CallbackNotReceivedError(sync_id) from None

return callback

async def pop_botx_method_callback(
self, sync_id: UUID
) -> "asyncio.Future[BotXMethodCallback]":
return await self._get_callback(self._get_callback_uid(sync_id)) # type: ignore

async def stop_callbacks_waiting(self) -> None:
await self._redis.delete(self.CACHE_KEY)

async def _wait_callback(self, sync_id: UUID) -> BotXMethodCallback:
callback_uid = self._get_callback_uid(sync_id)
while True:
callback = await self._get_callback(callback_uid)

if callback:
return callback

await asyncio.sleep(self.CHECK_CALLBACKS_DELAY)

async def _get_callback(self, callback_uid: bytes) -> Optional[BotXMethodCallback]:
for cid, callback in await self._redis.xrange(self.CACHE_KEY):
if callback_uid not in callback:
continue

await self._redis.xdel(self.CACHE_KEY, cid)
return pickle.loads(callback[callback_uid]) # noqa: S301

return None

def _get_callback_uid(self, sync_id: UUID) -> bytes:
return f"{self._prefix}:{sync_id}".encode()
34 changes: 22 additions & 12 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,45 @@

from functools import partial

import aioredis
from fastapi import FastAPI
from pybotx import Bot

from app.api.routers import router
from app.bot.bot import get_bot
from app.caching.callback_redis_repo import CallbackRedisRepo
from app.caching.redis_repo import RedisRepo
from app.db.sqlalchemy import build_db_session_factory
from app.resources import strings
from app.settings import settings


async def startup(bot: Bot) -> None:
# -- Bot --
await bot.startup()

async def startup(application: FastAPI) -> None:
# -- Database --
bot.state.db_session_factory = await build_db_session_factory()
db_session_factory = await build_db_session_factory()

# -- Redis --
bot.state.redis_repo = await RedisRepo.init(
redis_repo = await RedisRepo.init(
dsn=settings.REDIS_DSN, prefix=strings.BOT_PROJECT_NAME
)
redis_client = await aioredis.create_redis_pool(settings.REDIS_DSN)

# -- Bot --
callback_repo = CallbackRedisRepo(redis_client)
bot = get_bot(callback_repo)

await bot.startup()

bot.state.db_session_factory = db_session_factory
bot.state.redis_repo = redis_repo

application.state.bot = bot
application.state.redis = redis_client


async def shutdown(bot: Bot) -> None:
async def shutdown(application: FastAPI) -> None:
# -- Bot --
bot: Bot = application.state.bot
await bot.shutdown()

# -- Redis --
Expand All @@ -36,13 +49,10 @@ async def shutdown(bot: Bot) -> None:

def get_application() -> FastAPI:
"""Create configured server application instance."""
bot = get_bot()

application = FastAPI(title=strings.BOT_PROJECT_NAME)
application.state.bot = bot

application.add_event_handler("startup", partial(startup, bot))
application.add_event_handler("shutdown", partial(shutdown, bot))
application.add_event_handler("startup", partial(startup, application))
application.add_event_handler("shutdown", partial(shutdown, application))

application.include_router(router)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[tool.poetry]
name = "todo-bot"
version = "2.0.7"
version = "2.0.8"
description = "TODO"
authors = []

Expand Down
Loading