Skip to content

Commit

Permalink
Make OPERATION_TIMEOUT object default timeout parameters value
Browse files Browse the repository at this point in the history
  • Loading branch information
decaz committed Oct 4, 2019
1 parent f326985 commit ec9c803
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 54 deletions.
23 changes: 13 additions & 10 deletions aio_pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .exchange import Exchange, ExchangeType
from .message import IncomingMessage
from .queue import Queue
from .tools import OPERATION_TIMEOUT
from .transaction import Transaction
from .types import ReturnCallbackType, CloseCallbackType, TimeoutType

Expand Down Expand Up @@ -106,10 +107,11 @@ def channel(self) -> aiormq.Channel:
def number(self):
return self.channel.number if self._channel else None

def _get_operation_timeout(self, timeout: TimeoutType = None):
if timeout is not None:
return timeout
return self._connection.operation_timeout
def _get_operation_timeout(self, timeout: TimeoutType):
return (
self._connection.operation_timeout if timeout is OPERATION_TIMEOUT
else timeout
)

def __str__(self):
return "{0}".format(
Expand Down Expand Up @@ -163,7 +165,8 @@ async def _create_channel(self) -> aiormq.Channel:
channel_number=self._channel_number,
)

async def initialize(self, timeout: TimeoutType = None) -> None:
async def initialize(self,
timeout: TimeoutType = OPERATION_TIMEOUT) -> None:
if self._channel is not None:
raise RuntimeError("Can't initialize channel")

