Skip to content

Commit

Permalink
Refactor CWL workflow output collection code toward reuse...
Browse files Browse the repository at this point in the history
... with tool outputs and for both tools and workflows in Planemo via galaxy-lib.
  • Loading branch information
jmchilton committed Aug 5, 2017
1 parent bffca21 commit 43de6cf
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 44 deletions.
73 changes: 73 additions & 0 deletions lib/galaxy/tools/cwl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
Used to share code between the Galaxy test framework
and other Galaxy CWL clients (e.g. Planemo)."""
import hashlib
import json
import os

from collections import namedtuple

from six import iteritems, StringIO


Expand Down Expand Up @@ -152,3 +155,73 @@ class ObjectUploadTarget(object):

def __init__(self, the_object):
self.object = the_object


GalaxyOutput = namedtuple("GalaxyOutput", ["history_id", "history_content_type", "history_content_id"])


def tool_response_to_output(tool_response, history_id, output_id):

for output in tool_response["outputs"]:
if output["output_name"] == output_id:
return GalaxyOutput(history_id, "dataset", output["id"])

for output_collection in tool_response["output_collections"]:
if output["output_name"] == output_id:
return GalaxyOutput(history_id, "dataset", output["id"])

raise Exception("Failed to find output with label [%s]" % output_id)


def invocation_to_output(invocation, history_id, output_id):
if output_id in invocation["outputs"]:
dataset = invocation["outputs"][output_id]
galaxy_output = GalaxyOutput(history_id, "dataset", dataset["id"])
elif output_id in invocation["output_collections"]:
collection = invocation["output_collections"][output_id]
galaxy_output = GalaxyOutput(history_id, "dataset_collection", collection["id"])
else:
raise Exception("Failed to find output with label [%s]" % output_id)

return galaxy_output


def output_to_cwl_json(galaxy_output, get_metadata, get_dataset):
"""Convert objects in a Galaxy history into a CWL object.
Useful in running conformance tests and implementing the cwl-runner
interface via Galaxy.
"""
def element_to_cwl_json(element):
element_output = GalaxyOutput(
galaxy_output.history_id,
element["object"]["history_content_type"],
element["object"]["id"],
)
return output_to_cwl_json(element_output, get_metadata, get_dataset)

output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)
if output_metadata["history_content_type"] == "dataset":
ext = output_metadata["file_ext"]
assert output_metadata["state"] == "ok"
dataset_dict = get_dataset(output_metadata)
if ext == "expression.json":
if "content" in dataset_dict:
return json.loads(dataset_dict["content"])
else:
with open(dataset_dict["path"]) as f:
return json.load(f)
else:
return output_properties(**dataset_dict)
elif output_metadata["history_content_type"] == "dataset_collection":
if output_metadata["collection_type"] == "list":
rval = []
for element in output_metadata["elements"]:
rval.append(element_to_cwl_json(element))
elif output_metadata["collection_type"] == "record":
rval = {}
for element in output_metadata["elements"]:
rval[element["element_identifier"]] = element_to_cwl_json(element)
return rval
else:
raise NotImplementedError("Unknown history content type encountered")
87 changes: 43 additions & 44 deletions test/base/populators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
from pkg_resources import resource_string
from six import StringIO

from galaxy.tools.cwl.util import FileUploadTarget, galactic_job_json, output_properties
from galaxy.tools.cwl.util import (
FileUploadTarget,
galactic_job_json,
invocation_to_output,
output_to_cwl_json,
tool_response_to_output,
)

from base import api_asserts
from base.workflows_format_2 import (
Expand Down Expand Up @@ -78,9 +84,36 @@ def _run_tool_payload( self, tool_id, inputs, history_id, **kwds ):
return DatasetPopulator( self.galaxy_interactor ).run_tool_payload( tool_id, inputs, history_id, **kwds )


class CwlToolRun( object ):
class CwlRun(object):

def __init__( self, history_id, run_response ):
def __init__( self, dataset_populator, history_id ):
self.dataset_populator = dataset_populator
self.history_id = history_id

def get_output_as_object(self, output_name):
galaxy_output = self._output_name_to_object(output_name)

def get_metadata(history_content_type, content_id):
if history_content_type == "dataset":
return self.dataset_populator.get_history_dataset_details(self.history_id, dataset_id=content_id)
else:
return self.dataset_populator.get_history_collection_details(self.history_id, content_id=content_id)

def get_dataset(dataset_details):
content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"])
return {"content": content}

return output_to_cwl_json(
galaxy_output,
get_metadata,
get_dataset,
)


class CwlToolRun( CwlRun ):

def __init__( self, dataset_populator, history_id, run_response ):
self.dataset_populator = dataset_populator
self.history_id = history_id
self.run_response = run_response

Expand All @@ -94,57 +127,23 @@ def output(self, output_index):
def output_collection(self, output_index):
return self.run_response["output_collections"][output_index]

def _output_name_to_object(self, output_name):
return tool_response_to_output(self.run_response, self.history_id, output_name)


class CwlWorkflowRun( object ):
class CwlWorkflowRun( CwlRun ):

def __init__(self, dataset_populator, history_id, workflow_id, invocation_id):
self.dataset_populator = dataset_populator
self.history_id = history_id
self.workflow_id = workflow_id
self.invocation_id = invocation_id

def get_output_as_object(self, output_name):
def _output_name_to_object(self, output_name):
invocation_response = self.dataset_populator._get("workflows/%s/invocations/%s" % (self.invocation_id, self.workflow_id))
api_asserts.assert_status_code_is( invocation_response, 200 )
invocation = invocation_response.json()

def dataset_to_json(dataset_details):
ext = dataset_details["file_ext"]
assert dataset_details["state"] == "ok"
print(dataset_details)
if ext == "expression.json":
print("hid is %s" % dataset_details["hid"])
content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"])
print("content is %s" % content)
return json.loads(content)
else:
content = self.dataset_populator.get_history_dataset_content(self.history_id, dataset_id=dataset_details["id"])
return output_properties(content=content)

if output_name in invocation["outputs"]:
dataset = invocation["outputs"][output_name]
dataset_id = dataset["id"]
dataset_details = self.dataset_populator.get_history_dataset_details(self.history_id, dataset_id=dataset_id)
assert dataset_details["id"] == dataset_id
return dataset_to_json(dataset_details)
elif output_name in invocation["output_collections"]:
collection = invocation["output_collections"][output_name]
collection_details = self.dataset_populator.get_history_collection_details(self.history_id, content_id=collection["id"])
if collection_details["collection_type"] == "list":
rval = []
for element in collection_details["elements"]:
if element["object"]["history_content_type"] != "dataset":
raise NotImplementedError()
rval.append(dataset_to_json(element["object"]))
elif collection_details["collection_type"] == "record":
rval = {}
for element in collection_details["elements"]:
if element["object"]["history_content_type"] != "dataset":
raise NotImplementedError()
rval[element["element_identifier"]] = dataset_to_json(element["object"])
return rval
else:
raise Exception("Unknown output [%s] encountered for invocation [%s]" % (output_name, invocation))
return invocation_to_output(invocation, self.history_id, output_name)


class BaseDatasetPopulator( object ):
Expand Down Expand Up @@ -307,7 +306,7 @@ def create_collection_func(element_identifiers, collection_type):
self.wait_for_history( history_id=history_id, assert_ok=True )
if tool_or_workflow == "tool":
run_response = self.run_tool( tool_id, job_as_dict, history_id, inputs_representation="cwl", assert_ok=assert_ok )
run_object = CwlToolRun( history_id, run_response )
run_object = CwlToolRun( self.dataset_populator, history_id, run_response )
if assert_ok:
try:
final_state = self.wait_for_job( run_object.job_id )
Expand Down

0 comments on commit 43de6cf

Please sign in to comment.