diff --git a/HISTORY.rst b/HISTORY.rst index e921102..d61f030 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ Release History =============== +0.0.42 (2019-02-26) ++++++++++++++++++++ +* Update docstrings +* Remove logging setlevel to DEBUG for recursive acl operations + 0.0.41 (2019-01-31) +++++++++++++++++++ * Remove GetContentSummary api call diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py index ed94ea7..a4ca4b1 100644 --- a/azure/datalake/store/__init__.py +++ b/azure/datalake/store/__init__.py @@ -6,7 +6,7 @@ # license information. # -------------------------------------------------------------------------- -__version__ = "0.0.41" +__version__ = "0.0.42" from .core import AzureDLFileSystem from .multithread import ADLDownloader diff --git a/azure/datalake/store/core.py b/azure/datalake/store/core.py index f5e326b..b498af2 100644 --- a/azure/datalake/store/core.py +++ b/azure/datalake/store/core.py @@ -53,7 +53,7 @@ class AzureDLFileSystem(object): url_suffix: str (None) Domain to send REST requests to. The end-point URL is constructed using this and the store_name. If None, use default. - api_version: str (2016-11-01) + api_version: str (2018-05-01) The API version to target with requests. Changing this value will change the behavior of the requests, and can cause unexpected behavior or breaking changes. Changes to this value should be undergone with caution. @@ -144,7 +144,20 @@ def _ls(self, path, invalidate_cache=True, batch_size=4000): return self.dirs[key] def ls(self, path="", detail=False, invalidate_cache=True): - """ List single directory with or without details """ + """ + List all elements under directory specified with path + Parameters + ---------- + path : str or AzureDLPath + Path to query + detail : bool + Detailed info or not. 
+ invalidate_cache : bool + Whether to invalidate cache or not + Returns + ------- + List of elements under directory specified with path + """ path = AzureDLPath(path) files = self._ls(path, invalidate_cache) if not files: @@ -161,7 +174,19 @@ def ls(self, path="", detail=False, invalidate_cache=True): return [f['name'] for f in files] def info(self, path, invalidate_cache=True, expected_error_code=None): - """ File information + """ + File information for path + Parameters + ---------- + path : str or AzureDLPath + Path to query + invalidate_cache : bool + Whether to invalidate cache or not + expected_error_code : int + Optionally indicates a specific, expected error code, if any. + Returns + ------- + File information """ path = AzureDLPath(path).trim() path_as_posix = path.as_posix() @@ -196,6 +221,21 @@ def info(self, path, invalidate_cache=True, expected_error_code=None): raise FileNotFoundError(path) def _walk(self, path, invalidate_cache=True, include_dirs=False): + """ + Walk a path recursively and return a list of files and dirs (if include_dirs is set) + Parameters + ---------- + path : str or AzureDLPath + Path to query + invalidate_cache : bool + Whether to invalidate cache + include_dirs : bool + Whether to include dirs in return value + + Returns + ------- + List of files and (optionally) dirs + """ ret = list(self._ls(path, invalidate_cache)) self._emptyDirs = [] current_subdirs = [f for f in ret if f['type'] != 'FILE'] @@ -220,20 +260,45 @@ def _walk(self, path, invalidate_cache=True, include_dirs=False): else: return [f for f in ret if f['type'] == 'FILE'] - def _empty_dirs_to_add(self): """ Returns directories found empty during walk. 
Only for internal use""" return self._emptyDirs def walk(self, path='', details=False, invalidate_cache=True): - """ Get all files below given path + """ + Get all files below given path + Parameters + ---------- + path : str or AzureDLPath + Path to query + details : bool + Whether to include file details + invalidate_cache : bool + Whether to invalidate cache + + Returns + ------- + List of files """ return [f if details else f['name'] for f in self._walk(path, invalidate_cache)] def glob(self, path, details=False, invalidate_cache=True): """ Find files (not directories) by glob-matching. + Parameters + ---------- + path : str or AzureDLPath + Path to query + details : bool + Whether to include file details + invalidate_cache : bool + Whether to invalidate cache + + Returns + ------- + List of files """ + path = AzureDLPath(path).trim() path_as_posix = path.as_posix() prefix = path.globless_prefix @@ -243,7 +308,24 @@ def glob(self, path, details=False, invalidate_cache=True): return [f for f in allfiles if AzureDLPath(f['name'] if details else f).match(path_as_posix)] def du(self, path, total=False, deep=False, invalidate_cache=True): - """ Bytes in keys at path """ + """ + Bytes in keys at path + Parameters + ---------- + path : str or AzureDLPath + Path to query + total : bool + Return the total size instead of the name:size pairs + deep : bool + Recursively enumerate or just use files under current dir + invalidate_cache : bool + Whether to invalidate cache + + Returns + ------- + List of dict of name:size pairs or total size. 
+ """ + if deep: files = self._walk(path, invalidate_cache) else: @@ -258,7 +340,7 @@ def df(self, path): Parameters ---------- path: str - Location + Path to query """ path = AzureDLPath(path).trim() current_path_info = self.info(path, invalidate_cache=False) @@ -279,7 +361,6 @@ def df(self, path): return {'directoryCount': dir_count, 'fileCount': file_count, 'length': length, 'quota': -1, 'spaceConsumed': length, 'spaceQuota': -1} - def chmod(self, path, mod): """ Change access mode of path @@ -299,7 +380,7 @@ def chmod(self, path, mod): def set_expiry(self, path, expiry_option, expire_time=None): """ - Sets or removes the expiration time on the specified file. + Set or remove the expiration time on the specified file. This operation can only be executed against files. Note: Folders are not supported. @@ -367,7 +448,7 @@ def _acl_call(self, action, path, acl_spec=None, invalidate_cache=False): def set_acl(self, path, acl_spec, recursive=False, number_of_sub_process=None): """ - Sets the Access Control List (ACL) for a file or folder. + Set the Access Control List (ACL) for a file or folder. Note: this is by default not recursive, and applies only to the file or folder specified. @@ -388,7 +469,7 @@ def set_acl(self, path, acl_spec, recursive=False, number_of_sub_process=None): def modify_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_process=None): """ - Modifies existing Access Control List (ACL) entries on a file or folder. + Modify existing Access Control List (ACL) entries on a file or folder. If the entry does not exist it is added, otherwise it is updated based on the spec passed in. No entries are removed by this process (unlike set_acl). @@ -411,7 +492,7 @@ def modify_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_proc def remove_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_process=None): """ - Removes existing, named, Access Control List (ACL) entries on a file or folder. 
+ Remove existing, named, Access Control List (ACL) entries on a file or folder. If the entry does not exist already it is ignored. Default entries cannot be removed this way, please use remove_default_acl for that. Unnamed entries cannot be removed in this way, please use remove_acl for that. @@ -433,7 +514,6 @@ def remove_acl_entries(self, path, acl_spec, recursive=False, number_of_sub_proc else: self._acl_call('REMOVEACLENTRIES', path, acl_spec, invalidate_cache=True) - def get_acl_status(self, path): """ Gets Access Control List (ACL) entries for the specified file or directory. @@ -447,7 +527,7 @@ def get_acl_status(self, path): def remove_acl(self, path): """ - Removes the entire, non default, ACL from the file or folder, including unnamed entries. + Remove the entire, non default, ACL from the file or folder, including unnamed entries. Default entries cannot be removed this way, please use remove_default_acl for that. Note: this is not recursive, and applies only to the file or folder specified. @@ -459,10 +539,9 @@ def remove_acl(self, path): """ self._acl_call('REMOVEACL', path, invalidate_cache=True) - def remove_default_acl(self, path): """ - Removes the entire default ACL from the folder. + Remove the entire default ACL from the folder. Default entries do not exist on files, if a file is specified, this operation does nothing. @@ -475,7 +554,6 @@ def remove_default_acl(self, path): """ self._acl_call('REMOVEDEFAULTACL', path, invalidate_cache=True) - def chown(self, path, owner=None, group=None): """ Change owner and/or owning group @@ -503,7 +581,18 @@ def chown(self, path, owner=None, group=None): self.invalidate_cache(path.as_posix()) def exists(self, path, invalidate_cache=True): - """ Does such a file/directory exist? """ + """ + Does such a file/directory exist? 
+ Parameters + ---------- + path : str or AzureDLPath + Path to query + invalidate_cache : bool + Whether to invalidate cache + Returns + ------- + True or false depending on whether the path exists. + """ try: self.info(path, invalidate_cache, expected_error_code=404) return True @@ -511,12 +600,33 @@ def exists(self, path, invalidate_cache=True): return False def cat(self, path): - """ Returns contents of file """ + """ + Return contents of file + Parameters + ---------- + path : str or AzureDLPath + Path to query + Returns + ------- + Contents of file + """ with self.open(path, 'rb') as f: return f.read() def tail(self, path, size=1024): - """ Return last bytes of file """ + """ + Return last bytes of file + Parameters + ---------- + path : str or AzureDLPath + Path to query + size : int + How many bytes to return + + Returns + ------- + Last(size) bytes of file + """ length = self.info(path)['length'] if size > length: return self.cat(path) @@ -525,12 +635,35 @@ def tail(self, path, size=1024): return f.read(size) def head(self, path, size=1024): - """ Return first bytes of file """ + """ + Return first bytes of file + Parameters + ---------- + path : str or AzureDLPath + Path to query + size : int + How many bytes to return + + Returns + ------- + First(size) bytes of file + """ with self.open(path, 'rb', blocksize=size) as f: return f.read(size) def get(self, path, filename): - """ Stream data from file at path to local filename """ + """ + Stream data from file at path to local filename + Parameters + ---------- + path : str or AzureDLPath + ADL Path to read + filename : str or Path + Local file path to write to + Returns + ------- + None + """ with self.open(path, 'rb') as f: with open(filename, 'wb') as f2: while True: @@ -540,7 +673,20 @@ def get(self, path, filename): f2.write(data) def put(self, filename, path, delimiter=None): - """ Stream data from local filename to file at path """ + """ + Stream data from local filename to file at path + Parameters 
+ ---------- + filename : str or Path + Local file path to read from + path : str or AzureDLPath + ADL Path to write to + delimiter : + Optional delimiter for delimiter-ended blocks + Returns + ------- + None + """ with open(filename, 'rb') as f: with self.open(path, 'wb', delimiter=delimiter) as f2: while True: @@ -550,13 +696,32 @@ def put(self, filename, path, delimiter=None): f2.write(data) def mkdir(self, path): - """ Make new directory """ + """ + Make new directory + Parameters + ---------- + path : str or AzureDLPath + Path to create directory + Returns + ------- + None + """ + """ """ path = AzureDLPath(path).trim() self.azure.call('MKDIRS', path.as_posix()) self.invalidate_cache(path) def rmdir(self, path): - """ Remove empty directory """ + """ + Remove empty directory + Parameters + ---------- + path : str or AzureDLPath + Directory path to remove + Returns + ------- + None + """ if self.info(path)['type'] != "DIRECTORY": raise ValueError('Can only rmdir on directories') # should always invalidate the cache when checking to see if the directory is empty @@ -565,7 +730,19 @@ def rmdir(self, path): self.rm(path, False) def mv(self, path1, path2): - """ Move file between locations on ADL """ + """ + Move file between locations on ADL + Parameters + ---------- + path1 : + Source path + path2 : + Destination path + + Returns + ------- + None + """ path1 = AzureDLPath(path1).trim() path2 = AzureDLPath(path2).trim() self.azure.call('RENAME', path1.as_posix(), @@ -587,6 +764,10 @@ def concat(self, outfile, filelist, delete_source=False): delete_source : bool (False) If True, assume that the paths to concatenate exist alone in a directory, and delete that whole directory when done. + + Returns + ------- + None """ outfile = AzureDLPath(outfile).trim() delete = 'true' if delete_source else 'false' @@ -608,15 +789,19 @@ def cp(self, path1, path2): def rm(self, path, recursive=False): """ - Remove a file. 
+ Remove a file or directory Parameters ---------- - path : string + path : str or AzureDLPath The location to remove. recursive : bool (True) Whether to remove also all entries below, i.e., which are returned by `walk()`. + + Returns + ------- + None """ path = AzureDLPath(path).trim() # Always invalidate the cache when attempting to check existence of something to delete @@ -629,7 +814,16 @@ def rm(self, path, recursive=False): [self.invalidate_cache(m) for m in matches] def invalidate_cache(self, path=None): - """Remove entry from object file-cache""" + """ + Remove entry from object file-cache + Parameters + ---------- + path : str or AzureDLPath + Remove the path from object file-cache + Returns + ------- + None + """ if path is None: self.dirs.clear() else: @@ -642,7 +836,13 @@ def touch(self, path): """ Create empty file - If path is a bucket only, attempt to create bucket. + Parameters + ---------- + path : str or AzureDLPath + Path of file to create + Returns + ------- + None """ with self.open(path, 'wb'): pass diff --git a/azure/datalake/store/multiprocessor.py b/azure/datalake/store/multiprocessor.py index 4dab6ef..7a90ae1 100644 --- a/azure/datalake/store/multiprocessor.py +++ b/azure/datalake/store/multiprocessor.py @@ -21,7 +21,6 @@ def monitor_exception(exception_queue, process_ids): global GLOBAL_EXCEPTION logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) while True: try: @@ -65,7 +64,6 @@ def log_listener_process(queue): def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None): logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) def launch_processes(number_of_processes): if number_of_processes is None: @@ -157,7 +155,6 @@ def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, a except AttributeError: # Python 2 doesn't have Queue Handler. Default to best effort logging. 
pass - logger.setLevel(logging.DEBUG) try: func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries} diff --git a/azure/datalake/store/multithread.py b/azure/datalake/store/multithread.py index fc4e9a8..b98f5ba 100644 --- a/azure/datalake/store/multithread.py +++ b/azure/datalake/store/multithread.py @@ -81,6 +81,7 @@ class ADLDownloader(object): Number of bytes for a chunk. Large files are split into chunks. Files smaller than this number will always be transferred in a single thread. buffersize: int [2**22] + Ignored in current implementation. Number of bytes for internal buffer. This block cannot be bigger than a chunk and cannot be smaller than a block. blocksize: int [2**22]