diff --git a/lib/galaxy/app.py b/lib/galaxy/app.py index 25de26171058..ce22ccd7f396 100644 --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -12,6 +12,8 @@ from galaxy import config, jobs from galaxy.jobs import metrics as job_metrics from galaxy.managers.collections import DatasetCollectionManager +from galaxy.managers.folders import FolderManager +from galaxy.managers.libraries import LibraryManager from galaxy.managers.tags import GalaxyTagManager from galaxy.openid.providers import OpenIDProviders from galaxy.queue_worker import GalaxyQueueWorker @@ -90,6 +92,8 @@ def __init__(self, **kwargs): self.tag_handler = GalaxyTagManager(self.model.context) # Dataset Collection Plugins self.dataset_collections_service = DatasetCollectionManager(self) + self.library_folder_manager = FolderManager() + self.library_manager = LibraryManager() # Tool Data Tables self._configure_tool_data_tables(from_shed_config=False) diff --git a/lib/galaxy/datatypes/sniff.py b/lib/galaxy/datatypes/sniff.py index cf058cf069ac..4f019234e142 100644 --- a/lib/galaxy/datatypes/sniff.py +++ b/lib/galaxy/datatypes/sniff.py @@ -14,6 +14,7 @@ import zipfile from six import text_type +from six.moves.urllib.request import urlopen from galaxy import util from galaxy.util import compression_utils @@ -39,6 +40,12 @@ def get_test_fname(fname): return full_path +def stream_url_to_file(path): + page = urlopen(path) # page will be .close()ed in stream_to_file + temp_name = stream_to_file(page, prefix='url_paste', source_encoding=util.get_charset_from_http_headers(page.headers)) + return temp_name + + def stream_to_open_named_file(stream, fd, filename, source_encoding=None, source_error='strict', target_encoding=None, target_error='strict'): """Writes a stream to the provided file descriptor, returns the file name. Closes file descriptor""" # signature and behavor is somewhat odd, due to backwards compatibility, but this can/should be done better diff --git a/lib/galaxy/datatypes/upload_util.py b/lib/galaxy/datatypes/upload_util.py new file mode 100644 index 000000000000..8c65ad6aad9a --- /dev/null +++ b/lib/galaxy/datatypes/upload_util.py @@ -0,0 +1,47 @@ +from galaxy.datatypes import sniff +from galaxy.datatypes.binary import Binary + + +class UploadProblemException(Exception): + + def __init__(self, message): + self.message = message + + +def handle_unsniffable_binary_check(data_type, ext, path, name, requested_ext, check_content, registry): + """Return modified values of data_type and ext if unsniffable binary encountered. + + Throw UploadProblemException if content problems or extension mismatches occur. + + Precondition: check_binary returned True. + """ + if registry.is_extension_unsniffable_binary(requested_ext): + # We have a binary dataset, but it is not Bam, Sff or Pdf + data_type = 'binary' + parts = name.split(".") + if len(parts) > 1: + ext = parts[-1].strip().lower() + is_ext_unsniffable_binary = registry.is_extension_unsniffable_binary(ext) + if check_content and not is_ext_unsniffable_binary: + raise UploadProblemException('The uploaded binary file contains inappropriate content') + + elif is_ext_unsniffable_binary and requested_ext != ext: + err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % (ext, ext) + raise UploadProblemException(err_msg) + return data_type, ext + + +def handle_sniffable_binary_check(data_type, ext, path, registry): + """Return modified values of data_type and ext if sniffable binary encountered. + + Precondition: check_binary returned True.
+ """ + # Sniff the data type + guessed_ext = sniff.guess_ext(path, registry.sniff_order) + # Set data_type only if guessed_ext is a binary datatype + datatype = registry.get_datatype_by_extension(guessed_ext) + if isinstance(datatype, Binary): + data_type = guessed_ext + ext = guessed_ext + + return data_type, ext diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 3afd5657a3af..08793f07a091 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1363,7 +1363,7 @@ def path_rewriter(path): collected_datasets = { 'primary': self.tool.collect_primary_datasets(out_data, self.get_tool_provided_job_metadata(), tool_working_directory, input_ext, input_dbkey) } - self.tool.collect_dynamic_collections( + self.tool.collect_dynamic_outputs( out_collections, self.get_tool_provided_job_metadata(), job_working_directory=tool_working_directory, diff --git a/lib/galaxy/managers/collections.py b/lib/galaxy/managers/collections.py index e823b0f44ca3..dad652a48c2a 100644 --- a/lib/galaxy/managers/collections.py +++ b/lib/galaxy/managers/collections.py @@ -46,7 +46,7 @@ def __init__(self, app): self.tag_manager = tags.GalaxyTagManager(app.model.context) self.ldda_manager = lddas.LDDAManager(app) - def precreate_dataset_collection_instance(self, trans, parent, name, implicit_inputs, implicit_output_name, structure): + def precreate_dataset_collection_instance(self, trans, parent, name, structure, implicit_inputs=None, implicit_output_name=None): # TODO: prebuild all required HIDs and send them in so no need to flush in between. dataset_collection = self.precreate_dataset_collection(structure) instance = self._create_instance_for_collection( @@ -56,7 +56,9 @@ def precreate_dataset_collection_instance(self, trans, parent, name, implicit_in def precreate_dataset_collection(self, structure): if structure.is_leaf or not structure.children_known: - return model.DatasetCollectionElement.UNINITIALIZED_ELEMENT + collection_type_description = structure.collection_type_description + dataset_collection = model.DatasetCollection(populated=False) + dataset_collection.collection_type = collection_type_description.collection_type else: collection_type_description = structure.collection_type_description dataset_collection = model.DatasetCollection(populated=False) @@ -78,7 +80,7 @@ def precreate_dataset_collection(self, structure): dataset_collection.elements = elements dataset_collection.element_count = len(elements) - return dataset_collection + return dataset_collection def create(self, trans, parent, name, collection_type, element_identifiers=None, elements=None, implicit_collection_info=None, trusted_identifiers=None, diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 660f47869633..13914089a833 100755 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -101,6 +101,7 @@ # Tools that require Galaxy's Python environment to be preserved. GALAXY_LIB_TOOLS_UNVERSIONED = [ "upload1", + "__DATA_FETCH__", # Legacy tools bundled with Galaxy. 
"vcf_to_maf_customtrack1", "laj_1", @@ -1039,7 +1040,10 @@ def parse_input_elem(self, page_source, enctypes, context=None): group.file_type_name = elem.get('file_type_name', group.file_type_name) group.default_file_type = elem.get('default_file_type', group.default_file_type) group.metadata_ref = elem.get('metadata_ref', group.metadata_ref) - rval[group.file_type_name].refresh_on_change = True + try: + rval[group.file_type_name].refresh_on_change = True + except KeyError: + pass group_page_source = XmlPageSource(elem) group.inputs = self.parse_input_elem(group_page_source, enctypes, context) rval[group.name] = group @@ -1576,10 +1580,10 @@ def collect_primary_datasets(self, output, tool_provided_metadata, job_working_d """ return output_collect.collect_primary_datasets(self, output, tool_provided_metadata, job_working_directory, input_ext, input_dbkey=input_dbkey) - def collect_dynamic_collections(self, output, tool_provided_metadata, **kwds): - """ Find files corresponding to dynamically structured collections. + def collect_dynamic_outputs(self, output, tool_provided_metadata, **kwds): + """Collect dynamic outputs associated with a job from this tool. """ - return output_collect.collect_dynamic_collections(self, output, tool_provided_metadata, **kwds) + return output_collect.collect_dynamic_outputs(self, output, tool_provided_metadata, **kwds) def to_archive(self): tool = self diff --git a/lib/galaxy/tools/actions/upload.py b/lib/galaxy/tools/actions/upload.py index 70bf8152c44a..c19fb960ac86 100644 --- a/lib/galaxy/tools/actions/upload.py +++ b/lib/galaxy/tools/actions/upload.py @@ -1,3 +1,4 @@ +import json import logging from galaxy.tools.actions import upload_common @@ -36,3 +37,57 @@ def execute(self, tool, trans, incoming={}, set_output_hid=True, history=None, * rval = upload_common.create_job(trans, incoming, tool, json_file_path, data_list, history=history) log.debug("Created upload job %s" % create_job_timer) return rval + + +class FetchUploadToolAction(ToolAction): + + def execute(self, tool, trans, incoming={}, set_output_hid=True, history=None, **kwargs): + dataset_upload_inputs = [] + for input_name, input in tool.inputs.items(): + if input.type == "upload_dataset": + dataset_upload_inputs.append(input) + assert dataset_upload_inputs, Exception("No dataset upload groups were found.") + + persisting_uploads_timer = ExecutionTimer() + # precreated_datasets = upload_common.get_precreated_datasets(trans, incoming, trans.app.model.HistoryDatasetAssociation) + incoming = upload_common.persist_uploads(incoming, trans) + log.debug("Persisted uploads %s" % persisting_uploads_timer) + + # Now replace references in requests with these. 
+ files = incoming.get("files", []) + files_iter = iter(files) + request = json.loads(incoming.get("request_json")) + + def replace_file_srcs(request_part): + if isinstance(request_part, dict): + if request_part.get("src", None) == "files": + path_def = next(files_iter) + request_part["path"] = path_def["file_data"]["local_filename"] + if "name" not in request_part: + request_part["name"] = path_def["file_data"]["filename"] + request_part["src"] = "path" + else: + for key, value in request_part.items(): + replace_file_srcs(value) + elif isinstance(request_part, list): + for value in request_part: + replace_file_srcs(value) + + replace_file_srcs(request) + + incoming["request_json"] = json.dumps(request) + log.info("incoming are %s" % incoming) + # We can pass an empty string as the cntrller here since it is used to check whether we + # are in an admin view, and this tool is currently not used there. + check_and_cleanup_timer = ExecutionTimer() + # uploaded_datasets = upload_common.get_uploaded_datasets(trans, '', incoming, precreated_datasets, dataset_upload_inputs, history=history) + # upload_common.cleanup_unused_precreated_datasets(precreated_datasets) + + # if not uploaded_datasets: + # return None, 'No data was entered in the upload form, please go back and choose data to upload.' + + log.debug("Checked and cleaned uploads %s" % check_and_cleanup_timer) + create_job_timer = ExecutionTimer() + rval = upload_common.create_job(trans, incoming, tool, None, [], history=history) + log.debug("Created upload job %s" % create_job_timer) + return rval diff --git a/lib/galaxy/tools/actions/upload_common.py b/lib/galaxy/tools/actions/upload_common.py index ae70cae17de3..dc7831403123 100644 --- a/lib/galaxy/tools/actions/upload_common.py +++ b/lib/galaxy/tools/actions/upload_common.py @@ -123,7 +123,7 @@ def persist_uploads(params, trans): local_filename=local_filename) elif type(f) == dict and 'local_filename' not in f: raise Exception('Uploaded file was encoded in a way not understood by Galaxy.') - if upload_dataset['url_paste'] and upload_dataset['url_paste'].strip() != '': + if 'url_paste' in upload_dataset and upload_dataset['url_paste'] and upload_dataset['url_paste'].strip() != '': upload_dataset['url_paste'] = datatypes.sniff.stream_to_file( StringIO(validate_url(upload_dataset['url_paste'], trans.app.config.fetch_url_whitelist_ips)), prefix="strio_url_paste_" @@ -334,7 +334,11 @@ def new_upload(trans, cntrller, uploaded_dataset, library_bunch=None, history=No def get_uploaded_datasets(trans, cntrller, params, precreated_datasets, dataset_upload_inputs, library_bunch=None, history=None): uploaded_datasets = [] for dataset_upload_input in dataset_upload_inputs: - uploaded_datasets.extend(dataset_upload_input.get_uploaded_datasets(trans, params)) + try: + uploaded_datasets.extend(dataset_upload_input.get_uploaded_datasets(trans, params)) + except AttributeError: + # TODO: refine... 
+ pass for uploaded_dataset in uploaded_datasets: data = get_precreated_dataset(precreated_datasets, uploaded_dataset.name) if not data: diff --git a/lib/galaxy/tools/data_fetch.py b/lib/galaxy/tools/data_fetch.py new file mode 100644 index 000000000000..1256444e1ee1 --- /dev/null +++ b/lib/galaxy/tools/data_fetch.py @@ -0,0 +1,290 @@ +import argparse +import errno +import json +import os +import shutil +import sys +import tempfile + +import bdbag.bdbag_api + +from galaxy.datatypes import sniff +from galaxy.datatypes.registry import Registry +from galaxy.datatypes.upload_util import ( + handle_sniffable_binary_check, + handle_unsniffable_binary_check, + UploadProblemException, +) +from galaxy.util import in_directory +from galaxy.util.checkers import ( + check_binary, + check_html, +) +from galaxy.util.compression_utils import CompressedFile + +DESCRIPTION = """Data Import Script""" + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + args = _arg_parser().parse_args(argv) + + registry = Registry() + registry.load_datatypes(root_dir=args.galaxy_root, config=args.datatypes_registry) + + request_path = args.request + assert os.path.exists(request_path) + with open(request_path) as f: + request = json.load(f) + + upload_config = UploadConfig(request, registry) + galaxy_json = _request_to_galaxy_json(upload_config, request) + with open("galaxy.json", "w") as f: + json.dump(galaxy_json, f) + + +def _request_to_galaxy_json(upload_config, request): + targets = request.get("targets", []) + fetched_targets = [] + + for target in targets: + fetched_target = _fetch_target(upload_config, target) + fetched_targets.append(fetched_target) + + return {"__unnamed_outputs": fetched_targets} + + +def _fetch_target(upload_config, target): + destination = target.get("destination", None) + assert destination, "No destination defined." + + items_from = target.get("items_from", None) + assert not items_from or items_from in ["archive", "bagit", "bagit_archive"] + if items_from == "archive": + decompressed_directory = _decompress_target(target) + items = _directory_to_items(decompressed_directory) + elif items_from == "bagit": + _, items_from_path = _has_src_to_path(target) + items = _bagit_to_items(items_from_path) + elif items_from == "bagit_archive": + decompressed_directory = _decompress_target(target) + items = _bagit_to_items(decompressed_directory) + else: + items = target.get("items") + assert items is not None, "No item definition found for destination [%s]" % destination + + fetched_target = {} + fetched_target["destination"] = destination + if "collection_type" in target: + fetched_target["collection_type"] = target["collection_type"] + + def _resolve_src(item): + converted_path = None + + name, path = _has_src_to_path(item) + dbkey = item.get("dbkey", "?") + requested_ext = item.get("ext", "auto") + info = item.get("info", None) + link_data_only = upload_config.link_data_only + if "link_data_only" in item: + # Allow overriding this on a per file basis. + link_data_only = _link_data_only(item) + to_posix_lines = upload_config.get_option(item, "to_posix_lines") + space_to_tab = upload_config.get_option(item, "space_to_tab") + in_place = item.get("in_place", False) + purge_source = item.get("purge_source", True) + + # Follow upload.py logic but without the auto-decompress logic. 
+ registry = upload_config.registry + check_content = upload_config.check_content + data_type, ext = None, requested_ext + + is_binary = check_binary(path) + if is_binary: + data_type, ext = handle_sniffable_binary_check(data_type, ext, path, registry) + if data_type is None: + if is_binary: + data_type, ext = handle_unsniffable_binary_check( + data_type, ext, path, name, requested_ext, check_content, registry + ) + if not data_type and check_content and check_html(path): + raise UploadProblemException('The uploaded file contains inappropriate HTML content') + + if data_type != 'binary': + if not link_data_only: + if to_posix_lines: + if space_to_tab: + line_count, converted_path = sniff.convert_newlines_sep2tabs(path, in_place=in_place, tmp_dir=".") + else: + line_count, converted_path = sniff.convert_newlines(path, in_place=in_place, tmp_dir=".") + + if requested_ext == 'auto': + ext = sniff.guess_ext(path, registry.sniff_order) + else: + ext = requested_ext + + data_type = ext + + if ext == 'auto' and data_type == 'binary': + ext = 'data' + if ext == 'auto' and requested_ext: + ext = requested_ext + if ext == 'auto': + ext = 'data' + + datatype = registry.get_datatype_by_extension(ext) + if link_data_only: + # Never alter a file that will not be copied to Galaxy's local file store. + if datatype.dataset_content_needs_grooming(path): + err_msg = 'The uploaded files need grooming, so change your Copy data into Galaxy? selection to be ' + \ + 'Copy files into Galaxy instead of Link to files without copying into Galaxy so grooming can be performed.' + raise UploadProblemException(err_msg) + + # If this file is not in the workdir make sure it gets there. + if not link_data_only and converted_path: + path = upload_config.ensure_in_working_directory(converted_path, purge_source, in_place) + elif not link_data_only: + path = upload_config.ensure_in_working_directory(path, purge_source, in_place) + + if not link_data_only and datatype and datatype.dataset_content_needs_grooming(path): + # Groom the dataset content if necessary + datatype.groom_dataset_content(path) + + rval = {"name": name, "filename": path, "dbkey": dbkey, "ext": ext, "link_data_only": link_data_only} + if info is not None: + rval["info"] = info + return rval + + elements = elements_tree_map(_resolve_src, items) + + fetched_target["elements"] = elements + return fetched_target + + +def _bagit_to_items(directory): + bdbag.bdbag_api.resolve_fetch(directory) + bdbag.bdbag_api.validate_bag(directory) + items = _directory_to_items(os.path.join(directory, "data")) + return items + + +def _decompress_target(target): + items_from_name, items_from_path = _has_src_to_path(target) + temp_directory = tempfile.mkdtemp(prefix=items_from_name, dir=".") + decompressed_directory = CompressedFile(items_from_path).extract(temp_directory) + return decompressed_directory + + +def elements_tree_map(f, items): + new_items = [] + for item in items: + if "elements" in item: + new_item = item.copy() + new_item["elements"] = elements_tree_map(f, item["elements"]) + new_items.append(new_item) + else: + new_items.append(f(item)) + return new_items + + +def _directory_to_items(directory): + items = [] + dir_elements = {} + for root, dirs, files in os.walk(directory): + if root in dir_elements: + target = dir_elements[root] + else: + target = items + for dir in dirs: + dir_dict = {"name": dir, "elements": []} + dir_elements[os.path.join(root, dir)] = dir_dict["elements"] + target.append(dir_dict) + for file in files: + target.append({"src": "path", "path": 
os.path.join(root, file)}) + + return items + + +def _has_src_to_path(item): + assert "src" in item + src = item.get("src") + name = item.get("name") + if src == "url": + url = item.get("url") + path = sniff.stream_url_to_file(url) + if name is None: + name = url.split("/")[-1] + else: + assert src == "path" + path = item["path"] + if name is None: + name = os.path.basename(path) + return name, path + + +def _arg_parser(): + parser = argparse.ArgumentParser(description=DESCRIPTION) + parser.add_argument("--galaxy-root") + parser.add_argument("--datatypes-registry") + parser.add_argument("--request-version") + parser.add_argument("--request") + return parser + + +class UploadConfig(object): + + def __init__(self, request, registry): + self.registry = registry + self.check_content = request.get("check_content" , True) + self.to_posix_lines = request.get("to_posix_lines", False) + self.space_to_tab = request.get("space_to_tab", False) + self.link_data_only = _link_data_only(request) + + self.__workdir = os.path.abspath(".") + self.__upload_count = 0 + + def get_option(self, item, key): + """Return item[key] if specified otherwise use default from UploadConfig. + + This default represents the default for the whole request instead item which + is the option for individual files. + """ + if key in item: + return item[key] + else: + return getattr(self, key) + + def __new_dataset_path(self): + path = "gxupload_%d" % self.__upload_count + self.__upload_count += 1 + return path + + def ensure_in_working_directory(self, path, purge_source, in_place): + if in_directory(path, self.__workdir): + return path + + new_path = self.__new_dataset_path() + if purge_source: + try: + shutil.move(path, new_path) + except OSError as e: + # We may not have permission to remove converted_path + if e.errno != errno.EACCES: + raise + else: + shutil.copy(path, new_path) + + return new_path + + +def _link_data_only(has_config_dict): + link_data_only = has_config_dict.get("link_data_only", False) + if not isinstance(link_data_only, bool): + # Allow the older string values of 'copy_files' and 'link_to_files' + link_data_only = link_data_only == "copy_files" + return link_data_only + + +if __name__ == "__main__": + main() diff --git a/lib/galaxy/tools/data_fetch.xml b/lib/galaxy/tools/data_fetch.xml new file mode 100644 index 000000000000..5160cb2989c8 --- /dev/null +++ b/lib/galaxy/tools/data_fetch.xml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + $request_json + + + + + diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 1a83a0d245ff..4ca2cf967278 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -278,9 +278,9 @@ def precreate_output_collections(self, history, params): trans=trans, parent=history, name=output_collection_name, + structure=effective_structure, implicit_inputs=implicit_inputs, implicit_output_name=output_name, - structure=effective_structure, ) collection_instance.implicit_collection_jobs = implicit_collection_jobs collection_instances[output_name] = collection_instance diff --git a/lib/galaxy/tools/parameters/output_collect.py b/lib/galaxy/tools/parameters/output_collect.py index c24babea7c4a..2e54d94672fd 100644 --- a/lib/galaxy/tools/parameters/output_collect.py +++ b/lib/galaxy/tools/parameters/output_collect.py @@ -12,7 +12,9 @@ from galaxy.tools.parser.output_collection_def import ( DEFAULT_DATASET_COLLECTOR_DESCRIPTION, INPUT_DBKEY_TOKEN, + ToolProvidedMetadataDatasetCollection, ) +from galaxy.dataset_collections.structure import 
UnitializedTree from galaxy.util import ( ExecutionTimer, odict @@ -31,6 +33,9 @@ def get_new_datasets(self, output_name): def get_new_dataset_meta_by_basename(self, output_name, basename): return {} + def get_unnamed_outputs(self): + return [] + class LegacyToolProvidedMetadata(object): @@ -73,6 +78,9 @@ def get_new_datasets(self, output_name): log.warning("Called get_new_datasets with legacy tool metadata provider - that is unimplemented.") return [] + def get_unnamed_outputs(self): + return [] + class ToolProvidedMetadata(object): @@ -111,8 +119,12 @@ def _elements_to_datasets(self, elements, level=0): extra_kwds.update(element) yield extra_kwds + def get_unnamed_outputs(self): + log.info(self.tool_provided_job_metadata) + return self.tool_provided_job_metadata.get("__unnamed_outputs", []) + -def collect_dynamic_collections( +def collect_dynamic_outputs( tool, output_collections, tool_provided_metadata, @@ -121,6 +133,7 @@ def collect_dynamic_collections( job=None, input_dbkey="?", ): + app = tool.app collections_service = tool.app.dataset_collections_service job_context = JobContext( tool, @@ -130,6 +143,86 @@ def collect_dynamic_collections( inp_data, input_dbkey, ) + log.info(tool_provided_metadata) + for unnamed_output_dict in tool_provided_metadata.get_unnamed_outputs(): + assert "destination" in unnamed_output_dict + assert "elements" in unnamed_output_dict + destination = unnamed_output_dict["destination"] + elements = unnamed_output_dict["elements"] + + assert "type" in destination + destination_type = destination["type"] + trans = job_context.work_context + + if destination_type == "library_folder": + + library_folder_manager = app.library_folder_manager + library_folder = library_folder_manager.get(trans, app.security.decode_id(destination.get("library_folder_id"))) + + def add_elements_to_folder(elements, library_folder): + for element in elements: + if "elements" in element: + assert "name" in element + name = element["name"] + description = element.get("description") + nested_folder = library_folder_manager.create(trans, library_folder.id, name, description) + add_elements_to_folder(element["elements"], nested_folder) + else: + discovered_file = discovered_file_for_unnamed_output(element, job_working_directory) + fields_match = discovered_file.match + designation = fields_match.designation + visible = fields_match.visible + ext = fields_match.ext + dbkey = fields_match.dbkey + info = element.get("info", None) + + # Create new primary dataset + name = fields_match.name or designation + + job_context.create_dataset( + ext=ext, + designation=designation, + visible=visible, + dbkey=dbkey, + name=name, + filename=discovered_file.path, + info=info, + library_folder=library_folder + ) + + add_elements_to_folder(elements, library_folder) + elif destination_type == "hdca": + history = job.history + assert "collection_type" in unnamed_output_dict + name = unnamed_output_dict.get("name", "unnamed collection") + collection_type = unnamed_output_dict["collection_type"] + collection_type_description = collections_service.collection_type_descriptions.for_collection_type(collection_type) + structure = UnitializedTree(collection_type_description) + hdca = collections_service.precreate_dataset_collection_instance( + trans, history, name, structure=structure + ) + filenames = odict.odict() + + def add_to_discovered_files(elements): + for element in elements: + if "elements" in element: + add_to_discovered_files(element["elements"]) + else: + discovered_file = 
discovered_file_for_unnamed_output(element, job_working_directory) + filenames[discovered_file.path] = discovered_file + + add_to_discovered_files(elements) + + collection = hdca.collection + collection_builder = collections_service.collection_builder_for( + collection + ) + job_context.populate_collection_elements( + collection, + collection_builder, + filenames, + ) + collection_builder.populate() for name, has_collection in output_collections.items(): if name not in tool.output_collections: @@ -146,13 +239,19 @@ def collect_dynamic_collections( collection = has_collection try: + collection_builder = collections_service.collection_builder_for( collection ) + dataset_collectors = map(dataset_collector, output_collection_def.dataset_collector_descriptions) + output_name = output_collection_def.name + filenames = job_context.find_files(output_name, collection, dataset_collectors) job_context.populate_collection_elements( collection, collection_builder, - output_collection_def, + filenames, + name=output_collection_def.name, + metadata_source_name=output_collection_def.metadata_source, ) collection_builder.populate() except Exception: @@ -171,6 +270,11 @@ def __init__(self, tool, tool_provided_metadata, job, job_working_directory, inp self.job_working_directory = job_working_directory self.tool_provided_metadata = tool_provided_metadata + @property + def work_context(self): + from galaxy.work.context import WorkRequestContext + return WorkRequestContext(self.app, user=self.job.user) + @property def permissions(self): inp_data = self.inp_data @@ -188,15 +292,14 @@ def find_files(self, output_name, collection, dataset_collectors): filenames[discovered_file.path] = discovered_file return filenames - def populate_collection_elements(self, collection, root_collection_builder, output_collection_def): + def populate_collection_elements(self, collection, root_collection_builder, filenames, name=None, metadata_source_name=None): # TODO: allow configurable sorting. 
# # # # - dataset_collectors = map(dataset_collector, output_collection_def.dataset_collector_descriptions) - output_name = output_collection_def.name - filenames = self.find_files(output_name, collection, dataset_collectors) + if name is None: + name = "unnamed output" element_datasets = [] for filename, discovered_file in filenames.items(): @@ -222,14 +325,14 @@ def populate_collection_elements(self, collection, root_collection_builder, outp dbkey=dbkey, name=name, filename=filename, - metadata_source_name=output_collection_def.metadata_source, + metadata_source_name=metadata_source_name, ) log.debug( "(%s) Created dynamic collection dataset for path [%s] with element identifier [%s] for output [%s] %s", self.job.id, filename, designation, - output_collection_def.name, + name, create_dataset_timer, ) element_datasets.append((element_identifiers, dataset)) @@ -244,7 +347,7 @@ def populate_collection_elements(self, collection, root_collection_builder, outp log.debug( "(%s) Add dynamic collection datsets to history for output [%s] %s", self.job.id, - output_collection_def.name, + name, add_datasets_timer, ) @@ -274,12 +377,17 @@ def create_dataset( dbkey, name, filename, - metadata_source_name, + metadata_source_name=None, + info=None, + library_folder=None, ): app = self.app sa_session = self.sa_session - primary_data = _new_hda(app, sa_session, ext, designation, visible, dbkey, self.permissions) + if not library_folder: + primary_data = _new_hda(app, sa_session, ext, designation, visible, dbkey, self.permissions) + else: + primary_data = _new_ldda(self.work_context, name, ext, visible, dbkey, library_folder) # Copy metadata from one of the inputs if requested. metadata_source = None @@ -299,6 +407,9 @@ def create_dataset( else: primary_data.init_meta() + if info is not None: + primary_data.info = info + primary_data.set_meta() primary_data.set_peek() @@ -465,6 +576,14 @@ def discover_files(output_name, tool_provided_metadata, extra_file_collectors, j yield DiscoveredFile(match.path, collector, match) +def discovered_file_for_unnamed_output(dataset, job_working_directory): + extra_file_collector = DEFAULT_TOOL_PROVIDED_DATASET_COLLECTOR + target_directory = discover_target_directory(extra_file_collector, job_working_directory) + filename = dataset["filename"] + path = os.path.join(target_directory, filename) + return DiscoveredFile(path, extra_file_collector, JsonCollectedDatasetMatch(dataset, extra_file_collector, filename, path=path)) + + def discover_target_directory(extra_file_collector, job_working_directory): directory = job_working_directory if extra_file_collector.directory: @@ -637,6 +756,42 @@ def __init__(self, re_match, collector, filename, path=None): UNSET = object() +def _new_ldda( + trans, + name, + ext, + visible, + dbkey, + library_folder, +): + ld = trans.app.model.LibraryDataset(folder=library_folder, name=name) + trans.sa_session.add(ld) + trans.sa_session.flush() + trans.app.security_agent.copy_library_permissions(trans, library_folder, ld) + + ldda = trans.app.model.LibraryDatasetDatasetAssociation(name=name, + extension=ext, + dbkey=dbkey, + library_dataset=ld, + user=trans.user, + create_dataset=True, + sa_session=trans.sa_session) + trans.sa_session.add(ldda) + ldda.state = ldda.states.OK + # Permissions must be the same on the LibraryDatasetDatasetAssociation and the associated LibraryDataset + trans.app.security_agent.copy_library_permissions(trans, ld, ldda) + # Copy the current user's DefaultUserPermissions to the new LibraryDatasetDatasetAssociation.dataset + 
trans.app.security_agent.set_all_dataset_permissions(ldda.dataset, trans.app.security_agent.user_get_default_permissions(trans.user)) + library_folder.add_library_dataset(ld, genome_build=dbkey) + trans.sa_session.add(library_folder) + trans.sa_session.flush() + + ld.library_dataset_dataset_association_id = ldda.id + trans.sa_session.add(ld) + trans.sa_session.flush() + return ldda + + def _new_hda( app, sa_session, @@ -663,3 +818,4 @@ def _new_hda( DEFAULT_DATASET_COLLECTOR = DatasetCollector(DEFAULT_DATASET_COLLECTOR_DESCRIPTION) +DEFAULT_TOOL_PROVIDED_DATASET_COLLECTOR = ToolMetadataDatasetCollector(ToolProvidedMetadataDatasetCollection()) diff --git a/lib/galaxy/tools/special_tools.py b/lib/galaxy/tools/special_tools.py index 953e69dee647..129b7064a941 100644 --- a/lib/galaxy/tools/special_tools.py +++ b/lib/galaxy/tools/special_tools.py @@ -4,6 +4,7 @@ SPECIAL_TOOLS = { "history export": "galaxy/tools/imp_exp/exp_history_to_archive.xml", "history import": "galaxy/tools/imp_exp/imp_history_from_archive.xml", + "data fetch": "galaxy/tools/data_fetch.xml", } diff --git a/lib/galaxy/webapps/galaxy/api/_fetch_util.py b/lib/galaxy/webapps/galaxy/api/_fetch_util.py new file mode 100644 index 000000000000..5531f8271835 --- /dev/null +++ b/lib/galaxy/webapps/galaxy/api/_fetch_util.py @@ -0,0 +1,103 @@ +import os + +from galaxy.actions.library import ( + validate_path_upload, + validate_server_directory_upload, +) +from galaxy.exceptions import ( + RequestParameterInvalidException +) +from galaxy.tools.actions.upload_common import validate_url +from galaxy.util import ( + relpath, +) + + +def validate_and_normalize_targets(trans, payload): + """Validate and normalize all src references in fetch targets. + + - Normalize ftp_import and server_dir src entries into simple path entries + with the relevant paths resolved and permissions / configuration checked. + - Check for file:// URLs in items with src of "url" and convert them into path + src items - after verifying path pastes are allowed and the user is an admin. + - Check for valid URLs to be fetched for http and https entries. + - Based on Galaxy configuration and upload types set purge_source and in_place + as needed for each upload. + """ + targets = payload.get("targets", []) + + # Unlike upload.py we don't transmit or use run_as_real_user in the job - we just make sure + # in_place and purge_source are set on the individual upload fetch sources as needed based + # on this. + run_as_real_user = trans.app.config.external_chown_script is None # See comment in upload.py + purge_ftp_source = getattr(trans.app.config, 'ftp_upload_purge', True) and not run_as_real_user + + payload["check_content"] = trans.app.config.check_upload_content + + def check_src(item): + # Normalize file:// URLs into paths. + if item["src"] == "url" and item["url"].startswith("file://"): + item["src"] = "path" + item["path"] = item["url"][len("file://"):] + del item["url"] + + if "in_place" in item: + raise Exception("in_place cannot be set") + + src = item["src"] + if src == "path" or (src == "url" and item["url"].startswith("file:")): + # Validate is admin, leave alone. + validate_path_upload(trans) + elif src == "server_dir": + # Validate and replace with path definition. + server_dir = item["server_dir"] + full_path, _ = validate_server_directory_upload(trans, server_dir) + item["src"] = "path" + item["path"] = full_path + elif src == "ftp_import": + ftp_path = item["ftp_path"] + + # It'd be nice if this could be de-duplicated with what is in parameters/grouping.py.
+ user_ftp_dir = trans.user_ftp_dir + assert not os.path.islink(user_ftp_dir), "User FTP directory cannot be a symbolic link" + for (dirpath, dirnames, filenames) in os.walk(user_ftp_dir): + for filename in filenames: + if ftp_path == filename: + path = relpath(os.path.join(dirpath, filename), user_ftp_dir) + if not os.path.islink(os.path.join(dirpath, filename)): + full_path = os.path.abspath(os.path.join(user_ftp_dir, path)) + + if not full_path: + raise Exception("Failed to find referenced ftp_path or symbolic link was encountered") + + item["src"] = "path" + item["path"] = full_path + item["purge_source"] = purge_ftp_source + elif src == "url": + url = item["url"] + looks_like_url = False + for url_prefix in ["http://", "https://", "ftp://", "ftps://"]: + if url.startswith(url_prefix): + looks_like_url = True + break + + if not looks_like_url: + raise Exception("Invalid URL [%s] found in src definition." % url) + + validate_url(url, trans.app.config.fetch_url_whitelist_ips) + item["in_place"] = run_as_real_user + elif src == "files": + item["in_place"] = run_as_real_user + + _for_each_src(check_src, targets) + + +def _for_each_src(f, obj): + if isinstance(obj, list): + for item in obj: + _for_each_src(f, item) + if isinstance(obj, dict): + if "src" in obj: + f(obj) + for key, value in obj.items(): + _for_each_src(f, value) diff --git a/lib/galaxy/webapps/galaxy/api/tools.py b/lib/galaxy/webapps/galaxy/api/tools.py index 691531d3c928..30abd5e6e70b 100644 --- a/lib/galaxy/webapps/galaxy/api/tools.py +++ b/lib/galaxy/webapps/galaxy/api/tools.py @@ -11,6 +11,7 @@ from galaxy.web import _future_expose_api_anonymous_and_sessionless as expose_api_anonymous_and_sessionless from galaxy.web.base.controller import BaseAPIController from galaxy.web.base.controller import UsesVisualizationMixin +from ._fetch_util import validate_and_normalize_targets log = logging.getLogger(__name__) @@ -289,12 +290,47 @@ def download(self, trans, id, **kwds): trans.response.headers["Content-Disposition"] = 'attachment; filename="%s.tgz"' % (id) return download_file + @expose_api_anonymous + def fetch(self, trans, payload, **kwd): + """Adapt clean API to tool-constrained API. + """ + log.info("Keywords are %s" % payload) + request_version = '1' + history_id = payload.pop("history_id") + clean_payload = {} + files_payload = {} + for key, value in payload.items(): + if key == "key": + continue + if key.startswith('files_') or key.startswith('__files_'): + files_payload[key] = value + continue + clean_payload[key] = value + log.info("payload %s" % clean_payload) + validate_and_normalize_targets(trans, clean_payload) + clean_payload["check_content"] = trans.app.config.check_upload_content + request = dumps(clean_payload) + log.info(request) + create_payload = { + 'tool_id': "__DATA_FETCH__", + 'history_id': history_id, + 'inputs': { + 'request_version': request_version, + 'request_json': request, + }, + } + create_payload.update(files_payload) + return self._create(trans, create_payload, **kwd) + + @expose_api_anonymous + def create(self, trans, payload, **kwd): + """ POST /api/tools Executes tool using specified inputs and returns tool's outputs. """ + return self._create(trans, payload, **kwd) + + def _create(self, trans, payload, **kwd): # HACK: for now, if action is rerun, rerun tool.
action = payload.get('action', None) if action == 'rerun': diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index 04f56c0fd5f5..cd3c80c65188 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -266,6 +266,7 @@ def populate_api_routes(webapp, app): # ====== TOOLS API ====== # ======================= + webapp.mapper.connect('/api/tools/fetch', action='fetch', controller='tools', conditions=dict(method=["POST"])) webapp.mapper.connect('/api/tools/all_requirements', action='all_requirements', controller="tools") webapp.mapper.connect('/api/tools/{id:.+?}/build', action='build', controller="tools") webapp.mapper.connect('/api/tools/{id:.+?}/reload', action='reload', controller="tools") diff --git a/test-data/example-bag.zip b/test-data/example-bag.zip new file mode 100644 index 000000000000..ee4de52c265b Binary files /dev/null and b/test-data/example-bag.zip differ diff --git a/test-data/testdir1.zip b/test-data/testdir1.zip new file mode 100644 index 000000000000..0f71e2ce96aa Binary files /dev/null and b/test-data/testdir1.zip differ diff --git a/test/api/test_dataset_collections.py b/test/api/test_dataset_collections.py index ca8950d8bb0c..5367668bda18 100644 --- a/test/api/test_dataset_collections.py +++ b/test/api/test_dataset_collections.py @@ -188,6 +188,37 @@ def test_enforces_unique_names(self): create_response = self._post("dataset_collections", payload) self._assert_status_code_is(create_response, 400) + def test_upload_collection(self): + items = [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}] + targets = [{ + "destination": {"type": "hdca"}, + "items": items, + "collection_type": "list", + }] + payload = { + "history_id": self.history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, + } + self.dataset_populator.fetch(payload) + hdca = self._assert_one_collection_created_in_history() + assert len(hdca["elements"]) == 1, hdca + element0 = hdca["elements"][0] + assert element0["element_identifier"] == "4.bed" + assert element0["object"]["file_size"] == 61 + + def _assert_one_collection_created_in_history(self): + contents_response = self._get("histories/%s/contents/dataset_collections" % self.history_id) + self._assert_status_code_is(contents_response, 200) + contents = contents_response.json() + assert len(contents) == 1 + hdca = contents[0] + assert hdca["history_content_type"] == "dataset_collection" + hdca_id = hdca["id"] + collection_response = self._get("histories/%s/contents/dataset_collections/%s" % (self.history_id, hdca_id)) + self._assert_status_code_is(collection_response, 200) + return collection_response.json() + def _check_create_response(self, create_response): self._assert_status_code_is(create_response, 200) dataset_collection = create_response.json() diff --git a/test/api/test_libraries.py b/test/api/test_libraries.py index 2a715f50fcbb..edca16c4fb6d 100644 --- a/test/api/test_libraries.py +++ b/test/api/test_libraries.py @@ -1,3 +1,5 @@ +import json + from base import api from base.populators import ( DatasetCollectionPopulator, @@ -95,6 +97,151 @@ def test_create_dataset(self): assert library_dataset["peek"].find("create_test") >= 0 assert library_dataset["file_ext"] == "txt", library_dataset["file_ext"] + def test_fetch_path_to_folder(self): + history_id, library, destination = self._setup_fetch_to_folder("flat_zip") + bed_test_data_path = 
self.test_data_resolver.get_filename("4.bed") + items = [{"src": "path", "path": bed_test_data_path, "info": "my cool bed"}] + targets = [{ + "destination": destination, + "items": items + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/4.bed"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 61, dataset + + def test_fetch_upload_to_folder(self): + history_id, library, destination = self._setup_fetch_to_folder("flat_zip") + items = [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}] + targets = [{ + "destination": destination, + "items": items + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + "__files": {"files_0|file_data": open(self.test_data_resolver.get_filename("4.bed"))}, + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/4.bed"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 61, dataset + assert dataset["genome_build"] == "hg19", dataset + assert dataset["misc_info"] == "my cool bed", dataset + assert dataset["file_ext"] == "bed", dataset + + def test_fetch_zip_to_folder(self): + history_id, library, destination = self._setup_fetch_to_folder("flat_zip") + bed_test_data_path = self.test_data_resolver.get_filename("4.bed.zip") + targets = [{ + "destination": destination, + "items_from": "archive", "src": "path", "path": bed_test_data_path, + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/4.bed"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 61, dataset + + def test_fetch_single_url_to_folder(self): + history_id, library, destination = self._setup_fetch_to_folder("single_url") + items = [{"src": "url", "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed"}] + targets = [{ + "destination": destination, + "items": items + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/4.bed"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 61, dataset + + def test_fetch_url_archive_to_folder(self): + history_id, library, destination = self._setup_fetch_to_folder("single_url") + targets = [{ + "destination": destination, + "items_from": "archive", + "src": "url", + "url": "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/4.bed.zip", + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/4.bed"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 61, dataset + + def test_fetch_recursive_archive(self): + history_id, library, destination = 
self._setup_fetch_to_folder("recursive_archive") + bed_test_data_path = self.test_data_resolver.get_filename("testdir1.zip") + targets = [{ + "destination": destination, + "items_from": "archive", "src": "path", "path": bed_test_data_path, + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/file1"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 6, dataset + + c2 = [c2 for c2 in contents if c2["name"] == "/file2"][0] + dataset = self._get(c2["url"]).json() + assert dataset["file_size"] == 6, dataset + + c3 = [c3 for c3 in contents if c3["name"] == "/dir1/file3"][0] + dataset = self._get(c3["url"]).json() + assert dataset["file_size"] == 11, dataset + + def test_fetch_bagit_archive_to_folder(self): + history_id, library, destination = self._setup_fetch_to_folder("bagit_archive") + example_bag_path = self.test_data_resolver.get_filename("example-bag.zip") + targets = [{ + "destination": destination, + "items_from": "bagit_archive", "src": "files", + }] + payload = { + "history_id": history_id, # TODO: Shouldn't be needed :( + "targets": json.dumps(targets), + "__files": {"files_0|file_data": open(example_bag_path)}, + } + self.dataset_populator.fetch(payload) + contents = self._get("libraries/%s/contents" % library["id"]).json() + c = [c for c in contents if c["name"] == "/README.txt"][0] + dataset = self._get(c["url"]).json() + assert dataset["file_size"] == 66, dataset + + c2 = [c2 for c2 in contents if c2["name"] == "/bdbag-profile.json"][0] + dataset = self._get(c2["url"]).json() + assert dataset["file_size"] == 723, dataset + + def _setup_fetch_to_folder(self, test_name): + history_id = self.dataset_populator.new_history() + library = self.library_populator.new_private_library(test_name) + folder_id = library["root_folder_id"][1:] + destination = {"type": "library_folder", "library_folder_id": folder_id} + return history_id, library, destination + def test_create_dataset_in_folder(self): library = self.library_populator.new_private_library("ForCreateDatasets") folder_response = self._create_folder(library) diff --git a/test/base/populators.py b/test/base/populators.py index 3d828b1b4269..324a53ec890c 100644 --- a/test/base/populators.py +++ b/test/base/populators.py @@ -148,14 +148,30 @@ def new_dataset_request(self, history_id, content=None, wait=False, **kwds): self.wait_for_tool_run(history_id, run_response) return run_response + def fetch(self, payload, assert_ok=True, timeout=DEFAULT_TIMEOUT): + tool_response = self._post("tools/fetch", data=payload) + if assert_ok: + job = self.check_run(tool_response) + self.wait_for_job(job["id"], timeout=timeout) + + job = tool_response.json()["jobs"][0] + details = self.get_job_details(job["id"]).json() + assert details["state"] == "ok", details + + return tool_response + def wait_for_tool_run(self, history_id, run_response, timeout=DEFAULT_TIMEOUT): - run = run_response.json() - assert run_response.status_code == 200, run - job = run["jobs"][0] + job = self.check_run(run_response) self.wait_for_job(job["id"], timeout=timeout) self.wait_for_history(history_id, assert_ok=True, timeout=timeout) return run_response + def check_run(self, run_response): + run = run_response.json() + assert run_response.status_code == 200, run + job = run["jobs"][0] + return job + def wait_for_history(self, history_id, 
assert_ok=False, timeout=DEFAULT_TIMEOUT): try: return wait_on_state(lambda: self._get("histories/%s" % history_id), assert_ok=assert_ok, timeout=timeout) @@ -266,8 +282,8 @@ def run_tool(self, tool_id, inputs, history_id, assert_ok=True, **kwds): else: return tool_response - def tools_post(self, payload): - tool_response = self._post("tools", data=payload) + def tools_post(self, payload, url="tools"): + tool_response = self._post(url, data=payload) return tool_response def get_history_dataset_content(self, history_id, wait=True, filename=None, **kwds): diff --git a/tools/data_source/upload.py b/tools/data_source/upload.py index e6c6cb6dc8ca..9198f89c325b 100644 --- a/tools/data_source/upload.py +++ b/tools/data_source/upload.py @@ -18,8 +18,12 @@ from galaxy import util from galaxy.datatypes import sniff -from galaxy.datatypes.binary import Binary from galaxy.datatypes.registry import Registry +from galaxy.datatypes.upload_util import ( + handle_sniffable_binary_check, + handle_unsniffable_binary_check, + UploadProblemException, +) from galaxy.util.checkers import ( check_binary, check_bz2, @@ -104,34 +108,25 @@ def add_file(dataset, registry, json_file, output_path): try: ext = dataset.file_type except AttributeError: - file_err('Unable to process uploaded file, missing file_type parameter.', dataset, json_file) - return + raise UploadProblemException('Unable to process uploaded file, missing file_type parameter.') if dataset.type == 'url': try: - page = urlopen(dataset.path) # page will be .close()ed by sniff methods - temp_name = sniff.stream_to_file(page, prefix='url_paste', source_encoding=util.get_charset_from_http_headers(page.headers)) + dataset.path = sniff.stream_url_to_file(dataset.path) except Exception as e: - file_err('Unable to fetch %s\n%s' % (dataset.path, str(e)), dataset, json_file) - return - dataset.path = temp_name + raise UploadProblemException('Unable to fetch %s\n%s' % (dataset.path, str(e))) + # See if we have an empty file if not os.path.exists(dataset.path): - file_err('Uploaded temporary file (%s) does not exist.' % dataset.path, dataset, json_file) - return + raise UploadProblemException('Uploaded temporary file (%s) does not exist.' % dataset.path) + if not os.path.getsize(dataset.path) > 0: - file_err('The uploaded file is empty', dataset, json_file) - return + raise UploadProblemException('The uploaded file is empty') + # Is dataset content supported sniffable binary? 
is_binary = check_binary(dataset.path) if is_binary: - # Sniff the data type - guessed_ext = sniff.guess_ext(dataset.path, registry.sniff_order) - # Set data_type only if guessed_ext is a binary datatype - datatype = registry.get_datatype_by_extension(guessed_ext) - if isinstance(datatype, Binary): - data_type = guessed_ext - ext = guessed_ext + data_type, ext = handle_sniffable_binary_check(data_type, ext, dataset.path, registry) if not data_type: root_datatype = registry.get_datatype_by_extension(dataset.file_type) if getattr(root_datatype, 'compressed', False): @@ -141,8 +136,8 @@ def add_file(dataset, registry, json_file, output_path): # See if we have a gzipped file, which, if it passes our restrictions, we'll uncompress is_gzipped, is_valid = check_gzip(dataset.path, check_content=check_content) if is_gzipped and not is_valid: - file_err('The gzipped uploaded file contains inappropriate content', dataset, json_file) - return + raise UploadProblemException('The gzipped uploaded file contains inappropriate content') + elif is_gzipped and is_valid and auto_decompress: if not link_data_only: # We need to uncompress the temp_name file, but BAM files must remain compressed in the BGZF format @@ -155,8 +150,8 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing gzipped data', dataset, json_file) - return + raise UploadProblemException('Problem decompressing gzipped data') + if not chunk: break os.write(fd, chunk) @@ -174,8 +169,8 @@ def add_file(dataset, registry, json_file, output_path): # See if we have a bz2 file, much like gzip is_bzipped, is_valid = check_bz2(dataset.path, check_content) if is_bzipped and not is_valid: - file_err('The gzipped uploaded file contains inappropriate content', dataset, json_file) - return + raise UploadProblemException('The gzipped uploaded file contains inappropriate content') + elif is_bzipped and is_valid and auto_decompress: if not link_data_only: # We need to uncompress the temp_name file @@ -188,8 +183,8 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing bz2 compressed data', dataset, json_file) - return + raise UploadProblemException('Problem decompressing bz2 compressed data') + if not chunk: break os.write(fd, chunk) @@ -228,7 +223,7 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing zipped data', dataset, json_file) + raise UploadProblemException('Problem decompressing zipped data') return if not chunk: break @@ -247,8 +242,8 @@ def add_file(dataset, registry, json_file, output_path): except IOError: os.close(fd) os.remove(uncompressed) - file_err('Problem decompressing zipped data', dataset, json_file) - return + raise UploadProblemException('Problem decompressing zipped data') + z.close() # Replace the zipped file with the decompressed file if it's safe to do so if uncompressed is not None: @@ -259,26 +254,15 @@ def add_file(dataset, registry, json_file, output_path): os.chmod(dataset.path, 0o644) dataset.name = uncompressed_name data_type = 'zip' - if not data_type: - if is_binary or registry.is_extension_unsniffable_binary(dataset.file_type): - # We have a binary dataset, but it is not Bam, Sff or Pdf - data_type = 'binary' - parts = dataset.name.split(".") - if len(parts) > 1: - ext = parts[-1].strip().lower() - is_ext_unsniffable_binary = 
registry.is_extension_unsniffable_binary(ext) - if check_content and not is_ext_unsniffable_binary: - file_err('The uploaded binary file contains inappropriate content', dataset, json_file) - return - elif is_ext_unsniffable_binary and dataset.file_type != ext: - err_msg = "You must manually set the 'File Format' to '%s' when uploading %s files." % (ext, ext) - file_err(err_msg, dataset, json_file) - return + if not data_type and is_binary: + data_type, ext = handle_unsniffable_binary_check( + data_type, ext, dataset.path, dataset.name, dataset.file_type, check_content, registry + ) if not data_type: # We must have a text file if check_content and check_html(dataset.path): - file_err('The uploaded file contains inappropriate HTML content', dataset, json_file) - return + raise UploadProblemException('The uploaded file contains inappropriate HTML content') + if data_type != 'binary': if not link_data_only and data_type not in ('gzip', 'bz2', 'zip'): # Convert universal line endings to Posix line endings if to_posix_lines is True @@ -308,8 +292,8 @@ def add_file(dataset, registry, json_file, output_path): if datatype.dataset_content_needs_grooming(dataset.path): err_msg = 'The uploaded files need grooming, so change your Copy data into Galaxy? selection to be ' + \ 'Copy files into Galaxy instead of Link to files without copying into Galaxy so grooming can be performed.' - file_err(err_msg, dataset, json_file) - return + raise UploadProblemException(err_msg) + if not link_data_only and converted_path: # Move the dataset to its "real" path try: @@ -334,6 +318,7 @@ def add_file(dataset, registry, json_file, output_path): if dataset.get('uuid', None) is not None: info['uuid'] = dataset.get('uuid') json_file.write(dumps(info) + "\n") + if not link_data_only and datatype and datatype.dataset_content_needs_grooming(output_path): # Groom the dataset content if necessary datatype.groom_dataset_content(output_path) @@ -407,7 +392,10 @@ def __main__(): files_path = output_paths[int(dataset.dataset_id)][1] add_composite_file(dataset, json_file, output_path, files_path) else: - add_file(dataset, registry, json_file, output_path) + try: + add_file(dataset, registry, json_file, output_path) + except UploadProblemException as e: + file_err(e.message, dataset, json_file) # clean up paramfile # TODO: this will not work when running as the actual user unless the
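
For reviewers, a minimal sketch of how a client might drive the new /api/tools/fetch endpoint added in this diff. The endpoint path, the history_id/targets payload fields, the "src"/"destination"/"collection_type" keys, and the "files_0|file_data" multipart field name are taken from the route and tests above (test_upload_collection, test_fetch_upload_to_folder); the base URL, API key, history id, local file path, and the use of the requests library are placeholders and assumptions, not part of this change.

# Sketch only: assumes a running Galaxy at GALAXY_URL and a valid API key.
import json

import requests  # assumption: any HTTP client capable of multipart POST works

GALAXY_URL = "http://localhost:8080"  # placeholder
API_KEY = "<your API key>"            # placeholder
HISTORY_ID = "<encoded history id>"   # placeholder

# One "hdca" target backed by a single uploaded file, mirroring test_upload_collection.
targets = [{
    "destination": {"type": "hdca"},
    "collection_type": "list",
    "items": [{"src": "files", "dbkey": "hg19", "info": "my cool bed"}],
}]

payload = {
    "key": API_KEY,
    "history_id": HISTORY_ID,
    "targets": json.dumps(targets),
}
files = {"files_0|file_data": open("test-data/4.bed", "rb")}  # placeholder local file

response = requests.post("%s/api/tools/fetch" % GALAXY_URL, data=payload, files=files)
response.raise_for_status()
# The response mirrors the regular tool execution API, so the created job can be polled,
# as DatasetPopulator.fetch does in test/base/populators.py.
job_id = response.json()["jobs"][0]["id"]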