Skip to content

Commit

Permalink
Merge pull request #31 from Steve-Bupyc/extra-params
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Aug 14, 2024
2 parents ea62465 + b1c71ba commit 8fdbc43
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __init__( # noqa: WPS211
exchange_type: ExchangeType = ExchangeType.TOPIC,
max_priority: Optional[int] = None,
delayed_message_exchange_plugin: bool = False,
declare_exchange_kwargs: Optional[Dict[Any, Any]] = None,
declare_queues_kwargs: Optional[Dict[Any, Any]] = None,
**connection_kwargs: Any,
) -> None:
"""
Expand Down Expand Up @@ -80,6 +82,8 @@ def __init__( # noqa: WPS211
:param max_priority: maximum priority value for messages.
:param delayed_message_exchange_plugin: turn on or disable
delayed-message-exchange rabbitmq plugin.
:param declare_exchange_kwargs: additional from AbstractChannel.declare_exchange
:param declare_queues_kwargs: additional from AbstractChannel.declare_queue
:param connection_kwargs: additional keyword arguments,
for connect_robust method of aio-pika.
"""
Expand All @@ -92,7 +96,9 @@ def __init__( # noqa: WPS211
self._exchange_type = exchange_type
self._qos = qos
self._declare_exchange = declare_exchange
self._declare_exchange_kwargs = declare_exchange_kwargs or {}
self._declare_queues = declare_queues
self._declare_queues_kwargs = declare_queues_kwargs or {}
self._queue_name = queue_name
self._routing_key = routing_key
self._max_priority = max_priority
Expand Down Expand Up @@ -135,6 +141,7 @@ async def startup(self) -> None: # noqa: WPS217
await self.write_channel.declare_exchange(
self._exchange_name,
type=self._exchange_type,
**self._declare_exchange_kwargs,
)

if self._delayed_message_exchange_plugin:
Expand Down Expand Up @@ -178,6 +185,7 @@ async def declare_queues(
"""
await channel.declare_queue(
self._dead_letter_queue_name,
**self._declare_queues_kwargs,
)
args: "Dict[str, Any]" = {
"x-dead-letter-exchange": "",
Expand All @@ -188,6 +196,7 @@ async def declare_queues(
queue = await channel.declare_queue(
self._queue_name,
arguments=args,
**self._declare_queues_kwargs,
)
if self._delayed_message_exchange_plugin:
await queue.bind(
Expand All @@ -201,6 +210,7 @@ async def declare_queues(
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
**self._declare_queues_kwargs,
)

await queue.bind(
Expand Down

0 comments on commit 8fdbc43

Please sign in to comment.