
Commit 47b8214
Update Retry logic to handle throttling scenarios
This change enables upload and download to complete more reliably
in the face of intermittent server-side throttling by applying a
back-off retry strategy. Note that this strategy is not guaranteed
to succeed every time: if the server is under extremely heavy load
and continues to reject requests, the retry strategy will eventually
"give up" and return an error to the user.
begoldsm committed May 1, 2017
1 parent 571ce92 commit 47b8214
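In outline, what the commit message describes is a standard exponential back-off retry loop. A minimal sketch of that pattern, using the commit's new defaults (the helper name and wrapper shape here are hypothetical, not the library's API):

import time

def with_backoff_retry(operation, retries=10, delay=0.01, backoff=3):
    """Retry `operation`, sleeping between attempts with exponentially
    growing delays; give up and surface the last error once `retries`
    attempts are exhausted (illustrative helper only)."""
    last_err = None
    for attempt in range(retries):
        try:
            return operation()
        except Exception as e:
            last_err = e
            time.sleep(delay)
            delay *= backoff  # e.g. 0.01 s, 0.03 s, 0.09 s, ...
    raise RuntimeError('Max number of retries exceeded') from last_err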
Showing 4 changed files with 35 additions and 14 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -2,6 +2,10 @@
 Release History
 ===============
+0.0.8 (2017-04-26)
+------------------
+* Fix server-side throttling retry support. This does not guarantee that a throttled upload (or download) will eventually succeed, but a back-off retry is now in place to make success more likely.
+
 0.0.7 (2017-04-19)
 ------------------
 * Update the build process to more efficiently handle multi-part namespaces for pip.
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
 # license information.
 # --------------------------------------------------------------------------
 
-__version__ = "0.0.7"
+__version__ = "0.0.8"
 
 from .core import AzureDLFileSystem
 from .multithread import ADLDownloader
12 changes: 6 additions & 6 deletions azure/datalake/store/core.py
@@ -916,15 +916,15 @@ def _fetch_range(rest, path, start, end, stream=False):
 
 
 def _fetch_range_with_retry(rest, path, start, end, stream=False, retries=10,
-                            delay=0.01, backoff=1):
+                            delay=0.01, backoff=3):
     err = None
     for i in range(retries):
         try:
             return _fetch_range(rest, path, start, end, stream=False)
         except Exception as e:
             err = e
-            logger.debug('Exception %s on ADL download, retrying in %s seconds',
-                         repr(err), delay, exc_info=True)
+            logger.debug('Exception %s on ADL download on attempt: %s, retrying in %s seconds',
+                         repr(err), i, delay, exc_info=True)
         time.sleep(delay)
         delay *= backoff
     exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err))
@@ -936,7 +936,7 @@ def _put_data(rest, op, path, data, **kwargs):
     return rest.call(op, path=path, data=data, **kwargs)
 
 
-def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=1,
+def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=3,
                          **kwargs):
     err = None
     for i in range(retries):
@@ -953,8 +953,8 @@ def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=1
             return
         except Exception as e:
             err = e
-            logger.debug('Exception %s on ADL upload, retrying in %s seconds',
-                         repr(err), delay, exc_info=True)
+            logger.debug('Exception %s on ADL upload on attempt: %s, retrying in %s seconds',
+                         repr(err), i, delay, exc_info=True)
         time.sleep(delay)
         delay *= backoff
     exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err))
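For a sense of scale, changing the default backoff from 1 to 3 turns a flat 0.01 s wait between all ten attempts into a geometric schedule. A small standalone snippet illustrating the new wait pattern (illustrative only, not part of the commit):

delay, backoff, retries = 0.01, 3, 10
waits = []
for attempt in range(retries):
    waits.append(delay)
    delay *= backoff
print(['%.2f' % w for w in waits])            # 0.01, 0.03, 0.09, ... 196.83 seconds
print('total back-off: %.2f s' % sum(waits))  # ~295 s across 10 attempts, vs 0.1 s before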
31 changes: 24 additions & 7 deletions azure/datalake/store/multithread.py
@@ -246,12 +246,13 @@ def __str__(self):
 
 
 def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
-              shutdown_event=None, retries=10, delay=0.01, backoff=1):
+              shutdown_event=None, retries=10, delay=0.01, backoff=3):
     """ Download a piece of a remote file and write locally
     Internal function used by `download`.
     """
     err = None
+    total_bytes_downloaded = 0
     for i in range(retries):
         try:
             nbytes = 0
@@ -261,22 +262,38 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
                 fout.seek(offset)
                 for chunk in response.iter_content(chunk_size=blocksize):
                     if shutdown_event and shutdown_event.is_set():
-                        return nbytes, None
+                        return total_bytes_downloaded, None
                     if chunk:
                         nwritten = fout.write(chunk)
                         if nwritten:
                             nbytes += nwritten
-            logger.debug('Downloaded to %s, byte offset %s', dst, offset)
-            return nbytes, None
+            logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset)
+
+            # There are certain cases where we will be throttled and receive less than the expected amount of data.
+            # In these cases, instead of failing right away, indicate that a retry is occurring and update offset and
+            # size so another read can fetch the rest of the data. We only do this when the number of bytes read
+            # is less than size, because if we somehow received too much data it is not clear how to proceed.
+            if nbytes < size:
+                errMsg = 'Did not receive total bytes requested from server. This can be due to server side throttling and will be retried. Data Expected: {}. Data Received: {}.'.format(size, nbytes)
+                size -= nbytes
+                offset += nbytes
+                total_bytes_downloaded += nbytes
+                raise IOError(errMsg)
+            elif nbytes > size:
+                raise IOError('Received more bytes than expected from the server. Expected: {}. Received: {}.'.format(size, nbytes))
+            else:
+                total_bytes_downloaded += nbytes
+
+            return total_bytes_downloaded, None
         except Exception as e:
             err = e
-            logger.debug('Exception %s on ADL download, retrying in %s seconds',
-                         repr(err), delay, exc_info=True)
+            logger.debug('Exception %s on ADL download on attempt: %s, retrying in %s seconds',
+                         repr(err), i, delay, exc_info=True)
         time.sleep(delay)
         delay *= backoff
     exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err))
     logger.error('Download failed %s; %s', dst, repr(exception))
-    return nbytes, exception
+    return total_bytes_downloaded, exception
 
 
 class ADLUploader(object):
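To make the offset/size bookkeeping in get_chunk concrete, here is a self-contained sketch of the same resume-on-short-read pattern; fetch_range here is a hypothetical stand-in for the server call, not the library's API:

import random
import time

def fetch_range(offset, size):
    """Hypothetical throttled server: may return fewer bytes than asked."""
    return b'x' * random.randint(1, size)

def get_range_with_resume(offset, size, retries=10, delay=0.01, backoff=3):
    """On a short read, advance offset, shrink size, and retry with
    exponential back-off instead of failing outright."""
    total = 0
    for attempt in range(retries):
        data = fetch_range(offset, size)
        total += len(data)
        if len(data) == size:
            return total  # everything outstanding has arrived
        # Short read (e.g. throttling): account for what arrived, back off, retry.
        offset += len(data)
        size -= len(data)
        time.sleep(delay)
        delay *= backoff
    raise RuntimeError('Max number of retries exceeded')

print(get_range_with_resume(0, 1024))  # typically prints 1024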
