Skip to content

Commit

Permalink
Make OPERATION_TIMEOUT object default timeout parameters value
Browse files Browse the repository at this point in the history
  • Loading branch information
decaz committed Nov 20, 2019
1 parent 3787b94 commit 7aa9991
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 56 deletions.
23 changes: 13 additions & 10 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from .exchange import Exchange, ExchangeType
from .message import IncomingMessage
from .queue import Queue
from .tools import OPERATION_TIMEOUT
from .transaction import Transaction
from .types import ReturnCallbackType, CloseCallbackType, TimeoutType

Expand Down Expand Up @@ -107,10 +108,11 @@ def channel(self) -> aiormq.Channel:
def number(self):
return self.channel.number if self._channel else None

def _get_operation_timeout(self, timeout: TimeoutType = None):
if timeout is not None:
return timeout
return self._connection.operation_timeout
def _get_operation_timeout(self, timeout: TimeoutType):
return (
self._connection.operation_timeout if timeout is OPERATION_TIMEOUT
else timeout
)

def __str__(self):
return "{0}".format(
Expand Down Expand Up @@ -164,7 +166,8 @@ async def _create_channel(self) -> aiormq.Channel:
channel_number=self._channel_number,
)

async def initialize(self, timeout: TimeoutType = None) -> None:
async def initialize(self,
timeout: TimeoutType = OPERATION_TIMEOUT) -> None:
if self._channel is not None:
raise RuntimeError("Can't initialize channel")

Expand Down Expand Up @@ -196,7 +199,7 @@ async def declare_exchange(
self, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT,
durable: bool = None, auto_delete: bool = False,
internal: bool = False, passive: bool = False, arguments: dict = None,
timeout: TimeoutType = None
timeout: TimeoutType = OPERATION_TIMEOUT
) -> Exchange:
"""
Declare an exchange.
Expand Down Expand Up @@ -234,7 +237,7 @@ async def declare_queue(
self, name: str = None, *, durable: bool = None,
exclusive: bool = False, passive: bool = False,
auto_delete: bool = False, arguments: dict = None,
timeout: TimeoutType = None
timeout: TimeoutType = OPERATION_TIMEOUT
) -> Queue:

"""
Expand Down Expand Up @@ -270,7 +273,7 @@ async def declare_queue(

async def set_qos(
self, prefetch_count: int = 0, prefetch_size: int = 0,
global_: bool = False, timeout: TimeoutType = None,
global_: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT,
all_channels: bool = None
) -> aiormq.spec.Basic.QosOk:
if all_channels is not None:
Expand All @@ -287,7 +290,7 @@ async def set_qos(
)

async def queue_delete(
self, queue_name: str, timeout: TimeoutType = None,
self, queue_name: str, timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused: bool = False, if_empty: bool = False, nowait: bool = False
) -> aiormq.spec.Queue.DeleteOk:

Expand All @@ -302,7 +305,7 @@ async def queue_delete(
)

async def exchange_delete(
self, exchange_name: str, timeout: TimeoutType = None,
self, exchange_name: str, timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused: bool = False, nowait: bool = False
) -> aiormq.spec.Exchange.DeleteOk:

Expand Down
4 changes: 2 additions & 2 deletions aio_pika/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ async def main():
)

connection = connection_class(
url,
operation_timeout=operation_timeout,
url,
operation_timeout=operation_timeout,
loop=loop
)

Expand Down
20 changes: 11 additions & 9 deletions aio_pika/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import aiormq
from .message import Message
from .tools import OPERATION_TIMEOUT
from .types import ExchangeType as ExchangeType_, TimeoutType


Expand Down Expand Up @@ -53,10 +54,11 @@ def channel(self) -> aiormq.Channel:

return self._channel

def _get_operation_timeout(self, timeout: TimeoutType = None):
if timeout is not None:
return timeout
return self._connection.operation_timeout
def _get_operation_timeout(self, timeout: TimeoutType):
return (
self._connection.operation_timeout if timeout is OPERATION_TIMEOUT
else timeout
)

def __str__(self):
return self.name
Expand All @@ -67,7 +69,7 @@ def __repr__(self):
)

