diff --git a/lib/galaxy/tools/cwl/util.py b/lib/galaxy/tools/cwl/util.py index e66198c92d24..8c85ce96de84 100644 --- a/lib/galaxy/tools/cwl/util.py +++ b/lib/galaxy/tools/cwl/util.py @@ -3,8 +3,11 @@ Used to share code between the Galaxy test framework and other Galaxy CWL clients (e.g. Planemo).""" import hashlib +import json import os +from collections import namedtuple + from six import iteritems, StringIO @@ -152,3 +155,73 @@ class ObjectUploadTarget(object): def __init__(self, the_object): self.object = the_object + + +GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id"]) + + +def tool_response_to_output(tool_response, history_id, output_id): + + for output in tool_response["outputs"]: + if output["output_name"] == output_id: + return GalaxyOutput(history_id, "dataset", output["id"]) + + for output_collection in tool_response["output_collections"]: + if output["output_name"] == output_id: + return GalaxyOutput(history_id, "dataset", output["id"]) + + raise Exception("Failed to find output with label [%s]" % output_id) + + +def invocation_to_output(invocation, history_id, output_id): + if output_id in invocation["outputs"]: + dataset = invocation["outputs"][output_id] + galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"]) + elif output_id in invocation["output_collections"]: + collection = invocation["output_collections"][output_id] + galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"]) + else: + raise Exception("Failed to find output with label [%s]" % output_id) + + return galaxy_output + + +def output_to_cwl_json(galaxy_output, get_metadata, get_dataset): + """Convert objects in a Galaxy history into a CWL object. + + Useful in running conformance tests and implementing the cwl-runner + interface via Galaxy. + """ + def element_to_cwl_json(element): + element_output = GalaxyOutput( + galaxy_output.history_id, + element["object"]["history_content_type"], + element["object"]["id"], + ) + return output_to_cwl_json(element_output, get_metadata, get_dataset) + + output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id) + if output_metadata["history_content_type"] == "dataset": + ext = output_metadata["file_ext"] + assert output_metadata["state"] == "ok" + dataset_dict = get_dataset(output_metadata) + if ext == "expression.json": + if "content" in dataset_dict: + return json.loads(dataset_dict["content"]) + else: + with open(dataset_dict["path"]) as f: + return json.load(f) + else: + return output_properties(**dataset_dict) + elif output_metadata["history_content_type"] == "dataset_collection": + if output_metadata["collection_type"] == "list": + rval = [] + for element in output_metadata["elements"]: + rval.append(element_to_cwl_json(element)) + elif output_metadata["collection_type"] == "record": + rval = {} + for element in output_metadata["elements"]: + rval[element["element_identifier"]] = element_to_cwl_json(element) + return rval + else: + raise NotImplementedError("Unknown history content type encountered") diff --git a/test/base/populators.py b/test/base/populators.py index 9ec8c705d28a..b13235362895 100644 --- a/test/base/populators.py +++ b/test/base/populators.py @@ -11,7 +11,13 @@ from pkg_resources import resource_string from six import StringIO -from galaxy.tools.cwl.util import FileUploadTarget, galactic_job_json, output_properties +from galaxy.tools.cwl.util import ( + FileUploadTarget, + galactic_job_json, + invocation_to_output, + output_to_cwl_json, + tool_response_to_output, +) from base import api_asserts from base.workflows_format_2 import ( @@ -78,9 +84,36 @@ def _run_tool_payload( self, tool_id, inputs, history_id, **kwds ): return DatasetPopulator( self.galaxy_interactor ).run_tool_payload( tool_id, inputs, history_id, **kwds ) -class CwlToolRun( object ): +class CwlRun(object): - def __init__( self, history_id, run_response ): + def __init__( self, dataset_populator, history_id ): + self.dataset_populator = dataset_populator + self.history_id = history_id + + def get_output_as_object(self, output_name): + galaxy_output = self._output_name_to_object(output_name) + + def get_metadata(history_content_type, content_id): + if history_content_type == "dataset": + return self.dataset_populator.get_history_dataset_details(self.history_id, dataset_id=content_id) + else: + return self.dataset_populator.get_history_collection_details(self.history_id, content_id=content_id) + + def get_dataset(dataset_details): + content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"]) + return {"content": content} + + return output_to_cwl_json( + galaxy_output, + get_metadata, + get_dataset, + ) + + +class CwlToolRun( CwlRun ): + + def __init__( self, dataset_populator, history_id, run_response ): + self.dataset_populator = dataset_populator self.history_id = history_id self.run_response = run_response @@ -94,8 +127,11 @@ def output(self, output_index): def output_collection(self, output_index): return self.run_response["output_collections"][output_index] + def _output_name_to_object(self, output_name): + return tool_response_to_output(self.run_response, self.history_id, output_name) + -class CwlWorkflowRun( object ): +class CwlWorkflowRun( CwlRun ): def __init__(self, dataset_populator, history_id, workflow_id, invocation_id): self.dataset_populator = dataset_populator @@ -103,48 +139,11 @@ def __init__(self, dataset_populator, history_id, workflow_id, invocation_id): self.workflow_id = workflow_id self.invocation_id = invocation_id - def get_output_as_object(self, output_name): + def _output_name_to_object(self, output_name): invocation_response = self.dataset_populator._get("workflows/%s/invocations/%s" % (self.invocation_id, self.workflow_id)) api_asserts.assert_status_code_is( invocation_response, 200 ) invocation = invocation_response.json() - - def dataset_to_json(dataset_details): - ext = dataset_details["file_ext"] - assert dataset_details["state"] == "ok" - print(dataset_details) - if ext == "expression.json": - print("hid is %s" % dataset_details["hid"]) - content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"]) - print("content is %s" % content) - return json.loads(content) - else: - content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"]) - return output_properties(content=content) - - if output_name in invocation["outputs"]: - dataset = invocation["outputs"][output_name] - dataset_id = dataset["id"] - dataset_details = self.dataset_populator.get_history_dataset_details(self.history_id, dataset_id=dataset_id) - assert dataset_details["id"] == dataset_id - return dataset_to_json(dataset_details) - elif output_name in invocation["output_collections"]: - collection = invocation["output_collections"][output_name] - collection_details = self.dataset_populator.get_history_collection_details(self.history_id, content_id=collection["id"]) - if collection_details["collection_type"] == "list": - rval = [] - for element in collection_details["elements"]: - if element["object"]["history_content_type"] != "dataset": - raise NotImplementedError() - rval.append(dataset_to_json(element["object"])) - elif collection_details["collection_type"] == "record": - rval = {} - for element in collection_details["elements"]: - if element["object"]["history_content_type"] != "dataset": - raise NotImplementedError() - rval[element["element_identifier"]] = dataset_to_json(element["object"]) - return rval - else: - raise Exception("Unknown output [%s] encountered for invocation [%s]" % (output_name, invocation)) + return invocation_to_output(invocation, self.history_id, output_name) class BaseDatasetPopulator( object ): @@ -307,7 +306,7 @@ def create_collection_func(element_identifiers, collection_type): self.wait_for_history( history_id=history_id, assert_ok=True ) if tool_or_workflow == "tool": run_response = self.run_tool( tool_id, job_as_dict, history_id, inputs_representation="cwl", assert_ok=assert_ok ) - run_object = CwlToolRun( history_id, run_response ) + run_object = CwlToolRun( self.dataset_populator, history_id, run_response ) if assert_ok: try: final_state = self.wait_for_job( run_object.job_id )