Skip to content

Commit

Permalink
refactor: sleep with listen
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik authored Nov 12, 2023
2 parents 2be2c32 + 8320512 commit a626beb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
2 changes: 1 addition & 1 deletion taskiq_faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""FastStream - taskiq integration to schedule FastStream tasks."""
__version__ = "0.1.1"
__version__ = "0.1.2"
35 changes: 18 additions & 17 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import typing
import warnings

import anyio
from faststream._compat import TypeAlias, override
from faststream.app import FastStream
from faststream.broker.core.asyncronous import BrokerAsyncUsecase
Expand All @@ -23,8 +24,8 @@ class BrokerWrapper(AsyncBroker):
Methods:
__init__ : Initializes the object.
startup : Startup wrapper FastStream broker.
shutdown : Shutdown wrapper FastStream broker.
startup : Startup wrapped FastStream broker.
shutdown : Shutdown wrapped FastStream broker.
kick : Call wrapped FastStream broker `publish` method.
task : Register FastStream scheduled task.
"""
Expand All @@ -34,12 +35,12 @@ def __init__(self, broker: BrokerAsyncUsecase[typing.Any, typing.Any]) -> None:
self.broker = broker

async def startup(self) -> None:
"""Startup wrapper FastStream broker."""
"""Startup wrapped FastStream broker."""
await super().startup()
await self.broker.start()

async def shutdown(self) -> None:
"""Shutdown wrapper FastStream broker."""
"""Shutdown wrapped FastStream broker."""
await self.broker.close()
await super().shutdown()

Expand All @@ -51,17 +52,17 @@ async def listen(
self,
) -> typing.AsyncGenerator[typing.Union[bytes, AckableMessage], None]:
"""Not supported method."""
warnings.warn(
message=(
f"{self.__class__.__name__} doesn't support `listen` method. "
"Please, use it only to register a task."
),
category=RuntimeWarning,
stacklevel=1,
)

while True:
warnings.warn(
message=(
f"{self.__class__.__name__} doesn't support `listen` method. "
"Please, use it only to register a task."
),
category=RuntimeWarning,
stacklevel=1,
)
yield b""
await anyio.sleep(60)

@override
def task( # type: ignore[override]
Expand Down Expand Up @@ -98,8 +99,8 @@ class AppWrapper(BrokerWrapper):
Methods:
__init__ : Initializes the object.
startup : Startup wrapper FastStream.
shutdown : Shutdown wrapper FastStream.
startup : Startup wrapped FastStream.
shutdown : Shutdown wrapped FastStream.
kick : Call wrapped FastStream broker `publish` method.
task : Register FastStream scheduled task.
"""
Expand All @@ -109,12 +110,12 @@ def __init__(self, app: FastStream) -> None:
self.app = app

async def startup(self) -> None:
"""Startup wrapper FastStream broker."""
"""Startup wrapped FastStream."""
await super(BrokerWrapper, self).startup()
await self.app._startup() # noqa: SLF001

async def shutdown(self) -> None:
"""Shutdown wrapper FastStream broker."""
"""Shutdown wrapped FastStream."""
await self.app._shutdown() # noqa: SLF001
await super(BrokerWrapper, self).shutdown()

Expand Down

0 comments on commit a626beb

Please sign in to comment.