From 9d4942e2cdd63d8eef7d59b27a5f86b6710ffb26 Mon Sep 17 00:00:00 2001 From: begoldsm Date: Wed, 7 Jun 2017 11:12:33 -0700 Subject: [PATCH] version 0.0.11 of azure-datalake-store package * Address a request to indicate that files being downloaded are in progress when being downloaded. * All files will have a suffix `.inprogress` until they are fully downloaded, at which point the suffix will be removed. --- HISTORY.rst | 4 ++++ azure/datalake/store/__init__.py | 2 +- azure/datalake/store/multithread.py | 12 +++++++----- azure/datalake/store/transfer.py | 26 ++++++++++++++++++++++++++ tests/test_multithread.py | 14 +++++++------- 5 files changed, 45 insertions(+), 13 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 610ad81..cda0e88 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,6 +2,10 @@ Release History =============== +0.0.11 (2017-06-02) +------------------- +* Update to name incomplete file downloads with a `.inprogress` suffix. This suffix is removed when the download completes successfully. + 0.0.10 (2017-05-24) ------------------- * Allow users to explicitly use or invalidate the internal, local cache of the filesystem that is built up from previous `ls` calls. It is now set to always call the service instead of the cache by default. diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py index a3b4d86..0e7713e 100644 --- a/azure/datalake/store/__init__.py +++ b/azure/datalake/store/__init__.py @@ -6,7 +6,7 @@ # license information. 
# -------------------------------------------------------------------------- -__version__ = "0.0.10" +__version__ = "0.0.11" from .core import AzureDLFileSystem from .multithread import ADLDownloader diff --git a/azure/datalake/store/multithread.py b/azure/datalake/store/multithread.py index 6808f20..b5b1f9b 100644 --- a/azure/datalake/store/multithread.py +++ b/azure/datalake/store/multithread.py @@ -191,11 +191,11 @@ def _setup(self): rfiles = self.client._adlfs.glob(self.rpath, details=True, invalidate_cache=True) if len(rfiles) > 1: prefix = commonprefix([f['name'] for f in rfiles]) - file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'], prefix)), f) + file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'] +'.inprogress', prefix)), f) for f in rfiles] elif len(rfiles) == 1: if os.path.exists(self.lpath) and os.path.isdir(self.lpath): - file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'])), + file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'] + '.inprogress')), rfiles[0])] else: file_pairs = [(self.lpath, rfiles[0])] @@ -208,8 +208,11 @@ def _setup(self): existing_files = [] for lfile, rfile in file_pairs: - if not self._overwrite and os.path.exists(lfile): - existing_files.append(lfile) + # only interested in the final destination file name for existence, + # not the initial inprogress target + destination_file = lfile.replace('.inprogress', '') + if not self._overwrite and os.path.exists(destination_file): + existing_files.append(destination_file) else: self.client.submit(rfile['name'], lfile, rfile['length']) @@ -254,7 +257,6 @@ def __str__(self): __repr__ = __str__ - def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize, shutdown_event=None, retries=10, delay=0.01, backoff=3): """ Download a piece of a remote file and write locally diff --git a/azure/datalake/store/transfer.py b/azure/datalake/store/transfer.py index 190a9dc..4ee3454 100644 --- 
a/azure/datalake/store/transfer.py +++ b/azure/datalake/store/transfer.py @@ -21,6 +21,7 @@ import time import uuid import operator +import os from .exceptions import DatalakeIncompleteTransferException @@ -361,6 +362,25 @@ def progress(self): chunks=chunks, exception=self._files[key]['exception'])) return files + + def _rename_file(self, src, dst, overwrite=False): + """ Rename a file from file_name.inprogress to just file_name. Invoked once download completes on a file. + + Internal function used by `download`. + """ + try: + # we do a final check to make sure someone didn't create the destination file while download was occurring + # if the user did not specify overwrite. + if os.path.isfile(dst): + if not overwrite: + raise FileExistsError(dst) + os.remove(dst) + os.rename(src, dst) + except Exception as e: + logger.error('Rename failed for source file: %r; %r', src, e) + raise e + + logger.debug('Renamed %r to %r', src, dst) def _update(self, future): if future in self._cfutures: @@ -405,6 +425,12 @@ def _update(self, future): overwrite=self._parent._overwrite) self._ffutures[merge_future] = parent else: + if not self._chunked and str(dst).endswith('.inprogress'): + logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent]) + self._fstates[parent] = 'merging' + self._rename_file(dst, dst.replace('.inprogress',''), overwrite=self._parent._overwrite) + dst = dst.replace('.inprogress', '') + self._fstates[parent] = 'finished' logger.info("Transferred %s -> %s", src, dst) elif cstates.contains_none('running'): diff --git a/tests/test_multithread.py b/tests/test_multithread.py index b56cd89..3675a69 100644 --- a/tests/test_multithread.py +++ b/tests/test_multithread.py @@ -154,7 +154,7 @@ def test_download_glob(tempdir, azure): assert len(file_pair_dict.keys()) == 2 lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()] - assert sorted(lfiles) == sorted(['x.csv', 'y.csv']) + assert sorted(lfiles) == sorted(['x.csv.inprogress', 
'y.csv.inprogress']) remote_path = test_dir / 'data' / '*' / '*.csv' down = ADLDownloader(azure, remote_path, tempdir, run=False, @@ -165,10 +165,10 @@ def test_download_glob(tempdir, azure): lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()] assert sorted(lfiles) == sorted([ - os.path.join('a', 'x.csv'), - os.path.join('a', 'y.csv'), - os.path.join('b', 'x.csv'), - os.path.join('b', 'y.csv')]) + os.path.join('a', 'x.csv.inprogress'), + os.path.join('a', 'y.csv.inprogress'), + os.path.join('b', 'x.csv.inprogress'), + os.path.join('b', 'y.csv.inprogress')]) remote_path = test_dir / 'data' / '*' / 'z.txt' down = ADLDownloader(azure, remote_path, tempdir, run=False, @@ -178,8 +178,8 @@ def test_download_glob(tempdir, azure): lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()] assert sorted(lfiles) == sorted([ - os.path.join('a', 'z.txt'), - os.path.join('b', 'z.txt')]) + os.path.join('a', 'z.txt.inprogress'), + os.path.join('b', 'z.txt.inprogress')]) @my_vcr.use_cassette