From 921713912535e17b96415d2cc8d1d860ec4e1b20 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 1 Mar 2019 09:59:14 -0800 Subject: [PATCH] Fixed datetime offset (#99) * Fixed datetime offset * Updated pylint * Removed 3.4 pylint pass --- .travis.yml | 1 - HISTORY.rst | 9 +++ azure/eventhub/__init__.py | 2 +- azure/eventhub/async_ops/__init__.py | 2 +- azure/eventhub/async_ops/receiver_async.py | 45 ++++++++------- azure/eventhub/async_ops/sender_async.py | 45 ++++++++------- azure/eventhub/client.py | 2 +- azure/eventhub/common.py | 4 +- azure/eventhub/receiver.py | 45 ++++++++------- azure/eventhub/sender.py | 45 ++++++++------- .../abstract_checkpoint_manager.py | 5 -- .../abstract_event_processor.py | 4 -- .../abstract_lease_manager.py | 10 ---- azure/eventprocessorhost/partition_pump.py | 2 - dev_requirements.txt | 2 +- tests/test_receive.py | 55 ++++++++++++++++++- 16 files changed, 165 insertions(+), 113 deletions(-) diff --git a/.travis.yml b/.travis.yml index 37ac36e..261b227 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,7 +17,6 @@ matrix: script: - pytest - python ./setup.py check -r -s - - pylint --ignore=async_ops azure.eventhub - os: linux python: "3.5" script: diff --git a/HISTORY.rst b/HISTORY.rst index 7983b57..2181037 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,15 @@ Release History =============== +1.3.1 (2019-02-28) +++++++++++++++++++ + +**BugFixes** + +- Fixed bug where datetime offset filter was using a local timestamp rather than UTC. +- Fixed stackoverflow error in continuous connection reconnect attempts. + + 1.3.0 (2019-01-29) ++++++++++++++++++ diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py index f94d95c..7067761 100644 --- a/azure/eventhub/__init__.py +++ b/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -__version__ = "1.3.0" +__version__ = "1.3.1" from azure.eventhub.common import EventData, EventHubError, Offset from azure.eventhub.client import EventHubClient diff --git a/azure/eventhub/async_ops/__init__.py b/azure/eventhub/async_ops/__init__.py index 293376c..1d659ce 100644 --- a/azure/eventhub/async_ops/__init__.py +++ b/azure/eventhub/async_ops/__init__.py @@ -123,7 +123,7 @@ async def run_async(self): if failed and len(failed) == len(self.clients): log.warning("%r: All clients failed to start.", self.container_id) raise failed[0] - elif failed: + if failed: log.warning("%r: %r clients failed to start.", self.container_id, len(failed)) elif redirects: await self._handle_redirect(redirects) diff --git a/azure/eventhub/async_ops/receiver_async.py b/azure/eventhub/async_ops/receiver_async.py index 7e446d1..9a09091 100644 --- a/azure/eventhub/async_ops/receiver_async.py +++ b/azure/eventhub/async_ops/receiver_async.py @@ -49,6 +49,7 @@ def __init__( # pylint: disable=super-init-not-called self.keep_alive = keep_alive self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.reconnect_backoff = 1 self.redirected = None self.error = None self.properties = None @@ -107,9 +108,7 @@ async def open_async(self): while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) - async def reconnect_async(self): # pylint: disable=too-many-statements - """If the Receiver was disconnected from the service with - a retryable error - attempt to reconnect.""" + async def _reconnect_async(self): # pylint: disable=too-many-statements # pylint: disable=protected-access alt_creds = { "username": self.client._auth_config.get("iot_username"), @@ -134,6 +133,7 @@ async def reconnect_async(self): # pylint: disable=too-many-statements await self._handler.open_async() while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) + return True except errors.TokenExpired as shutdown: log.info("AsyncReceiver disconnected due to token expiry. Shutting down.") error = EventHubError(str(shutdown), shutdown) @@ -142,36 +142,39 @@ async def reconnect_async(self): # pylint: disable=too-many-statements except (errors.LinkDetach, errors.ConnectionClose) as shutdown: if shutdown.action.retry and self.auto_reconnect: log.info("AsyncReceiver detached. Attempting reconnect.") - await self.reconnect_async() - else: - log.info("AsyncReceiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + return False + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: log.info("AsyncReceiver detached. Attempting reconnect.") - await self.reconnect_async() - else: - log.info("AsyncReceiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + return False + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except errors.AMQPConnectionError as shutdown: if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect: log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.") - await self.reconnect_async() - else: - log.info("AsyncReceiver connection error (%r). Shutting down.", e) - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + return False + log.info("AsyncReceiver connection error (%r). Shutting down.", e) + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except Exception as e: log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receiver reconnect failed: {}".format(e)) await self.close_async(exception=error) raise error + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + while not await self._reconnect_async(): + await asyncio.sleep(self.reconnect_backoff) + async def has_started(self): """ Whether the handler has completed all start up processes such as diff --git a/azure/eventhub/async_ops/sender_async.py b/azure/eventhub/async_ops/sender_async.py index 8866c17..6fc786d 100644 --- a/azure/eventhub/async_ops/sender_async.py +++ b/azure/eventhub/async_ops/sender_async.py @@ -55,6 +55,7 @@ def __init__( # pylint: disable=super-init-not-called self.auto_reconnect = auto_reconnect self.timeout = send_timeout self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.reconnect_backoff = 1 self.name = "EHSender-{}".format(uuid.uuid4()) self.redirected = None self.error = None @@ -100,9 +101,7 @@ async def open_async(self): while not await self._handler.client_ready_async(): await asyncio.sleep(0.05) - async def reconnect_async(self): - """If the Receiver was disconnected from the service with - a retryable error - attempt to reconnect.""" + async def _reconnect_async(self): await self._handler.close_async() unsent_events = self._handler.pending_messages self._handler = SendClientAsync( @@ -119,6 +118,7 @@ async def reconnect_async(self): await self._handler.open_async() self._handler.queue_message(*unsent_events) await self._handler.wait_async() + return True except errors.TokenExpired as shutdown: log.info("AsyncSender disconnected due to token expiry. Shutting down.") error = EventHubError(str(shutdown), shutdown) @@ -127,36 +127,39 @@ async def reconnect_async(self): except (errors.LinkDetach, errors.ConnectionClose) as shutdown: if shutdown.action.retry and self.auto_reconnect: log.info("AsyncSender detached. Attempting reconnect.") - await self.reconnect_async() - else: - log.info("AsyncSender reconnect failed. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + return False + log.info("AsyncSender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: log.info("AsyncSender detached. Attempting reconnect.") - await self.reconnect_async() - else: - log.info("AsyncSender reconnect failed. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + return False + log.info("AsyncSender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except errors.AMQPConnectionError as shutdown: if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect: log.info("AsyncSender couldn't authenticate. Attempting reconnect.") - await self.reconnect_async() - else: - log.info("AsyncSender connection error (%r). Shutting down.", e) - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + return False + log.info("AsyncSender connection error (%r). Shutting down.", e) + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except Exception as e: log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Sender reconnect failed: {}".format(e)) await self.close_async(exception=error) raise error + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + while not await self._reconnect_async(): + await asyncio.sleep(self.reconnect_backoff) + async def has_started(self): """ Whether the handler has completed all start up processes such as diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index de67d6a..765c8ed 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -320,7 +320,7 @@ def run(self): if failed and len(failed) == len(self.clients): log.warning("%r: All clients failed to start.", self.container_id) raise failed[0] - elif failed: + if failed: log.warning("%r: %r clients failed to start.", self.container_id, len(failed)) elif redirects: self._handle_redirect(redirects) diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index c7014b0..81bfcbb 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -5,7 +5,7 @@ from __future__ import unicode_literals import datetime -import time +import calendar import json import six @@ -288,7 +288,7 @@ def selector(self): """ operator = ">=" if self.inclusive else ">" if isinstance(self.value, datetime.datetime): - timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000) + timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000) return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8') if isinstance(self.value, six.integer_types): return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8') diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index ddff3c2..9cd6c6f 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -47,6 +47,7 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a self.keep_alive = keep_alive self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.reconnect_backoff = 1 self.properties = None self.redirected = None self.error = None @@ -103,9 +104,7 @@ def open(self): while not self._handler.client_ready(): time.sleep(0.05) - def reconnect(self): # pylint: disable=too-many-statements - """If the Receiver was disconnected from the service with - a retryable error - attempt to reconnect.""" + def _reconnect(self): # pylint: disable=too-many-statements # pylint: disable=protected-access alt_creds = { "username": self.client._auth_config.get("iot_username"), @@ -129,6 +128,7 @@ def reconnect(self): # pylint: disable=too-many-statements self._handler.open() while not self._handler.client_ready(): time.sleep(0.05) + return True except errors.TokenExpired as shutdown: log.info("Receiver disconnected due to token expiry. Shutting down.") error = EventHubError(str(shutdown), shutdown) @@ -137,36 +137,39 @@ def reconnect(self): # pylint: disable=too-many-statements except (errors.LinkDetach, errors.ConnectionClose) as shutdown: if shutdown.action.retry and self.auto_reconnect: log.info("Receiver detached. Attempting reconnect.") - self.reconnect() - else: - log.info("Receiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + return False + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: log.info("Receiver detached. Attempting reconnect.") - self.reconnect() - else: - log.info("Receiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + return False + log.info("Receiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except errors.AMQPConnectionError as shutdown: if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect: log.info("Receiver couldn't authenticate. Attempting reconnect.") - self.reconnect() - else: - log.info("Receiver connection error (%r). Shutting down.", e) - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + return False + log.info("Receiver connection error (%r). Shutting down.", e) + error = EventHubError(str(shutdown)) + self.close(exception=error) + raise error except Exception as e: log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receiver reconnect failed: {}".format(e)) self.close(exception=error) raise error + def reconnect(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + while not self._reconnect(): + time.sleep(self.reconnect_backoff) + def get_handler_state(self): """ Get the state of the underlying handler with regards to start diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index c66d4d6..d96ea0c 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -52,6 +52,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N self.keep_alive = keep_alive self.auto_reconnect = auto_reconnect self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) + self.reconnect_backoff = 1 self.name = "EHSender-{}".format(uuid.uuid4()) if partition: self.target += "/Partitions/" + partition @@ -93,9 +94,7 @@ def open(self): while not self._handler.client_ready(): time.sleep(0.05) - def reconnect(self): - """If the Sender was disconnected from the service with - a retryable error - attempt to reconnect.""" + def _reconnect(self): # pylint: disable=protected-access self._handler.close() unsent_events = self._handler.pending_messages @@ -112,6 +111,7 @@ def reconnect(self): self._handler.open() self._handler.queue_message(*unsent_events) self._handler.wait() + return True except errors.TokenExpired as shutdown: log.info("Sender disconnected due to token expiry. Shutting down.") error = EventHubError(str(shutdown), shutdown) @@ -120,36 +120,39 @@ def reconnect(self): except (errors.LinkDetach, errors.ConnectionClose) as shutdown: if shutdown.action.retry and self.auto_reconnect: log.info("Sender detached. Attempting reconnect.") - self.reconnect() - else: - log.info("Sender reconnect failed. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + return False + log.info("Sender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: log.info("Sender detached. Attempting reconnect.") - self.reconnect() - else: - log.info("Sender reconnect failed. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + return False + log.info("Sender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except errors.AMQPConnectionError as shutdown: if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect: log.info("Sender couldn't authenticate. Attempting reconnect.") - self.reconnect() - else: - log.info("Sender connection error (%r). Shutting down.", e) - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + return False + log.info("Sender connection error (%r). Shutting down.", e) + error = EventHubError(str(shutdown)) + self.close(exception=error) + raise error except Exception as e: log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Sender Reconnect failed: {}".format(e)) self.close(exception=error) raise error + def reconnect(self): + """If the Sender was disconnected from the service with + a retryable error - attempt to reconnect.""" + while not self._reconnect(): + time.sleep(self.reconnect_backoff) + def get_handler_state(self): """ Get the state of the underlying handler with regards to start diff --git a/azure/eventprocessorhost/abstract_checkpoint_manager.py b/azure/eventprocessorhost/abstract_checkpoint_manager.py index 5e6ec84..b482859 100644 --- a/azure/eventprocessorhost/abstract_checkpoint_manager.py +++ b/azure/eventprocessorhost/abstract_checkpoint_manager.py @@ -25,7 +25,6 @@ async def create_checkpoint_store_if_not_exists_async(self): if there was a failure. :rtype: bool """ - pass @abstractmethod async def get_checkpoint_async(self, partition_id): @@ -38,7 +37,6 @@ async def get_checkpoint_async(self, partition_id): :return: Given partition checkpoint info, or `None` if none has been previously stored. :rtype: ~azure.eventprocessorhost.checkpoint.Checkpoint """ - pass @abstractmethod async def create_checkpoint_if_not_exists_async(self, partition_id): @@ -51,7 +49,6 @@ async def create_checkpoint_if_not_exists_async(self, partition_id): :return: The checkpoint for the given partition, whether newly created or already existing. :rtype: ~azure.eventprocessorhost.checkpoint.Checkpoint """ - pass @abstractmethod async def update_checkpoint_async(self, lease, checkpoint): @@ -63,7 +60,6 @@ async def update_checkpoint_async(self, lease, checkpoint): :param checkpoint: offset/sequeceNumber to update the store with. :type checkpoint: ~azure.eventprocessorhost.checkpoint.Checkpoint """ - pass @abstractmethod async def delete_checkpoint_async(self, partition_id): @@ -74,4 +70,3 @@ async def delete_checkpoint_async(self, partition_id): :param partition_id: The ID of a given parition. :type partition_id: str """ - pass diff --git a/azure/eventprocessorhost/abstract_event_processor.py b/azure/eventprocessorhost/abstract_event_processor.py index 14adcf3..4fbd7fb 100644 --- a/azure/eventprocessorhost/abstract_event_processor.py +++ b/azure/eventprocessorhost/abstract_event_processor.py @@ -21,7 +21,6 @@ async def open_async(self, context): :param context: Information about the partition :type context: ~azure.eventprocessorhost.partition_context.PartitionContext """ - pass @abstractmethod async def close_async(self, context, reason): @@ -33,7 +32,6 @@ async def close_async(self, context, reason): :param reason: The reason for closing. :type reason: str """ - pass @abstractmethod async def process_events_async(self, context, messages): @@ -46,7 +44,6 @@ async def process_events_async(self, context, messages): :param messages: The events to be processed. :type messages: list[~azure.eventhub.common.EventData] """ - pass @abstractmethod async def process_error_async(self, context, error): @@ -59,4 +56,3 @@ async def process_error_async(self, context, error): :type context: ~azure.eventprocessorhost.partition_context.PartitionContext :param error: The error that occured. """ - pass diff --git a/azure/eventprocessorhost/abstract_lease_manager.py b/azure/eventprocessorhost/abstract_lease_manager.py index 8638e49..1577a3b 100644 --- a/azure/eventprocessorhost/abstract_lease_manager.py +++ b/azure/eventprocessorhost/abstract_lease_manager.py @@ -29,7 +29,6 @@ async def create_lease_store_if_not_exists_async(self): :return: `True` if the lease store already exists or was created successfully, `False` if not. :rtype: bool """ - pass @abstractmethod async def delete_lease_store_async(self): @@ -39,7 +38,6 @@ async def delete_lease_store_async(self): :return: `True` if the lease store was deleted successfully, `False` if not. :rtype: bool """ - pass async def get_lease_async(self, partition_id): """ @@ -51,7 +49,6 @@ async def get_lease_async(self, partition_id): :return: Lease info for the partition, or `None`. :rtype: """ - pass @abstractmethod def get_all_leases(self): @@ -62,7 +59,6 @@ def get_all_leases(self): :return: A list of lease info. :rtype: """ - pass @abstractmethod async def create_lease_if_not_exists_async(self, partition_id): @@ -74,7 +70,6 @@ async def create_lease_if_not_exists_async(self, partition_id): :type parition_id: str :return: The existing or newly-created lease info for the partition. """ - pass @abstractmethod async def delete_lease_async(self, lease): @@ -85,7 +80,6 @@ async def delete_lease_async(self, lease): :param lease: The lease to be deleted. :type lease: ~azure.eventprocessorhost.lease.Lease """ - pass @abstractmethod async def acquire_lease_async(self, lease): @@ -99,7 +93,6 @@ async def acquire_lease_async(self, lease): :return: `True` if the lease was acquired successfully, `False` if not. :rtype: bool """ - pass @abstractmethod async def renew_lease_async(self, lease): @@ -113,7 +106,6 @@ async def renew_lease_async(self, lease): :return: `True` if the lease was renewed successfully, `False` if not. :rtype: bool """ - pass @abstractmethod async def release_lease_async(self, lease): @@ -126,7 +118,6 @@ async def release_lease_async(self, lease): :return: `True` if the lease was released successfully, `False` if not. :rtype: bool """ - pass @abstractmethod async def update_lease_async(self, lease): @@ -141,4 +132,3 @@ async def update_lease_async(self, lease): :return: `True` if the updated was performed successfully, `False` if not. :rtype: bool """ - pass diff --git a/azure/eventprocessorhost/partition_pump.py b/azure/eventprocessorhost/partition_pump.py index b487a65..9dbc55d 100644 --- a/azure/eventprocessorhost/partition_pump.py +++ b/azure/eventprocessorhost/partition_pump.py @@ -80,7 +80,6 @@ async def on_open_async(self): """ Event handler for on open event. """ - pass def is_closing(self): """ @@ -129,7 +128,6 @@ async def on_closing_async(self, reason): :param reason: The reason for the shutdown. :type reason: str """ - pass async def process_events_async(self, events): """ diff --git a/dev_requirements.txt b/dev_requirements.txt index 734516e..956c881 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -3,7 +3,7 @@ pytest-asyncio>=0.8.0; python_version > '3.4' azure-servicebus==0.50.0 docutils>=0.14 pygments>=2.2.0 -pylint==2.1.1; python_version >= '3.4' +pylint==2.3.0; python_version >= '3.4' pylint==1.8.4; python_version < '3.4' behave==1.2.6 wheel \ No newline at end of file diff --git a/tests/test_receive.py b/tests/test_receive.py index fa96c7a..6538977 100644 --- a/tests/test_receive.py +++ b/tests/test_receive.py @@ -7,11 +7,33 @@ import os import pytest import time +import datetime from azure import eventhub from azure.eventhub import EventData, EventHubClient, Offset +# def test_receive_without_events(connstr_senders): +# connection_str, senders = connstr_senders +# client = EventHubClient.from_connection_string(connection_str, debug=True) +# receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) +# finish = datetime.datetime.now() + datetime.timedelta(seconds=240) +# count = 0 +# try: +# client.run() +# while True: #datetime.datetime.now() < finish: +# senders[0].send(EventData("Receiving an event {}".format(count))) +# received = receiver.receive(timeout=1) +# if received: +# print(received[0].body_as_str()) +# count += 1 +# time.sleep(1) +# except: +# raise +# finally: +# client.stop() + + def test_receive_end_of_stream(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) @@ -95,7 +117,7 @@ def test_receive_with_inclusive_offset(connstr_senders): client.stop() -def test_receive_with_datetime(connstr_senders): +def test_receive_with_datetime_sync(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False) partitions = client.get_eventhub_info() @@ -128,6 +150,37 @@ def test_receive_with_datetime(connstr_senders): client.stop() +def test_receive_with_custom_datetime_sync(connstr_senders): + connection_str, senders = connstr_senders + client = EventHubClient.from_connection_string(connection_str, debug=False) + for i in range(5): + senders[0].send(EventData(b"Message before timestamp")) + time.sleep(60) + + now = datetime.datetime.utcnow() + offset = datetime.datetime(now.year, now.month, now.day, now.hour, now.minute) + for i in range(5): + senders[0].send(EventData(b"Message after timestamp")) + + receiver = client.add_receiver("$default", "0", offset=Offset(offset)) + try: + client.run() + all_received = [] + received = receiver.receive(timeout=1) + while received: + all_received.extend(received) + received = receiver.receive(timeout=1) + + assert len(all_received) == 5 + for received_event in all_received: + assert received_event.body_as_str() == "Message after timestamp" + assert received_event.enqueued_time > offset + except: + raise + finally: + client.stop() + + def test_receive_with_sequence_no(connstr_senders): connection_str, senders = connstr_senders client = EventHubClient.from_connection_string(connection_str, debug=False)