From 8c76a022ce2571636c7346c1a375aa15cca77c3c Mon Sep 17 00:00:00 2001 From: John Chilton Date: Sat, 5 Aug 2017 15:22:36 -0400 Subject: [PATCH] Refactor CWL workflow output collection code toward reuse... ... with tool outputs and for both tools and workflows in Planemo via galaxy-lib. --- lib/galaxy/tools/cwl/util.py | 47 +++++++++++++++++ test/base/populators.py | 97 +++++++++++++++++++++--------------- 2 files changed, 105 insertions(+), 39 deletions(-) diff --git a/lib/galaxy/tools/cwl/util.py b/lib/galaxy/tools/cwl/util.py index e66198c92d24..4ece8d450fe4 100644 --- a/lib/galaxy/tools/cwl/util.py +++ b/lib/galaxy/tools/cwl/util.py @@ -3,8 +3,11 @@ Used to share code between the Galaxy test framework and other Galaxy CWL clients (e.g. Planemo).""" import hashlib +import json import os +from collections import namedtuple + from six import iteritems, StringIO @@ -152,3 +155,47 @@ class ObjectUploadTarget(object): def __init__(self, the_object): self.object = the_object + + +GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id"]) + + +def output_to_cwl_json(galaxy_output, get_metadata, get_dataset): + """Convert objects in a Galaxy history into a CWL object. + + Useful in running conformance tests and implementing the cwl-runner + interface via Galaxy. + """ + def element_to_cwl_json(element): + element_output = GalaxyOutput( + galaxy_output.history_id, + element["object"]["history_content_type"], + element["object"]["id"], + ) + return output_to_cwl_json(element_output, get_metadata, get_dataset) + + output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id) + if output_metadata["history_content_type"] == "dataset": + ext = output_metadata["file_ext"] + assert output_metadata["state"] == "ok" + dataset_dict = get_dataset(output_metadata) + if ext == "expression.json": + if "content" in dataset_dict: + return json.loads(dataset_dict["content"]) + else: + with open(dataset_dict["path"]) as f: + return json.load(f) + else: + return output_properties(**dataset_dict) + elif output_metadata["history_content_type"] == "dataset_collection": + if output_metadata["collection_type"] == "list": + rval = [] + for element in output_metadata["elements"]: + rval.append(element_to_cwl_json(element)) + elif output_metadata["collection_type"] == "record": + rval = {} + for element in output_metadata["elements"]: + rval[element["element_identifier"]] = element_to_cwl_json(element) + return rval + else: + raise NotImplementedError("Unknown history content type encountered") diff --git a/test/base/populators.py b/test/base/populators.py index 9ec8c705d28a..9a89749c7d94 100644 --- a/test/base/populators.py +++ b/test/base/populators.py @@ -11,7 +11,12 @@ from pkg_resources import resource_string from six import StringIO -from galaxy.tools.cwl.util import FileUploadTarget, galactic_job_json, output_properties +from galaxy.tools.cwl.util import ( + FileUploadTarget, + galactic_job_json, + GalaxyOutput, + output_to_cwl_json, +) from base import api_asserts from base.workflows_format_2 import ( @@ -78,9 +83,36 @@ def _run_tool_payload( self, tool_id, inputs, history_id, **kwds ): return DatasetPopulator( self.galaxy_interactor ).run_tool_payload( tool_id, inputs, history_id, **kwds ) -class CwlToolRun( object ): +class CwlRun(object): + + def __init__( self, dataset_populator, history_id ): + self.dataset_populator = dataset_populator + self.history_id = history_id + + def get_output_as_object(self, output_name): + galaxy_output = self._output_name_to_object(output_name) + + def get_metadata(history_content_type, content_id): + if history_content_type == "dataset": + return self.dataset_populator.get_history_dataset_details(self.history_id, dataset_id=content_id) + else: + return self.dataset_populator.get_history_collection_details(self.history_id, content_id=content_id) + + def get_dataset(dataset_details): + content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"]) + return {"content": content} + + return output_to_cwl_json( + galaxy_output, + get_metadata, + get_dataset, + ) + + +class CwlToolRun( CwlRun ): - def __init__( self, history_id, run_response ): + def __init__( self, dataset_populator, history_id, run_response ): + self.dataset_populator = dataset_populator self.history_id = history_id self.run_response = run_response @@ -94,8 +126,24 @@ def output(self, output_index): def output_collection(self, output_index): return self.run_response["output_collections"][output_index] + def _output_name_to_object(self, output_name): + galaxy_output = None + + for output in self.run_response["outputs"]: + if output["output_name"] == output_name: + galaxy_output = GalaxyOutput(self.history_id, "dataset", output["id"]) + + for output in self.run_response["output_collections"]: + if output["output_name"] == output_name: + galaxy_output = GalaxyOutput(self.history_id, "dataset_collection", output["id"]) + + if galaxy_output is None: + raise Exception("Failed to find tool output [%s]" % output_name) + + return galaxy_output -class CwlWorkflowRun( object ): + +class CwlWorkflowRun( CwlRun ): def __init__(self, dataset_populator, history_id, workflow_id, invocation_id): self.dataset_populator = dataset_populator @@ -103,48 +151,19 @@ def __init__(self, dataset_populator, history_id, workflow_id, invocation_id): self.workflow_id = workflow_id self.invocation_id = invocation_id - def get_output_as_object(self, output_name): + def _output_name_to_object(self, output_name): invocation_response = self.dataset_populator._get("workflows/%s/invocations/%s" % (self.invocation_id, self.workflow_id)) api_asserts.assert_status_code_is( invocation_response, 200 ) invocation = invocation_response.json() - def dataset_to_json(dataset_details): - ext = dataset_details["file_ext"] - assert dataset_details["state"] == "ok" - print(dataset_details) - if ext == "expression.json": - print("hid is %s" % dataset_details["hid"]) - content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"]) - print("content is %s" % content) - return json.loads(content) - else: - content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"]) - return output_properties(content=content) - if output_name in invocation["outputs"]: dataset = invocation["outputs"][output_name] - dataset_id = dataset["id"] - dataset_details = self.dataset_populator.get_history_dataset_details(self.history_id, dataset_id=dataset_id) - assert dataset_details["id"] == dataset_id - return dataset_to_json(dataset_details) + galaxy_output = GalaxyOutput(self.history_id, "dataset", dataset["id"]) elif output_name in invocation["output_collections"]: collection = invocation["output_collections"][output_name] - collection_details = self.dataset_populator.get_history_collection_details(self.history_id, content_id=collection["id"]) - if collection_details["collection_type"] == "list": - rval = [] - for element in collection_details["elements"]: - if element["object"]["history_content_type"] != "dataset": - raise NotImplementedError() - rval.append(dataset_to_json(element["object"])) - elif collection_details["collection_type"] == "record": - rval = {} - for element in collection_details["elements"]: - if element["object"]["history_content_type"] != "dataset": - raise NotImplementedError() - rval[element["element_identifier"]] = dataset_to_json(element["object"]) - return rval - else: - raise Exception("Unknown output [%s] encountered for invocation [%s]" % (output_name, invocation)) + galaxy_output = GalaxyOutput(self.history_id, "dataset_collection", collection["id"]) + + return galaxy_output class BaseDatasetPopulator( object ): @@ -307,7 +326,7 @@ def create_collection_func(element_identifiers, collection_type): self.wait_for_history( history_id=history_id, assert_ok=True ) if tool_or_workflow == "tool": run_response = self.run_tool( tool_id, job_as_dict, history_id, inputs_representation="cwl", assert_ok=assert_ok ) - run_object = CwlToolRun( history_id, run_response ) + run_object = CwlToolRun( self.dataset_populator, history_id, run_response ) if assert_ok: try: final_state = self.wait_for_job( run_object.job_id )