From 47b821499d1225d1e93d5150240d8a7bebe802f2 Mon Sep 17 00:00:00 2001 From: begoldsm Date: Mon, 1 May 2017 09:44:11 -0700 Subject: [PATCH] Update Retry logic to handle throttling scenarios This change enables upload and download to more reliably complete successfully in the face of intermittent server-side throttling by enacting a back-off retry strategy. Note that this strategy is not guaranteed to work every time. For example, if the server is under extremely heavy load and continues to reject requests, the retry strategy will eventually "give up" and return an error back to the user. --- HISTORY.rst | 4 ++++ azure/datalake/store/__init__.py | 2 +- azure/datalake/store/core.py | 12 +++++------ azure/datalake/store/multithread.py | 31 ++++++++++++++++++++++------- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 1ad5c3f..02af67d 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,6 +2,10 @@ Release History =============== +0.0.8 (2017-04-26) +------------------ +* Fix server-side throttling retry support. This is not a guarantee that if the server is throttling the upload (or download) it will eventually succeed, but there is now a back-off retry in place to make it more likely. + 0.0.7 (2017-04-19) ------------------ * Update the build process to more efficiently handle multi-part namespaces for pip. diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py index f4e3bdc..495438f 100644 --- a/azure/datalake/store/__init__.py +++ b/azure/datalake/store/__init__.py @@ -6,7 +6,7 @@ # license information. 
# -------------------------------------------------------------------------- -__version__ = "0.0.7" +__version__ = "0.0.8" from .core import AzureDLFileSystem from .multithread import ADLDownloader diff --git a/azure/datalake/store/core.py b/azure/datalake/store/core.py index 57501c0..036d66a 100644 --- a/azure/datalake/store/core.py +++ b/azure/datalake/store/core.py @@ -916,15 +916,15 @@ def _fetch_range(rest, path, start, end, stream=False): def _fetch_range_with_retry(rest, path, start, end, stream=False, retries=10, - delay=0.01, backoff=1): + delay=0.01, backoff=3): err = None for i in range(retries): try: return _fetch_range(rest, path, start, end, stream=False) except Exception as e: err = e - logger.debug('Exception %s on ADL download, retrying in %s seconds', - repr(err), delay, exc_info=True) + logger.debug('Exception %s on ADL download on attempt: %s, retrying in %s seconds', + repr(err), i, delay, exc_info=True) time.sleep(delay) delay *= backoff exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err)) @@ -936,7 +936,7 @@ def _put_data(rest, op, path, data, **kwargs): return rest.call(op, path=path, data=data, **kwargs) -def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=1, +def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=3, **kwargs): err = None for i in range(retries): @@ -953,8 +953,8 @@ def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=1 return except Exception as e: err = e - logger.debug('Exception %s on ADL upload, retrying in %s seconds', - repr(err), delay, exc_info=True) + logger.debug('Exception %s on ADL upload on attempt: %s, retrying in %s seconds', + repr(err), i, delay, exc_info=True) time.sleep(delay) delay *= backoff exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err)) diff --git a/azure/datalake/store/multithread.py b/azure/datalake/store/multithread.py index 4737f67..4f3c7c3 100644 
--- a/azure/datalake/store/multithread.py +++ b/azure/datalake/store/multithread.py @@ -246,12 +246,13 @@ def __str__(self): def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize, - shutdown_event=None, retries=10, delay=0.01, backoff=1): + shutdown_event=None, retries=10, delay=0.01, backoff=3): """ Download a piece of a remote file and write locally Internal function used by `download`. """ err = None + total_bytes_downloaded = 0 for i in range(retries): try: nbytes = 0 @@ -261,22 +262,38 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize, fout.seek(offset) for chunk in response.iter_content(chunk_size=blocksize): if shutdown_event and shutdown_event.is_set(): - return nbytes, None + return total_bytes_downloaded, None if chunk: nwritten = fout.write(chunk) if nwritten: nbytes += nwritten - logger.debug('Downloaded to %s, byte offset %s', dst, offset) - return nbytes, None + logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset) + + # There are certain cases where we will be throttled and receive less than the expected amount of data. + # In these cases, instead of failing right away, indicate a retry is occurring and update offset and + # size to attempt another read to get the rest of the data. We will only do this if the amount of bytes read + # is less than size, because if somehow we received too much data we are not clear on how to proceed. + if nbytes < size: + errMsg = 'Did not recieve total bytes requested from server. This can be due to server side throttling and will be retried. Data Expected: {}. Data Received: {}.'.format(size, nbytes) + size -= nbytes + offset += nbytes + total_bytes_downloaded += nbytes + raise IOError(errMsg) + elif nbytes > size: + raise IOError('Received more bytes than expected from the server. Expected: {}. 
Received: {}.'.format(size, nbytes)) + else: + total_bytes_downloaded += nbytes + + return total_bytes_downloaded, None except Exception as e: err = e - logger.debug('Exception %s on ADL download, retrying in %s seconds', - repr(err), delay, exc_info=True) + logger.debug('Exception %s on ADL download on attempt: %s, retrying in %s seconds', + repr(err), i, delay, exc_info=True) time.sleep(delay) delay *= backoff exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err)) logger.error('Download failed %s; %s', dst, repr(exception)) - return nbytes, exception + return total_bytes_downloaded, exception class ADLUploader(object):