Update to version 0.0.6
This includes:
• Performance fixes for upload and download of large folders
• Authentication fixes for token refresh using lib.auth()
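For context, a minimal sketch of the flow these fixes touch, using the calls visible in this diff (lib.auth and AzureDLFileSystem); the tenant, client, and store values are placeholders, and the store_name keyword follows the project README rather than anything shown here:

    from azure.datalake.store import core, lib
    # service-to-service auth; all IDs below are placeholders
    token = lib.auth(tenant_id='<tenant>', client_id='<client>', client_secret='<secret>')
    adl = core.AzureDLFileSystem(token, store_name='<store>')
    # as of this commit, an expired token is refreshed in place by
    # DataLakeCredential.refresh_token() instead of the removed lib.refresh_token()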
begoldsm committed Mar 22, 2017
1 parent 9e35e75 commit 3e9d2c9
Showing 83 changed files with 3,492 additions and 253,262 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -2,6 +2,10 @@
Release History
===============
0.0.6 (2017-03-15)
------------------
* Fix an issue with path caching that should drastically improve download performance

0.0.5 (2017-03-01)
------------------
* Fix for downloader to ensure there is access to the source path before creating destination files
2 changes: 1 addition & 1 deletion azure-data-lake-store-python.pyproj.user
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ProjectView>ShowAllFiles</ProjectView>
<ProjectView>ProjectFiles</ProjectView>
</PropertyGroup>
</Project>
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.5"
__version__ = "0.0.6"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
54 changes: 36 additions & 18 deletions azure/datalake/store/core.py
@@ -111,8 +111,8 @@ def _ls(self, path):
""" List files at given path """
path = AzureDLPath(path).trim()
key = path.as_posix()
if path not in self.dirs:
out = self.azure.call('LISTSTATUS', path.as_posix())
if key not in self.dirs:
out = self.azure.call('LISTSTATUS', key)
self.dirs[key] = out['FileStatuses']['FileStatus']
for f in self.dirs[key]:
f['name'] = (path / f['pathSuffix']).as_posix()
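The one-line change above is the path-caching fix called out in the commit message: self.dirs is keyed by posix strings, but the old code probed it with the AzureDLPath object itself, so the membership test always failed and every _ls() re-issued a LISTSTATUS call. A minimal illustration, assuming AzureDLPath behaves like pathlib's PurePosixPath here:

    from pathlib import PurePosixPath
    cache = {'data/logs': ['<cached FileStatus list>']}
    path = PurePosixPath('data/logs')
    path in cache             # False: a path object never matches its string key
    path.as_posix() in cache  # True: this is what the fixed code checks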
@@ -137,10 +137,12 @@ def info(self, path):
"""
path = AzureDLPath(path).trim()
root = path.parent
myfile = [f for f in self._ls(root) if f['name'] == path.as_posix()]
if len(myfile) == 1:
return myfile[0]
raise FileNotFoundError(path)
path_as_posix = path.as_posix()
for f in self._ls(root):
if f['name'] == path_as_posix:
return f
else:
raise FileNotFoundError(path)

def _walk(self, path):
fi = list(self._ls(path))
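The rewritten info() above returns as soon as it finds a match instead of materializing a full list first, and leans on Python's for/else: the else branch runs only when the loop finishes without returning (or breaking). A short illustration of the idiom, mirroring the not-found case:

    for f in [{'name': 'a'}, {'name': 'b'}]:
        if f['name'] == 'c':
            break
    else:
        raise FileNotFoundError('c')  # runs because the loop never broke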
@@ -149,21 +151,22 @@ def _walk(self, path):
fi.extend(self._ls(apath['name']))
return [f for f in fi if f['type'] == 'FILE']

def walk(self, path=''):
def walk(self, path='', details=False):
""" Get all files below given path
"""
return [f['name'] for f in self._walk(path)]
return [f if details else f['name'] for f in self._walk(path)]

def glob(self, path):
def glob(self, path, details=False):
"""
Find files (not directories) by glob-matching.
"""
path = AzureDLPath(path).trim()
path_as_posix = path.as_posix()
prefix = path.globless_prefix
allfiles = self.walk(prefix)
allfiles = self.walk(prefix, details)
if prefix == path:
return allfiles
return [f for f in allfiles if AzureDLPath(f).match(path.as_posix())]
return [f for f in allfiles if AzureDLPath(f['name'] if details else f).match(path_as_posix)]

def du(self, path, total=False, deep=False):
""" Bytes in keys at path """
@@ -233,8 +236,9 @@ def set_expiry(self, path, expiry_option, expire_time=None):
parms['expireTime'] = int(expire_time)

self.azure.call('SETEXPIRY', path.as_posix(), is_extended=True, **parms)
self.invalidate_cache(path.as_posix())

def _acl_call(self, action, path, acl_spec=None):
def _acl_call(self, action, path, acl_spec=None, invalidate_cache=False):
"""
Helper method for ACL calls to reduce code repetition
@@ -249,13 +253,21 @@ def _acl_call(self, action, path, acl_spec=None):
'[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
Note that when removing ACL entries, the permission (rwx) portion is not required.
invalidate_cache: bool
optionally indicates that the cache of files should be invalidated after this operation
This should always be done for set and remove operations, since the state of the file or folder has changed.
"""
parms = {}
path = AzureDLPath(path).trim()
posix_path = path.as_posix()
if acl_spec:
parms['aclSpec'] = acl_spec

return self.azure.call(action, path.as_posix(), **parms)
to_return = self.azure.call(action, posix_path, **parms)
if invalidate_cache:
self.invalidate_cache(posix_path)

return to_return

def set_acl(self, path, acl_spec):
"""
@@ -272,7 +284,8 @@ def set_acl(self, path, acl_spec):
'[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
"""

self._acl_call('SETACL', path, acl_spec)
self._acl_call('SETACL', path, acl_spec, invalidate_cache=True)


def modify_acl_entries(self, path, acl_spec):
"""
@@ -290,7 +303,8 @@ def modify_acl_entries(self, path, acl_spec):
The ACL specification to use in modifying the ACL at the path in the format
'[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,[default:]user|group|other:[entity id or UPN]:r|-w|-x|-,...'
"""
self._acl_call('MODIFYACLENTRIES', path, acl_spec)
self._acl_call('MODIFYACLENTRIES', path, acl_spec, invalidate_cache=True)


def remove_acl_entries(self, path, acl_spec):
"""
@@ -309,7 +323,8 @@ def remove_acl_entries(self, path, acl_spec):
The ACL specification to remove from the ACL at the path in the format (note that the permission portion is missing)
'[default:]user|group|other:[entity id or UPN],[default:]user|group|other:[entity id or UPN],...'
"""
self._acl_call('REMOVEACLENTRIES', path, acl_spec)
self._acl_call('REMOVEACLENTRIES', path, acl_spec, invalidate_cache=True)


def get_acl_status(self, path):
"""
@@ -334,7 +349,8 @@ def remove_acl(self, path):
path: str
Location to remove the ACL.
"""
self._acl_call('REMOVEACL', path)
self._acl_call('REMOVEACL', path, invalidate_cache=True)


def remove_default_acl(self, path):
"""
@@ -349,7 +365,8 @@ def remove_default_acl(self, path):
path: str
Location to set the ACL on.
"""
self._acl_call('REMOVEDEFAULTACL', path)
self._acl_call('REMOVEDEFAULTACL', path, invalidate_cache=True)


def chown(self, path, owner=None, group=None):
"""
@@ -375,6 +392,7 @@ def chown(self, path, owner=None, group=None):
parms['group'] = group
path = AzureDLPath(path).trim()
self.azure.call('SETOWNER', path.as_posix(), **parms)
self.invalidate_cache(path.as_posix())

def exists(self, path):
""" Does such a file/directory exist? """
55 changes: 26 additions & 29 deletions azure/datalake/store/lib.py
@@ -46,34 +46,6 @@
# This ensures that no connections are prematurely evicted, which has negative performance implications.
MAX_POOL_CONNECTIONS = 1024

def refresh_token(token, authority=None):
""" Refresh an expired authorization token
Parameters
----------
token : dict
Produced by `auth()` or `refresh_token`.
authority: string
The full URI of the authentication authority to authenticate against (such as https://login.microsoftonline.com/)
"""
if token.get('refresh', False) is False:
raise ValueError("Token cannot be auto-refreshed.")

if not authority:
authority = 'https://login.microsoftonline.com/'

context = adal.AuthenticationContext(authority +
token['tenant'])
out = context.acquire_token_with_refresh_token(token['refresh'],
client_id=token['client'],
resource=token['resource'])
out.update({'access': out['accessToken'], 'refresh': out['refreshToken'],
'time': time.time(), 'tenant': token['tenant'],
'resource': token['resource'], 'client': token['client']})

return DataLakeCredential(out)


def auth(tenant_id=None, username=None,
password=None, client_id=default_client,
client_secret=None, resource=DEFAULT_RESOURCE_ENDPOINT,
@@ -157,12 +129,37 @@ def __init__(self, token):
def signed_session(self):
session = super(DataLakeCredential, self).signed_session()
if time.time() - self.token['time'] > self.token['expiresIn'] - 100:
self.token = refresh_token(self.token)
self.refresh_token()

scheme, token = self.token['tokenType'], self.token['access']
header = "{} {}".format(scheme, token)
session.headers['Authorization'] = header
return session

def refresh_token(self, authority=None):
""" Refresh an expired authorization token
Parameters
----------
authority: string
The full URI of the authentication authority to authenticate against (such as https://login.microsoftonline.com/)
"""
if self.token.get('refresh', False) is False:
raise ValueError("Token cannot be auto-refreshed.")

if not authority:
authority = 'https://login.microsoftonline.com/'

context = adal.AuthenticationContext(authority +
self.token['tenant'])
out = context.acquire_token_with_refresh_token(self.token['refresh'],
client_id=self.token['client'],
resource=self.token['resource'])
out.update({'access': out['accessToken'], 'refresh': out['refreshToken'],
'time': time.time(), 'tenant': self.token['tenant'],
'resource': self.token['resource'], 'client': self.token['client']})

self.token = out

class DatalakeRESTInterface:
""" Call factory for webHDFS endpoints on ADLS
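Net effect of the lib.py changes: refresh_token() moves from a module-level helper that returned a new DataLakeCredential to a method that refreshes the credential in place, and signed_session() now triggers it automatically shortly before expiry. A migration sketch for callers of the old helper (placeholder IDs):

    from azure.datalake.store import lib
    token = lib.auth(tenant_id='<tenant>', client_id='<client>', client_secret='<secret>')
    # before this commit:
    #   token = lib.refresh_token(token)
    # from 0.0.6 on, refresh mutates the credential and returns nothing:
    token.refresh_token()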
44 changes: 24 additions & 20 deletions azure/datalake/store/multithread.py
@@ -181,26 +181,28 @@ def _setup(self):
""" Create set of parameters to loop over
"""
if "*" not in self.rpath:
rfiles = self.client._adlfs.walk(self.rpath)
rfiles = self.client._adlfs.walk(self.rpath, details=True)
else:
rfiles = self.client._adlfs.glob(self.rpath)
rfiles = self.client._adlfs.glob(self.rpath, details=True)
if len(rfiles) > 1:
prefix = commonprefix(rfiles)
lfiles = [os.path.join(self.lpath, os.path.relpath(f, prefix))
for f in rfiles]
elif rfiles:
prefix = commonprefix([f['name'] for f in rfiles])
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'], prefix)), f)
for f in rfiles]
elif len(rfiles) == 1:
if os.path.exists(self.lpath) and os.path.isdir(self.lpath):
lfiles = [os.path.join(self.lpath, os.path.basename(rfiles[0]))]
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'])),
rfiles[0])]
else:
lfiles = [self.lpath]
file_pairs = [(self.lpath, rfiles[0])]
else:
raise ValueError('No files to download')
self.rfiles = rfiles
self.lfiles = lfiles

for lfile, rfile in zip(lfiles, rfiles):
fsize = self.client._adlfs.info(rfile)['length']
self.client.submit(rfile, lfile, fsize)
# this property is used for internal validation
# and should not be referenced directly by public callers
self._file_pairs = file_pairs

for lfile, rfile in file_pairs:
self.client.submit(rfile['name'], lfile, rfile['length'])

def run(self, nthreads=None, monitor=True):
""" Populate transfer queue and execute downloads
@@ -412,22 +414,24 @@ def _setup(self):
lfiles = [self.lpath]
else:
lfiles = glob.glob(self.lpath)

if len(lfiles) > 1:
prefix = commonprefix(lfiles)
rfiles = [self.rpath / AzureDLPath(f).relative_to(prefix)
for f in lfiles]
file_pairs = [(f, self.rpath / AzureDLPath(f).relative_to(prefix)) for f in lfiles]
elif lfiles:
if self.client._adlfs.exists(self.rpath) and \
self.client._adlfs.info(self.rpath)['type'] == "DIRECTORY":
rfiles = [self.rpath / AzureDLPath(lfiles[0]).name]
file_pairs = [(lfiles[0], self.rpath / AzureDLPath(lfiles[0]).name)]
else:
rfiles = [self.rpath]
file_pairs = [(lfiles[0], self.rpath)]
else:
raise ValueError('No files to upload')
self.rfiles = rfiles
self.lfiles = lfiles

for lfile, rfile in zip(lfiles, rfiles):
# this property is used for internal validation
# and should not be referenced directly by public callers
self._file_pairs = file_pairs

for lfile, rfile in file_pairs:
fsize = os.stat(lfile).st_size
self.client.submit(lfile, rfile, fsize)

37 changes: 21 additions & 16 deletions azure/datalake/store/transfer.py
@@ -261,40 +261,45 @@ def submit(self, src, dst, length):
'pending', 'running', 'finished', 'cancelled', 'errored')

# Create unique temporary directory for each file
if self._chunked and self._unique_temporary:
tmpdir = dst.parent / "{}.segments.{}".format(dst.name, self._unique_str)
elif self._chunked:
tmpdir = dst.parent / "{}.segments".format(dst.name)
if self._chunked:
if self._unique_temporary:
filename = "{}.segments.{}".format(dst.name, self._unique_str)
else:
filename = "{}.segments".format(dst.name)
tmpdir = dst.parent/filename
else:
tmpdir = None

offsets = list(range(0, length, self._chunksize))
# TODO: might need xrange support for py2
offsets = range(0, length, self._chunksize)

# in the case of empty files, ensure that the initial offset of 0 is properly added.
if not offsets:
if not length:
offsets.append(0)
offsets = [0]
else:
raise DatalakeIncompleteTransferException('Could not compute offsets for source: {}, with destination: {} and expected length: {}.'.format(src, dst, length))

tmpdir_and_offsets = tmpdir and len(offsets) > 1
for offset in offsets:
if tmpdir and len(offsets) > 1:
if tmpdir_and_offsets:
name = tmpdir / "{}_{}".format(dst.name, offset)
else:
name = dst
cstates[(name, offset)] = 'pending'
self._chunks[(name, offset)] = dict(
parent=(src, dst),
expected=min(length - offset, self._chunksize),
actual=0,
exception=None)
self._chunks[(name, offset)] = {
"parent": (src, dst),
"expected": min(length - offset, self._chunksize),
"actual": 0,
"exception": None}
logger.debug("Submitted %s, byte offset %d", name, offset)

self._fstates[(src, dst)] = 'pending'
self._files[(src, dst)] = dict(
length=length,
cstates=cstates,
exception=None)
self._files[(src, dst)] = {
"length": length,
"cstates": cstates,
"exception": None}


def _submit(self, fn, *args, **kwargs):
kwargs['shutdown_event'] = self._shutdown_event
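Two details of the submit() hunk above are worth pinning down: offsets is now a lazy range rather than a materialized list (one list per file adds up at large-folder scale), and an empty file still needs a single chunk at offset 0, because range(0, 0, chunksize) is empty and, unlike a list, cannot be appended to, hence offsets = [0]. A quick check of the arithmetic:

    chunksize = 2 ** 22                       # 4 MiB, illustrative value
    list(range(0, 10 * 2 ** 20, chunksize))   # [0, 4194304, 8388608] for a 10 MiB file
    list(range(0, 0, chunksize))              # [] -> the code substitutes [0]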