Skip to content

Commit

Permalink
Expose more session options from C++ SDK
Browse files Browse the repository at this point in the history
Since the creation of the BlazingMQ Python SDK, our underlying C++ SDK libbmq
has gained more `SessionOptions` that a user can configure, including
additional and more fine-grained timeout settings, new watermark
configurations, and buffer sizes.  These session options are not currently
exposed in the Python SDK, though, so Python users are unable to make use of
the new configuration options.  We want to expose all new session options that
are not already marked as deprecated or that do not involve distributed trace,
which will require more substantial integration work.  This work can be
undertaken in the future without impact on the changes here.  This patch
extends the Python SDK to expose these new SessionOptions, modifying the API,
documentation, and tests.

The BlazingMQ Python SDK is divided into three layers, with the lowest level
being a pure C++ library pybmq that interfaces with the BlazingMQ C++ client
library libbmq, with the highest level being exposed through the `_session.py`
module’s `Session` Python class, and with a thin middle layer of Cython glue
code in `_ext.pyx` connecting the two.  In order to expose these new
`SessionOptions` to users of the Python SDK, we need to include them in each
layer of the library.  Crucially, we want the changes in the highest layer,
`_session.py` to maintain backwards compatibility with all existing clients of
the Python SDK.  We maintain compatibility and evolve our API with two steps:

  1. Add the new session options as additional parameters to the `Session`
     class’s constructor.  First, all new session options that the `Session`
     constructor accepts are defaulted to `None`, which pybmq ignores,
     maintaining the existing behavior for callers who do not set any of these
     new arguments.  Second, new session options are also appended to the list
     of arguments, so clients who build a `Session` using positional arguments
     rather than named arguments will continue to function correctly.  Finally,
     we introduce a new value-semantic class `Timeouts`, which allows users to
     independently set the timeouts for each of the five session operations
     whose timeouts are configureable: session connection, session
     disconnection, queue opening, queue configuraiton, and queue closing.  By
     overloading the `timeout` argument to taking either a simple float or an
     instance of `Timeouts`, users who currently set a timeout for the
     `Session` by passing in a single timeout will continue to see the same
     behavior, but users passing in a new `Timeouts` value have access to the
     more fine-grained timeout settings that libbmq’s `SessionOptions`
     provides.

  2. Add a new named constructor `with_options` to `Session`, which takes an
     instance of a new value-semantic class `SessionOptions`.  Thile users can
     continue to construct the `Session` directly using its extended
     constructor, adding the additional session options arguments to the
     constructor makes it somewhat unwieldly to call, especially if the user
     only wants to configure one of the session options.  This patch provides
     an alternative, simpler way to construct a `Session` instance, using a new
     value-semantic builder class named `SessionOptions` that holds each of the
     new session options and a class method named `with_options`, which takes
     an instance of `SessionOptions` and returns a new `Session`.
     `with_options` is the recommended way to construct a new session if you
     plan to configure the session options.
  • Loading branch information
pniedzielski authored Dec 6, 2023
1 parent f4077d3 commit cdad1e9
Show file tree
Hide file tree
Showing 16 changed files with 1,011 additions and 51 deletions.
8 changes: 8 additions & 0 deletions docs/api_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ Session
.. autoclass:: Session
:members:

.. autoclass:: Timeouts
:members:
:member-order: bysource

.. autoclass:: SessionOptions
:members:
:member-order: bysource

.. autoclass:: QueueOptions
:members:
:member-order: bysource
Expand Down
1 change: 1 addition & 0 deletions news/8.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Exposed more `SessionOptions` from libbmq
4 changes: 4 additions & 0 deletions src/blazingmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from ._monitors import BasicHealthMonitor
from ._session import QueueOptions
from ._session import Session
from ._session import SessionOptions
from ._timeouts import Timeouts
from ._typing import PropertyTypeDict
from ._typing import PropertyValueDict
from .exceptions import Error
Expand All @@ -42,6 +44,8 @@
"Message",
"MessageHandle",
"Session",
"SessionOptions",
"Timeouts",
"__version__",
"exceptions",
"session_events",
Expand Down
8 changes: 7 additions & 1 deletion src/blazingmq/_ext.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ from blazingmq import CompressionAlgorithmType
from blazingmq import Message
from blazingmq import MessageHandle
from blazingmq import PropertyType
from blazingmq import Timeouts
from blazingmq.session_events import SessionEvent

