From cdad1e952e6b5059b319a11b3b7aa7d7ad700bbf Mon Sep 17 00:00:00 2001 From: "Patrick M. Niedzielski" Date: Wed, 6 Dec 2023 20:37:15 +0000 Subject: [PATCH] Expose more session options from C++ SDK MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since the creation of the BlazingMQ Python SDK, our underlying C++ SDK libbmq has gained more `SessionOptions` that a user can configure, including additional and more fine-grained timeout settings, new watermark configurations, and buffer sizes. These session options are not currently exposed in the Python SDK, though, so Python users are unable to make use of the new configuration options. We want to expose all new session options that are not already marked as deprecated or that do not involve distributed trace, which will require more substantial integration work. This work can be undertaken in the future without impact on the changes here. This patch extends the Python SDK to expose these new SessionOptions, modifying the API, documentation, and tests. The BlazingMQ Python SDK is divided into three layers, with the lowest level being a pure C++ library pybmq that interfaces with the BlazingMQ C++ client library libbmq, with the highest level being exposed through the `_session.py` module’s `Session` Python class, and with a thin middle layer of Cython glue code in `_ext.pyx` connecting the two. In order to expose these new `SessionOptions` to users of the Python SDK, we need to include them in each layer of the library. Crucially, we want the changes in the highest layer, `_session.py` to maintain backwards compatibility with all existing clients of the Python SDK. We maintain compatibility and evolve our API with two steps: 1. Add the new session options as additional parameters to the `Session` class’s constructor. First, all new session options that the `Session` constructor accepts are defaulted to `None`, which pybmq ignores, maintaining the existing behavior for callers who do not set any of these new arguments. Second, new session options are also appended to the list of arguments, so clients who build a `Session` using positional arguments rather than named arguments will continue to function correctly. Finally, we introduce a new value-semantic class `Timeouts`, which allows users to independently set the timeouts for each of the five session operations whose timeouts are configureable: session connection, session disconnection, queue opening, queue configuraiton, and queue closing. By overloading the `timeout` argument to taking either a simple float or an instance of `Timeouts`, users who currently set a timeout for the `Session` by passing in a single timeout will continue to see the same behavior, but users passing in a new `Timeouts` value have access to the more fine-grained timeout settings that libbmq’s `SessionOptions` provides. 2. Add a new named constructor `with_options` to `Session`, which takes an instance of a new value-semantic class `SessionOptions`. Thile users can continue to construct the `Session` directly using its extended constructor, adding the additional session options arguments to the constructor makes it somewhat unwieldly to call, especially if the user only wants to configure one of the session options. This patch provides an alternative, simpler way to construct a `Session` instance, using a new value-semantic builder class named `SessionOptions` that holds each of the new session options and a class method named `with_options`, which takes an instance of `SessionOptions` and returns a new `Session`. `with_options` is the recommended way to construct a new session if you plan to configure the session options. --- docs/api_reference.rst | 8 + news/8.feature.rst | 1 + src/blazingmq/__init__.py | 4 + src/blazingmq/_ext.pyi | 8 +- src/blazingmq/_ext.pyx | 43 ++++- src/blazingmq/_session.py | 272 +++++++++++++++++++++++++++-- src/blazingmq/_timeouts.py | 93 ++++++++++ src/cpp/pybmq_mocksession.cpp | 47 +++-- src/cpp/pybmq_session.cpp | 55 +++++- src/cpp/pybmq_session.h | 11 +- src/declarations/pybmq.pxd | 12 +- tests/unit/test_ext_session.py | 57 +++++- tests/unit/test_queue_options.py | 8 +- tests/unit/test_session.py | 253 ++++++++++++++++++++++++++- tests/unit/test_session_options.py | 102 +++++++++++ tests/unit/test_timeouts.py | 88 ++++++++++ 16 files changed, 1011 insertions(+), 51 deletions(-) create mode 100644 news/8.feature.rst create mode 100644 src/blazingmq/_timeouts.py create mode 100644 tests/unit/test_session_options.py create mode 100644 tests/unit/test_timeouts.py diff --git a/docs/api_reference.rst b/docs/api_reference.rst index 7171401..b380ab8 100644 --- a/docs/api_reference.rst +++ b/docs/api_reference.rst @@ -11,6 +11,14 @@ Session .. autoclass:: Session :members: +.. autoclass:: Timeouts + :members: + :member-order: bysource + +.. autoclass:: SessionOptions + :members: + :member-order: bysource + .. autoclass:: QueueOptions :members: :member-order: bysource diff --git a/news/8.feature.rst b/news/8.feature.rst new file mode 100644 index 0000000..a593060 --- /dev/null +++ b/news/8.feature.rst @@ -0,0 +1 @@ +Exposed more `SessionOptions` from libbmq diff --git a/src/blazingmq/__init__.py b/src/blazingmq/__init__.py index d83f44b..910d1ad 100644 --- a/src/blazingmq/__init__.py +++ b/src/blazingmq/__init__.py @@ -25,6 +25,8 @@ from ._monitors import BasicHealthMonitor from ._session import QueueOptions from ._session import Session +from ._session import SessionOptions +from ._timeouts import Timeouts from ._typing import PropertyTypeDict from ._typing import PropertyValueDict from .exceptions import Error @@ -42,6 +44,8 @@ "Message", "MessageHandle", "Session", + "SessionOptions", + "Timeouts", "__version__", "exceptions", "session_events", diff --git a/src/blazingmq/_ext.pyi b/src/blazingmq/_ext.pyi index c8c64e7..ca022e4 100644 --- a/src/blazingmq/_ext.pyi +++ b/src/blazingmq/_ext.pyi @@ -24,6 +24,7 @@ from blazingmq import CompressionAlgorithmType from blazingmq import Message from blazingmq import MessageHandle from blazingmq import PropertyType +from blazingmq import Timeouts from blazingmq.session_events import SessionEvent DEFAULT_MAX_UNCONFIRMED_MESSAGES: int = ... @@ -44,7 +45,12 @@ class Session: on_message: Optional[Callable[[Message, MessageHandle], None]] = None, broker: bytes, message_compression_algorithm: CompressionAlgorithmType, - timeout: Optional[float] = None, + num_processing_threads: Optional[int] = None, + blob_buffer_size: Optional[int] = None, + channel_high_watermark: Optional[int] = None, + event_queue_watermarks: Optional[tuple[int, int]] = None, + stats_dump_interval: Optional[int | float] = None, + timeouts: Timeouts = Timeouts(), monitor_host_health: bool = False, fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None, ) -> None: ... diff --git a/src/blazingmq/_ext.pyx b/src/blazingmq/_ext.pyx index 4069406..d10ea5a 100644 --- a/src/blazingmq/_ext.pyx +++ b/src/blazingmq/_ext.pyx @@ -19,6 +19,7 @@ import logging import weakref from bsl cimport optional +from bsl cimport pair from bsl cimport shared_ptr from bsl.bsls cimport TimeInterval from cpython.ceval cimport PyEval_InitThreads @@ -43,6 +44,7 @@ from . import _callbacks from . import _enums from . import _messages from . import _script_name +from . import _timeouts from . import session_events from .exceptions import BrokerTimeoutError from .exceptions import Error @@ -164,15 +166,40 @@ cdef class Session: on_message=None, broker not None: bytes = b'tcp://localhost:30114', message_compression_algorithm not None=_enums.CompressionAlgorithmType.NONE, - timeout: Optional[int|float] = None, + num_processing_threads: Optional[int] = None, + blob_buffer_size: Optional[int] = None, + channel_high_watermark: Optional[int] = None, + event_queue_watermarks: Optional[tuple[int,int]] = None, + stats_dump_interval: Optional[int|float] = None, + timeouts: _timeouts.Timeouts = _timeouts.Timeouts(), monitor_host_health: bool = False, fake_host_health_monitor: FakeHostHealthMonitor = None, _mock: Optional[object] = None, ) -> None: cdef shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp + cdef optional[int] c_num_processing_threads + cdef optional[int] c_blob_buffer_size + cdef optional[int] c_channel_high_watermark + cdef optional[pair[int,int]] c_event_queue_watermarks + cdef TimeInterval c_stats_dump_interval = create_time_interval(stats_dump_interval) + cdef TimeInterval c_connect_timeout = create_time_interval(timeouts.connect_timeout) + cdef TimeInterval c_disconnect_timeout = create_time_interval(timeouts.disconnect_timeout) + cdef TimeInterval c_open_queue_timeout = create_time_interval(timeouts.open_queue_timeout) + cdef TimeInterval c_configure_queue_timeout = create_time_interval(timeouts.configure_queue_timeout) + cdef TimeInterval c_close_queue_timeout = create_time_interval(timeouts.close_queue_timeout) PyEval_InitThreads() + if num_processing_threads is not None: + c_num_processing_threads = optional[int](num_processing_threads) + if blob_buffer_size is not None: + c_blob_buffer_size = optional[int](blob_buffer_size) + if channel_high_watermark is not None: + c_channel_high_watermark = optional[int](channel_high_watermark) + if event_queue_watermarks is not None: + c_event_queue_watermarks = optional[pair[int,int]]( + pair[int,int](event_queue_watermarks[0], event_queue_watermarks[1])) + self.monitor_host_health = monitor_host_health if fake_host_health_monitor: @@ -194,7 +221,6 @@ cdef class Session: cdef char *c_broker_uri = broker script_name = _script_name.get_script_name() cdef char *c_script_name = script_name - cdef TimeInterval c_timeout = create_time_interval(timeout) self._session = new NativeSession( session_cb, message_cb, @@ -202,13 +228,22 @@ cdef class Session: c_broker_uri, c_script_name, COMPRESSION_ALGO_FROM_PY_MAPPING[message_compression_algorithm], - c_timeout, + c_num_processing_threads, + c_blob_buffer_size, + c_channel_high_watermark, + c_event_queue_watermarks, + c_stats_dump_interval, + c_connect_timeout, + c_disconnect_timeout, + c_open_queue_timeout, + c_configure_queue_timeout, + c_close_queue_timeout, monitor_host_health, fake_host_health_monitor_sp, Error, BrokerTimeoutError, _mock) - self._session.start(c_timeout) + self._session.start(c_connect_timeout) atexit.register(ensure_stop_session_impl, weakref.ref(self)) def stop(self) -> None: diff --git a/src/blazingmq/_session.py b/src/blazingmq/_session.py index efad6d0..5a55a6c 100644 --- a/src/blazingmq/_session.py +++ b/src/blazingmq/_session.py @@ -35,6 +35,7 @@ from ._messages import Message from ._messages import MessageHandle from ._monitors import BasicHealthMonitor +from ._timeouts import Timeouts from ._typing import PropertyTypeDict from ._typing import PropertyValueDict from ._typing import PropertyValueType @@ -55,19 +56,49 @@ def DefaultMonitor() -> Union[BasicHealthMonitor, None]: KNOWN_MONITORS = ("blazingmq.BasicHealthMonitor",) -def _convert_timeout(timeout: float) -> Optional[float]: +def _validate_timeouts(timeouts: Timeouts) -> Timeouts: + """Validate a `.Timeouts` instance for use by the Cython layer. + + If any of the timeouts contained within the `.Timeouts` instance are the + `DEFAULT_TIMEOUT` sentinel or `None`, return `None`. Otherwise, validate + that it is within the range accepted by `bsls::TimeInterval` and return it. + """ + return Timeouts( + connect_timeout=_convert_timeout(timeouts.connect_timeout), + disconnect_timeout=_convert_timeout(timeouts.disconnect_timeout), + open_queue_timeout=_convert_timeout(timeouts.open_queue_timeout), + configure_queue_timeout=_convert_timeout(timeouts.configure_queue_timeout), + close_queue_timeout=_convert_timeout(timeouts.close_queue_timeout), + ) + + +def _convert_timeout(timeout: Optional[float]) -> Optional[float]: """Convert the timeout for use by the Cython layer. - If it is the DEFAULT_TIMEOUT sentinel, return None. Otherwise, validate - that it is within the range accepted by bsls::TimeInterval and return it. + If it is the DEFAULT_TIMEOUT sentinel or None, return None. Otherwise, + validate that it is within the range accepted by bsls::TimeInterval and + return it. """ - if timeout is DEFAULT_TIMEOUT: + if timeout is DEFAULT_TIMEOUT or timeout is None: return None - elif timeout is not None and (0.0 < float(timeout) < 2**63): - return float(timeout) + elif 0.0 < timeout < 2**63: + return timeout raise ValueError(f"timeout must be greater than 0.0, was {timeout}") +def _convert_stats_dump_interval(interval: Optional[float]) -> Optional[float]: + """Convert the stats dump interval for use by the Cython layer. + + If is None, return None. Otherwise, validate that it is within the range + accepted by bsls::TimeInterval and return it. + """ + if interval is None: + return interval + if 0.0 <= interval < 2**63: + return interval + raise ValueError(f"stats_dump_interval must be nonnegative, was {interval}") + + def _collect_properties_and_types( properties: Optional[PropertyValueDict], property_type_overrides: Optional[PropertyTypeDict], @@ -204,7 +235,116 @@ def __repr__(self) -> str: if value is not None: params.append(f"{attr}={value!r}") - return "QueueOptions(%s)" % ", ".join(params) + return f"QueueOptions({', '.join(params)})" + + +class SessionOptions: + """A value semantic type representing session options. + + Each option can be set either by passing it as a keyword argument when + constructing a *SessionOptions* instance, or by setting it as an attribute + on a constructed instance. + + The default for every option is `None`. When constructing a `Session`, + options set to `None` are given reasonable default values. + + Args: + message_compression_algorithm: + The type of compression to apply to messages being posted via the + session this object is configuring. + timeouts: + The maximum number of seconds to wait for requests for each + operation on this session. If not provided, reasonable defaults + are used. + host_health_monitor: + A `.BasicHealthMonitor` is used by default, so your tests can + control whether the session sees the machine as healthy or not by + calling `.set_healthy` and `.set_unhealthy` on that instance. If + you instead pass `None`, the session will always see the machine as + healthy, `.HostUnhealthy` and `.HostHealthRestored` events with + never be emitted, and the *suspends_on_bad_host_health* option of + `QueueOptions` cannot be used. + num_processing_threads: + The number of threads for the SDK to use for processing events. + This defaults to 1. + blob_buffer_size: + The size (in bytes) of the blob buffers to use. This defaults to + 4k. + channel_high_watermark: + The size (in bytes) to use for the write cache high watermark on + the channel. The default value is 128MB. Note that BlazingMQ + reserves 4MB of this value for control messages, so the actual + watermark for data published is ``channel_high_watermark - 4MB``. + event_queue_watermarks: + A tuple containing the low and high notification watermark + thresholds for the buffer containing all incoming messages from the + broker, respectively. A warning `.SlowConsumerHighWaterMark` is + emitted when the buffer reaches the high watermark value, and a + notification `.SlowConsumerNormal` is emitted when the buffer is + back to the low watermark. + stats_dump_interval: + The interval (in seconds) at which to dump stats into the logs. If + 0, disable the recurring dump of stats (final stats are always + dumped at the end of the session). The default is 5min; the value + must be a multiple of 30s, in the range ``[0s - 60min]``. + """ + + def __init__( + self, + message_compression_algorithm: Optional[CompressionAlgorithmType] = None, + timeouts: Optional[Timeouts] = None, + host_health_monitor: Union[BasicHealthMonitor, None] = (DefaultMonitor()), + num_processing_threads: Optional[int] = None, + blob_buffer_size: Optional[int] = None, + channel_high_watermark: Optional[int] = None, + event_queue_watermarks: Optional[tuple[int, int]] = None, + stats_dump_interval: Optional[float] = None, + ) -> None: + self.message_compression_algorithm = message_compression_algorithm + self.timeouts = timeouts + self.host_health_monitor = host_health_monitor + self.num_processing_threads = num_processing_threads + self.blob_buffer_size = blob_buffer_size + self.channel_high_watermark = channel_high_watermark + self.event_queue_watermarks = event_queue_watermarks + self.stats_dump_interval = stats_dump_interval + + def __eq__(self, other: object) -> bool: + if not isinstance(other, SessionOptions): + return False + return ( + self.message_compression_algorithm == other.message_compression_algorithm + and self.timeouts == other.timeouts + and self.host_health_monitor == other.host_health_monitor + and self.num_processing_threads == other.num_processing_threads + and self.blob_buffer_size == other.blob_buffer_size + and self.channel_high_watermark == other.channel_high_watermark + and self.event_queue_watermarks == other.event_queue_watermarks + and self.stats_dump_interval == other.stats_dump_interval + ) + + def __ne__(self, other: object) -> bool: + return not self == other + + def __repr__(self) -> str: + attrs = ( + "message_compression_algorithm", + "timeouts", + "host_health_monitor", + "num_processing_threads", + "blob_buffer_size", + "channel_high_watermark", + "event_queue_watermarks", + "stats_dump_interval", + ) + + params = [] + for attr in attrs: + value = getattr(self, attr) + if value is not None: + params.append(f"{attr}={value!r}") + + return f"SessionOptions({', '.join(params)})" class Session: @@ -226,7 +366,10 @@ class Session: message_compression_algorithm: the type of compression to apply to messages being posted via this session object. timeout: maximum number of seconds to wait for requests on this - session. If not provided, reasonable defaults are used. + session. If not provided, reasonable defaults are used. This + argument may either be a simple ``float``, which sets the same + timeout for each operation, or an instance of the `Timeouts` class, + which allows setting the timeout for each operation independently. host_health_monitor: A `.BasicHealthMonitor` is used by default, so your tests can control whether the session sees the machine as healthy or not by calling `.set_healthy` and `.set_unhealthy` on @@ -235,12 +378,33 @@ class Session: `.HostHealthRestored` events will never be emitted, and the *suspends_on_bad_host_health* option of `QueueOptions` cannot be used. + num_processing_threads: The number of threads for the SDK to use for + processing events. This defaults to 1. + blob_buffer_size: The size (in bytes) of the blob buffers to use. This + defaults to 4k. + channel_high_watermark: The size (in bytes) to use for the write cache + high watermark on the channel. The default value is 128MB. Note + that BlazingMQ reserves 4MB of this value for control messages, so + the actual watermark for data published is + ``channel_high_watermark - 4MB``. + event_queue_watermarks: A tuple containing the low and high + notification watermark thresholds for the buffer containing all + incoming messages from the broker, respectively. A warning + `.SlowConsumerHighWaterMark` is emitted when the buffer reaches the + high watermark value, and a notification `.SlowConsumerNormal` is + emitted when the buffer is back to the low watermark. + stats_dump_interval: The interval (in seconds) at which to dump stats + into the logs. If 0, disable the recurring dump of stats (final + stats are always dumped at the end of the session). The default is + 5min; the value must be a multiple of 30s, in the range + ``[0s - 60min]``. Raises: `~blazingmq.Error`: If the session start request was not successful. `~blazingmq.exceptions.BrokerTimeoutError`: If the broker didn't respond to the request within a reasonable amount of time. - `ValueError`: If *timeout* is provided and not > 0.0. + `ValueError`: If any of the timeouts are provided and not > 0.0, or if + the ``stats_dump_interval`` is provided and is < 0.0. """ def __init__( @@ -251,8 +415,13 @@ def __init__( message_compression_algorithm: CompressionAlgorithmType = ( CompressionAlgorithmType.NONE ), - timeout: float = DEFAULT_TIMEOUT, + timeout: Union[Timeouts, float] = DEFAULT_TIMEOUT, host_health_monitor: Union[BasicHealthMonitor, None] = (DefaultMonitor()), + num_processing_threads: Optional[int] = None, + blob_buffer_size: Optional[int] = None, + channel_high_watermark: Optional[int] = None, + event_queue_watermarks: Optional[tuple[int, int]] = None, + stats_dump_interval: Optional[float] = None, ) -> None: if host_health_monitor is not None: if not isinstance(host_health_monitor, BasicHealthMonitor): @@ -265,16 +434,97 @@ def __init__( fake_host_health_monitor = getattr(host_health_monitor, "_monitor", None) self._has_no_on_message = on_message is None + + # Using our Timeouts class, preserve the old behavior of passing in a + # simple float as a timeout. Avoid setting the `connect_timeout` and + # `disconnect_timeout`. + if not isinstance(timeout, Timeouts): + timeout = Timeouts( + open_queue_timeout=timeout, + configure_queue_timeout=timeout, + close_queue_timeout=timeout, + ) + self._ext = ExtSession( on_session_event, on_message=on_message, broker=six.ensure_binary(broker), message_compression_algorithm=message_compression_algorithm, - timeout=_convert_timeout(timeout), + num_processing_threads=num_processing_threads, + blob_buffer_size=blob_buffer_size, + channel_high_watermark=channel_high_watermark, + event_queue_watermarks=event_queue_watermarks, + stats_dump_interval=_convert_stats_dump_interval(stats_dump_interval), + timeouts=_validate_timeouts(timeout), monitor_host_health=monitor_host_health, fake_host_health_monitor=fake_host_health_monitor, ) + @classmethod + def with_options( + cls, + on_session_event: Callable[[SessionEvent], None], + on_message: Optional[Callable[[Message, MessageHandle], None]] = None, + broker: str = "tcp://localhost:30114", + session_options: SessionOptions = (SessionOptions()), + ) -> Session: + """Construct a *Session* instance using `.SessionOptions`. + + This is the recommended way to construct a new session, as the + `.SessionOptions` class provides an easier to use interface for + configuring only those options you need. + + Args: + on_session_event: a required callback to process `.SessionEvent` events + received by the session. + on_message: an optional callback to process `Message` objects received + by the session. + broker: TCP address of the broker (default: 'tcp://localhost:30114'). + If the environment variable ``BMQ_BROKER_URI`` is set, its value + will override whatever broker address is passed via this argument. + session_options: an instance of `.SessionOptions` that represents the + session's configuration. + + Raises: + `~blazingmq.Error`: If the session start request was not successful. + `~blazingmq.exceptions.BrokerTimeoutError`: If the broker didn't respond + to the request within a reasonable amount of time. + `ValueError`: If any of the timeouts are provided and not > 0.0, or if + the ``stats_dump_interval`` is provided and is < 0.0. + """ + message_compression_algorithm = session_options.message_compression_algorithm + if message_compression_algorithm is None: + message_compression_algorithm = CompressionAlgorithmType.NONE + + if session_options.timeouts is None: + return cls( + on_session_event, + on_message, + broker, + message_compression_algorithm, + DEFAULT_TIMEOUT, + session_options.host_health_monitor, + session_options.num_processing_threads, + session_options.blob_buffer_size, + session_options.channel_high_watermark, + session_options.event_queue_watermarks, + session_options.stats_dump_interval, + ) + else: + return cls( + on_session_event, + on_message, + broker, + message_compression_algorithm, + session_options.timeouts, + session_options.host_health_monitor, + session_options.num_processing_threads, + session_options.blob_buffer_size, + session_options.channel_high_watermark, + session_options.event_queue_watermarks, + session_options.stats_dump_interval, + ) + def open_queue( self, queue_uri: str, diff --git a/src/blazingmq/_timeouts.py b/src/blazingmq/_timeouts.py new file mode 100644 index 0000000..2d5c56f --- /dev/null +++ b/src/blazingmq/_timeouts.py @@ -0,0 +1,93 @@ +# Copyright 2019-2023 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Optional + + +class Timeouts: + """A value semantic type representing session timeouts. + + Each option can be set either by passing it as a keyword argument when + constructing a *Timeouts* instance, or by setting it as an attribute on + a constructed instance. + + The default for every option is `None`. When constructing a `Session`, + either directly or using `SessionOptions`, options set to `None` are given + reasonable default values. + + Args: + connect_timeout: + The maximum number of seconds to wait for connection requests on + this session. + disconnect_timeout: + The maximum number of seconds to wait for disconnection requests + on this session. + open_queue_timeout: + The maximum number of seconds to wait for open queue requests on + this session. + configure_queue_timeout: + The maximum number of seconds to wait for configure queue requests + on this session. + close_queue_timeout: + The maximum number of seconds to wait for close queue requests on + this session. + """ + + def __init__( + self, + connect_timeout: Optional[float] = None, + disconnect_timeout: Optional[float] = None, + open_queue_timeout: Optional[float] = None, + configure_queue_timeout: Optional[float] = None, + close_queue_timeout: Optional[float] = None, + ) -> None: + self.connect_timeout = connect_timeout + self.disconnect_timeout = disconnect_timeout + self.open_queue_timeout = open_queue_timeout + self.configure_queue_timeout = configure_queue_timeout + self.close_queue_timeout = close_queue_timeout + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Timeouts): + return False + return ( + self.connect_timeout == other.connect_timeout + and self.disconnect_timeout == other.disconnect_timeout + and self.open_queue_timeout == other.open_queue_timeout + and self.configure_queue_timeout == other.configure_queue_timeout + and self.close_queue_timeout == other.close_queue_timeout + ) + + def __ne__(self, other: object) -> bool: + return not self == other + + def __repr__(self) -> str: + attrs = ( + "connect_timeout", + "disconnect_timeout", + "open_queue_timeout", + "configure_queue_timeout", + "close_queue_timeout", + ) + + params = [] + for attr in attrs: + value = getattr(self, attr) + if value is not None: + params.append(f"{attr}={value!r}") + + return f"Timeouts({', '.join(params)})" diff --git a/src/cpp/pybmq_mocksession.cpp b/src/cpp/pybmq_mocksession.cpp index e842e25..6e1050a 100644 --- a/src/cpp/pybmq_mocksession.cpp +++ b/src/cpp/pybmq_mocksession.cpp @@ -261,6 +261,15 @@ maybe_emit_acks(PyObject* mock, bmqa::MockSession* mock_session) } } +double +time_interval_to_seconds(const bsls::TimeInterval& time_interval) +{ + const double k_S_PER_NS = + 1.0 / static_cast(bdlt::TimeUnitRatio::k_NS_PER_S); + + return time_interval.seconds() + time_interval.nanoseconds() * k_S_PER_NS; +} + } // namespace // CREATORS @@ -276,29 +285,47 @@ MockSession::MockSession( static const char* const option_names[] = { "broker_uri", "process_name_override", + "connect_timeout", + "disconnect_timeout", "open_queue_timeout", "configure_queue_timeout", - "close_queue_timeout"}; - - double timeout_open_secs = options.openQueueTimeout().seconds() - + options.openQueueTimeout().nanoseconds() * 1e-9; + "close_queue_timeout", + "num_processing_threads", + "blob_buffer_size", + "channel_high_watermark", + "event_queue_low_watermark", + "event_queue_high_watermark", + "stats_dump_interval"}; + + double timeout_connect_secs = time_interval_to_seconds(options.connectTimeout()); + double timeout_disconnect_secs = + time_interval_to_seconds(options.disconnectTimeout()); + double timeout_open_secs = time_interval_to_seconds(options.openQueueTimeout()); double timeout_configure_secs = - options.configureQueueTimeout().seconds() - + options.configureQueueTimeout().nanoseconds() * 1e-9; - double timeout_close_secs = options.closeQueueTimeout().seconds() - + options.closeQueueTimeout().nanoseconds() * 1e-9; + time_interval_to_seconds(options.configureQueueTimeout()); + double timeout_close_secs = time_interval_to_seconds(options.closeQueueTimeout()); + double stats_dump_interval_secs = + time_interval_to_seconds(options.statsDumpInterval()); bslma::ManagedPtr py_options = RefUtils::toManagedPtr(_Py_DictBuilder( option_names, - "(s# N f f f)", + "(s# N f f f f f i i i i i f)", options.brokerUri().c_str(), options.brokerUri().length(), PyBytes_FromStringAndSize( options.processNameOverride().c_str(), options.processNameOverride().length()), + timeout_connect_secs, + timeout_disconnect_secs, timeout_open_secs, timeout_configure_secs, - timeout_close_secs)); + timeout_close_secs, + options.numProcessingThreads(), + options.blobBufferSize(), + options.channelHighWatermark(), + options.eventQueueLowWatermark(), + options.eventQueueHighWatermark(), + stats_dump_interval_secs)); if (!py_options) throw bsl::runtime_error("propagating Python error"); PyObject_SetAttrString(d_mock, "options", py_options.get()); } diff --git a/src/cpp/pybmq_session.cpp b/src/cpp/pybmq_session.cpp index ba2386b..80c8947 100644 --- a/src/cpp/pybmq_session.cpp +++ b/src/cpp/pybmq_session.cpp @@ -80,7 +80,16 @@ Session::Session( const char* broker_uri, const char* script_name, bmqt::CompressionAlgorithmType::Enum message_compression_type, - const bsls::TimeInterval& timeout, + bsl::optional num_processing_threads, + bsl::optional blob_buffer_size, + bsl::optional channel_high_watermark, + bsl::optional > event_queue_watermarks, + const bsls::TimeInterval& stats_dump_interval, + const bsls::TimeInterval& connect_timeout, + const bsls::TimeInterval& disconnect_timeout, + const bsls::TimeInterval& open_queue_timeout, + const bsls::TimeInterval& configure_queue_timeout, + const bsls::TimeInterval& close_queue_timeout, bool monitor_host_health, bsl::shared_ptr fake_host_health_monitor_sp, PyObject* error, @@ -117,10 +126,46 @@ Session::Session( .setProcessNameOverride(script_name) .setHostHealthMonitor(host_health_monitor_sp); - if (timeout != bsls::TimeInterval()) { - options.setOpenQueueTimeout(timeout) - .setConfigureQueueTimeout(timeout) - .setCloseQueueTimeout(timeout); + if (num_processing_threads.has_value()) { + options.setNumProcessingThreads(num_processing_threads.value()); + } + + if (blob_buffer_size.has_value()) { + options.setBlobBufferSize(blob_buffer_size.value()); + } + + if (channel_high_watermark.has_value()) { + options.setChannelHighWatermark(channel_high_watermark.value()); + } + + if (event_queue_watermarks.has_value()) { + options.configureEventQueue( + event_queue_watermarks.value().first, + event_queue_watermarks.value().second); + } + + if (stats_dump_interval != bsls::TimeInterval()) { + options.setStatsDumpInterval(stats_dump_interval); + } + + if (connect_timeout != bsls::TimeInterval()) { + options.setConnectTimeout(connect_timeout); + } + + if (disconnect_timeout != bsls::TimeInterval()) { + options.setDisconnectTimeout(disconnect_timeout); + } + + if (open_queue_timeout != bsls::TimeInterval()) { + options.setOpenQueueTimeout(open_queue_timeout); + } + + if (configure_queue_timeout != bsls::TimeInterval()) { + options.setConfigureQueueTimeout(configure_queue_timeout); + } + + if (close_queue_timeout != bsls::TimeInterval()) { + options.setCloseQueueTimeout(close_queue_timeout); } bslma::ManagedPtr handler( diff --git a/src/cpp/pybmq_session.h b/src/cpp/pybmq_session.h index f9813cc..f37a407 100644 --- a/src/cpp/pybmq_session.h +++ b/src/cpp/pybmq_session.h @@ -54,7 +54,16 @@ class Session const char* broker_uri, const char* script_name, bmqt::CompressionAlgorithmType::Enum message_compression_type, - const bsls::TimeInterval& timeout, + bsl::optional num_processing_threads, + bsl::optional blob_buffer_size, + bsl::optional channel_high_watermark, + bsl::optional > event_queue_watermarks, + const bsls::TimeInterval& stats_dump_interval, + const bsls::TimeInterval& connect_timeout, + const bsls::TimeInterval& disconnect_timeout, + const bsls::TimeInterval& open_queue_timeout, + const bsls::TimeInterval& configure_queue_timeout, + const bsls::TimeInterval& close_queue_timeout, bool monitor_host_health, bsl::shared_ptr fake_host_health_monitor, PyObject* d_error, diff --git a/src/declarations/pybmq.pxd b/src/declarations/pybmq.pxd index 77481fc..9ba293e 100644 --- a/src/declarations/pybmq.pxd +++ b/src/declarations/pybmq.pxd @@ -14,6 +14,7 @@ # limitations under the License. from bsl cimport optional +from bsl cimport pair from bsl cimport shared_ptr from bsl.bsls cimport TimeInterval from libcpp cimport bool as cppbool @@ -41,7 +42,16 @@ cdef extern from "pybmq_session.h" namespace "BloombergLP::pybmq" nogil: const char* broker_uri, const char* script_name, CompressionAlgorithmType message_compression_algorithm, - TimeInterval timeout, + optional[int] num_processing_threads, + optional[int] blob_buffer_size, + optional[int] channel_high_watermark, + optional[pair[int, int]] event_queue_watermarks, + TimeInterval stats_dump_interval, + TimeInterval connect_timeout, + TimeInterval disconnect_timeout, + TimeInterval open_queue_timeout, + TimeInterval configure_queue_timeout, + TimeInterval close_queue_timeout, bint monitor_host_health, shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp, object error, diff --git a/tests/unit/test_ext_session.py b/tests/unit/test_ext_session.py index 6bdd968..ea5a4b4 100644 --- a/tests/unit/test_ext_session.py +++ b/tests/unit/test_ext_session.py @@ -20,6 +20,7 @@ import pytest +from blazingmq import Timeouts from blazingmq import exceptions from blazingmq._ext import Session from blazingmq._ext import ensure_stop_session @@ -116,13 +117,14 @@ def test_stopped_session_not_stopped_on_dealloc(): mock.stop.assert_called_once_with() -def test_start_timeout(): +def test_start_connect_timeout(): # GIVEN mock = sdk_mock(start=0, stop=None) timeout = 1.12345 + timeouts = Timeouts(connect_timeout=timeout) # WHEN - session = Session(dummy_callback, timeout=timeout, _mock=mock) + session = Session(dummy_callback, timeouts=timeouts, _mock=mock) # THEN mock.start.assert_called_once_with(timeout=timeout) @@ -204,15 +206,56 @@ def test_process_name_override_non_unicode(monkeypatch): def test_session_timeout_passed_to_queue_defaults(): # GIVEN mock = sdk_mock(start=0, stop=None) - timeout = 321 + open_queue_timeout = 321 + configure_queue_timeout = 432 + close_queue_timeout = 543 + timeouts = Timeouts( + open_queue_timeout=open_queue_timeout, + configure_queue_timeout=configure_queue_timeout, + close_queue_timeout=close_queue_timeout, + ) # WHEN - Session(dummy_callback, timeout=timeout, _mock=mock) + Session( + dummy_callback, + timeouts=timeouts, + _mock=mock, + ) + + # THEN + assert mock.options["open_queue_timeout"] == open_queue_timeout + assert mock.options["configure_queue_timeout"] == configure_queue_timeout + assert mock.options["close_queue_timeout"] == close_queue_timeout + + +def test_session_session_options_propagated(): + # GIVEN + mock = sdk_mock(start=0, stop=None) + num_processing_threads = 10 + blob_buffer_size = 5000 + channel_high_watermark = 20000000 + event_queue_low_watermark = 1000000 + event_queue_high_watermark = 10000000 + stats_dump_interval = 90.0 + + # WHEN + Session( + dummy_callback, + num_processing_threads=num_processing_threads, + blob_buffer_size=blob_buffer_size, + channel_high_watermark=channel_high_watermark, + event_queue_watermarks=(event_queue_low_watermark, event_queue_high_watermark), + stats_dump_interval=stats_dump_interval, + _mock=mock, + ) # THEN - assert mock.options["open_queue_timeout"] == timeout - assert mock.options["configure_queue_timeout"] == timeout - assert mock.options["close_queue_timeout"] == timeout + assert mock.options["num_processing_threads"] == num_processing_threads + assert mock.options["blob_buffer_size"] == blob_buffer_size + assert mock.options["channel_high_watermark"] == channel_high_watermark + assert mock.options["event_queue_low_watermark"] == event_queue_low_watermark + assert mock.options["event_queue_high_watermark"] == event_queue_high_watermark + assert mock.options["stats_dump_interval"] == stats_dump_interval def test_ensure_stop_session_callback_calls_sdk_stop(): diff --git a/tests/unit/test_queue_options.py b/tests/unit/test_queue_options.py index 633777c..83b7797 100644 --- a/tests/unit/test_queue_options.py +++ b/tests/unit/test_queue_options.py @@ -47,10 +47,10 @@ def test_queue_options_default_to_none(): # WHEN options = blazingmq.QueueOptions() # THEN - options.consumer_priority is None - options.max_unconfirmed_bytes is None - options.max_unconfirmed_messages is None - options.suspends_on_bad_host_health is None + assert options.consumer_priority is None + assert options.max_unconfirmed_bytes is None + assert options.max_unconfirmed_messages is None + assert options.suspends_on_bad_host_health is None def test_queue_options_equality(): diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index bc39238..cd24cc0 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -22,6 +22,8 @@ from blazingmq import Error from blazingmq import QueueOptions from blazingmq import Session +from blazingmq import SessionOptions +from blazingmq import Timeouts from blazingmq._messages import create_message from blazingmq._session import DEFAULT_TIMEOUT from blazingmq.testing import HostHealth @@ -48,6 +50,111 @@ def dummy2(): on_message=dummy2, broker="some_uri", timeout=60.0, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + host_health_monitor=None, + ) + + # THEN + ext_cls.assert_called_once_with( + dummy1, + on_message=dummy2, + broker=b"some_uri", + message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + timeouts=Timeouts( + connect_timeout=None, + disconnect_timeout=None, + open_queue_timeout=60.0, + configure_queue_timeout=60.0, + close_queue_timeout=60.0, + ), + monitor_host_health=False, + fake_host_health_monitor=None, + ) + + +@mock.patch("blazingmq._session.ExtSession") +def test_session_constructed_with_timeouts(ext_cls): + # GIVEN + ext_cls.mock_add_spec([]) + + def dummy1(): + pass + + def dummy2(): + pass + + timeouts = Timeouts( + connect_timeout=60.0, + disconnect_timeout=70.0, + open_queue_timeout=80.0, + configure_queue_timeout=90.0, + close_queue_timeout=100.0, + ) + + # WHEN + Session( + dummy1, + on_message=dummy2, + broker="some_uri", + timeout=timeouts, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + host_health_monitor=None, + ) + + # THEN + ext_cls.assert_called_once_with( + dummy1, + on_message=dummy2, + broker=b"some_uri", + message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + timeouts=timeouts, + monitor_host_health=False, + fake_host_health_monitor=None, + ) + + +@mock.patch("blazingmq._session.ExtSession") +def test_session_constructed_with_default_timeouts(ext_cls): + # GIVEN + ext_cls.mock_add_spec([]) + + def dummy1(): + pass + + def dummy2(): + pass + + timeouts = Timeouts() + + # WHEN + Session( + dummy1, + on_message=dummy2, + broker="some_uri", + timeout=timeouts, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, host_health_monitor=None, ) @@ -56,8 +163,100 @@ def dummy2(): dummy1, on_message=dummy2, broker=b"some_uri", - timeout=60.0, message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + timeouts=timeouts, + monitor_host_health=False, + fake_host_health_monitor=None, + ) + + +@mock.patch("blazingmq._session.ExtSession") +def test_session_default_with_options(ext_cls): + # GIVEN + ext_cls.mock_add_spec([]) + + def dummy1(): + pass + + def dummy2(): + pass + + session_options = SessionOptions() + + # WHEN + Session.with_options( + dummy1, on_message=dummy2, broker="some_uri", session_options=session_options + ) + + # THEN + ext_cls.assert_called_once_with( + dummy1, + on_message=dummy2, + broker=b"some_uri", + message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=None, + blob_buffer_size=None, + channel_high_watermark=None, + event_queue_watermarks=None, + stats_dump_interval=None, + timeouts=Timeouts(), + monitor_host_health=False, + fake_host_health_monitor=None, + ) + + +@mock.patch("blazingmq._session.ExtSession") +def test_session_with_options(ext_cls): + # GIVEN + ext_cls.mock_add_spec([]) + + def dummy1(): + pass + + def dummy2(): + pass + + timeouts = Timeouts( + connect_timeout=60.0, + disconnect_timeout=70.0, + open_queue_timeout=80.0, + configure_queue_timeout=90.0, + close_queue_timeout=100.0, + ) + + session_options = SessionOptions( + message_compression_algorithm=CompressionAlgorithmType.NONE, + timeouts=timeouts, + host_health_monitor=None, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + ) + + # WHEN + Session.with_options( + dummy1, on_message=dummy2, broker="some_uri", session_options=session_options + ) + + # THEN + ext_cls.assert_called_once_with( + dummy1, + on_message=dummy2, + broker=b"some_uri", + message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + timeouts=timeouts, monitor_host_health=False, fake_host_health_monitor=None, ) @@ -90,8 +289,19 @@ def dummy2(): dummy1, on_message=dummy2, broker=b"some_uri", - timeout=60.0, message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=None, + blob_buffer_size=None, + channel_high_watermark=None, + event_queue_watermarks=None, + stats_dump_interval=None, + timeouts=Timeouts( + connect_timeout=None, + disconnect_timeout=None, + open_queue_timeout=60.0, + configure_queue_timeout=60.0, + close_queue_timeout=60.0, + ), monitor_host_health=True, fake_host_health_monitor=monitor._monitor, ) @@ -116,8 +326,13 @@ def dummy2(): dummy1, on_message=dummy2, broker=b"tcp://localhost:30114", - timeout=None, message_compression_algorithm=CompressionAlgorithmType.NONE, + num_processing_threads=None, + blob_buffer_size=None, + channel_high_watermark=None, + event_queue_watermarks=None, + stats_dump_interval=None, + timeouts=Timeouts(), monitor_host_health=False, fake_host_health_monitor=None, ) @@ -410,7 +625,7 @@ def test_session_as_context_manager(ext): ext.stop.assert_called_once_with() -@pytest.mark.parametrize("timeout", [None, -1.0, 2.0**63, float("inf")]) +@pytest.mark.parametrize("timeout", [-1.0, 2.0**63, float("inf")]) def test_session_bad_timeout(timeout): # GIVEN def dummy(): @@ -428,7 +643,7 @@ def dummy(): @mock.patch("blazingmq._session.ExtSession") -@pytest.mark.parametrize("timeout", [None, -1.0, 0.0, 2.0**63, float("inf")]) +@pytest.mark.parametrize("timeout", [-1.0, 0.0, 2.0**63, float("inf")]) def test_session_open_queue_bad_timeout(ext_cls, timeout): # GIVEN def dummy(): @@ -448,7 +663,7 @@ def dummy(): @mock.patch("blazingmq._session.ExtSession") -@pytest.mark.parametrize("timeout", [None, -1.0, 0.0, 2.0**63, float("inf")]) +@pytest.mark.parametrize("timeout", [-1.0, 0.0, 2.0**63, float("inf")]) def test_session_configure_queue_bad_timeout(ext_cls, timeout): # GIVEN @@ -471,7 +686,7 @@ def dummy(): @mock.patch("blazingmq._session.ExtSession") -@pytest.mark.parametrize("timeout", [None, -1.0, 0.0, 2.0**63, float("inf")]) +@pytest.mark.parametrize("timeout", [-1.0, 0.0, 2.0**63, float("inf")]) def test_session_close_queue_bad_timeout(ext_cls, timeout): # GIVEN @@ -493,6 +708,30 @@ def dummy(): assert exc.match(expected_pat) +@pytest.mark.parametrize("stats_dump_interval", [-1.0, 2.0**63, float("inf")]) +def test_session_bad_stats_dump_interval(stats_dump_interval): + # GIVEN + def dummy(): + pass + + expected_pat = re.escape( + f"stats_dump_interval must be nonnegative, was {stats_dump_interval}" + ) + + # WHEN + with pytest.raises(Exception) as exc: + Session( + dummy, + on_message=dummy, + broker="some_uri", + stats_dump_interval=stats_dump_interval, + ) + + # THEN + assert exc.type is ValueError + assert exc.match(expected_pat) + + def test_default_timeout_repr(): # GIVEN # WHEN diff --git a/tests/unit/test_session_options.py b/tests/unit/test_session_options.py new file mode 100644 index 0000000..6adc505 --- /dev/null +++ b/tests/unit/test_session_options.py @@ -0,0 +1,102 @@ +# Copyright 2019-2023 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import blazingmq + + +def test_session_options_repr(): + # WHEN + one = blazingmq.SessionOptions( + message_compression_algorithm=blazingmq.CompressionAlgorithmType.ZLIB, + timeouts=blazingmq.Timeouts(), + host_health_monitor=blazingmq.BasicHealthMonitor(), + num_processing_threads=1, + blob_buffer_size=5000, + channel_high_watermark=8000000, + event_queue_watermarks=(6000000, 7000000), + stats_dump_interval=30.0, + ) + # THEN + assert ( + "SessionOptions(" + "message_compression_algorithm=," + " timeouts=Timeouts()," + " host_health_monitor=BasicHealthMonitor()," + " num_processing_threads=1," + " blob_buffer_size=5000," + " channel_high_watermark=8000000," + " event_queue_watermarks=(6000000, 7000000)," + " stats_dump_interval=30.0)" == repr(one) + ) + + +def test_session_options_default_repr(): + # WHEN + options = blazingmq.SessionOptions() + # THEN + assert "SessionOptions()" == repr(options) + + +def test_session_options_default_to_none(): + # WHEN + options = blazingmq.SessionOptions() + # THEN + assert options.message_compression_algorithm is None + assert options.timeouts is None + assert options.host_health_monitor is None + assert options.num_processing_threads is None + assert options.blob_buffer_size is None + assert options.channel_high_watermark is None + assert options.event_queue_watermarks is None + assert options.stats_dump_interval is None + + +def test_session_options_equality(): + # GIVEN + left = blazingmq.SessionOptions() + + # WHEN + right = blazingmq.SessionOptions() + + # THEN + assert left == right + assert (left != right) is False + + +@pytest.mark.parametrize( + "right", + [ + None, + "string", + blazingmq.SessionOptions( + message_compression_algorithm=blazingmq.CompressionAlgorithmType.ZLIB + ), + blazingmq.SessionOptions(timeouts=blazingmq.Timeouts()), + blazingmq.SessionOptions(host_health_monitor=blazingmq.BasicHealthMonitor()), + blazingmq.SessionOptions(num_processing_threads=1), + blazingmq.SessionOptions(blob_buffer_size=5000), + blazingmq.SessionOptions(channel_high_watermark=8000000), + blazingmq.SessionOptions(event_queue_watermarks=(6000000, 7000000)), + blazingmq.SessionOptions(stats_dump_interval=30.0), + ], +) +def test_queue_options_other_inequality(right): + # GIVEN + left = blazingmq.SessionOptions() + + # THEN + assert not left == right diff --git a/tests/unit/test_timeouts.py b/tests/unit/test_timeouts.py new file mode 100644 index 0000000..8873b76 --- /dev/null +++ b/tests/unit/test_timeouts.py @@ -0,0 +1,88 @@ +# Copyright 2019-2023 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import blazingmq + + +def test_timeouts_repr(): + # WHEN + one = blazingmq.Timeouts( + connect_timeout=60.0, + disconnect_timeout=70.0, + open_queue_timeout=80.0, + configure_queue_timeout=90.0, + close_queue_timeout=100.0, + ) + # THEN + assert ( + "Timeouts(" + "connect_timeout=60.0," + " disconnect_timeout=70.0," + " open_queue_timeout=80.0," + " configure_queue_timeout=90.0," + " close_queue_timeout=100.0)" == repr(one) + ) + + +def test_timeouts_default_repr(): + # WHEN + timeouts = blazingmq.Timeouts() + # THEN + assert "Timeouts()" == repr(timeouts) + + +def test_timeouts_default_to_none(): + # WHEN + timeouts = blazingmq.Timeouts() + # THEN + assert timeouts.connect_timeout is None + assert timeouts.disconnect_timeout is None + assert timeouts.open_queue_timeout is None + assert timeouts.configure_queue_timeout is None + assert timeouts.close_queue_timeout is None + + +def test_timeouts_equality(): + # GIVEN + left = blazingmq.Timeouts() + + # WHEN + right = blazingmq.Timeouts() + + # THEN + assert left == right + assert (left != right) is False + + +@pytest.mark.parametrize( + "right", + [ + None, + "string", + blazingmq.Timeouts(connect_timeout=60.0), + blazingmq.Timeouts(disconnect_timeout=70.0), + blazingmq.Timeouts(open_queue_timeout=80.0), + blazingmq.Timeouts(configure_queue_timeout=90.0), + blazingmq.Timeouts(close_queue_timeout=100.0), + ], +) +def test_timeouts_other_inequality(right): + # GIVEN + left = blazingmq.Timeouts() + + # THEN + assert not left == right