diff --git a/HISTORY.rst b/HISTORY.rst
index 1ad5c3f..02af67d 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -2,6 +2,10 @@
 Release History
 ===============
 
+0.0.8 (2017-04-26)
+------------------
+* Fix server-side throttling retry support. This does not guarantee that a throttled upload (or download) will eventually succeed, but a back-off retry is now in place to make success more likely.
+
 0.0.7 (2017-04-19)
 ------------------
 * Update the build process to more efficiently handle multi-part namespaces for pip.
diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py
index f4e3bdc..495438f 100644
--- a/azure/datalake/store/__init__.py
+++ b/azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
 # license information.
 # --------------------------------------------------------------------------
 
-__version__ = "0.0.7"
+__version__ = "0.0.8"
 
 from .core import AzureDLFileSystem
 from .multithread import ADLDownloader
diff --git a/azure/datalake/store/core.py b/azure/datalake/store/core.py
index 57501c0..036d66a 100644
--- a/azure/datalake/store/core.py
+++ b/azure/datalake/store/core.py
@@ -916,15 +916,15 @@ def _fetch_range(rest, path, start, end, stream=False):
 
 
 def _fetch_range_with_retry(rest, path, start, end, stream=False, retries=10,
-                            delay=0.01, backoff=1):
+                            delay=0.01, backoff=3):
     err = None
     for i in range(retries):
         try:
             return _fetch_range(rest, path, start, end, stream=False)
         except Exception as e:
             err = e
-            logger.debug('Exception %s on ADL download, retrying in %s seconds',
-                         repr(err), delay, exc_info=True)
+            logger.debug('Exception %s on ADL download attempt %s; retrying in %s seconds',
+                         repr(err), i, delay, exc_info=True)
         time.sleep(delay)
         delay *= backoff
     exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err))
@@ -936,7 +936,7 @@ def _put_data(rest, op, path, data, **kwargs):
     return rest.call(op, path=path, data=data, **kwargs)
 
 
-def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=1,
+def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=3,
                          **kwargs):
     err = None
     for i in range(retries):
@@ -953,8 +953,8 @@ def _put_data_with_retry(rest, op, path, data, retries=10, delay=0.01, backoff=1
             return
         except Exception as e:
             err = e
-            logger.debug('Exception %s on ADL upload, retrying in %s seconds',
-                         repr(err), delay, exc_info=True)
+            logger.debug('Exception %s on ADL upload attempt %s; retrying in %s seconds',
+                         repr(err), i, delay, exc_info=True)
         time.sleep(delay)
         delay *= backoff
     exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err))
diff --git a/azure/datalake/store/multithread.py b/azure/datalake/store/multithread.py
index 4737f67..4f3c7c3 100644
--- a/azure/datalake/store/multithread.py
+++ b/azure/datalake/store/multithread.py
@@ -246,12 +246,13 @@ def __str__(self):
 
 
 def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
-              shutdown_event=None, retries=10, delay=0.01, backoff=1):
+              shutdown_event=None, retries=10, delay=0.01, backoff=3):
     """ Download a piece of a remote file and write locally
 
     Internal function used by `download`.
     """
     err = None
+    total_bytes_downloaded = 0
     for i in range(retries):
         try:
             nbytes = 0
@@ -261,22 +262,38 @@ def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
                     fout.seek(offset)
                     for chunk in response.iter_content(chunk_size=blocksize):
                         if shutdown_event and shutdown_event.is_set():
-                            return nbytes, None
+                            return total_bytes_downloaded, None
                         if chunk:
                             nwritten = fout.write(chunk)
                             if nwritten:
                                 nbytes += nwritten
-            logger.debug('Downloaded to %s, byte offset %s', dst, offset)
-            return nbytes, None
+            logger.debug('Downloaded %s bytes to %s, byte offset %s', nbytes, dst, offset)
+
+            # The server can throttle a request and return fewer bytes than expected. In that case,
+            # instead of failing immediately, signal that a retry is occurring and update offset and
+            # size so the next attempt reads the remaining data. This only applies when fewer bytes
+            # than requested were read; receiving more data than requested is treated as an error.
+            if nbytes < size:
+                errMsg = 'Did not receive the total bytes requested from the server. This can be due to server-side throttling and will be retried. Data Expected: {}. Data Received: {}.'.format(size, nbytes)
+                size -= nbytes
+                offset += nbytes
+                total_bytes_downloaded += nbytes
+                raise IOError(errMsg)
+            elif nbytes > size:
+                raise IOError('Received more bytes than expected from the server. Expected: {}. Received: {}.'.format(size, nbytes))
+            else:
+                total_bytes_downloaded += nbytes
+
+            return total_bytes_downloaded, None
         except Exception as e:
             err = e
-            logger.debug('Exception %s on ADL download, retrying in %s seconds',
-                         repr(err), delay, exc_info=True)
+            logger.debug('Exception %s on ADL download attempt %s; retrying in %s seconds',
+                         repr(err), i, delay, exc_info=True)
         time.sleep(delay)
         delay *= backoff
     exception = RuntimeError('Max number of ADL retries exceeded: exception ' + repr(err))
     logger.error('Download failed %s; %s', dst, repr(exception))
-    return nbytes, exception
+    return total_bytes_downloaded, exception
 
 
 class ADLUploader(object):