Expand Down Expand Up @@ -195,7 +198,7 @@ async def declare_exchange(
self, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT,
durable: bool = None, auto_delete: bool = False,
internal: bool = False, passive: bool = False, arguments: dict = None,
timeout: TimeoutType = None
timeout: TimeoutType = OPERATION_TIMEOUT
) -> Exchange:
"""
Declare an exchange.
Expand Down Expand Up @@ -233,7 +236,7 @@ async def declare_queue(
self, name: str = None, *, durable: bool = None,
exclusive: bool = False, passive: bool = False,
auto_delete: bool = False, arguments: dict = None,
timeout: TimeoutType = None
timeout: TimeoutType = OPERATION_TIMEOUT
) -> Queue:

"""
Expand Down Expand Up @@ -269,7 +272,7 @@ async def declare_queue(

async def set_qos(
self, prefetch_count: int = 0, prefetch_size: int = 0,
all_channels: bool = False, timeout: TimeoutType = None
all_channels: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Basic.QosOk:

return await asyncio.wait_for(
Expand All @@ -281,7 +284,7 @@ async def set_qos(
)

async def queue_delete(
self, queue_name: str, timeout: TimeoutType = None,
self, queue_name: str, timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused: bool = False, if_empty: bool = False, nowait: bool = False
) -> aiormq.spec.Queue.DeleteOk:

Expand All @@ -296,7 +299,7 @@ async def queue_delete(
)

async def exchange_delete(
self, exchange_name: str, timeout: TimeoutType = None,
self, exchange_name: str, timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused: bool = False, nowait: bool = False
) -> aiormq.spec.Exchange.DeleteOk:

Expand Down
20 changes: 11 additions & 9 deletions aio_pika/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import aiormq
from .message import Message
from .tools import OPERATION_TIMEOUT
from .types import ExchangeType as ExchangeType_, TimeoutType


Expand Down Expand Up @@ -53,10 +54,11 @@ def channel(self) -> aiormq.Channel:

return self._channel

def _get_operation_timeout(self, timeout: TimeoutType = None):
if timeout is not None:
return timeout
return self._connection.operation_timeout
def _get_operation_timeout(self, timeout: TimeoutType):
return (
self._connection.operation_timeout if timeout is OPERATION_TIMEOUT
else timeout
)

def __str__(self):
return self.name
Expand All @@ -67,7 +69,7 @@ def __repr__(self):
)

async def declare(
self, timeout: TimeoutType = None
self, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.DeclareOk:
return await asyncio.wait_for(self.channel.exchange_declare(
self.name,
Expand All @@ -91,7 +93,7 @@ def _get_exchange_name(exchange: ExchangeType_):

async def bind(
self, exchange: ExchangeType_, routing_key: str = '', *,
arguments: dict = None, timeout: TimeoutType = None
arguments: dict = None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.BindOk:

""" A binding can also be a relationship between two exchanges.
Expand Down Expand Up @@ -144,7 +146,7 @@ async def bind(

async def unbind(
self, exchange: ExchangeType_, routing_key: str = '',
arguments: dict = None, timeout: TimeoutType = None
arguments: dict = None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.UnbindOk:

""" Remove exchange-to-exchange binding for this
Expand Down Expand Up @@ -174,7 +176,7 @@ async def unbind(

async def publish(
self, message: Message, routing_key, *, mandatory: bool = True,
immediate: bool = False, timeout: TimeoutType = None
immediate: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> Optional[aiormq.types.ConfirmationFrameType]:

""" Publish the message to the queue. `aio-pika` uses
Expand Down Expand Up @@ -207,7 +209,7 @@ async def publish(
)

async def delete(
self, if_unused: bool = False, timeout: TimeoutType = None
self, if_unused: bool = False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Exchange.DeleteOk:

""" Delete the queue
Expand Down
31 changes: 17 additions & 14 deletions aio_pika/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .exchange import Exchange
from .types import ExchangeType as ExchangeType_, TimeoutType
from .message import IncomingMessage
from .tools import create_task, shield
from .tools import OPERATION_TIMEOUT, create_task, shield

log = getLogger(__name__)

Expand Down Expand Up @@ -53,10 +53,11 @@ def channel(self) -> aiormq.Channel:
raise RuntimeError("Channel not opened")
return self._channel

def _get_operation_timeout(self, timeout: TimeoutType = None):
if timeout is not None:
return timeout
return self._connection.operation_timeout
def _get_operation_timeout(self, timeout: TimeoutType):
return (
self._connection.operation_timeout if timeout is OPERATION_TIMEOUT
else timeout
)

def __str__(self):
return "%s" % self.name
Expand All @@ -77,7 +78,7 @@ def __repr__(self):
)

async def declare(
self, timeout: TimeoutType = None
self, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.DeclareOk:
""" Declare queue.
Expand All @@ -100,7 +101,7 @@ async def declare(

async def bind(
self, exchange: ExchangeType_, routing_key: str=None, *,
arguments=None, timeout: TimeoutType = None
arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.BindOk:

""" A binding is a relationship between an exchange and a queue.
Expand Down Expand Up @@ -139,7 +140,7 @@ async def bind(

async def unbind(
self, exchange: ExchangeType_, routing_key: str=None,
arguments: dict=None, timeout: TimeoutType = None
arguments: dict=None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.UnbindOk:

""" Remove binding from exchange for this :class:`Queue` instance
Expand Down Expand Up @@ -173,7 +174,7 @@ async def unbind(
async def consume(
self, callback: Callable[[IncomingMessage], Any], no_ack: bool = False,
exclusive: bool = False, arguments: dict = None,
consumer_tag=None, timeout: TimeoutType = None
consumer_tag=None, timeout: TimeoutType = OPERATION_TIMEOUT
) -> ConsumerTag:

""" Start to consuming the :class:`Queue`.
Expand Down Expand Up @@ -215,8 +216,8 @@ async def consume(
)).consumer_tag

async def cancel(
self, consumer_tag: ConsumerTag, timeout: TimeoutType = None,
nowait: bool=False
self, consumer_tag: ConsumerTag,
timeout: TimeoutType = OPERATION_TIMEOUT, nowait: bool=False
) -> aiormq.spec.Basic.CancelOk:
""" This method cancels a consumer. This does not affect already
delivered messages, but it does mean the server will not send any more
Expand Down Expand Up @@ -244,7 +245,8 @@ async def cancel(
)

async def get(
self, *, no_ack=False, fail=True, timeout=5
self, *, no_ack=False, fail=True,
timeout: TimeoutType = OPERATION_TIMEOUT
) -> Optional[IncomingMessage]:

""" Get message from the queue.
Expand All @@ -270,7 +272,7 @@ async def get(
return IncomingMessage(msg, no_ack=no_ack)

async def purge(
self, no_wait=False, timeout: TimeoutType = None
self, no_wait=False, timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.PurgeOk:
""" Purge all messages from the queue.
Expand All @@ -289,7 +291,8 @@ async def purge(
)

async def delete(
self, *, if_unused=True, if_empty=True, timeout: TimeoutType = None
self, *, if_unused=True, if_empty=True,
timeout: TimeoutType = OPERATION_TIMEOUT
) -> aiormq.spec.Queue.DeclareOk:

""" Delete the queue.
Expand Down
16 changes: 10 additions & 6 deletions aio_pika/robust_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .exchange import Exchange, ExchangeType
from .queue import Queue
from .tools import OPERATION_TIMEOUT
from .types import TimeoutType
from .channel import Channel
from .robust_queue import RobustQueue
Expand Down Expand Up @@ -61,7 +62,7 @@ async def on_reconnect(self, connection, channel_number):
for queue in self._queues.values():
await queue.on_reconnect(self)

async def initialize(self, timeout: TimeoutType = None):
async def initialize(self, timeout: TimeoutType = OPERATION_TIMEOUT):
result = await super().initialize()

prefetch_count, prefetch_size = self._qos
Expand All @@ -74,7 +75,8 @@ async def initialize(self, timeout: TimeoutType = None):
return result

async def set_qos(self, prefetch_count: int = 0, prefetch_size: int = 0,
all_channels=False, timeout: TimeoutType = None):
all_channels=False,
timeout: TimeoutType = OPERATION_TIMEOUT):

if all_channels:
raise NotImplementedError("Not available to RobustConnection")
Expand All @@ -92,7 +94,7 @@ async def declare_exchange(self, name: str,
durable: bool = None, auto_delete: bool = False,
internal: bool = False, passive: bool = False,
arguments: dict = None,
timeout: TimeoutType = None,
timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True) -> Exchange:

exchange = await super().declare_exchange(
Expand All @@ -107,7 +109,8 @@ async def declare_exchange(self, name: str,
return exchange

async def exchange_delete(self, exchange_name: str,
timeout: TimeoutType = None, if_unused=False,
timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused=False,
nowait=False) -> aiormq.spec.Exchange.DeleteOk:

result = await super().exchange_delete(
Expand All @@ -122,7 +125,7 @@ async def exchange_delete(self, exchange_name: str,
async def declare_queue(self, name: str = None, *, durable: bool = None,
exclusive: bool = False, passive: bool = False,
auto_delete: bool = False, arguments: dict = None,
timeout: TimeoutType = None,
timeout: TimeoutType = OPERATION_TIMEOUT,
robust: bool = True) -> Queue:

queue = await super().declare_queue(
Expand All @@ -136,7 +139,8 @@ async def declare_queue(self, name: str = None, *, durable: bool = None,

return queue

async def queue_delete(self, queue_name: str, timeout: TimeoutType = None,
async def queue_delete(self, queue_name: str,
timeout: TimeoutType = OPERATION_TIMEOUT,
if_unused: bool = False, if_empty: bool = False,
nowait: bool = False):
result = await super().queue_delete(
Expand Down
6 changes: 4 additions & 2 deletions aio_pika/robust_exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .exchange import Exchange, ExchangeType
from .channel import Channel
from .tools import OPERATION_TIMEOUT
from .types import TimeoutType


Expand Down Expand Up @@ -43,7 +44,7 @@ async def on_reconnect(self, channel: Channel):
await self.bind(exchange, **kwargs)

async def bind(self, exchange, routing_key: str='', *,
arguments=None, timeout: TimeoutType = None):
arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT):
result = await super().bind(
exchange, routing_key=routing_key,
arguments=arguments, timeout=timeout
Expand All @@ -56,7 +57,8 @@ async def bind(self, exchange, routing_key: str='', *,
return result

async def unbind(self, exchange, routing_key: str = '',
arguments: dict=None, timeout: TimeoutType = None):
arguments: dict=None,
timeout: TimeoutType = OPERATION_TIMEOUT):

result = await super().unbind(exchange, routing_key,
arguments=arguments, timeout=timeout)
Expand Down
11 changes: 7 additions & 4 deletions aio_pika/robust_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import aiormq

from .channel import Channel
from .tools import OPERATION_TIMEOUT
from .types import ExchangeType as ExchangeType_, TimeoutType
from .queue import Queue, ConsumerTag

Expand Down Expand Up @@ -56,7 +57,7 @@ async def on_reconnect(self, channel: Channel):
await self.consume(consumer_tag=consumer_tag, **kwargs)

async def bind(self, exchange: ExchangeType_, routing_key: str=None, *,
arguments=None, timeout: TimeoutType = None):
arguments=None, timeout: TimeoutType = OPERATION_TIMEOUT):

if routing_key is None:
routing_key = self.name
Expand All @@ -74,7 +75,8 @@ async def bind(self, exchange: ExchangeType_, routing_key: str=None, *,
return result

async def unbind(self, exchange: ExchangeType_, routing_key: str=None,
arguments: dict=None, timeout: TimeoutType = None):
arguments: dict=None,
timeout: TimeoutType = OPERATION_TIMEOUT):

if routing_key is None:
routing_key = self.name
Expand All @@ -89,7 +91,7 @@ async def unbind(self, exchange: ExchangeType_, routing_key: str=None,
async def consume(self, callback: FunctionType, no_ack: bool=False,
exclusive: bool=False, arguments: dict=None,
consumer_tag=None,
timeout: TimeoutType = None) -> ConsumerTag:
timeout: TimeoutType = OPERATION_TIMEOUT) -> ConsumerTag:

kwargs = dict(
callback=callback,
Expand All @@ -107,7 +109,8 @@ async def consume(self, callback: FunctionType, no_ack: bool=False,
return consumer_tag

async def cancel(self, consumer_tag: ConsumerTag,
timeout: TimeoutType = None, nowait: bool = False):
timeout: TimeoutType = OPERATION_TIMEOUT,
nowait: bool = False):

result = await super().cancel(consumer_tag, timeout, nowait)
self._consumers.pop(consumer_tag, None)
Expand Down
2 changes: 2 additions & 0 deletions aio_pika/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

log = logging.getLogger(__name__)

OPERATION_TIMEOUT = object()


def iscoroutinepartial(fn):
"""
Expand Down
Loading

0 comments on commit ec9c803

Please sign in to comment.