diff --git a/HISTORY.rst b/HISTORY.rst index c90d34f..89db3a3 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,19 @@ Release History =============== +0.2.0rc2 (2018-07-29) ++++++++++++++++++++++ + +- **Breaking change** `EventData.offset` will now return an object of type `~uamqp.common.Offset` rather than str. + The original string value can be retrieved from `~uamqp.common.Offset.value`. +- Each sender/receiver will now run in its own independent connection. +- Updated uAMQP dependency to 0.2.0 +- Fixed issue with IoTHub clients not being able to retrieve partition information. +- Added support for HTTP proxy settings to both EventHubClient and EPH. +- Added error handling policy to automatically reconnect on retryable error. +- Added keep-alive thread for maintaining an unused connection. + + 0.2.0rc1 (2018-07-06) +++++++++++++++++++++ diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py index 5182b38..5acadea 100644 --- a/azure/eventhub/__init__.py +++ b/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -__version__ = "0.2.0rc1" +__version__ = "0.2.0rc2" from azure.eventhub.common import EventData, EventHubError, Offset from azure.eventhub.client import EventHubClient diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index e2e727a..93a60c5 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -7,11 +7,14 @@ import asyncio import time import datetime +try: + from urllib import urlparse, unquote_plus, urlencode, quote_plus +except ImportError: + from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus from uamqp import authentication, constants, types, errors from uamqp import ( Message, - Source, ConnectionAsync, AMQPClientAsync, SendClientAsync, @@ -37,7 +40,7 @@ class EventHubClientAsync(EventHubClient): sending events to and receiving events from the Azure Event Hubs service. """ - def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use + def _create_auth(self, username=None, password=None): # pylint: disable=no-self-use """ Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate the session. @@ -49,32 +52,13 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self :param password: The shared access key. :type password: str """ + username = username or self._auth_config['username'] + password = password or self._auth_config['password'] if "@sas.root" in username: - return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAsync.from_shared_access_key(auth_uri, username, password) - - def _create_connection_async(self): - """ - Create a new ~uamqp._async.connection_async.ConnectionAsync instance that will be shared between all - AsyncSender/AsyncReceiver clients. - """ - if not self.connection: - log.info("{}: Creating connection with address={}".format( - self.container_id, self.address.geturl())) - self.connection = ConnectionAsync( - self.address.hostname, - self.auth, - container_id=self.container_id, - properties=self._create_properties(), - debug=self.debug) - - async def _close_connection_async(self): - """ - Close and destroy the connection async. - """ - if self.connection: - await self.connection.destroy_async() - self.connection = None + return authentication.SASLPlain( + self.address.hostname, username, password, http_proxy=self.http_proxy) + return authentication.SASTokenAsync.from_shared_access_key( + self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy) async def _close_clients_async(self): """ @@ -85,17 +69,13 @@ async def _close_clients_async(self): async def _wait_for_client(self, client): try: while client.get_handler_state().value == 2: - await self.connection.work_async() + await client._handler._connection.work_async() # pylint: disable=protected-access except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) async def _start_client_async(self, client): try: - await client.open_async(self.connection) - started = await client.has_started() - while not started: - await self.connection.work_async() - started = await client.has_started() + await client.open_async() except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) @@ -108,9 +88,8 @@ async def _handle_redirect(self, redirects): redirects = [c.redirected for c in self.clients if c.redirected] if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config) - await self.connection.redirect_async(redirects[0], self.auth) - await asyncio.gather(*[c.open_async(self.connection) for c in self.clients]) + self._process_redirect_uri(redirects[0]) + await asyncio.gather(*[c.open_async() for c in self.clients]) async def run_async(self): """ @@ -125,7 +104,6 @@ async def run_async(self): :rtype: list[~azure.eventhub.common.EventHubError] """ log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) - self._create_connection_async() tasks = [self._start_client_async(c) for c in self.clients] try: await asyncio.gather(*tasks) @@ -153,7 +131,6 @@ async def stop_async(self): log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) self.stopped = True await self._close_clients_async() - await self._close_connection_async() async def get_eventhub_info_async(self): """ @@ -161,10 +138,14 @@ async def get_eventhub_info_async(self): :rtype: dict """ - eh_name = self.address.path.lstrip('/') - target = "amqps://{}/{}".format(self.address.hostname, eh_name) - async with AMQPClientAsync(target, auth=self.auth, debug=self.debug) as mgmt_client: - mgmt_msg = Message(application_properties={'name': eh_name}) + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} + try: + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug) + await mgmt_client.open_async() + mgmt_msg = Message(application_properties={'name': self.eh_name}) response = await mgmt_client.mgmt_request_async( mgmt_msg, constants.READ_OPERATION, @@ -180,6 +161,8 @@ async def get_eventhub_info_async(self): output['partition_count'] = eh_info[b'partition_count'] output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']] return output + finally: + await mgmt_client.close_async() def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None, loop=None): """ @@ -201,10 +184,7 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30 path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - source = Source(source_url) - if offset is not None: - source.set_filter(offset.selector()) - handler = AsyncReceiver(self, source, prefetch=prefetch, loop=loop) + handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index 2ceb518..6ba2c3a 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -6,10 +6,11 @@ import asyncio from uamqp import errors, types -from uamqp import ReceiveClientAsync +from uamqp import ReceiveClientAsync, Source from azure.eventhub import EventHubError, EventData from azure.eventhub.receiver import Receiver +from azure.eventhub.common import _error_handler class AsyncReceiver(Receiver): @@ -17,7 +18,7 @@ class AsyncReceiver(Receiver): Implements the async API of a Receiver. """ - def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called + def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called """ Instantiate an async receiver. @@ -33,25 +34,32 @@ def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pyli :param loop: An event loop. """ self.loop = loop or asyncio.get_event_loop() + self.client = client + self.source = source + self.offset = offset + self.prefetch = prefetch + self.epoch = epoch + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None - self.debug = client.debug - self.offset = None - self.prefetch = prefetch self.properties = None - self.epoch = epoch + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) if epoch: self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} self._handler = ReceiveClientAsync( source, - auth=client.auth, - debug=self.debug, + auth=self.client.get_auth(), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, loop=self.loop) - async def open_async(self, connection): + async def open_async(self): """ Open the Receiver using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -60,16 +68,54 @@ async def open_async(self, connection): :param connection: The underlying client shared connection. :type: connection: ~uamqp._async.connection_async.ConnectionAsync """ + # pylint: disable=protected-access if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = ReceiveClientAsync( - self.redirected.address, - auth=None, - debug=self.debug, + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, loop=self.loop) - await self._handler.open_async(connection=connection) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() + + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + await self._handler.close_async() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClientAsync( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() async def has_started(self): """ @@ -88,7 +134,7 @@ async def has_started(self): raise EventHubError("Authorization timeout.") elif auth_in_progress: return False - elif not await self._handler._client_ready(): + elif not await self._handler._client_ready_async(): return False else: return True @@ -109,6 +155,8 @@ async def close_async(self, exception=None): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = EventHubError(str(exception), exception) elif exception: self.error = EventHubError(str(exception)) else: @@ -129,21 +177,28 @@ async def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 message_batch = await self._handler.receive_message_batch_async( max_batch_size=max_batch_size, timeout=timeout_ms) - data_batch = [] for message in message_batch: event_data = EventData(message=message) self.offset = event_data.offset data_batch.append(event_data) return data_batch - except errors.LinkDetach as detach: - error = EventHubError(str(detach)) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError: + await self.reconnect_async() + return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) await self.close_async(exception=error) diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index 3e57e3c..42865f3 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -10,6 +10,7 @@ from azure.eventhub import EventHubError from azure.eventhub.sender import Sender +from azure.eventhub.common import _error_handler class AsyncSender(Sender): """ @@ -26,23 +27,28 @@ def __init__(self, client, target, partition=None, loop=None): # pylint: disabl :type target: str :param loop: An event loop. """ + self.loop = loop or asyncio.get_event_loop() + self.client = client + self.target = target + self.partition = partition + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None - self.debug = client.debug - self.partition = partition if partition: - target += "/Partitions/" + partition - self.loop = loop or asyncio.get_event_loop() + self.target += "/Partitions/" + partition self._handler = SendClientAsync( - target, - auth=client.auth, - debug=self.debug, + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), loop=self.loop) self._outcome = None self._condition = None - async def open_async(self, connection): + async def open_async(self): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -52,12 +58,39 @@ async def open_async(self, connection): :type: connection:~uamqp._async.connection_async.ConnectionAsync """ if self.redirected: + self.target = self.redirected.address self._handler = SendClientAsync( - self.redirected.address, - auth=None, - debug=self.debug, - msg_timeout=Sender.TIMEOUT) - await self._handler.open_async(connection=connection) + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() # pylint: disable=protected-access + + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access + pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) + unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + await self._handler.close_async() + self._handler = SendClientAsync( + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + self._handler._pending_messages = unsent_events + await self._handler.wait_async() async def has_started(self): """ @@ -76,7 +109,7 @@ async def has_started(self): raise EventHubError("Authorization timeout.") elif auth_in_progress: return False - elif not await self._handler._client_ready(): + elif not await self._handler._client_ready_async(): return False else: return True @@ -97,6 +130,8 @@ async def close_async(self, exception=None): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = EventHubError(str(exception), exception) elif exception: self.error = EventHubError(str(exception)) else: @@ -122,10 +157,15 @@ async def send(self, event_data): await self._handler.send_message_async(event_data.message) if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) - except errors.LinkDetach as detach: - error = EventHubError(str(detach)) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError: + await self.reconnect_async() except Exception as e: error = EventHubError("Send failed: {}".format(e)) await self.close_async(exception=error) @@ -141,5 +181,14 @@ async def wait_async(self): raise self.error try: await self._handler.wait_async() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError: + await self.reconnect_async() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 6e37dfe..9c57cbd 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -8,15 +8,14 @@ import sys import uuid import time +import functools try: from urllib import urlparse, unquote_plus, urlencode, quote_plus except ImportError: from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus import uamqp -from uamqp import Connection from uamqp import Message -from uamqp import Source from uamqp import authentication from uamqp import constants @@ -89,7 +88,7 @@ class EventHubClient(object): events to and receiving events from the Azure Event Hubs service. """ - def __init__(self, address, username=None, password=None, debug=False): + def __init__(self, address, username=None, password=None, debug=False, http_proxy=None): """ Constructs a new EventHubClient with the given address URL. @@ -105,19 +104,25 @@ def __init__(self, address, username=None, password=None, debug=False): :param debug: Whether to output network trace logs to the logger. Default is `False`. :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self.address = urlparse(address) + self.eh_name = self.address.path.lstrip('/') + self.http_proxy = http_proxy + self.mgmt_target = "amqps://{}/{}".format(self.address.hostname, self.eh_name) url_username = unquote_plus(self.address.username) if self.address.username else None username = username or url_username url_password = unquote_plus(self.address.password) if self.address.password else None password = password or url_password if not username or not password: raise ValueError("Missing username and/or password.") - auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) - self.auth = self._create_auth(auth_uri, username, password) - self._auth_config = None - self.connection = None + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self._auth_config = {'username': username, 'password': password} + self.get_auth = functools.partial(self._create_auth) self.debug = debug self.clients = [] @@ -146,10 +151,14 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): username = "{}@sas.root.{}".format(policy, hub_name) password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) - client._auth_config = {'username': policy, 'password': key} # pylint: disable=protected-access + client._auth_config = { # pylint: disable=protected-access + 'iot_username': policy, + 'iot_password': key, + 'username': username, + 'password': password} return client - def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use + def _create_auth(self, username=None, password=None): """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. @@ -161,11 +170,15 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self :param password: The shared access key. :type password: str """ + username = username or self._auth_config['username'] + password = password or self._auth_config['password'] if "@sas.root" in username: - return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAuth.from_shared_access_key(auth_uri, username, password) + return authentication.SASLPlain( + self.address.hostname, username, password, http_proxy=self.http_proxy) + return authentication.SASTokenAuth.from_shared_access_key( + self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy) - def _create_properties(self): # pylint: disable=no-self-use + def create_properties(self): # pylint: disable=no-self-use """ Format the properties with which to instantiate the connection. This acts like a user agent over HTTP. @@ -179,29 +192,6 @@ def _create_properties(self): # pylint: disable=no-self-use properties["platform"] = sys.platform return properties - def _create_connection(self): - """ - Create a new ~uamqp.connection.Connection instance that will be shared between all - Sender/Receiver clients. - """ - if not self.connection: - log.info("{}: Creating connection with address={}".format( - self.container_id, self.address.geturl())) - self.connection = Connection( - self.address.hostname, - self.auth, - container_id=self.container_id, - properties=self._create_properties(), - debug=self.debug) - - def _close_connection(self): - """ - Close and destroy the connection. - """ - if self.connection: - self.connection.destroy() - self.connection = None - def _close_clients(self): """ Close all open Sender/Receiver clients. @@ -212,21 +202,26 @@ def _close_clients(self): def _start_clients(self): for client in self.clients: try: - client.open(self.connection) - while not client.has_started(): - self.connection.work() + client.open() except Exception as exp: # pylint: disable=broad-except client.close(exception=exp) + def _process_redirect_uri(self, redirect): + redirect_uri = redirect.address.decode('utf-8') + auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") + self.address = urlparse(auth_uri) + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self.eh_name = self.address.path.lstrip('/') + self.mgmt_target = redirect_uri + def _handle_redirect(self, redirects): if len(redirects) != len(self.clients): raise EventHubError("Some clients are attempting to redirect the connection.") if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config) - self.connection.redirect(redirects[0], self.auth) + self._process_redirect_uri(redirects[0]) for client in self.clients: - client.open(self.connection) + client.open() def run(self): """ @@ -241,7 +236,6 @@ def run(self): :rtype: list[~azure.eventhub.common.EventHubError] """ log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) - self._create_connection() try: self._start_clients() redirects = [c.redirected for c in self.clients if c.redirected] @@ -268,7 +262,6 @@ def stop(self): log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) self.stopped = True self._close_clients() - self._close_connection() def get_eventhub_info(self): """ @@ -282,13 +275,14 @@ def get_eventhub_info(self): :rtype: dict """ - self._create_connection() - eh_name = self.address.path.lstrip('/') - target = "amqps://{}/{}".format(self.address.hostname, eh_name) - mgmt_client = uamqp.AMQPClient(target, auth=self.auth, debug=self.debug) - mgmt_client.open(self.connection) + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} try: - mgmt_msg = Message(application_properties={'name': eh_name}) + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = uamqp.AMQPClient(self.mgmt_target, auth=mgmt_auth, debug=self.debug) + mgmt_client.open() + mgmt_msg = Message(application_properties={'name': self.eh_name}) response = mgmt_client.mgmt_request( mgmt_msg, constants.READ_OPERATION, @@ -329,10 +323,7 @@ def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, ope path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - source = Source(source_url) - if offset is not None: - source.set_filter(offset.selector()) - handler = Receiver(self, source, prefetch=prefetch) + handler = Receiver(self, source_url, offset=offset, prefetch=prefetch) self.clients.append(handler) return handler diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index 4ba972a..035a812 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -7,9 +7,40 @@ import time from uamqp import Message, BatchMessage -from uamqp import types +from uamqp import types, constants, errors from uamqp.message import MessageHeader, MessageProperties +_NO_RETRY_ERRORS = ( + b"com.microsoft:argument-out-of-range", + b"com.microsoft:entity-disabled", + b"com.microsoft:auth-failed", + b"com.microsoft:precondition-failed", + b"com.microsoft:argument-error" +) + +def _error_handler(error): + """ + Called internally when an event has failed to send so we + can parse the error to determine whether we should attempt + to retry sending the event again. + Returns the action to take according to error type. + + :param error: The error received in the send attempt. + :type error: Exception + :rtype: ~uamqp.errors.ErrorAction + """ + if error.condition == b'com.microsoft:server-busy': + return errors.ErrorAction(retry=True, backoff=4) + elif error.condition == b'com.microsoft:timeout': + return errors.ErrorAction(retry=True, backoff=2) + elif error.condition == b'com.microsoft:operation-cancelled': + return errors.ErrorAction(retry=True) + elif error.condition == b"com.microsoft:container-close": + return errors.ErrorAction(retry=True, backoff=4) + elif error.condition in _NO_RETRY_ERRORS: + return errors.ErrorAction(retry=False) + return errors.ErrorAction(retry=True) + class EventData(object): """ @@ -75,7 +106,7 @@ def offset(self): :rtype: int """ try: - return self._annotations[EventData.PROP_OFFSET].decode('UTF-8') + return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8')) except (KeyError, AttributeError): return None @@ -208,4 +239,41 @@ class EventHubError(Exception): """ Represents an error happened in the client. """ - pass + + def __init__(self, message, details=None): + self.error = None + self.message = message + self.details = details + if isinstance(message, constants.MessageSendResult): + self.message = "Message send failed with result: {}".format(message) + if details and isinstance(details, Exception): + try: + condition = details.condition.value.decode('UTF-8') + except AttributeError: + condition = details.condition.decode('UTF-8') + _, _, self.error = condition.partition(':') + self.message += "\nError: {}".format(self.error) + try: + self._parse_error(details.description) + for detail in self.details: + self.message += "\n{}".format(detail) + except: # pylint: disable=bare-except + self.message += "\n{}".format(details) + super(EventHubError, self).__init__(self.message) + + def _parse_error(self, error_list): + details = [] + self.message = error_list if isinstance(error_list, str) else error_list.decode('UTF-8') + details_index = self.message.find(" Reference:") + if details_index >= 0: + details_msg = self.message[details_index + 1:] + self.message = self.message[0:details_index] + + tracking_index = details_msg.index(", TrackingId:") + system_index = details_msg.index(", SystemTracker:") + timestamp_index = details_msg.index(", Timestamp:") + details.append(details_msg[:tracking_index]) + details.append(details_msg[tracking_index + 2: system_index]) + details.append(details_msg[system_index + 2: timestamp_index]) + details.append(details_msg[timestamp_index + 2:]) + self.details = details diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 3cef829..49a15ce 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -4,9 +4,9 @@ # -------------------------------------------------------------------------------------------- from uamqp import types, errors -from uamqp import ReceiveClient +from uamqp import ReceiveClient, Source -from azure.eventhub.common import EventHubError, EventData, Offset +from azure.eventhub.common import EventHubError, EventData, _error_handler class Receiver: @@ -16,38 +16,46 @@ class Receiver: timeout = 0 _epoch = b'com.microsoft:epoch' - def __init__(self, client, source, prefetch=300, epoch=None): + def __init__(self, client, source, offset=None, prefetch=300, epoch=None): """ Instantiate a receiver. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient :param source: The source EventHub from which to receive events. - :type source: ~uamqp.address.Source + :type source: str :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int :param epoch: An optional epoch value. :type epoch: int """ - self.offset = None + self.client = client + self.source = source + self.offset = offset self.prefetch = prefetch self.epoch = epoch + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.properties = None self.redirected = None - self.debug = client.debug self.error = None + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) if epoch: self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} self._handler = ReceiveClient( source, - auth=client.auth, - debug=self.debug, + auth=self.client.get_auth(), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, - timeout=self.timeout) + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) - def open(self, connection): + def open(self): """ Open the Receiver using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -56,15 +64,53 @@ def open(self, connection): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ + # pylint: disable=protected-access if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = ReceiveClient( - self.redirected.address, - auth=None, - debug=self.debug, + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, - timeout=self.timeout) - self._handler.open(connection) + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() + + def reconnect(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + self._handler.close() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClient( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() def get_handler_state(self): """ @@ -146,34 +192,29 @@ def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 message_batch = self._handler.receive_message_batch( max_batch_size=max_batch_size, timeout=timeout_ms) - data_batch = [] for message in message_batch: event_data = EventData(message=message) self.offset = event_data.offset data_batch.append(event_data) return data_batch - except errors.LinkDetach as detach: - error = EventHubError(str(detach)) - self.close(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + self.reconnect() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError: + self.reconnect() + return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) raise error - - def selector(self, default): - """ - Create a selector for the current offset if it is set. - - :param default: The fallback receive offset. - :type default: ~azure.eventhub.common.Offset - :rtype: ~azure.eventhub.common.Offset - """ - if self.offset is not None: - return Offset(self.offset).selector() - return default diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index 358a336..ff358d0 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -6,7 +6,7 @@ from uamqp import constants, errors from uamqp import SendClient -from azure.eventhub.common import EventHubError +from azure.eventhub.common import EventHubError, _error_handler class Sender: @@ -24,21 +24,26 @@ def __init__(self, client, target, partition=None): :param target: The URI of the EventHub to send to. :type target: str """ + self.client = client + self.target = target + self.partition = partition self.redirected = None self.error = None - self.debug = client.debug - self.partition = partition + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) if partition: - target += "/Partitions/" + partition + self.target += "/Partitions/" + partition self._handler = SendClient( - target, - auth=client.auth, - debug=self.debug, - msg_timeout=Sender.TIMEOUT) + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) self._outcome = None self._condition = None - def open(self, connection): + def open(self): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -48,12 +53,37 @@ def open(self, connection): :type: connection: ~uamqp.connection.Connection """ if self.redirected: + self.target = self.redirected.address self._handler = SendClient( - self.redirected.address, - auth=None, - debug=self.debug, - msg_timeout=Sender.TIMEOUT) - self._handler.open(connection) + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() # pylint: disable=protected-access + + def reconnect(self): + """If the Sender was disconnected from the service with + a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access + pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) + unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + self._handler.close() + self._handler = SendClient( + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + self._handler._pending_messages = unsent_events + self._handler.wait() def get_handler_state(self): """ @@ -130,10 +160,19 @@ def send(self, event_data): self._handler.send_message(event_data.message) if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) - except errors.LinkDetach as detach: - error = EventHubError(str(detach)) + except errors.MessageException as failed: + error = EventHubError(str(failed), failed) self.close(exception=error) raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError: + self.reconnect() except Exception as e: error = EventHubError("Send failed: {}".format(e)) self.close(exception=error) @@ -167,6 +206,15 @@ def wait(self): raise self.error try: self._handler.wait() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError: + self.reconnect() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 86c42d2..4801a25 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -66,7 +66,8 @@ async def open_clients_async(self): # Create event hub client and receive handler and set options self.eh_client = EventHubClientAsync( self.host.eh_config.client_address, - debug=self.host.eph_options.debug_trace) + debug=self.host.eph_options.debug_trace, + http_proxy=self.host.eph_options.http_proxy) self.partition_receive_handler = self.eh_client.add_async_receiver( self.partition_context.consumer_group_name, self.partition_context.partition_id, diff --git a/azure/eventprocessorhost/eph.py b/azure/eventprocessorhost/eph.py index 27cbc3e..7c7541e 100644 --- a/azure/eventprocessorhost/eph.py +++ b/azure/eventprocessorhost/eph.py @@ -73,3 +73,4 @@ def __init__(self): self.release_pump_on_timeout = False self.initial_offset_provider = "-1" self.debug_trace = False + self.http_proxy = None diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index 9eaf53f..b21514b 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -34,7 +34,7 @@ def set_offset_and_sequence_number(self, event_data): """ if not event_data: raise Exception(event_data) - self.offset = event_data.offset + self.offset = event_data.offset.value self.sequence_number = event_data.sequence_number async def get_initial_offset_async(self): # throws InterruptedException, ExecutionException @@ -84,7 +84,7 @@ async def checkpoint_async_event_data(self, event_data): raise ValueError("Argument Out Of Range event_data x-opt-sequence-number") await self.persist_checkpoint_async(Checkpoint(self.partition_id, - event_data.offset, + event_data.offset.value, event_data.sequence_number)) def to_string(self): diff --git a/azure/eventprocessorhost/partition_manager.py b/azure/eventprocessorhost/partition_manager.py index 2ae402e..7025fe0 100644 --- a/azure/eventprocessorhost/partition_manager.py +++ b/azure/eventprocessorhost/partition_manager.py @@ -35,15 +35,18 @@ async def get_partition_ids_async(self): :rtype: list[str] """ if not self.partition_ids: - eh_client = EventHubClientAsync( - self.host.eh_config.client_address, - debug=self.host.eph_options.debug_trace) try: - eh_info = await eh_client.get_eventhub_info_async() - self.partition_ids = eh_info['partition_ids'] - except Exception as err: # pylint: disable=broad-except - raise Exception("Failed to get partition ids", repr(err)) - + eh_client = EventHubClientAsync( + self.host.eh_config.client_address, + debug=self.host.eph_options.debug_trace, + http_proxy=self.host.eph_options.http_proxy) + try: + eh_info = await eh_client.get_eventhub_info_async() + self.partition_ids = eh_info['partition_ids'] + except Exception as err: # pylint: disable=broad-except + raise Exception("Failed to get partition ids", repr(err)) + finally: + await eh_client.stop_async() return self.partition_ids async def start_async(self): diff --git a/conftest.py b/conftest.py index 6932e27..96d135d 100644 --- a/conftest.py +++ b/conftest.py @@ -79,7 +79,7 @@ def invalid_policy(): @pytest.fixture() def iot_connection_str(): try: - return os.environ['IOT_HUB_CONNECTION_STR'] + return os.environ['IOTHUB_CONNECTION_STR'] except KeyError: pytest.skip("No IotHub connection string found.") @@ -94,7 +94,7 @@ def device_id(): @pytest.fixture() def receivers(connection_str): - client = EventHubClient.from_connection_string(connection_str, debug=True) + client = EventHubClient.from_connection_string(connection_str, debug=False) eh_hub_info = client.get_eventhub_info() partitions = eh_hub_info["partition_ids"] @@ -114,7 +114,7 @@ def receivers(connection_str): @pytest.fixture() def senders(connection_str): - client = EventHubClient.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=True) eh_hub_info = client.get_eventhub_info() partitions = eh_hub_info["partition_ids"] diff --git a/examples/recv.py b/examples/recv.py index 92a5df2..d2fbdf7 100644 --- a/examples/recv.py +++ b/examples/recv.py @@ -41,7 +41,7 @@ for event_data in receiver.receive(timeout=100): last_offset = event_data.offset last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) + print("Received: {}, {}".format(last_offset.value, last_sn)) total += 1 end_time = time.time() diff --git a/examples/recv_async.py b/examples/recv_async.py index ab8da39..04d9226 100644 --- a/examples/recv_async.py +++ b/examples/recv_async.py @@ -39,7 +39,7 @@ async def pump(client, partition): for event_data in await receiver.receive(timeout=10): last_offset = event_data.offset last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) + print("Received: {}, {}".format(last_offset.value, last_sn)) total += 1 end_time = time.time() run_time = end_time - start_time diff --git a/examples/recv_batch.py b/examples/recv_batch.py index 7ce562d..9478f51 100644 --- a/examples/recv_batch.py +++ b/examples/recv_batch.py @@ -40,7 +40,7 @@ client.run() batched_events = receiver.receive(max_batch_size=10) for event_data in batched_events: - last_offset = event_data.offset + last_offset = event_data.offset.value last_sn = event_data.sequence_number total += 1 print("Partition {}, Received {}, sn={} offset={}".format( diff --git a/features/eventhub.feature b/features/eventhub.feature index c96a998..19bf68b 100644 --- a/features/eventhub.feature +++ b/features/eventhub.feature @@ -5,15 +5,14 @@ Feature: Exercising EventHub SDK -# Scenario: Just sends for 3 days, no receives. Focus on send failures only. - @long-running - Scenario: Generic send and receive on client for 3 days. - Given the EventHub SDK is installed - And an EventHub is created with credentials retrieved - When I send and receive messages for 72 hours + Scenario: Just sends for 3 days, no receives. Focus on send failures only. + Given The EventHub SDK is installed + And An EventHub is created with credentials retrieved + When I start a message sender + And I send messages for 72 hours Then I should receive no errors - And I can shutdown the sender and receiver cleanly + And I can shutdown sender And I remove the EventHub # Scenario: Sender stays idle for 45 minutes and sends some number of messages after each idle duration. diff --git a/features/steps/eventhub.py b/features/steps/eventhub.py index b92bdc5..01cda1d 100644 --- a/features/steps/eventhub.py +++ b/features/steps/eventhub.py @@ -5,6 +5,7 @@ import asyncio import uuid +import functools from behave import * @@ -24,7 +25,22 @@ def step_impl(context): def step_impl(context, properties): #from mgmt_settings_real import get_credentials, SUBSCRIPTION_ID #rg, mgmt_client = test_utils.create_mgmt_client(get_credentials(), SUBSCRIPTION_ID) - context.eh_config = test_utils.get_eventhub_config() + _, prop = properties.split(' ') + if prop == '100TU': + context.eh_config = test_utils.get_eventhub_100TU_config() + else: + raise ValueError("Unrecognised property: {}".format(prop)) + +@When('I start a message sender') +def step_impl(context): + from azure.eventhub import EventHubClient + address = "sb://{}/{}".format(context.eh_config['hostname'], context.eh_config['event_hub']) + context.client = EventHubClient( + address, + username=context.eh_config['key_name'], + password=context.eh_config['access_key']) + context.sender = client.add_sender() + context.client.run() @when('I {clients} messages for {hours} hours') def step_impl(context, clients, hours): diff --git a/features/steps/test_utils.py b/features/steps/test_utils.py index 31eb7b5..7cc4d34 100644 --- a/features/steps/test_utils.py +++ b/features/steps/test_utils.py @@ -4,6 +4,8 @@ # -------------------------------------------------------------------------------------------- import uuid +import time +import asyncio def create_mgmt_client(credentials, subscription, location='westus'): from azure.mgmt.resource import ResourceManagementClient @@ -32,3 +34,64 @@ def get_eventhub_config(): config['consumer_group'] = "$Default" config['partition'] = "0" return config + + +def get_eventhub_100TU_config(): + config = {} + config['hostname'] = os.environ['EVENT_HUB_100TU_HOSTNAME'] + config['event_hub'] = os.environ['EVENT_HUB_100TU_NAME'] + config['key_name'] = os.environ['EVENT_HUB_100TU_SAS_POLICY'] + config['access_key'] = os.environ['EVENT_HUB_100TU_SAS_KEY'] + config['consumer_group'] = "$Default" + config['partition'] = "0" + return config + + +def send_constant_messages(sender, timeout, payload=1024): + deadline = time.time() + total = 0 + while time.time() < deadline: + data = EventData(body=b"D" * payload) + sender.send(data) + total += 1 + return total + + +def send_constant_async_messages(sender, timeout, batch_size=10000, payload=1024): + deadline = time.time() + total = 0 + while time.time() < deadline: + data = EventData(body=b"D" * args.payload) + sender.transfer(data) + total += 1 + if total % 10000 == 0: + sender.wait() + return total + + +def send_constant_async_messages(sender, timeout, batch_size=1, payload=1024): + deadline = time.time() + while time.time() < deadline: + if batch_size > 1: + data = EventData(batch=data_generator()) + else: + data = EventData(body=b"D" * payload) + + +async def receive_pump(receiver, timeout, validation=True): + total = 0 + deadline = time.time() + timeout + sequence = 0 + offset = None + while time.time() < deadline: + batch = await receiver.receive(timeout=5) + total += len(batch) + if validation: + assert receiver.offset + for event in batch: + next_sequence = event.sequence_number + assert next_sequence > sequence, "Received Event with lower sequence number than previous." + assert (next_sequence - sequence) == 1, "Sequence number skipped by a value great than 1." + sequence = next_sequence + msg_data = b"".join([b for b in event.body]).decode('UTF-8') + assert json.loads(msg_data), "Unable to deserialize Event data." diff --git a/setup.py b/setup.py index 9fce5a2..df46435 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ zip_safe=False, packages=find_packages(exclude=["examples", "tests"]), install_requires=[ - 'uamqp~=0.1.0', + 'uamqp~=0.2.0', 'msrestazure~=0.4.11', 'azure-common~=1.1', 'azure-storage~=0.36.0' diff --git a/tests/__init__.py b/tests/__init__.py index 7b7c91a..7ec7d3b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,7 +12,7 @@ def get_logger(filename, level=logging.INFO): azure_logger = logging.getLogger("azure") azure_logger.setLevel(level) uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) + uamqp_logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') console_handler = logging.StreamHandler(stream=sys.stdout) diff --git a/tests/test_iothub_receive.py b/tests/test_iothub_receive.py index 78c1de8..ced6858 100644 --- a/tests/test_iothub_receive.py +++ b/tests/test_iothub_receive.py @@ -11,11 +11,13 @@ from azure import eventhub from azure.eventhub import EventData, EventHubClient, Offset -def test_iothub_receive(iot_connection_str, device_id): +def test_iothub_receive_sync(iot_connection_str, device_id): client = EventHubClient.from_iothub_connection_string(iot_connection_str, debug=True) receiver = client.add_receiver("$default", "0", operation='/messages/events') try: client.run() + partitions = client.get_eventhub_info() + assert partitions["partition_ids"] == ["0", "1", "2", "3"] received = receiver.receive(timeout=5) assert len(received) == 0 finally: diff --git a/tests/test_iothub_receive_async.py b/tests/test_iothub_receive_async.py index a7126d3..d26c00c 100644 --- a/tests/test_iothub_receive_async.py +++ b/tests/test_iothub_receive_async.py @@ -24,43 +24,29 @@ async def pump(receiver, sleep=None): return messages -@pytest.mark.asyncio -async def test_iothub_receive_async(iot_connection_str): - client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) - receivers = [] - for i in range(2): - receivers.append(client.add_async_receiver("$default", "0", prefetch=1000, operation='/messages/events')) - await client.run_async() +async def get_partitions(iot_connection_str): try: - outputs = await asyncio.gather( - pump(receivers[0]), - pump(receivers[1]), - return_exceptions=True) - - assert isinstance(outputs[0], int) and outputs[0] == 0 - assert isinstance(outputs[1], int) and outputs[1] == 0 - except: - raise + client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) + client.add_async_receiver("$default", "0", prefetch=1000, operation='/messages/events') + await client.run_async() + partitions = await client.get_eventhub_info_async() + return partitions["partition_ids"] finally: await client.stop_async() @pytest.mark.asyncio -async def test_iothub_receive_detach_async(iot_connection_str): +async def test_iothub_receive_multiple_async(iot_connection_str): + partitions = await get_partitions(iot_connection_str) client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) - receivers = [] - for i in range(2): - receivers.append(client.add_async_receiver("$default", str(i), prefetch=1000, operation='/messages/events')) - await client.run_async() try: - outputs = await asyncio.gather( - pump(receivers[0]), - pump(receivers[1]), - return_exceptions=True) + receivers = [] + for p in partitions: + receivers.append(client.add_async_receiver("$default", p, prefetch=1000, operation='/messages/events')) + await client.run_async() + outputs = await asyncio.gather(*[pump(r) for r in receivers]) assert isinstance(outputs[0], int) and outputs[0] == 0 - assert isinstance(outputs[1], EventHubError) - except: - raise + assert isinstance(outputs[1], int) and outputs[1] == 0 finally: - await client.stop_async() \ No newline at end of file + await client.stop_async() diff --git a/tests/test_longrunning_receive.py b/tests/test_longrunning_receive.py index 60007a5..b32731b 100644 --- a/tests/test_longrunning_receive.py +++ b/tests/test_longrunning_receive.py @@ -48,7 +48,7 @@ async def pump(_pid, receiver, _args, _dl): _pid, total, batch[-1].sequence_number, - batch[-1].offset)) + batch[-1].offset.value)) print("{}: total received {}".format( _pid, total)) diff --git a/tests/test_longrunning_send_async.py b/tests/test_longrunning_send_async.py new file mode 100644 index 0000000..afc13fa --- /dev/null +++ b/tests/test_longrunning_send_async.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python + +""" +send test +""" + +import logging +import argparse +import time +import threading +import os +import asyncio + +from azure.eventhub import EventHubClientAsync, EventData + +try: + import tests + logger = tests.get_logger("send_test.log", logging.INFO) +except ImportError: + logger = logging.getLogger("uamqp") + logger.setLevel(logging.INFO) + + +def check_send_successful(outcome, condition): + if outcome.value != 0: + print("Send failed {}".format(condition)) + + +async def get_partitions(args): + #client = EventHubClientAsync.from_connection_string( + # args.conn_str, + # eventhub=args.eventhub, debug=True) + eh_data = await args.get_eventhub_info_async() + return eh_data["partition_ids"] + + +async def pump(pid, sender, args, duration): + deadline = time.time() + duration + total = 0 + + def data_generator(): + for i in range(args.batch): + yield b"D" * args.payload + + if args.batch > 1: + logger.error("Sending batched messages") + else: + logger.error("Sending single messages") + + try: + while time.time() < deadline: + if args.batch > 1: + data = EventData(batch=data_generator()) + else: + data = EventData(body=b"D" * args.payload) + sender.transfer(data, callback=check_send_successful) + total += args.batch + if total % 10000 == 0: + await sender.wait_async() + logger.error("Send total {}".format(total)) + except Exception as err: + logger.error("Send failed {}".format(err)) + logger.error("Sent total {}".format(total)) + + +def test_long_running_partition_send_async(): + parser = argparse.ArgumentParser() + parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) + parser.add_argument("--payload", help="payload size", type=int, default=512) + parser.add_argument("--batch", help="Number of events to send and wait", type=int, default=1) + parser.add_argument("--partitions", help="Comma seperated partition IDs") + parser.add_argument("--conn-str", help="EventHub connection string", default=os.environ.get('EVENT_HUB_CONNECTION_STR')) + parser.add_argument("--eventhub", help="Name of EventHub") + parser.add_argument("--address", help="Address URI to the EventHub entity") + parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") + parser.add_argument("--sas-key", help="Shared access key") + + loop = asyncio.get_event_loop() + args, _ = parser.parse_known_args() + if args.conn_str: + client = EventHubClientAsync.from_connection_string( + args.conn_str, + eventhub=args.eventhub, debug=True) + elif args.address: + client = EventHubClient( + args.address, + username=args.sas_policy, + password=args.sas_key) + else: + try: + import pytest + pytest.skip("Must specify either '--conn-str' or '--address'") + except ImportError: + raise ValueError("Must specify either '--conn-str' or '--address'") + + try: + if not args.partitions: + partitions = loop.run_until_complete(get_partitions(client)) + else: + partitions = args.partitions.split(",") + pumps = [] + for pid in partitions: + sender = client.add_async_sender(partition=pid) + pumps.append(pump(pid, sender, args, args.duration)) + loop.run_until_complete(client.run_async()) + loop.run_until_complete(asyncio.gather(*pumps)) + finally: + loop.run_until_complete(client.stop_async()) + +if __name__ == '__main__': + test_long_running_partition_send_async() diff --git a/tests/test_negative.py b/tests/test_negative.py index 753f2e2..dbc8096 100644 --- a/tests/test_negative.py +++ b/tests/test_negative.py @@ -181,7 +181,7 @@ def test_receive_from_invalid_partitions_sync(connection_str): async def test_receive_from_invalid_partitions_async(connection_str): partitions = ["XYZ", "-1", "1000", "-" ] for p in partitions: - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) + client = EventHubClientAsync.from_connection_string(connection_str, debug=True) receiver = client.add_async_receiver("$default", p) try: with pytest.raises(EventHubError): @@ -221,7 +221,7 @@ async def test_send_to_invalid_partitions_async(connection_str): def test_send_too_large_message(connection_str): - client = EventHubClient.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=True) sender = client.add_sender() try: client.run() @@ -299,6 +299,8 @@ async def test_max_receivers_async(connection_str, senders): pump(receivers[5]), return_exceptions=True) print(outputs) - assert len([o for o in outputs if isinstance(o, EventHubError)]) == 1 + failed = [o for o in outputs if isinstance(o, EventHubError)] + assert len(failed) == 1 + print(failed[0].message) finally: await client.stop_async() \ No newline at end of file diff --git a/tests/test_receive.py b/tests/test_receive.py index 44cb7b2..1b7480e 100644 --- a/tests/test_receive.py +++ b/tests/test_receive.py @@ -31,11 +31,15 @@ def test_receive_end_of_stream(connection_str, senders): client.stop() -def test_receive_with_offset(connection_str, senders): +def test_receive_with_offset_sync(connection_str, senders): client = EventHubClient.from_connection_string(connection_str, debug=False) + partitions = client.get_eventhub_info() + assert partitions["partition_ids"] == ["0", "1"] receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) try: client.run() + more_partitions = client.get_eventhub_info() + assert more_partitions["partition_ids"] == ["0", "1"] received = receiver.receive(timeout=5) assert len(received) == 0 @@ -44,7 +48,7 @@ def test_receive_with_offset(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset)) + offset_receiver = client.add_receiver("$default", "0", offset=offset) client.run() received = offset_receiver.receive(timeout=5) assert len(received) == 0 @@ -71,7 +75,7 @@ def test_receive_with_inclusive_offset(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset, inclusive=True)) + offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) client.run() received = offset_receiver.receive(timeout=5) assert len(received) == 1 @@ -83,10 +87,13 @@ def test_receive_with_inclusive_offset(connection_str, senders): def test_receive_with_datetime(connection_str, senders): client = EventHubClient.from_connection_string(connection_str, debug=False) + partitions = client.get_eventhub_info() + assert partitions["partition_ids"] == ["0", "1"] receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) try: client.run() - + more_partitions = client.get_eventhub_info() + assert more_partitions["partition_ids"] == ["0", "1"] received = receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) diff --git a/tests/test_receive_async.py b/tests/test_receive_async.py index d002674..267e82c 100644 --- a/tests/test_receive_async.py +++ b/tests/test_receive_async.py @@ -46,7 +46,7 @@ async def test_receive_with_offset_async(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset)) + offset_receiver = client.add_async_receiver("$default", "0", offset=offset) await client.run_async() received = await offset_receiver.receive(timeout=5) assert len(received) == 0 @@ -73,7 +73,7 @@ async def test_receive_with_inclusive_offset_async(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset, inclusive=True)) + offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) await client.run_async() received = await offset_receiver.receive(timeout=5) assert len(received) == 1 @@ -216,11 +216,15 @@ async def test_epoch_receiver_async(connection_str, senders): @pytest.mark.asyncio async def test_multiple_receiver_async(connection_str, senders): client = EventHubClientAsync.from_connection_string(connection_str, debug=True) + partitions = await client.get_eventhub_info_async() + assert partitions["partition_ids"] == ["0", "1"] receivers = [] for i in range(2): receivers.append(client.add_async_receiver("$default", "0", prefetch=10)) try: await client.run_async() + more_partitions = await client.get_eventhub_info_async() + assert more_partitions["partition_ids"] == ["0", "1"] outputs = await asyncio.gather( pump(receivers[0]), pump(receivers[1]), diff --git a/tests/test_reconnect.py b/tests/test_reconnect.py new file mode 100644 index 0000000..a6aa0ce --- /dev/null +++ b/tests/test_reconnect.py @@ -0,0 +1,128 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import os +import time +import asyncio +import pytest + +from azure import eventhub +from azure.eventhub import ( + EventHubClientAsync, + EventData, + Offset, + EventHubError, + EventHubClient) + + +def test_send_with_long_interval_sync(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClient.from_connection_string(connection_str, debug=True) + sender = client.add_sender() + try: + client.run() + sender.send(EventData(b"A single event")) + for _ in range(2): + time.sleep(300) + sender.send(EventData(b"A single event")) + finally: + client.stop() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + + assert len(received) == 3 + assert list(received[0].body)[0] == b"A single event" + + +@pytest.mark.asyncio +async def test_send_with_long_interval_async(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClientAsync.from_connection_string(connection_str, debug=True) + sender = client.add_async_sender() + try: + await client.run_async() + await sender.send(EventData(b"A single event")) + for _ in range(2): + await asyncio.sleep(300) + await sender.send(EventData(b"A single event")) + finally: + await client.stop_async() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + assert len(received) == 3 + assert list(received[0].body)[0] == b"A single event" + + +def test_send_with_forced_conn_close_sync(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClient.from_connection_string(connection_str, debug=True) + sender = client.add_sender() + try: + client.run() + sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + time.sleep(300) + sender.send(EventData(b"A single event")) + sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + time.sleep(300) + sender.send(EventData(b"A single event")) + sender.send(EventData(b"A single event")) + finally: + client.stop() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + assert len(received) == 5 + assert list(received[0].body)[0] == b"A single event" + + +@pytest.mark.asyncio +async def test_send_with_forced_conn_close_async(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClientAsync.from_connection_string(connection_str, debug=True) + sender = client.add_async_sender() + try: + await client.run_async() + await sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + await asyncio.sleep(300) + await sender.send(EventData(b"A single event")) + await sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + await asyncio.sleep(300) + await sender.send(EventData(b"A single event")) + await sender.send(EventData(b"A single event")) + finally: + await client.stop_async() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + assert len(received) == 5 + assert list(received[0].body)[0] == b"A single event" + + +# def test_send_with_forced_link_detach(connection_str, receivers): +# client = EventHubClient.from_connection_string(connection_str, debug=True) +# sender = client.add_sender() +# size = 20 * 1024 +# try: +# client.run() +# for i in range(1000): +# sender.transfer(EventData([b"A"*size, b"B"*size, b"C"*size, b"D"*size, b"A"*size, b"B"*size, b"C"*size, b"D"*size, b"A"*size, b"B"*size, b"C"*size, b"D"*size])) +# sender.wait() +# finally: +# client.stop() + +# received = [] +# for r in receivers: +# received.extend(r.receive(timeout=10)) diff --git a/tests/test_send.py b/tests/test_send.py index faf116f..a74c3d1 100644 --- a/tests/test_send.py +++ b/tests/test_send.py @@ -164,8 +164,8 @@ def batched(): assert len(partition_1) == 10 -def test_send_array(connection_str, receivers): - client = EventHubClient.from_connection_string(connection_str, debug=False) +def test_send_array_sync(connection_str, receivers): + client = EventHubClient.from_connection_string(connection_str, debug=True) sender = client.add_sender() try: client.run()