Generic storage #1059

Open · wants to merge 12 commits into master

Changes from all commits
5 changes: 4 additions & 1 deletion .travis.yml
@@ -39,7 +39,10 @@ after_success:
- if [ "$TRAVIS_EVENT_TYPE" == "push" -a "$TRAVIS_BRANCH" == "master" ]; then
./docker/build-trigger.sh Branch "$TRAVIS_BRANCH" "$BUILD_TRIGGER_URL";
fi
- if [ "$TRAVIS_EVENT_TYPE" == "push" -a "$TRAVIS_BRANCH" == "generic-storage" ]; then
./docker/build-trigger.sh Branch "$TRAVIS_BRANCH" "$BUILD_TRIGGER_URL";
fi
- if [ "$TRAVIS_EVENT_TYPE" == "push" -a -z "$TRAVIS_TAG" ]; then
./bin/push-docs.sh "$GIT_REMOTE" branches "$TRAVIS_BRANCH" "Travis Core Docs Build - ${TRAVIS_BUILD_NUMBER}";
fi

12 changes: 11 additions & 1 deletion api/config.py
@@ -7,6 +7,8 @@
import datetime
import elasticsearch

from fs import open_fs

from . import util
from .dao.dbutil import try_replace_one

@@ -21,7 +23,7 @@
logging.getLogger('requests').setLevel(logging.WARNING) # silence Requests library
logging.getLogger('paste.httpserver').setLevel(logging.WARNING) # silence Paste library
logging.getLogger('elasticsearch').setLevel(logging.WARNING) # silence Elastic library

logging.getLogger('urllib3').setLevel(logging.WARNING) # silence urllib3 library

# NOTE: Keep in sync with environment variables in sample.config file.
DEFAULT_CONFIG = {
@@ -63,6 +65,8 @@
'db_server_selection_timeout': '3000',
'data_path': os.path.join(os.path.dirname(__file__), '../persistent/data'),
'elasticsearch_host': 'localhost:9200',
'fs_url': 'osfs://' + os.path.join(os.path.dirname(__file__), '../persistent/data'),
'support_legacy_fs': True
},
}

@@ -321,3 +325,9 @@ def mongo_pipeline(table, pipeline):

def get_auth(auth_type):
return get_config()['auth'][auth_type]


# Storage configuration
fs = open_fs(__config['persistent']['fs_url'])
legacy_fs = open_fs('osfs://' + __config['persistent']['data_path'])
support_legacy_fs = __config['persistent']['support_legacy_fs']
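
The new fs_url setting is a PyFilesystem2 URL, so `config.fs` can be backed by any filesystem that `fs.open_fs` understands, while `legacy_fs` keeps the old on-disk layout reachable as long as `support_legacy_fs` is enabled. A minimal usage sketch of the objects configured above (the `osfs://` path and file name are illustrative, not part of this PR):

```python
# Usage sketch only -- illustrative path and file name, not part of this diff.
from fs import open_fs

storage_fs = open_fs('osfs:///tmp/scitran-data', create=True)  # same scheme as the default fs_url

with storage_fs.open('example.txt', 'w') as f:   # text mode expects unicode on Python 2
    f.write(u'hello generic storage')

with storage_fs.open('example.txt', 'r') as f:
    print(f.read())                              # u'hello generic storage'
```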
59 changes: 30 additions & 29 deletions api/download.py
@@ -1,17 +1,18 @@
import os
import bson
import pytz
import os.path
import tarfile
import datetime
import cStringIO

import fs.path
import fs.errors

from .web import base
from .web.request import AccessType
from . import config
from . import util
from . import validators
import os
from . import config, files, util, validators
from .dao.containerutil import pluralize

log = config.log

BYTES_IN_MEGABYTE = float(1<<20)
@@ -32,7 +33,7 @@ def _filter_check(property_filter, property_values):

class Download(base.RequestHandler):

def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, data_path, filters):
def _append_targets(self, targets, cont_name, container, prefix, total_size, total_cnt, filters):
for f in container.get('files', []):
if filters:
filtered = True
@@ -46,20 +47,20 @@ def _append_targets(targets, cont_name, container, prefix, total_size, tot
break
if filtered:
continue
filepath = os.path.join(data_path, util.path_from_hash(f['hash']))
if os.path.exists(filepath): # silently skip missing files

file_path, _ = files.get_valid_file(f)
if file_path: # silently skip missing files
if cont_name == 'analyses':
targets.append((filepath, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')),f['size']))
targets.append((file_path, prefix + '/' + ('input' if f.get('input') else 'output') + '/' + f['name'], cont_name, str(container.get('_id')), f['size']))
else:
targets.append((filepath, prefix + '/' + f['name'], cont_name, str(container.get('_id')),f['size']))
targets.append((file_path, prefix + '/' + f['name'], cont_name, str(container.get('_id')), f['size']))
total_size += f['size']
total_cnt += 1
else:
log.warn("Expected {} to exist but it is missing. File will be skipped in download.".format(filepath))
log.warn("Expected {} to exist but it is missing. File will be skipped in download.".format(file_path))
return total_size, total_cnt

def _bulk_preflight_archivestream(self, file_refs):
data_path = config.get_item('persistent', 'data_path')
arc_prefix = self.get_param('prefix', 'scitran')
file_cnt = 0
total_size = 0
@@ -95,9 +96,9 @@ def _bulk_preflight_archivestream(self, file_refs):
log.warn("Expected file {} on Container {} {} to exist but it is missing. File will be skipped in download.".format(filename, cont_name, cont_id))
continue

filepath = os.path.join(data_path, util.path_from_hash(file_obj['hash']))
if os.path.exists(filepath): # silently skip missing files
targets.append((filepath, cont_name+'/'+cont_id+'/'+file_obj['name'], cont_name, cont_id, file_obj['size']))
file_path, _ = files.get_valid_file(file_obj)
if file_path: # silently skip missing files
targets.append((file_path, cont_name+'/'+cont_id+'/'+file_obj['name'], cont_name, cont_id, file_obj['size']))
total_size += file_obj['size']
file_cnt += 1

@@ -111,7 +112,6 @@ def _bulk_preflight_archivestream(self, file_refs):


def _preflight_archivestream(self, req_spec, collection=None):
data_path = config.get_item('persistent', 'data_path')
arc_prefix = self.get_param('prefix', 'scitran')
file_cnt = 0
total_size = 0
@@ -136,7 +136,7 @@ def _preflight_archivestream(self, req_spec, collection=None):
continue

prefix = '/'.join([arc_prefix, project['group'], project['label']])
total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'projects', project, prefix, total_size, file_cnt, req_spec.get('filters'))

sessions = config.db.sessions.find({'project': item_id, 'deleted': {'$exists': False}}, ['label', 'files', 'uid', 'timestamp', 'timezone', 'subject'])
session_dict = {session['_id']: session for session in sessions}
@@ -157,19 +157,19 @@
for code, subject in subject_dict.iteritems():
subject_prefix = self._path_from_container(prefix, subject, ids_of_paths, code)
subject_prefixes[code] = subject_prefix
total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'subjects', subject, subject_prefix, total_size, file_cnt, req_spec.get('filters'))

for session in session_dict.itervalues():
subject_code = session['subject'].get('code', 'unknown_subject')
subject = subject_dict[subject_code]
session_prefix = self._path_from_container(subject_prefixes[subject_code], session, ids_of_paths, session["_id"])
session_prefixes[session['_id']] = session_prefix
total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'sessions', session, session_prefix, total_size, file_cnt, req_spec.get('filters'))

for acq in acquisitions:
session = session_dict[acq['session']]
acq_prefix = self._path_from_container(session_prefixes[session['_id']], acq, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters'))


elif item['level'] == 'session':
@@ -184,7 +184,7 @@ def _preflight_archivestream(self, req_spec, collection=None):
if not subject.get('code'):
subject['code'] = 'unknown_subject'
prefix = self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject["code"]), session, ids_of_paths, session['_id'])
total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'sessions', session, prefix, total_size, file_cnt, req_spec.get('filters'))

# If the param `collection` holding a collection id is not None, filter out acquisitions that are not in the collection
a_query = {'session': item_id, 'deleted': {'$exists': False}}
@@ -194,7 +194,7 @@

for acq in acquisitions:
acq_prefix = self._path_from_container(prefix, acq, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, acq_prefix, total_size, file_cnt, req_spec.get('filters'))

elif item['level'] == 'acquisition':
acq = config.db.acquisitions.find_one(base_query, ['session', 'label', 'files', 'uid', 'timestamp', 'timezone'])
@@ -210,7 +210,7 @@ def _preflight_archivestream(self, req_spec, collection=None):

project = config.db.projects.find_one({'_id': session['project']}, ['group', 'label'])
prefix = self._path_from_container(self._path_from_container(self._path_from_container(project['group'] + '/' + project['label'], subject, ids_of_paths, subject['code']), session, ids_of_paths, session["_id"]), acq, ids_of_paths, acq['_id'])
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'acquisitions', acq, prefix, total_size, file_cnt, req_spec.get('filters'))

elif item['level'] == 'analysis':
analysis = config.db.analyses.find_one(base_query, ['parent', 'label', 'files', 'uid', 'timestamp'])
@@ -220,7 +220,7 @@ def _preflight_archivestream(self, req_spec, collection=None):
continue
prefix = self._path_from_container("", analysis, ids_of_paths, util.sanitize_string_to_filename(analysis['label']))
filename = 'analysis_' + util.sanitize_string_to_filename(analysis['label']) + '.tar'
total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, data_path, req_spec.get('filters'))
total_size, file_cnt = self._append_targets(targets, 'analyses', analysis, prefix, total_size, file_cnt, req_spec.get('filters'))

if len(targets) > 0:
if not filename:
@@ -278,8 +278,9 @@ def archivestream(self, ticket):
stream = cStringIO.StringIO()
with tarfile.open(mode='w|', fileobj=stream) as archive:
for filepath, arcpath, cont_name, cont_id, _ in ticket['target']:
yield archive.gettarinfo(filepath, arcpath).tobuf()
with open(filepath, 'rb') as fd:
file_system = files.get_fs_by_file_path(filepath)
with file_system.open(filepath, 'rb') as fd:
yield archive.gettarinfo(fileobj=fd, arcname=arcpath).tobuf()
chunk = ''
for chunk in iter(lambda: fd.read(CHUNKSIZE), ''): # pylint: disable=cell-var-from-loop
yield chunk
@@ -289,11 +290,11 @@ def archivestream(self, ticket):
yield stream.getvalue() # get tar stream trailer
stream.close()

def symlinkarchivestream(self, ticket, data_path):
def symlinkarchivestream(self, ticket):
for filepath, arcpath, cont_name, cont_id, _ in ticket['target']:
t = tarfile.TarInfo(name=arcpath)
t.type = tarfile.SYMTYPE
t.linkname = os.path.relpath(filepath, data_path)
t.linkname = fs.path.relpath(filepath)
yield t.tobuf()
self.log_user_access(AccessType.download_file, cont_name=cont_name, cont_id=cont_id, filename=os.path.basename(arcpath), multifile=True, origin_override=ticket['origin']) # log download
stream = cStringIO.StringIO()
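
archivestream now looks up the right filesystem per target with `files.get_fs_by_file_path` and reads through it, so tar members can come from either backend, and symlinkarchivestream no longer needs data_path because `fs.path.relpath` effectively strips the leading separator from the stored path. The chunked-read pattern used above, reduced to a standalone sketch (the helper name and the 1 MiB chunk size are illustrative; the module's actual CHUNKSIZE value is not shown in this diff):

```python
# Standalone sketch of the streaming pattern in archivestream above.
# stream_file and the 1 MiB chunk size are illustrative, not part of the PR.
def stream_file(storage_fs, file_path, chunk_size=2 ** 20):
    """Yield the contents of file_path from a PyFilesystem object in fixed-size chunks."""
    with storage_fs.open(file_path, 'rb') as fd:
        for chunk in iter(lambda: fd.read(chunk_size), ''):
            yield chunk
```

Reading through the filesystem abstraction instead of the builtin open() is what lets the same download code serve files from the local store or any other PyFilesystem backend.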
@@ -312,7 +313,7 @@ def download(self):
if ticket['ip'] != self.request.client_addr:
self.abort(400, 'ticket not for this source IP')
if self.get_param('symlinks'):
self.response.app_iter = self.symlinkarchivestream(ticket, config.get_item('persistent', 'data_path'))
self.response.app_iter = self.symlinkarchivestream(ticket)
else:
self.response.app_iter = self.archivestream(ticket)
self.response.headers['Content-Type'] = 'application/octet-stream'