Release version 0.0.41 (#273)
* Replace content summary api call with user side processing

* Move token check under retry

* Raise exception on repeated zero reads

* Expose timeout for AdlDownloader and AdlUploader

* Refactor walk
akharit authored Feb 4, 2019
1 parent d929cf3 commit 2c60307
Showing 112 changed files with 117,618 additions and 24 deletions.
7 changes: 7 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,13 @@
Release History
===============

0.0.41 (2019-01-31)
+++++++++++++++++++
* Remove GetContentSummary api call
* Move check_token() under retry block
* Expose timeout parameter for AdlDownloader and AdlUploader
* Raise an exception instead of silently break for zero length reads

0.0.40 (2019-01-08)
+++++++++++++++++++
* Fix zero length read
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.40"
__version__ = "0.0.41"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
64 changes: 52 additions & 12 deletions azure/datalake/store/core.py
@@ -23,7 +23,7 @@


# local imports
-from .exceptions import DatalakeBadOffsetException
+from .exceptions import DatalakeBadOffsetException, DatalakeIncompleteTransferException
from .exceptions import FileNotFoundError, PermissionError
from .lib import DatalakeRESTInterface
from .utils import ensure_writable, read_block
@@ -195,17 +195,31 @@ def info(self, path, invalidate_cache=True, expected_error_code=None):

raise FileNotFoundError(path)

-    def _walk(self, path, invalidate_cache=True):
-        fi = list(self._ls(path, invalidate_cache))
+    def _walk(self, path, invalidate_cache=True, include_dirs=False):
+        ret = list(self._ls(path, invalidate_cache))
         self._emptyDirs = []
-        for apath in fi:
-            if apath['type'] == 'DIRECTORY':
-                sub_elements = self._ls(apath['name'], invalidate_cache)
+        current_subdirs = [f for f in ret if f['type'] != 'FILE']
+        while current_subdirs:
+            dirs_below_current_level = []
+            for apath in current_subdirs:
+                try:
+                    sub_elements = self._ls(apath['name'], invalidate_cache)
+                except FileNotFoundError:
+                    # Folder may have been deleted while walk is going on. Infrequent so we can take the linear hit
+                    ret.remove(apath)
+                    continue
                 if not sub_elements:
                     self._emptyDirs.append(apath)
                 else:
-                    fi.extend(sub_elements)
-        return [f for f in fi if f['type'] == 'FILE']
+                    ret.extend(sub_elements)
+                    dirs_below_current_level.extend([f for f in sub_elements if f['type'] != 'FILE'])
+            current_subdirs = dirs_below_current_level
+
+        if include_dirs:
+            return ret
+        else:
+            return [f for f in ret if f['type'] == 'FILE']


def _empty_dirs_to_add(self):
""" Returns directories found empty during walk. Only for internal use"""
@@ -240,9 +254,31 @@ def du(self, path, total=False, deep=False, invalidate_cache=True):
return {p['name']: p['length'] for p in files}

def df(self, path):
""" Resource summary of path """
""" Resource summary of path
Parameters
----------
path: str
Location
"""
path = AzureDLPath(path).trim()
return self.azure.call('GETCONTENTSUMMARY', path.as_posix())['ContentSummary']
current_path_info = self.info(path, invalidate_cache=False)
if current_path_info['type'] == 'FILE':
return {'directoryCount': 0, 'fileCount': 1, 'length': current_path_info['length'], 'quota': -1,
'spaceConsumed': current_path_info['length'], 'spaceQuota': -1}
else:
all_files_and_dirs = self._walk(path, include_dirs=True)
dir_count = 1 # 1 as walk doesn't return current directory
length = file_count = 0
for item in all_files_and_dirs:
length += item['length']
if item['type'] == 'FILE':
file_count += 1
else:
dir_count += 1

return {'directoryCount': dir_count, 'fileCount': file_count, 'length': length, 'quota': -1,
'spaceConsumed': length, 'spaceQuota': -1}


def chmod(self, path, mod):
""" Change access mode of path
Expand Down Expand Up @@ -858,14 +894,18 @@ def read(self, length=-1):
length = self.size
if self.closed:
raise ValueError('I/O operation on closed file.')

+        flag = 0
out = b""
while length > 0:
self._read_blocksize()
data_read = self.cache[self.loc - self.start:
min(self.loc - self.start + length, self.end - self.start)]
if not data_read: # Check to catch possible server errors. Ideally shouldn't happen.
-                break
+                flag += 1
+                if flag >= 5:
+                    raise DatalakeIncompleteTransferException('Could not read data: {}. '
+                                                              'Repeated zero byte reads. '
+                                                              'Possible file corruption'.format(self.path))
out += data_read
self.loc += len(data_read)
length -= len(data_read)
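Note: because a run of zero-byte reads now raises DatalakeIncompleteTransferException instead of silently breaking out of the loop, callers that previously tolerated short reads need to handle the exception. A usage sketch; the path is a placeholder and adl is the filesystem client from the example above:

from azure.datalake.store.exceptions import DatalakeIncompleteTransferException

try:
    with adl.open('/data/file.csv', 'rb') as f:
        payload = f.read()
except DatalakeIncompleteTransferException as err:
    # read() saw repeated zero-byte responses and gave up; treat the data as suspect.
    print('Incomplete transfer:', err)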
3 changes: 2 additions & 1 deletion azure/datalake/store/lib.py
@@ -107,6 +107,7 @@ def auth(tenant_id=None, username=None,
if not authority:
authority = 'https://login.microsoftonline.com/'


if not tenant_id:
tenant_id = os.environ.get('azure_tenant_id', "common")

@@ -373,7 +374,6 @@ def call(self, op, path='', is_extended=False, expected_error_code=None, retry_p
retry_policy = ExponentialRetryPolicy() if retry_policy is None else retry_policy
if op not in self.ends:
raise ValueError("No such op: %s", op)
-        self._check_token()
method, required, allowed = self.ends[op]
allowed.add('api-version')
data = kwargs.pop('data', b'')
@@ -402,6 +402,7 @@ def call(self, op, path='', is_extended=False, expected_error_code=None, retry_p
while True:
retry_count += 1
last_exception = None
+            self._check_token()
try:
response = self.__call_once(method=method,
url=url,
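Note: moving _check_token() inside the retry loop means the token is re-validated before every attempt rather than once per call, so a long series of retries can no longer outlive the token. A generic sketch of that pattern (not the library's actual call() implementation):

import time

def call_with_retry(do_request, check_token, max_tries=4, backoff=2.0):
    """Validate/refresh credentials before every attempt, not just the first one."""
    last_exc = None
    for attempt in range(max_tries):
        check_token()  # refresh the token if it is close to expiry; cheap otherwise
        try:
            return do_request()
        except Exception as exc:  # the real client only retries designated error codes
            last_exc = exc
            time.sleep(backoff ** attempt)
    raise last_exc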
16 changes: 12 additions & 4 deletions azure/datalake/store/multithread.py
@@ -101,14 +101,17 @@ class ADLDownloader(object):
Callback for progress with signature function(current, total) where
current is the number of bytes transfered so far, and total is the
size of the blob, or None if the total size is unknown.
+    timeout: int (0)
+        Default value 0 means infinite timeout. Otherwise time in seconds before the
+        process will stop and raise an exception if transfer is still in progress
See Also
--------
azure.datalake.store.transfer.ADLTransferClient
"""
def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
buffersize=2**22, blocksize=2**22, client=None, run=True,
-                 overwrite=False, verbose=False, progress_callback=None):
+                 overwrite=False, verbose=False, progress_callback=None, timeout=0):

# validate that the src exists and the current user has access to it
# this only validates access to the top level folder. If there are files
@@ -133,7 +136,8 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
chunked=False,
verbose=verbose,
parent=self,
-                progress_callback=progress_callback)
+                progress_callback=progress_callback,
+                timeout=timeout)
self._name = tokenize(adlfs, rpath, lpath, chunksize, blocksize)
self.rpath = rpath
self.lpath = lpath
@@ -378,14 +382,17 @@ class ADLUploader(object):
Callback for progress with signature function(current, total) where
current is the number of bytes transfered so far, and total is the
size of the blob, or None if the total size is unknown.
+    timeout: int (0)
+        Default value 0 means infinite timeout. Otherwise time in seconds before the
+        process will stop and raise an exception if transfer is still in progress
See Also
--------
azure.datalake.store.transfer.ADLTransferClient
"""
def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
buffersize=2**22, blocksize=2**22, client=None, run=True,
-                 overwrite=False, verbose=False, progress_callback=None):
+                 overwrite=False, verbose=False, progress_callback=None, timeout=0):

if client:
self.client = client
@@ -402,7 +409,8 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
parent=self,
verbose=verbose,
unique_temporary=True,
-                progress_callback=progress_callback,
+                progress_callback=progress_callback,
+                timeout=timeout)
self._name = tokenize(adlfs, rpath, lpath, chunksize, blocksize)
self.rpath = AzureDLPath(rpath)
self.lpath = lpath
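Note: the new timeout keyword is passed straight through to the underlying ADLTransferClient; 0 (the default) keeps the old wait-forever behaviour. A hedged usage sketch, reusing an authenticated adl filesystem client and placeholder paths:

from azure.datalake.store.multithread import ADLDownloader, ADLUploader

# Give up (and raise) if the download is still running after 30 minutes.
ADLDownloader(adl, rpath='/remote/data', lpath='./data', nthreads=16, timeout=1800)

# timeout=0 preserves the pre-0.0.41 behaviour of waiting indefinitely.
ADLUploader(adl, rpath='/remote/incoming', lpath='./to_upload', overwrite=True, timeout=0)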
8 changes: 6 additions & 2 deletions azure/datalake/store/transfer.py
@@ -164,6 +164,9 @@ class ADLTransferClient(object):
Callback for progress with signature function(current, total) where
current is the number of bytes transferred so far, and total is the
size of the blob, or None if the total size is unknown.
+    timeout: int (0)
+        Default value 0 means infinite timeout. Otherwise time in seconds before the
+        process will stop and raise an exception if transfer is still in progress
Temporary Files
---------------
@@ -233,7 +236,7 @@ def __init__(self, adlfs, transfer, merge=None, nthreads=None,
chunksize=2**28, blocksize=2**25, chunked=True,
unique_temporary=True, delimiter=None,
parent=None, verbose=False, buffersize=2**25,
-                 progress_callback=None):
+                 progress_callback=None, timeout=0):
self._adlfs = adlfs
self._parent = parent
self._transfer = transfer
@@ -247,6 +250,7 @@
self._unique_str = uuid.uuid4().hex
self._progress_callback=progress_callback
self._progress_lock = threading.Lock()
+        self._timeout = timeout
self.verbose = verbose

# Internal state tracking files/chunks/futures
@@ -496,7 +500,7 @@ def run(self, nthreads=None, monitor=True, before_start=None):
self._start(src, dst)

if monitor:
-            self.monitor()
+            self.monitor(timeout=self._timeout)
has_errors = False
error_list = []
for f in self.progress: