diff --git a/lib/galaxy/config.py b/lib/galaxy/config.py index 5cfb7040bb8c..e05497af62af 100644 --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -317,7 +317,8 @@ def __init__( self, **kwargs ): # These are not even beta - just experiments - don't use them unless # you want yours tools to be broken in the future. self.enable_beta_tool_formats = string_as_bool( kwargs.get( 'enable_beta_tool_formats', 'False' ) ) - + # Should CWL artifacts be loaded with strict validation enabled. + self.strict_cwl_validation = string_as_bool( kwargs.get( 'strict_cwl_validation', 'True') ) # Certain modules such as the pause module will automatically cause # workflows to be scheduled in job handlers the way all workflows will # be someday - the following two properties can also be used to force this diff --git a/lib/galaxy/dependencies/pinned-requirements.txt b/lib/galaxy/dependencies/pinned-requirements.txt index 4c253a47a3da..5fc4ebb88cef 100644 --- a/lib/galaxy/dependencies/pinned-requirements.txt +++ b/lib/galaxy/dependencies/pinned-requirements.txt @@ -69,3 +69,5 @@ ecdsa==0.13 # Flexible BAM index naming pysam==0.8.4+gx5 + +cwltool==1.0.20170224141733 diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index a04dee8cb5b9..2046b60d9763 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -870,6 +870,9 @@ def can_split( self ): # Should the job handler split this job up? return self.app.config.use_tasked_jobs and self.tool.parallelism + def is_cwl_job( self ): + return self.tool.tool_type == "cwl" + def get_job_runner_url( self ): log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' 
% self.job_id) return self.job_destination.url @@ -974,10 +977,13 @@ def get_special( ): # if the server was stopped and restarted before the job finished job.command_line = unicodify(self.command_line) job.dependencies = self.tool.dependencies + param_dict = tool_evaluator.param_dict + job.cwl_command_state = param_dict.get('__cwl_command_state', None) + job.cwl_command_state_version = param_dict.get('__cwl_command_state_version', None) self.sa_session.add( job ) self.sa_session.flush() # Return list of all extra files - self.param_dict = tool_evaluator.param_dict + self.param_dict = param_dict version_string_cmd_raw = self.tool.version_string_cmd if version_string_cmd_raw: version_command_template = string.Template(version_string_cmd_raw) diff --git a/lib/galaxy/jobs/command_factory.py b/lib/galaxy/jobs/command_factory.py index 896461059841..69906e85a355 100644 --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -6,6 +6,7 @@ ) from galaxy import util +from galaxy.util import bunch from galaxy.jobs.runners.util.job_script import ( check_script_integrity, INTEGRITY_INJECTION, @@ -175,8 +176,13 @@ def __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_comm if 'working_directory' in remote_command_params: work_dir_outputs_kwds['job_working_directory'] = remote_command_params['working_directory'] work_dir_outputs = runner.get_work_dir_outputs( job_wrapper, **work_dir_outputs_kwds ) - if work_dir_outputs: + if work_dir_outputs or job_wrapper.is_cwl_job: commands_builder.capture_return_code() + if job_wrapper.is_cwl_job: + metadata_script_file = join(job_wrapper.working_directory, "relocate_dynamic_outputs.py") + relocate_contents = 'from galaxy_ext.cwl.handle_outputs import relocate_dynamic_outputs; relocate_dynamic_outputs()' + write_script(metadata_script_file, relocate_contents, bunch.Bunch(check_job_script_integrity=False)) + commands_builder.append_command("python %s" % metadata_script_file) copy_commands = 
map(__copy_if_exists_command, work_dir_outputs) commands_builder.append_commands(copy_commands) diff --git a/lib/galaxy/managers/tools.py b/lib/galaxy/managers/tools.py index 49893a2700f1..507ae3ddb64b 100644 --- a/lib/galaxy/managers/tools.py +++ b/lib/galaxy/managers/tools.py @@ -63,6 +63,13 @@ def create_tool(self, tool_payload): if tool_id is None: tool_id = str(uuid) + tool_version = representation.get("version", None) + tool_hash = build_tool_hash(representation) + value = representation + elif tool_format in ["CommandLineTool", "ExpressionTool"]: + # CWL tools + uuid = None + tool_id = representation.get("id", None) tool_version = representation.get("version", None) tool_hash = build_tool_hash(representation) value = representation diff --git a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index 7ec9f0ab321e..96c2a8301acc 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -22,6 +22,7 @@ from galaxy.util.sanitize_html import sanitize_html from galaxy.workflow.steps import attach_ordered_steps from galaxy.workflow.modules import module_factory, is_tool_module_type, ToolModule, WorkflowModuleInjector +from galaxy.tools.cwl import workflow_proxy from galaxy.tools.parameters.basic import DataToolParameter, DataCollectionToolParameter, RuntimeValue, workflow_building_modes from galaxy.tools.parameters import visit_input_values, params_to_incoming from galaxy.jobs.actions.post import ActionBox @@ -278,6 +279,10 @@ def update_workflow_from_dict(self, trans, stored_workflow, workflow_data): def _workflow_from_dict(self, trans, data, name, **kwds): if isinstance(data, string_types): data = json.loads(data) + if "src" in data: + assert data["src"] == "path" + wf_proxy = workflow_proxy(data["path"]) + data = wf_proxy.to_dict() # Create new workflow from source data workflow = model.Workflow() diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 585e854144f1..b98b232ee951 100644 --- 
a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -721,6 +721,8 @@ def to_dict( self, view='collection', system_details=False ): # System level details that only admins should have. rval['external_id'] = self.job_runner_external_id rval['command_line'] = self.command_line + rval['cwl_command_state'] = self.cwl_command_state + rval['cwl_command_state_version'] = self.cwl_command_state_version if view == 'element': param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] ) diff --git a/lib/galaxy/model/mapping.py b/lib/galaxy/model/mapping.py index 3a54aefa22f7..fa131608939b 100644 --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -498,6 +498,8 @@ Column( "object_store_id", TrimmedString( 255 ), index=True ), Column( "imported", Boolean, default=False, index=True ), Column( "params", TrimmedString(255), index=True ), + Column( "cwl_command_state", JSONType, nullable=True ), + Column( "cwl_command_state_version", Integer, default=1 ), Column( "handler", TrimmedString( 255 ), index=True ) ) model.JobStateHistory.table = Table( diff --git a/lib/galaxy/model/migrate/versions/0136_cwl_state.py b/lib/galaxy/model/migrate/versions/0136_cwl_state.py new file mode 100644 index 000000000000..fcd9041d9e7b --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0136_cwl_state.py @@ -0,0 +1,49 @@ +""" +Migration script to add CWL command state columns to the job table +""" +import datetime +import logging + +from sqlalchemy import Integer, Column, MetaData, Table +from galaxy.model.custom_types import JSONType + +now = datetime.datetime.utcnow +log = logging.getLogger( __name__ ) +metadata = MetaData() + + +def upgrade(migrate_engine): + metadata.bind = migrate_engine + print __doc__ + metadata.reflect() + + cwl_command_column = Column( "cwl_command_state", JSONType, nullable=True ) + cwl_command_version_column = Column( "cwl_command_state_version", Integer, default=1 ) + __add_column( cwl_command_column, "job", 
metadata ) + __add_column( cwl_command_version_column, "job", metadata ) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + __drop_column( "cwl_command_state", "job", metadata ) + __drop_column( "cwl_command_state_version", "job", metadata ) + + +def __add_column(column, table_name, metadata, **kwds): + try: + table = Table( table_name, metadata, autoload=True ) + column.create( table, **kwds ) + except Exception as e: + print str(e) + log.exception( "Adding column %s failed." % column) + + +def __drop_column( column_name, table_name, metadata ): + try: + table = Table( table_name, metadata, autoload=True ) + getattr( table.c, column_name ).drop() + except Exception as e: + print str(e) + log.exception( "Dropping column %s failed." % column_name ) diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 9f622fa25551..b95a386e375e 100755 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -70,12 +70,14 @@ get_tool_source_from_representation, ToolOutputCollectionPart ) +from galaxy.tools.cwl import needs_shell_quoting, shellescape from galaxy.tools.parser.xml import XmlPageSource from galaxy.tools.toolbox import BaseGalaxyToolBox from galaxy.util import ( ExecutionTimer, listify, rst_to_html, + safe_makedirs, string_as_bool, unicodify ) @@ -225,7 +227,11 @@ def tools_by_id( self ): def create_tool( self, config_file, repository_id=None, guid=None, **kwds ): try: - tool_source = get_tool_source( config_file, enable_beta_formats=getattr( self.app.config, "enable_beta_tool_formats", False ) ) + tool_source = get_tool_source( + config_file, + enable_beta_formats=getattr( self.app.config, "enable_beta_tool_formats", False ), + strict_cwl_validation=getattr( self.app.config, "strict_cwl_validation", True ), + ) except Exception as e: # capture and log parsing errors global_tool_errors.add_error(config_file, "Tool XML parsing", e) @@ -1563,7 +1569,7 @@ def call_hook( self, hook_name, 
*args, **kwargs ): def exec_before_job( self, app, inp_data, out_data, param_dict={} ): pass - def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ): + def exec_after_process( self, app, inp_data, out_data, param_dict, job=None, **kwds ): pass def job_failed( self, job_wrapper, message, exception=False ): @@ -1797,7 +1803,7 @@ def to_dict( self, trans, link_details=False, io_details=False ): tool_dict[ 'panel_section_id' ], tool_dict[ 'panel_section_name' ] = self.get_panel_section() tool_class = self.__class__ - regular_form = tool_class == Tool or isinstance(self, DatabaseOperationTool) + regular_form = tool_class == Tool or isinstance(self, DatabaseOperationTool) or tool_class == CwlTool tool_dict["form_style"] = "regular" if regular_form else "special" return tool_dict @@ -2263,7 +2269,7 @@ class SetMetadataTool( Tool ): tool_type = 'set_metadata' requires_setting_metadata = False - def exec_after_process( self, app, inp_data, out_data, param_dict, job=None ): + def exec_after_process( self, app, inp_data, out_data, param_dict, job=None, **kwds ): for name, dataset in inp_data.items(): external_metadata = JobExternalOutputMetadataWrapper( job ) if external_metadata.external_metadata_set_successfully( dataset, app.model.context ): @@ -2304,6 +2310,68 @@ class ImportHistoryTool( Tool ): tool_type = 'import_history' +class CwlTool( Tool ): + tool_type = 'cwl' + + def exec_before_job( self, app, inp_data, out_data, param_dict=None ): + super( CwlTool, self ).exec_before_job( app, inp_data, out_data, param_dict=param_dict ) + # Working directory on Galaxy server (instead of remote compute). 
+ local_working_directory = param_dict["__local_working_directory__"] + log.info("exec_before_job for CWL tool") + from galaxy.tools.cwl import to_cwl_job + input_json = to_cwl_job(self, param_dict, local_working_directory) + if param_dict is None: + raise Exception("Internal error - param_dict is empty.") + output_dict = {} + for name, dataset in out_data.items(): + output_dict[name] = { + "id": dataset.dataset.id, + "path": dataset.file_name, + } + + cwl_job_proxy = self._cwl_tool_proxy.job_proxy( + input_json, + output_dict, + local_working_directory, + ) + # Write representation to disk that can be reloaded at runtime + # and outputs collected before Galaxy metadata is gathered. + cwl_job_proxy.save_job() + + cwl_command_line = cwl_job_proxy.command_line + cwl_stdin = cwl_job_proxy.stdin + cwl_stdout = cwl_job_proxy.stdout + env = cwl_job_proxy.environment + + command_line = " ".join([shellescape.quote(arg) if needs_shell_quoting(arg) else arg for arg in cwl_command_line]) + if cwl_stdin: + command_line += '< "' + cwl_stdin + '"' + if cwl_stdout: + command_line += '> "' + cwl_stdout + '"' + cwl_job_state = { + 'args': cwl_command_line, + 'stdin': cwl_stdin, + 'stdout': cwl_stdout, + 'env': env, + } + tool_working_directory = os.path.join(local_working_directory, 'working') + # Move to prepare... 
+ safe_makedirs(tool_working_directory) + cwl_job_proxy.stage_files() + + param_dict["__cwl_command"] = command_line + param_dict["__cwl_command_state"] = cwl_job_state + param_dict["__cwl_command_version"] = 1 + log.info("CwlTool.exec_before_job() generated command_line %s" % command_line) + + def parse( self, tool_source, **kwds ): + super( CwlTool, self ).parse( tool_source, **kwds ) + cwl_tool_proxy = getattr( tool_source, 'tool_proxy', None ) + if cwl_tool_proxy is None: + raise Exception("CwlTool.parse() called on tool source not defining a proxy object to underlying CWL tool.") + self._cwl_tool_proxy = cwl_tool_proxy + + class DataManagerTool( OutputParameterJSONTool ): tool_type = 'manage_data' default_tool_action = DataManagerToolAction @@ -2681,7 +2749,9 @@ def produce_outputs( self, trans, out_data, output_collections, incoming, histor for tool_class in [ Tool, SetMetadataTool, OutputParameterJSONTool, ExpressionTool, DataManagerTool, DataSourceTool, AsyncDataSourceTool, UnzipCollectionTool, ZipCollectionTool, MergeCollectionTool, - DataDestinationTool ]: + DataDestinationTool, + CwlTool ]: + tool_types[ tool_class.tool_type ] = tool_class diff --git a/lib/galaxy/tools/cwl/cwltool_deps.py b/lib/galaxy/tools/cwl/cwltool_deps.py index a2a946c91a17..9c2518f03d94 100644 --- a/lib/galaxy/tools/cwl/cwltool_deps.py +++ b/lib/galaxy/tools/cwl/cwltool_deps.py @@ -17,6 +17,7 @@ workflow, job, process, + pathmapper, ) except (ImportError, SyntaxError): # Drop SyntaxError once cwltool supports Python 3 @@ -24,6 +25,7 @@ workflow = None job = None process = None + pathmapper = None try: from cwltool import load_tool @@ -41,6 +43,12 @@ # Drop SyntaxError once schema_salad supports Python 3 schema_salad = None +try: + from schema_salad import ref_resolver +except (ImportError, SyntaxError): + ref_resolver = None + + needs_shell_quoting = re.compile(r"""(^$|[\s|&;()<>\'"$@])""").search @@ -66,9 +74,11 @@ def ensure_cwltool_available(): __all__ = ( 'main', + 
'ref_resolver', 'load_tool', 'workflow', 'process', + 'pathmapper', 'ensure_cwltool_available', 'schema_salad', 'shellescape', diff --git a/lib/galaxy/tools/cwl/parser.py b/lib/galaxy/tools/cwl/parser.py index 3e1064409336..195fdf28e5ed 100644 --- a/lib/galaxy/tools/cwl/parser.py +++ b/lib/galaxy/tools/cwl/parser.py @@ -18,11 +18,10 @@ from .cwltool_deps import ( ensure_cwltool_available, - main, - workflow, + process, ) -from .schema import schema_loader +from .schema import non_strict_schema_loader, schema_loader log = logging.getLogger(__name__) @@ -42,36 +41,36 @@ ] -def tool_proxy(tool_path): +def tool_proxy(tool_path, strict_cwl_validation=True): """ Provide a proxy object to cwltool data structures to just grab relevant data. """ ensure_cwltool_available() - tool = to_cwl_tool_object(tool_path) + tool = to_cwl_tool_object(tool_path, strict_cwl_validation=strict_cwl_validation) return tool -def workflow_proxy(workflow_path): +def workflow_proxy(workflow_path, strict_cwl_validation=True): ensure_cwltool_available() - workflow = to_cwl_workflow_object(workflow_path) + workflow = to_cwl_workflow_object(workflow_path, strict_cwl_validation=strict_cwl_validation) return workflow -def load_job_proxy(job_directory): +def load_job_proxy(job_directory, strict_cwl_validation=True): ensure_cwltool_available() job_objects_path = os.path.join(job_directory, JOB_JSON_FILE) job_objects = json.load(open(job_objects_path, "r")) tool_path = job_objects["tool_path"] job_inputs = job_objects["job_inputs"] output_dict = job_objects["output_dict"] - cwl_tool = tool_proxy(tool_path) + cwl_tool = tool_proxy(tool_path, strict_cwl_validation=strict_cwl_validation) cwl_job = cwl_tool.job_proxy(job_inputs, output_dict, job_directory=job_directory) return cwl_job -def to_cwl_tool_object(tool_path): +def to_cwl_tool_object(tool_path, strict_cwl_validation=True): proxy_class = None - cwl_tool = schema_loader.tool(path=tool_path) + cwl_tool = 
_schema_loader(strict_cwl_validation).tool(path=tool_path) if isinstance(cwl_tool, int): raise Exception("Failed to load tool.") @@ -93,10 +92,9 @@ def to_cwl_tool_object(tool_path): return proxy -def to_cwl_workflow_object(workflow_path): +def to_cwl_workflow_object(workflow_path, strict_cwl_validation=None): proxy_class = WorkflowProxy - make_tool = workflow.defaultMakeTool - cwl_workflow = main.load_tool(workflow_path, False, False, make_tool, False) + cwl_workflow = _schema_loader(strict_cwl_validation).tool(path=workflow_path) raw_workflow = cwl_workflow.tool check_requirements(raw_workflow, tool=False) @@ -104,6 +102,11 @@ def to_cwl_workflow_object(workflow_path): return proxy +def _schema_loader(strict_cwl_validation): + target_schema_loader = schema_loader if strict_cwl_validation else non_strict_schema_loader + return target_schema_loader + + def check_requirements(rec, tool=True): if isinstance(rec, dict): if "requirements" in rec: @@ -237,14 +240,21 @@ def is_command_line_job(self): def _ensure_cwl_job_initialized(self): if self._cwl_job is None: + self._cwl_job = next(self._tool_proxy._tool.job( self._input_dict, self._output_callback, basedir=self._job_directory, + select_resources=self._select_resources, use_container=False )) self._is_command_line_job = hasattr(self._cwl_job, "command_line") + def _select_resources(self, request): + new_request = request.copy() + new_request["cores"] = "$GALAXY_SLOTS" + return new_request + @property def command_line(self): if self.is_command_line_job: @@ -324,6 +334,12 @@ def output_secondary_files_dir(self, output_name, create=False): safe_makedirs(secondary_files_dir) return secondary_files_dir + def stage_files(self): + cwl_job = self.cwl_job() + if hasattr(cwl_job, "pathmapper"): + process.stageFiles(self.cwl_job().pathmapper, os.symlink, ignoreWritable=True) + # else: expression tools do not have a path mapper. 
+ @staticmethod def _job_file(job_directory): return os.path.join(job_directory, JOB_JSON_FILE) diff --git a/lib/galaxy/tools/cwl/representation.py b/lib/galaxy/tools/cwl/representation.py index 140708b0f646..fde19ffbf1fa 100644 --- a/lib/galaxy/tools/cwl/representation.py +++ b/lib/galaxy/tools/cwl/representation.py @@ -19,6 +19,7 @@ 'float': 'float', 'data': 'File', 'boolean': 'boolean', + 'text': 'text' } @@ -56,7 +57,8 @@ def simple_value(input, param_dict_value, cwl_type=None): os.symlink(secondary_file_path, new_input_path + secondary_file_name) path = new_input_path - return {"path": path, "class": "File"} + return {"location": path, + "class": "File"} elif cwl_type == "integer": return int(str(param_dict_value)) elif cwl_type == "long": @@ -67,7 +69,7 @@ def simple_value(input, param_dict_value, cwl_type=None): return float(str(param_dict_value)) elif cwl_type == "boolean": return string_as_bool(param_dict_value) - elif cwl_type == "string": + elif cwl_type == "text": return str(param_dict_value) elif cwl_type == "json": raw_value = param_dict_value.value @@ -94,9 +96,6 @@ def simple_value(input, param_dict_value, cwl_type=None): else: input_json[input_name] = simple_value(input, param_dict[input_name]) - input_json["allocatedResources"] = { - "cpu": "$GALAXY_SLOTS", - } return input_json diff --git a/lib/galaxy/tools/cwl/runtime_actions.py b/lib/galaxy/tools/cwl/runtime_actions.py index 07a4b2c5731a..c3814b7d29d7 100644 --- a/lib/galaxy/tools/cwl/runtime_actions.py +++ b/lib/galaxy/tools/cwl/runtime_actions.py @@ -2,6 +2,7 @@ import os import shutil +from .cwltool_deps import ref_resolver from .parser import ( JOB_JSON_FILE, load_job_proxy, @@ -17,13 +18,16 @@ def handle_outputs(job_directory=None): if not os.path.exists(cwl_job_file): # Not a CWL job, just continue return - job_proxy = load_job_proxy(job_directory) + # So we only need to do strict validation when the tool was loaded, + # no reason to do it again during job execution - so this shortcut + # 
allows us to not need Galaxy's full configuration on job nodes. + job_proxy = load_job_proxy(job_directory, strict_cwl_validation=False) tool_working_directory = os.path.join(job_directory, "working") outputs = job_proxy.collect_outputs(tool_working_directory) for output_name, output in outputs.items(): - target_path = job_proxy.output_path( output_name ) - if isinstance(output, dict) and "path" in output: - output_path = output["path"] + target_path = job_proxy.output_path(output_name) + if isinstance(output, dict) and "location" in output: + output_path = ref_resolver.uri_file_path(output["location"]) if output["class"] != "File": open("galaxy.json", "w").write(json.dump({ "dataset_id": job_proxy.output_id(output_name), @@ -33,7 +37,7 @@ def handle_outputs(job_directory=None): shutil.move(output_path, target_path) for secondary_file in output.get("secondaryFiles", []): # TODO: handle nested files... - secondary_file_path = secondary_file["path"] + secondary_file_path = ref_resolver.uri_file_path(secondary_file["location"]) assert secondary_file_path.startswith(output_path) secondary_file_name = secondary_file_path[len(output_path):] secondary_files_dir = job_proxy.output_secondary_files_dir( diff --git a/lib/galaxy/tools/cwl/schema.py b/lib/galaxy/tools/cwl/schema.py index fb24b7847241..6b3e1e49ccd0 100644 --- a/lib/galaxy/tools/cwl/schema.py +++ b/lib/galaxy/tools/cwl/schema.py @@ -24,11 +24,8 @@ def __init__(self, strict=True): @property def raw_document_loader(self): - if self._raw_document_loader is None: - ensure_cwltool_available() - self._raw_document_loader = schema_salad.ref_resolver.Loader({"cwl": "https://w3id.org/cwl/cwl#", "id": "@id"}) - - return self._raw_document_loader + ensure_cwltool_available() + return schema_salad.ref_resolver.Loader({"cwl": "https://w3id.org/cwl/cwl#", "id": "@id"}) def raw_process_reference(self, path): uri = "file://" + os.path.abspath(path) @@ -71,3 +68,4 @@ def tool(self, **kwds): schema_loader = SchemaLoader() 
+non_strict_schema_loader = SchemaLoader(strict=False) diff --git a/lib/galaxy/tools/evaluation.py b/lib/galaxy/tools/evaluation.py index 7c9d3cc64dd6..edfa6a7dc965 100644 --- a/lib/galaxy/tools/evaluation.py +++ b/lib/galaxy/tools/evaluation.py @@ -380,7 +380,7 @@ def get_data_table_entry(table_name, query_attr, query_val, return_attr): param_dict['__tool_directory__'] = self.compute_environment.tool_directory() param_dict['__get_data_table_entry__'] = get_data_table_entry - + param_dict['__local_working_directory__'] = self.local_working_directory # We add access to app here, this allows access to app.config, etc param_dict['__app__'] = RawObjectWrapper( self.app ) # More convienent access to app.config.new_file_path; we don't need to @@ -518,7 +518,12 @@ def __build_config_files( self ): def __build_environment_variables( self ): param_dict = self.param_dict environment_variables = [] - for environment_variable_def in self.tool.environment_variables: + environment_variables_raw = self.tool.environment_variables + for key, value in param_dict.get("__cwl_command_state", {}).get("env", {}).items(): + environment_variable = dict(name=key, template=value) + environment_variables_raw.append(environment_variable) + + for environment_variable_def in environment_variables_raw: directory = self.local_working_directory environment_variable = environment_variable_def.copy() environment_variable_template = environment_variable_def["template"] diff --git a/lib/galaxy/tools/parameters/grouping.py b/lib/galaxy/tools/parameters/grouping.py index f83d66490500..3037e5375a99 100644 --- a/lib/galaxy/tools/parameters/grouping.py +++ b/lib/galaxy/tools/parameters/grouping.py @@ -600,6 +600,10 @@ def nested_to_dict( input ): cond_dict[ "test_param" ] = nested_to_dict( self.test_param ) return cond_dict + @property + def case_strings(self): + return [c.value for c in self.cases] + class ConditionalWhen( object, Dictifiable ): dict_collection_visible_keys = ( 'value', ) diff --git 
a/lib/galaxy/tools/parser/cwl.py b/lib/galaxy/tools/parser/cwl.py index 65c33d80146d..a981bafdb076 100644 --- a/lib/galaxy/tools/parser/cwl.py +++ b/lib/galaxy/tools/parser/cwl.py @@ -18,16 +18,17 @@ class CwlToolSource(ToolSource): - def __init__(self, tool_file): + def __init__(self, tool_file, strict_cwl_validation=True): self._cwl_tool_file = tool_file self._id, _ = os.path.splitext(os.path.basename(tool_file)) self._tool_proxy = None self._source_path = tool_file + self._strict_cwl_validation = strict_cwl_validation @property def tool_proxy(self): if self._tool_proxy is None: - self._tool_proxy = tool_proxy(self._source_path) + self._tool_proxy = tool_proxy(self._source_path, strict_cwl_validation=self._strict_cwl_validation) return self._tool_proxy def parse_tool_type(self): @@ -95,7 +96,7 @@ def parse_version(self): return "0.0.1" def parse_description(self): - return "" + return self.tool_proxy.description() def parse_input_pages(self): page_source = CwlPageSource(self.tool_proxy) diff --git a/lib/galaxy/tools/parser/factory.py b/lib/galaxy/tools/parser/factory.py index 3cc27e80c229..65012b23b174 100644 --- a/lib/galaxy/tools/parser/factory.py +++ b/lib/galaxy/tools/parser/factory.py @@ -16,7 +16,7 @@ log = logging.getLogger(__name__) -def get_tool_source(config_file=None, xml_tree=None, enable_beta_formats=True): +def get_tool_source(config_file=None, xml_tree=None, enable_beta_formats=True, strict_cwl_validation=True): """Return a ToolSource object corresponding to supplied source. 
The supplied source may be specified as a file path (using the config_file @@ -38,7 +38,7 @@ def get_tool_source(config_file=None, xml_tree=None, enable_beta_formats=True): return YamlToolSource(as_dict, source_path=config_file) elif config_file.endswith(".json") or config_file.endswith(".cwl"): log.info("Loading CWL tool - this is experimental - tool likely will not function in future at least in same way.") - return CwlToolSource(config_file) + return CwlToolSource(config_file, strict_cwl_validation=strict_cwl_validation) else: tree = load_tool_xml(config_file) return XmlToolSource(tree, source_path=config_file) diff --git a/lib/galaxy/webapps/galaxy/api/jobs.py b/lib/galaxy/webapps/galaxy/api/jobs.py index d22a9b30f219..08900ab2342b 100644 --- a/lib/galaxy/webapps/galaxy/api/jobs.py +++ b/lib/galaxy/webapps/galaxy/api/jobs.py @@ -136,6 +136,8 @@ def show( self, trans, id, **kwd ): if full_output: job_dict.update( dict( stderr=job.stderr, stdout=job.stdout ) ) if is_admin: + job_dict['cwl_command_state'] = job.cwl_command_state + job_dict['cwl_command_state_version'] = job.cwl_command_state_version job_dict['user_email'] = job.user.email def metric_to_dict(metric): diff --git a/lib/galaxy/webapps/galaxy/api/tools.py b/lib/galaxy/webapps/galaxy/api/tools.py index fa4fdea2ea92..3472dcd89f48 100644 --- a/lib/galaxy/webapps/galaxy/api/tools.py +++ b/lib/galaxy/webapps/galaxy/api/tools.py @@ -11,6 +11,7 @@ from galaxy.web import _future_expose_api_anonymous_and_sessionless as expose_api_anonymous_and_sessionless from galaxy.web.base.controller import BaseAPIController from galaxy.web.base.controller import UsesVisualizationMixin +from galaxy.tools.cwl import to_galaxy_parameters log = logging.getLogger( __name__ ) @@ -312,6 +313,11 @@ def create( self, trans, payload, **kwd ): # Set up inputs. 
inputs = payload.get( 'inputs', {} ) + + inputs_representation = payload.get( 'inputs_representation', 'galaxy' ) + if inputs_representation == "cwl": + inputs = to_galaxy_parameters( tool, inputs ) + + # Find files coming in as multipart file data and add to inputs. for k, v in payload.iteritems(): if k.startswith('files_') or k.startswith('__files_'): diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index cd930783cd8d..4273d9638496 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -149,6 +149,7 @@ def create(self, trans, payload, **kwd): 'workflow_id', 'installed_repository_file', 'from_history_id', + 'from_path', 'shared_workflow_id', 'workflow', ] ) @@ -189,6 +190,14 @@ def create(self, trans, payload, **kwd): item[ 'url' ] = url_for( 'workflow', id=item[ 'id' ] ) return item + if 'from_path' in payload: + if not trans.user_is_admin(): + raise exceptions.AdminRequiredException() + + from_path = payload.get( 'from_path' ) + payload["workflow"] = {"src": "path", "path": from_path} + return self.__api_import_new_workflow( trans, payload, **kwd ) + if 'shared_workflow_id' in payload: workflow_id = payload[ 'shared_workflow_id' ] return self.__api_import_shared_workflow( trans, workflow_id, payload ) diff --git a/lib/galaxy/webapps/galaxy/controllers/tool_runner.py b/lib/galaxy/webapps/galaxy/controllers/tool_runner.py index 2c411bd6ca9c..0af158d6b3c4 100644 --- a/lib/galaxy/webapps/galaxy/controllers/tool_runner.py +++ b/lib/galaxy/webapps/galaxy/controllers/tool_runner.py @@ -61,7 +61,7 @@ def __tool_404__(): redirect=redirect ) ) if not tool.allow_user_access( trans.user ): return __tool_404__() - if tool.tool_type == 'default': + if tool.tool_type in ['default', 'cwl']: return trans.response.send_redirect( url_for( controller='root', tool_id=tool_id ) ) # execute tool without displaying form (used for datasource tools) diff --git 
a/lib/galaxy_ext/cwl/__init__.py b/lib/galaxy_ext/cwl/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/lib/galaxy_ext/cwl/handle_outputs.py b/lib/galaxy_ext/cwl/handle_outputs.py new file mode 100644 index 000000000000..64bc77021ec8 --- /dev/null +++ b/lib/galaxy_ext/cwl/handle_outputs.py @@ -0,0 +1,21 @@ +""" +""" + +import logging +import os +import sys + +# insert *this* galaxy before all others on sys.path +sys.path.insert( 1, os.path.abspath( os.path.join( os.path.dirname( __file__ ), os.pardir, os.pardir ) ) ) + +from galaxy.tools.cwl import handle_outputs + +# ensure supported version +assert sys.version_info[:2] >= ( 2, 6 ) and sys.version_info[:2] <= ( 2, 7 ), 'Python version must be 2.6 or 2.7, this is: %s' % sys.version + +logging.basicConfig() +log = logging.getLogger( __name__ ) + + +def relocate_dynamic_outputs(): + handle_outputs() diff --git a/test/api/test_jobs.py b/test/api/test_jobs.py index 53ce9a1c5749..35a6c904f671 100644 --- a/test/api/test_jobs.py +++ b/test/api/test_jobs.py @@ -107,7 +107,10 @@ def test_show_security( self ): job_id = job[ "id" ] show_jobs_response = self._get( "jobs/%s" % job_id, admin=False ) - self._assert_not_has_keys( show_jobs_response.json(), "command_line", "external_id" ) + self._assert_not_has_keys( + show_jobs_response.json(), + "command_line", "external_id", "cwl_command_state", + ) # TODO: Re-activate test case when API accepts privacy settings # with self._different_user(): @@ -115,7 +118,10 @@ def test_show_security( self ): # self._assert_status_code_is( show_jobs_response, 200 ) show_jobs_response = self._get( "jobs/%s" % job_id, admin=True ) - self._assert_has_keys( show_jobs_response.json(), "command_line", "external_id" ) + self._assert_has_keys( + show_jobs_response.json(), + "command_line", "external_id", "cwl_command_state", + ) def test_search( self ): history_id, dataset_id = self.__history_with_ok_dataset() diff --git a/test/api/test_tools.py b/test/api/test_tools.py 
index fd7c090beb45..01500e67d907 100644 --- a/test/api/test_tools.py +++ b/test/api/test_tools.py @@ -1,4 +1,4 @@ -# Test tools API. +"""Test the Galaxy Tool API.""" import json from base import api @@ -25,6 +25,7 @@ class ToolsTestCase( api.ApiTestCase ): + """Test the Galaxy Tool API.""" def setUp( self ): super( ToolsTestCase, self ).setUp( ) @@ -1311,11 +1312,12 @@ def _run_outputs( self, create_response ): def _run_cat1( self, history_id, inputs, assert_ok=False ): return self._run( 'cat1', history_id, inputs, assert_ok=assert_ok ) - def _run( self, tool_id, history_id, inputs, assert_ok=False, tool_version=None ): + def _run( self, tool_id, history_id, inputs, assert_ok=False, tool_version=None, inputs_representation=None ): payload = self.dataset_populator.run_tool_payload( tool_id=tool_id, inputs=inputs, history_id=history_id, + inputs_representation=inputs_representation, ) if tool_version is not None: payload[ "tool_version" ] = tool_version diff --git a/test/api/test_tools_cwl.py b/test/api/test_tools_cwl.py new file mode 100644 index 000000000000..f4dea0abf6a0 --- /dev/null +++ b/test/api/test_tools_cwl.py @@ -0,0 +1,363 @@ +"""Test CWL Tool Execution via the API.""" + +from sys import platform as _platform + +from base import api +from base.populators import DatasetPopulator +from base.populators import skip_without_tool + +IS_OS_X = _platform == "darwin" + + +class CwlToolsTestCase( api.ApiTestCase ): + """Test CWL Tool Execution via the API.""" + + def setUp( self ): + """Setup dataset populator.""" + super( CwlToolsTestCase, self ).setUp( ) + self.dataset_populator = DatasetPopulator( self.galaxy_interactor ) + + @skip_without_tool( "cat1-tool" ) + def test_cat1_number( self ): + """Test execution of cat1 using the "normal" Galaxy job API representation.""" + history_id = self.dataset_populator.new_history() + hda1 = _dataset_to_param( self.dataset_populator.new_dataset( history_id, content='1\n2\n3' ) ) + inputs = { + "file1": hda1, + 
"numbering|_cwl__type_": "boolean", + "numbering|_cwl__value_": True, + } + stdout = self._run_and_get_stdout( "cat1-tool", history_id, inputs, assert_ok=True ) + self.assertEquals(stdout, " 1\t1\n 2\t2\n 3\t3\n") + + @skip_without_tool( "cat1-tool" ) + def test_cat1_number_cwl_json( self ): + """Test execution of cat1 using the "CWL" Galaxy job API representation.""" + history_id = self.dataset_populator.new_history() + hda1 = _dataset_to_param( self.dataset_populator.new_dataset( history_id, content='1\n2\n3' ) ) + inputs = { + "file1": hda1, + "numbering": True, + } + stdout = self._run_and_get_stdout( "cat1-tool", history_id, inputs, assert_ok=True, inputs_representation="cwl" ) + self.assertEquals(stdout, " 1\t1\n 2\t2\n 3\t3\n") + + @skip_without_tool( "cat1-tool" ) + def test_cat1_number_cwl_json_file( self ): + """Test execution of cat1 using the CWL job definition file.""" + run_object = self.dataset_populator.run_cwl_tool( "cat1-tool", "test/functional/tools/cwl_tools/draft3/cat-job.json") + stdout = self._get_job_stdout( run_object.job_id ) + self.assertEquals(stdout, "Hello world!\n") + + @skip_without_tool( "cat1-tool" ) + def test_cat1_number_cwl_n_json_file( self ): + run_object = self.dataset_populator.run_cwl_tool( "cat1-tool", "test/functional/tools/cwl_tools/draft3/cat-n-job.json") + stdout = self._get_job_stdout( run_object.job_id ) + self.assertEquals(stdout, " 1\tHello world!\n") + + @skip_without_tool( "cat2-tool" ) + def test_cat2( self ): + run_object = self.dataset_populator.run_cwl_tool( "cat2-tool", "test/functional/tools/cwl_tools/draft3/cat-job.json") + stdout = self._get_job_stdout( run_object.job_id ) + self.assertEquals(stdout, "Hello world!\n") + + @skip_without_tool( "cat4-tool" ) + def test_cat4( self ): + run_object = self.dataset_populator.run_cwl_tool( "cat4-tool", "test/functional/tools/cwl_tools/draft3/cat-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + 
self.assertEquals(output1_content, "Hello world!\n") + + @skip_without_tool( "cat-default" ) + def test_cat_default( self ): + run_object = self.dataset_populator.run_cwl_tool( "cat-default", job={}) + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output1_content, "Hello world!\n") + + @skip_without_tool( "wc-tool" ) + def test_wc( self ): + run_object = self.dataset_populator.run_cwl_tool( "wc-tool", "test/functional/tools/cwl_tools/draft3/wc-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + if not IS_OS_X: + self.assertEquals(output1_content, " 16 198 1111\n") + else: + self.assertEquals(output1_content, " 16 198 1111\n") + + @skip_without_tool( "wc2-tool" ) + def test_wc2( self ): + run_object = self.dataset_populator.run_cwl_tool( "wc2-tool", "test/functional/tools/cwl_tools/draft3/wc-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output1_content, "16") + + @skip_without_tool( "wc3-tool" ) + def test_wc3( self ): + run_object = self.dataset_populator.run_cwl_tool( + "wc3-tool", + job={ + "file1": [ + { + "class": "File", + "path": "whale.txt" + }, + ], + }, + test_data_directory="test/functional/tools/cwl_tools/draft3/" + ) + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output1_content, "16") + + @skip_without_tool( "wc4-tool" ) + def test_wc4( self ): + run_object = self.dataset_populator.run_cwl_tool( "wc4-tool", "test/functional/tools/cwl_tools/draft3/wc-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output1_content, "16") + + def _run_and_get_stdout( self, tool_id, history_id, inputs, **kwds): + response = self._run( tool_id, history_id, inputs, **kwds ) + assert "jobs" in response + job = response[ "jobs" 
][ 0 ] + job_id = job["id"] + final_state = self.dataset_populator.wait_for_job( job_id ) + assert final_state == "ok" + return self._get_job_stdout( job_id ) + + def _get_job_stdout(self, job_id): + job_details = self.dataset_populator.get_job_details( job_id, full=True ) + stdout = job_details.json()["stdout"] + return stdout + + @skip_without_tool( "cat3-tool" ) + def test_cat3( self ): + history_id = self.dataset_populator.new_history() + hda1 = _dataset_to_param( self.dataset_populator.new_dataset( history_id, content='1\t2\t3' ) ) + inputs = { + "f1": hda1, + } + response = self._run( "cat3-tool", history_id, inputs, assert_ok=True ) + output1 = response[ "outputs" ][ 0 ] + output1_content = self.dataset_populator.get_history_dataset_content( history_id, dataset=output1 ) + assert output1_content == "1\t2\t3\n", output1_content + + @skip_without_tool( "sorttool" ) + def test_sorttool( self ): + history_id = self.dataset_populator.new_history() + hda1 = _dataset_to_param( self.dataset_populator.new_dataset( history_id, content='1\n2\n3' ) ) + inputs = { + "reverse": False, + "input": hda1 + } + response = self._run( "sorttool", history_id, inputs, assert_ok=True ) + output1 = response[ "outputs" ][ 0 ] + output1_content = self.dataset_populator.get_history_dataset_content( history_id, dataset=output1 ) + assert output1_content == "1\n2\n3\n", output1_content + + @skip_without_tool( "sorttool" ) + def test_sorttool_reverse( self ): + history_id = self.dataset_populator.new_history() + hda1 = _dataset_to_param( self.dataset_populator.new_dataset( history_id, content='1\n2\n3' ) ) + inputs = { + "reverse": True, + "input": hda1 + } + response = self._run( "sorttool", history_id, inputs, assert_ok=True ) + output1 = response[ "outputs" ][ 0 ] + output1_content = self.dataset_populator.get_history_dataset_content( history_id, dataset=output1 ) + assert output1_content == "3\n2\n1\n", output1_content + + @skip_without_tool( "env-tool1" ) + def test_env_tool1( self 
): + history_id = self.dataset_populator.new_history() + inputs = { + "in": "Hello World", + } + response = self._run( "env-tool1", history_id, inputs, assert_ok=True ) + output1 = response[ "outputs" ][ 0 ] + output1_content = self.dataset_populator.get_history_dataset_content( history_id, dataset=output1 ) + self.assertEquals(output1_content, "Hello World\n") + + @skip_without_tool( "env-tool2" ) + def test_env_tool2( self ): + run_object = self.dataset_populator.run_cwl_tool( "env-tool2", "test/functional/tools/cwl_tools/draft3/env-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output1_content, "hello test env\n") + + @skip_without_tool( "rename" ) + def test_rename( self ): + run_object = self.dataset_populator.run_cwl_tool( "rename", "test/functional/tools/cwl_tools/draft3/rename-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output1_content, whale_text()) + + @skip_without_tool( "optional-output" ) + def test_optional_output( self ): + run_object = self.dataset_populator.run_cwl_tool( "optional-output", "test/functional/tools/cwl_tools/draft3/cat-job.json") + output_file = run_object.output(0) + optional_file = run_object.output(1) + output_content = self.dataset_populator.get_history_dataset_content( run_object.history_id, dataset=output_file ) + optional_content = self.dataset_populator.get_history_dataset_content( run_object.history_id, dataset=optional_file ) + self.assertEquals(output_content, "Hello world!\n") + self.assertEquals(optional_content, "null") + + @skip_without_tool( "optional-output2" ) + def test_optional_output2_on( self ): + run_object = self.dataset_populator.run_cwl_tool( + "optional-output2", + job={ + "produce": "do_write", + }, + test_data_directory="test/functional/tools/cwl_tools/draft3/" + ) + output_content = self.dataset_populator.get_history_dataset_content( 
run_object.history_id ) + self.assertEquals(output_content, "bees\n") + + @skip_without_tool( "optional-output2" ) + def test_optional_output2_off( self ): + run_object = self.dataset_populator.run_cwl_tool( + "optional-output2", + job={ + "produce": "dont_write", + }, + test_data_directory="test/functional/tools/cwl_tools/draft3/" + ) + output_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + self.assertEquals(output_content, "null") + + @skip_without_tool( "index1" ) + @skip_without_tool( "showindex1" ) + def test_index1( self ): + run_object = self.dataset_populator.run_cwl_tool( + "index1", + job={ + "file": { + "class": "File", + "path": "whale.txt" + }, + }, + test_data_directory="test/functional/tools/cwl_tools/draft3/", + ) + output1 = self.dataset_populator.get_history_dataset_details( run_object.history_id ) + run_object = self.dataset_populator.run_cwl_tool( + "showindex1", + job={ + "file": { + "src": "hda", + "id": output1["id"], + }, + }, + test_data_directory="test/functional/tools/cwl_tools/draft3/", + history_id=run_object.history_id, + ) + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert "call: 1\n" in output1_content, output1_content + + @skip_without_tool( "any1" ) + def test_any1_0( self ): + run_object = self.dataset_populator.run_cwl_tool( + "any1", + job={"bar": 7}, + test_data_directory="test/functional/tools/cwl_tools/draft3/", + ) + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert output1_content == '7', output1_content + + @skip_without_tool( "any1" ) + def test_any1_1( self ): + run_object = self.dataset_populator.run_cwl_tool( + "any1", + job={"bar": "7"}, + test_data_directory="test/functional/tools/cwl_tools/draft3/", + ) + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert output1_content == '"7"', output1_content + + @skip_without_tool( 
"any1" ) + def test_any1_2( self ): + run_object = self.dataset_populator.run_cwl_tool( + "any1", + job={"bar": {"Cow": ["Turkey"]}}, + test_data_directory="test/functional/tools/cwl_tools/draft3/", + ) + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert output1_content == '{"Cow": ["Turkey"]}', output1_content + + @skip_without_tool( "null-expression1-tool" ) + def test_null_expression_1_1( self ): + run_object = self.dataset_populator.run_cwl_tool( "null-expression1-tool", "test/functional/tools/cwl_tools/draft3/empty.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert output1_content == '1', output1_content + + @skip_without_tool( "null-expression1-tool" ) + def test_null_expression_1_2( self ): + run_object = self.dataset_populator.run_cwl_tool( "null-expression1-tool", "test/functional/tools/cwl_tools/draft3/null-expression2-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert output1_content == '2', output1_content + + @skip_without_tool( "null-expression2-tool" ) + def test_null_expression_any_bad_1( self ): + """Test explicitly passing null to Any type without a default value fails.""" + run_object = self.dataset_populator.run_cwl_tool( "null-expression2-tool", "test/functional/tools/cwl_tools/draft3/null-expression1-job.json", assert_ok=False) + self._assert_status_code_is( run_object.run_response, 400 ) + + @skip_without_tool( "null-expression2-tool" ) + def test_null_expression_any_bad_2( self ): + """Test Any without defaults can be unspecified.""" + run_object = self.dataset_populator.run_cwl_tool( "null-expression2-tool", "test/functional/tools/cwl_tools/draft3/empty.json", assert_ok=False) + self._assert_status_code_is( run_object.run_response, 400 ) + + @skip_without_tool( "params" ) + def test_params1( self ): + run_object = self.dataset_populator.run_cwl_tool( "params", 
"test/functional/tools/cwl_tools/draft3/empty.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id ) + assert output1_content == '"b b"', output1_content + + @skip_without_tool( "parseInt-tool" ) + def test_parse_int_tool( self ): + run_object = self.dataset_populator.run_cwl_tool( "parseInt-tool", "test/functional/tools/cwl_tools/draft3/parseInt-job.json") + output1_content = self.dataset_populator.get_history_dataset_content( run_object.history_id, hid=2 ) + self.assertEquals(output1_content, '42') + output1 = self.dataset_populator.get_history_dataset_details( run_object.history_id, hid=2 ) + self.assertEquals(output1["extension"], "expression.json") + + # def test_dynamic_tool_execution( self ): + # workflow_tool_json = { + # 'inputs': [{'inputBinding': {}, 'type': 'File', 'id': 'file:///home/john/workspace/galaxy/test/unit/tools/cwl_tools/draft3/count-lines2-wf.cwl#step1/wc/wc_file1'}], + # 'stdout': 'output.txt', + # 'id': 'file:///home/john/workspace/galaxy/test/unit/tools/cwl_tools/draft3/count-lines2-wf.cwl#step1/wc', + # 'outputs': [{'outputBinding': {'glob': 'output.txt'}, 'type': 'File', 'id': 'file:///home/john/workspace/galaxy/test/unit/tools/cwl_tools/draft3/count-lines2-wf.cwl#step1/wc/wc_output'}], + # 'baseCommand': 'wc', + # 'class': 'CommandLineTool' + # } + + # create_payload = dict( + # representation=json.dumps(workflow_tool_json), + # ) + # create_response = self._post( "dynamic_tools", data=create_payload, admin=True ) + # self._assert_status_code_is( create_response, 200 ) + + # TODO: Use mixin so this can be shared with tools test case. 
+ def _run( self, tool_id, history_id, inputs, assert_ok=False, tool_version=None, inputs_representation=None ): + payload = self.dataset_populator.run_tool_payload( + tool_id=tool_id, + inputs=inputs, + history_id=history_id, + inputs_representation=inputs_representation, + ) + if tool_version is not None: + payload[ "tool_version" ] = tool_version + create_response = self._post( "tools", data=payload ) + if assert_ok: + self._assert_status_code_is( create_response, 200 ) + create = create_response.json() + self._assert_has_keys( create, 'outputs' ) + return create + else: + return create_response + + +def whale_text(): + return open("test/functional/tools/cwl_tools/draft3/whale.txt", "r").read() + + +def _dataset_to_param( dataset ): + return dict( + src='hda', + id=dataset[ 'id' ] + ) diff --git a/test/api/test_workflows_cwl.py b/test/api/test_workflows_cwl.py new file mode 100644 index 000000000000..271e099a4b9a --- /dev/null +++ b/test/api/test_workflows_cwl.py @@ -0,0 +1,26 @@ +"""Test CWL workflow functionality.""" +import os + +from galaxy.util import galaxy_root_path + +from .test_workflows import BaseWorkflowsApiTestCase + +cwl_tool_directory = os.path.join(galaxy_root_path, "test", "functional", "tools", "cwl_tools") + + +class CwlWorkflowsTestCase( BaseWorkflowsApiTestCase ): + """Test case encompassing CWL workflow tests.""" + + def test_count_lines_wf1( self ): + """Test simple workflow count-lines1-wf.cwl.""" + load_response = self._load_workflow("draft3/count-lines1-wf.cwl") + self._assert_status_code_is( load_response, 200 ) + + def _load_workflow(self, rel_path): + path = os.path.join(cwl_tool_directory, rel_path) + data = dict( + from_path=path, + ) + route = "workflows" + upload_response = self._post( route, data=data, admin=True ) + return upload_response diff --git a/test/base/populators.py b/test/base/populators.py index 183d58c9903a..a948c95e321c 100644 --- a/test/base/populators.py +++ b/test/base/populators.py @@ -1,4 +1,5 @@ import json 
+import os.path import time from operator import itemgetter @@ -6,7 +7,7 @@ import requests from pkg_resources import resource_string -from six import StringIO +from six import iteritems, StringIO from base import api_asserts @@ -49,6 +50,48 @@ def wrapped_method( api_test_case, *args, **kwargs ): return method_wrapper +# Note: Newer than planemo version as of 2/4/16, copy back into planemo +def galactic_job_json(job, test_data_directory, upload_func): + state = {"uploaded": False} + + def upload(file_path): + state["uploaded"] = True + if not os.path.isabs(file_path): + file_path = os.path.join(test_data_directory, file_path) + + return upload_func(file_path) + + def replacement_item(value): + if not isinstance(value, dict): + return value + + type_class = value.get("class", None) + if type_class != "File": + return value + + file_path = value.get("path", None) + if file_path is None: + return value + + upload_response = upload(file_path) + dataset_id = upload_response["outputs"][0]["id"] + + return {"src": "hda", "id": dataset_id} + + replace_keys = {} + for key, value in iteritems(job): + if isinstance(value, dict): + replace_keys[key] = replacement_item(value) + elif isinstance(value, list): + new_list = [] + for item in value: + new_list.append(replacement_item(item)) + replace_keys[key] = new_list + + job.update(replace_keys) + return job, state["uploaded"] + + # Deprecated mixin, use dataset populator instead. # TODO: Rework existing tests to target DatasetPopulator in a setup method instead. 
class TestsDatasets: @@ -69,19 +112,37 @@ def _run_tool_payload( self, tool_id, inputs, history_id, **kwds ): return DatasetPopulator( self.galaxy_interactor ).run_tool_payload( tool_id, inputs, history_id, **kwds ) +class CwlToolRun( object ): + + def __init__( self, history_id, run_response ): + self.history_id = history_id + self.run_response = run_response + + @property + def job_id( self ): + return self.run_response[ "jobs" ][ 0 ][ "id" ] + + def output(self, output_index): + return self.run_response["outputs"][output_index] + + class BaseDatasetPopulator( object ): """ Abstract description of API operations optimized for testing Galaxy - implementations must implement _get and _post. """ def new_dataset( self, history_id, content='TestData123', wait=False, **kwds ): + run_response = self.new_dataset_request(history_id, content=content, wait=wait, **kwds) + return run_response["outputs"][0] + + def new_dataset_request( self, history_id, content='TestData123', wait=False, **kwds ): payload = self.upload_payload( history_id, content, **kwds ) run_response = self._post( "tools", data=payload ).json() if wait: job = run_response["jobs"][0] self.wait_for_job(job["id"]) self.wait_for_history(history_id, assert_ok=True) - return run_response["outputs"][0] + return run_response def wait_for_history( self, history_id, assert_ok=False, timeout=DEFAULT_TIMEOUT ): try: @@ -135,6 +196,10 @@ def run_tool_payload( self, tool_id, inputs, history_id, **kwds ): kwds[ "__files" ] = { "files_0|file_data": inputs[ "files_0|file_data" ] } del inputs[ "files_0|file_data" ] + ir = kwds.get("inputs_representation", None) + if ir is None and "inputs_representation" in kwds: + del kwds["inputs_representation"] + return dict( tool_id=tool_id, inputs=json.dumps(inputs), @@ -142,11 +207,45 @@ def run_tool_payload( self, tool_id, inputs, history_id, **kwds ): **kwds ) - def run_tool( self, tool_id, inputs, history_id, **kwds ): + def run_tool( self, tool_id, inputs, history_id, 
assert_ok=True, **kwds ): payload = self.run_tool_payload( tool_id, inputs, history_id, **kwds ) tool_response = self._post( "tools", data=payload ) - api_asserts.assert_status_code_is( tool_response, 200 ) - return tool_response.json() + if assert_ok: + api_asserts.assert_status_code_is( tool_response, 200 ) + return tool_response.json() + else: + return tool_response + + def run_cwl_tool( self, tool_id, json_path=None, job=None, test_data_directory=None, history_id=None, assert_ok=True ): + if test_data_directory is None and json_path is not None: + test_data_directory = os.path.dirname(json_path) + if json_path is not None: + assert job is None + with open( json_path, "r" ) as f: + job_as_dict = json.load( f ) + else: + job_as_dict = job + + if history_id is None: + history_id = self.new_history() + + def upload_path(path): + with open( path, "rb" ) as f: + content = f.read() + return self.new_dataset_request( + history_id=history_id, + content=content + ) + + job_as_dict, datasets_uploaded = galactic_job_json(job_as_dict, test_data_directory, upload_path) + if datasets_uploaded: + self.wait_for_history( history_id=history_id, assert_ok=True ) + run_response = self.run_tool( tool_id, job_as_dict, history_id, inputs_representation="cwl", assert_ok=assert_ok ) + run_object = CwlToolRun( history_id, run_response ) + if assert_ok: + final_state = self.wait_for_job( run_object.job_id ) + assert final_state == "ok" + return run_object def get_history_dataset_content( self, history_id, wait=True, **kwds ): dataset_id = self.__history_content_id( history_id, wait=wait, **kwds ) diff --git a/test/functional/tools/cwl_tools b/test/functional/tools/cwl_tools new file mode 120000 index 000000000000..f7aa3669f89a --- /dev/null +++ b/test/functional/tools/cwl_tools @@ -0,0 +1 @@ +../../unit/tools/cwl_tools \ No newline at end of file diff --git a/test/functional/tools/samples_tool_conf.xml b/test/functional/tools/samples_tool_conf.xml index 9c6577187732..1aa644eff9a9 100644 
--- a/test/functional/tools/samples_tool_conf.xml +++ b/test/functional/tools/samples_tool_conf.xml @@ -110,6 +110,33 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + +