Merge from dev.
begoldsm committed May 31, 2017
1 parent dc693f1 commit 9dafd1b
Showing 88 changed files with 16,171 additions and 3,102 deletions.
7 changes: 7 additions & 0 deletions HISTORY.rst
@@ -2,6 +2,13 @@
Release History
===============
0.0.10 (2017-05-24)
-------------------
* Allow users to explicitly use or invalidate the internal, local cache of the filesystem that is built up from previous `ls` calls. By default, calls now always go to the service instead of the cache.
* Update to properly create the wheel package during build to ensure all pip packages are available.
* Update folder upload/download to throw early when destination files already exist and overwrite was not specified. NOTE: existence of the target folder (or subfolders) does not by itself cause failure; only existing leaf files block the transfer.
* Fix a bug that caused file not found errors when attempting to get information about the root folder.

0.0.9 (2017-05-09)
------------------
* Enforce basic SSL utilization to ensure performance due to `GitHub issue 625 <https://github.com/pyca/pyopenssl/issues/625>`
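A minimal sketch of the cache behavior described in the changelog above, assuming an authenticated client; the tenant, credentials, and store name below are placeholders:

from azure.datalake.store import core, lib

# placeholder tenant, credentials, and store name
token = lib.auth(tenant_id='my-tenant', username='user@example.com', password='secret')
adl = core.AzureDLFileSystem(token, store_name='mystore')

# default: every listing call now goes to the service and refreshes the local cache
fresh = adl.ls('tmp/', detail=True)

# opt back into the cache built up by previous ls calls
cached = adl.ls('tmp/', detail=True, invalidate_cache=False)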
1 change: 1 addition & 0 deletions README.rst
@@ -31,6 +31,7 @@ To play with the code, here is a starting point:
# typical operations
adl.ls('')
adl.ls('tmp/', detail=True)
adl.ls('tmp/', detail=True, invalidate_cache=True)
adl.cat('littlefile')
adl.head('gdelt20150827.csv')
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.9"
__version__ = "0.0.10"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
94 changes: 67 additions & 27 deletions azure/datalake/store/core.py
@@ -95,7 +95,7 @@ def open(self, path, mode='rb', blocksize=2**25, delimiter=None):
path: string
Path of file on ADL
mode: string
One of 'rb' or 'wb'
One of 'rb', 'ab' or 'wb'
blocksize: int
Size of data-node blocks if reading
delimiter: byte(s) or None
@@ -107,73 +107,102 @@ def open(self, path, mode='rb', blocksize=2**25, delimiter=None):
return AzureDLFile(self, AzureDLPath(path), mode, blocksize=blocksize,
delimiter=delimiter)

def _ls(self, path):
def _ls(self, path, invalidate_cache=True):
""" List files at given path """
path = AzureDLPath(path).trim()
key = path.as_posix()

if invalidate_cache:
self.invalidate_cache(key)

if key not in self.dirs:
out = self.azure.call('LISTSTATUS', key)
self.dirs[key] = out['FileStatuses']['FileStatus']
for f in self.dirs[key]:
f['name'] = (path / f['pathSuffix']).as_posix()
return self.dirs[key]

def ls(self, path="", detail=False):
def ls(self, path="", detail=False, invalidate_cache=True):
""" List single directory with or without details """
path = AzureDLPath(path)
files = self._ls(path)
files = self._ls(path, invalidate_cache)
if not files:
inf = self.info(path)
# in this case we just invalidated the cache (if it was true), so no need to do it again
inf = self.info(path, invalidate_cache=False)
if inf['type'] == 'DIRECTORY':
return []
return inf if detail else []

raise FileNotFoundError(path)
if detail:
return files
else:
return [f['name'] for f in files]

def info(self, path):
def info(self, path, invalidate_cache=True, expected_error_code=None):
""" File information
"""
path = AzureDLPath(path).trim()
root = path.parent
path_as_posix = path.as_posix()
for f in self._ls(root):
root = path.parent
root_as_posix = root.as_posix()

# in the case of getting info about the root itself or if the cache won't be hit
# simply return the result of a GETFILESTATUS from the service
if invalidate_cache or path_as_posix in {'/', '.'}:
to_return = self.azure.call('GETFILESTATUS', path_as_posix, expected_error_code=expected_error_code)['FileStatus']
to_return['name'] = path_as_posix

# add the key/value pair back to the cache so long as it isn't the root
if path_as_posix not in {'/', '.'}:
if root_as_posix not in self.dirs:
self.dirs[root_as_posix] = [to_return]
else:
found = False
for i, f in enumerate(self.dirs[root_as_posix]):
if f['name'] == path_as_posix:
found = True
# update the cached entry in place; rebinding the loop variable would not modify the list
self.dirs[root_as_posix][i] = to_return
break
if not found:
self.dirs[root_as_posix].append(to_return)
return to_return

for f in self._ls(root, invalidate_cache):
if f['name'] == path_as_posix:
return f
else:
raise FileNotFoundError(path)

raise FileNotFoundError(path)

def _walk(self, path):
fi = list(self._ls(path))
def _walk(self, path, invalidate_cache=True):
fi = list(self._ls(path, invalidate_cache))
for apath in fi:
if apath['type'] == 'DIRECTORY':
fi.extend(self._ls(apath['name']))
fi.extend(self._ls(apath['name'], invalidate_cache))
return [f for f in fi if f['type'] == 'FILE']

def walk(self, path='', details=False):
def walk(self, path='', details=False, invalidate_cache=True):
""" Get all files below given path
"""
return [f if details else f['name'] for f in self._walk(path)]
return [f if details else f['name'] for f in self._walk(path, invalidate_cache)]

def glob(self, path, details=False):
def glob(self, path, details=False, invalidate_cache=True):
"""
Find files (not directories) by glob-matching.
"""
path = AzureDLPath(path).trim()
path_as_posix = path.as_posix()
prefix = path.globless_prefix
allfiles = self.walk(prefix, details)
allfiles = self.walk(prefix, details, invalidate_cache)
if prefix == path:
return allfiles
return [f for f in allfiles if AzureDLPath(f['name'] if details else f).match(path_as_posix)]

def du(self, path, total=False, deep=False):
def du(self, path, total=False, deep=False, invalidate_cache=True):
""" Bytes in keys at path """
if deep:
files = self._walk(path)
files = self._walk(path, invalidate_cache)
else:
files = self.ls(path, detail=True)
files = self.ls(path, detail=True, invalidate_cache=invalidate_cache)
if total:
return sum(f.get('length', 0) for f in files)
else:
@@ -394,10 +423,10 @@ def chown(self, path, owner=None, group=None):
self.azure.call('SETOWNER', path.as_posix(), **parms)
self.invalidate_cache(path.as_posix())

def exists(self, path):
def exists(self, path, invalidate_cache=True):
""" Does such a file/directory exist? """
try:
self.info(path)
self.info(path, invalidate_cache, expected_error_code=404)
return True
except FileNotFoundError:
return False
@@ -451,7 +480,8 @@ def rmdir(self, path):
""" Remove empty directory """
if self.info(path)['type'] != "DIRECTORY":
raise ValueError('Can only rmdir on directories')
if self.ls(path):
# should always invalidate the cache when checking to see if the directory is empty
if self.ls(path, invalidate_cache=True):
raise ValueError('Directory not empty: %s' % path)
self.rm(path, False)

@@ -507,7 +537,8 @@ def rm(self, path, recursive=False):
by `walk()`.
"""
path = AzureDLPath(path).trim()
if not self.exists(path):
# Always invalidate the cache when attempting to check existence of something to delete
if not self.exists(path, invalidate_cache=True):
raise FileNotFoundError(path)
self.azure.call('DELETE', path.as_posix(), recursive=recursive)
self.invalidate_cache(path)
@@ -635,10 +666,19 @@ def __init__(self, azure, path, mode='rb', blocksize=2**25,
self.buffer = io.BytesIO()
self.blocksize = blocksize
self.first_write = True
if mode == 'ab' and self.azure.exists(path):

# always invalidate the cache when checking for existence of a file
# that may be created or written to (for the first time).
exists = self.azure.exists(path, invalidate_cache=True)

# cannot create a new file object out of a directory
if exists and self.info()['type'] == 'DIRECTORY':
raise IOError('path: {} is a directory, not a file, and cannot be opened for reading or writing'.format(path))

if mode == 'ab' and exists:
self.loc = self.info()['length']
self.first_write = False
if mode == 'rb':
elif mode == 'rb':
self.size = self.info()['length']
else:
self.blocksize = min(2**22, blocksize)
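A hedged sketch of the core.py changes above, reusing the `adl` client from the earlier snippet; the path is a placeholder. It exercises the widened open() modes, the new directory guard, and the explicit cache controls on exists() and info():

# 'wb' creates the file; 'ab' now resumes at the current end of the file
with adl.open('tmp/example.csv', mode='wb') as f:
    f.write(b'id,value\n1,10\n')
with adl.open('tmp/example.csv', mode='ab') as f:
    f.write(b'2,20\n')

# existence and metadata checks can bypass the local listing cache explicitly
assert adl.exists('tmp/example.csv', invalidate_cache=True)
print(adl.info('tmp/example.csv', invalidate_cache=True)['length'])

# opening a directory now raises IOError instead of returning a broken file object
# adl.open('tmp/', mode='rb')  # would raise IOError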
18 changes: 14 additions & 4 deletions azure/datalake/store/lib.py
@@ -309,7 +309,7 @@ def _is_json_response(self, response):
return False
return response.headers['content-type'].startswith('application/json')

def call(self, op, path='', is_extended=False, **kwargs):
def call(self, op, path='', is_extended=False, expected_error_code=None, **kwargs):
""" Execute a REST call
Parameters
@@ -322,6 +322,11 @@ def call(self, op, path='', is_extended=False, **kwargs):
Indicates if the API call comes from the webhdfs extensions path or the basic webhdfs path.
By default, all requests target the official webhdfs path. A small subset of custom convenience
methods specific to Azure Data Lake Store target the extension path (such as SETEXPIRY).
expected_error_code: int
Optionally indicates a specific, expected error code. If the service returns this error,
the exception is logged to the DEBUG stream instead of ERROR. The exception is still
raised, however, since the caller is expected to catch it and handle it accordingly.
kwargs: dict
other parameters, as defined by the webHDFS standard and
https://msdn.microsoft.com/en-us/library/mt710547.aspx
@@ -359,11 +364,16 @@ def call(self, op, path='', is_extended=False, **kwargs):
r = func(url, params=params, headers=headers, data=data, stream=stream)
except requests.exceptions.RequestException as e:
raise DatalakeRESTException('HTTP error: ' + repr(e))

exception_log_level = logging.ERROR
if expected_error_code and r.status_code == expected_error_code:
logger.log(logging.DEBUG, 'Error code: {} was an expected potential error from the caller. Logging the exception to the debug stream'.format(r.status_code))
exception_log_level = logging.DEBUG

if r.status_code == 403:
self.log_response_and_raise(r, PermissionError(path))
self.log_response_and_raise(r, PermissionError(path), level=exception_log_level)
elif r.status_code == 404:
self.log_response_and_raise(r, FileNotFoundError(path))
self.log_response_and_raise(r, FileNotFoundError(path), level=exception_log_level)
elif r.status_code >= 400:
err = DatalakeRESTException(
'Data-lake REST exception: %s, %s' % (op, path))
@@ -374,7 +384,7 @@
if exception == 'BadOffsetException':
err = DatalakeBadOffsetException(path)
self.log_response_and_raise(r, err, level=logging.DEBUG)
self.log_response_and_raise(r, err)
self.log_response_and_raise(r, err, level=exception_log_level)
else:
self._log_response(r)

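A hedged sketch of the new `expected_error_code` parameter, useful when probing for a path that may legitimately be absent; `adl.azure` is the underlying REST client that AzureDLFileSystem methods such as exists() delegate to:

try:
    status = adl.azure.call('GETFILESTATUS', 'tmp/maybe-missing.csv',
                            expected_error_code=404)
    print(status['FileStatus']['type'])
except FileNotFoundError:
    # the 404 is still raised, but it is logged at DEBUG rather than ERROR
    print('not found')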
58 changes: 37 additions & 21 deletions azure/datalake/store/multithread.py
@@ -102,14 +102,16 @@ class ADLDownloader(object):
def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
buffersize=2**22, blocksize=2**22, client=None, run=True,
overwrite=False, verbose=False):
if not overwrite and os.path.exists(lpath):
raise FileExistsError(lpath)

# validate that the src exists and the current user has access to it
# this only validates access to the top level folder. If there are files
# or folders underneath it that the user does not have access to the download
# will fail on those files. We clean the path in case there are wildcards.
if not adlfs.exists(AzureDLPath(rpath).globless_prefix):
# In this case, we will always invalidate the cache for this check to
# do our best to ensure that the path exists as close to run time of the transfer as possible.
# Due to the nature of a distributed filesystem, the path could be deleted later during execution,
# at which point the transfer's behavior may be non-deterministic, but it will indicate an error.
if not adlfs.exists(AzureDLPath(rpath).globless_prefix, invalidate_cache=True):
raise FileNotFoundError('Data Lake item at path: {} either does not exist or the current user does not have permission to access it.'.format(rpath))
if client:
self.client = client
@@ -128,7 +130,10 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
self.rpath = rpath
self.lpath = lpath
self._overwrite = overwrite
self._setup()
existing_files = self._setup()
if existing_files:
raise FileExistsError('Overwrite was not specified and the following files exist, blocking the transfer operation. Please specify overwrite to overwrite these files during transfer: {}'.format(','.join(existing_files)))

if run:
self.run()

@@ -181,9 +186,9 @@ def _setup(self):
""" Create set of parameters to loop over
"""
if "*" not in self.rpath:
rfiles = self.client._adlfs.walk(self.rpath, details=True)
rfiles = self.client._adlfs.walk(self.rpath, details=True, invalidate_cache=True)
else:
rfiles = self.client._adlfs.glob(self.rpath, details=True)
rfiles = self.client._adlfs.glob(self.rpath, details=True, invalidate_cache=True)
if len(rfiles) > 1:
prefix = commonprefix([f['name'] for f in rfiles])
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'], prefix)), f)
@@ -201,9 +206,14 @@
# and should not be referenced directly by public callers
self._file_pairs = file_pairs

existing_files = []
for lfile, rfile in file_pairs:
self.client.submit(rfile['name'], lfile, rfile['length'])

if not self._overwrite and os.path.exists(lfile):
existing_files.append(lfile)
else:
self.client.submit(rfile['name'], lfile, rfile['length'])

return existing_files
def run(self, nthreads=None, monitor=True):
""" Populate transfer queue and execute downloads
@@ -343,13 +353,6 @@ class ADLUploader(object):
def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
buffersize=2**22, blocksize=2**22, client=None, run=True,
overwrite=False, verbose=False):
if not overwrite and adlfs.exists(rpath):
raise FileExistsError(rpath)

# forcibly remove the target file before execution
# if the user indicates they want to overwrite the destination.
if overwrite and adlfs.exists(rpath):
adlfs.remove(rpath, True)

if client:
self.client = client
@@ -370,7 +373,11 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
self.rpath = AzureDLPath(rpath)
self.lpath = lpath
self._overwrite = overwrite
self._setup()
existing_files = self._setup()

if existing_files:
raise FileExistsError('Overwrite was not specified and the following files exist, blocking the transfer operation. Please specify overwrite to overwrite these files during transfer: {}'.format(','.join(existing_files)))

if run:
self.run()

@@ -436,8 +443,8 @@ def _setup(self):
prefix = commonprefix(lfiles)
file_pairs = [(f, self.rpath / AzureDLPath(f).relative_to(prefix)) for f in lfiles]
elif lfiles:
if self.client._adlfs.exists(self.rpath) and \
self.client._adlfs.info(self.rpath)['type'] == "DIRECTORY":
if self.client._adlfs.exists(self.rpath, invalidate_cache=True) and \
self.client._adlfs.info(self.rpath, invalidate_cache=False)['type'] == "DIRECTORY":
file_pairs = [(lfiles[0], self.rpath / AzureDLPath(lfiles[0]).name)]
else:
file_pairs = [(lfiles[0], self.rpath)]
@@ -448,9 +455,15 @@
# and should not be referenced directly by public callers
self._file_pairs = file_pairs

existing_files = []
for lfile, rfile in file_pairs:
fsize = os.stat(lfile).st_size
self.client.submit(lfile, rfile, fsize)
if not self._overwrite and self.client._adlfs.exists(rfile, invalidate_cache=False):
existing_files.append(rfile.as_posix())
else:
fsize = os.stat(lfile).st_size
self.client.submit(lfile, rfile, fsize)

return existing_files

def run(self, nthreads=None, monitor=True):
""" Populate transfer queue and execute downloads
@@ -517,7 +530,10 @@ def merge_chunks(adlfs, outfile, files, shutdown_event=None, overwrite=False):
# so this call is optimized to instantly delete the temp folder on concat.
# if somehow the target file was created between the beginning of upload
# and concat, we will remove it if the user specified overwrite.
if adlfs.exists(outfile):
# here we must get the most up to date information from the service,
# instead of relying on the local cache to ensure that we know if
# the merge target already exists.
if adlfs.exists(outfile, invalidate_cache=True):
if overwrite:
adlfs.remove(outfile, True)
else:
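A hedged sketch of the transfer-time overwrite checks above, again reusing `adl`; the local and remote paths are placeholders. Existing destination leaf files are now collected during setup and reported in a single FileExistsError, rather than the transfer failing merely because the target folder exists:

from azure.datalake.store.multithread import ADLDownloader, ADLUploader

try:
    ADLDownloader(adl, rpath='tmp/', lpath='/data/local-copy', overwrite=False)
except FileExistsError as e:
    # the message lists every local file that blocks the transfer
    print(e)

# with overwrite=True, blocking destination files are replaced during the transfer
ADLUploader(adl, rpath='tmp/', lpath='/data/local-copy', overwrite=True)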