Skip to content

Commit

Permalink
version 0.0.11 of azure-datalake-store package
Browse files Browse the repository at this point in the history
* Address a request to make it visible when a file is still being
downloaded.
* All files will have a suffix `.inprogress` until they are fully
downloaded, at which point the suffix will be removed.
  • Loading branch information
begoldsm committed Jun 7, 2017
1 parent 579c4f2 commit 9d4942e
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
Release History
===============
0.0.11 (2017-06-02)
-------------------
* Update to name incomplete file downloads with a `.inprogress` suffix. This suffix is removed when the download completes successfully.

0.0.10 (2017-05-24)
-------------------
* Allow users to explicitly use or invalidate the internal, local cache of the filesystem that is built up from previous `ls` calls. It is now set to always call the service instead of the cache by default.
Expand Down
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.10"
__version__ = "0.0.11"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
Expand Down
12 changes: 7 additions & 5 deletions azure/datalake/store/multithread.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ def _setup(self):
rfiles = self.client._adlfs.glob(self.rpath, details=True, invalidate_cache=True)
if len(rfiles) > 1:
prefix = commonprefix([f['name'] for f in rfiles])
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'], prefix)), f)
file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'] +'.inprogress', prefix)), f)
for f in rfiles]
elif len(rfiles) == 1:
if os.path.exists(self.lpath) and os.path.isdir(self.lpath):
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'])),
file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'] + '.inprogress')),
rfiles[0])]
else:
file_pairs = [(self.lpath, rfiles[0])]
Expand All @@ -208,8 +208,11 @@ def _setup(self):

existing_files = []
for lfile, rfile in file_pairs:
if not self._overwrite and os.path.exists(lfile):
existing_files.append(lfile)
# only interested in the final destination file name for existence,
# not the initial inprogress target
destination_file = lfile.replace('.inprogress', '')
if not self._overwrite and os.path.exists(destination_file):
existing_files.append(destination_file)
else:
self.client.submit(rfile['name'], lfile, rfile['length'])

Expand Down Expand Up @@ -254,7 +257,6 @@ def __str__(self):

__repr__ = __str__


def get_chunk(adlfs, src, dst, offset, size, buffersize, blocksize,
shutdown_event=None, retries=10, delay=0.01, backoff=3):
""" Download a piece of a remote file and write locally
Expand Down
26 changes: 26 additions & 0 deletions azure/datalake/store/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import time
import uuid
import operator
import os

from .exceptions import DatalakeIncompleteTransferException

Expand Down Expand Up @@ -361,6 +362,25 @@ def progress(self):
chunks=chunks,
exception=self._files[key]['exception']))
return files

def _rename_file(self, src, dst, overwrite=False):
    """ Rename a file from file_name.inprogress to just file_name.

    Invoked once the download of a file completes. Internal function used
    by `download`.

    Parameters
    ----------
    src:
        Path of the completed temporary file (e.g. file_name.inprogress).
    dst:
        Final destination path (e.g. file_name).
    overwrite: bool [False]
        When True, an existing file at `dst` is removed before the rename.
        When False, an existing file at `dst` causes FileExistsError.

    Raises
    ------
    FileExistsError
        If `dst` is an existing file and `overwrite` is False.
    Exception
        Any error raised while removing `dst` or renaming `src` is logged
        and then re-raised unchanged.
    """
    try:
        # Final check to make sure someone didn't create the destination
        # file while the download was occurring, if the user did not
        # specify overwrite.
        if os.path.isfile(dst):
            if not overwrite:
                raise FileExistsError(dst)
            os.remove(dst)
        os.rename(src, dst)
    except Exception as e:
        logger.error('Rename failed for source file: %r; %r', src, e)
        # Bare `raise` re-raises the active exception with its original
        # traceback intact, rather than restarting it from this frame.
        raise

    logger.debug('Renamed %r to %r', src, dst)

def _update(self, future):
if future in self._cfutures:
Expand Down Expand Up @@ -405,6 +425,12 @@ def _update(self, future):
overwrite=self._parent._overwrite)
self._ffutures[merge_future] = parent
else:
if not self._chunked and str(dst).endswith('.inprogress'):
logger.debug("Renaming file to remove .inprogress: %s", self._fstates[parent])
self._fstates[parent] = 'merging'
self._rename_file(dst, dst.replace('.inprogress',''), overwrite=self._parent._overwrite)
dst = dst.replace('.inprogress', '')

self._fstates[parent] = 'finished'
logger.info("Transferred %s -> %s", src, dst)
elif cstates.contains_none('running'):
Expand Down
14 changes: 7 additions & 7 deletions tests/test_multithread.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_download_glob(tempdir, azure):
assert len(file_pair_dict.keys()) == 2

lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()]
assert sorted(lfiles) == sorted(['x.csv', 'y.csv'])
assert sorted(lfiles) == sorted(['x.csv.inprogress', 'y.csv.inprogress'])

remote_path = test_dir / 'data' / '*' / '*.csv'
down = ADLDownloader(azure, remote_path, tempdir, run=False,
Expand All @@ -165,10 +165,10 @@ def test_download_glob(tempdir, azure):

lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()]
assert sorted(lfiles) == sorted([
os.path.join('a', 'x.csv'),
os.path.join('a', 'y.csv'),
os.path.join('b', 'x.csv'),
os.path.join('b', 'y.csv')])
os.path.join('a', 'x.csv.inprogress'),
os.path.join('a', 'y.csv.inprogress'),
os.path.join('b', 'x.csv.inprogress'),
os.path.join('b', 'y.csv.inprogress')])

remote_path = test_dir / 'data' / '*' / 'z.txt'
down = ADLDownloader(azure, remote_path, tempdir, run=False,
Expand All @@ -178,8 +178,8 @@ def test_download_glob(tempdir, azure):

lfiles = [os.path.relpath(f, tempdir) for f in file_pair_dict.keys()]
assert sorted(lfiles) == sorted([
os.path.join('a', 'z.txt'),
os.path.join('b', 'z.txt')])
os.path.join('a', 'z.txt.inprogress'),
os.path.join('b', 'z.txt.inprogress')])


@my_vcr.use_cassette
Expand Down

0 comments on commit 9d4942e

Please sign in to comment.