Release 0.0.40 (#270)
* Updated history.rst and version

* Possible fix for issue with infinite loop on read

* Refactoring

* Update pkg to PEP 420 standards
akharit authored Jan 9, 2019
1 parent 81311c5 commit d929cf3
Showing 10 changed files with 58 additions and 113 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,11 @@
Release History
===============

+0.0.40 (2019-01-08)
++++++++++++++++++++
+* Fix zero length read
+* Remove dependence on custom wheel and conform to PEP 420
+
0.0.39 (2018-11-14)
+++++++++++++++++++
* Fix for Chunked Decoding exception thrown while reading response.content
4 changes: 2 additions & 2 deletions MANIFEST.in
@@ -1,4 +1,4 @@
-recursive-include azure/datalake/store *.py
+recursive-include azure/datalake/store/*.py
recursive-include docs *.rst

include setup.py
@@ -7,6 +7,6 @@ include LICENSE.txt
include MANIFEST.in
include HISTORY.rst
include requirements.txt
-include azure_bdist_wheel.py
+include azure/__init__.py

prune docs/_build
2 changes: 1 addition & 1 deletion azure/__init__.py
@@ -1 +1 @@
-__import__('pkg_resources').declare_namespace(__name__)
+__path__ = __import__('pkgutil').extend_path(__path__, __name__)
2 changes: 1 addition & 1 deletion azure/datalake/__init__.py
@@ -1 +1 @@
-__import__('pkg_resources').declare_namespace(__name__)
+__path__ = __import__('pkgutil').extend_path(__path__, __name__)
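Both azure/__init__.py and azure/datalake/__init__.py move from setuptools' pkg_resources.declare_namespace to the stdlib pkgutil style, the compatibility fallback recommended alongside PEP 420. A minimal sketch of what the new line does (the sibling package named here is hypothetical):

```python
# pkgutil-style namespace __init__.py: every distribution that shares the
# "azure" namespace ships this identical line.
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

# extend_path scans every sys.path entry for a directory named "azure" and
# appends each one to this package's __path__, so azure.datalake.store
# (this package) and e.g. azure.storage.blob (a different distribution,
# hypothetical here) remain importable side by side even when installed
# into separate locations.
```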
3 changes: 1 addition & 2 deletions azure/datalake/store/__init__.py
@@ -6,8 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.39"

__version__ = "0.0.40"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
2 changes: 2 additions & 0 deletions azure/datalake/store/core.py
@@ -864,6 +864,8 @@ def read(self, length=-1):
self._read_blocksize()
data_read = self.cache[self.loc - self.start:
min(self.loc - self.start + length, self.end - self.start)]
+if not data_read: # Check to catch possible server errors. Ideally shouldn't happen.
+break
out += data_read
self.loc += len(data_read)
length -= len(data_read)
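The two lines added in core.py guard the read loop against a response that yields no data: without them, length never shrinks and read() spins forever, which matches the "infinite loop on read" noted in the commit message. A self-contained sketch of the failure mode, where fetch_block is a hypothetical stand-in for the cache refill inside AzureDLFile.read:

```python
def read(fetch_block, length):
    """Simplified shape of the patched loop in core.py."""
    out = b''
    while length > 0:
        data_read = fetch_block(length)  # may return b'' on a server error
        if not data_read:                # the added guard: without it, length
            break                        # never decreases and the loop never exits
        out += data_read
        length -= len(data_read)
    return out

# A source that keeps returning nothing now terminates instead of hanging:
assert read(lambda n: b'', 10) == b''
```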
82 changes: 40 additions & 42 deletions azure/datalake/store/multiprocessor.py
@@ -6,33 +6,32 @@
import os
import logging.handlers
from .exceptions import FileNotFoundError


try:
from queue import Empty # Python 3
except ImportError:
from Queue import Empty # Python 2
-end_queue_sentinel = [None, None]

-exception = None
-exception_lock = threading.Lock()
+WORKER_THREAD_PER_PROCESS = 50
+QUEUE_BUCKET_SIZE = 10
+END_QUEUE_SENTINEL = [None, None]
+GLOBAL_EXCEPTION = None
+GLOBAL_EXCEPTION_LOCK = threading.Lock()


def monitor_exception(exception_queue, process_ids):
-global exception
+global GLOBAL_EXCEPTION
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

while True:
try:
-excep = exception_queue.get(timeout=0.1)
-if excep == end_queue_sentinel:
+local_exception = exception_queue.get(timeout=0.1)
+if local_exception == END_QUEUE_SENTINEL:
break
logger.log(logging.DEBUG, "Setting global exception")
-exception_lock.acquire()
-exception = excep
-exception_lock.release()
+GLOBAL_EXCEPTION_LOCK.acquire()
+GLOBAL_EXCEPTION = local_exception
+GLOBAL_EXCEPTION_LOCK.release()
logger.log(logging.DEBUG, "Closing processes")
for p in process_ids:
p.terminate()
@@ -41,7 +40,7 @@ def monitor_exception(exception_queue, process_ids):
p.join()
import thread
logger.log(logging.DEBUG, "Interrupting main")
-raise Exception(excep)
+raise Exception(local_exception)
except Empty:
pass

@@ -51,11 +50,11 @@ def log_listener_process(queue):
try:
record = queue.get(timeout=0.1)
queue.task_done()
-if record == end_queue_sentinel: # We send this as a sentinel to tell the listener to quit.
+if record == END_QUEUE_SENTINEL: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handlers.clear()
-logger.handle(record) # No level or filter logic applied - just do it!
+logger.handle(record)  # No level or filter logic applied - just do it!
except Empty: # Try again
pass
except Exception as e:
@@ -65,14 +64,12 @@ def log_listener_process(queue):


def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
-log_queue = multiprocessing.JoinableQueue()
-exception_queue = multiprocessing.Queue()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
-queue_bucket_size = 10
-worker_thread_num_per_process = 50

def launch_processes(number_of_processes):
+if number_of_processes is None:
+number_of_processes = max(2, multiprocessing.cpu_count() - 1)
process_list = []
for i in range(number_of_processes):
process_list.append(multiprocessing.Process(target=processor,
@@ -84,46 +81,50 @@ def launch_processes(number_of_processes):
def walk(walk_path):
try:
paths = []
-all_files = adl._ls(path=walk_path)
+all_files = adl.ls(path=walk_path, detail=True)

for files in all_files:
if files['type'] == 'DIRECTORY':
dir_processed_counter.increment() # A new directory to process
walk_thread_pool.submit(walk, files['name'])
paths.append(files['name'])
-if len(paths) == queue_bucket_size:
+if len(paths) == QUEUE_BUCKET_SIZE:
file_path_queue.put(list(paths))
paths = []
if paths != []:
file_path_queue.put(list(paths)) # For leftover paths < bucket_size
except FileNotFoundError:
pass # Continue in case the file was deleted in between
-except:
+except Exception:
import traceback
logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
exception_queue.put(traceback.format_exc())
finally:
dir_processed_counter.decrement() # Processing complete for this directory

+# Initialize concurrency primitives
+log_queue = multiprocessing.JoinableQueue()
+exception_queue = multiprocessing.Queue()
finish_queue_processing_flag = multiprocessing.Event()
file_path_queue = multiprocessing.JoinableQueue()
-if number_of_sub_process == None:
-number_of_sub_process = max(2, multiprocessing.cpu_count()-1)
+dir_processed_counter = CountUpDownLatch()

+# Start relevant threads and processes
+log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
+log_listener.start()
child_processes = launch_processes(number_of_sub_process)
exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
exception_monitor_thread.start()
-log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
-log_listener.start()
+walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)

-dir_processed_counter = CountUpDownLatch()
-walk_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)

-file_path_queue.put([path]) # Root directory needs to be passed
+# Root directory needs to be explicitly passed
+file_path_queue.put([path])
dir_processed_counter.increment()
-walk(path) # Start processing root directory

-if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
+# Processing starts here
+walk(path)

+if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
walk_thread_pool.shutdown()
file_path_queue.close() # No new elements to add
file_path_queue.join() # Wait for operations to be done
@@ -135,11 +136,11 @@ def walk(walk_path):

# Cleanup
logger.log(logging.DEBUG, "Sending exception sentinel")
-exception_queue.put(end_queue_sentinel)
+exception_queue.put(END_QUEUE_SENTINEL)
exception_monitor_thread.join()
logger.log(logging.DEBUG, "Exception monitor thread finished")
logger.log(logging.DEBUG, "Sending logger sentinel")
-log_queue.put(end_queue_sentinel)
+log_queue.put(END_QUEUE_SENTINEL)
log_queue.join()
log_queue.close()
logger.log(logging.DEBUG, "Log queue closed")
@@ -159,21 +160,19 @@ def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, a
logger.setLevel(logging.DEBUG)

try:
-worker_thread_num_per_process = 50
func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
-function_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
+function_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
adl_function = func_table[method_name]
logger.log(logging.DEBUG, "Started processor pid:"+str(os.getpid()))

def func_wrapper(func, path, spec):
try:
func(path=path, acl_spec=spec)
-except FileNotFoundError as e:
+except FileNotFoundError:
logger.exception("File "+str(path)+" not found")
-pass # Exception is being logged in the relevant acl method. Do nothing here
-except:
-# TODO Raise to parent process
-pass
+# Complete Exception is being logged in the relevant acl method. Don't print exception here
+except Exception as e:
+logger.exception("File " + str(path) + " not set. Exception "+str(e))

logger.log(logging.DEBUG, "Completed running on path:" + str(path))

@@ -189,7 +188,6 @@ def func_wrapper(func, path, spec):

except Exception as e:
import traceback
-# TODO Raise to parent process
logger.exception("Exception in pid "+str(os.getpid())+"Exception: " + str(e))
exception_queue.put(traceback.format_exc())
finally:
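Several of the constants renamed above serve one pattern: worker threads poll a queue with a short timeout and exit when they see END_QUEUE_SENTINEL. A minimal, self-contained sketch of that shutdown protocol (standalone names, not this module's API):

```python
import threading
try:
    from queue import Queue, Empty  # Python 3
except ImportError:
    from Queue import Queue, Empty  # Python 2

END_QUEUE_SENTINEL = [None, None]  # same sentinel convention as above

def listener(q):
    while True:
        try:
            item = q.get(timeout=0.1)
            if item == END_QUEUE_SENTINEL:  # producer asked us to stop
                break
            print("handled:", item)  # stand-in for logger.handle(record)
        except Empty:  # nothing queued yet; poll again
            pass

q = Queue()
t = threading.Thread(target=listener, args=(q,))
t.start()
q.put("a log record")
q.put(END_QUEUE_SENTINEL)  # mirrors log_queue.put(END_QUEUE_SENTINEL) above
t.join()
```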
54 changes: 0 additions & 54 deletions azure_bdist_wheel.py

This file was deleted.

1 change: 0 additions & 1 deletion setup.cfg
@@ -1,3 +1,2 @@
[bdist_wheel]
universal=1
-azure-namespace-package=azure-nspkg
16 changes: 6 additions & 10 deletions setup.py
@@ -1,15 +1,8 @@
#!/usr/bin/env python

-import os
from setuptools import find_packages, setup
from io import open
import re
-try:
-from azure_bdist_wheel import cmdclass
-except ImportError:
-from distutils import log as logger
-logger.warn("Wheel is not available, disabling bdist_wheel hook")
-cmdclass = {}

with open('README.rst', encoding='utf-8') as f:
readme = f.read()
@@ -44,17 +37,20 @@
'Programming Language :: Python :: 3.6',
'License :: OSI Approved :: MIT License',
],
-packages=find_packages(exclude=['tests']),
+packages=find_packages(exclude=['tests',
+# Exclude packages that will be covered by PEP420 or nspkg
+'azure',
+]),
install_requires=[
'cffi',
'adal>=0.4.2',
-'requests>=2.20.0'
+'requests>=2.20.0',
],
extras_require={
":python_version<'3.4'": ['pathlib2'],
":python_version<='2.7'": ['futures'],
":python_version<'3.0'": ['azure-nspkg'],
},
long_description=readme + '\n\n' + history,
zip_safe=False,
-cmdclass=cmdclass
)
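The new ":python_version<'3.0'" key is setuptools' implicit-extra spelling of a PEP 508 environment marker: pip installs azure-nspkg only on Python 2, where PEP 420 native namespace packages do not exist. A sketch of the same conditional dependencies written as inline markers in install_requires (example-pkg is hypothetical; modern setuptools accepts both spellings):

```python
from setuptools import setup

setup(
    name='example-pkg',  # hypothetical
    version='0.1',
    install_requires=[
        'pathlib2; python_version < "3.4"',     # stdlib pathlib backport
        'futures; python_version < "3.0"',      # concurrent.futures backport
        'azure-nspkg; python_version < "3.0"',  # py2 lacks PEP 420 namespaces
    ],
)
```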
