From a8e69f0fe73fb8b6dfeb28d2d7a1e4e547948e2b Mon Sep 17 00:00:00 2001 From: John Chilton Date: Wed, 13 Dec 2017 08:42:07 -0500 Subject: [PATCH] [WIP] Hierarchical upload API optimized for folders & collections. Allows describing hierarchical data in JSON or inferring structure from archives or directories. Datasets or archive sources can be specified via uploads, URLs, paths (if admin && allow_path_paste), library_import_dir/user_library_import_dir, and/or FTP imports. Unlike existing API endpoints, a mix of these on a per-file basis is allowed and they work seamlessly between libraries and histories. Supported "archives" include gzip, zip, bagit directories, bagit archives (with fetching and validation of downloads). The existing upload API endpoint is quite rough to work with both in terms of adding parameters (e.g. the file type and dbkey handling in 4563 was difficult to implement, terribly hacky, and should seemingly have been trivial) and in terms of building requests (one needs to build a tool form - not describe sensible inputs in JSON). This API is built to be intelligible from an API standpoint instead of being constrained to the older style tool form. Additionally, it is built with hierarchical data in mind in a way that would not be easy at all to achieve by enhancing the tool form components we don't even render. This implements 5159 though much simpler YAML descriptions of data libraries should be possible basically as the API descriptions. We can replace the data library script in Ephemeris https://github.com/galaxyproject/ephemeris/blob/master/ephemeris/setup_data_libraries.py with one that converts a simple YAML file into an API call and allows many new options for free. In future PRs I'll add filtering options to this and it will serve as the backend to 4733. 
--- lib/galaxy/app.py | 4 + lib/galaxy/datatypes/sniff.py | 7 + lib/galaxy/datatypes/upload_util.py | 47 +++ lib/galaxy/jobs/__init__.py | 2 +- lib/galaxy/managers/collections.py | 8 +- lib/galaxy/tools/__init__.py | 12 +- lib/galaxy/tools/actions/upload.py | 55 ++++ lib/galaxy/tools/actions/upload_common.py | 8 +- lib/galaxy/tools/data_fetch.py | 290 ++++++++++++++++++ lib/galaxy/tools/data_fetch.xml | 33 ++ lib/galaxy/tools/execute.py | 2 +- lib/galaxy/tools/parameters/output_collect.py | 178 ++++++++++- lib/galaxy/tools/special_tools.py | 1 + lib/galaxy/webapps/galaxy/api/_fetch_util.py | 103 +++++++ lib/galaxy/webapps/galaxy/api/tools.py | 36 +++ lib/galaxy/webapps/galaxy/buildapp.py | 1 + test-data/example-bag.zip | Bin 0 -> 2966 bytes test-data/testdir1.zip | Bin 0 -> 825 bytes test/api/test_dataset_collections.py | 31 ++ test/api/test_libraries.py | 147 +++++++++ test/base/populators.py | 26 +- tools/data_source/upload.py | 88 +++--- 22 files changed, 1002 insertions(+), 77 deletions(-) create mode 100644 lib/galaxy/datatypes/upload_util.py create mode 100644 lib/galaxy/tools/data_fetch.py create mode 100644 lib/galaxy/tools/data_fetch.xml create mode 100644 lib/galaxy/webapps/galaxy/api/_fetch_util.py create mode 100644 test-data/example-bag.zip create mode 100644 test-data/testdir1.zip diff --git a/lib/galaxy/app.py b/lib/galaxy/app.py index 25de26171058..ce22ccd7f396 100644 --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -12,6 +12,8 @@ from galaxy import config, jobs from galaxy.jobs import metrics as job_metrics from galaxy.managers.collections import DatasetCollectionManager +from galaxy.managers.folders import FolderManager +from galaxy.managers.libraries import LibraryManager from galaxy.managers.tags import GalaxyTagManager from galaxy.openid.providers import OpenIDProviders from galaxy.queue_worker import GalaxyQueueWorker @@ -90,6 +92,8 @@ def __init__(self, **kwargs): self.tag_handler = GalaxyTagManager(self.model.context) # Dataset Collection 
Plugins self.dataset_collections_service = DatasetCollectionManager(self) + self.library_folder_manager = FolderManager() + self.library_manager = LibraryManager() # Tool Data Tables self._configure_tool_data_tables(from_shed_config=False) diff --git a/lib/galaxy/datatypes/sniff.py b/lib/galaxy/datatypes/sniff.py index cf058cf069ac..4f019234e142 100644 --- a/lib/galaxy/datatypes/sniff.py +++ b/lib/galaxy/datatypes/sniff.py @@ -14,6 +14,7 @@ import zipfile from six import text_type +from six.moves.urllib.request import urlopen from galaxy import util from galaxy.util import compression_utils @@ -39,6 +40,12 @@ def get_test_fname(fname): return full_path +def stream_url_to_file(path): + page = urlopen(path) # page will be .close()ed in stream_to_file + temp_name = stream_to_file(page, prefix='url_paste', source_encoding=util.get_charset_from_http_headers(page.headers)) + return temp_name + + def stream_to_open_named_file(stream, fd, filename, source_encoding=None, source_error='strict', target_encoding=None, target_error='strict'): """Writes a stream to the provided file descriptor, returns the file name. Closes file descriptor""" # signature and behavor is somewhat odd, due to backwards compatibility, but this can/should be done better diff --git a/lib/galaxy/datatypes/upload_util.py b/lib/galaxy/datatypes/upload_util.py new file mode 100644 index 000000000000..8c65ad6aad9a --- /dev/null +++ b/lib/galaxy/datatypes/upload_util.py @@ -0,0 +1,47 @@ +from galaxy.datatypes import sniff +from galaxy.datatypes.binary import Binary + + +class UploadProblemException(Exception): + + def __init__(self, message): + self.message = message + + +def handle_unsniffable_binary_check(data_type, ext, path, name, requested_ext, check_content, registry): + """Return modified values of data_type and ext if unsniffable binary encountered. + + Throw UploadProblemException if content problems or extension mismatches occur. + + Precondition: check_binary called returned True. 
+ """ + if registry.is_extension_unsniffable_binary(requested_ext): + # We have a binary dataset, but it is not Bam, Sff or Pdf + data_type = 'binary' + parts = name.split(".") + if len(parts) > 1: + ext = parts[-1].strip().lower() + is_ext_unsniffable_binary = registry.is_extension_unsniffable_binary(ext) + if check_content and not is_ext_unsniffable_binary: + raise UploadProblemException('The uploaded binary file contains inappropriate content') + + elif is_ext_unsniffable_binary and requested_ext != ext: + err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % (ext, ext) + raise UploadProblemException(err_msg) + return ext, data_type + + +def handle_sniffable_binary_check(data_type, ext, path, registry): + """Return modified values of data_type and ext if sniffable binary encountered. + + Precondition: check_binary called returned True. + """ + # Sniff the data type + guessed_ext = sniff.guess_ext(path, registry.sniff_order) + # Set data_type only if guessed_ext is a binary datatype + datatype = registry.get_datatype_by_extension(guessed_ext) + if isinstance(datatype, Binary): + data_type = guessed_ext + ext = guessed_ext + + return data_type, ext diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 3afd5657a3af..08793f07a091 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1363,7 +1363,7 @@ def path_rewriter(path): collected_datasets = { 'primary': self.tool.collect_primary_datasets(out_data, self.get_tool_provided_job_metadata(), tool_working_directory, input_ext, input_dbkey) } - self.tool.collect_dynamic_collections( + self.tool.collect_dynamic_outputs( out_collections, self.get_tool_provided_job_metadata(), job_working_directory=tool_working_directory, diff --git a/lib/galaxy/managers/collections.py b/lib/galaxy/managers/collections.py index e823b0f44ca3..dad652a48c2a 100644 --- a/lib/galaxy/managers/collections.py +++ b/lib/galaxy/managers/collections.py @@ -46,7 +46,7 @@ 
def __init__(self, app): self.tag_manager = tags.GalaxyTagManager(app.model.context) self.ldda_manager = lddas.LDDAManager(app) - def precreate_dataset_collection_instance(self, trans, parent, name, implicit_inputs, implicit_output_name, structure): + def precreate_dataset_collection_instance(self, trans, parent, name, structure, implicit_inputs=None, implicit_output_name=None): # TODO: prebuild all required HIDs and send them in so no need to flush in between. dataset_collection = self.precreate_dataset_collection(structure) instance = self._create_instance_for_collection( @@ -56,7 +56,9 @@ def precreate_dataset_collection_instance(self, trans, parent, name, implicit_in def precreate_dataset_collection(self, structure): if structure.is_leaf or not structure.children_known: - return model.DatasetCollectionElement.UNINITIALIZED_ELEMENT + collection_type_description = structure.collection_type_description + dataset_collection = model.DatasetCollection(populated=False) + dataset_collection.collection_type = collection_type_description.collection_type else: collection_type_description = structure.collection_type_description dataset_collection = model.DatasetCollection(populated=False) @@ -78,7 +80,7 @@ def precreate_dataset_collection(self, structure): dataset_collection.elements = elements dataset_collection.element_count = len(elements) - return dataset_collection + return dataset_collection def create(self, trans, parent, name, collection_type, element_identifiers=None, elements=None, implicit_collection_info=None, trusted_identifiers=None, diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 660f47869633..13914089a833 100755 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -101,6 +101,7 @@ # Tools that require Galaxy's Python environment to be preserved. GALAXY_LIB_TOOLS_UNVERSIONED = [ "upload1", + "__DATA_FETCH__", # Legacy tools bundled with Galaxy. 
"vcf_to_maf_customtrack1", "laj_1", @@ -1039,7 +1040,10 @@ def parse_input_elem(self, page_source, enctypes, context=None): group.file_type_name = elem.get('file_type_name', group.file_type_name) group.default_file_type = elem.get('default_file_type', group.default_file_type) group.metadata_ref = elem.get('metadata_ref', group.metadata_ref) - rval[group.file_type_name].refresh_on_change = True + try: + rval[group.file_type_name].refresh_on_change = True + except KeyError: + pass group_page_source = XmlPageSource(elem) group.inputs = self.parse_input_elem(group_page_source, enctypes, context) rval[group.name] = group @@ -1576,10 +1580,10 @@ def collect_primary_datasets(self, output, tool_provided_metadata, job_working_d """ return output_collect.collect_primary_datasets(self, output, tool_provided_metadata, job_working_directory, input_ext, input_dbkey=input_dbkey) - def collect_dynamic_collections(self, output, tool_provided_metadata, **kwds): - """ Find files corresponding to dynamically structured collections. + def collect_dynamic_outputs(self, output, tool_provided_metadata, **kwds): + """Collect dynamic outputs associated with a job from this tool. 
""" - return output_collect.collect_dynamic_collections(self, output, tool_provided_metadata, **kwds) + return output_collect.collect_dynamic_outputs(self, output, tool_provided_metadata, **kwds) def to_archive(self): tool = self diff --git a/lib/galaxy/tools/actions/upload.py b/lib/galaxy/tools/actions/upload.py index 70bf8152c44a..c19fb960ac86 100644 --- a/lib/galaxy/tools/actions/upload.py +++ b/lib/galaxy/tools/actions/upload.py @@ -1,3 +1,4 @@ +import json import logging from galaxy.tools.actions import upload_common @@ -36,3 +37,57 @@ def execute(self, tool, trans, incoming={}, set_output_hid=True, history=None, * rval = upload_common.create_job(trans, incoming, tool, json_file_path, data_list, history=history) log.debug("Created upload job %s" % create_job_timer) return rval + + +class FetchUploadToolAction(ToolAction): + + def execute(self, tool, trans, incoming={}, set_output_hid=True, history=None, **kwargs): + dataset_upload_inputs = [] + for input_name, input in tool.inputs.items(): + if input.type == "upload_dataset": + dataset_upload_inputs.append(input) + assert dataset_upload_inputs, Exception("No dataset upload groups were found.") + + persisting_uploads_timer = ExecutionTimer() + # precreated_datasets = upload_common.get_precreated_datasets(trans, incoming, trans.app.model.HistoryDatasetAssociation) + incoming = upload_common.persist_uploads(incoming, trans) + log.debug("Persisted uploads %s" % persisting_uploads_timer) + + # Now replace references in requests with these. 
+ files = incoming.get("files", []) + files_iter = iter(files) + request = json.loads(incoming.get("request_json")) + + def replace_file_srcs(request_part): + if isinstance(request_part, dict): + if request_part.get("src", None) == "files": + path_def = next(files_iter) + request_part["path"] = path_def["file_data"]["local_filename"] + if "name" not in request_part: + request_part["name"] = path_def["file_data"]["filename"] + request_part["src"] = "path" + else: + for key, value in request_part.items(): + replace_file_srcs(value) + elif isinstance(request_part, list): + for value in request_part: + replace_file_srcs(value) + + replace_file_srcs(request) + + incoming["request_json"] = json.dumps(request) + log.info("incoming are %s" % incoming) + # We can pass an empty string as the cntrller here since it is used to check whether we + # are in an admin view, and this tool is currently not used there. + check_and_cleanup_timer = ExecutionTimer() + # uploaded_datasets = upload_common.get_uploaded_datasets(trans, '', incoming, precreated_datasets, dataset_upload_inputs, history=history) + # upload_common.cleanup_unused_precreated_datasets(precreated_datasets) + + # if not uploaded_datasets: + # return None, 'No data was entered in the upload form, please go back and choose data to upload.' 
+ + log.debug("Checked and cleaned uploads %s" % check_and_cleanup_timer) + create_job_timer = ExecutionTimer() + rval = upload_common.create_job(trans, incoming, tool, None, [], history=history) + log.debug("Created upload job %s" % create_job_timer) + return rval diff --git a/lib/galaxy/tools/actions/upload_common.py b/lib/galaxy/tools/actions/upload_common.py index ae70cae17de3..dc7831403123 100644 --- a/lib/galaxy/tools/actions/upload_common.py +++ b/lib/galaxy/tools/actions/upload_common.py @@ -123,7 +123,7 @@ def persist_uploads(params, trans): local_filename=local_filename) elif type(f) == dict and 'local_filename' not in f: raise Exception('Uploaded file was encoded in a way not understood by Galaxy.') - if upload_dataset['url_paste'] and upload_dataset['url_paste'].strip() != '': + if 'url_paste' in upload_dataset and upload_dataset['url_paste'] and upload_dataset['url_paste'].strip() != '': upload_dataset['url_paste'] = datatypes.sniff.stream_to_file( StringIO(validate_url(upload_dataset['url_paste'], trans.app.config.fetch_url_whitelist_ips)), prefix="strio_url_paste_" @@ -334,7 +334,11 @@ def new_upload(trans, cntrller, uploaded_dataset, library_bunch=None, history=No def get_uploaded_datasets(trans, cntrller, params, precreated_datasets, dataset_upload_inputs, library_bunch=None, history=None): uploaded_datasets = [] for dataset_upload_input in dataset_upload_inputs: - uploaded_datasets.extend(dataset_upload_input.get_uploaded_datasets(trans, params)) + try: + uploaded_datasets.extend(dataset_upload_input.get_uploaded_datasets(trans, params)) + except AttributeError: + # TODO: refine... 
+ pass for uploaded_dataset in uploaded_datasets: data = get_precreated_dataset(precreated_datasets, uploaded_dataset.name) if not data: diff --git a/lib/galaxy/tools/data_fetch.py b/lib/galaxy/tools/data_fetch.py new file mode 100644 index 000000000000..1256444e1ee1 --- /dev/null +++ b/lib/galaxy/tools/data_fetch.py @@ -0,0 +1,290 @@ +import argparse +import errno +import json +import os +import shutil +import sys +import tempfile + +import bdbag.bdbag_api + +from galaxy.datatypes import sniff +from galaxy.datatypes.registry import Registry +from galaxy.datatypes.upload_util import ( + handle_sniffable_binary_check, + handle_unsniffable_binary_check, + UploadProblemException, +) +from galaxy.util import in_directory +from galaxy.util.checkers import ( + check_binary, + check_html, +) +from galaxy.util.compression_utils import CompressedFile + +DESCRIPTION = """Data Import Script""" + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + args = _arg_parser().parse_args(argv) + + registry = Registry() + registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry) + + request_path = args.request + assert os.path.exists(request_path) + with open(request_path) as f: + request = json.load(f) + + upload_config = UploadConfig(request, registry) + galaxy_json = _request_to_galaxy_json(upload_config, request) + with open("galaxy.json", "w") as f: + json.dump(galaxy_json, f) + + +def _request_to_galaxy_json(upload_config, request): + targets = request.get("targets", []) + fetched_targets = [] + + for target in targets: + fetched_target = _fetch_target(upload_config, target) + fetched_targets.append(fetched_target) + + return {"__unnamed_outputs": fetched_targets} + + +def _fetch_target(upload_config, target): + destination = target.get("destination", None) + assert destination, "No destination defined." 
+ + items_from = target.get("items_from", None) + assert not items_from or items_from in ["archive", "bagit", "bagit_archive"] + if items_from == "archive": + decompressed_directory = _decompress_target(target) + items = _directory_to_items(decompressed_directory) + elif items_from == "bagit": + _, items_from_path = _has_src_to_path(target) + items = _bagit_to_items(items_from_path) + elif items_from == "bagit_archive": + decompressed_directory = _decompress_target(target) + items = _bagit_to_items(decompressed_directory) + else: + items = target.get("items") + assert items is not None, "No item definition found for destination [%s]" % destination + + fetched_target = {} + fetched_target["destination"] = destination + if "collection_type" in target: + fetched_target["collection_type"] = target["collection_type"] + + def _resolve_src(item): + converted_path = None + + name, path = _has_src_to_path(item) + dbkey = item.get("dbkey", "?") + requested_ext = item.get("ext", "auto") + info = item.get("info", None) + link_data_only = upload_config.link_data_only + if "link_data_only" in item: + # Allow overriding this on a per file basis. + link_data_only = _link_data_only(item) + to_posix_lines = upload_config.get_option(item, "to_posix_lines") + space_to_tab = upload_config.get_option(item, "space_to_tab") + in_place = item.get("in_place", False) + purge_source = item.get("purge_source", True) + + # Follow upload.py logic but without the auto-decompress logic. 
+ registry = upload_config.registry + check_content = upload_config.check_content + data_type, ext = None, requested_ext + + is_binary = check_binary(path) + if is_binary: + data_type, ext = handle_sniffable_binary_check(data_type, ext, path, registry) + if data_type is None: + if is_binary: + data_type, ext = handle_unsniffable_binary_check( + data_type, ext, path, name, requested_ext, check_content, registry + ) + if not data_type and check_content and check_html(path): + raise UploadProblemException('The uploaded file contains inappropriate HTML content') + + if data_type != 'binary': + if not link_data_only: + if to_posix_lines: + if space_to_tab: + line_count, converted_path = sniff.convert_newlines_sep2tabs(path, in_place=in_place, tmp_dir=".") + else: + line_count, converted_path = sniff.convert_newlines(path, in_place=in_place, tmp_dir=".") + + if requested_ext == 'auto': + ext = sniff.guess_ext(path, registry.sniff_order) + else: + ext = requested_ext + + data_type = ext + + if ext == 'auto' and data_type == 'binary': + ext = 'data' + if ext == 'auto' and requested_ext: + ext = requested_ext + if ext == 'auto': + ext = 'data' + + datatype = registry.get_datatype_by_extension(ext) + if link_data_only: + # Never alter a file that will not be copied to Galaxy's local file store. + if datatype.dataset_content_needs_grooming(path): + err_msg = 'The uploaded files need grooming, so change your Copy data into Galaxy? selection to be ' + \ + 'Copy files into Galaxy instead of Link to files without copying into Galaxy so grooming can be performed.' + raise UploadProblemException(err_msg) + + # If this file is not in the workdir make sure it gets there. 
+ if not link_data_only and converted_path: + path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place) + elif not link_data_only: + path = upload_config.ensure_in_working_directory(path, purge_source, in_place) + + if not link_data_only and datatype and datatype.dataset_content_needs_grooming(path): + # Groom the dataset content if necessary + datatype.groom_dataset_content(path) + + rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only} + if info is not None: + rval["info"] = info + return rval + + elements = elements_tree_map(_resolve_src, items) + + fetched_target["elements"] = elements + return fetched_target + + +def _bagit_to_items(directory): + bdbag.bdbag_api.resolve_fetch(directory) + bdbag.bdbag_api.validate_bag(directory) + items = _directory_to_items(os.path.join(directory, "data")) + return items + + +def _decompress_target(target): + items_from_name, items_from_path = _has_src_to_path(target) + temp_directory = tempfile.mkdtemp(prefix=items_from_name, dir=".") + decompressed_directory = CompressedFile(items_from_path).extract(temp_directory) + return decompressed_directory + + +def elements_tree_map(f, items): + new_items = [] + for item in items: + if "elements" in item: + new_item = item.copy() + new_item["elements"] = elements_tree_map(f, item["elements"]) + new_items.append(new_item) + else: + new_items.append(f(item)) + return new_items + + +def _directory_to_items(directory): + items = [] + dir_elements = {} + for root, dirs, files in os.walk(directory): + if root in dir_elements: + target = dir_elements[root] + else: + target = items + for dir in dirs: + dir_dict = {"name": dir, "elements": []} + dir_elements[os.path.join(root, dir)] = dir_dict["elements"] + target.append(dir_dict) + for file in files: + target.append({"src": "path", "path": os.path.join(root, file)}) + + return items + + +def _has_src_to_path(item): + assert "src" in item + src = item.get("src") + 
name = item.get("name") + if src == "url": + url = item.get("url") + path = sniff.stream_url_to_file(url) + if name is None: + name = url.split("/")[-1] + else: + assert src == "path" + path = item["path"] + if name is None: + name = os.path.basename(path) + return name, path + + +def _arg_parser(): + parser = argparse.ArgumentParser(description=DESCRIPTION) + parser.add_argument("--galaxy-root") + parser.add_argument("--datatypes-registry") + parser.add_argument("--request-version") + parser.add_argument("--request") + return parser + + +class UploadConfig(object): + + def __init__(self, request, registry): + self.registry = registry + self.check_content = request.get("check_content" , True) + self.to_posix_lines = request.get("to_posix_lines", False) + self.space_to_tab = request.get("space_to_tab", False) + self.link_data_only = _link_data_only(request) + + self.__workdir = os.path.abspath(".") + self.__upload_count = 0 + + def get_option(self, item, key): + """Return item[key] if specified otherwise use default from UploadConfig. + + This default represents the default for the whole request instead item which + is the option for individual files. 
+ """ + if key in item: + return item[key] + else: + return getattr(self, key) + + def __new_dataset_path(self): + path = "gxupload_%d" % self.__upload_count + self.__upload_count += 1 + return path + + def ensure_in_working_directory(self, path, purge_source, in_place): + if in_directory(path, self.__workdir): + return path + + new_path = self.__new_dataset_path() + if purge_source: + try: + shutil.move(path, new_path) + except OSError as e: + # We may not have permission to remove converted_path + if e.errno != errno.EACCES: + raise + else: + shutil.copy(path, new_path) + + return new_path + + +def _link_data_only(has_config_dict): + link_data_only = has_config_dict.get("link_data_only", False) + if not isinstance(link_data_only, bool): + # Allow the older string values of 'copy_files' and 'link_to_files' + link_data_only = link_data_only == "copy_files" + return link_data_only + + +if __name__ == "__main__": + main() diff --git a/lib/galaxy/tools/data_fetch.xml b/lib/galaxy/tools/data_fetch.xml new file mode 100644 index 000000000000..5160cb2989c8 --- /dev/null +++ b/lib/galaxy/tools/data_fetch.xml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + $request_json + + + + + diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 1a83a0d245ff..4ca2cf967278 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -278,9 +278,9 @@ def precreate_output_collections(self, history, params): trans=trans, parent=history, name=output_collection_name, + structure=effective_structure, implicit_inputs=implicit_inputs, implicit_output_name=output_name, - structure=effective_structure, ) collection_instance.implicit_collection_jobs = implicit_collection_jobs collection_instances[output_name] = collection_instance diff --git a/lib/galaxy/tools/parameters/output_collect.py b/lib/galaxy/tools/parameters/output_collect.py index c24babea7c4a..2e54d94672fd 100644 --- a/lib/galaxy/tools/parameters/output_collect.py +++ 
b/lib/galaxy/tools/parameters/output_collect.py @@ -12,7 +12,9 @@ from galaxy.tools.parser.output_collection_def import ( DEFAULT_DATASET_COLLECTOR_DESCRIPTION, INPUT_DBKEY_TOKEN, + ToolProvidedMetadataDatasetCollection, ) +from galaxy.dataset_collections.structure import UnitializedTree from galaxy.util import ( ExecutionTimer, odict @@ -31,6 +33,9 @@ def get_new_datasets(self, output_name): def get_new_dataset_meta_by_basename(self, output_name, basename): return {} + def get_unnamed_outputs(self): + return [] + class LegacyToolProvidedMetadata(object): @@ -73,6 +78,9 @@ def get_new_datasets(self, output_name): log.warning("Called get_new_datasets with legacy tool metadata provider - that is unimplemented.") return [] + def get_unnamed_outputs(self): + return [] + class ToolProvidedMetadata(object): @@ -111,8 +119,12 @@ def _elements_to_datasets(self, elements, level=0): extra_kwds.update(element) yield extra_kwds + def get_unnamed_outputs(self): + log.info(self.tool_provided_job_metadata) + return self.tool_provided_job_metadata.get("__unnamed_outputs", []) + -def collect_dynamic_collections( +def collect_dynamic_outputs( tool, output_collections, tool_provided_metadata, @@ -121,6 +133,7 @@ def collect_dynamic_collections( job=None, input_dbkey="?", ): + app = tool.app collections_service = tool.app.dataset_collections_service job_context = JobContext( tool, @@ -130,6 +143,86 @@ def collect_dynamic_collections( inp_data, input_dbkey, ) + log.info(tool_provided_metadata) + for unnamed_output_dict in tool_provided_metadata.get_unnamed_outputs(): + assert "destination" in unnamed_output_dict + assert "elements" in unnamed_output_dict + destination = unnamed_output_dict["destination"] + elements = unnamed_output_dict["elements"] + + assert "type" in destination + destination_type = destination["type"] + trans = job_context.work_context + + if destination_type == "library_folder": + + library_folder_manager = app.library_folder_manager + library_folder = 
library_folder_manager.get(trans, app.security.decode_id(destination.get("library_folder_id"))) + + def add_elements_to_folder(elements, library_folder): + for element in elements: + if "elements" in element: + assert "name" in element + name = element["name"] + description = element.get("description") + nested_folder = library_folder_manager.create(trans, library_folder.id, name, description) + add_elements_to_folder(element["elements"], nested_folder) + else: + discovered_file = discovered_file_for_unnamed_output(element, job_working_directory) + fields_match = discovered_file.match + designation = fields_match.designation + visible = fields_match.visible + ext = fields_match.ext + dbkey = fields_match.dbkey + info = element.get("info", None) + + # Create new primary dataset + name = fields_match.name or designation + + job_context.create_dataset( + ext=ext, + designation=designation, + visible=visible, + dbkey=dbkey, + name=name, + filename=discovered_file.path, + info=info, + library_folder=library_folder + ) + + add_elements_to_folder(elements, library_folder) + elif destination_type == "hdca": + history = job.history + assert "collection_type" in unnamed_output_dict + name = unnamed_output_dict.get("name", "unnamed collection") + collection_type = unnamed_output_dict["collection_type"] + collection_type_description = collections_service.collection_type_descriptions.for_collection_type(collection_type) + structure = UnitializedTree(collection_type_description) + hdca = collections_service.precreate_dataset_collection_instance( + trans, history, name, structure=structure + ) + filenames = odict.odict() + + def add_to_discovered_files(elements): + for element in elements: + if "elements" in element: + add_to_discovered_files(element["elements"]) + else: + discovered_file = discovered_file_for_unnamed_output(element, job_working_directory) + filenames[discovered_file.path] = discovered_file + + add_to_discovered_files(elements) + + collection = hdca.collection + 
collection_builder = collections_service.collection_builder_for( + collection + ) + job_context.populate_collection_elements( + collection, + collection_builder, + filenames, + ) + collection_builder.populate() for name, has_collection in output_collections.items(): if name not in tool.output_collections: @@ -146,13 +239,19 @@ def collect_dynamic_collections( collection = has_collection try: + collection_builder = collections_service.collection_builder_for( collection ) + dataset_collectors = map(dataset_collector, output_collection_def.dataset_collector_descriptions) + output_name = output_collection_def.name + filenames = job_context.find_files(output_name, collection, dataset_collectors) job_context.populate_collection_elements( collection, collection_builder, - output_collection_def, + filenames, + name=output_collection_def.name, + metadata_source_name=output_collection_def.metadata_source, ) collection_builder.populate() except Exception: @@ -171,6 +270,11 @@ def __init__(self, tool, tool_provided_metadata, job, job_working_directory, inp self.job_working_directory = job_working_directory self.tool_provided_metadata = tool_provided_metadata + @property + def work_context(self): + from galaxy.work.context import WorkRequestContext + return WorkRequestContext(self.app, user=self.job.user) + @property def permissions(self): inp_data = self.inp_data @@ -188,15 +292,14 @@ def find_files(self, output_name, collection, dataset_collectors): filenames[discovered_file.path] = discovered_file return filenames - def populate_collection_elements(self, collection, root_collection_builder, output_collection_def): + def populate_collection_elements(self, collection, root_collection_builder, filenames, name=None, metadata_source_name=None): # TODO: allow configurable sorting. 
# # # # - dataset_collectors = map(dataset_collector, output_collection_def.dataset_collector_descriptions) - output_name = output_collection_def.name - filenames = self.find_files(output_name, collection, dataset_collectors) + if name is None: + name = "unnamed output" element_datasets = [] for filename, discovered_file in filenames.items(): @@ -222,14 +325,14 @@ def populate_collection_elements(self, collection, root_collection_builder, outp dbkey=dbkey, name=name, filename=filename, - metadata_source_name=output_collection_def.metadata_source, + metadata_source_name=metadata_source_name, ) log.debug( "(%s) Created dynamic collection dataset for path [%s] with element identifier [%s] for output [%s] %s", self.job.id, filename, designation, - output_collection_def.name, + name, create_dataset_timer, ) element_datasets.append((element_identifiers, dataset)) @@ -244,7 +347,7 @@ def populate_collection_elements(self, collection, root_collection_builder, outp log.debug( "(%s) Add dynamic collection datsets to history for output [%s] %s", self.job.id, - output_collection_def.name, + name, add_datasets_timer, ) @@ -274,12 +377,17 @@ def create_dataset( dbkey, name, filename, - metadata_source_name, + metadata_source_name=None, + info=None, + library_folder=None, ): app = self.app sa_session = self.sa_session - primary_data = _new_hda(app, sa_session, ext, designation, visible, dbkey, self.permissions) + if not library_folder: + primary_data = _new_hda(app, sa_session, ext, designation, visible, dbkey, self.permissions) + else: + primary_data = _new_ldda(self.work_context, name, ext, visible, dbkey, library_folder) # Copy metadata from one of the inputs if requested. 
def discovered_file_for_unnamed_output(dataset, job_working_directory):
    """Wrap a tool-provided dataset description in a ``DiscoveredFile``.

    ``dataset`` must provide a ``"filename"`` entry.  The file is located by
    joining that name onto the directory ``discover_target_directory``
    resolves for the default tool-provided dataset collector.
    """
    collector = DEFAULT_TOOL_PROVIDED_DATASET_COLLECTOR
    base_directory = discover_target_directory(collector, job_working_directory)
    filename = dataset["filename"]
    path = os.path.join(base_directory, filename)
    match = JsonCollectedDatasetMatch(dataset, collector, filename, path=path)
    return DiscoveredFile(path, collector, match)
def validate_and_normalize_targets(trans, payload):
    """Validate and normalize all src references in fetch targets.

    - Normalize ftp_import and server_dir src entries into simple path entries
      with the relevant paths resolved and permissions / configuration checked.
    - Check for file:// URLs in items src of "url" and convert them into path
      src items - after verifying path pastes are allowed and user is admin.
    - Check for valid URLs to be fetched for http(s) and ftp(s) entries.
    - Based on Galaxy configuration and upload types set purge_source and in_place
      as needed for each upload.

    ``payload`` is modified in place: ``payload["check_content"]`` is set from
    the Galaxy configuration and every dictionary found under
    ``payload["targets"]`` that has a ``"src"`` key is rewritten as described
    above.

    Raises ``Exception`` if a source sets ``in_place`` itself, references an
    FTP file that cannot be found, or supplies a URL with an unsupported
    scheme.  Anything raised by the ``validate_*`` helpers propagates unchanged.
    """
    targets = payload.get("targets", [])

    # Unlike upload.py we don't transmit or use run_as_real_user in the job - we just make sure
    # in_place and purge_source are set on the individual upload fetch sources as needed based
    # on this.
    # NOTE(review): the variable name suggests ``is not None`` was intended here;
    # the original condition is kept - confirm against the comment in upload.py.
    run_as_real_user = trans.app.config.external_chown_script is None  # See comment in upload.py
    purge_ftp_source = getattr(trans.app.config, 'ftp_upload_purge', True) and not run_as_real_user

    payload["check_content"] = trans.app.config.check_upload_content

    def check_src(item):
        # Normalize file:// URLs into paths.
        if item["src"] == "url" and item["url"].startswith("file://"):
            item["src"] = "path"
            item["path"] = item["url"][len("file://"):]
            # Drop the now redundant URL, not the path that was just computed.
            del item["url"]

        if "in_place" in item:
            raise Exception("in_place cannot be set")

        src = item["src"]
        if src == "path" or (src == "url" and item["url"].startswith("file:")):
            # Validate is admin, leave alone.
            validate_path_upload(trans)
        elif src == "server_dir":
            # Validate and replace with path definition.
            server_dir = item["server_dir"]
            full_path, _ = validate_server_directory_upload(trans, server_dir)
            item["src"] = "path"
            item["path"] = full_path
        elif src == "ftp_import":
            ftp_path = item["ftp_path"]

            # It'd be nice if this can be de-duplicated with what is in parameters/grouping.py.
            user_ftp_dir = trans.user_ftp_dir
            assert not os.path.islink(user_ftp_dir), "User FTP directory cannot be a symbolic link"
            full_path = _find_ftp_upload(user_ftp_dir, ftp_path)
            if not full_path:
                raise Exception("Failed to find referenced ftp_path or symbolic link was enountered")

            item["src"] = "path"
            item["path"] = full_path
            item["purge_source"] = purge_ftp_source
        elif src == "url":
            url = item["url"]
            if not url.startswith(("http://", "https://", "ftp://", "ftps://")):
                raise Exception("Invalid URL [%s] found in src definition." % url)

            validate_url(url, trans.app.config.fetch_url_whitelist_ips)
            item["in_place"] = run_as_real_user
        elif src == "files":
            item["in_place"] = run_as_real_user

    _for_each_src(check_src, targets)


def _find_ftp_upload(user_ftp_dir, ftp_path):
    """Return the absolute path of the file named ``ftp_path`` below ``user_ftp_dir``.

    Symbolic links are never returned.  When several files share the name the
    last one visited by ``os.walk`` wins; ``None`` is returned when nothing
    matches.
    """
    found = None
    for dirpath, _dirnames, filenames in os.walk(user_ftp_dir):
        if ftp_path in filenames:
            candidate = os.path.join(dirpath, ftp_path)
            if not os.path.islink(candidate):
                found = os.path.abspath(candidate)
    return found


def _for_each_src(f, obj):
    """Call ``f`` on every dictionary with a ``"src"`` key nested anywhere in ``obj``.

    Lists and dictionary values are searched recursively; a dictionary is
    passed to ``f`` before its own values are visited.
    """
    if isinstance(obj, list):
        for item in obj:
            _for_each_src(f, item)
    elif isinstance(obj, dict):
        if "src" in obj:
            f(obj)
        for value in obj.values():
            _for_each_src(f, value)
@expose_api_anonymous
def fetch(self, trans, payload, **kwd):
    """Adapt clean API to tool-constrained API.

    POST /api/tools/fetch

    Splits ``payload`` into file upload entries (keys starting with
    ``files_`` or ``__files_``) and the JSON-describable request, validates
    and normalizes the request's targets, and runs the ``__DATA_FETCH__``
    tool with the serialized request as its input.

    ``payload`` must contain ``history_id`` (``KeyError`` otherwise).
    Returns whatever ``self._create`` returns for the generated tool request.
    """
    request_version = '1'
    history_id = payload.pop("history_id")
    clean_payload = {}
    files_payload = {}
    for key, value in payload.items():
        if key == "key":
            # The API key is neither forwarded to the tool nor logged.
            continue
        if key.startswith('files_') or key.startswith('__files_'):
            files_payload[key] = value
            continue
        clean_payload[key] = value
    validate_and_normalize_targets(trans, clean_payload)
    clean_payload["check_content"] = trans.app.config.check_upload_content
    request = dumps(clean_payload)
    # Log only the cleaned request (no API key, no file objects), at debug
    # level and with lazy formatting.
    log.debug("Data fetch request: %s", request)
    create_payload = {
        'tool_id': "__DATA_FETCH__",
        'history_id': history_id,
        'inputs': {
            'request_version': request_version,
            'request_json': request,
        },
    }
    create_payload.update(files_payload)
    return self._create(trans, create_payload, **kwd)
action = payload.get('action', None) if action == 'rerun': diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index 04f56c0fd5f5..cd3c80c65188 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -266,6 +266,7 @@ def populate_api_routes(webapp, app): # ====== TOOLS API ====== # ======================= + webapp.mapper.connect('/api/tools/fetch', action='fetch', controller='tools', conditions=dict(method=["POST"])) webapp.mapper.connect('/api/tools/all_requirements', action='all_requirements', controller="tools") webapp.mapper.connect('/api/tools/{id:.+?}/build', action='build', controller="tools") webapp.mapper.connect('/api/tools/{id:.+?}/reload', action='reload', controller="tools") diff --git a/test-data/example-bag.zip b/test-data/example-bag.zip new file mode 100644 index 0000000000000000000000000000000000000000..ee4de52c265be0c5ddfe54ade610957580115dac GIT binary patch literal 2966 zcmWIWW@h1H00G^m9&a!MN{BGXFqEVgm*^%Xrt7AqmLzBBW|Wi^=!b@IGBBsQ8$>Ph zG>9s#;AUWC`Nqh=z#;mYgs!XZ3B?^0f-fFm`Zt`8A z>1ZbBp7rKCq#(fdpw|B&E8D7L;)ZH=W_MR~s?QMXl4xXGwsP;@9}&Ce%>4g9t)Fe( z!ljIh+vZ-Jr?NV+JHQ+@O#^gzJP~5vR?8p#Z|NxI-ed zgitUzC8m3p=!T^h6=&w>St%IkS(v7HxNQv;}(335b>P85!j2=;G^&;>}FV z*p@wglDENthvh)nHl|ez8)f)Dsyn{^+WgMZs?#I zn&~Pev>=Ak0n7%S;31$=noE6!&7uF}kM> z`I-%Q7!J%0|IipDbGz`GmqFP>_ClHPwOS4n7M@Rk`Br?Tl(JE<=9yU=SQvU)v%PeZ zUsRvgy+8e@q3!d9f1}(FO*v@T^Te|`R83Kk;o}LpxAs>|_UHZH`15yk`}v(~@3Nx$ zVE(J~2^~P!uK{9pLOv+YNHj7vBj|}U)_jM6p181g`3qK!JehhUrHd0=r7a4396FR< z9Qc@+Q2F@q&7E0oN;^G*Ro5Ii+I#4r>cmM7iwx3&^Cvbf_?x#kbRbe=f3^ zH!bsb4l~oVyhxKwcAvXJC2rFumE^G;x)7<|xqy>>Wy0xeM`yKrE&OR8w!ZA9ZG3jj z``TT5uOI(_=25AY$@%kuo_vkRqa}&yq{No*S;K8ih8(RA|E*0jTcAE+LgP~g#<@|&pGu@?0@;X;NXNX1&_rZW-snEx@pSYJ(gCrTjqRC?DO=0o)h>IJB$KKEZ)tD z4V<-c@vgep$@4N##7)jVnXu&41_|cU*lOQNy2-!(`gHTgwy(Xq*NMUH+6SeH4=zk9 zu72_PM8)~j$rm(dEGiUV<#_i1%VFkIPRv_x{8PSH?)pmo7g|bHUa&4=cI05WaA)}i);Eug4GLru*t{E`I7t8a5vN&r@9VBz!Dp^6;7HVT 
def test_upload_collection(self):
    """Upload 4.bed through the fetch API into a new list collection."""
    items = [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}]
    targets = [{
        "destination": {"type": "hdca"},
        "items": items,
        "collection_type": "list",
    }]
    bed_path = self.test_data_resolver.get_filename("4.bed")
    # Uploads are byte streams: open in binary mode and make sure the handle
    # is closed once the request (and the job it waits for) has finished.
    with open(bed_path, "rb") as bed_file:
        payload = {
            "history_id": self.history_id,  # TODO: Shouldn't be needed :(
            "targets": json.dumps(targets),
            "__files": {"files_0|file_data": bed_file},
        }
        self.dataset_populator.fetch(payload)
    hdca = self._assert_one_collection_created_in_history()
    assert len(hdca["elements"]) == 1, hdca
    element0 = hdca["elements"][0]
    assert element0["element_identifier"] == "4.bed"
    assert element0["object"]["file_size"] == 61
def test_fetch_upload_to_folder(self):
    """Upload 4.bed through the fetch API into a library folder and check its metadata."""
    history_id, library, destination = self._setup_fetch_to_folder("flat_zip")
    items = [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}]
    targets = [{
        "destination": destination,
        "items": items
    }]
    bed_path = self.test_data_resolver.get_filename("4.bed")
    # Uploads are byte streams: open in binary mode and make sure the handle
    # is closed once the request (and the job it waits for) has finished.
    with open(bed_path, "rb") as bed_file:
        payload = {
            "history_id": history_id,  # TODO: Shouldn't be needed :(
            "targets": json.dumps(targets),
            "__files": {"files_0|file_data": bed_file},
        }
        self.dataset_populator.fetch(payload)
    contents = self._get("libraries/%s/contents" % library["id"]).json()
    matches = [c for c in contents if c["name"] == "/4.bed"]
    dataset = self._get(matches[0]["url"]).json()
    assert dataset["file_size"] == 61, dataset
    assert dataset["genome_build"] == "hg19", dataset
    assert dataset["misc_info"] == "my cool bed", dataset
    assert dataset["file_ext"] == "bed", dataset
def test_fetch_bagit_archive_to_folder(self):
    """Upload a zipped BagIt archive and check its payload files land in the library folder."""
    history_id, library, destination = self._setup_fetch_to_folder("bagit_archive")
    example_bag_path = self.test_data_resolver.get_filename("example-bag.zip")
    targets = [{
        "destination": destination,
        "items_from": "bagit_archive", "src": "files",
    }]
    # A zip archive is binary data: it must be read in binary mode, and the
    # handle is closed once the request (and the job it waits for) is done.
    with open(example_bag_path, "rb") as bag_file:
        payload = {
            "history_id": history_id,  # TODO: Shouldn't be needed :(
            "targets": json.dumps(targets),
            "__files": {"files_0|file_data": bag_file},
        }
        self.dataset_populator.fetch(payload)
    contents = self._get("libraries/%s/contents" % library["id"]).json()

    readme = [c for c in contents if c["name"] == "/README.txt"][0]
    dataset = self._get(readme["url"]).json()
    assert dataset["file_size"] == 66, dataset

    profile = [c for c in contents if c["name"] == "/bdbag-profile.json"][0]
    dataset = self._get(profile["url"]).json()
    assert dataset["file_size"] == 723, dataset
def tools_post(self, payload, url="tools"):
    """POST ``payload`` to the API route ``url`` and return the response.

    ``url`` defaults to the plain tool execution route ``"tools"``.
    """
    return self._post(url, data=payload)
%s\n%s' % (dataset.path, str(e))) + # See if we have an empty file if not os.path.exists(dataset.path): - file_err('Uploaded temporary file (%s) does not exist.' % dataset.path, dataset, json_file) - return + raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path) + if not os.path.getsize(dataset.path) > 0: - file_err('The uploaded file is empty', dataset, json_file) - return + raise UploadProblemException('The uploaded file is empty') + # Is dataset content supported sniffable binary? is_binary = check_binary(dataset.path) if is_binary: - # Sniff the data type - guessed_ext = sniff.guess_ext(dataset.path, registry.sniff_order) - # Set data_type only if guessed_ext is a binary datatype - datatype = registry.get_datatype_by_extension(guessed_ext) - if isinstance(datatype, Binary): - data_type = guessed_ext - ext = guessed_ext + data_type, ext = handle_sniffable_binary_check(data_type, ext, dataset.path, registry) if not data_type: root_datatype = registry.get_datatype_by_extension(dataset.file_type) if getattr(root_datatype, 'compressed', False): @@ -141,8 +136,8 @@ def add_file(dataset, registry, json_file, output_path): # See if we have a gzipped file, which, if it passes our restrictions, we'll uncompress is_gzipped, is_valid = check_gzip(dataset.path, check_content=check_content) if is_gzipped and not is_valid: - file_err('The gzipped uploaded file contains inappropriate content', dataset, json_file) - return + raise UploadProblemException('The gzipped uploaded file contains inappropriate content') + elif is_gzipped and is_valid and auto_decompress: if not link_data_only: # We need to uncompress the temp_name file, but BAM files must remain compressed in the BGZF format @@ -155,8 +150,8 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing gzipped data', dataset, json_file) - return + raise UploadProblemException('Problem decompressing 
gzipped data') + if not chunk: break os.write(fd, chunk) @@ -174,8 +169,8 @@ def add_file(dataset, registry, json_file, output_path): # See if we have a bz2 file, much like gzip is_bzipped, is_valid = check_bz2(dataset.path, check_content) if is_bzipped and not is_valid: - file_err('The gzipped uploaded file contains inappropriate content', dataset, json_file) - return + raise UploadProblemException('The gzipped uploaded file contains inappropriate content') + elif is_bzipped and is_valid and auto_decompress: if not link_data_only: # We need to uncompress the temp_name file @@ -188,8 +183,8 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing bz2 compressed data', dataset, json_file) - return + raise UploadProblemException('Problem decompressing bz2 compressed data') + if not chunk: break os.write(fd, chunk) @@ -228,7 +223,7 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing zipped data', dataset, json_file) + raise UploadProblemException('Problem decompressing zipped data') return if not chunk: break @@ -247,8 +242,8 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing zipped data', dataset, json_file) - return + raise UploadProblemException('Problem decompressing zipped data') + z.close() # Replace the zipped file with the decompressed file if it's safe to do so if uncompressed is not None: @@ -259,26 +254,15 @@ def add_file(dataset, registry, json_file, output_path): os.chmod(dataset.path, 0o644) dataset.name = uncompressed_name data_type = 'zip' - if not data_type: - if is_binary or registry.is_extension_unsniffable_binary(dataset.file_type): - # We have a binary dataset, but it is not Bam, Sff or Pdf - data_type = 'binary' - parts = dataset.name.split(".") - if len(parts) > 1: - 
ext = parts[-1].strip().lower() - is_ext_unsniffable_binary = registry.is_extension_unsniffable_binary(ext) - if check_content and not is_ext_unsniffable_binary: - file_err('The uploaded binary file contains inappropriate content', dataset, json_file) - return - elif is_ext_unsniffable_binary and dataset.file_type != ext: - err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % (ext, ext) - file_err(err_msg, dataset, json_file) - return + if not data_type and is_binary: + data_type, ext = handle_unsniffable_binary_check( + data_type, ext, dataset.path, dataset.name, dataset.file_type, check_content, registry + ) if not data_type: # We must have a text file if check_content and check_html(dataset.path): - file_err('The uploaded file contains inappropriate HTML content', dataset, json_file) - return + raise UploadProblemException('The uploaded file contains inappropriate HTML content') + if data_type != 'binary': if not link_data_only and data_type not in ('gzip', 'bz2', 'zip'): # Convert universal line endings to Posix line endings if to_posix_lines is True @@ -308,8 +292,8 @@ def add_file(dataset, registry, json_file, output_path): if datatype.dataset_content_needs_grooming(dataset.path): err_msg = 'The uploaded files need grooming, so change your Copy data into Galaxy? selection to be ' + \ 'Copy files into Galaxy instead of Link to files without copying into Galaxy so grooming can be performed.' 
- file_err(err_msg, dataset, json_file) - return + raise UploadProblemException(err_msg) + if not link_data_only and converted_path: # Move the dataset to its "real" path try: @@ -334,6 +318,7 @@ def add_file(dataset, registry, json_file, output_path): if dataset.get('uuid', None) is not None: info['uuid'] = dataset.get('uuid') json_file.write(dumps(info) + "\n") + if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path): # Groom the dataset content if necessary datatype.groom_dataset_content(output_path) @@ -407,7 +392,10 @@ def __main__(): files_path = output_paths[int(dataset.dataset_id)][1] add_composite_file(dataset, json_file, output_path, files_path) else: - add_file(dataset, registry, json_file, output_path) + try: + add_file(dataset, registry, json_file, output_path) + except UploadProblemException as e: + file_err(e.message, dataset, json_file) # clean up paramfile # TODO: this will not work when running as the actual user unless the