Implement a subset of the Common Workflow Language tool format.
This should support a subset of [draft-3](http://www.commonwl.org/draft-3/) and [v1.0](http://www.commonwl.org/v1.0/) tools.

CWL Support:
--------------

- Implemented integer, long, float, double, boolean, string, and File parameters and arrays thereof, as well as `["null", <simple_type>]` union parameters and Any-type parameters. More complex unions of datatypes are still unsupported (unions of two or more non-null parameters, unions of `["null", Any]`, etc.).
 - Draft 3 ``InlineJavascriptRequirement``s are supported for defining output files (see the ``test_cat3`` test case).
 - ``EnvVarRequirement``s are supported (see the ``test_env_tool1`` and ``test_env_tool2`` test cases).
 - Expression tools are supported (see ``parseInt-tool`` test case).
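
As a rough illustration of the supported subset described above, the following sketch classifies a CWL type declaration as supported or not. It is not the code from this commit: the function name `is_supported_type` and the constant `SIMPLE_TYPES` are hypothetical, and CWL's own spelling `int` is used for the integer type.

```python
# Hypothetical helper, not part of the commit: mirrors the supported subset
# listed in the commit message.
SIMPLE_TYPES = {"int", "long", "float", "double", "boolean", "string", "File"}


def is_supported_type(cwl_type):
    """Return True if cwl_type falls inside the supported subset."""
    if isinstance(cwl_type, str):
        # Plain simple types and bare Any are supported.
        return cwl_type in SIMPLE_TYPES or cwl_type == "Any"
    if isinstance(cwl_type, dict) and cwl_type.get("type") == "array":
        # Arrays of simple types.
        items = cwl_type.get("items")
        return isinstance(items, str) and items in SIMPLE_TYPES
    if isinstance(cwl_type, list):
        # Only ["null", <simple_type>] unions; ["null", "Any"] and unions of
        # two or more non-null types are rejected.
        non_null = [t for t in cwl_type if t != "null"]
        return (
            len(cwl_type) == 2
            and len(non_null) == 1
            and isinstance(non_null[0], str)
            and non_null[0] in SIMPLE_TYPES
        )
    return False
```

For example, `is_supported_type(["null", "string"])` is accepted while `is_supported_type(["int", "string"])` is not.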

Previously Worked:
-------------------

The following things worked with Draft 3 and a previous version of cwltool but need to be updated.

- Draft 3 `CreateFileRequirement`s are supported (see the `test_rename` test case).
- Secondary files were supported at least partially, see the `index1` and `showindex1` CWL tools created to verify this as well as the `test_index1` test case.
- Docker integration is only partial (simple docker pull is supported), so `cat3-tool.cwl` works, for example. The full semantics of CWL Docker support have yet to be implemented. The remaining work is straightforward and tracked in the meta-issue galaxyproject#1684.

Remaining Work
---------------------------------

The work remaining is vast and will be tracked at https://github.com/common-workflow-language/galaxy/issues for the time being.

Implementation Notes:
----------------------
 - Non-File CWL outputs are represented as ``expression.json`` files. Traditionally Galaxy hasn't supported non-File outputs from tools, but work is in progress in CWL Galaxy to bring native Galaxy support for such outputs (#27).
 - CWL secondary files are stored in a ``__secondary_files__`` directory in the dataset's extra_files_path directory.
 - The tool execution API has been extended with an ``inputs_representation`` parameter that can now be set to "cwl". The ``cwl`` representation for running tools corresponds to the CWL job JSON format with ``{"class": "File", "path": "/path/to/file"}`` inputs replaced with ``{"src": "hda", "id": "<dataset_id>"}``. Code for building these requests from CWL job JSON is available in the test class.
 - Since the CWL <-> Galaxy parameter translation may change over time (for instance, if Galaxy develops or refines parameter classes), the CWL state and CWL state version are tracked in the database so that, for reruns etc., the Galaxy state could hopefully be updated from an older version to a newer one.
 - CWL allows output parameters to be either ``File`` or non-``File`` and determined at runtime, so ``galaxy.json`` is used to dynamically adjust output extension as needed for non-``File`` parameters.
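
The input translation described in the notes above can be sketched as follows. This is a minimal illustration, not the helper from the test class: the function name `to_galaxy_cwl_inputs` and the `dataset_ids` mapping (file path to the id of an already uploaded dataset) are assumptions made for the example.

```python
def to_galaxy_cwl_inputs(cwl_job, dataset_ids):
    """Replace CWL File objects in a job document with Galaxy dataset references.

    dataset_ids is a hypothetical mapping from a file path to the id of the
    history dataset (hda) that holds that file's content.
    """
    def convert(value):
        if isinstance(value, dict):
            if value.get("class") == "File":
                return {"src": "hda", "id": dataset_ids[value["path"]]}
            return {key: convert(item) for key, item in value.items()}
        if isinstance(value, list):
            return [convert(item) for item in value]
        # Scalars (int, string, boolean, ...) pass through unchanged.
        return value

    return {name: convert(value) for name, value in cwl_job.items()}
```

The resulting dictionary would then be sent as the tool inputs together with ``inputs_representation`` set to "cwl".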

Implementation Description:
-----------------------------

The reference implementation Python library (mainly developed by Peter Amstutz - https://github.com/common-workflow-language/common-workflow-language/tree/master/reference) is used to load tool files ending with ``.json`` or ``.cwl`` and proxy objects are created to adapt these tools to Galaxy representations. In particular input and output descriptions are loaded from the tool.

When the tool is submitted, a specialized tool class is used to build a cwltool-compatible job description from the supplied Galaxy inputs, and the CWL reference implementation is used to generate a CWL reference implementation Job object. A command line is generated from this Job object.

As a result, Galaxy largely does not need to worry about the details of command-line adapters, expressions, etc.
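
A minimal sketch of how a shell command line might be assembled from the argument list that the Job object provides. The regular expression is the `needs_shell_quoting` pattern from `cwltool_deps.py` in this commit; the standard library's `shlex.quote` stands in for `shellescape.quote`, and the function name `build_command_line` is hypothetical.

```python
import re
import shlex

# Pattern taken from cwltool_deps.py in this commit: an argument needs
# quoting if it is empty or contains whitespace or shell metacharacters.
needs_shell_quoting = re.compile(r"""(^$|[\s|&;()<>\'"$@])""").search


def build_command_line(args, stdin=None, stdout=None):
    """Join args into one shell command, quoting only where needed."""
    parts = [shlex.quote(arg) if needs_shell_quoting(arg) else arg for arg in args]
    command_line = " ".join(parts)
    # Redirections are separated by spaces so they cannot fuse with the
    # last argument.
    if stdin:
        command_line += " < " + shlex.quote(stdin)
    if stdout:
        command_line += " > " + shlex.quote(stdout)
    return command_line
```

For instance, `build_command_line(["cat", "my file.txt"], stdout="out.txt")` yields `cat 'my file.txt' > out.txt`.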

Galaxy writes a description of the CWL job that it can reload to the job working directory. After the process is complete (on the Galaxy compute server, but outside the Docker container) this representation is reloaded and the dynamic outputs are discovered and moved to fixed locations as expected by Galaxy. CWL allows for much more expressive output locations than Galaxy, for better or worse, and this step uses cwltool to adapt CWL to Galaxy outputs.
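
The reload-and-relocate step can be pictured with the sketch below. It assumes a JSON description whose `output_dict` has the shape built in `CwlTool.exec_before_job` (output name mapped to `id` and `path`). The file name `cwl_job.json`, the function name, and the `discovered` argument are hypothetical; in the commit the discovery itself is delegated to cwltool.

```python
import json
import os
import shutil

# Hypothetical file name for the saved job description.
DESCRIPTION_FILE = "cwl_job.json"


def relocate_outputs(job_directory, discovered):
    """Move discovered outputs to the fixed paths Galaxy expects.

    discovered maps an output name to the path where the tool actually
    wrote it (an assumption for this sketch).
    Returns the names of the outputs that were moved.
    """
    with open(os.path.join(job_directory, DESCRIPTION_FILE)) as handle:
        description = json.load(handle)
    moved = []
    for name, target in description["output_dict"].items():
        source = discovered.get(name)
        if source is not None and os.path.exists(source):
            shutil.move(source, target["path"])
            moved.append(name)
    return moved
```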

Currently all ``File`` outputs are sniffed to determine a Galaxy datatype. CWL allows refinement of this, which remains work to be done:

  1) CWL should support EDAM declaration of types and Galaxy should provide a mapping to core datasets to skip sniffing if types are found.
  2) For finer grain control within Galaxy, extensions to CWL should allow setting actual Galaxy output types on outputs. (Distinction between fastq and fastqsanger in Galaxy is very important for instance.)
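
Neither of the two proposals is implemented in this commit. The sketch below only illustrates what such a lookup could look like: the mapping covers a few illustrative EDAM format IRIs, and the `galaxy_ext` key, the function name, and the `sniff` callback are all hypothetical.

```python
# Illustrative EDAM format IRIs mapped to Galaxy extensions (not exhaustive).
EDAM_TO_GALAXY = {
    "http://edamontology.org/format_1929": "fasta",
    "http://edamontology.org/format_1930": "fastq",
    "http://edamontology.org/format_2572": "bam",
}


def resolve_extension(cwl_output, sniff):
    """Pick a Galaxy extension for an output, sniffing only as a last resort.

    cwl_output is a dict describing the output; "galaxy_ext" is a
    hypothetical CWL extension field carrying an explicit Galaxy datatype.
    sniff is a callable taking a path and returning an extension.
    """
    explicit = cwl_output.get("galaxy_ext")
    if explicit:
        # Proposal 2: an explicit Galaxy type wins (e.g. fastqsanger).
        return explicit
    edam_format = cwl_output.get("format")
    if edam_format in EDAM_TO_GALAXY:
        # Proposal 1: a declared EDAM format skips sniffing.
        return EDAM_TO_GALAXY[edam_format]
    return sniff(cwl_output["path"])
```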

Testing:
---------------------

    % git clone https://github.com/common-workflow-language/galaxy.git
    % cd galaxy
    % git checkout cwl-1.0

Start Galaxy.

    % GALAXY_RUN_WITH_TEST_TOOLS=1 sh run.sh

Open http://localhost:8080/ and see the CWL test tools (along with all Galaxy test tools) in the left-hand tool panel.

To go a step further and actually run CWL jobs within their designated Docker containers, copy the following minimal Galaxy job configuration file to ``config/job_conf.xml``. (Adjust the ``docker_sudo`` parameter based on how you execute Docker).

https://gist.github.com/jmchilton/3997fa471d1b4c556966

Run API tests demonstrating the various CWL demo tools with the following command.

```
./run_tests.sh -api test/api/test_tools_cwl.py
```

Issues and Contact
---------------------------------

Report issues at https://github.com/common-workflow-language/galaxy/issues and feel free to ping jmchilton on the CWL [Gitter channel](https://gitter.im/common-workflow-language/common-workflow-language).
jmchilton committed Mar 11, 2017
1 parent 65f331c commit 5d521d7
Showing 89 changed files with 2,001 additions and 54 deletions.
3 changes: 2 additions & 1 deletion lib/galaxy/config.py
@@ -317,7 +317,8 @@ def __init__( self, **kwargs ):
# These are not even beta - just experiments - don't use them unless
# you want yours tools to be broken in the future.
self.enable_beta_tool_formats = string_as_bool( kwargs.get( 'enable_beta_tool_formats', 'False' ) )

# Should CWL artifacts be loaded with strict validation enabled.
self.strict_cwl_validation = string_as_bool( kwargs.get( 'strict_cwl_validation', 'True') )
# Certain modules such as the pause module will automatically cause
# workflows to be scheduled in job handlers the way all workflows will
# be someday - the following two properties can also be used to force this
2 changes: 2 additions & 0 deletions lib/galaxy/dependencies/pinned-requirements.txt
@@ -69,3 +69,5 @@ ecdsa==0.13

# Flexible BAM index naming
pysam==0.8.4+gx5

cwltool==1.0.20170224141733
8 changes: 7 additions & 1 deletion lib/galaxy/jobs/__init__.py
@@ -870,6 +870,9 @@ def can_split( self ):
# Should the job handler split this job up?
return self.app.config.use_tasked_jobs and self.tool.parallelism

# Exposed as a property because callers read job_wrapper.is_cwl_job without calling it.
@property
def is_cwl_job( self ):
return self.tool.tool_type == "cwl"

def get_job_runner_url( self ):
log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' % self.job_id)
return self.job_destination.url
@@ -974,10 +977,13 @@ def get_special( ):
# if the server was stopped and restarted before the job finished
job.command_line = unicodify(self.command_line)
job.dependencies = self.tool.dependencies
param_dict = tool_evaluator.param_dict
job.cwl_command_state = param_dict.get('__cwl_command_state', None)
job.cwl_command_state_version = param_dict.get('__cwl_command_state_version', None)
self.sa_session.add( job )
self.sa_session.flush()
# Return list of all extra files
self.param_dict = tool_evaluator.param_dict
self.param_dict = param_dict
version_string_cmd_raw = self.tool.version_string_cmd
if version_string_cmd_raw:
version_command_template = string.Template(version_string_cmd_raw)
8 changes: 7 additions & 1 deletion lib/galaxy/jobs/command_factory.py
@@ -6,6 +6,7 @@
)

from galaxy import util
from galaxy.util import bunch
from galaxy.jobs.runners.util.job_script import (
check_script_integrity,
INTEGRITY_INJECTION,
@@ -175,8 +176,13 @@ def __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_comm
if 'working_directory' in remote_command_params:
work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory']
work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds )
if work_dir_outputs:
if work_dir_outputs or job_wrapper.is_cwl_job:
commands_builder.capture_return_code()
if job_wrapper.is_cwl_job:
metadata_script_file = join(job_wrapper.working_directory, "relocate_dynamic_outputs.py")
relocate_contents = 'from galaxy_ext.cwl.handle_outputs import relocate_dynamic_outputs; relocate_dynamic_outputs()'
write_script(metadata_script_file, relocate_contents, bunch.Bunch(check_job_script_integrity=False))
commands_builder.append_command("python %s" % metadata_script_file)
copy_commands = map(__copy_if_exists_command, work_dir_outputs)
commands_builder.append_commands(copy_commands)

7 changes: 7 additions & 0 deletions lib/galaxy/managers/tools.py
@@ -63,6 +63,13 @@ def create_tool(self, tool_payload):
if tool_id is None:
tool_id = str(uuid)

tool_version = representation.get("version", None)
tool_hash = build_tool_hash(representation)
value = representation
elif tool_format in ["CommandLineTool", "ExpressionTool"]:
# CWL tools
uuid = None
tool_id = representation.get("id", None)
tool_version = representation.get("version", None)
tool_hash = build_tool_hash(representation)
value = representation
5 changes: 5 additions & 0 deletions lib/galaxy/managers/workflows.py
@@ -22,6 +22,7 @@
from galaxy.util.sanitize_html import sanitize_html
from galaxy.workflow.steps import attach_ordered_steps
from galaxy.workflow.modules import module_factory, is_tool_module_type, ToolModule, WorkflowModuleInjector
from galaxy.tools.cwl import workflow_proxy
from galaxy.tools.parameters.basic import DataToolParameter, DataCollectionToolParameter, RuntimeValue, workflow_building_modes
from galaxy.tools.parameters import visit_input_values, params_to_incoming
from galaxy.jobs.actions.post import ActionBox
@@ -278,6 +279,10 @@ def update_workflow_from_dict(self, trans, stored_workflow, workflow_data):
def _workflow_from_dict(self, trans, data, name, **kwds):
if isinstance(data, string_types):
data = json.loads(data)
if "src" in data:
assert data["src"] == "path"
wf_proxy = workflow_proxy(data["path"])
data = wf_proxy.to_dict()

# Create new workflow from source data
workflow = model.Workflow()
2 changes: 2 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -721,6 +721,8 @@ def to_dict( self, view='collection', system_details=False ):
# System level details that only admins should have.
rval['external_id'] = self.job_runner_external_id
rval['command_line'] = self.command_line
rval['cwl_command_state'] = self.cwl_command_state
rval['cwl_command_state_version'] = self.cwl_command_state_version

if view == 'element':
param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
2 changes: 2 additions & 0 deletions lib/galaxy/model/mapping.py
@@ -498,6 +498,8 @@
Column( "object_store_id", TrimmedString( 255 ), index=True ),
Column( "imported", Boolean, default=False, index=True ),
Column( "params", TrimmedString(255), index=True ),
Column( "cwl_command_state", JSONType, nullable=True ),
Column( "cwl_command_state_version", Integer, default=1 ),
Column( "handler", TrimmedString( 255 ), index=True ) )

model.JobStateHistory.table = Table(
49 changes: 49 additions & 0 deletions lib/galaxy/model/migrate/versions/0136_cwl_state.py
@@ -0,0 +1,49 @@
"""
Migration script to add the CWL command state and CWL command state version columns to the job table.
"""
import datetime
import logging

from sqlalchemy import Integer, Column, MetaData, Table
from galaxy.model.custom_types import JSONType

now = datetime.datetime.utcnow
log = logging.getLogger( __name__ )
metadata = MetaData()


def upgrade(migrate_engine):
metadata.bind = migrate_engine
print __doc__
metadata.reflect()

# Match the column definitions in lib/galaxy/model/mapping.py.
cwl_command_column = Column( "cwl_command_state", JSONType, nullable=True )
cwl_command_version_column = Column( "cwl_command_state_version", Integer, default=1 )
__add_column( cwl_command_column, "job", metadata )
__add_column( cwl_command_version_column, "job", metadata )


def downgrade(migrate_engine):
metadata.bind = migrate_engine
metadata.reflect()

__drop_column( "cwl_command_state", "job", metadata )
__drop_column( "cwl_command_state_version", "job", metadata )


def __add_column(column, table_name, metadata, **kwds):
try:
table = Table( table_name, metadata, autoload=True )
column.create( table, **kwds )
except Exception as e:
print str(e)
log.exception( "Adding column %s failed." % column)


def __drop_column( column_name, table_name, metadata ):
try:
table = Table( table_name, metadata, autoload=True )
getattr( table.c, column_name ).drop()
except Exception as e:
print str(e)
log.exception( "Dropping column %s failed." % column_name )
80 changes: 75 additions & 5 deletions lib/galaxy/tools/__init__.py
@@ -70,12 +70,14 @@
get_tool_source_from_representation,
ToolOutputCollectionPart
)
from galaxy.tools.cwl import needs_shell_quoting, shellescape
from galaxy.tools.parser.xml import XmlPageSource
from galaxy.tools.toolbox import BaseGalaxyToolBox
from galaxy.util import (
ExecutionTimer,
listify,
rst_to_html,
safe_makedirs,
string_as_bool,
unicodify
)
@@ -225,7 +227,11 @@ def tools_by_id( self ):

def create_tool( self, config_file, repository_id=None, guid=None, **kwds ):
try:
tool_source = get_tool_source( config_file, enable_beta_formats=getattr( self.app.config, "enable_beta_tool_formats", False ) )
tool_source = get_tool_source(
config_file,
enable_beta_formats=getattr( self.app.config, "enable_beta_tool_formats", False ),
strict_cwl_validation=getattr( self.app.config, "strict_cwl_validation", True ),
)
except Exception as e:
# capture and log parsing errors
global_tool_errors.add_error(config_file, "Tool XML parsing", e)
@@ -1563,7 +1569,7 @@ def call_hook( self, hook_name, *args, **kwargs ):
def exec_before_job( self, app, inp_data, out_data, param_dict={} ):
pass

def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ):
def exec_after_process( self, app, inp_data, out_data, param_dict, job=None, **kwds ):
pass

def job_failed( self, job_wrapper, message, exception=False ):
@@ -1797,7 +1803,7 @@ def to_dict( self, trans, link_details=False, io_details=False ):
tool_dict[ 'panel_section_id' ], tool_dict[ 'panel_section_name' ] = self.get_panel_section()

tool_class = self.__class__
regular_form = tool_class == Tool or isinstance(self, DatabaseOperationTool)
regular_form = tool_class == Tool or isinstance(self, DatabaseOperationTool) or tool_class == CwlTool
tool_dict["form_style"] = "regular" if regular_form else "special"

return tool_dict
@@ -2263,7 +2269,7 @@ class SetMetadataTool( Tool ):
tool_type = 'set_metadata'
requires_setting_metadata = False

def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ):
def exec_after_process( self, app, inp_data, out_data, param_dict, job=None, **kwds ):
for name, dataset in inp_data.items():
external_metadata = JobExternalOutputMetadataWrapper( job )
if external_metadata.external_metadata_set_successfully( dataset, app.model.context ):
@@ -2304,6 +2310,68 @@ class ImportHistoryTool( Tool ):
tool_type = 'import_history'


class CwlTool( Tool ):
tool_type = 'cwl'

def exec_before_job( self, app, inp_data, out_data, param_dict=None ):
# Validate up front: param_dict is indexed immediately below.
if param_dict is None:
raise Exception("Internal error - param_dict is empty.")
super( CwlTool, self ).exec_before_job( app, inp_data, out_data, param_dict=param_dict )
# Working directory on Galaxy server (instead of remote compute).
local_working_directory = param_dict["__local_working_directory__"]
log.info("exec_before_job for CWL tool")
from galaxy.tools.cwl import to_cwl_job
input_json = to_cwl_job(self, param_dict, local_working_directory)
output_dict = {}
for name, dataset in out_data.items():
output_dict[name] = {
"id": dataset.dataset.id,
"path": dataset.file_name,
}

cwl_job_proxy = self._cwl_tool_proxy.job_proxy(
input_json,
output_dict,
local_working_directory,
)
# Write representation to disk that can be reloaded at runtime
# and outputs collected before Galaxy metadata is gathered.
cwl_job_proxy.save_job()

cwl_command_line = cwl_job_proxy.command_line
cwl_stdin = cwl_job_proxy.stdin
cwl_stdout = cwl_job_proxy.stdout
env = cwl_job_proxy.environment

command_line = " ".join([shellescape.quote(arg) if needs_shell_quoting(arg) else arg for arg in cwl_command_line])
if cwl_stdin:
command_line += ' < "' + cwl_stdin + '"'
if cwl_stdout:
command_line += ' > "' + cwl_stdout + '"'
cwl_job_state = {
'args': cwl_command_line,
'stdin': cwl_stdin,
'stdout': cwl_stdout,
'env': env,
}
tool_working_directory = os.path.join(local_working_directory, 'working')
# Move to prepare...
safe_makedirs(tool_working_directory)
cwl_job_proxy.stage_files()

param_dict["__cwl_command"] = command_line
param_dict["__cwl_command_state"] = cwl_job_state
param_dict["__cwl_command_state_version"] = 1
log.info("CwlTool.exec_before_job() generated command_line %s" % command_line)

def parse( self, tool_source, **kwds ):
super( CwlTool, self ).parse( tool_source, **kwds )
cwl_tool_proxy = getattr( tool_source, 'tool_proxy', None )
if cwl_tool_proxy is None:
raise Exception("CwlTool.parse() called on tool source not defining a proxy object to underlying CWL tool.")
self._cwl_tool_proxy = cwl_tool_proxy


class DataManagerTool( OutputParameterJSONTool ):
tool_type = 'manage_data'
default_tool_action = DataManagerToolAction
@@ -2681,7 +2749,9 @@ def produce_outputs( self, trans, out_data, output_collections, incoming, histor
for tool_class in [ Tool, SetMetadataTool, OutputParameterJSONTool, ExpressionTool,
DataManagerTool, DataSourceTool, AsyncDataSourceTool,
UnzipCollectionTool, ZipCollectionTool, MergeCollectionTool,
DataDestinationTool ]:
DataDestinationTool,
CwlTool ]:

tool_types[ tool_class.tool_type ] = tool_class


10 changes: 10 additions & 0 deletions lib/galaxy/tools/cwl/cwltool_deps.py
@@ -17,13 +17,15 @@
workflow,
job,
process,
pathmapper,
)
except (ImportError, SyntaxError):
# Drop SyntaxError once cwltool supports Python 3
main = None
workflow = None
job = None
process = None
pathmapper = None

try:
from cwltool import load_tool
@@ -41,6 +43,12 @@
# Drop SyntaxError once schema_salad supports Python 3
schema_salad = None

try:
from schema_salad import ref_resolver
except (ImportError, SyntaxError):
ref_resolver = None


needs_shell_quoting = re.compile(r"""(^$|[\s|&;()<>\'"$@])""").search


@@ -66,9 +74,11 @@ def ensure_cwltool_available():

__all__ = (
'main',
'ref_resolver',
'load_tool',
'workflow',
'process',
'pathmapper',
'ensure_cwltool_available',
'schema_salad',
'shellescape',