From 2beb27db623a154c0970eceb625d5dd81c069419 Mon Sep 17 00:00:00 2001 From: IvanKirpichnikov Date: Tue, 20 Aug 2024 21:22:01 +0300 Subject: [PATCH 1/3] fix #68 --- taskiq_faststream/__about__.py | 1 + taskiq_faststream/broker.py | 2 ++ taskiq_faststream/formatter.py | 48 ++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+) create mode 100644 taskiq_faststream/formatter.py diff --git a/taskiq_faststream/__about__.py b/taskiq_faststream/__about__.py index 67739d1..e6aa654 100644 --- a/taskiq_faststream/__about__.py +++ b/taskiq_faststream/__about__.py @@ -1,2 +1,3 @@ """FastStream - taskiq integration to schedule FastStream tasks.""" + __version__ = "0.1.8" diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index f295316..1d44461 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -10,6 +10,7 @@ from taskiq.decor import AsyncTaskiqDecoratedTask from typing_extensions import TypeAlias, override +from taskiq_faststream.formatter import PatchedFormatter from taskiq_faststream.serializer import PatchedSerializer from taskiq_faststream.types import ScheduledTask from taskiq_faststream.utils import resolve_msg @@ -34,6 +35,7 @@ class BrokerWrapper(AsyncBroker): def __init__(self, broker: Any) -> None: super().__init__() self.serializer = PatchedSerializer() + self.formatter = PatchedFormatter(self) self.broker = broker async def startup(self) -> None: diff --git a/taskiq_faststream/formatter.py b/taskiq_faststream/formatter.py new file mode 100644 index 0000000..235fbd8 --- /dev/null +++ b/taskiq_faststream/formatter.py @@ -0,0 +1,48 @@ +from typing import Any, Dict + +from taskiq.abc.broker import AsyncBroker +from taskiq.abc.formatter import TaskiqFormatter +from taskiq.compat import IS_PYDANTIC2, Model, model_dump, model_validate +from taskiq.message import BrokerMessage, TaskiqMessage + +if IS_PYDANTIC2: + + def model_dump(instance: Model) -> Dict[str, Any]: + """Model dump.""" + return instance.model_dump() + +else: + + def model_dump(instance: Model) -> Dict[str, Any]: + """Model dump.""" + return instance.dict() + + +class PatchedFormatter(TaskiqFormatter): + """Default taskiq formatter.""" + + def __init__(self, broker: AsyncBroker) -> None: + self.broker = broker + + def dumps(self, message: TaskiqMessage) -> BrokerMessage: + """ + Dumps taskiq message to some broker message format. + + :param message: message to send. + :return: Dumped message. + """ + return BrokerMessage( + task_id=message.task_id, + task_name=message.task_name, + message=self.broker.serializer.dumpb(model_dump(message)), + labels=message.labels, + ) + + def loads(self, message: bytes) -> TaskiqMessage: + """ + Loads json from message. + + :param message: broker's message. + :return: parsed taskiq message. + """ + return model_validate(TaskiqMessage, self.broker.serializer.loadb(message)) From 0b354543b61cb9ba62d49c1ac7cecfba988560ef Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 21 Aug 2024 23:12:13 +0300 Subject: [PATCH 2/3] fix: taskiq 0.11 compat --- .github/workflows/release.yml | 2 +- pyproject.toml | 9 ++++++-- taskiq_faststream/__about__.py | 2 +- taskiq_faststream/broker.py | 24 ++++++++----------- taskiq_faststream/formatter.py | 41 +++++++++++++++------------------ taskiq_faststream/serializer.py | 17 -------------- tests/conftest.py | 1 - tests/testcase.py | 4 ++-- 8 files changed, 39 insertions(+), 61 deletions(-) delete mode 100644 taskiq_faststream/serializer.py diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8b8429b..5dfd5d4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -39,7 +39,7 @@ jobs: run: python -m build - name: Publish - uses: pypa/gh-action-pypi-publish@v1.8.14 + uses: pypa/gh-action-pypi-publish@v1.9.0 with: password: ${{ secrets.PYPI_TOKEN }} diff --git a/pyproject.toml b/pyproject.toml index 9e4c486..50c24b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ classifiers = [ dynamic = ["version"] dependencies = [ - "taskiq>=0.10.0,<1.0.0", + "taskiq>=0.11.0,<0.12.0", "faststream>=0.3.14,<0.6.0", ] @@ -59,6 +59,10 @@ kafka = [ "faststream[kafka]" ] +confluent = [ + "faststream[confluent]" +] + redis = [ "faststream[redis]" ] @@ -68,6 +72,7 @@ test = [ "taskiq-faststream[nats]", "taskiq-faststream[rabbit]", "taskiq-faststream[kafka]", + "taskiq-faststream[confluent]", "taskiq-faststream[redis]", "coverage[toml]>=7.2.0,<8.0.0", @@ -77,7 +82,7 @@ test = [ dev = [ "taskiq-faststream[test]", - "mypy>=1.8.0,<1.10.0", + "mypy>=1.8.0,<1.12.0", "ruff==0.4.1", "pre-commit >=3.6.0,<4.0.0", ] diff --git a/taskiq_faststream/__about__.py b/taskiq_faststream/__about__.py index e6aa654..d9e60ec 100644 --- a/taskiq_faststream/__about__.py +++ b/taskiq_faststream/__about__.py @@ -1,3 +1,3 @@ """FastStream - taskiq integration to schedule FastStream tasks.""" -__version__ = "0.1.8" +__version__ = "0.2.0" diff --git a/taskiq_faststream/broker.py b/taskiq_faststream/broker.py index 1d44461..35abc1e 100644 --- a/taskiq_faststream/broker.py +++ b/taskiq_faststream/broker.py @@ -5,13 +5,12 @@ import anyio from faststream.app import FastStream from faststream.types import SendableMessage -from taskiq import AsyncBroker, BrokerMessage +from taskiq import AsyncBroker from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask from typing_extensions import TypeAlias, override -from taskiq_faststream.formatter import PatchedFormatter -from taskiq_faststream.serializer import PatchedSerializer +from taskiq_faststream.formatter import PatchedFormatter, PathcedMessage from taskiq_faststream.types import ScheduledTask from taskiq_faststream.utils import resolve_msg @@ -34,8 +33,7 @@ class BrokerWrapper(AsyncBroker): def __init__(self, broker: Any) -> None: super().__init__() - self.serializer = PatchedSerializer() - self.formatter = PatchedFormatter(self) + self.formatter = PatchedFormatter() self.broker = broker async def startup(self) -> None: @@ -48,7 +46,7 @@ async def shutdown(self) -> None: await self.broker.close() await super().shutdown() - async def kick(self, message: BrokerMessage) -> None: + async def kick(self, message: PathcedMessage) -> None: # type: ignore[override] """Call wrapped FastStream broker `publish` method.""" await _broker_publish(self.broker, message) @@ -111,7 +109,7 @@ class AppWrapper(BrokerWrapper): def __init__(self, app: FastStream) -> None: super(BrokerWrapper, self).__init__() - self.serializer = PatchedSerializer() + self.formatter = PatchedFormatter() self.app = app async def startup(self) -> None: @@ -124,7 +122,7 @@ async def shutdown(self) -> None: await self.app._shutdown() # noqa: SLF001 await super(BrokerWrapper, self).shutdown() - async def kick(self, message: BrokerMessage) -> None: + async def kick(self, message: PathcedMessage) -> None: # type: ignore[override] """Call wrapped FastStream broker `publish` method.""" assert ( # noqa: S101 self.app.broker @@ -134,11 +132,7 @@ async def kick(self, message: BrokerMessage) -> None: async def _broker_publish( broker: Any, - message: BrokerMessage, + message: PathcedMessage, ) -> None: - labels = message.labels - labels.pop("schedule", None) - async for msg in resolve_msg( - msg=labels.pop("message", message.message), - ): - await broker.publish(msg, **labels) + async for msg in resolve_msg(message.body): + await broker.publish(msg, **message.labels) diff --git a/taskiq_faststream/formatter.py b/taskiq_faststream/formatter.py index 235fbd8..aa9bcc8 100644 --- a/taskiq_faststream/formatter.py +++ b/taskiq_faststream/formatter.py @@ -1,41 +1,38 @@ +from dataclasses import dataclass from typing import Any, Dict -from taskiq.abc.broker import AsyncBroker from taskiq.abc.formatter import TaskiqFormatter -from taskiq.compat import IS_PYDANTIC2, Model, model_dump, model_validate -from taskiq.message import BrokerMessage, TaskiqMessage +from taskiq.message import TaskiqMessage -if IS_PYDANTIC2: - def model_dump(instance: Model) -> Dict[str, Any]: - """Model dump.""" - return instance.model_dump() +@dataclass +class PathcedMessage: + """DTO to transfer data to `broker.kick`.""" -else: - - def model_dump(instance: Model) -> Dict[str, Any]: - """Model dump.""" - return instance.dict() + body: Any + labels: Dict[str, Any] class PatchedFormatter(TaskiqFormatter): """Default taskiq formatter.""" - def __init__(self, broker: AsyncBroker) -> None: - self.broker = broker - - def dumps(self, message: TaskiqMessage) -> BrokerMessage: + def dumps( # type: ignore[override] + self, + message: TaskiqMessage, + ) -> PathcedMessage: """ Dumps taskiq message to some broker message format. :param message: message to send. :return: Dumped message. """ - return BrokerMessage( - task_id=message.task_id, - task_name=message.task_name, - message=self.broker.serializer.dumpb(model_dump(message)), - labels=message.labels, + labels = message.labels + labels.pop("schedule", None) + labels.pop("schedule_id", None) + + return PathcedMessage( + body=labels.pop("message", None), + labels=labels, ) def loads(self, message: bytes) -> TaskiqMessage: @@ -45,4 +42,4 @@ def loads(self, message: bytes) -> TaskiqMessage: :param message: broker's message. :return: parsed taskiq message. """ - return model_validate(TaskiqMessage, self.broker.serializer.loadb(message)) + raise NotImplementedError diff --git a/taskiq_faststream/serializer.py b/taskiq_faststream/serializer.py deleted file mode 100644 index c4900b5..0000000 --- a/taskiq_faststream/serializer.py +++ /dev/null @@ -1,17 +0,0 @@ -from typing import Any - -from taskiq.serializers.json_serializer import JSONSerializer - - -class PatchedSerializer(JSONSerializer): - """Patched serializer removes labels.""" - - def dumpb(self, value: Any) -> bytes: - """ - Dumps taskiq message to some broker message format. - - :param message: message to send. - :return: Dumped message. - """ - del value["labels"] - return super().dumpb(value) diff --git a/tests/conftest.py b/tests/conftest.py index 1bf01ae..c990036 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,6 +27,5 @@ def mock() -> MagicMock: @pytest.fixture() -@pytest.mark.anyio async def event() -> asyncio.Event: return asyncio.Event() diff --git a/tests/testcase.py b/tests/testcase.py index 2374bfb..f263a20 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -1,5 +1,5 @@ import asyncio -from datetime import datetime +import datetime from typing import Any from unittest.mock import MagicMock @@ -44,7 +44,7 @@ async def handler(msg: str) -> None: **{self.subj_name: subject}, schedule=[ { - "time": datetime.utcnow(), + "time": datetime.datetime.now(datetime.UTC), }, ], ) From f37079b1170106939b0af08f629246412eb4e457 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Wed, 21 Aug 2024 23:15:35 +0300 Subject: [PATCH 3/3] tests: old python compat --- tests/testcase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testcase.py b/tests/testcase.py index f263a20..2aa958f 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -1,5 +1,5 @@ import asyncio -import datetime +from datetime import datetime from typing import Any from unittest.mock import MagicMock @@ -44,7 +44,7 @@ async def handler(msg: str) -> None: **{self.subj_name: subject}, schedule=[ { - "time": datetime.datetime.now(datetime.UTC), + "time": datetime.utcnow(), # old python compat }, ], )