Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Fixed datetime offset (#99)
Browse files Browse the repository at this point in the history
* Fixed datetime offset

* Updated pylint

* Removed 3.4 pylint pass
  • Loading branch information
annatisch authored Mar 1, 2019
1 parent ff197f3 commit 9217139
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 113 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ matrix:
script:
- pytest
- python ./setup.py check -r -s
- pylint --ignore=async_ops azure.eventhub
- os: linux
python: "3.5"
script:
Expand Down
9 changes: 9 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
Release History
===============

1.3.1 (2019-02-28)
++++++++++++++++++

**BugFixes**

- Fixed bug where datetime offset filter was using a local timestamp rather than UTC.
- Fixed stackoverflow error in continuous connection reconnect attempts.


1.3.0 (2019-01-29)
++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.3.0"
__version__ = "1.3.1"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/async_ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def run_async(self):
if failed and len(failed) == len(self.clients):
log.warning("%r: All clients failed to start.", self.container_id)
raise failed[0]
elif failed:
if failed:
log.warning("%r: %r clients failed to start.", self.container_id, len(failed))
elif redirects:
await self._handle_redirect(redirects)
Expand Down
45 changes: 24 additions & 21 deletions azure/eventhub/async_ops/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__( # pylint: disable=super-init-not-called
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.redirected = None
self.error = None
self.properties = None
Expand Down Expand Up @@ -107,9 +108,7 @@ async def open_async(self):
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)

async def reconnect_async(self): # pylint: disable=too-many-statements
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
async def _reconnect_async(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
Expand All @@ -134,6 +133,7 @@ async def reconnect_async(self): # pylint: disable=too-many-statements
await self._handler.open_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
return True
except errors.TokenExpired as shutdown:
log.info("AsyncReceiver disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
Expand All @@ -142,36 +142,39 @@ async def reconnect_async(self): # pylint: disable=too-many-statements
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncReceiver connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
return False
log.info("AsyncReceiver connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
while not await self._reconnect_async():
await asyncio.sleep(self.reconnect_backoff)

async def has_started(self):
"""
Whether the handler has completed all start up processes such as
Expand Down
45 changes: 24 additions & 21 deletions azure/eventhub/async_ops/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__( # pylint: disable=super-init-not-called
self.auto_reconnect = auto_reconnect
self.timeout = send_timeout
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.name = "EHSender-{}".format(uuid.uuid4())
self.redirected = None
self.error = None
Expand Down Expand Up @@ -100,9 +101,7 @@ async def open_async(self):
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
async def _reconnect_async(self):
await self._handler.close_async()
unsent_events = self._handler.pending_messages
self._handler = SendClientAsync(
Expand All @@ -119,6 +118,7 @@ async def reconnect_async(self):
await self._handler.open_async()
self._handler.queue_message(*unsent_events)
await self._handler.wait_async()
return True
except errors.TokenExpired as shutdown:
log.info("AsyncSender disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
Expand All @@ -127,36 +127,39 @@ async def reconnect_async(self):
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncSender couldn't authenticate. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
return False
log.info("AsyncSender connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Sender reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
while not await self._reconnect_async():
await asyncio.sleep(self.reconnect_backoff)

async def has_started(self):
"""
Whether the handler has completed all start up processes such as
Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def run(self):
if failed and len(failed) == len(self.clients):
log.warning("%r: All clients failed to start.", self.container_id)
raise failed[0]
elif failed:
if failed:
log.warning("%r: %r clients failed to start.", self.container_id, len(failed))
elif redirects:
self._handle_redirect(redirects)
Expand Down
4 changes: 2 additions & 2 deletions azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from __future__ import unicode_literals

import datetime
import time
import calendar
import json

import six
Expand Down Expand Up @@ -288,7 +288,7 @@ def selector(self):
"""
operator = ">=" if self.inclusive else ">"
if isinstance(self.value, datetime.datetime):
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
timestamp = (calendar.timegm(self.value.utctimetuple()) * 1000) + (self.value.microsecond/1000)
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
if isinstance(self.value, six.integer_types):
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
Expand Down
45 changes: 24 additions & 21 deletions azure/eventhub/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.properties = None
self.redirected = None
self.error = None
Expand Down Expand Up @@ -103,9 +104,7 @@ def open(self):
while not self._handler.client_ready():
time.sleep(0.05)

def reconnect(self): # pylint: disable=too-many-statements
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
def _reconnect(self): # pylint: disable=too-many-statements
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
Expand All @@ -129,6 +128,7 @@ def reconnect(self): # pylint: disable=too-many-statements
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
return True
except errors.TokenExpired as shutdown:
log.info("Receiver disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
Expand All @@ -137,36 +137,39 @@ def reconnect(self): # pylint: disable=too-many-statements
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
self.reconnect()
else:
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
return False
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
self.reconnect()
else:
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
return False
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("Receiver couldn't authenticate. Attempting reconnect.")
self.reconnect()
else:
log.info("Receiver connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
return False
log.info("Receiver connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown))
self.close(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
self.close(exception=error)
raise error

def reconnect(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
while not self._reconnect():
time.sleep(self.reconnect_backoff)

def get_handler_state(self):
"""
Get the state of the underlying handler with regards to start
Expand Down
Loading

0 comments on commit 9217139

Please sign in to comment.