DEFAULT_MAX_UNCONFIRMED_MESSAGES: int = ...
Expand All @@ -44,7 +45,12 @@ class Session:
on_message: Optional[Callable[[Message, MessageHandle], None]] = None,
broker: bytes,
message_compression_algorithm: CompressionAlgorithmType,
timeout: Optional[float] = None,
num_processing_threads: Optional[int] = None,
blob_buffer_size: Optional[int] = None,
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int, int]] = None,
stats_dump_interval: Optional[int | float] = None,
timeouts: Timeouts = Timeouts(),
monitor_host_health: bool = False,
fake_host_health_monitor: Optional[FakeHostHealthMonitor] = None,
) -> None: ...
Expand Down
43 changes: 39 additions & 4 deletions src/blazingmq/_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import logging
import weakref

from bsl cimport optional
from bsl cimport pair
from bsl cimport shared_ptr
from bsl.bsls cimport TimeInterval
from cpython.ceval cimport PyEval_InitThreads
Expand All @@ -43,6 +44,7 @@ from . import _callbacks
from . import _enums
from . import _messages
from . import _script_name
from . import _timeouts
from . import session_events
from .exceptions import BrokerTimeoutError
from .exceptions import Error
Expand Down Expand Up @@ -164,15 +166,40 @@ cdef class Session:
on_message=None,
broker not None: bytes = b'tcp://localhost:30114',
message_compression_algorithm not None=_enums.CompressionAlgorithmType.NONE,
timeout: Optional[int|float] = None,
num_processing_threads: Optional[int] = None,
blob_buffer_size: Optional[int] = None,
channel_high_watermark: Optional[int] = None,
event_queue_watermarks: Optional[tuple[int,int]] = None,
stats_dump_interval: Optional[int|float] = None,
timeouts: _timeouts.Timeouts = _timeouts.Timeouts(),
monitor_host_health: bool = False,
fake_host_health_monitor: FakeHostHealthMonitor = None,
_mock: Optional[object] = None,
) -> None:
cdef shared_ptr[ManualHostHealthMonitor] fake_host_health_monitor_sp
cdef optional[int] c_num_processing_threads
cdef optional[int] c_blob_buffer_size
cdef optional[int] c_channel_high_watermark
cdef optional[pair[int,int]] c_event_queue_watermarks
cdef TimeInterval c_stats_dump_interval = create_time_interval(stats_dump_interval)
cdef TimeInterval c_connect_timeout = create_time_interval(timeouts.connect_timeout)
cdef TimeInterval c_disconnect_timeout = create_time_interval(timeouts.disconnect_timeout)
cdef TimeInterval c_open_queue_timeout = create_time_interval(timeouts.open_queue_timeout)
cdef TimeInterval c_configure_queue_timeout = create_time_interval(timeouts.configure_queue_timeout)
cdef TimeInterval c_close_queue_timeout = create_time_interval(timeouts.close_queue_timeout)

PyEval_InitThreads()

if num_processing_threads is not None:
c_num_processing_threads = optional[int](num_processing_threads)
if blob_buffer_size is not None:
c_blob_buffer_size = optional[int](blob_buffer_size)
if channel_high_watermark is not None:
c_channel_high_watermark = optional[int](channel_high_watermark)
if event_queue_watermarks is not None:
c_event_queue_watermarks = optional[pair[int,int]](
pair[int,int](event_queue_watermarks[0], event_queue_watermarks[1]))

self.monitor_host_health = monitor_host_health

if fake_host_health_monitor:
Expand All @@ -194,21 +221,29 @@ cdef class Session:
cdef char *c_broker_uri = broker
script_name = _script_name.get_script_name()
cdef char *c_script_name = script_name
cdef TimeInterval c_timeout = create_time_interval(timeout)
self._session = new NativeSession(
session_cb,
message_cb,
ack_cb,
c_broker_uri,
c_script_name,
COMPRESSION_ALGO_FROM_PY_MAPPING[message_compression_algorithm],
c_timeout,
c_num_processing_threads,
c_blob_buffer_size,
c_channel_high_watermark,
c_event_queue_watermarks,
c_stats_dump_interval,
c_connect_timeout,
c_disconnect_timeout,
c_open_queue_timeout,
c_configure_queue_timeout,
c_close_queue_timeout,
monitor_host_health,
fake_host_health_monitor_sp,
Error,
BrokerTimeoutError,
_mock)
self._session.start(c_timeout)
self._session.start(c_connect_timeout)
atexit.register(ensure_stop_session_impl, weakref.ref(self))

def stop(self) -> None:
Expand Down
Loading

0 comments on commit cdad1e9

Please sign in to comment.