diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 2c8dae330d83..7d874ce04578 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -8014,7 +8014,7 @@ class WorkflowStep(Base, RepresentById, UsesCreateAndUpdateTime): type: Mapped[Optional[str]] = mapped_column(String(64)) tool_id: Mapped[Optional[str]] = mapped_column(TEXT) tool_version: Mapped[Optional[str]] = mapped_column(TEXT) - tool_inputs: Mapped[Optional[bytes]] = mapped_column(JSONType) + tool_inputs: Mapped[Dict[str, Any]] = mapped_column(JSONType) tool_errors: Mapped[Optional[bytes]] = mapped_column(JSONType) position: Mapped[Optional[bytes]] = mapped_column(MutableJSONType) config: Mapped[Optional[bytes]] = mapped_column(JSONType) @@ -8084,11 +8084,13 @@ def init_on_load(self): def tool_uuid(self): return self.dynamic_tool and self.dynamic_tool.uuid + @property + def is_input_type(self) -> bool: + return bool(self.type and self.type in self.STEP_TYPE_TO_INPUT_TYPE) + @property def input_type(self): - assert ( - self.type and self.type in self.STEP_TYPE_TO_INPUT_TYPE - ), "step.input_type can only be called on input step types" + assert self.is_input_type, "step.input_type can only be called on input step types" return self.STEP_TYPE_TO_INPUT_TYPE[self.type] @property @@ -8309,6 +8311,15 @@ def log_str(self): f"WorkflowStep[index={self.order_index},type={self.type},label={self.label},uuid={self.uuid},id={self.id}]" ) + @property + def effective_label(self) -> Optional[str]: + label = self.label + if label is not None: + return label + elif self.is_input_type: + return cast(Optional[str], self.tool_inputs.get("name")) + return None + def clear_module_extras(self): # the module code adds random dynamic state to the step, this # attempts to clear that. @@ -9111,6 +9122,50 @@ def attach_step(request_to_content): attach_step(request_to_content) self.input_step_parameters.append(request_to_content) + def recover_inputs(self) -> Tuple[Dict[str, Any], str]: + inputs = {} + inputs_by = "name" + + have_referenced_steps_by_order_index = False + + def best_step_reference(workflow_step: "WorkflowStep") -> str: + label = workflow_step.effective_label + if label is not None: + return label + nonlocal have_referenced_steps_by_order_index + have_referenced_steps_by_order_index = True + return str(workflow_step.order_index) + + def ensure_step(step: Optional["WorkflowStep"]) -> "WorkflowStep": + if step is None: + raise galaxy.exceptions.InconsistentDatabase( + "workflow input found without step definition, this should not happen" + ) + assert step + return step + + for input_dataset_assoc in self.input_datasets: + workflow_step = ensure_step(input_dataset_assoc.workflow_step) + input_dataset = input_dataset_assoc.dataset + input_index = best_step_reference(workflow_step) + inputs[input_index] = input_dataset + + for input_dataset_collection_assoc in self.input_dataset_collections: + workflow_step = ensure_step(input_dataset_collection_assoc.workflow_step) + input_dataset_collection = input_dataset_collection_assoc.dataset_collection + input_index = best_step_reference(workflow_step) + inputs[input_index] = input_dataset_collection + + for input_step_parameter_assoc in self.input_step_parameters: + workflow_step = ensure_step(input_step_parameter_assoc.workflow_step) + value = input_step_parameter_assoc.parameter_value + input_index = best_step_reference(workflow_step) + inputs[input_index] = value + + if have_referenced_steps_by_order_index: + inputs_by = "name|step_index" + return inputs, inputs_by + def add_message(self, message: "InvocationMessageUnion"): self.messages.append( WorkflowInvocationMessage( # type:ignore[abstract] diff --git a/lib/galaxy/schema/invocation.py b/lib/galaxy/schema/invocation.py index 1040aac4d1b4..d1f992f02dd7 100644 --- a/lib/galaxy/schema/invocation.py +++ b/lib/galaxy/schema/invocation.py @@ -47,6 +47,19 @@ UpdateTimeField, WithModelClass, ) +from .workflows import ( + INPUTS_BY_DESCRIPTION, + PreferredIntermediateObjectStoreIdField, + PreferredObjectStoreIdField, + PreferredOutputsObjectStoreIdField, + ReplacementParametersField, + ResourceParametersField, + STEP_PARAMETERS_DESCRIPTION, + STEP_PARAMETERS_NORMALIZED_DESCRIPTION, + STEP_PARAMETERS_NORMALIZED_TITLE, + STEP_PARAMETERS_TITLE, + UseCachedJobField, +) INVOCATION_STEP_OUTPUT_SRC = Literal["hda"] INVOCATION_STEP_COLLECTION_OUTPUT_SRC = Literal["hdca"] @@ -531,13 +544,16 @@ class InvocationOutputCollection(InvocationIOBase): ) +InvocationWorkflowIdField = Field( + title="Workflow ID", description="The encoded Workflow ID associated with the invocation." +) + + class WorkflowInvocationCollectionView(Model, WithModelClass): id: EncodedDatabaseIdField = InvocationIdField create_time: datetime = CreateTimeField update_time: datetime = UpdateTimeField - workflow_id: EncodedDatabaseIdField = Field( - title="Workflow ID", description="The encoded Workflow ID associated with the invocation." - ) + workflow_id: EncodedDatabaseIdField = InvocationWorkflowIdField history_id: EncodedDatabaseIdField = Field( default=..., title="History ID", @@ -583,6 +599,43 @@ class WorkflowInvocationResponse(RootModel): ] +class WorkflowInvocationRequestModel(Model): + """Model a workflow invocation request (InvokeWorkflowPayload) for an existing invocation.""" + + history_id: str = Field( + ..., + title="History ID", + description="The encoded history id the workflow was run in.", + ) + workflow_id: str = Field(title="Workflow ID", description="The encoded Workflow ID associated with the invocation.") + inputs: Dict[str, Any] = Field( + ..., + title="Inputs", + description="Values for inputs", + ) + inputs_by: str = Field( + ..., + title="Inputs by", + description=INPUTS_BY_DESCRIPTION, + ) + replacement_params: Optional[Dict[str, Any]] = ReplacementParametersField + resource_params: Optional[Dict[str, Any]] = ResourceParametersField + use_cached_job: bool = UseCachedJobField + preferred_object_store_id: Optional[str] = PreferredObjectStoreIdField + preferred_intermediate_object_store_id: Optional[str] = PreferredIntermediateObjectStoreIdField + preferred_outputs_object_store_id: Optional[str] = PreferredOutputsObjectStoreIdField + parameters_normalized: Literal[True] = Field( + True, + title=STEP_PARAMETERS_NORMALIZED_TITLE, + description=STEP_PARAMETERS_NORMALIZED_DESCRIPTION, + ) + parameters: Optional[Dict[str, Any]] = Field( + None, + title=STEP_PARAMETERS_TITLE, + description=f"{STEP_PARAMETERS_DESCRIPTION} If these are set, the workflow was not executed in a best-practice fashion and we the resulting invocation request may not fully reflect the executed workflow state.", + ) + + class InvocationJobsSummaryBaseModel(Model): id: EncodedDatabaseIdField = InvocationIdField states: Dict[JobState, int] = Field( diff --git a/lib/galaxy/schema/workflows.py b/lib/galaxy/schema/workflows.py index 4e80dcb7c55e..960e2dc3e978 100644 --- a/lib/galaxy/schema/workflows.py +++ b/lib/galaxy/schema/workflows.py @@ -1,6 +1,7 @@ import json from typing import ( Any, + cast, Dict, List, Optional, @@ -8,6 +9,7 @@ ) from pydantic import ( + AfterValidator, Field, field_validator, ) @@ -22,13 +24,73 @@ Organization, PauseStep, Person, - PreferredObjectStoreIdField, StoredWorkflowSummary, SubworkflowStep, ToolStep, WorkflowInput, ) +TargetHistoryIdField = Field( + None, + title="History ID", + # description="The history to import the workflow into.", + description="The encoded history id into which to import.", +) +INPUTS_BY_DESCRIPTION = ( + "How the 'inputs' field maps its inputs (datasets/collections/step parameters) to workflows steps." +) +STEP_PARAMETERS_NORMALIZED_TITLE = "Legacy Step Parameters Normalized" +STEP_PARAMETERS_NORMALIZED_DESCRIPTION = "Indicates if legacy parameters are already normalized to be indexed by the order_index and are specified as a dictionary per step. Legacy-style parameters could previously be specified as one parameter per step or by tool ID." +STEP_PARAMETERS_TITLE = "Legacy Step Parameters" +STEP_PARAMETERS_DESCRIPTION = "Parameters specified per-step for the workflow invocation, this is legacy and you should generally use inputs and only specify the formal parameters of a workflow instead." +ReplacementParametersField = Field( + {}, + title="Replacement Parameters", + description="Class of parameters mostly used for string replacement in PJAs. In best practice workflows, these should be replaced with input parameters", +) +UseCachedJobField = Field( + False, + title="Use cached job", + description="Indicated whether to use a cached job for workflow invocation.", +) +PreferredObjectStoreIdField = Field( + default=None, + title="Preferred Object Store ID", + description="The ID of the object store that should be used to store all datasets (can instead specify object store IDs for intermediate and outputs datasts separately) - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences", +) +PreferredIntermediateObjectStoreIdField = Field( + None, + title="Preferred Intermediate Object Store ID", + description="The ID of the object store that should be used to store the intermediate datasets of this workflow - - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences", +) +PreferredOutputsObjectStoreIdField = Field( + None, + title="Preferred Outputs Object Store ID", + description="The ID of the object store that should be used to store the marked output datasets of this workflow - Galaxy's job configuration may override this in some cases but this workflow preference will override tool and user preferences.", +) +ResourceParametersField = Field( + {}, + title="Resource Parameters", + description="If a workflow_resource_params_file file is defined and the target workflow is configured to consumer resource parameters, they can be specified with this parameter. See https://github.com/galaxyproject/galaxy/pull/4830 for more information.", +) + +VALID_INPUTS_BY_ITEMS = ["step_id", "step_index", "step_uuid", "name"] + + +def validateInputsBy(inputsBy: Optional[str]) -> Optional[str]: + if inputsBy is not None: + if not isinstance(inputsBy, str): + raise ValueError(f"Invalid type for inputsBy {inputsBy}") + inputsByStr = cast(str, inputsBy) + inputsByArray: List[str] = inputsByStr.split("|") + for inputsByItem in inputsByArray: + if inputsByItem not in VALID_INPUTS_BY_ITEMS: + raise ValueError(f"Invalid inputsBy delineation {inputsByItem}") + return inputsBy + + +InputsByValidator = AfterValidator(validateInputsBy) + class GetTargetHistoryPayload(Model): # TODO - Are the descriptions correct? @@ -38,12 +100,7 @@ class GetTargetHistoryPayload(Model): # description="The encoded history id - passed exactly like this 'hist_id=...' - to import the workflow into. Or the name of the new history to import the workflow into.", description="The encoded history id - passed exactly like this 'hist_id=...' - into which to import. Or the name of the new history into which to import.", ) - history_id: Optional[str] = Field( - None, - title="History ID", - # description="The history to import the workflow into.", - description="The encoded history id into which to import.", - ) + history_id: Optional[str] = TargetHistoryIdField new_history_name: Optional[str] = Field( None, title="New History Name", @@ -85,15 +142,11 @@ class InvokeWorkflowPayload(GetTargetHistoryPayload): title="Allow tool state corrections", description="Indicates if tool state corrections are allowed for workflow invocation.", ) - use_cached_job: Optional[bool] = Field( - False, - title="Use cached job", - description="Indicated whether to use a cached job for workflow invocation.", - ) + use_cached_job: Optional[bool] = UseCachedJobField parameters_normalized: Optional[bool] = Field( False, - title="Parameters Normalized", - description="Indicates if parameters are already normalized for workflow invocation.", + title=STEP_PARAMETERS_NORMALIZED_TITLE, + description=STEP_PARAMETERS_NORMALIZED_DESCRIPTION, ) @field_validator( @@ -102,7 +155,6 @@ class InvokeWorkflowPayload(GetTargetHistoryPayload): "ds_map", "resource_params", "replacement_params", - "step_parameters", mode="before", check_fields=False, ) @@ -114,34 +166,22 @@ def inputs_string_to_json(cls, v): parameters: Optional[Dict[str, Any]] = Field( {}, - title="Parameters", - description="The raw parameters for the workflow invocation.", + title=STEP_PARAMETERS_TITLE, + description=STEP_PARAMETERS_DESCRIPTION, ) inputs: Optional[Dict[str, Any]] = Field( None, title="Inputs", - description="TODO", + description="Specify values for formal inputs to the workflow", ) ds_map: Optional[Dict[str, Dict[str, Any]]] = Field( {}, - title="Dataset Map", - description="TODO", - ) - resource_params: Optional[Dict[str, Any]] = Field( - {}, - title="Resource Parameters", - description="TODO", - ) - replacement_params: Optional[Dict[str, Any]] = Field( - {}, - title="Replacement Parameters", - description="TODO", - ) - step_parameters: Optional[Dict[str, Any]] = Field( - None, - title="Step Parameters", - description="TODO", + title="Legacy Dataset Map", + description="An older alternative to specifying inputs using database IDs, do not use this and use inputs instead", + deprecated=True, ) + resource_params: Optional[Dict[str, Any]] = ResourceParametersField + replacement_params: Optional[Dict[str, Any]] = ReplacementParametersField no_add_to_history: Optional[bool] = Field( False, title="No Add to History", @@ -152,11 +192,11 @@ def inputs_string_to_json(cls, v): title="Legacy", description="Indicating if to use legacy workflow invocation.", ) - inputs_by: Optional[str] = Field( + inputs_by: Annotated[Optional[str], InputsByValidator] = Field( None, title="Inputs By", # lib/galaxy/workflow/run_request.py - see line 60 - description="How inputs maps to inputs (datasets/collections) to workflows steps.", + description=INPUTS_BY_DESCRIPTION, ) effective_outputs: Optional[Any] = Field( None, @@ -164,17 +204,9 @@ def inputs_string_to_json(cls, v): # lib/galaxy/workflow/run_request.py - see line 455 description="TODO", ) - preferred_intermediate_object_store_id: Optional[str] = Field( - None, - title="Preferred Intermediate Object Store ID", - description="The ID of the ? object store that should be used to store ? datasets in this history.", - ) - preferred_outputs_object_store_id: Optional[str] = Field( - None, - title="Preferred Outputs Object Store ID", - description="The ID of the object store that should be used to store ? datasets in this history.", - ) preferred_object_store_id: Optional[str] = PreferredObjectStoreIdField + preferred_intermediate_object_store_id: Optional[str] = PreferredIntermediateObjectStoreIdField + preferred_outputs_object_store_id: Optional[str] = PreferredOutputsObjectStoreIdField class StoredWorkflowDetailed(StoredWorkflowSummary): diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index 719029e818d6..703d652f0f74 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -64,6 +64,7 @@ InvocationStepJobsResponseJobModel, InvocationStepJobsResponseStepModel, InvocationUpdatePayload, + WorkflowInvocationRequestModel, WorkflowInvocationResponse, ) from galaxy.schema.schema import ( @@ -1469,6 +1470,17 @@ def show_invocation( ) return self.invocations_service.show(trans, invocation_id, serialization_params, eager=True) + @router.get( + "/api/invocations/{invocation_id}/request", + summary="Get a description modeling an API request to invoke this workflow - this is recreated and will be more specific in some ways than the initial creation request.", + ) + def invocation_as_request( + self, + invocation_id: InvocationIDPathParam, + trans: ProvidesUserContext = DependsOnTrans, + ) -> WorkflowInvocationRequestModel: + return self.invocations_service.as_request(trans, invocation_id) + @router.get( "/api/workflows/{workflow_id}/invocations/{invocation_id}", summary="Get detailed description of a workflow invocation.", diff --git a/lib/galaxy/webapps/galaxy/services/invocations.py b/lib/galaxy/webapps/galaxy/services/invocations.py index 2195c7202dd7..8b44d9fd242c 100644 --- a/lib/galaxy/webapps/galaxy/services/invocations.py +++ b/lib/galaxy/webapps/galaxy/services/invocations.py @@ -1,3 +1,4 @@ +import json import logging from typing import ( Any, @@ -14,9 +15,13 @@ ) from galaxy.exceptions import ( AdminRequiredException, + InconsistentDatabase, ObjectNotFound, ) -from galaxy.managers.context import ProvidesHistoryContext +from galaxy.managers.context import ( + ProvidesHistoryContext, + ProvidesUserContext, +) from galaxy.managers.histories import HistoryManager from galaxy.managers.jobs import ( fetch_job_states, @@ -24,8 +29,11 @@ ) from galaxy.managers.workflows import WorkflowsManager from galaxy.model import ( + HistoryDatasetAssociation, + HistoryDatasetCollectionAssociation, WorkflowInvocation, WorkflowInvocationStep, + WorkflowRequestInputParameter, ) from galaxy.schema.fields import DecodedDatabaseIdField from galaxy.schema.invocation import ( @@ -33,6 +41,7 @@ InvocationSerializationParams, InvocationSerializationView, InvocationStep, + WorkflowInvocationRequestModel, WorkflowInvocationResponse, ) from galaxy.schema.schema import ( @@ -133,6 +142,12 @@ def show(self, trans, invocation_id, serialization_params, eager=False): ) return self.serialize_workflow_invocation(wfi, serialization_params) + def as_request(self, trans: ProvidesUserContext, invocation_id) -> WorkflowInvocationRequestModel: + wfi = self._workflows_manager.get_invocation( + trans, invocation_id, True, check_ownership=True, check_accessible=True + ) + return self.serialize_workflow_invocation_to_request(trans, wfi) + def cancel(self, trans, invocation_id, serialization_params): wfi = self._workflows_manager.request_invocation_cancellation(trans, invocation_id) return self.serialize_workflow_invocation(wfi, serialization_params) @@ -252,3 +267,70 @@ def create_from_store( history=history, ) return self.serialize_workflow_invocations(object_tracker.invocations_by_key.values(), serialization_params) + + def serialize_workflow_invocation_to_request( + self, trans: ProvidesUserContext, invocation: WorkflowInvocation + ) -> WorkflowInvocationRequestModel: + history_id = trans.security.encode_id(invocation.history.id) + workflow_id = trans.security.encode_id(invocation.workflow.id) + inputs, inputs_by = invocation.recover_inputs() + export_inputs = {} + for key, value in inputs.items(): + if isinstance(value, HistoryDatasetAssociation): + export_inputs[key] = {"src": "hda", "id": trans.security.encode_id(value.id)} + elif isinstance(value, HistoryDatasetCollectionAssociation): + export_inputs[key] = {"src": "hdca", "id": trans.security.encode_id(value.id)} + else: + export_inputs[key] = value + + param_types = WorkflowRequestInputParameter.types + steps_by_id = invocation.workflow.steps_by_id + + replacement_dict = {} + resource_params = {} + use_cached_job = False + preferred_object_store_id = None + preferred_intermediate_object_store_id = None + preferred_outputs_object_store_id = None + step_param_map: Dict[str, Dict] = {} + for parameter in invocation.input_parameters: + parameter_type = parameter.type + + if parameter_type == param_types.REPLACEMENT_PARAMETERS: + replacement_dict[parameter.name] = parameter.value + elif parameter_type == param_types.META_PARAMETERS: + # copy_inputs_to_history is being skipped here sort of intentionally, + # we wouldn't want to include this on re-run. + if parameter.name == "use_cached_job": + use_cached_job = parameter.value == "true" + if parameter.name == "preferred_object_store_id": + preferred_object_store_id = parameter.value + if parameter.name == "preferred_intermediate_object_store_id": + preferred_intermediate_object_store_id = parameter.value + if parameter.name == "preferred_outputs_object_store_id": + preferred_outputs_object_store_id = parameter.value + elif parameter_type == param_types.RESOURCE_PARAMETERS: + resource_params[parameter.name] = parameter.value + elif parameter_type == param_types.STEP_PARAMETERS: + # TODO: test subworkflows and ensure this works + step_id: int + try: + step_id = int(parameter.name) + except TypeError: + raise InconsistentDatabase("saved workflow step parameters not in the format expected") + step_param_map[str(steps_by_id[step_id].order_index)] = json.loads(parameter.value) + + return WorkflowInvocationRequestModel( + history_id=history_id, + workflow_id=workflow_id, + inputs=export_inputs, + inputs_by=inputs_by, + replacement_params=None if not replacement_dict else replacement_dict, + resource_params=None if not resource_params else resource_params, + use_cached_job=use_cached_job, + preferred_object_store_id=preferred_object_store_id, + preferred_intermediate_object_store_id=preferred_intermediate_object_store_id, + preferred_outputs_object_store_id=preferred_outputs_object_store_id, + parameters_normalized=True, + parameters=None if not step_param_map else step_param_map, + ) diff --git a/lib/galaxy/workflow/extract.py b/lib/galaxy/workflow/extract.py index 6072f322d9cb..d96be13b35cc 100644 --- a/lib/galaxy/workflow/extract.py +++ b/lib/galaxy/workflow/extract.py @@ -116,7 +116,7 @@ def extract_steps( if name not in step_labels: step.label = name step_labels.add(name) - step.tool_inputs = dict(name=name) # type:ignore[assignment] + step.tool_inputs = dict(name=name) hid_to_output_pair[hid] = (step, "output") steps.append(step) for i, hid in enumerate(dataset_collection_ids): @@ -132,7 +132,7 @@ def extract_steps( if name not in step_labels: step.label = name step_labels.add(name) - step.tool_inputs = dict(name=name, collection_type=collection_type) # type:ignore[assignment] + step.tool_inputs = dict(name=name, collection_type=collection_type) hid_to_output_pair[hid] = (step, "output") steps.append(step) # Tool steps diff --git a/lib/galaxy/workflow/run_request.py b/lib/galaxy/workflow/run_request.py index 24b81fff849d..b8f6b753a924 100644 --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -121,7 +121,9 @@ def _normalize_inputs( elif inputs_by_el == "step_uuid": possible_input_keys.append(str(step.uuid)) elif inputs_by_el == "name": - possible_input_keys.append(step.label or step.tool_inputs.get("name")) # type:ignore[union-attr] + label = step.effective_label + if label: + possible_input_keys.append(label) else: raise exceptions.MessageException( "Workflow cannot be run because unexpected inputs_by value specified." @@ -317,7 +319,7 @@ def build_workflow_run_configs( add_to_history = "no_add_to_history" not in payload legacy = payload.get("legacy", False) already_normalized = payload.get("parameters_normalized", False) - raw_parameters = payload.get("parameters", {}) + raw_parameters = payload.get("parameters") or {} requires_materialization: bool = False run_configs = [] unexpanded_param_map = _normalize_step_parameters( diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 8408837df347..14b8b99b17dd 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -3460,6 +3460,31 @@ def test_workflow_request(self): invocation_id = run_workflow_response.json()["id"] self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) + def test_workflow_request_recover(self): + workflow = self.workflow_populator.load_workflow(name="test_for_queue") + workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) + run_workflow_response = self.workflow_populator.invoke_workflow_raw( + workflow_id, workflow_request, assert_ok=True + ) + invocation_id = run_workflow_response.json()["id"] + self.workflow_populator.wait_for_invocation_and_jobs(history_id, workflow_id, invocation_id) + request = self.workflow_populator.invocation_to_request(invocation_id) + assert request["history_id"] == history_id + assert request["replacement_params"] is None + assert request["use_cached_job"] is False + assert request["preferred_object_store_id"] is None + assert request["preferred_intermediate_object_store_id"] is None + assert request["preferred_outputs_object_store_id"] is None + assert request["parameters_normalized"] is True + assert request["parameters"] is None + + assert request["inputs"]["WorkflowInput1"]["src"] == "hda" + encoded_id = request["inputs"]["WorkflowInput1"]["id"] + assert self.dataset_populator.get_history_dataset_content(history_id, dataset_id=encoded_id).strip() == "1 2 3" + assert request["inputs"]["WorkflowInput2"]["src"] == "hda" + encoded_id = request["inputs"]["WorkflowInput2"]["id"] + assert self.dataset_populator.get_history_dataset_content(history_id, dataset_id=encoded_id).strip() == "4 5 6" + def test_workflow_new_autocreated_history(self): workflow = self.workflow_populator.load_workflow(name="test_for_new_autocreated_history") workflow_request, history_id, workflow_id = self._setup_workflow_run(workflow) @@ -4898,7 +4923,7 @@ def test_run_with_non_optional_data_unspecified_fails_invocation(self): def test_run_with_optional_collection_specified(self): with self.dataset_populator.test_history() as history_id: - self._run_jobs( + result = self._run_workflow( WORKFLOW_OPTIONAL_TRUE_INPUT_COLLECTION, test_data=""" input1: @@ -4919,6 +4944,12 @@ def test_run_with_optional_collection_specified(self): content = self.dataset_populator.get_history_dataset_content(history_id) assert "GAATTGATCAGGACATAGGACAACTGTAGGCACCAT" in content + invocation_id = result.invocation_id + request = self.workflow_populator.invocation_to_request(invocation_id) + assert request["history_id"] == history_id + assert request["inputs"]["input1"]["src"] == "hdca" + assert request["inputs"]["input1"]["id"] + def test_run_with_optional_collection_unspecified(self): with self.dataset_populator.test_history() as history_id: self._run_jobs( @@ -5008,9 +5039,14 @@ def test_run_with_int_parameter(self): # self.dataset_populator.wait_for_history(history_id, assert_ok=True) content = self.dataset_populator.get_history_dataset_content(history_id) assert len(content.splitlines()) == 1, content - invocation = self.workflow_populator.get_invocation(run_response.invocation_id) + invocation_id = run_response.invocation_id + invocation = self.workflow_populator.get_invocation(invocation_id) assert invocation["input_step_parameters"]["int_input"]["parameter_value"] == 1 + request = self.workflow_populator.invocation_to_request(invocation_id) + assert request["history_id"] == history_id + assert request["inputs"]["int_input"] == 1 + run_response = self._run_workflow( WORKFLOW_PARAMETER_INPUT_INTEGER_OPTIONAL, test_data=""" @@ -5359,6 +5395,10 @@ def test_workflow_rerun_with_use_cached_job(self): workflow_id, new_workflow_request, assert_ok=True ).json() invocation_id = new_workflow_response["id"] + + request = self.workflow_populator.invocation_to_request(invocation_id) + assert request["use_cached_job"] is True + self.workflow_populator.wait_for_invocation_and_jobs(history_id_two, workflow_id, invocation_id) # get_history_dataset_details defaults to last item in history, so since we've done @@ -5558,6 +5598,9 @@ def test_run_with_pja(self): content = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=True) assert content["name"] == "foo was replaced" + request = self.workflow_populator.invocation_to_request(invocation_id) + assert request["replacement_params"]["replaceme"] == "was replaced" + @skip_without_tool("hidden_param") def test_hidden_param_in_workflow(self): with self.dataset_populator.test_history() as history_id: @@ -7060,7 +7103,7 @@ def test_run_replace_params_nested_normalized(self): @skip_without_tool("random_lines1") def test_run_replace_params_over_default(self): with self.dataset_populator.test_history() as history_id: - self._run_jobs( + wf_run = self._run_workflow( WORKFLOW_ONE_STEP_DEFAULT, test_data=""" step_parameters: @@ -7078,6 +7121,13 @@ def test_run_replace_params_over_default(self): result = self.dataset_populator.get_history_dataset_content(history_id) assert result.count("\n") == 4 + request = self.workflow_populator.invocation_to_request(wf_run.invocation_id) + assert request["parameters"]["1"]["num_lines"] == 4 + + self.workflow_populator.rerun(wf_run) + result = self.dataset_populator.get_history_dataset_content(history_id) + assert result.count("\n") == 4 + @skip_without_tool("random_lines1") def test_defaults_editor(self): workflow_id = self._upload_yaml_workflow(WORKFLOW_ONE_STEP_DEFAULT, publish=True) diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py index a31c5fe73956..5a442982e5ce 100644 --- a/lib/galaxy_test/base/populators.py +++ b/lib/galaxy_test/base/populators.py @@ -1953,7 +1953,7 @@ def invoke_workflow_raw(self, workflow_id: str, request: dict, assert_ok: bool = url = f"workflows/{workflow_id}/invocations" invocation_response = self._post(url, data=request, json=True) if assert_ok: - invocation_response.raise_for_status() + api_asserts.assert_status_code_is_ok(invocation_response) return invocation_response def invoke_workflow( @@ -2042,6 +2042,11 @@ def download_workflow( else: return ordered_load(response.text) + def invocation_to_request(self, invocation_id: str): + request_response = self._get(f"invocations/{invocation_id}/request") + api_asserts.assert_status_code_is_ok(request_response) + return request_response.json() + def set_tags(self, workflow_id: str, tags: List[str]) -> None: update_payload = {"tags": tags} response = self.update_workflow(workflow_id, update_payload) @@ -2140,7 +2145,54 @@ def run_workflow( workflow_request.update(extra_invocation_kwds) if has_uploads: self.dataset_populator.wait_for_history(history_id, assert_ok=True) + + return self._request_to_summary( + history_id, + workflow_id, + workflow_request, + inputs=inputs, + wait=wait, + assert_ok=assert_ok, + invocations=invocations, + expected_response=expected_response, + ) + + def rerun(self, run_jobs_summary: "RunJobsSummary", wait: bool = True, assert_ok: bool = True) -> "RunJobsSummary": + history_id = run_jobs_summary.history_id + invocation_id = run_jobs_summary.invocation_id + workflow_id = run_jobs_summary.workflow_id + inputs = run_jobs_summary.inputs + workflow_request = self.invocation_to_request(invocation_id) + assert workflow_request["history_id"] == history_id + if workflow_request["workflow_id"] != workflow_id: + raise AssertionError( + f"response workflow id {workflow_request['workflow_id']} does not match record workflow {workflow_id}" + ) + return self._request_to_summary( + history_id, + workflow_id, + workflow_request, + inputs=inputs, + wait=wait, + assert_ok=assert_ok, + invocations=1, + expected_response=200, + ) + + def _request_to_summary( + self, + history_id: str, + workflow_id: str, + workflow_request: Dict[str, Any], + inputs, + wait: bool, + assert_ok: bool, + invocations: int, + expected_response: int, + ): + workflow_populator = self assert invocations > 0 + jobs = [] for _ in range(invocations): invocation_response = workflow_populator.invoke_workflow_raw(workflow_id, workflow_request) @@ -2156,6 +2208,7 @@ def run_workflow( if wait: workflow_populator.wait_for_workflow(workflow_id, invocation_id, history_id, assert_ok=assert_ok) jobs.extend(self.dataset_populator.invocation_jobs(invocation_id)) + return RunJobsSummary( history_id=history_id, workflow_id=workflow_id, diff --git a/lib/galaxy_test/workflow/test_framework_workflows.py b/lib/galaxy_test/workflow/test_framework_workflows.py index a358433025ca..264f039d2751 100644 --- a/lib/galaxy_test/workflow/test_framework_workflows.py +++ b/lib/galaxy_test/workflow/test_framework_workflows.py @@ -21,6 +21,7 @@ get_metadata_to_test, verify_collection, ) +from galaxy.util import asbool from galaxy_test.api._framework import ApiTestCase from galaxy_test.base.populators import ( DatasetCollectionPopulator, @@ -30,6 +31,7 @@ ) SCRIPT_DIRECTORY = os.path.abspath(os.path.dirname(__file__)) +TEST_WORKFLOW_AFTER_RERUN = asbool(os.environ.get("GALAXY_TEST_WORKFLOW_AFTER_RERUN", "0")) def find_workflows(): @@ -67,6 +69,8 @@ def test_workflow(self, workflow_path: Path, test_job: TestJobDict): test_data=test_job["job"], history_id=history_id, ) + if TEST_WORKFLOW_AFTER_RERUN: + run_summary = self.workflow_populator.rerun(run_summary) self._verify(run_summary, test_job["outputs"]) def _verify(self, run_summary: RunJobsSummary, output_definitions: OutputsDict): diff --git a/test/integration/objectstore/test_selection_with_user_preferred_object_store.py b/test/integration/objectstore/test_selection_with_user_preferred_object_store.py index 5bf562bc8b5b..fee15742b8a2 100644 --- a/test/integration/objectstore/test_selection_with_user_preferred_object_store.py +++ b/test/integration/objectstore/test_selection_with_user_preferred_object_store.py @@ -244,17 +244,22 @@ def _run_tool(tool_id, inputs, preferred_object_store_id=None): def test_workflow_objectstore_selection(self): with self.dataset_populator.test_history() as history_id: - output_dict, intermediate_dict = self._run_workflow_get_output_storage_info_dicts(history_id) + output_dict, intermediate_dict, _ = self._run_workflow_get_output_storage_info_dicts(history_id) assert_storage_name_is(output_dict, "Default Store") assert_storage_name_is(intermediate_dict, "Default Store") - output_dict, intermediate_dict = self._run_workflow_get_output_storage_info_dicts( + output_dict, intermediate_dict, wf_run = self._run_workflow_get_output_storage_info_dicts( history_id, {"preferred_object_store_id": "static"} ) assert_storage_name_is(output_dict, "Static Storage") assert_storage_name_is(intermediate_dict, "Static Storage") - output_dict, intermediate_dict = self._run_workflow_get_output_storage_info_dicts( + request = self.workflow_populator.invocation_to_request(wf_run.invocation_id) + assert request["preferred_object_store_id"] == "static" + assert request["preferred_outputs_object_store_id"] is None + assert request["preferred_intermediate_object_store_id"] is None + + output_dict, intermediate_dict, wf_run = self._run_workflow_get_output_storage_info_dicts( history_id, { "preferred_outputs_object_store_id": "static", @@ -264,6 +269,11 @@ def test_workflow_objectstore_selection(self): assert_storage_name_is(output_dict, "Static Storage") assert_storage_name_is(intermediate_dict, "Dynamic EBS") + request = self.workflow_populator.invocation_to_request(wf_run.invocation_id) + assert request["preferred_object_store_id"] is None + assert request["preferred_outputs_object_store_id"] == "static" + assert request["preferred_intermediate_object_store_id"] == "dynamic_ebs" + def test_simple_subworkflow_objectstore_selection(self): with self.dataset_populator.test_history() as history_id: output_dict, intermediate_dict = self._run_simple_nested_workflow_get_output_storage_info_dicts( @@ -428,7 +438,6 @@ def _run_nested_workflow_with_effective_output_get_output_storage_info_dicts( extra_invocation_kwds=extra_invocation_kwds, ) jobs = wf_run.jobs_for_tool("cat1") - print(jobs) assert len(jobs) == 2 intermediate_info = self._storage_info_for_job_id(jobs[1]["id"]) @@ -448,11 +457,10 @@ def _run_workflow_get_output_storage_info_dicts( extra_invocation_kwds=extra_invocation_kwds, ) jobs = wf_run.jobs_for_tool("cat") - print(jobs) assert len(jobs) == 2 output_info = self._storage_info_for_job_id(jobs[0]["id"]) intermediate_info = self._storage_info_for_job_id(jobs[1]["id"]) - return output_info, intermediate_info + return output_info, intermediate_info, wf_run def _storage_info_for_job_id(self, job_id: str) -> Dict[str, Any]: job_dict = self.dataset_populator.get_job_details(job_id, full=True).json() diff --git a/test/unit/data/model/test_model_store.py b/test/unit/data/model/test_model_store.py index 99dab6ddcb9e..cbf8afa5a7e2 100644 --- a/test/unit/data/model/test_model_store.py +++ b/test/unit/data/model/test_model_store.py @@ -1066,7 +1066,7 @@ def _setup_collection_invocation(app): workflow_step_1 = model.WorkflowStep() workflow_step_1.order_index = 0 workflow_step_1.type = "data_collection_input" - workflow_step_1.tool_inputs = {} # type:ignore[assignment] + workflow_step_1.tool_inputs = {} sa_session.add(workflow_step_1) workflow_1 = _workflow_from_steps(u, [workflow_step_1]) workflow_1.license = "MIT" @@ -1092,7 +1092,7 @@ def _setup_simple_invocation(app): workflow_step_1 = model.WorkflowStep() workflow_step_1.order_index = 0 workflow_step_1.type = "data_input" - workflow_step_1.tool_inputs = {} # type:ignore[assignment] + workflow_step_1.tool_inputs = {} sa_session.add(workflow_step_1) workflow = _workflow_from_steps(u, [workflow_step_1]) workflow.license = "MIT"