diff --git a/lib/galaxy/managers/jobs.py b/lib/galaxy/managers/jobs.py index 546085b26a2e..b7221cf0fef9 100644 --- a/lib/galaxy/managers/jobs.py +++ b/lib/galaxy/managers/jobs.py @@ -9,6 +9,7 @@ Dict, List, Optional, + Union, ) import sqlalchemy @@ -41,7 +42,10 @@ Safety, ) from galaxy.managers.collections import DatasetCollectionManager -from galaxy.managers.context import ProvidesUserContext +from galaxy.managers.context import ( + ProvidesHistoryContext, + ProvidesUserContext, +) from galaxy.managers.datasets import DatasetManager from galaxy.managers.hdas import HDAManager from galaxy.managers.lddas import LDDAManager @@ -68,6 +72,10 @@ ) from galaxy.security.idencoding import IdEncodingHelper from galaxy.structured_app import StructuredApp +from galaxy.tools._types import ( + ToolStateDumpedToJsonInternalT, + ToolStateJobInstancePopulatedT, +) from galaxy.util import ( defaultdict, ExecutionTimer, @@ -81,6 +89,9 @@ log = logging.getLogger(__name__) +JobStateT = str +JobStatesT = Union[JobStateT, List[JobStateT]] + class JobLock(BaseModel): active: bool = Field(title="Job lock status", description="If active, jobs will not dispatch") @@ -311,7 +322,15 @@ def __init__( self.ldda_manager = ldda_manager self.decode_id = id_encoding_helper.decode_id - def by_tool_input(self, trans, tool_id, tool_version, param=None, param_dump=None, job_state="ok"): + def by_tool_input( + self, + trans: ProvidesHistoryContext, + tool_id: str, + tool_version: Optional[str], + param: ToolStateJobInstancePopulatedT, + param_dump: ToolStateDumpedToJsonInternalT, + job_state: Optional[JobStatesT] = "ok", + ): """Search for jobs producing same results using the 'inputs' part of a tool POST.""" user = trans.user input_data = defaultdict(list) @@ -353,7 +372,14 @@ def populate_input_data_input_id(path, key, value): ) def __search( - self, tool_id, tool_version, user, input_data, job_state=None, param_dump=None, wildcard_param_dump=None + self, + tool_id: str, + tool_version: Optional[str], + user: model.User, + input_data, + job_state: Optional[JobStatesT], + param_dump: ToolStateDumpedToJsonInternalT, + wildcard_param_dump=None, ): search_timer = ExecutionTimer() @@ -462,7 +488,9 @@ def replace_dataset_ids(path, key, value): log.info("No equivalent jobs found %s", search_timer) return None - def _build_job_subquery(self, tool_id, user_id, tool_version, job_state, wildcard_param_dump): + def _build_job_subquery( + self, tool_id: str, user_id: int, tool_version: Optional[str], job_state, wildcard_param_dump + ): """Build subquery that selects a job with correct job parameters.""" stmt = select(model.Job.id).where( and_( diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 7193fd6ea618..6c395748bf79 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -120,6 +120,8 @@ check_param, params_from_strings, params_to_incoming, + params_to_json, + params_to_json_internal, params_to_strings, populate_state, visit_input_values, @@ -183,7 +185,19 @@ get_tool_shed_url_from_tool_shed_registry, ) from galaxy.version import VERSION_MAJOR -from galaxy.work.context import proxy_work_context_for_history +from galaxy.work.context import ( + proxy_work_context_for_history, + WorkRequestContext, +) +from ._types import ( + InputFormatT, + ParameterValidationErrorsT, + ToolRequestT, + ToolStateDumpedToJsonInternalT, + ToolStateDumpedToJsonT, + ToolStateJobInstancePopulatedT, + ToolStateJobInstanceT, +) from .execute import ( DatasetCollectionElementsSliceT, DEFAULT_JOB_CALLBACK, @@ -195,8 +209,6 @@ ExecutionSlice, JobCallbackT, MappingParameters, - ToolParameterRequestInstanceT, - ToolParameterRequestT, ) if TYPE_CHECKING: @@ -712,7 +724,7 @@ def encode(self, tool, app, nested=False): """ Convert the data to a string """ - value = params_to_strings(tool.inputs, self.inputs, app, nested=nested) + value = cast(Dict[str, Any], params_to_strings(tool.inputs, self.inputs, app, nested=nested)) value["__page__"] = self.page value["__rerun_remap_job_id__"] = self.rerun_remap_job_id return value @@ -1514,8 +1526,8 @@ def parse_input_elem( # Repeat group input_type = input_source.parse_input_type() if input_type == "repeat": - group_r = Repeat() - group_r.name = input_source.get("name") + repeat_name = input_source.get("name") + group_r = Repeat(repeat_name) group_r.title = input_source.get("title") group_r.help = input_source.get("help", None) page_source = input_source.parse_nested_inputs_source() @@ -1531,8 +1543,8 @@ def parse_input_elem( group_r.default = cast(int, min(max(group_r.default, group_r.min), group_r.max)) rval[group_r.name] = group_r elif input_type == "conditional": - group_c = Conditional() - group_c.name = input_source.get("name") + cond_name = input_source.get("name") + group_c = Conditional(cond_name) group_c.value_ref = input_source.get("value_ref", None) group_c.value_ref_in_group = input_source.get_bool("value_ref_in_group", True) value_from = input_source.get("value_from", None) @@ -1600,8 +1612,8 @@ def parse_input_elem( group_c.cases.append(case) rval[group_c.name] = group_c elif input_type == "section": - group_s = Section() - group_s.name = input_source.get("name") + section_name = input_source.get("name") + group_s = Section(section_name) group_s.title = input_source.get("title") group_s.help = input_source.get("help", None) group_s.expanded = input_source.get_bool("expanded", False) @@ -1610,8 +1622,8 @@ def parse_input_elem( rval[group_s.name] = group_s elif input_type == "upload_dataset": elem = input_source.elem() - group_u = UploadDataset() - group_u.name = elem.get("name") + upload_name = elem.get("name") + group_u = UploadDataset(upload_name) group_u.title = elem.get("title") group_u.file_type_name = elem.get("file_type_name", group_u.file_type_name) group_u.default_file_type = elem.get("default_file_type", group_u.default_file_type) @@ -1796,22 +1808,22 @@ def visit_inputs(self, values, callback): if self.check_values: visit_input_values(self.inputs, values, callback) - def expand_incoming(self, trans, incoming, request_context, input_format="legacy"): - rerun_remap_job_id = None - if "rerun_remap_job_id" in incoming: - try: - rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"]) - except Exception as exception: - log.error(str(exception)) - raise exceptions.MessageException( - "Failure executing tool with id '%s' (attempting to rerun invalid job).", self.id - ) - + def expand_incoming( + self, request_context: WorkRequestContext, incoming: ToolRequestT, input_format: InputFormatT = "legacy" + ) -> Tuple[ + List[ToolStateJobInstancePopulatedT], + List[ToolStateJobInstancePopulatedT], + Optional[int], + Optional[MatchingCollections], + ]: + rerun_remap_job_id = _rerun_remap_job_id(request_context, incoming, self.id) set_dataset_matcher_factory(request_context, self) # Fixed set of input parameters may correspond to any number of jobs. # Expand these out to individual parameters for given jobs (tool executions). - expanded_incomings, collection_info = expand_meta_parameters(trans, self, incoming) + expanded_incomings: List[ToolStateJobInstanceT] + collection_info: Optional[MatchingCollections] + expanded_incomings, collection_info = expand_meta_parameters(request_context, self, incoming) # Remapping a single job to many jobs doesn't make sense, so disable # remap if multi-runs of tools are being used. @@ -1831,39 +1843,11 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy "internals.galaxy.tools.validation", "Validated and populated state for tool request", ) - all_errors = [] - all_params = [] + all_errors: List[ParameterValidationErrorsT] = [] + all_params: List[ToolStateJobInstancePopulatedT] = [] + for expanded_incoming in expanded_incomings: - params = {} - errors: Dict[str, str] = {} - if self.input_translator: - self.input_translator.translate(expanded_incoming) - if not self.check_values: - # If `self.check_values` is false we don't do any checking or - # processing on input This is used to pass raw values - # through to/from external sites. - params = expanded_incoming - else: - # Update state for all inputs on the current page taking new - # values from `incoming`. - populate_state( - request_context, - self.inputs, - expanded_incoming, - params, - errors, - simple_errors=False, - input_format=input_format, - ) - # If the tool provides a `validate_input` hook, call it. - validate_input = self.get_hook("validate_input") - if validate_input: - # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py - legacy_non_dce_params = { - k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v - for k, v in params.items() - } - validate_input(request_context, errors, legacy_non_dce_params, self.inputs) + params, errors = self._populate(request_context, expanded_incoming, input_format) all_errors.append(errors) all_params.append(params) unset_dataset_matcher_factory(request_context) @@ -1871,14 +1855,74 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy log.info(validation_timer) return all_params, all_errors, rerun_remap_job_id, collection_info + def _populate( + self, request_context, expanded_incoming: ToolStateJobInstanceT, input_format: InputFormatT + ) -> Tuple[ToolStateJobInstancePopulatedT, ParameterValidationErrorsT]: + """Validate expanded parameters for a job to replace references with model objects. + + So convert a ToolStateJobInstanceT to a ToolStateJobInstancePopulatedT. + """ + params: ToolStateJobInstancePopulatedT = {} + errors: ParameterValidationErrorsT = {} + if self.input_translator: + self.input_translator.translate(expanded_incoming) + if not self.check_values: + # If `self.check_values` is false we don't do any checking or + # processing on input This is used to pass raw values + # through to/from external sites. + params = cast(ToolStateJobInstancePopulatedT, expanded_incoming) + else: + # Update state for all inputs on the current page taking new + # values from `incoming`. + populate_state( + request_context, + self.inputs, + expanded_incoming, + params, + errors, + simple_errors=False, + input_format=input_format, + ) + # If the tool provides a `validate_input` hook, call it. + validate_input = self.get_hook("validate_input") + if validate_input: + # hooks are so terrible ... this is specifically for https://github.com/galaxyproject/tools-devteam/blob/main/tool_collections/gops/basecoverage/operation_filter.py + legacy_non_dce_params = { + k: v.hda if isinstance(v, model.DatasetCollectionElement) and v.hda else v + for k, v in params.items() + } + validate_input(request_context, errors, legacy_non_dce_params, self.inputs) + return params, errors + + def completed_jobs( + self, trans, use_cached_job: bool, all_params: List[ToolStateJobInstancePopulatedT] + ) -> Dict[int, Optional[model.Job]]: + completed_jobs: Dict[int, Optional[model.Job]] = {} + for i, param in enumerate(all_params): + if use_cached_job: + tool_id = self.id + assert tool_id + param_dump: ToolStateDumpedToJsonInternalT = params_to_json_internal(self.inputs, param, self.app) + completed_jobs[i] = self.job_search.by_tool_input( + trans=trans, + tool_id=tool_id, + tool_version=self.version, + param=param, + param_dump=param_dump, + job_state=None, + ) + else: + completed_jobs[i] = None + return completed_jobs + def handle_input( self, trans, - incoming: ToolParameterRequestT, + incoming: ToolRequestT, history: Optional[model.History] = None, use_cached_job: bool = DEFAULT_USE_CACHED_JOB, preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, - input_format: str = "legacy", + input_format: InputFormatT = "legacy", ): """ Process incoming parameters for this tool from the dict `incoming`, @@ -1887,26 +1931,17 @@ def handle_input( there were no errors). """ request_context = proxy_work_context_for_history(trans, history=history) - all_params, all_errors, rerun_remap_job_id, collection_info = self.expand_incoming( - trans=trans, incoming=incoming, request_context=request_context, input_format=input_format - ) + expanded = self.expand_incoming(request_context, incoming=incoming, input_format=input_format) + all_params: List[ToolStateJobInstancePopulatedT] = expanded[0] + all_errors: List[ParameterValidationErrorsT] = expanded[1] + rerun_remap_job_id: Optional[int] = expanded[2] + collection_info: Optional[MatchingCollections] = expanded[3] + # If there were errors, we stay on the same page and display them self.handle_incoming_errors(all_errors) mapping_params = MappingParameters(incoming, all_params) - completed_jobs: Dict[int, Optional[model.Job]] = {} - for i, param in enumerate(all_params): - if use_cached_job: - completed_jobs[i] = self.job_search.by_tool_input( - trans=trans, - tool_id=self.id, - tool_version=self.version, - param=param, - param_dump=self.params_to_strings(param, self.app, nested=True), - job_state=None, - ) - else: - completed_jobs[i] = None + completed_jobs: Dict[int, Optional[model.Job]] = self.completed_jobs(trans, use_cached_job, all_params) execution_tracker = execute_job( trans, self, @@ -1935,7 +1970,7 @@ def handle_input( implicit_collections=execution_tracker.implicit_collections, ) - def handle_incoming_errors(self, all_errors): + def handle_incoming_errors(self, all_errors: List[ParameterValidationErrorsT]) -> None: if any(all_errors): # simple param_key -> message string for tool form. err_data = {key: unicodify(value) for d in all_errors for (key, value) in d.items()} @@ -2060,7 +2095,7 @@ def get_static_param_values(self, trans): def execute( self, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[model.History] = None, set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, flush_job: bool = True, @@ -2086,7 +2121,7 @@ def execute( def _execute( self, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[model.History] = None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, execution_cache: Optional[ToolExecutionCache] = None, @@ -2128,7 +2163,7 @@ def _execute( log.error("Tool execution failed for job: %s", job_id) raise - def params_to_strings(self, params, app, nested=False): + def params_to_strings(self, params: ToolStateJobInstancePopulatedT, app, nested=False): return params_to_strings(self.inputs, params, app, nested) def params_from_strings(self, params, app, ignore_errors=False): @@ -2581,6 +2616,8 @@ def to_json(self, trans, kwd=None, job=None, workflow_building_mode=False, histo else: action = self.app.url_for(self.action) + state_inputs_json: ToolStateDumpedToJsonT = params_to_json(self.inputs, state_inputs, self.app) + # update tool model tool_model.update( { @@ -2594,7 +2631,7 @@ def to_json(self, trans, kwd=None, job=None, workflow_building_mode=False, histo "requirements": [{"name": r.name, "version": r.version} for r in self.requirements], "errors": state_errors, "tool_errors": self.tool_errors, - "state_inputs": params_to_strings(self.inputs, state_inputs, self.app, use_security=True, nested=True), + "state_inputs": state_inputs_json, "job_id": trans.security.encode_id(job.id) if job else None, "job_remap": job.remappable() if job else None, "history_id": trans.security.encode_id(history.id) if history else None, @@ -4110,6 +4147,21 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history # ---- Utility classes to be factored out ----------------------------------- + + +def _rerun_remap_job_id(trans, incoming, tool_id: Optional[str]) -> Optional[int]: + rerun_remap_job_id = None + if "rerun_remap_job_id" in incoming: + try: + rerun_remap_job_id = trans.app.security.decode_id(incoming["rerun_remap_job_id"]) + except Exception as exception: + log.error(str(exception)) + raise exceptions.MessageException( + "Failure executing tool with id '%s' (attempting to rerun invalid job).", tool_id + ) + return rerun_remap_job_id + + class TracksterConfig: """Trackster configuration encapsulation.""" diff --git a/lib/galaxy/tools/_types.py b/lib/galaxy/tools/_types.py new file mode 100644 index 000000000000..7237a98780b8 --- /dev/null +++ b/lib/galaxy/tools/_types.py @@ -0,0 +1,64 @@ +""" +Tool state goes through several different iterations that are difficult to follow I think, +hopefully datatypes can be used as markers to describe what has been done - even if they don't +provide strong traditional typing semantics. + ++--------------------------------+------------+---------------------------------+------------+-----------+ +| Python Type | State for? | Object References | Validated? | xref | ++================================+============+=================================+============+===========+ +| ToolRequestT | request | src dicts of encoded ids | nope | | +| ToolStateJobInstanceT | a job | src dicts of encoded ids | nope | | +| ToolStateJobInstancePopulatedT | a job | model objs loaded from db | check_param | | +| ToolStateDumpedToJsonT | a job | src dicts of encoded ids | " | | +| | | (normalized into values attr) | " | | +| ToolStateDumpedToJsonInternalT | a job | src dicts of decoded ids | " | | +| | | (normalized into values attr) | " | | +| ToolStateDumpedToStringsT | a job | src dicts dumped to strs | " | | +| | | (normalized into values attr) | " | | ++--------------------------------+------------+---------------------------------+-------------+----------+ +""" + +from typing import ( + Any, + Dict, +) + +from typing_extensions import Literal + +# Input dictionary from the API, may include map/reduce instructions. Objects are referenced by "src" +# dictionaries and encoded IDS. +ToolRequestT = Dict[str, Any] + +# Input dictionary extracted from a tool request for running a tool individually as a single job. Objects are referenced +# by "src" dictionaries with encoded IDs still but batch instructions have been pulled out. Parameters have not +# been "checked" (check_param has not been called). +ToolStateJobInstanceT = Dict[str, Any] + +# Input dictionary for an individual job where objects are their model objects and parameters have been +# "checked" (check_param has been called). +ToolStateJobInstancePopulatedT = Dict[str, Any] + +# Input dictionary for an individual where the state has been valiated and populated but then converted back down +# to json. Object references are unified in the format of {"values": List["src" dictionary]} where the src dictionaries. +# are decoded ids (ints). +# See comments on galaxy.tools.parameters.params_to_strings for more information. +ToolStateDumpedToJsonInternalT = Dict[str, Any] + +# Input dictionary for an individual where the state has been valiated and populated but then converted back down +# to json. Object references are unified in the format of {"values": List["src" dictionary]} where src dictonaries +# are encoded (ids). See comments on galaxy.tools.parameters.params_to_strings for more information. +ToolStateDumpedToJsonT = Dict[str, Any] + +# Input dictionary for an individual where the state has been valiated and populated but then converted back down +# to json. Object references are unified in the format of {"values": List["src" dictionary]} but dumped into +# strings. See comments on galaxy.tools.parameters.params_to_strings for more information. This maybe should be +# broken into separate types for encoded and decoded IDs in subsequent type refinements if both are used, it not +# this comment should be updated to indicate which is used exclusively. +ToolStateDumpedToStringsT = Dict[str, str] + +# A dictionary of error messages that occur while attempting to validate a ToolStateJobInstanceT and transform it +# into a ToolStateJobInstancePopulatedT with model objects populated. Tool errors indicate the job should not be +# further processed. +ParameterValidationErrorsT = Dict[str, str] + +InputFormatT = Literal["legacy", "21.01"] diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index 989ab5a6ad45..1c3969236f13 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -37,6 +37,7 @@ from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.none_like import NoneDataset from galaxy.objectstore import ObjectStorePopulator +from galaxy.tools._types import ToolStateJobInstancePopulatedT from galaxy.tools.execute import ( DatasetCollectionElementsSliceT, DEFAULT_DATASET_COLLECTION_ELEMENTS, @@ -45,7 +46,6 @@ DEFAULT_RERUN_REMAP_JOB_ID, DEFAULT_SET_OUTPUT_HID, JobCallbackT, - ToolParameterRequestInstanceT, ) from galaxy.tools.execution_helpers import ( filter_output, @@ -88,7 +88,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, @@ -401,7 +401,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, diff --git a/lib/galaxy/tools/actions/data_manager.py b/lib/galaxy/tools/actions/data_manager.py index c24e86fd0afb..d8786ad5a921 100644 --- a/lib/galaxy/tools/actions/data_manager.py +++ b/lib/galaxy/tools/actions/data_manager.py @@ -7,6 +7,7 @@ ) from galaxy.model.base import transaction from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.tools._types import ToolStateJobInstancePopulatedT from galaxy.tools.execute import ( DatasetCollectionElementsSliceT, DEFAULT_DATASET_COLLECTION_ELEMENTS, @@ -15,7 +16,6 @@ DEFAULT_RERUN_REMAP_JOB_ID, DEFAULT_SET_OUTPUT_HID, JobCallbackT, - ToolParameterRequestInstanceT, ) from galaxy.tools.execution_helpers import ToolExecutionCache from . import ( @@ -33,7 +33,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, diff --git a/lib/galaxy/tools/actions/history_imp_exp.py b/lib/galaxy/tools/actions/history_imp_exp.py index 848995c61dac..42d9db10f68f 100644 --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -23,13 +23,13 @@ DEFAULT_RERUN_REMAP_JOB_ID, DEFAULT_SET_OUTPUT_HID, JobCallbackT, - ToolParameterRequestInstanceT, ) from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.tools.imp_exp import ( JobExportHistoryArchiveWrapper, JobImportHistoryArchiveWrapper, ) +from galaxy.tools._types import ToolStateJobInstancePopulatedT from galaxy.util import ready_name_for_url log = logging.getLogger(__name__) @@ -44,7 +44,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, @@ -121,7 +121,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, diff --git a/lib/galaxy/tools/actions/metadata.py b/lib/galaxy/tools/actions/metadata.py index f7d6ce844a9d..2b46c6060ccd 100644 --- a/lib/galaxy/tools/actions/metadata.py +++ b/lib/galaxy/tools/actions/metadata.py @@ -16,6 +16,7 @@ ) from galaxy.model.base import transaction from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.tools._types import ToolStateJobInstancePopulatedT from galaxy.tools.execute import ( DatasetCollectionElementsSliceT, DEFAULT_DATASET_COLLECTION_ELEMENTS, @@ -24,7 +25,6 @@ DEFAULT_RERUN_REMAP_JOB_ID, DEFAULT_SET_OUTPUT_HID, JobCallbackT, - ToolParameterRequestInstanceT, ) from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.util import asbool @@ -43,7 +43,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py index 1b18adcf39f6..fdfecfb9c65e 100644 --- a/lib/galaxy/tools/actions/model_operations.py +++ b/lib/galaxy/tools/actions/model_operations.py @@ -10,6 +10,7 @@ ) from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.objectstore import ObjectStorePopulator +from galaxy.tools._types import ToolStateJobInstancePopulatedT from galaxy.tools.actions import ( DefaultToolAction, OutputCollections, @@ -24,7 +25,6 @@ DEFAULT_RERUN_REMAP_JOB_ID, DEFAULT_SET_OUTPUT_HID, JobCallbackT, - ToolParameterRequestInstanceT, ) from galaxy.tools.execution_helpers import ToolExecutionCache @@ -52,7 +52,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, diff --git a/lib/galaxy/tools/actions/upload.py b/lib/galaxy/tools/actions/upload.py index b85bba71a0d2..fcaa431973f0 100644 --- a/lib/galaxy/tools/actions/upload.py +++ b/lib/galaxy/tools/actions/upload.py @@ -20,9 +20,9 @@ DEFAULT_RERUN_REMAP_JOB_ID, DEFAULT_SET_OUTPUT_HID, JobCallbackT, - ToolParameterRequestInstanceT, ) from galaxy.tools.execution_helpers import ToolExecutionCache +from galaxy.tools._types import ToolStateJobInstancePopulatedT from galaxy.util import ExecutionTimer from galaxy.util.bunch import Bunch from . import ( @@ -40,7 +40,7 @@ def execute( self, tool, trans, - incoming: Optional[ToolParameterRequestInstanceT] = None, + incoming: Optional[ToolStateJobInstancePopulatedT] = None, history: Optional[History] = None, job_params=None, rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index aef512f61f37..47dc3906416c 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -15,6 +15,8 @@ List, NamedTuple, Optional, + Tuple, + Union, ) from boltons.iterutils import remap @@ -35,6 +37,10 @@ ToolExecutionCache, ) from galaxy.tools.parameters.workflow_utils import is_runtime_value +from ._types import ( + ToolRequestT, + ToolStateJobInstancePopulatedT, +) if typing.TYPE_CHECKING: from galaxy.tools import Tool @@ -48,10 +54,6 @@ CompletedJobsT = Dict[int, Optional[model.Job]] JobCallbackT = Callable WorkflowResourceParametersT = Dict[str, Any] -# Input dictionary from the API, may include map/reduce instructions -ToolParameterRequestT = Dict[str, Any] -# Input dictionary extracted from a tool request for running a tool individually -ToolParameterRequestInstanceT = Dict[str, Any] DatasetCollectionElementsSliceT = Dict[str, model.DatasetCollectionElement] DEFAULT_USE_CACHED_JOB = False DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None @@ -67,8 +69,8 @@ def __init__(self, execution_tracker: "ExecutionTracker"): class MappingParameters(NamedTuple): - param_template: ToolParameterRequestT - param_combinations: List[ToolParameterRequestInstanceT] + param_template: ToolRequestT + param_combinations: List[ToolStateJobInstancePopulatedT] def execute( @@ -242,14 +244,14 @@ def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optiona class ExecutionSlice: job_index: int - param_combination: ToolParameterRequestInstanceT + param_combination: ToolStateJobInstancePopulatedT dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] history: Optional[model.History] def __init__( self, job_index: int, - param_combination: ToolParameterRequestInstanceT, + param_combination: ToolStateJobInstancePopulatedT, dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, ): self.job_index = job_index @@ -258,7 +260,16 @@ def __init__( self.history = None +ExecutionErrorsT = Union[str, Exception] + + class ExecutionTracker: + execution_errors: List[ExecutionErrorsT] + successful_jobs: List[model.Job] + output_datasets: List[model.HistoryDatasetAssociation] + output_collections: List[Tuple[str, model.HistoryDatasetCollectionAssociation]] + implicit_collections: Dict[str, model.HistoryDatasetCollectionAssociation] + def __init__( self, trans, @@ -312,8 +323,9 @@ def record_error(self, error): @property def on_text(self): - if self._on_text is None: - collection_names = ["collection %d" % c.hid for c in self.collection_info.collections.values()] + collection_info = self.collection_info + if self._on_text is None and collection_info is not None: + collection_names = ["collection %d" % c.hid for c in collection_info.collections.values()] self._on_text = on_text_for_names(collection_names) return self._on_text @@ -371,6 +383,8 @@ def find_collection(input_dict, input_name): ) subcollection_mapping_type = None if self.is_implicit_input(input_name): + collection_info = self.collection_info + assert collection_info subcollection_mapping_type = self.collection_info.subcollection_mapping_type(input_name) return get_structure( @@ -423,10 +437,10 @@ def precreate_output_collections(self, history, params): # collection replaced with a specific dataset. Need to replace this # with the collection and wrap everything up so can evaluate output # label. + collection_info = self.collection_info + assert collection_info trans = self.trans - params.update( - self.collection_info.collections - ) # Replace datasets with source collections for labelling outputs. + params.update(collection_info.collections) # Replace datasets with source collections for labelling outputs. collection_instances = {} implicit_inputs = self.implicit_inputs @@ -518,8 +532,10 @@ def finalize_dataset_collections(self, trans): implicit_collection_jobs = implicit_collection.implicit_collection_jobs implicit_collection_jobs.populated_state = "ok" trans.sa_session.add(implicit_collection_jobs) + collection_info = self.collection_info + assert collection_info implicit_collection.collection.finalize( - collection_type_description=self.collection_info.structure.collection_type_description + collection_type_description=collection_info.structure.collection_type_description ) # Mark implicit HDCA as copied @@ -538,14 +554,20 @@ def finalize_dataset_collections(self, trans): @property def implicit_inputs(self): - implicit_inputs = list(self.collection_info.collections.items()) + collection_info = self.collection_info + assert collection_info + implicit_inputs = list(collection_info.collections.items()) return implicit_inputs def is_implicit_input(self, input_name): - return input_name in self.collection_info.collections + collection_info = self.collection_info + assert collection_info + return input_name in collection_info.collections def walk_implicit_collections(self): - return self.collection_info.structure.walk_collections(self.implicit_collections) + collection_info = self.collection_info + assert collection_info + return collection_info.structure.walk_collections(self.implicit_collections) def new_execution_slices(self): if self.collection_info is None: @@ -594,7 +616,9 @@ def __init__( # New to track these things for tool output API response in the tool case, # in the workflow case we just write stuff to the database and forget about # it. - self.outputs_by_output_name = collections.defaultdict(list) + self.outputs_by_output_name: Dict[str, List[Union[model.DatasetInstance, model.DatasetCollection]]] = ( + collections.defaultdict(list) + ) def record_success(self, execution_slice, job, outputs): super().record_success(execution_slice, job, outputs) diff --git a/lib/galaxy/tools/parameters/__init__.py b/lib/galaxy/tools/parameters/__init__.py index 7e72bf3a9eb1..9a6406ecb174 100644 --- a/lib/galaxy/tools/parameters/__init__.py +++ b/lib/galaxy/tools/parameters/__init__.py @@ -4,7 +4,9 @@ from json import dumps from typing import ( + cast, Dict, + Optional, Union, ) @@ -32,12 +34,23 @@ runtime_to_json, ) from .wrapped import flat_to_nested_state +from .._types import ( + InputFormatT, + ParameterValidationErrorsT, + ToolStateDumpedToJsonInternalT, + ToolStateDumpedToJsonT, + ToolStateDumpedToStringsT, + ToolStateJobInstancePopulatedT, + ToolStateJobInstanceT, +) REPLACE_ON_TRUTHY = object() # Some tools use the code tag and access the code base, expecting certain tool parameters to be available here. __all__ = ("DataCollectionToolParameter", "DataToolParameter", "SelectToolParameter") +ToolInputsT = Dict[str, Union[Group, ToolParameter]] + def visit_input_values( inputs, @@ -253,15 +266,37 @@ def check_param(trans, param, incoming_value, param_values, simple_errors=True): return value, error +def params_to_json_internal( + params: ToolInputsT, param_values: ToolStateJobInstancePopulatedT, app +) -> ToolStateDumpedToJsonInternalT: + """Return ToolStateDumpedToJsonT for supplied validated and populated parameters.""" + return cast( + ToolStateDumpedToJsonInternalT, params_to_strings(params, param_values, app, nested=True, use_security=False) + ) + + +def params_to_json(params: ToolInputsT, param_values: ToolStateJobInstancePopulatedT, app) -> ToolStateDumpedToJsonT: + """Return ToolStateDumpedToJsonT for supplied validated and populated parameters.""" + return cast(ToolStateDumpedToJsonT, params_to_strings(params, param_values, app, nested=True, use_security=True)) + + def params_to_strings( - params: Dict[str, Union[Group, ToolParameter]], param_values: Dict, app, nested=False, use_security=False -) -> Dict: + params: ToolInputsT, + param_values: ToolStateJobInstancePopulatedT, + app, + nested=False, + use_security=False, +) -> Union[ToolStateDumpedToJsonT, ToolStateDumpedToJsonInternalT, ToolStateDumpedToStringsT]: """ Convert a dictionary of parameter values to a dictionary of strings suitable for persisting. The `value_to_basic` method of each parameter is called to convert its value to basic types, the result of which is then json encoded (this allowing complex nested parameters and - such). + such). If `nested` this will remain as a sort of JSON-ifiable dictionary + (ToolStateDumpedToJsonT), otherwise these will dumped into strings of the + JSON (ToolStateDumpedToStringsT). If use_security is False, this will return + object references with decoded (integer) IDs, otherwise they will be encoded + strings. """ rval = {} for key, value in param_values.items(): @@ -344,14 +379,14 @@ def replace_dataset_ids(path, key, value): def populate_state( request_context, - inputs, - incoming, - state, - errors=None, + inputs: ToolInputsT, + incoming: ToolStateJobInstanceT, + state: ToolStateJobInstancePopulatedT, + errors: Optional[ParameterValidationErrorsT] = None, context=None, check=True, simple_errors=True, - input_format="legacy", + input_format: InputFormatT = "legacy", ): """ Populates nested state dict from incoming parameter values. @@ -431,9 +466,9 @@ def populate_state( else: del group_state[:] for rep in incoming[input.name]: - new_state = {} + new_state: ToolStateJobInstancePopulatedT = {} group_state.append(new_state) - new_errors = {} + new_errors: ParameterValidationErrorsT = {} populate_state( request_context, input.inputs, @@ -516,7 +551,15 @@ def populate_state( def _populate_state_legacy( - request_context, inputs, incoming, state, errors, prefix="", context=None, check=True, simple_errors=True + request_context, + inputs: ToolInputsT, + incoming: ToolStateJobInstanceT, + state: ToolStateJobInstancePopulatedT, + errors, + prefix="", + context=None, + check=True, + simple_errors=True, ): if context is None: context = flat_to_nested_state(incoming) @@ -527,22 +570,23 @@ def _populate_state_legacy( group_state = state[input.name] group_prefix = f"{key}|" if input.type == "repeat": + repeat_input = cast(Repeat, input) rep_index = 0 del group_state[:] while True: rep_prefix = "%s_%d" % (key, rep_index) - rep_min_default = input.default if input.default > input.min else input.min + rep_min_default = repeat_input.default if repeat_input.default > repeat_input.min else repeat_input.min if ( not any(incoming_key.startswith(rep_prefix) for incoming_key in incoming.keys()) and rep_index >= rep_min_default ): break - if rep_index < input.max: - new_state = {"__index__": rep_index} + if rep_index < repeat_input.max: + new_state: ToolStateJobInstancePopulatedT = {"__index__": rep_index} group_state.append(new_state) _populate_state_legacy( request_context, - input.inputs, + repeat_input.inputs, incoming, new_state, errors, @@ -553,13 +597,20 @@ def _populate_state_legacy( ) rep_index += 1 elif input.type == "conditional": - if input.value_ref and not input.value_ref_in_group: - test_param_key = prefix + input.test_param.name + conditional_input = cast(Conditional, input) + if conditional_input.value_ref and not conditional_input.value_ref_in_group: + test_param_key = prefix + conditional_input.test_param.name else: - test_param_key = group_prefix + input.test_param.name - test_param_value = incoming.get(test_param_key, group_state.get(input.test_param.name)) + test_param_key = group_prefix + conditional_input.test_param.name + test_param_value = incoming.get(test_param_key, group_state.get(conditional_input.test_param.name)) value, error = ( - check_param(request_context, input.test_param, test_param_value, context, simple_errors=simple_errors) + check_param( + request_context, + conditional_input.test_param, + test_param_value, + context, + simple_errors=simple_errors, + ) if check else [test_param_value, None] ) @@ -567,11 +618,11 @@ def _populate_state_legacy( errors[test_param_key] = error else: try: - current_case = input.get_current_case(value) - group_state = state[input.name] = {} + current_case = conditional_input.get_current_case(value) + group_state = state[conditional_input.name] = cast(ToolStateJobInstancePopulatedT, {}) _populate_state_legacy( request_context, - input.cases[current_case].inputs, + conditional_input.cases[current_case].inputs, incoming, group_state, errors, @@ -585,9 +636,10 @@ def _populate_state_legacy( errors[test_param_key] = "The selected case is unavailable/invalid." group_state[input.test_param.name] = value elif input.type == "section": + section_input = cast(Section, input) _populate_state_legacy( request_context, - input.inputs, + section_input.inputs, incoming, group_state, errors, @@ -597,20 +649,21 @@ def _populate_state_legacy( simple_errors=simple_errors, ) elif input.type == "upload_dataset": - file_count = input.get_file_count(request_context, context) + dataset_input = cast(UploadDataset, input) + file_count = dataset_input.get_file_count(request_context, context) while len(group_state) > file_count: del group_state[-1] while file_count > len(group_state): - new_state = {"__index__": len(group_state)} - for upload_item in input.inputs.values(): - new_state[upload_item.name] = upload_item.get_initial_value(request_context, context) - group_state.append(new_state) + new_state_upload: ToolStateJobInstancePopulatedT = {"__index__": len(group_state)} + for upload_item in dataset_input.inputs.values(): + new_state_upload[upload_item.name] = upload_item.get_initial_value(request_context, context) + group_state.append(new_state_upload) for rep_index, rep_state in enumerate(group_state): rep_index = rep_state.get("__index__", rep_index) rep_prefix = "%s_%d|" % (key, rep_index) _populate_state_legacy( request_context, - input.inputs, + dataset_input.inputs, incoming, rep_state, errors, diff --git a/lib/galaxy/tools/parameters/basic.py b/lib/galaxy/tools/parameters/basic.py index 5eeba8aaecaa..9669b62771e9 100644 --- a/lib/galaxy/tools/parameters/basic.py +++ b/lib/galaxy/tools/parameters/basic.py @@ -174,6 +174,7 @@ class ToolParameter(UsesDictVisibleKeys): >>> assert sorted(p.to_dict(trans).items()) == [('argument', '--parameter-name'), ('help', ''), ('hidden', False), ('is_dynamic', False), ('label', ''), ('model_class', 'ToolParameter'), ('name', 'parameter_name'), ('optional', False), ('refresh_on_change', False), ('type', 'text'), ('value', None)] """ + name: str dict_collection_visible_keys = ["name", "argument", "type", "label", "help", "refresh_on_change"] def __init__(self, tool, input_source, context=None): diff --git a/lib/galaxy/tools/parameters/grouping.py b/lib/galaxy/tools/parameters/grouping.py index 9962e61cf624..3a62b111b3e9 100644 --- a/lib/galaxy/tools/parameters/grouping.py +++ b/lib/galaxy/tools/parameters/grouping.py @@ -58,9 +58,10 @@ class Group(UsesDictVisibleKeys): dict_collection_visible_keys = ["name", "type"] type: str + name: str - def __init__(self): - self.name = None + def __init__(self, name: str): + self.name = name @property def visible(self): @@ -95,8 +96,8 @@ class Repeat(Group): dict_collection_visible_keys = ["name", "type", "title", "help", "default", "min", "max"] type = "repeat" - def __init__(self): - Group.__init__(self) + def __init__(self, name: str): + Group.__init__(self, name) self._title = None self.inputs = None self.help = None @@ -187,8 +188,8 @@ class Section(Group): dict_collection_visible_keys = ["name", "type", "title", "help", "expanded"] type = "section" - def __init__(self): - Group.__init__(self) + def __init__(self, name: str): + Group.__init__(self, name) self.title = None self.inputs = None self.help = None @@ -267,8 +268,8 @@ class Dataset(Bunch): class UploadDataset(Group): type = "upload_dataset" - def __init__(self): - Group.__init__(self) + def __init__(self, name: str): + Group.__init__(self, name) self.title = None self.inputs = None self.file_type_name = "file_type" diff --git a/lib/galaxy/tools/parameters/meta.py b/lib/galaxy/tools/parameters/meta.py index fee9f6d6079e..8c87df3f2fe5 100644 --- a/lib/galaxy/tools/parameters/meta.py +++ b/lib/galaxy/tools/parameters/meta.py @@ -21,6 +21,10 @@ ) from galaxy.util import permutations from . import visit_input_values +from .._types import ( + ToolRequestT, + ToolStateJobInstanceT, +) from .wrapped import process_key log = logging.getLogger(__name__) @@ -154,10 +158,10 @@ def is_batch(value): return WorkflowParameterExpansion(param_combinations, params_keys, input_combinations) -ExpandedT = Tuple[List[Dict[str, Any]], Optional[matching.MatchingCollections]] +ExpandedT = Tuple[List[ToolStateJobInstanceT], Optional[matching.MatchingCollections]] -def expand_meta_parameters(trans, tool, incoming) -> ExpandedT: +def expand_meta_parameters(trans, tool, incoming: ToolRequestT) -> ExpandedT: """ Take in a dictionary of raw incoming parameters and expand to a list of expanded incoming parameters (one set of parameters per tool diff --git a/lib/galaxy/webapps/galaxy/api/jobs.py b/lib/galaxy/webapps/galaxy/api/jobs.py index 6aebebe5ec3c..9eb5efb40938 100644 --- a/lib/galaxy/webapps/galaxy/api/jobs.py +++ b/lib/galaxy/webapps/galaxy/api/jobs.py @@ -71,7 +71,7 @@ JobIndexViewEnum, JobsService, ) -from galaxy.work.context import WorkRequestContext +from galaxy.work.context import proxy_work_context_for_history log = logging.getLogger(__name__) @@ -478,10 +478,8 @@ def search( for k, v in payload.__annotations__.items(): if k.startswith("files_") or k.startswith("__files_"): inputs[k] = v - request_context = WorkRequestContext(app=trans.app, user=trans.user, history=trans.history) - all_params, all_errors, _, _ = tool.expand_incoming( - trans=trans, incoming=inputs, request_context=request_context - ) + request_context = proxy_work_context_for_history(trans) + all_params, all_errors, _, _ = tool.expand_incoming(request_context, incoming=inputs) if any(all_errors): return [] params_dump = [tool.params_to_strings(param, trans.app, nested=True) for param in all_params] diff --git a/lib/galaxy/work/context.py b/lib/galaxy/work/context.py index 025fb7aac920..81db0f7e1c6c 100644 --- a/lib/galaxy/work/context.py +++ b/lib/galaxy/work/context.py @@ -175,7 +175,7 @@ def set_history(self, history): def proxy_work_context_for_history( trans: ProvidesHistoryContext, history: Optional[History] = None, workflow_building_mode=False -): +) -> WorkRequestContext: """Create a WorkContext for supplied context with potentially different history. This provides semi-structured access to a transaction/work context with a supplied target diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 9a35e4f9b07b..5ecded6a0ec1 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -30,6 +30,7 @@ ) from galaxy.job_execution.actions.post import ActionBox from galaxy.model import ( + Job, PostJobAction, Workflow, WorkflowInvocationStep, @@ -2290,19 +2291,7 @@ def callback(input, prefixed_name: str, **kwargs): param_combinations.append(execution_state.inputs) complete = False - completed_jobs = {} - for i, param in enumerate(param_combinations): - if use_cached_job: - completed_jobs[i] = tool.job_search.by_tool_input( - trans=trans, - tool_id=tool.id, - tool_version=tool.version, - param=param, - param_dump=tool.params_to_strings(param, trans.app, nested=True), - job_state=None, - ) - else: - completed_jobs[i] = None + completed_jobs: Dict[int, Optional[Job]] = tool.completed_jobs(trans, use_cached_job, param_combinations) try: mapping_params = MappingParameters(tool_state.inputs, param_combinations) max_num_jobs = progress.maximum_jobs_to_schedule_or_none