async def declare(
self, timeout: TimeoutType = None
self, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.DeclareOk:
return await asyncio.wait_for(self.channel.exchange_declare(
self.name,
Expand All @@ -91,7 +93,7 @@ def _get_exchange_name(exchange: ExchangeType_):

async def bind(
self, exchange: ExchangeType_, routing_key: str = '', *,
arguments: dict = None, timeout: TimeoutType = None
arguments: dict = None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.BindOk:

""" A binding can also be a relationship between two exchanges.
Expand Down Expand Up @@ -144,7 +146,7 @@ async def bind(

async def unbind(
self, exchange: ExchangeType_, routing_key: str = '',
arguments: dict = None, timeout: TimeoutType = None
arguments: dict = None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.UnbindOk:

""" Remove exchange-to-exchange binding for this
Expand Down Expand Up @@ -174,7 +176,7 @@ async def unbind(

async def publish(
self, message: Message, routing_key, *, mandatory: bool = True,
immediate: bool = False, timeout: TimeoutType = None
immediate: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> Optional[aiormq.types.ConfirmationFrameType]:

""" Publish the message to the queue. `aio-pika` uses
Expand Down Expand Up @@ -207,7 +209,7 @@ async def publish(
)

async def delete(
self, if_unused: bool = False, timeout: TimeoutType = None
self, if_unused: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.DeleteOk:

""" Delete the queue
Expand Down
31 changes: 17 additions & 14 deletions aio_pika/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .exchange import Exchange
from .types import ExchangeType as ExchangeType_, TimeoutType
from .message import IncomingMessage
from .tools import create_task, shield
from .tools import OPERATION_TIMEOUT, create_task, shield

log = getLogger(__name__)

Expand Down Expand Up @@ -53,10 +53,11 @@ def channel(self) -> aiormq.Channel:
raise RuntimeError("Channel not opened")
return self._channel

def _get_operation_timeout(self, timeout: TimeoutType = None):
if timeout is not None:
return timeout
return self._connection.operation_timeout
def _get_operation_timeout(self, timeout: TimeoutType):
return (
self._connection.operation_timeout if timeout is OPERATION_TIMEOUT
else timeout
)

def __str__(self):
return "%s" % self.name
Expand All @@ -77,7 +78,7 @@ def __repr__(self):
)

async def declare(
self, timeout: TimeoutType = None
self, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.DeclareOk:
""" Declare queue.
Expand All @@ -100,7 +101,7 @@ async def declare(

async def bind(
self, exchange: ExchangeType_, routing_key: str=None, *,
arguments=None, timeout: TimeoutType = None
arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.BindOk:

""" A binding is a relationship between an exchange and a queue.
Expand Down Expand Up @@ -139,7 +140,7 @@ async def bind(

async def unbind(
self, exchange: ExchangeType_, routing_key: str=None,
arguments: dict=None, timeout: TimeoutType = None
arguments: dict=None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.UnbindOk:

""" Remove binding from exchange for this :class:`Queue` instance
Expand Down Expand Up @@ -173,7 +174,7 @@ async def unbind(
async def consume(
self, callback: Callable[[IncomingMessage], Any], no_ack: bool = False,
exclusive: bool = False, arguments: dict = None,
consumer_tag=None, timeout: TimeoutType = None
consumer_tag=None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> ConsumerTag:

""" Start to consuming the :class:`Queue`.
Expand Down Expand Up @@ -215,8 +216,8 @@ async def consume(
)).consumer_tag

async def cancel(
self, consumer_tag: ConsumerTag, timeout: TimeoutType = None,
nowait: bool=False
self, consumer_tag: ConsumerTag,
timeout: TimeoutType = OPERATION_TIMEOUT, nowait: bool=False
) -> aiormq.spec.Basic.CancelOk:
""" This method cancels a consumer. This does not affect already
delivered messages, but it does mean the server will not send any more
Expand Down Expand Up @@ -244,7 +245,8 @@ async def cancel(
)

async def get(
self, *, no_ack=False, fail=True, timeout=5
self, *, no_ack=False, fail=True,
timeout: TimeoutType = OPERATION_TIMEOUT
) -> Optional[IncomingMessage]:

""" Get message from the queue.
Expand All @@ -270,7 +272,7 @@ async def get(
return IncomingMessage(msg, no_ack=no_ack)

async def purge(
self, no_wait=False, timeout: TimeoutType = None
self, no_wait=False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.PurgeOk:
""" Purge all messages from the queue.
Expand All @@ -289,7 +291,8 @@ async def purge(
)

async def delete(
self, *, if_unused=True, if_empty=True, timeout: TimeoutType = None
self, *, if_unused=True, if_empty=True,
timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.DeclareOk:

""" Delete the queue.
Expand Down
16 changes: 10 additions & 6 deletions aio_pika/robust_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from .exchange import Exchange, ExchangeType
from .queue import Queue
from .tools import OPERATION_TIMEOUT
from .types import TimeoutType
from .channel import Channel
from .robust_queue import RobustQueue
Expand Down Expand Up @@ -62,7 +63,7 @@ async def on_reconnect(self, connection, channel_number):
for queue in self._queues.values():
await queue.on_reconnect(self)

async def initialize(self, timeout: TimeoutType = None):
async def initialize(self, timeout: TimeoutType = OPERATION_TIMEOUT):
result = await super().initialize()

prefetch_count, prefetch_size, global_ = self._qos
Expand All @@ -76,7 +77,8 @@ async def initialize(self, timeout: TimeoutType = None):
return result

async def set_qos(self, prefetch_count: int = 0, prefetch_size: int = 0,
global_: bool = False, timeout: TimeoutType = None,
global_: bool = False,
timeout: TimeoutType = OPERATION_TIMEOUT,
all_channels: bool = None):
if all_channels is not None:
warn('Use "global_" instead of "all_channels"', DeprecationWarning)
Expand All @@ -96,7 +98,7 @@ async def declare_exchange(self, name: str,
durable: bool = None, auto_delete: bool = False,
internal: bool = False, passive: bool = False,
arguments: dict = None,
timeout: TimeoutType = None,
timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True) -> Exchange:

exchange = await super().declare_exchange(
Expand All @@ -111,7 +113,8 @@ async def declare_exchange(self, name: str,
return exchange

async def exchange_delete(self, exchange_name: str,
timeout: TimeoutType = None, if_unused=False,
timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused=False,
nowait=False) -> aiormq.spec.Exchange.DeleteOk:

result = await super().exchange_delete(
Expand All @@ -126,7 +129,7 @@ async def exchange_delete(self, exchange_name: str,
async def declare_queue(self, name: str = None, *, durable: bool = None,
exclusive: bool = False, passive: bool = False,
auto_delete: bool = False, arguments: dict = None,
timeout: TimeoutType = None,
timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True) -> Queue:

queue = await super().declare_queue(
Expand All @@ -140,7 +143,8 @@ async def declare_queue(self, name: str = None, *, durable: bool = None,

return queue

async def queue_delete(self, queue_name: str, timeout: TimeoutType = None,
async def queue_delete(self, queue_name: str,
timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused: bool = False, if_empty: bool = False,
nowait: bool = False):
result = await super().queue_delete(
Expand Down
6 changes: 4 additions & 2 deletions aio_pika/robust_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .exchange import Exchange, ExchangeType
from .channel import Channel
from .tools import OPERATION_TIMEOUT
from .types import TimeoutType


Expand Down Expand Up @@ -43,7 +44,7 @@ async def on_reconnect(self, channel: Channel):
await self.bind(exchange, **kwargs)

async def bind(self, exchange, routing_key: str='', *,
arguments=None, timeout: TimeoutType = None,
arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True):
result = await super().bind(
exchange, routing_key=routing_key,
Expand All @@ -58,7 +59,8 @@ async def bind(self, exchange, routing_key: str='', *,
return result

async def unbind(self, exchange, routing_key: str = '',
arguments: dict=None, timeout: TimeoutType = None):
arguments: dict=None,
timeout: TimeoutType = OPERATION_TIMEOUT):

result = await super().unbind(exchange, routing_key,
arguments=arguments, timeout=timeout)
Expand Down
12 changes: 8 additions & 4 deletions aio_pika/robust_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import aiormq

from .channel import Channel
from .tools import OPERATION_TIMEOUT
from .types import ExchangeType as ExchangeType_, TimeoutType
from .queue import Queue, ConsumerTag

Expand Down Expand Up @@ -56,7 +57,7 @@ async def on_reconnect(self, channel: Channel):
await self.consume(consumer_tag=consumer_tag, **kwargs)

async def bind(self, exchange: ExchangeType_, routing_key: str=None, *,
arguments=None, timeout: TimeoutType = None,
arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True):

if routing_key is None:
Expand All @@ -76,7 +77,8 @@ async def bind(self, exchange: ExchangeType_, routing_key: str=None, *,
return result

async def unbind(self, exchange: ExchangeType_, routing_key: str=None,
arguments: dict=None, timeout: TimeoutType = None):
arguments: dict=None,
timeout: TimeoutType = OPERATION_TIMEOUT):

if routing_key is None:
routing_key = self.name
Expand All @@ -90,7 +92,8 @@ async def unbind(self, exchange: ExchangeType_, routing_key: str=None,

async def consume(self, callback: FunctionType, no_ack: bool=False,
exclusive: bool=False, arguments: dict=None,
consumer_tag=None, timeout: TimeoutType = None,
consumer_tag=None,
timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True) -> ConsumerTag:

kwargs = dict(
Expand All @@ -110,7 +113,8 @@ async def consume(self, callback: FunctionType, no_ack: bool=False,
return consumer_tag

async def cancel(self, consumer_tag: ConsumerTag,
timeout: TimeoutType = None, nowait: bool = False):
timeout: TimeoutType = OPERATION_TIMEOUT,
nowait: bool = False):

result = await super().cancel(consumer_tag, timeout, nowait)
self._consumers.pop(consumer_tag, None)
Expand Down
Loading

0 comments on commit 7aa9991

Please sign in to comment.