diff --git a/.travis.yml b/.travis.yml index a3ce03413..1842e89ca 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,7 +39,10 @@ after_success: - if [ "$TRAVIS_EVENT_TYPE" == "push" -a "$TRAVIS_BRANCH" == "master" ]; then ./docker/build-trigger.sh Branch "$TRAVIS_BRANCH" "$BUILD_TRIGGER_URL"; fi + - if [ "$TRAVIS_EVENT_TYPE" == "push" -a "$TRAVIS_BRANCH" == "generic-storage" ]; then + ./docker/build-trigger.sh Branch "$TRAVIS_BRANCH" "$BUILD_TRIGGER_URL"; + fi - if [ "$TRAVIS_EVENT_TYPE" == "push" -a -z "$TRAVIS_TAG" ]; then ./bin/push-docs.sh "$GIT_REMOTE" branches "$TRAVIS_BRANCH" "Travis Core Docs Build - ${TRAVIS_BUILD_NUMBER}"; fi - + diff --git a/api/config.py b/api/config.py index c2bbaf696..da24f7ff9 100644 --- a/api/config.py +++ b/api/config.py @@ -7,6 +7,8 @@ import datetime import elasticsearch +from fs import open_fs + from . import util from .dao.dbutil import try_replace_one @@ -21,7 +23,7 @@ logging.getLogger('requests').setLevel(logging.WARNING) # silence Requests library logging.getLogger('paste.httpserver').setLevel(logging.WARNING) # silence Paste library logging.getLogger('elasticsearch').setLevel(logging.WARNING) # silence Elastic library - +logging.getLogger('urllib3').setLevel(logging.WARNING) # silence urllib3 library # NOTE: Keep in sync with environment variables in sample.config file. DEFAULT_CONFIG = { @@ -63,6 +65,8 @@ 'db_server_selection_timeout': '3000', 'data_path': os.path.join(os.path.dirname(__file__), '../persistent/data'), 'elasticsearch_host': 'localhost:9200', + 'fs_url': 'osfs://' + os.path.join(os.path.dirname(__file__), '../persistent/data'), + 'support_legacy_fs': True }, } @@ -321,3 +325,9 @@ def mongo_pipeline(table, pipeline): def get_auth(auth_type): return get_config()['auth'][auth_type] + + +# Storage configuration +fs = open_fs(__config['persistent']['fs_url']) +legacy_fs = open_fs('osfs://' + __config['persistent']['data_path']) +support_legacy_fs = __config['persistent']['support_legacy_fs'] diff --git a/api/download.py b/api/download.py index e75443cde..882b6bda0 100644 --- a/api/download.py +++ b/api/download.py @@ -1,17 +1,18 @@ +import os import bson import pytz -import os.path import tarfile import datetime import cStringIO +import fs.path +import fs.errors + from .web import base from .web.request import AccessType -from . import config -from . import util -from . import validators -import os +from . 
import config, files, util, validators from .dao.containerutil import pluralize + log = config.log BYTES_IN_MEGABYTE = float(1<<20) @@ -32,7 +33,7 @@ def _filter_check(property_filter, property_values): class Download(base.RequestHandler): - def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, data_path, filters): + def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, filters): for f in container.get('files', []): if filters: filtered = True @@ -46,20 +47,20 @@ def _append_targets(self, targets, cont_name, container, prefix, total_size, tot break if filtered: continue - filepath = os.path.join(data_path, util.path_from_hash(f['hash'])) - if os.path.exists(filepath): # silently skip missing files + + file_path, _ = files.get_valid_file(f) + if file_path: # silently skip missing files if cont_name == 'analyses': - targets.append((filepath, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')),f['size'])) + targets.append((file_path, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')), f['size'])) else: - targets.append((filepath, prefix + '/' + f['name'], cont_name, str(container.get('_id')),f['size'])) + targets.append((file_path, prefix + '/' + f['name'], cont_name, str(container.get('_id')), f['size'])) total_size += f['size'] total_cnt += 1 else: - log.warn("Expected {} to exist but it is missing. File will be skipped in download.".format(filepath)) + log.warn("Expected {} to exist but it is missing. File will be skipped in download.".format(file_path)) return total_size, total_cnt def _bulk_preflight_archivestream(self, file_refs): - data_path = config.get_item('persistent', 'data_path') arc_prefix = self.get_param('prefix', 'scitran') file_cnt = 0 total_size = 0 @@ -95,9 +96,9 @@ def _bulk_preflight_archivestream(self, file_refs): log.warn("Expected file {} on Container {} {} to exist but it is missing. 
File will be skipped in download.".format(filename, cont_name, cont_id)) continue - filepath = os.path.join(data_path, util.path_from_hash(file_obj['hash'])) - if os.path.exists(filepath): # silently skip missing files - targets.append((filepath, cont_name+'/'+cont_id+'/'+file_obj['name'], cont_name, cont_id, file_obj['size'])) + file_path, _ = files.get_valid_file(file_obj) + if file_path: # silently skip missing files + targets.append((file_path, cont_name+'/'+cont_id+'/'+file_obj['name'], cont_name, cont_id, file_obj['size'])) total_size += file_obj['size'] file_cnt += 1 @@ -111,7 +112,6 @@ def _bulk_preflight_archivestream(self, file_refs): def _preflight_archivestream(self, req_spec, collection=None): - data_path = config.get_item('persistent', 'data_path') arc_prefix = self.get_param('prefix', 'scitran') file_cnt = 0 total_size = 0 @@ -136,7 +136,7 @@ def _preflight_archivestream(self, req_spec, collection=None): continue prefix = '/'.join([arc_prefix, project['group'], project['label']]) - total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, req_spec.get('filters')) sessions = config.db.sessions.find({'project': item_id, 'deleted': {'$exists': False}}, ['label', 'files', 'uid', 'timestamp', 'timezone', 'subject']) session_dict = {session['_id']: session for session in sessions} @@ -157,19 +157,19 @@ def _preflight_archivestream(self, req_spec, collection=None): for code, subject in subject_dict.iteritems(): subject_prefix = self._path_from_container(prefix, subject, ids_of_paths, code) subject_prefixes[code] = subject_prefix - total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, req_spec.get('filters')) for session in session_dict.itervalues(): subject_code = session['subject'].get('code', 'unknown_subject') subject = subject_dict[subject_code] session_prefix = self._path_from_container(subject_prefixes[subject_code], session, ids_of_paths, session["_id"]) session_prefixes[session['_id']] = session_prefix - total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, req_spec.get('filters')) for acq in acquisitions: session = session_dict[acq['session']] acq_prefix = self._path_from_container(session_prefixes[session['_id']], acq, ids_of_paths, acq['_id']) - total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters')) elif item['level'] == 'session': @@ -184,7 +184,7 @@ def _preflight_archivestream(self, req_spec, collection=None): if not subject.get('code'): subject['code'] = 'unknown_subject' prefix = self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject["code"]), session, ids_of_paths, session['_id']) - total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, 
file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, req_spec.get('filters')) # If the param `collection` holding a collection id is not None, filter out acquisitions that are not in the collection a_query = {'session': item_id, 'deleted': {'$exists': False}} @@ -194,7 +194,7 @@ def _preflight_archivestream(self, req_spec, collection=None): for acq in acquisitions: acq_prefix = self._path_from_container(prefix, acq, ids_of_paths, acq['_id']) - total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters')) elif item['level'] == 'acquisition': acq = config.db.acquisitions.find_one(base_query, ['session', 'label', 'files', 'uid', 'timestamp', 'timezone']) @@ -210,7 +210,7 @@ def _preflight_archivestream(self, req_spec, collection=None): project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label']) prefix = self._path_from_container(self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject['code']), session, ids_of_paths, session["_id"]), acq, ids_of_paths, acq['_id']) - total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, req_spec.get('filters')) elif item['level'] == 'analysis': analysis = config.db.analyses.find_one(base_query, ['parent', 'label', 'files', 'uid', 'timestamp']) @@ -220,7 +220,7 @@ def _preflight_archivestream(self, req_spec, collection=None): continue prefix = self._path_from_container("", analysis, ids_of_paths, util.sanitize_string_to_filename(analysis['label'])) filename = 'analysis_' + util.sanitize_string_to_filename(analysis['label']) + '.tar' - total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, data_path, req_spec.get('filters')) + total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, req_spec.get('filters')) if len(targets) > 0: if not filename: @@ -278,8 +278,9 @@ def archivestream(self, ticket): stream = cStringIO.StringIO() with tarfile.open(mode='w|', fileobj=stream) as archive: for filepath, arcpath, cont_name, cont_id, _ in ticket['target']: - yield archive.gettarinfo(filepath, arcpath).tobuf() - with open(filepath, 'rb') as fd: + file_system = files.get_fs_by_file_path(filepath) + with file_system.open(filepath, 'rb') as fd: + yield archive.gettarinfo(fileobj=fd, arcname=arcpath).tobuf() chunk = '' for chunk in iter(lambda: fd.read(CHUNKSIZE), ''): # pylint: disable=cell-var-from-loop yield chunk @@ -289,11 +290,11 @@ def archivestream(self, ticket): yield stream.getvalue() # get tar stream trailer stream.close() - def symlinkarchivestream(self, ticket, data_path): + def symlinkarchivestream(self, ticket): for filepath, arcpath, cont_name, cont_id, _ in ticket['target']: t = tarfile.TarInfo(name=arcpath) t.type = tarfile.SYMTYPE - t.linkname = os.path.relpath(filepath, data_path) + t.linkname = fs.path.relpath(filepath) yield t.tobuf() self.log_user_access(AccessType.download_file, cont_name=cont_name, cont_id=cont_id, filename=os.path.basename(arcpath), 
multifile=True, origin_override=ticket['origin']) # log download stream = cStringIO.StringIO() @@ -312,7 +313,7 @@ def download(self): if ticket['ip'] != self.request.client_addr: self.abort(400, 'ticket not for this source IP') if self.get_param('symlinks'): - self.response.app_iter = self.symlinkarchivestream(ticket, config.get_item('persistent', 'data_path')) + self.response.app_iter = self.symlinkarchivestream(ticket) else: self.response.app_iter = self.archivestream(ticket) self.response.headers['Content-Type'] = 'application/octet-stream' diff --git a/api/files.py b/api/files.py index 5060637a4..5b2bfaeee 100644 --- a/api/files.py +++ b/api/files.py @@ -1,115 +1,111 @@ import os import cgi import json -import shutil +import six import hashlib -import collections - -from backports import tempfile - -from . import util -from . import config - -DEFAULT_HASH_ALG='sha384' - -def move_file(path, target_path): - target_dir = os.path.dirname(target_path) - if not os.path.exists(target_dir): - os.makedirs(target_dir) - shutil.move(path, target_path) - -def move_form_file_field_into_cas(file_field): - """ - Given a file form field, move the (downloaded, tempdir-stored) file into the CAS. - - Requires an augmented file field; see upload.process_upload() for details. - """ - - if not file_field.hash or not file_field.path: - raise Exception("Field is not a file field with hash and path") - - base = config.get_item('persistent', 'data_path') - cas = util.path_from_hash(file_field.hash) - move_file(file_field.path, os.path.join(base, cas)) - -def hash_file_formatted(path, hash_alg=None, buffer_size=65536): - """ - Return the scitran-formatted hash of a file, specified by path. - """ - - hash_alg = hash_alg or DEFAULT_HASH_ALG - hasher = hashlib.new(hash_alg) - - with open(path, 'rb') as f: - while True: - data = f.read(buffer_size) - if not data: - break - hasher.update(data) - - return util.format_hash(hash_alg, hasher.hexdigest()) - -class HashingFile(file): - def __init__(self, file_path, hash_alg): - super(HashingFile, self).__init__(file_path, "wb") - self.hash_alg = hashlib.new(hash_alg) - self.hash_name = hash_alg - - def write(self, data): - self.hash_alg.update(data) - return file.write(self, data) +import uuid + +import fs.move +import fs.subfs +import fs.path +import fs.errors + +from . import config, util + +DEFAULT_HASH_ALG = 'sha384' + +class FileProcessor(object): + def __init__(self, base, persistent_fs): + self.base = base + self._tempdir_name = str(uuid.uuid4()) + self._persistent_fs = persistent_fs + self._persistent_fs.makedirs(fs.path.join('tmp', self._tempdir_name), recreate=True) + self._temp_fs = fs.subfs.SubFS(persistent_fs, fs.path.join('tmp', self._tempdir_name)) + + def store_temp_file(self, src_path, dest_path): + if not isinstance(src_path, unicode): + src_path = six.u(src_path) + if not isinstance(dest_path, unicode): + dest_path = six.u(dest_path) + dst_dir = fs.path.dirname(dest_path) + self._persistent_fs.makedirs(dst_dir, recreate=True) + self._persistent_fs.move(src_path=fs.path.join('tmp', self._tempdir_name, src_path), dst_path=dest_path) + + def process_form(self, request): + """ + Some workarounds to make webapp2 process forms in an intelligent way. + Normally webapp2/WebOb Request.POST would copy the entire request stream + into a single file on disk.
+ https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L787 + https://github.com/moraes/webapp-improved/pull/12 + We pass request.body_file (wrapped wsgi input stream) + to our custom subclass of cgi.FieldStorage to write each upload file + to a separate file on disk, as it comes in off the network stream from the client. + Then we can rename these files to their final destination, + without copying the data again. + + Returns: + form: SingleFileFieldStorage instance + + Temp files are written to this FileProcessor's temp_fs; they are + removed when the processor is closed or garbage collected. + """ + + # Copied from WebOb source: + # https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L790 + env = request.environ.copy() + env.setdefault('CONTENT_LENGTH', '0') + env['QUERY_STRING'] = '' + + field_storage_class = get_single_file_field_storage(self._temp_fs) + + form = field_storage_class( + fp=request.body_file, environ=env, keep_blank_values=True + ) - def get_hash(self): - return self.hash_alg.hexdigest() + return form - def get_formatted_hash(self): - return util.format_hash(self.hash_name, self.get_hash()) + def hash_file_formatted(self, filepath, f_system, hash_alg=None, buffer_size=65536): + """ + Return the scitran-formatted hash of a file, specified by path. + """ -ParsedFile = collections.namedtuple('ParsedFile', ['info', 'path']) + if not isinstance(filepath, unicode): + filepath = six.u(filepath) -def process_form(request, hash_alg=None): - """ - Some workarounds to make webapp2 process forms in an intelligent way, - and hash files we process. - Normally webapp2/WebOb Reqest.POST would copy the entire request stream - into a single file on disk. - https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L787 - https://github.com/moraes/webapp-improved/pull/12 - We pass request.body_file (wrapped wsgi input stream) - to our custom subclass of cgi.FieldStorage to write each upload file - to a separate file on disk, as it comes in off the network stream from the client. - Then we can rename these files to their final destination, - without copying the data gain. + hash_alg = hash_alg or DEFAULT_HASH_ALG + hasher = hashlib.new(hash_alg) - Returns (tuple): - form: HashingFieldStorage instance - tempdir: tempdir the file was stored in. + with f_system.open(filepath, 'rb') as f: + while True: + data = f.read(buffer_size) + if not data: + break + hasher.update(data) - Keep tempdir in scope until you don't need it anymore; it will be deleted on GC.
- """ + return util.format_hash(hash_alg, hasher.hexdigest()) - hash_alg = hash_alg or DEFAULT_HASH_ALG + @property + def temp_fs(self): + return self._temp_fs - # Store form file fields in a tempdir - tempdir = tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) + @property + def persistent_fs(self): + return self._presistent_fs - # Copied from WebOb source: - # https://github.com/Pylons/webob/blob/cb9c0b4f51542a7d0ed5cc5bf0a73f528afbe03e/webob/request.py#L790 - env = request.environ.copy() - env.setdefault('CONTENT_LENGTH', '0') - env['QUERY_STRING'] = '' + def __exit__(self, exc, value, tb): + self.close() - field_storage_class = getHashingFieldStorage( - tempdir.name, DEFAULT_HASH_ALG - ) + def __del__(self): + self.close() - form = field_storage_class( - fp=request.body_file, environ=env, keep_blank_values=True - ) + def close(self): + # Cleaning up + self._presistent_fs.listdir(fs.path.join('tmp', self._tempdir_name)) + self._presistent_fs.removetree(fs.path.join('tmp', self._tempdir_name)) - return (form, tempdir) -def getHashingFieldStorage(upload_dir, hash_alg): +def get_single_file_field_storage(file_system): # pylint: disable=attribute-defined-outside-init # We dynamically create this class because we @@ -120,15 +116,17 @@ def getHashingFieldStorage(upload_dir, hash_alg): # https://github.com/python/cpython/blob/1e3e162ff5c0cc656559c43914439ab3e5734f00/Lib/cgi.py#L696 # https://github.com/python/cpython/blob/1e3e162ff5c0cc656559c43914439ab3e5734f00/Lib/cgi.py#L728 - class HashingFieldStorage(cgi.FieldStorage): - bufsize = 2**20 + class SingleFileFieldStorage(cgi.FieldStorage): + bufsize = 2 ** 20 def make_file(self, binary=None): # Sanitize form's filename (read: prevent malicious escapes, bad characters, etc) - self.filename = os.path.basename(self.filename) - # self.filename = util.sanitize_string_to_filename(self.filename) - self.open_file = HashingFile(os.path.join(upload_dir, self.filename), hash_alg) + self.filename = fs.path.basename(self.filename) + self.hasher = hashlib.new(DEFAULT_HASH_ALG) + if not isinstance(self.filename, unicode): + self.filename = six.u(self.filename) + self.open_file = file_system.open(self.filename, 'wb') return self.open_file # override private method __write of superclass FieldStorage @@ -139,15 +137,21 @@ def _FieldStorage__write(self, line): # Always write fields of type "file" to disk for consistent renaming behavior if self.filename: self.file = self.make_file('') - self.file.write(self._FieldStorage__file.getvalue()) + self.hasher.update(self._FieldStorage__file.getvalue()) self._FieldStorage__file = None + self.file.write(line) - def get_hash(self): - return self.open_file.get_hash() + # NOTE: In case of metadata, we don't have a file name and we also don't have a hasher, + # so skipping the hasher.update + if self.filename: + self.hasher.update(line) - return HashingFieldStorage + return SingleFileFieldStorage + +class FileStoreException(Exception): + pass # File extension --> scitran file type detection hueristics. # Listed in precendence order. 
@@ -166,3 +170,45 @@ def guess_type_from_filename(filename): else: filetype = None return filetype + + +def get_valid_file(file_info): + file_id = file_info.get('_id', '') + file_hash = file_info.get('hash', '') + file_uuid_path = None + file_hash_path = None + + if file_hash: + file_hash_path = util.path_from_hash(file_hash) + + if file_id: + file_uuid_path = util.path_from_uuid(file_id) + + if config.support_legacy_fs: + if file_hash_path and config.legacy_fs.isfile(file_hash_path): + return file_hash_path, config.legacy_fs + elif file_uuid_path and config.legacy_fs.isfile(file_uuid_path): + return file_uuid_path, config.legacy_fs + + if file_uuid_path and config.fs.isfile(file_uuid_path): + return file_uuid_path, config.fs + else: + raise fs.errors.ResourceNotFound('File not found: %s' % file_info['name']) + + +def get_signed_url(file_path, file_system, filename=None): + try: + if hasattr(file_system, 'get_signed_url'): + return file_system.get_signed_url(file_path, filename=filename) + except fs.errors.NoURL: + return None + + +def get_fs_by_file_path(file_path): + if config.support_legacy_fs and config.legacy_fs.isfile(file_path): + return config.legacy_fs + + if config.fs.isfile(file_path): + return config.fs + else: + raise fs.errors.ResourceNotFound('File not found: %s' % file_path) diff --git a/api/handlers/containerhandler.py b/api/handlers/containerhandler.py index 2bc5c5609..2a94fc20e 100644 --- a/api/handlers/containerhandler.py +++ b/api/handlers/containerhandler.py @@ -108,10 +108,6 @@ def get(self, cont_name, **kwargs): self._filter_permissions(result, self.uid) if self.is_true('join_avatars'): self.join_user_info([result]) - # build and insert file paths if they are requested - if self.is_true('paths'): - for fileinfo in result['files']: - fileinfo['path'] = util.path_from_hash(fileinfo['hash']) inflate_job_info = cont_name == 'sessions' result['analyses'] = AnalysisStorage().get_analyses(cont_name, _id, inflate_job_info) diff --git a/api/handlers/listhandler.py b/api/handlers/listhandler.py index 5f731a7e9..54d36da41 100644 --- a/api/handlers/listhandler.py +++ b/api/handlers/listhandler.py @@ -8,10 +8,7 @@ import zipfile from ..web import base -from .. import config -from .. import upload -from .. import util -from .. import validators +from ..
import config, files, upload, util, validators from ..auth import listauth, always_ok from ..dao import noop from ..dao import liststorage @@ -360,24 +357,25 @@ def _check_ticket(self, ticket_id, _id, filename): return ticket @staticmethod - def build_zip_info(filepath): + def build_zip_info(file_path, file_system): """ Builds a json response containing member and comment info for a zipfile """ - with zipfile.ZipFile(filepath) as zf: - info = {} - info['comment'] = zf.comment - info['members'] = [] - for zi in zf.infolist(): - m = {} - m['path'] = zi.filename - m['size'] = zi.file_size - m['timestamp'] = datetime.datetime(*zi.date_time) - m['comment'] = zi.comment - - info['members'].append(m) - - return info + with file_system.open(file_path, 'rb') as f: + with zipfile.ZipFile(f) as zf: + info = { + 'comment': zf.comment, + 'members': [] + } + for zi in zf.infolist(): + info['members'].append({ + 'path': zi.filename, + 'size': zi.file_size, + 'timestamp': datetime.datetime(*zi.date_time), + 'comment': zi.comment + }) + + return info def get(self, cont_name, list_name, **kwargs): _id = kwargs.pop('cid') @@ -406,7 +404,8 @@ def get(self, cont_name, list_name, **kwargs): hash_ = self.get_param('hash') if hash_ and hash_ != fileinfo['hash']: self.abort(409, 'file exists, hash mismatch') - filepath = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash(fileinfo['hash'])) + + file_path, file_system = files.get_valid_file(fileinfo) # Request for download ticket if self.get_param('ticket') == '': @@ -416,18 +415,19 @@ def get(self, cont_name, list_name, **kwargs): # Request for info about zipfile elif self.is_true('info'): try: - info = self.build_zip_info(filepath) + info = self.build_zip_info(file_path, file_system) + return info except zipfile.BadZipfile: self.abort(400, 'not a zip file') - return info # Request to download zipfile member elif self.get_param('member') is not None: zip_member = self.get_param('member') try: - with zipfile.ZipFile(filepath) as zf: - self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) - self.response.write(zf.open(zip_member).read()) + with file_system.open(file_path, 'rb') as f: + with zipfile.ZipFile(f) as zf: + self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) + self.response.write(zf.open(zip_member).read()) except zipfile.BadZipfile: self.abort(400, 'not a zip file') except KeyError: @@ -442,67 +442,71 @@ def get(self, cont_name, list_name, **kwargs): # Authenticated or ticketed download request else: - range_header = self.request.headers.get('Range', '') - try: - if not self.is_true('view'): - raise util.RangeHeaderParseError('Feature flag not set') + signed_url = files.get_signed_url(file_path, file_system, filename=filename) + if signed_url: + self.redirect(signed_url) + else: + range_header = self.request.headers.get('Range', '') + try: + if not self.is_true('view'): + raise util.RangeHeaderParseError('Feature flag not set') - ranges = util.parse_range_header(range_header) - for first, last in ranges: - if first > fileinfo['size'] - 1: - self.abort(416, 'Invalid range') + ranges = util.parse_range_header(range_header) + for first, last in ranges: + if first > fileinfo['size'] - 1: + self.abort(416, 'Invalid range') - if last > fileinfo['size'] - 1: - raise util.RangeHeaderParseError('Invalid range') + if last > fileinfo['size'] - 1: + raise util.RangeHeaderParseError('Invalid range') - except util.RangeHeaderParseError: - self.response.app_iter = open(filepath, 'rb') - 
self.response.headers['Content-Length'] = str(fileinfo['size']) # must be set after setting app_iter + except util.RangeHeaderParseError: + self.response.app_iter = file_system.open(file_path, 'rb') + self.response.headers['Content-Length'] = str(fileinfo['size']) # must be set after setting app_iter - if self.is_true('view'): - self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) - else: - self.response.headers['Content-Type'] = 'application/octet-stream' - self.response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' - else: - self.response.status = 206 - if len(ranges) > 1: - self.response.headers['Content-Type'] = 'multipart/byteranges; boundary=%s' % self.request.id + if self.is_true('view'): + self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) + else: + self.response.headers['Content-Type'] = 'application/octet-stream' + self.response.headers['Content-Disposition'] = 'attachment; filename="' + filename + '"' else: - self.response.headers['Content-Type'] = str( - fileinfo.get('mimetype', 'application/octet-stream')) - self.response.headers['Content-Range'] = 'bytes %s-%s/%s' % (str(ranges[0][0]), - str(ranges[0][1]), - str(fileinfo['size'])) - with open(filepath, 'rb') as f: - for first, last in ranges: - mode = os.SEEK_SET - if first < 0: - mode = os.SEEK_END - length = abs(first) - elif last is None: - length = fileinfo['size'] - first - else: - if last > fileinfo['size']: + self.response.status = 206 + if len(ranges) > 1: + self.response.headers['Content-Type'] = 'multipart/byteranges; boundary=%s' % self.request.id + else: + self.response.headers['Content-Type'] = str( + fileinfo.get('mimetype', 'application/octet-stream')) + self.response.headers['Content-Range'] = 'bytes %s-%s/%s' % (str(ranges[0][0]), + str(ranges[0][1]), + str(fileinfo['size'])) + with file_system.open(file_path, 'rb') as f: + for first, last in ranges: + mode = os.SEEK_SET + if first < 0: + mode = os.SEEK_END + length = abs(first) + elif last is None: length = fileinfo['size'] - first else: - length = last - first + 1 - - f.seek(first, mode) - data = f.read(length) - - if len(ranges) > 1: - self.response.write('--%s\n' % self.request.id) - self.response.write('Content-Type: %s\n' % str( - fileinfo.get('mimetype', 'application/octet-stream'))) - self.response.write('Content-Range: %s' % 'bytes %s-%s/%s\n' % (str(first), - str(last), - str(fileinfo['size']))) - self.response.write('\n') - self.response.write(data) - self.response.write('\n') - else: - self.response.write(data) + if last > fileinfo['size']: + length = fileinfo['size'] - first + else: + length = last - first + 1 + + f.seek(first, mode) + data = f.read(length) + + if len(ranges) > 1: + self.response.write('--%s\n' % self.request.id) + self.response.write('Content-Type: %s\n' % str( + fileinfo.get('mimetype', 'application/octet-stream'))) + self.response.write('Content-Range: %s' % 'bytes %s-%s/%s\n' % (str(first), + str(last), + str(fileinfo['size']))) + self.response.write('\n') + self.response.write(data) + self.response.write('\n') + else: + self.response.write(data) # log download if we haven't already for this ticket if ticket: diff --git a/api/handlers/refererhandler.py b/api/handlers/refererhandler.py index a04440187..2ca729d4c 100644 --- a/api/handlers/refererhandler.py +++ b/api/handlers/refererhandler.py @@ -7,15 +7,11 @@ """ import bson -import os import zipfile import datetime from abc import ABCMeta, abstractproperty 
-from .. import config -from .. import upload -from .. import util -from .. import validators +from .. import config, files, upload, util, validators from ..auth import containerauth, always_ok from ..dao import containerstorage, noop from ..dao.basecontainerstorage import ContainerStorage @@ -362,27 +358,25 @@ def download(self, **kwargs): self.abort(404, "{} doesn't exist".format(filename)) else: fileinfo = fileinfo[0] - filepath = os.path.join( - config.get_item('persistent', 'data_path'), - util.path_from_hash(fileinfo['hash']) - ) + file_path, file_system = files.get_valid_file(fileinfo) filename = fileinfo['name'] # Request for info about zipfile if self.is_true('info'): try: - info = FileListHandler.build_zip_info(filepath) + info = FileListHandler.build_zip_info(file_path, file_system) + return info except zipfile.BadZipfile: self.abort(400, 'not a zip file') - return info # Request to download zipfile member elif self.get_param('member') is not None: zip_member = self.get_param('member') try: - with zipfile.ZipFile(filepath) as zf: - self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) - self.response.write(zf.open(zip_member).read()) + with file_system.open(file_path, 'rb') as f: + with zipfile.ZipFile(f) as zf: + self.response.headers['Content-Type'] = util.guess_mimetype(zip_member) + self.response.write(zf.open(zip_member).read()) except zipfile.BadZipfile: self.abort(400, 'not a zip file') except KeyError: @@ -397,7 +391,7 @@ def download(self, **kwargs): # Request to download the file itself else: - self.response.app_iter = open(filepath, 'rb') + self.response.app_iter = file_system.open(file_path, 'rb') self.response.headers['Content-Length'] = str(fileinfo['size']) # must be set after setting app_iter if self.is_true('view'): self.response.headers['Content-Type'] = str(fileinfo.get('mimetype', 'application/octet-stream')) @@ -439,15 +433,13 @@ def _prepare_batch(self, fileinfo, analysis): ## we need a way to avoid this targets = [] total_size = total_cnt = 0 - data_path = config.get_item('persistent', 'data_path') for f in fileinfo: - filepath = os.path.join(data_path, util.path_from_hash(f['hash'])) - if os.path.exists(filepath): # silently skip missing files - targets.append((filepath, - util.sanitize_string_to_filename(analysis['label']) + '/' + ('input' if f.get('input') else 'output') + '/'+ f['name'], - 'analyses', analysis['_id'], f['size'])) - total_size += f['size'] - total_cnt += 1 + file_path, _ = files.get_valid_file(f) + targets.append((file_path, + util.sanitize_string_to_filename(analysis['label']) + '/' + ('input' if f.get('input') else 'output') + '/'+ f['name'], + 'analyses', analysis['_id'], f['size'])) + total_size += f['size'] + total_cnt += 1 return targets, total_size, total_cnt diff --git a/api/jobs/handlers.py b/api/jobs/handlers.py index 3c058c970..60c1f535f 100644 --- a/api/jobs/handlers.py +++ b/api/jobs/handlers.py @@ -3,7 +3,6 @@ """ import bson import copy -import os import StringIO from jsonschema import ValidationError from urlparse import urlparse @@ -11,7 +10,7 @@ from . import batch from .. import config from .. import upload -from .. import util +from .. 
import files from ..auth import require_drone, require_login, require_admin, has_access from ..auth.apikeys import JobApiKey from ..dao import hierarchy @@ -89,10 +88,12 @@ def download(self, **kwargs): # pragma: no cover """Download gear tarball file""" dl_id = kwargs.pop('cid') gear = get_gear(dl_id) - hash_ = gear['exchange']['rootfs-hash'] - filepath = os.path.join(config.get_item('persistent', 'data_path'), util.path_from_hash('v0-' + hash_.replace(':', '-'))) + file_path, file_system = files.get_valid_file({ + '_id': gear['exchange'].get('rootfs-id', ''), + 'hash': 'v0-' + gear['exchange']['rootfs-hash'].replace(':', '-') + }) - stream = open(filepath, 'rb') + stream = file_system.open(file_path, 'rb') set_for_download(self.response, stream=stream, filename='gear.tar') @require_admin diff --git a/api/placer.py b/api/placer.py index 77aa4eacf..2e1215092 100644 --- a/api/placer.py +++ b/api/placer.py @@ -2,17 +2,14 @@ import copy import datetime import dateutil -import os import pymongo -import shutil +import uuid import zipfile -from backports import tempfile +import fs.path +import fs.errors -from . import config -from . import files -from . import util -from . import validators +from . import config, util, validators from .dao import containerutil, hierarchy from .dao.containerstorage import SessionStorage, AcquisitionStorage from .jobs import rules @@ -27,7 +24,7 @@ class Placer(object): Interface for a placer, which knows how to process files and place them where they belong - on disk and database. """ - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): self.container_type = container_type self.container = container self.id_ = id_ @@ -46,6 +43,8 @@ def __init__(self, container_type, container, id_, metadata, timestamp, origin, # A list of files that have been saved via save_file() usually returned by finalize() self.saved = [] + self.file_processor = file_processor + def check(self): """ @@ -86,10 +85,9 @@ def save_file(self, field=None, file_attrs=None): Requires an augmented file field; see process_upload() for details. """ - # Save file - if field is not None: - files.move_form_file_field_into_cas(field) + if field is not None and self.file_processor is not None: + self.file_processor.store_temp_file(field.path, util.path_from_uuid(field.uuid)) # Update the DB if file_attrs is not None: @@ -139,8 +137,8 @@ class UIDPlacer(Placer): create_hierarchy = staticmethod(hierarchy.upsert_top_down_hierarchy) match_type = 'uid' - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): - super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): + super(UIDPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context, file_processor) self.metadata_for_file = {} self.session_id = None self.count = 0 @@ -190,7 +188,7 @@ def process_file_field(self, field, file_attrs): self.save_file(field, file_attrs) else: if field is not None: - files.move_form_file_field_into_cas(field) + self.file_processor.store_temp_file(field.path, util.path_from_uuid(field.uuid)) if file_attrs is not None: container.upsert_file(file_attrs) @@ -362,8 +360,8 @@ class TokenPlacer(Placer): Intended for use with a token that tracks where the files will be stored. 
""" - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): - super(TokenPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): + super(TokenPlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context, file_processor) self.paths = [] self.folder = None @@ -380,10 +378,9 @@ def check(self): # upload.clean_packfile_tokens # # It must be kept in sync between each instance. - base_path = config.get_item('persistent', 'data_path') - self.folder = os.path.join(base_path, 'tokens', 'packfile', token) + self.folder = fs.path.join('tokens', 'packfile', token) - util.mkdir_p(self.folder) + util.mkdir_p(self.folder, config.fs) def process_file_field(self, field, file_attrs): self.saved.append(file_attrs) @@ -391,8 +388,8 @@ def process_file_field(self, field, file_attrs): def finalize(self): for path in self.paths: - dest = os.path.join(self.folder, os.path.basename(path)) - shutil.move(path, dest) + dest = fs.path.join(self.folder, path) + self.file_processor.store_temp_file(path, dest) self.recalc_session_compliance() return self.saved @@ -401,8 +398,8 @@ class PackfilePlacer(Placer): A placer that can accept N files, save them into a zip archive, and place the result on an acquisition. """ - def __init__(self, container_type, container, id_, metadata, timestamp, origin, context): - super(PackfilePlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context) + def __init__(self, container_type, container, id_, metadata, timestamp, origin, context, file_processor): + super(PackfilePlacer, self).__init__(container_type, container, id_, metadata, timestamp, origin, context, file_processor) # This endpoint is an SSE endpoint self.sse = True @@ -438,10 +435,11 @@ def check(self): # upload.clean_packfile_tokens # # It must be kept in sync between each instance. - base_path = config.get_item('persistent', 'data_path') - self.folder = os.path.join(base_path, 'tokens', 'packfile', token) + self.folder = fs.path.join('tokens', 'packfile', token) - if not os.path.isdir(self.folder): + try: + config.fs.isdir(self.folder) + except fs.errors.ResourceNotFound: raise Exception('Packfile directory does not exist or has been deleted') self.requireMetadata() @@ -479,27 +477,20 @@ def check(self): stamp = minimum # Remember the timestamp integer for later use with os.utime. - self.ziptime = int(dateutil.parser.parse(stamp).strftime('%s')) + self.ziptime = dateutil.parser.parse(stamp) # The zipfile is a santizied acquisition label self.dir_ = util.sanitize_string_to_filename(self.a_label) self.name = self.dir_ + '.zip' - # Make a tempdir to store zip until moved - # OPPORTUNITY: this is also called in files.py. Could be a util func. - self.tempdir = tempfile.TemporaryDirectory(prefix='.tmp', dir=config.get_item('persistent', 'data_path')) - # Create a zip in the tempdir that later gets moved into the CAS. 
- self.path = os.path.join(self.tempdir.name, 'temp.zip') - self.zip_ = zipfile.ZipFile(self.path, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) + self.path = u'temp.zip' + self.zip_ = zipfile.ZipFile(self.file_processor.temp_fs.open(self.path, 'wb'), + 'w', zipfile.ZIP_DEFLATED, allowZip64=True) # OPPORTUNITY: add zip comment # self.zip.comment = json.dumps(metadata, default=metadata_encoder) - # Bit of a silly hack: write our tempdir directory into the zip (not including its contents). - # Creates an empty directory entry in the zip which will hold all the files inside. - # This way, when you expand a zip, you'll get folder/things instead of a thousand dicoms splattered everywhere. - self.zip_.write(self.tempdir.name, self.dir_) def process_file_field(self, field, file_attrs): # Should not be called with any files @@ -507,19 +498,20 @@ def process_file_field(self, field, file_attrs): def finalize(self): - paths = os.listdir(self.folder) + paths = self.file_processor.persistent_fs.listdir(self.folder) total = len(paths) # Write all files to zip complete = 0 for path in paths: - p = os.path.join(self.folder, path) + full_path = fs.path.join(self.folder, path) # Set the file's mtime & atime. - os.utime(p, (self.ziptime, self.ziptime)) + self.file_processor.persistent_fs.settimes(full_path, self.ziptime, self.ziptime) # Place file into the zip folder we created before - self.zip_.write(p, os.path.join(self.dir_, os.path.basename(path))) + with self.file_processor.persistent_fs.open(full_path, 'rb') as f: + self.zip_.writestr(fs.path.join(self.dir_, path), f.read()) # Report progress complete += 1 @@ -531,7 +523,7 @@ def finalize(self): self.zip_.close() # Remove the folder created by TokenPlacer - shutil.rmtree(self.folder) + self.file_processor.persistent_fs.removetree(self.folder) # Lookup uid on token token = self.context['token'] @@ -546,9 +538,10 @@ def finalize(self): # Not a great practice. See process_upload() for details. cgi_field = util.obj_from_map({ 'filename': self.name, - 'path': self.path, - 'size': os.path.getsize(self.path), - 'hash': files.hash_file_formatted(self.path), + 'path': self.path, + 'size': self.file_processor.temp_fs.getsize(self.path), + 'hash': self.file_processor.hash_file_formatted(self.path, self.file_processor.temp_fs), + 'uuid': str(uuid.uuid4()), 'mimetype': util.guess_mimetype('lol.zip'), 'modified': self.timestamp }) @@ -557,10 +550,11 @@ def finalize(self): # This could be coalesced into a single map thrown on file fields, for example. # Used in the API return. 
cgi_attrs = { - 'name': cgi_field.filename, + '_id': cgi_field.uuid, + 'name': cgi_field.filename, 'modified': cgi_field.modified, - 'size': cgi_field.size, - 'hash': cgi_field.hash, + 'size': cgi_field.size, + 'hash': cgi_field.hash, 'mimetype': cgi_field.mimetype, 'type': self.metadata['packfile']['type'], @@ -763,7 +757,8 @@ def process_file_field(self, field, file_attrs): proper_hash = file_attrs.get('hash')[3:].replace('-', ':') self.metadata.update({'exchange': {'rootfs-hash': proper_hash, 'git-commit': 'local', - 'rootfs-url': 'INVALID'}}) + 'rootfs-url': 'INVALID', + 'rootfs-id': file_attrs['_id']}}) # self.metadata['hash'] = file_attrs.get('hash') self.save_file(field) self.saved.append(file_attrs) diff --git a/api/upload.py b/api/upload.py index 75de2e470..90c753141 100644 --- a/api/upload.py +++ b/api/upload.py @@ -1,8 +1,9 @@ import bson import datetime import json -import os.path -import shutil +import uuid + +import fs.path from .web import base from .web.errors import FileStoreException, FileFormException @@ -74,7 +75,8 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None # The vast majority of this function's wall-clock time is spent here. # Tempdir is deleted off disk once out of scope, so let's hold onto this reference. - form, tempdir = files.process_form(request) + file_processor = files.FileProcessor(config.get_item('persistent', 'data_path'), config.fs) + form = file_processor.process_form(request) if 'metadata' in form: try: @@ -83,7 +85,7 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None raise FileStoreException('wrong format for field "metadata"') placer_class = strategy.value - placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context) + placer = placer_class(container_type, container, id_, metadata, timestamp, origin, context, file_processor) placer.check() # Browsers, when sending a multipart upload, will send files with field name "file" (if sinuglar) @@ -98,31 +100,32 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None raise FileFormException("Targeted uploads can only send one file") for field in file_fields: + field.file.close() # Augment the cgi.FieldStorage with a variety of custom fields. # Not the best practice. Open to improvements. # These are presumbed to be required by every function later called with field as a parameter. 
- field.path = os.path.join(tempdir.name, field.filename) - if not os.path.exists(field.path): - tempdir_exists = os.path.exists(tempdir.name) - raise Exception("file {} does not exist, tmpdir {} exists: {}, files in tmpdir: {}".format( + field.path = field.filename + if not file_processor.temp_fs.exists(field.path): + #tempdir_exists = os.path.exists(tempdir.name) + raise Exception("file {} does not exist, files in tmpdir: {}".format( field.path, - tempdir.name, - tempdir_exists, - tempdir_exists and os.listdir(tempdir.name), + file_processor.temp_fs.listdir('/'), )) - field.size = os.path.getsize(field.path) - field.hash = field.file.get_formatted_hash() - field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any + field.size = file_processor.temp_fs.getsize(field.path) + field.hash = util.format_hash(files.DEFAULT_HASH_ALG, field.hasher.hexdigest()) + field.uuid = str(uuid.uuid4()) + field.mimetype = util.guess_mimetype(field.filename) # TODO: does not honor metadata's mime type if any field.modified = timestamp # create a file-attribute map commonly used elsewhere in the codebase. # Stands in for a dedicated object... for now. file_attrs = { + '_id': field.uuid, 'name': field.filename, - 'modified': field.modified, # + 'modified': field.modified, 'size': field.size, 'mimetype': field.mimetype, - 'hash': field.hash, + 'hash': field.hash, 'origin': origin, 'type': None, @@ -237,15 +240,14 @@ def clean_packfile_tokens(self): # upload.clean_packfile_tokens # # It must be kept in sync between each instance. - basepath = config.get_item('persistent', 'data_path') - folder = os.path.join(basepath, 'tokens', 'packfile') + folder = fs.path.join('tokens', 'packfile') - util.mkdir_p(folder) - paths = os.listdir(folder) + util.mkdir_p(folder, config.fs) + paths = config.fs.listdir(folder) cleaned = 0 for token in paths: - path = os.path.join(folder, token) + path = fs.path.join(folder, token) result = None try: @@ -258,7 +260,7 @@ def clean_packfile_tokens(self): if result is None: log.info('Cleaning expired token directory ' + token) - shutil.rmtree(path) + config.fs.removetree(path) cleaned += 1 return { @@ -269,7 +271,7 @@ def clean_packfile_tokens(self): } def extract_file_fields(form): - """Returns a list of file fields in the form, handling multiple values""" + """Returns a list of file fields in the form, handling multiple values""" result = [] for fieldname in form: field = form[fieldname] diff --git a/api/util.py b/api/util.py index 64d2f85d1..87d1a5f7e 100644 --- a/api/util.py +++ b/api/util.py @@ -1,6 +1,5 @@ import datetime import enum as baseEnum -import errno import hashlib import json import mimetypes @@ -11,6 +10,9 @@ import string import uuid +import fs.path +import fs.errors + import django from django.conf import settings from django.template import Template, Context @@ -197,20 +199,6 @@ def obj_from_map(_map): return type('',(object,),_map)() -def path_from_hash(hash_): - """ - create a filepath from a hash - e.g. 
- hash_ = v0-sha384-01b395a1cbc0f218 - will return - v0/sha384/01/b3/v0-sha384-01b395a1cbc0f218 - """ - hash_version, hash_alg, actual_hash = hash_.split('-') - first_stanza = actual_hash[0:2] - second_stanza = actual_hash[2:4] - path = (hash_version, hash_alg, first_stanza, second_stanza, hash_) - return os.path.join(*path) - def set_for_download(response, stream=None, filename=None, length=None): """Takes a self.response, and various download options.""" @@ -268,14 +256,11 @@ def __eq__(self, other): else: return super.__eq__(other) -def mkdir_p(path): +def mkdir_p(path, file_system): try: - os.makedirs(path) - except OSError as exc: # Python >2.5 - if exc.errno == errno.EEXIST and os.path.isdir(path): - pass - else: - raise + file_system.makedirs(path) + except fs.errors.DirectoryExists: + pass NONCE_CHARS = string.ascii_letters + string.digits NONCE_LENGTH = 18 @@ -290,6 +275,36 @@ def create_nonce(): return ''.join([NONCE_CHARS[randrange(x)] for _ in range(NONCE_LENGTH)]) +def path_from_uuid(uuid_): + """ + create a filepath from a UUID + e.g. + uuid_ = cbb33a87-6754-4dfd-abd3-7466d4463ebc + will return + cb/b3/cbb33a87-6754-4dfd-abd3-7466d4463ebc + """ + uuid_1 = uuid_.split('-')[0] + first_stanza = uuid_1[0:2] + second_stanza = uuid_1[2:4] + path = (first_stanza, second_stanza, uuid_) + return fs.path.join(*path) + + +def path_from_hash(hash_): + """ + create a filepath from a hash + e.g. + hash_ = v0-sha384-01b395a1cbc0f218 + will return + v0/sha384/01/b3/v0-sha384-01b395a1cbc0f218 + """ + hash_version, hash_alg, actual_hash = hash_.split('-') + first_stanza = actual_hash[0:2] + second_stanza = actual_hash[2:4] + path = (hash_version, hash_alg, first_stanza, second_stanza, hash_) + return os.path.join(*path) + + class RangeHeaderParseError(ValueError): """Exception class representing a string parsing error.""" diff --git a/bin/database.py b/bin/database.py index 375c4b76f..0ce62d838 100755 --- a/bin/database.py +++ b/bin/database.py @@ -1436,4 +1436,4 @@ def upgrade_schema(force_from = None): sys.exit(1) except Exception as e: logging.exception('Unexpected error in database.py') - sys.exit(1) + sys.exit(1) \ No newline at end of file diff --git a/bin/oneoffs/create_placeholders.py b/bin/oneoffs/create_placeholders.py new file mode 100755 index 000000000..534ea88c3 --- /dev/null +++ b/bin/oneoffs/create_placeholders.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +import logging +import os +import shutil + +from api import util, config + +log = logging.getLogger('create_placeholders') +log.setLevel(logging.INFO) + + +def get_files_by_prefix(document, prefix): + for key in prefix.split('.'): + document = document.get(key, {}) + return document + + +def create_placeholder_file(f_path, extra_content): + target_dir = os.path.dirname(f_path) + if not os.path.exists(target_dir): + os.makedirs(target_dir) + with open(f_path, 'w') as placeholder: + placeholder.write('%s %s' % (f_path, extra_content)) + + +def placeholders_for_collections(): + log.info('Create placeholders for colelctions') + COLLECTIONS_PREFIXES = [('projects', 'files'), + ('acquisitions', 'files'), + ('analyses', 'files'), + ('sessions', 'files'), + ('sessions', 'subject.files'), + ('collections', 'files')] + + _files = [] + + for collection, prefix in COLLECTIONS_PREFIXES: + cursor = config.db.get_collection(collection).find({}) + for document in cursor: + for f in get_files_by_prefix(document, prefix): + f_dict = { + 'collection_id': document.get('_id'), + 'collection': collection, + 'fileinfo': f, + 'prefix': prefix + } + 
_files.append(f_dict) + + base = config.get_item('persistent', 'data_path') + for i, f in enumerate(_files): + f_path = os.path.join(base, util.path_from_hash(f['fileinfo']['hash'])) + create_placeholder_file(f_path, f['fileinfo']['size']) + + # Show progress + if i % (len(_files) / 10 + 1) == 0: + log.info('Processed %s files of total %s files ...' % (i, len(_files))) + + +def placeholders_for_gears(): + log.info('Create placeholders for gears') + cursor = config.db.get_collection('gears').find({}) + _files = [] + for document in cursor: + if document['exchange']['git-commit'] == 'local': + f_dict = { + 'gear_id': document['_id'], + 'gear_name': document['gear']['name'], + 'exchange': document['exchange'] + } + _files.append(f_dict) + + base = config.get_item('persistent', 'data_path') + for i, f in enumerate(_files): + f_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-') + f_path = os.path.join(base, util.path_from_hash(f_hash)) + create_placeholder_file(f_path, f['gear_name']) + + # Show progress + if i % (len(_files) / 10 + 1) == 0: + log.info('Processed %s gear files of total %s files ...' % (i, len(_files))) + + +if __name__ == '__main__': + """ + Create placeholder files to help testing a system using sanitized customer DBs without the corresponding data files. + """ + placeholders_for_collections() + placeholders_for_gears() diff --git a/bin/oneoffs/migrate_storage.py b/bin/oneoffs/migrate_storage.py new file mode 100644 index 000000000..0df7fed36 --- /dev/null +++ b/bin/oneoffs/migrate_storage.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python +import argparse +import datetime +import logging +import os +import uuid + +import fs.path +import fs.move + +from api import config, util + +log = logging.getLogger('migrate_storage') +log.setLevel(logging.INFO) + + +class MigrationError(Exception): + pass + + +def get_files_by_prefix(document, prefix): + for key in prefix.split('.'): + document = document.get(key, {}) + return document + + +def show_progress(current_index, total_files): + if current_index % (total_files / 10 + 1) == 0: + log.info('Processed %s files of total %s files ...' 
% (current_index, total_files)) + + +def cleanup_empty_folders(): + log.info('Cleanup empty folders') + + for _dirpath, _, _ in os.walk(config.get_item('persistent', 'data_path'), topdown=False): + if not (os.listdir(_dirpath) or config.get_item('persistent', 'data_path') == _dirpath): + os.rmdir(_dirpath) + + +def get_containers_files(containers_prefixes): + _files = [] + + for container, prefix in containers_prefixes: + cursor = config.db.get_collection(container).find({}) + for document in cursor: + for f in get_files_by_prefix(document, prefix): + f_dict = { + 'container_id': document.get('_id'), + 'container': container, + 'fileinfo': f, + 'prefix': prefix + } + _files.append(f_dict) + + return _files + + +def get_gears_files(): + cursor = config.db.get_collection('gears').find({}) + _files = [] + + for document in cursor: + if document.get('exchange', {}).get('git-commit', '') == 'local': + f_dict = { + 'gear_id': document['_id'], + 'gear_name': document['gear']['name'], + 'exchange': document['exchange'] + } + _files.append(f_dict) + + return _files + + +def migrate_file(f): + try: + file_id = f['fileinfo'].get('_id', '') + if file_id: + file_path = util.path_from_uuid(file_id) + if not config.fs.isfile(file_path): + """Copy file from legacy to new storage""" + + log.debug('copy file between the legacy and new storage: %s' % file_path) + + dst_dir = fs.path.dirname(file_path) + config.fs.makedirs(dst_dir, recreate=True) + fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path) + else: + """generate uuid, set the id field and copy the file""" + file_id = str(uuid.uuid4()) + f_old_path = util.path_from_hash(f['fileinfo']['hash']) + f_new_path = util.path_from_uuid(file_id) + + log.debug('copy file %s to %s' % (f_old_path, f_new_path)) + + dst_dir = fs.path.dirname(f_new_path) + config.fs.makedirs(dst_dir, recreate=True) + fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path) + + update_set = { + f['prefix'] + '.$.modified': datetime.datetime.utcnow(), + f['prefix'] + '.$._id': file_id + } + log.debug('update file in mongo: %s' % update_set) + # Update the file with the newly generated UUID + updated_doc = config.db[f['container']].find_one_and_update( + {'_id': f['container_id'], + f['prefix'] + '.name': f['fileinfo']['name'], + f['prefix'] + '.hash': f['fileinfo']['hash']}, + {'$set': update_set} + ) + + if not updated_doc: + log.info('Probably the following file has been updated during the migration ' + 'and its hash has changed, cleaning up from the new filesystem') + config.fs.remove(f_new_path) + + except Exception as e: + log.exception(e) + raise MigrationError('Wasn\'t able to migrate the \'%s\' ' + 'file in the \'%s\' container (container id: %s)' % + (f['fileinfo']['name'], f['container'], str(f['container_id'])), e) + + +def migrate_containers(): + log.info('Migrate container (projects, acquisitions, sessions, subject, collections) files...') + + container_files = get_containers_files([('projects', 'files'), + ('acquisitions', 'files'), + ('sessions', 'files'), + ('sessions', 'subject.files'), + ('collections', 'files')]) + + for i, f in enumerate(container_files): + migrate_file(f) + show_progress(i + 1, len(container_files)) + + log.info('Migrate analysis files...') + # Refresh the list of container files + container_files = get_containers_files([('projects', 'files'), + ('acquisitions', 'files'), + ('sessions', 'files'), + ('sessions', 'subject.files'), + ('collections',
'files')])
+    analysis_files = get_containers_files([('analyses', 'files')])
+
+    for i, f in enumerate(analysis_files):
+        match = [cf for cf in container_files if cf['fileinfo']['hash'] == f['fileinfo']['hash'] and cf['fileinfo'].get('_id')]
+        # A container file with the same hash was already migrated; reuse its UUID
+        if len(match) > 0 and not f['fileinfo'].get('_id'):
+            update_set = {
+                f['prefix'] + '.$.modified': match[0]['fileinfo']['modified'],
+                f['prefix'] + '.$._id': match[0]['fileinfo']['_id']
+            }
+            log.debug('update file in mongo: %s' % update_set)
+            # Update the file with the UUID already assigned to the matching container file
+            config.db[f['container']].find_one_and_update(
+                {'_id': f['container_id'],
+                 f['prefix'] + '.name': f['fileinfo']['name'],
+                 f['prefix'] + '.hash': f['fileinfo']['hash']},
+                {'$set': update_set}
+            )
+        else:
+            migrate_file(f)
+        show_progress(i + 1, len(analysis_files))
+
+
+def migrate_gears():
+    log.info('Migrate gears...')
+
+    _files = get_gears_files()
+
+    for i, f in enumerate(_files):
+        try:
+            file_id = f['exchange'].get('rootfs-id', '')
+            if file_id:
+                file_path = util.path_from_uuid(file_id)
+                if not config.fs.isfile(file_path):
+                    # Copy the file from the legacy storage to the new storage
+
+                    log.debug('copy file between the legacy and new storage: %s' % file_path)
+
+                    dst_dir = fs.path.dirname(file_path)
+                    config.fs.makedirs(dst_dir, recreate=True)
+                    fs.move.copy_file(src_fs=config.legacy_fs, src_path=file_path, dst_fs=config.fs, dst_path=file_path)
+            else:
+                file_id = str(uuid.uuid4())
+                file_hash = 'v0-' + f['exchange']['rootfs-hash'].replace(':', '-')
+                f_old_path = util.path_from_hash(file_hash)
+                f_new_path = util.path_from_uuid(file_id)
+
+                log.debug('copy file %s to %s' % (f_old_path, f_new_path))
+
+                dst_dir = fs.path.dirname(f_new_path)
+                config.fs.makedirs(dst_dir, recreate=True)
+                fs.move.copy_file(src_fs=config.legacy_fs, src_path=f_old_path, dst_fs=config.fs, dst_path=f_new_path)
+
+                update_set = {
+                    'modified': datetime.datetime.utcnow(),
+                    'exchange.rootfs-id': file_id
+                }
+
+                log.debug('update file in mongo: %s' % update_set)
+                # Update the gear with the newly generated UUID
+                config.db['gears'].find_one_and_update(
+                    {'_id': f['gear_id']},
+                    {'$set': update_set}
+                )
+
+            show_progress(i + 1, len(_files))
+        except Exception as e:
+            log.exception(e)
+            raise MigrationError('Wasn\'t able to migrate the \'%s\' gear (gear id: %s)' %
+                                 (f['gear_name'], f['gear_id']), e)
+
+
+def migrate_storage():
+    """
+    Remove CAS logic: generate a UUID for each file and copy the files from the legacy storage to the new storage.
+ """ + + parser = argparse.ArgumentParser(prog='Migrate storage') + parser.add_argument('--containers', action='store_true', help='Migrate containers') + parser.add_argument('--gears', action='store_true', help='Migrate gears') + parser.add_argument('--delete-files', action='store_true', help='Delete files from legacy storage') + + + args = parser.parse_args() + + try: + if not (args.containers or args.gears): + migrate_containers() + migrate_gears() + + if args.containers: + migrate_containers() + + if args.gears: + migrate_gears() + + if args.delete_files: + config.legacy_fs.removetree('/') + + except MigrationError as e: + log.exception(e) + exit(1) + finally: + cleanup_empty_folders() + pass + + +if __name__ == '__main__': + migrate_storage() diff --git a/requirements.txt b/requirements.txt index e544505b0..61f6a1f92 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ backports.tempfile==1.0 django>=1.11.0,<1.12.0 elasticsearch==5.3.0 enum==0.4.6 +fs==2.0.16 jsonschema==2.6.0 Markdown==2.6.5 pymongo==3.2 diff --git a/swagger/examples/file_info_list.json b/swagger/examples/file_info_list.json index 70992e72e..cf4edfcf8 100755 --- a/swagger/examples/file_info_list.json +++ b/swagger/examples/file_info_list.json @@ -4,6 +4,7 @@ "type": "job", "id": "58063f24e5dc5b001657a87f" }, + "_id": "50708f69-7549-4331-bac0-b4789e5c9ca1", "mimetype": "application/octet-stream", "hash": "v0-sha384-12188e00a26650b2baa3f0195337dcf504f4362bb2136eef0cdbefb57159356b1355a0402fca0ab5ab081f21c305e5c2", "name": "cortical_surface_right_hemisphere.obj", @@ -19,6 +20,7 @@ "type": "job", "id": "58065fa7e5dc5b001457a882" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/octet-stream", "hash": "v0-sha384-12188e00a26650b2baa3f0195337dcf504f4362bb2136eef0cdbefb57159356b1355a0402fca0ab5ab081f21c305e5c2", "name": "cortical_surface_right_hemisphere.obj", diff --git a/swagger/examples/output/acquisition-list.json b/swagger/examples/output/acquisition-list.json index 1b29d562c..4dde51b80 100644 --- a/swagger/examples/output/acquisition-list.json +++ b/swagger/examples/output/acquisition-list.json @@ -8,6 +8,7 @@ "id": "importer_Admin_Import", "name": "Admin Import" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/zip", "measurements": [], "hash": "v0-sha384-dd3c97bfe0ad1fcba75ae6718c6e81038c59af4f447f5db194d52732efa4f955b28455db02eb64cad3e4e55f11e3679f", @@ -47,6 +48,7 @@ "id": "importer_Admin_Import", "name": "Admin Import" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/zip", "measurements": [], "hash": "v0-sha384-ca055fb36845db86e4278cf6e185f8674d11a96f4b29af27e401fc495cc82ef6b53a5729c3757713064649dc71c8c725", @@ -86,6 +88,7 @@ "id": "importer_Admin_Import", "name": "Admin Import" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/zip", "measurements": [], "hash": "v0-sha384-537e42b1dd8f1feef9844fbfb4f60461361e71cafa7055556097e9d0b9f7fac68c8f234ed126af9412bd43a548948847", diff --git a/swagger/examples/output/acquisition.json b/swagger/examples/output/acquisition.json index fef6ba738..fdffc3310 100644 --- a/swagger/examples/output/acquisition.json +++ b/swagger/examples/output/acquisition.json @@ -7,6 +7,7 @@ "id": "importer_Admin_Import", "name": "Admin Import" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/zip", "measurements": [], "hash": "v0-sha384-dd3c97bfe0ad1fcba75ae6718c6e81038c59af4f447f5db194d52732efa4f955b28455db02eb64cad3e4e55f11e3679f", diff --git 
a/swagger/examples/output/analysis.json b/swagger/examples/output/analysis.json index bb0c4caae..fde0d0aea 100644 --- a/swagger/examples/output/analysis.json +++ b/swagger/examples/output/analysis.json @@ -4,6 +4,7 @@ "type": "job", "id": "58063f24e5dc5b001657a87f" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/octet-stream", "hash": "v0-sha384-12188e00a26650b2baa3f0195337dcf504f4362bb2136eef0cdbefb57159356b1355a0402fca0ab5ab081f21c305e5c2", "name": "cortical_surface_right_hemisphere.obj", @@ -20,6 +21,7 @@ "type": "job", "id": "58065fa7e5dc5b001457a882" }, + "_id": "50708f69-7549-4331-bac0-b4779e5c9ca1", "mimetype": "application/octet-stream", "hash": "v0-sha384-12188e00a26650b2baa3f0195337dcf504f4362bb2136eef0cdbefb57159356b1355a0402fca0ab5ab081f21c305e5c2", "name": "cortical_surface_right_hemisphere.obj", diff --git a/swagger/schemas/definitions/file.json b/swagger/schemas/definitions/file.json index f6cc81590..2dbe43b14 100644 --- a/swagger/schemas/definitions/file.json +++ b/swagger/schemas/definitions/file.json @@ -1,6 +1,7 @@ { "$schema": "http://json-schema.org/draft-04/schema#", "definitions":{ + "_id": { "type": "string" }, "name": { "type": "string" }, "file-type": { "type": "string" }, "mimetype": { "type": "string" }, @@ -44,6 +45,7 @@ "file": { "type": "object", "properties": { + "_id": {"$ref":"#/definitions/_id"}, "name": {"$ref":"#/definitions/name"}, "type": {"$ref":"#/definitions/file-type"}, "mimetype": {"$ref":"#/definitions/mimetype"}, diff --git a/swagger/schemas/mongo/file.json b/swagger/schemas/mongo/file.json index 4e077d85a..6fc7cfe30 100644 --- a/swagger/schemas/mongo/file.json +++ b/swagger/schemas/mongo/file.json @@ -2,6 +2,7 @@ "$schema": "http://json-schema.org/draft-04/schema#", "type": "object", "properties": { + "_id": { "type": "string" }, "name": { "type": "string" }, "created": {}, "modified": {}, @@ -24,7 +25,7 @@ "type": "object" } }, - "required": ["name", "created", "modified", "size", "hash"], + "required": ["_id", "name", "created", "modified", "size", "hash"], "key_fields": ["name"], "additionalProperties": false } diff --git a/tests/bin/run-tests-ubuntu.sh b/tests/bin/run-tests-ubuntu.sh index 443fa304f..b2ee2e140 100755 --- a/tests/bin/run-tests-ubuntu.sh +++ b/tests/bin/run-tests-ubuntu.sh @@ -78,6 +78,8 @@ function main() { export SCITRAN_PERSISTENT_DB_LOG_URI=${SCITRAN_PERSISTENT_DB_LOG_URI:-"mongodb://localhost:$SCITRAN_PERSISTENT_DB_PORT/logs"} export SCITRAN_PERSISTENT_PATH=`mktemp -d` export SCITRAN_PERSISTENT_DATA_PATH="$SCITRAN_PERSISTENT_PATH/data" + mkdir -p "$SCITRAN_PERSISTENT_DATA_PATH/v1" + export SCITRAN_PERSISTENT_FS_URL="osfs://$SCITRAN_PERSISTENT_PATH/data/v1" export SCITRAN_CORE_DRONE_SECRET=${SCITRAN_CORE_DRONE_SECRET:-$( openssl rand -base64 32 )} if ${RUN_LINT}; then diff --git a/tests/integration_tests/python/conftest.py b/tests/integration_tests/python/conftest.py index 1cba4892d..e98ec608d 100644 --- a/tests/integration_tests/python/conftest.py +++ b/tests/integration_tests/python/conftest.py @@ -7,10 +7,13 @@ import attrdict import bson +import fs.move +import fs.path import pymongo import pytest import requests +from api import config, files, util # load required envvars w/ the same name SCITRAN_CORE_DRONE_SECRET = os.environ['SCITRAN_CORE_DRONE_SECRET'] @@ -207,6 +210,38 @@ def with_user(data_builder, randstr, as_public): return attrdict.AttrDict(user=user, api_key=api_key, session=session) +@pytest.yield_fixture(scope='function') +def legacy_cas_file(as_admin, api_db, data_builder, 
randstr, file_form): + """Yield legacy CAS file""" + project = data_builder.create_project() + file_name = '%s.csv' % randstr() + file_content = randstr() + as_admin.post('/projects/' + project + '/files', files=file_form((file_name, file_content))) + + file_info = api_db['projects'].find_one( + {'files.name': file_name} + )['files'][0] + file_id = file_info['_id'] + file_hash = file_info['hash'] + # verify cas backward compatibility + api_db['projects'].find_one_and_update( + {'files.name': file_name}, + {'$unset': {'files.$._id': ''}} + ) + + file_path = unicode(util.path_from_hash(file_hash)) + target_dir = fs.path.dirname(file_path) + if not config.legacy_fs.exists(target_dir): + config.legacy_fs.makedirs(target_dir) + fs.move.move_file(src_fs=config.fs, src_path=util.path_from_uuid(file_id), dst_fs=config.legacy_fs, dst_path=file_path) + + yield (project, file_name, file_content) + + # clean up + config.legacy_fs.remove(file_path) + config.legacy_fs.removetree(target_dir) + api_db['projects'].delete_one({'_id': project}) + class BaseUrlSession(requests.Session): """Requests session subclass using core api's base url""" def request(self, method, url, **kwargs): diff --git a/tests/integration_tests/python/test_containers.py b/tests/integration_tests/python/test_containers.py index 792481a7d..dc11cfda1 100644 --- a/tests/integration_tests/python/test_containers.py +++ b/tests/integration_tests/python/test_containers.py @@ -297,11 +297,6 @@ def test_get_container(data_builder, default_payload, file_form, as_drone, as_ad r = as_public.get('/projects/' + project) assert r.ok - # get container w/ ?paths=true - r = as_public.get('/projects/' + project, params={'paths': 'true'}) - assert r.ok - assert all('path' in f for f in r.json()['files']) - # get container w/ ?join=origin&join=origin_job_gear_name r = as_public.get('/projects/' + project, params={'join': ['origin', 'origin_job_gear_name']}) assert r.ok diff --git a/tests/integration_tests/python/test_download.py b/tests/integration_tests/python/test_download.py index 3abd655bf..eaa46bad3 100644 --- a/tests/integration_tests/python/test_download.py +++ b/tests/integration_tests/python/test_download.py @@ -4,11 +4,11 @@ import zipfile -def test_download(data_builder, file_form, as_admin, api_db): +def test_download_k(data_builder, file_form, as_admin, api_db, legacy_cas_file): project = data_builder.create_project(label='project1') - session = data_builder.create_session(label='session1') - session2 = data_builder.create_session(label='session1') - session3 = data_builder.create_session(label='session1') + session = data_builder.create_session(label='session1', project=project) + session2 = data_builder.create_session(label='session1', project=project) + session3 = data_builder.create_session(label='session1', project=project) acquisition = data_builder.create_acquisition(session=session) acquisition2 = data_builder.create_acquisition(session=session2) acquisition3 = data_builder.create_acquisition(session=session3) @@ -151,8 +151,32 @@ def test_download(data_builder, file_form, as_admin, api_db): r = as_admin.get('/download', params={'ticket': ticket, 'symlinks': 'true'}) assert r.ok + # test legacy cas file handling + (project_legacy, file_name_legacy, file_content) = legacy_cas_file + r = as_admin.post('/download', json={ + 'optional': False, + 'nodes': [ + {'level': 'project', '_id': project_legacy}, + ] + }) + assert r.ok + ticket = r.json()['ticket'] + + # Perform the download + r = as_admin.get('/download', params={'ticket': 
ticket}) + assert r.ok + + tar_file = cStringIO.StringIO(r.content) + tar = tarfile.open(mode="r", fileobj=tar_file) + + # Verify a single file in tar with correct file name + for tarinfo in tar: + assert os.path.basename(tarinfo.name) == file_name_legacy + + tar.close() -def test_filelist_download(data_builder, file_form, as_admin): + +def test_filelist_download(data_builder, file_form, as_admin, legacy_cas_file): session = data_builder.create_session() zip_cont = cStringIO.StringIO() with zipfile.ZipFile(zip_cont, 'w') as zip_file: @@ -208,6 +232,17 @@ def test_filelist_download(data_builder, file_form, as_admin): r = as_admin.get(session_files + '/two.zip', params={'ticket': ticket, 'member': 'two.csv'}) assert r.ok + # test legacy cas file handling + (project, file_name, file_content) = legacy_cas_file + r = as_admin.get('/projects/' + project + '/files/' + file_name, params={'ticket': ''}) + assert r.ok + + ticket = r.json()['ticket'] + + r = as_admin.get('/projects/' + project + '/files/' + file_name, params={'ticket': ticket}) + assert r.ok + assert r.content == file_content + def test_filelist_range_download(data_builder, as_admin, file_form): session = data_builder.create_session() diff --git a/tests/integration_tests/python/test_jobs.py b/tests/integration_tests/python/test_jobs.py index 220b21140..1e41105cf 100644 --- a/tests/integration_tests/python/test_jobs.py +++ b/tests/integration_tests/python/test_jobs.py @@ -284,6 +284,8 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro r = as_admin.post('/jobs/add', json=job6) assert r.status_code == 500 + assert as_root.delete('/gears/' + gear3).ok + # Attempt to set a malformed file reference as input job7 = copy.deepcopy(job_data) job7['inputs'] = { diff --git a/tests/integration_tests/python/test_migrate_storage.py b/tests/integration_tests/python/test_migrate_storage.py new file mode 100644 index 000000000..6695d9f38 --- /dev/null +++ b/tests/integration_tests/python/test_migrate_storage.py @@ -0,0 +1,394 @@ +import os +import sys + +import fs.move +import fs.path +import pytest +import pymongo + +from api import config, util +from bson.objectid import ObjectId + + +def move_file_to_legacy(src, dst): + target_dir = fs.path.dirname(dst) + if not config.legacy_fs.exists(target_dir): + config.legacy_fs.makedirs(target_dir) + fs.move.move_file(src_fs=config.fs, src_path=src, + dst_fs=config.legacy_fs, dst_path=dst) + +@pytest.fixture(scope='function') +def migrate_storage(mocker): + """Enable importing from `bin` and return `undelete.undelete`.""" + bin_path = os.path.join(os.getcwd(), 'bin', 'oneoffs') + mocker.patch('sys.path', [bin_path] + sys.path) + import migrate_storage + return migrate_storage + + +@pytest.yield_fixture(scope='function') +def gears_to_migrate(api_db, as_admin, randstr, file_form): + def gen_gear_meta(gear_name): + return {'gear': { + "version": '0.0.1', + "config": {}, + "name": gear_name, + "inputs": { + "file": { + "base": "file", + "description": "Any image." 
+ } + }, + "maintainer": "Test", + "description": "Test", + "license": "Other", + "author": "Test", + "url": "http://example.example", + "label": "Test Gear", + "flywheel": "0", + "source": "http://example.example" + }} + + gears = [] + + gear_name_1 = randstr() + + file_name = '%s.tar.gz' % randstr() + file_content = randstr() + r = as_admin.post('/gears/temp', files=file_form((file_name, file_content), meta=gen_gear_meta(gear_name_1))) + gear_id_1 = r.json()['_id'] + + r = as_admin.get('/gears/' + gear_id_1) + gear_json_1 = r.json() + + file_hash__1 = 'v0-' + gear_json_1['exchange']['rootfs-hash'].replace(':', '-') + file_id_1 = gear_json_1['exchange']['rootfs-id'] + + file_path = unicode(util.path_from_hash(file_hash__1)) + target_dir = fs.path.dirname(file_path) + if not config.legacy_fs.exists(target_dir): + config.legacy_fs.makedirs(target_dir) + fs.move.move_file(src_fs=config.fs, src_path=util.path_from_uuid(file_id_1), + dst_fs=config.legacy_fs, dst_path=file_path) + + api_db['gears'].find_one_and_update( + {'_id': ObjectId(gear_id_1)}, + {'$unset': {'exchange.rootfs-id': ''}}) + + gears.append((gear_id_1, file_path)) + + gear_name_2 = randstr() + file_name = '%s.tar.gz' % randstr() + file_content = randstr() + r = as_admin.post('/gears/temp', files=file_form((file_name, file_content), meta=gen_gear_meta(gear_name_2))) + gear_id_2 = r.json()['_id'] + + + r = as_admin.get('/gears/' + gear_id_2) + gear_json_2 = r.json() + + file_id_2 = gear_json_2['exchange']['rootfs-id'] + + file_path = unicode(util.path_from_uuid(file_id_2)) + target_dir = fs.path.dirname(file_path) + if not config.legacy_fs.exists(target_dir): + config.legacy_fs.makedirs(target_dir) + fs.move.move_file(src_fs=config.fs, src_path=file_path, + dst_fs=config.legacy_fs, dst_path=file_path) + + gears.append((gear_id_2, file_path)) + + yield gears + + # clean up + gear_json_1 = api_db['gears'].find_one({'_id': ObjectId(gear_id_1)}) + gear_json_2 = api_db['gears'].find_one({'_id': ObjectId(gear_id_2)}) + files_to_delete = [] + files_to_delete.append(util.path_from_uuid(gear_json_1['exchange'].get('rootfs-id', ''))) + files_to_delete.append(util.path_from_uuid(gear_json_1['exchange'].get('rootfs-hash', ''))) + files_to_delete.append(util.path_from_uuid(gear_json_2['exchange'].get('rootfs-id', ''))) + + for f_path in files_to_delete: + try: + config.fs.remove(f_path) + except: + pass + + api_db['gears'].delete_one({'_id': ObjectId(gear_id_1)}) + api_db['gears'].delete_one({'_id': ObjectId(gear_id_2)}) + +@pytest.yield_fixture(scope='function') +def files_to_migrate(data_builder, api_db, as_admin, randstr, file_form): + # Create a project + project_id = data_builder.create_project() + + files = [] + + # Create a CAS file + file_name_1 = '%s.csv' % randstr() + file_content_1 = randstr() + as_admin.post('/projects/' + project_id + '/files', files=file_form((file_name_1, file_content_1))) + + file_info = api_db['projects'].find_one( + {'files.name': file_name_1} + )['files'][0] + file_id_1 = file_info['_id'] + file_hash_1 = file_info['hash'] + url_1 = '/projects/' + project_id + '/files/' + file_name_1 + + api_db['projects'].find_one_and_update( + {'files.name': file_name_1}, + {'$unset': {'files.$._id': ''}} + ) + + move_file_to_legacy(util.path_from_uuid(file_id_1), util.path_from_hash(file_hash_1)) + files.append((project_id, file_name_1, url_1, util.path_from_hash(file_hash_1))) + # Create an UUID file + file_name_2 = '%s.csv' % randstr() + file_content_2 = randstr() + as_admin.post('/projects/' + project_id + '/files', 
files=file_form((file_name_2, file_content_2)))
+
+    file_info = api_db['projects'].find_one(
+        {'files.name': file_name_2}
+    )['files'][1]
+    file_id_2 = file_info['_id']
+    url_2 = '/projects/' + project_id + '/files/' + file_name_2
+
+    move_file_to_legacy(util.path_from_uuid(file_id_2), util.path_from_uuid(file_id_2))
+    files.append((project_id, file_name_2, url_2, util.path_from_uuid(file_id_2)))
+
+    yield files
+
+    # Clean up: get the files
+    files = api_db['projects'].find_one(
+        {'_id': ObjectId(project_id)}
+    )['files']
+    # Delete the files from the new storage
+    for f in files:
+        try:
+            config.fs.remove(util.path_from_uuid(f['_id']))
+        except:
+            pass
+
+def test_migrate_containers(files_to_migrate, as_admin, migrate_storage):
+    """Testing container file migration"""
+
+    # get the file stored by hash in the legacy storage
+    (_, _, url_1, file_path_1) = files_to_migrate[0]
+    # get the file stored by uuid in the legacy storage
+    (_, _, url_2, file_path_2) = files_to_migrate[1]
+
+    # get the ticket
+    r = as_admin.get(url_1, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_1, params={'ticket': ticket}).ok
+
+    # get the ticket
+    r = as_admin.get(url_2, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_2, params={'ticket': ticket}).ok
+    # run the migration
+    migrate_storage.migrate_containers()
+
+    # delete files from the legacy storage
+    config.legacy_fs.remove(file_path_1)
+    config.legacy_fs.remove(file_path_2)
+
+    # get the files from the new filesystem
+    # get the ticket
+    r = as_admin.get(url_1, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_1, params={'ticket': ticket}).ok
+
+    # get the ticket
+    r = as_admin.get(url_2, params={'ticket': ''})
+    assert r.ok
+    ticket = r.json()['ticket']
+
+    # download the file
+    assert as_admin.get(url_2, params={'ticket': ticket}).ok
+
+def test_migrate_containers_error(files_to_migrate, migrate_storage):
+    """Testing that the migration script throws an exception if it couldn't migrate a file"""
+
+    # get the file stored by hash in the legacy storage
+    (_, _, url, file_path_1) = files_to_migrate[0]
+    # get the other file, so we can clean up
+    (_, _, _, file_path_2) = files_to_migrate[1]
+
+    # delete the file
+    config.legacy_fs.remove(file_path_1)
+
+    with pytest.raises(migrate_storage.MigrationError):
+        migrate_storage.migrate_containers()
+
+    # clean up
+    config.legacy_fs.remove(file_path_2)
+
+
+def test_migrate_gears(gears_to_migrate, as_admin, migrate_storage):
+    """Testing gear file migration"""
+
+    (gear_id_1, gear_file_path_1) = gears_to_migrate[0]
+    (gear_id_2, gear_file_path_2) = gears_to_migrate[1]
+
+    # get gears before migration
+    assert as_admin.get('/gears/temp/' + gear_id_1).ok
+    assert as_admin.get('/gears/temp/' + gear_id_2).ok
+
+    # run migration
+    migrate_storage.migrate_gears()
+
+    # delete files from the legacy storage
+    config.legacy_fs.remove(gear_file_path_1)
+    config.legacy_fs.remove(gear_file_path_2)
+
+    # get the files from the new filesystem
+    assert as_admin.get('/gears/temp/' + gear_id_1).ok
+    assert as_admin.get('/gears/temp/' + gear_id_2).ok
+
+
+def test_migrate_gears_error(gears_to_migrate, migrate_storage):
+    """Testing that the migration script throws an exception if it couldn't migrate a file"""
+
+    # get the gear file stored by hash in the legacy storage
+    (gear_id, gear_file_path_1) = gears_to_migrate[0]
+    # get the other file, so we can clean up
+    (_, gear_file_path_2) 
= gears_to_migrate[1] + + # delete the file + config.legacy_fs.remove(gear_file_path_1) + + with pytest.raises(migrate_storage.MigrationError): + migrate_storage.migrate_gears() + + # clean up + config.legacy_fs.remove(gear_file_path_2) + + +def test_file_replaced_handling(files_to_migrate, migrate_storage, as_admin, file_form, api_db, mocker, caplog): + + origin_find_one_and_update = pymongo.collection.Collection.find_one_and_update + + def mocked(*args, **kwargs): + self = args[0] + filter = args[1] + update = args[2] + + as_admin.post('/projects/' + project_id + '/files', files=file_form((file_name_1, 'new_content'))) + + return origin_find_one_and_update(self, filter, update) + + + with mocker.mock_module.patch.object(pymongo.collection.Collection, 'find_one_and_update', mocked): + # get file storing by hash in legacy storage + (project_id, file_name_1, url_1, file_path_1) = files_to_migrate[0] + # get ile storing by uuid in legacy storage + (_, file_name_2, url_2, file_path_2) = files_to_migrate[1] + + # run the migration + migrate_storage.migrate_containers() + + file_1_id = api_db['projects'].find_one( + {'files.name': file_name_1} + )['files'][0]['_id'] + + file_2_id = api_db['projects'].find_one( + {'files.name': file_name_2} + )['files'][1]['_id'] + + assert config.fs.exists(util.path_from_uuid(file_1_id)) + assert config.fs.exists(util.path_from_uuid(file_2_id)) + + assert any(log.message == 'Probably the following file has been updated during the migration and its hash is changed, cleaning up from the new filesystem' for log in caplog.records) + + +def test_migrate_analysis(files_to_migrate, as_admin, migrate_storage, default_payload, data_builder, file_form): + """Testing analysis migration""" + + # get file storing by hash in legacy storage + (project_id, file_name_1, url_1, file_path_1) = files_to_migrate[0] + # get ile storing by uuid in legacy storage + (_, _,url_2, file_path_2) = files_to_migrate[1] + + gear_doc = default_payload['gear']['gear'] + gear_doc['inputs'] = { + 'csv': { + 'base': 'file' + } + } + gear = data_builder.create_gear(gear=gear_doc) + + # create project analysis (job) using project's file as input + r = as_admin.post('/projects/' + project_id + '/analyses', params={'job': 'true'}, json={ + 'analysis': {'label': 'test analysis job'}, + 'job': { + 'gear_id': gear, + 'inputs': { + 'csv': { + 'type': 'projects', + 'id': project_id, + 'name': file_name_1 + } + }, + 'tags': ['example'] + } + }) + assert r.ok + analysis_id1 = r.json()['_id'] + + r = as_admin.get('/projects/' + project_id + '/analyses/' + analysis_id1) + assert r.ok + analysis_files1 = '/projects/' + project_id + '/analyses/' + analysis_id1 + '/files' + + # run the migration + migrate_storage.migrate_containers() + + # delete files from the legacy storage + config.legacy_fs.remove(file_path_1) + config.legacy_fs.remove(file_path_2) + + # get the files from the new filesystem + # get the ticket + r = as_admin.get(url_1, params={'ticket': ''}) + assert r.ok + ticket = r.json()['ticket'] + + # download the file + assert as_admin.get(url_1, params={'ticket': ticket}).ok + + # get the ticket + r = as_admin.get(url_2, params={'ticket': ''}) + assert r.ok + ticket = r.json()['ticket'] + + # download the file + assert as_admin.get(url_2, params={'ticket': ticket}).ok + + # get analysis download ticket for single file + r = as_admin.get(analysis_files1 + '/' + file_name_1, params={'ticket': ''}) + assert r.ok + ticket = r.json()['ticket'] + + # download single analysis file w/ ticket + r = 
as_admin.get(analysis_files1 + '/' + file_name_1, params={'ticket': ticket}) + assert r.ok + + r = as_admin.get('/projects/' + project_id + '/analyses/' + analysis_id1) + assert r.ok + input_file_id = r.json()['files'][0]['_id'] + + r = as_admin.get('/projects/' + project_id) + assert r.ok + project_file_id = r.json()['files'][0]['_id'] + + assert input_file_id == project_file_id diff --git a/tests/integration_tests/python/test_resolver.py b/tests/integration_tests/python/test_resolver.py index 37c988226..b772a03d0 100644 --- a/tests/integration_tests/python/test_resolver.py +++ b/tests/integration_tests/python/test_resolver.py @@ -74,6 +74,7 @@ def test_resolver(data_builder, as_admin, as_user, as_public, file_form): project_file = 'project_file' r = as_admin.post('/projects/' + project + '/files', files=file_form(project_file)) assert r.ok + project_file_id = r.json()[0]['_id'] # save the file id for later usage r = as_admin.post('/resolve', json={'path': [group, project_label]}) result = r.json() assert r.ok @@ -95,7 +96,7 @@ def test_resolver(data_builder, as_admin, as_user, as_public, file_form): r = as_admin.post('/resolve', json={'path': [group, project_label, project_file]}) result = r.json() assert r.ok - assert path_in_result([group, project, project_file], result) + assert path_in_result([group, project, project_file_id], result) assert result['children'] == [] # try to resolve non-existent root/group/project/child @@ -115,6 +116,7 @@ def test_resolver(data_builder, as_admin, as_user, as_public, file_form): session_file = 'session_file' r = as_admin.post('/sessions/' + session + '/files', files=file_form(session_file)) assert r.ok + session_file_id = r.json()[0]['_id'] r = as_admin.post('/resolve', json={'path': [group, project_label, session_label]}) result = r.json() assert r.ok @@ -136,7 +138,7 @@ def test_resolver(data_builder, as_admin, as_user, as_public, file_form): r = as_admin.post('/resolve', json={'path': [group, project_label, session_label, session_file]}) result = r.json() assert r.ok - assert path_in_result([group, project, session, session_file], result) + assert path_in_result([group, project, session, session_file_id], result) assert result['children'] == [] # try to resolve non-existent root/group/project/session/child @@ -156,6 +158,7 @@ def test_resolver(data_builder, as_admin, as_user, as_public, file_form): acquisition_file = 'acquisition_file' r = as_admin.post('/acquisitions/' + acquisition + '/files', files=file_form(acquisition_file)) assert r.ok + acquisition_file_id = r.json()[0]['_id'] r = as_admin.post('/resolve', json={'path': [group, project_label, session_label, acquisition_label]}) result = r.json() assert r.ok @@ -167,17 +170,17 @@ def test_resolver(data_builder, as_admin, as_user, as_public, file_form): r = as_admin.post('/resolve', json={'path': [group, project_label, session_label, acquisition_label, acquisition_file]}) result = r.json() assert r.ok - assert path_in_result([group, project, session, acquisition, acquisition_file], result) + assert path_in_result([group, project, session, acquisition, acquisition_file_id], result) assert result['children'] == [] def idz(s): return '' # resolve root/group/project/session/acquisition/file with id - r = as_admin.post('/resolve', json={'path': [idz(group), idz(project), idz(session), idz(acquisition), acquisition_file]}) + r = as_admin.post('/resolve', json={'path': [idz(group), idz(project), idz(session), idz(acquisition), idz(acquisition_file)]}) result = r.json() assert r.ok - assert 
path_in_result([group, project, session, acquisition, acquisition_file], result) + assert path_in_result([group, project, session, acquisition, acquisition_file_id], result) assert result['children'] == [] # try to resolve non-existent root/group/project/session/acquisition/child diff --git a/tests/integration_tests/python/test_undelete.py b/tests/integration_tests/python/test_undelete.py index 0af999ef2..2b2550008 100644 --- a/tests/integration_tests/python/test_undelete.py +++ b/tests/integration_tests/python/test_undelete.py @@ -116,7 +116,7 @@ def test_undelete_scope(undelete, containers, as_admin, api_db): def test_undelete_options(undelete, containers): - with pytest.raises(RuntimeError, match=r'use --include-parents'): + with pytest.raises(RuntimeError, match=r'use `--include-parents`'): undelete('acquisitions', containers.ac_1_1_1, filename='f_1_1_1_1') undelete('acquisitions', containers.ac_1_1_1, filename='f_1_1_1_1', include_parents=True) diff --git a/tests/integration_tests/requirements-integration-test.txt b/tests/integration_tests/requirements-integration-test.txt index 53c9cec21..474c13d0b 100644 --- a/tests/integration_tests/requirements-integration-test.txt +++ b/tests/integration_tests/requirements-integration-test.txt @@ -4,9 +4,9 @@ mock==2.0.0 mongomock==3.8.0 pdbpp==0.8.3 pylint==1.5.3 -pytest-cov==2.2.0 -pytest-mock==1.6.0 -pytest-watch==3.8.0 -pytest==2.8.5 +pytest-cov==2.5.1 +pytest-mock==1.7.0 +pytest-watch==4.1.0 +pytest==3.4.1 requests_mock==1.3.0 testfixtures==4.10.1
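
Note for reviewers: the copy step that migrate_file() and migrate_gears() repeat is the plain pyfilesystem2 pattern sketched below. This is a minimal, self-contained sketch against two throw-away OSFS roots; the temporary directories, blob paths and file contents are invented for illustration, and only the osfs:// URL form, makedirs and fs.move.copy_file mirror what the patch actually uses.

# Minimal sketch of the pyfilesystem2 copy pattern used by the migration script.
# Everything here is illustrative: the real script copies between
# config.legacy_fs and config.fs at paths produced by util.path_from_hash /
# util.path_from_uuid.
import tempfile

import fs.move
import fs.path
from fs import open_fs

legacy_fs = open_fs(u'osfs://' + tempfile.mkdtemp())  # stand-in for config.legacy_fs
new_fs = open_fs(u'osfs://' + tempfile.mkdtemp())     # stand-in for config.fs

# Pretend this blob already sits in the legacy, content-addressed layout.
src_path = u'v0/12/18/v0-sha384-1218-example'
legacy_fs.makedirs(fs.path.dirname(src_path), recreate=True)
legacy_fs.settext(src_path, u'example file body')

# Any destination path works for the sketch; the real script derives it from
# util.path_from_uuid(file_id).
dst_path = u'50/70/50708f69-7549-4331-bac0-b4789e5c9ca1'

# Same sequence as migrate_file(): create the destination directory, then copy
# (not move), so the legacy copy survives until --delete-files is requested.
new_fs.makedirs(fs.path.dirname(dst_path), recreate=True)
fs.move.copy_file(src_fs=legacy_fs, src_path=src_path, dst_fs=new_fs, dst_path=dst_path)

assert new_fs.isfile(dst_path)
assert legacy_fs.isfile(src_path)

The fixtures in tests/integration_tests/python/conftest.py and test_migrate_storage.py use the same calls in the opposite direction (fs.move.move_file from config.fs back to config.legacy_fs) to fabricate legacy files for the tests.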
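The database half of migrate_file() is a single positional-operator update, and it is also where a file replaced mid-migration is detected. The sketch below isolates that statement; it assumes a disposable local MongoDB such as the one the integration tests already run against, and the database name, document and hash value are invented.

# Hedged sketch of the positional '$' update used by migrate_file(). Assumes a
# local MongoDB is reachable on the default port (as in the integration test
# environment); the 'migrate_storage_example' database and documents are invented.
import datetime
import uuid

import pymongo

db = pymongo.MongoClient('mongodb://localhost:27017')['migrate_storage_example']
db.projects.delete_many({})
db.projects.insert_one({
    '_id': 'project1',
    'files': [{'name': 'scan.csv', 'hash': 'v0-sha384-aaa', 'size': 3}],
})

file_id = str(uuid.uuid4())
updated = db.projects.find_one_and_update(
    # Filtering on name *and* hash is what makes a concurrent re-upload visible:
    # if the hash changed while the blob was being copied, nothing matches,
    # find_one_and_update returns None, and migrate_file() removes the new copy.
    {'_id': 'project1', 'files.name': 'scan.csv', 'files.hash': 'v0-sha384-aaa'},
    {'$set': {'files.$._id': file_id, 'files.$.modified': datetime.datetime.utcnow()}}
)
assert updated is not None
assert db.projects.find_one({'_id': 'project1'})['files'][0]['_id'] == file_id

test_file_replaced_handling exercises exactly this branch: it re-uploads the file from a patched find_one_and_update and then asserts on the cleanup log line.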
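util.path_from_uuid() and util.path_from_hash() are used throughout the patch, but their implementations are not part of this diff. Purely to convey the idea of the new layout, a UUID-based fan-out could look like the hypothetical helper below; the real function in api/util.py may differ.

# Hypothetical illustration only: the real layout is whatever util.path_from_uuid
# in api/util.py produces. This helper just shows the general idea of deriving a
# nested blob path from the file's UUID instead of from its content hash.
import fs.path


def example_path_from_uuid(file_uuid):
    return fs.path.join(file_uuid[0:2], file_uuid[2:4], file_uuid)


assert (example_path_from_uuid('50708f69-7549-4331-bac0-b4789e5c9ca1') ==
        '50/70/50708f69-7549-4331-bac0-b4789e5c9ca1')

The one-off itself is run as bin/oneoffs/migrate_storage.py: with no flags it migrates both containers and gears, --containers or --gears limits the scope, and --delete-files additionally wipes the legacy tree once the copy has finished; cleanup_empty_folders() runs in every case.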