Merge pull request #135 from begoldsm/master
Version 0.0.2
* Better 2FA support
* Fixes for overwriting existing files and folders
* Support for upload and download of empty files
begoldsm authored Feb 1, 2017
2 parents b512f40 + 367481f commit bb89169
Showing 82 changed files with 68,728 additions and 3,897 deletions.
19 changes: 19 additions & 0 deletions HISTORY.rst
@@ -0,0 +1,19 @@
.. :changelog:
Release History
===============

0.0.2 (2017-01-30)
++++++++++++++++++

* Addresses an issue with lib.auth() not properly defaulting to 2FA
* Fixes an issue with Overwrite for ADLUploader sometimes not being honored.
* Fixes an issue with empty files not properly being uploaded and resulting in a hang in progress tracking.
* Addition of a samples directory showcasing examples of how to use the client and upload and download logic.
* General cleanup of documentation and comments.
* This is still based on API version 2016-11-01.

0.0.1 (2016-11-21)
++++++++++++++++++
* Initial preview release. Based on API version 2016-11-01.
* Includes initial ADLS filesystem functionality and extended upload and download support.
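As a rough illustration of the 0.0.2 changes above, the snippet below sketches a typical upload. It is not taken from this commit; the lib.auth() keyword arguments and the store/credential values are placeholders standing in for real configuration.

    from azure.datalake.store import core, lib, multithread

    # lib.auth() now defaults to interactive (2FA-capable) login correctly;
    # the credential values here are placeholders.
    token = lib.auth(tenant_id='my-tenant', username='me@contoso.com', password='...')
    adl = core.AzureDLFileSystem(token, store_name='my-adls-store')

    # 0.0.2 honors the overwrite flag and uploads empty files without the
    # progress-tracking hang described above.
    multithread.ADLUploader(adl, rpath='/data/incoming', lpath='local_dir', overwrite=True)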
2 changes: 1 addition & 1 deletion README.rst
@@ -54,7 +54,7 @@ To play with the code, here is a starting point:
multithread.ADLDownloader(adl, "", 'my_temp_dir', 5, 2**24)
Command Line Sample Usage
------------------
-------------------------
To interact with the API at a higher-level, you can use the provided
command-line interface in "samples/cli.py". You will need to set
the appropriate environment variables as described above to connect to the
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.1"
__version__ = "0.0.2"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
4 changes: 2 additions & 2 deletions azure/datalake/store/core.py
@@ -733,8 +733,8 @@ def __exit__(self, *args):

def _fetch_range(rest, path, start, end, stream=False):
logger.debug('Fetch: %s, %s-%s', path, start, end)
if end <= start:
return b''
# if the caller gives a bad start/end combination, OPEN will throw and
# this call will bubble it up
return rest.call(
'OPEN', path, offset=start, length=end-start, read='true', stream=stream)

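Since the short-circuit for end <= start is gone, any caller that wants the old "empty range returns empty bytes" behavior has to guard itself before the OPEN call. A hedged sketch of such a wrapper (not part of this commit):

    def fetch_range_or_empty(rest, path, start, end, stream=False):
        # Preserve the pre-0.0.2 behavior: an empty or inverted range yields b''
        # instead of letting the OPEN call fail and bubble the error up.
        if end <= start:
            return b''
        return _fetch_range(rest, path, start, end, stream=stream)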
28 changes: 13 additions & 15 deletions azure/datalake/store/multithread.py
@@ -299,13 +299,10 @@ class ADLUploader(object):
block for each API call. This block cannot be bigger than a chunk.
client: ADLTransferClient [None]
Set an instance of ADLTransferClient when finer-grained control over
transfer parameters is needed. Ignores `nthreads`, `chunksize`, and
`delimiter` set by constructor.
transfer parameters is needed. Ignores `nthreads` and `chunksize`
set by constructor.
run: bool [True]
Whether to begin executing immediately.
delimiter: byte(s) or None
If set, will write blocks using delimiters in the backend, as well as
split files for uploading on that delimiter.
overwrite: bool [False]
Whether to forcibly overwrite existing files/directories. If False and
remote path is a directory, will quit regardless if any files would be
@@ -318,7 +315,7 @@ class ADLUploader(object):
"""
def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
buffersize=2**22, blocksize=2**22, client=None, run=True,
delimiter=None, overwrite=False, verbose=True):
overwrite=False, verbose=True):
if not overwrite and adlfs.exists(rpath):
raise FileExistsError(rpath)

Expand All @@ -338,7 +335,7 @@ def __init__(self, adlfs, rpath, lpath, nthreads=None, chunksize=2**28,
chunksize=chunksize,
buffersize=buffersize,
blocksize=blocksize,
delimiter=delimiter,
delimiter=None, # TODO: see utils.py for what is required to support delimiters.
parent=self,
verbose=verbose,
unique_temporary=True)
@@ -468,14 +465,15 @@ def put_chunk(adlfs, src, dst, offset, size, buffersize, blocksize, delimiter=No
with adlfs.open(dst, 'wb', blocksize=buffersize, delimiter=delimiter) as fout:
end = offset + size
miniblock = min(size, blocksize)
with open(src, 'rb') as fin:
for o in range(offset, end, miniblock):
if shutdown_event and shutdown_event.is_set():
return nbytes, None
data = read_block(fin, o, miniblock, delimiter)
nwritten = fout.write(data)
if nwritten:
nbytes += nwritten
# For empty files there is no need to take the IO hit.
if size != 0:
with open(src, 'rb') as fin:
for o in range(offset, end, miniblock):
if shutdown_event and shutdown_event.is_set():
return nbytes, None
data = read_block(fin, o, miniblock, delimiter)
nbytes += fout.write(data)

except Exception as e:
exception = repr(e)
logger.error('Upload failed %s; %s', src, exception)
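The overwrite guard added in __init__ above is the user-visible part of this change. A minimal sketch, assuming an authenticated AzureDLFileSystem instance named adl and a remote path that already exists:

    from azure.datalake.store.multithread import ADLUploader

    try:
        # overwrite defaults to False, so an existing remote path aborts the
        # transfer up front instead of silently clobbering data.
        ADLUploader(adl, rpath='/data/existing', lpath='local_dir')
    except FileExistsError as e:
        print('refusing to overwrite:', e)

    # Passing overwrite=True skips the existence check and proceeds; empty
    # local files are now uploaded instead of hanging progress tracking.
    ADLUploader(adl, rpath='/data/existing', lpath='local_dir', overwrite=True)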
8 changes: 8 additions & 0 deletions azure/datalake/store/transfer.py
@@ -269,6 +269,14 @@ def submit(self, src, dst, length):
tmpdir = None

offsets = list(range(0, length, self._chunksize))

# in the case of empty files, ensure that the initial offset of 0 is properly added.
if not offsets:
if not length:
offsets.append(0)
else:
raise DatalakeIncompleteTransferException('Could not compute offsets for source: {}, with destination: {} and expected length: {}.'.format(src, dst, length))

for offset in offsets:
if tmpdir and len(offsets) > 1:
name = tmpdir / "{}_{}".format(dst.name, offset)
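The empty-file special case exists because range() produces nothing for a zero length, so an empty file would otherwise generate no chunks at all:

    chunksize = 2**28
    list(range(0, 0, chunksize))    # [] -- an empty file would be skipped entirely
    list(range(0, 300, chunksize))  # [0] -- one chunk starting at offset 0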
94 changes: 36 additions & 58 deletions azure/datalake/store/utils.py
@@ -12,7 +12,6 @@
import platform
import sys


PY2 = sys.version_info.major == 2

WIN = platform.system() == 'Windows'
@@ -27,7 +26,6 @@
except:
pass


def ensure_writable(b):
if PY2 and isinstance(b, array.array):
return b.tostring()
@@ -48,19 +46,21 @@ def read_block(f, offset, length, delimiter=None):
Parameters
----------
fn: string
Path to filename on S3
fn: file object
a file object that supports seek, tell and read.
offset: int
Byte offset to start read
length: int
Number of bytes to read
Maximum number of bytes to read
delimiter: bytes (optional)
Ensure reading starts and stops at delimiter bytestring
Ensure reading stops at delimiter bytestring
If using the ``delimiter=`` keyword argument we ensure that the read
starts and stops at delimiter boundaries that follow the locations
``offset`` and ``offset + length``. If ``offset`` is zero then we
start at zero. The bytestring returned WILL include the
stops at or before the delimiter boundaries that follow the location
``offset + length``. For ADL, if no delimiter is found and the data
requested is > 4MB an exception is raised, since a single record cannot
exceed 4MB and be guaranteed to land contiguously in ADL.
The bytestring returned WILL include the
terminating delimiter string.
Examples
@@ -72,62 +72,40 @@ def read_block(f, offset, length, delimiter=None):
b'Alice, 100\\nBo'
>>> read_block(f, 0, 13, delimiter=b'\\n') # doctest: +SKIP
b'Alice, 100\\nBob, 200\\n'
b'Alice, 100\\n'
>>> read_block(f, 10, 10, delimiter=b'\\n') # doctest: +SKIP
b'Bob, 200\\nCharlie, 300'
b'\\nCharlie, 300'
>>> f = BytesIO(bytearray(2**22)) # doctest: +SKIP
>>> read_block(f,0,2**22, delimiter=b'\\n') # doctest: +SKIP
IndexError: No delimiter found within max record size of 4MB.
Transfer without specifying a delimiter (as binary) instead.
"""
if delimiter:
f.seek(offset)
seek_delimiter(f, delimiter, 2**16)
start = f.tell()
length -= start - offset

f.seek(start + length)
seek_delimiter(f, delimiter, 2**16)
end = f.tell()
eof = not f.read(1)

offset = start
length = end - start

f.seek(offset)
bytes = f.read(length)
return bytes


def seek_delimiter(file, delimiter, blocksize):
""" Seek current file to next byte after a delimiter bytestring
This seeks the file to the next byte following the delimiter. It does
not return anything. Use ``file.tell()`` to see location afterwards.
Parameters
----------
file: a file
delimiter: bytes
a delimiter like ``b'\n'`` or message sentinel
blocksize: int
Number of bytes to read from the file at once.
"""

if file.tell() == 0:
return

last = b''
while True:
current = file.read(blocksize)
if not current:
return
full = last + current
if delimiter:
# max record size is 4MB
max_record = 2**22
if length > max_record:
raise IndexError('Records larger than ' + str(max_record) + ' bytes are not supported. The length requested was: ' + str(length) + 'bytes')
# get the last index of the delimiter if it exists
try:
i = full.index(delimiter)
file.seek(file.tell() - (len(full) - i) + len(delimiter))
return
last_delim_index = len(bytes) -1 - bytes[::-1].index(delimiter)
# this ensures the length includes all of the last delimiter (in the event that it is more than one character)
length = last_delim_index + len(delimiter)
return bytes[0:length]
except ValueError:
pass
last = full[-len(delimiter):]

# TODO: Before delimiters can be supported through the ADLUploader logic, the number of chunks being uploaded
# needs to be visible to this method, since it needs to throw if:
# 1. We cannot find a delimiter in <= 4MB of data
# 2. If the remaining size is less than 4MB but there are multiple chunks that need to be stitched together,
# since the delimiter could be split across chunks.
# 3. If delimiters are specified, there must be logic during segment determination that ensures all chunks
# terminate at the end of a record (on a new line), even if that makes the chunk < 256MB.
if length >= max_record:
raise IndexError('No delimiter found within max record size of ' + str(max_record) + ' bytes. Transfer without specifying a delimiter (as binary) instead.')

return bytes

def tokenize(*args, **kwargs):
""" Deterministic token
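The reworked delimiter handling in read_block is easiest to check in isolation. The sketch below mirrors the doctests in the docstring above, with io.BytesIO standing in for a real file:

    import io
    from azure.datalake.store.utils import read_block

    f = io.BytesIO(b'Alice, 100\nBob, 200\nCharlie, 300')
    read_block(f, 0, 13)                   # b'Alice, 100\nBo' -- plain byte range
    read_block(f, 0, 13, delimiter=b'\n')  # b'Alice, 100\n'   -- trimmed to the last delimiter

    # With a delimiter set, asking for >= 4MB of data that contains no delimiter
    # raises IndexError, per the max record size documented above.
    big = io.BytesIO(bytearray(2**22))
    # read_block(big, 0, 2**22, delimiter=b'\n')  # IndexError: No delimiter found ...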
6 changes: 4 additions & 2 deletions setup.py
@@ -7,6 +7,8 @@

with open('README.rst', encoding='utf-8') as f:
readme = f.read()
with open('HISTORY.rst', encoding='utf-8') as f:
history = f.read()

# Version extraction inspired from 'requests'
with open('azure/datalake/store/__init__.py', 'r') as fd:
@@ -18,7 +20,7 @@

setup(name='azure-datalake-store',
version=version,
description='Convenient Filesystem interface to Azure Data-lake Store',
description='Convenient Filesystem interface to Azure Data Lake Store',
url='https://github.com/Azure/azure-data-lake-store-python',
author='Microsoft Corporation',
author_email='',
@@ -46,6 +48,6 @@
":python_version<'3.4'": ['pathlib2'],
":python_version<='2.7'": ['futures'],
},
long_description=readme,
long_description=readme + '\n\n' + history,
zip_safe=False
)
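For reference, the 'requests'-inspired version extraction that the hunk above reads from __init__.py is typically a single regex; a hedged reconstruction of that step:

    import re

    with open('azure/datalake/store/__init__.py', 'r') as fd:
        # Pull __version__ = "x.y.z" out of the package without importing it.
        version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
                            fd.read(), re.MULTILINE).group(1)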
3 changes: 3 additions & 0 deletions tests/README.md
@@ -3,7 +3,9 @@ To run the test suite against the published package:
py.test -x -vvv --doctest-modules --pyargs azure-datalake-store tests

To run the test suite against a local build:

python setup.py develop

py.test -x -vvv --doctest-modules --pyargs azure.datalake.store tests

This test suite uses [VCR.py](https://github.com/kevin1024/vcrpy) to record the
@@ -28,5 +30,6 @@ environment variables should be defined:
* `azure_password`
* `azure_data_lake_store_name`
* `azure_subscription_id`
* `azure_resource_group_name`

Optionally, you may need to define `azure_tenant_id` or `azure_url_suffix`.
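A minimal sketch of supplying those settings from Python and running the suite against a local build; the variable names come from the list above, while the placeholder values are assumptions:

    import os
    import subprocess

    os.environ.update({
        'azure_username': 'me@contoso.com',
        'azure_password': '...',
        'azure_data_lake_store_name': 'mystore',
        'azure_subscription_id': '00000000-0000-0000-0000-000000000000',
        'azure_resource_group_name': 'my-resource-group',
    })

    # Run the local-build invocation shown above; the child process inherits the environment.
    subprocess.check_call(['py.test', '-x', '-vvv', '--doctest-modules',
                           '--pyargs', 'azure.datalake.store', 'tests'])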