Skip to content

Commit

Permalink
Merge pull request #70 from taskiq-python/develop
Browse files Browse the repository at this point in the history
release 0.2.0
  • Loading branch information
Lancetnik authored Aug 21, 2024
2 parents 471b8cb + f37079b commit b113ee0
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
run: python -m build

- name: Publish
uses: pypa/gh-action-pypi-publish@v1.8.14
uses: pypa/gh-action-pypi-publish@v1.9.0
with:
password: ${{ secrets.PYPI_TOKEN }}

Expand Down
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ classifiers = [
dynamic = ["version"]

dependencies = [
"taskiq>=0.10.0,<1.0.0",
"taskiq>=0.11.0,<0.12.0",
"faststream>=0.3.14,<0.6.0",
]

Expand All @@ -59,6 +59,10 @@ kafka = [
"faststream[kafka]"
]

confluent = [
"faststream[confluent]"
]

redis = [
"faststream[redis]"
]
Expand All @@ -68,6 +72,7 @@ test = [
"taskiq-faststream[nats]",
"taskiq-faststream[rabbit]",
"taskiq-faststream[kafka]",
"taskiq-faststream[confluent]",
"taskiq-faststream[redis]",

"coverage[toml]>=7.2.0,<8.0.0",
Expand All @@ -77,7 +82,7 @@ test = [
dev = [
"taskiq-faststream[test]",

"mypy>=1.8.0,<1.10.0",
"mypy>=1.8.0,<1.12.0",
"ruff==0.4.1",
"pre-commit >=3.6.0,<4.0.0",
]
Expand Down
3 changes: 2 additions & 1 deletion taskiq_faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""FastStream - taskiq integration to schedule FastStream tasks."""
__version__ = "0.1.8"

__version__ = "0.2.0"
22 changes: 9 additions & 13 deletions taskiq_faststream/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import anyio
from faststream.app import FastStream
from faststream.types import SendableMessage
from taskiq import AsyncBroker, BrokerMessage
from taskiq import AsyncBroker
from taskiq.acks import AckableMessage
from taskiq.decor import AsyncTaskiqDecoratedTask
from typing_extensions import TypeAlias, override

from taskiq_faststream.serializer import PatchedSerializer
from taskiq_faststream.formatter import PatchedFormatter, PathcedMessage
from taskiq_faststream.types import ScheduledTask
from taskiq_faststream.utils import resolve_msg

Expand All @@ -33,7 +33,7 @@ class BrokerWrapper(AsyncBroker):

def __init__(self, broker: Any) -> None:
super().__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter()
self.broker = broker

async def startup(self) -> None:
Expand All @@ -46,7 +46,7 @@ async def shutdown(self) -> None:
await self.broker.close()
await super().shutdown()

async def kick(self, message: BrokerMessage) -> None:
async def kick(self, message: PathcedMessage) -> None: # type: ignore[override]
"""Call wrapped FastStream broker `publish` method."""
await _broker_publish(self.broker, message)

Expand Down Expand Up @@ -109,7 +109,7 @@ class AppWrapper(BrokerWrapper):

def __init__(self, app: FastStream) -> None:
super(BrokerWrapper, self).__init__()
self.serializer = PatchedSerializer()
self.formatter = PatchedFormatter()
self.app = app

async def startup(self) -> None:
Expand All @@ -122,7 +122,7 @@ async def shutdown(self) -> None:
await self.app._shutdown() # noqa: SLF001
await super(BrokerWrapper, self).shutdown()

async def kick(self, message: BrokerMessage) -> None:
async def kick(self, message: PathcedMessage) -> None: # type: ignore[override]
"""Call wrapped FastStream broker `publish` method."""
assert ( # noqa: S101
self.app.broker
Expand All @@ -132,11 +132,7 @@ async def kick(self, message: BrokerMessage) -> None:

async def _broker_publish(
broker: Any,
message: BrokerMessage,
message: PathcedMessage,
) -> None:
labels = message.labels
labels.pop("schedule", None)
async for msg in resolve_msg(
msg=labels.pop("message", message.message),
):
await broker.publish(msg, **labels)
async for msg in resolve_msg(message.body):
await broker.publish(msg, **message.labels)
45 changes: 45 additions & 0 deletions taskiq_faststream/formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from dataclasses import dataclass
from typing import Any, Dict

from taskiq.abc.formatter import TaskiqFormatter
from taskiq.message import TaskiqMessage


@dataclass
class PathcedMessage:
"""DTO to transfer data to `broker.kick`."""

body: Any
labels: Dict[str, Any]


class PatchedFormatter(TaskiqFormatter):
"""Default taskiq formatter."""

def dumps( # type: ignore[override]
self,
message: TaskiqMessage,
) -> PathcedMessage:
"""
Dumps taskiq message to some broker message format.
:param message: message to send.
:return: Dumped message.
"""
labels = message.labels
labels.pop("schedule", None)
labels.pop("schedule_id", None)

return PathcedMessage(
body=labels.pop("message", None),
labels=labels,
)

def loads(self, message: bytes) -> TaskiqMessage:
"""
Loads json from message.
:param message: broker's message.
:return: parsed taskiq message.
"""
raise NotImplementedError
17 changes: 0 additions & 17 deletions taskiq_faststream/serializer.py

This file was deleted.

1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,5 @@ def mock() -> MagicMock:


@pytest.fixture()
@pytest.mark.anyio
async def event() -> asyncio.Event:
return asyncio.Event()
2 changes: 1 addition & 1 deletion tests/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async def handler(msg: str) -> None:
**{self.subj_name: subject},
schedule=[
{
"time": datetime.utcnow(),
"time": datetime.utcnow(), # old python compat
},
],
)
Expand Down

0 comments on commit b113ee0

Please sign in to comment.