Merge pull request #16933 from martenson/merge231in
merge release_23.1 into dev
mvdbeek authored Oct 27, 2023
2 parents 4fcf3b5 + d92bbb1 commit 6038b50
Showing 17 changed files with 279 additions and 85 deletions.
@@ -65,7 +65,12 @@ const workflow = computed(() => {
});
const workflowStep = computed(() => {
- if ("workflow_step_id" in props.invocationMessage && workflow.value) {
+ if (
+     "workflow_step_id" in props.invocationMessage &&
+     props.invocationMessage.workflow_step_id !== undefined &&
+     props.invocationMessage.workflow_step_id !== null &&
+     workflow.value
+ ) {
return workflow.value.steps[props.invocationMessage.workflow_step_id];
}
return undefined;
@@ -149,10 +154,14 @@ const infoString = computed(() => {
invocationMessage.workflow_step_id + 1
} is a conditional step and the result of the when expression is not a boolean type.`;
} else if (reason === "unexpected_failure") {
+ let atStep = "";
+ if (invocationMessage.workflow_step_id !== null && invocationMessage.workflow_step_id !== undefined) {
+     atStep = ` at step ${invocationMessage.workflow_step_id + 1}`;
+ }
if (invocationMessage.details) {
- return `${failFragment} an unexpected failure occurred: '${invocationMessage.details}'`;
+ return `${failFragment} an unexpected failure occurred${atStep}: '${invocationMessage.details}'`;
}
- return `${failFragment} an unexpected failure occurred.`;
+ return `${failFragment} an unexpected failure occurred${atStep}.`;
} else if (reason === "workflow_output_not_found") {
return `Defined workflow output '${invocationMessage.output_name}' was not found in step ${
invocationMessage.workflow_step_id + 1
@@ -77,7 +77,7 @@ export interface GenericInvocationFailureCollectionFailedEncodedDatabaseIdField
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
- hdca_id?: string;
+ hdca_id: string;
/**
* Workflow step id of step that caused failure.
*/
@@ -92,7 +92,7 @@ export interface GenericInvocationFailureCollectionFailedInt {
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
- hdca_id?: number;
+ hdca_id: number;
/**
* Workflow step id of step that caused failure.
*/
@@ -159,7 +159,7 @@ export interface GenericInvocationFailureJobFailedEncodedDatabaseIdField {
/**
* Job ID that relates to failure.
*/
- job_id?: string;
+ job_id: string;
/**
* Workflow step id of step that caused failure.
*/
@@ -174,7 +174,7 @@ export interface GenericInvocationFailureJobFailedInt {
/**
* Job ID that relates to failure.
*/
- job_id?: number;
+ job_id: number;
/**
* Workflow step id of step that caused failure.
*/
@@ -232,11 +232,19 @@ export interface GenericInvocationUnexpectedFailureEncodedDatabaseIdField {
* May contains details to help troubleshoot this problem.
*/
details?: string;
+ /**
+  * Workflow step id of step that failed.
+  */
+ workflow_step_id?: number;
}
export interface GenericInvocationUnexpectedFailureInt {
reason: "unexpected_failure";
/**
* May contains details to help troubleshoot this problem.
*/
details?: string;
+ /**
+  * Workflow step id of step that failed.
+  */
+ workflow_step_id?: number;
}
15 changes: 8 additions & 7 deletions lib/galaxy/datatypes/registry.py
@@ -600,13 +600,14 @@ def get_datatype_by_extension(self, ext):
return self.datatypes_by_extension.get(ext, None)

def change_datatype(self, data, ext):
- data.extension = ext
- # call init_meta and copy metadata from itself. The datatype
- # being converted *to* will handle any metadata copying and
- # initialization.
- if data.has_data():
-     data.set_size()
-     data.init_meta(copy_from=data)
+ if data.extension != ext:
+     data.extension = ext
+     # call init_meta and copy metadata from itself. The datatype
+     # being converted *to* will handle any metadata copying and
+     # initialization.
+     if data.has_data():
+         data.set_size()
+         data.init_meta(copy_from=data)
return data

def load_datatype_converters(self, toolbox, use_cached=False):
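For context, a small runnable sketch of the behavior change above, using a hypothetical FakeData stand-in for a dataset instance (the real method is Registry.change_datatype in lib/galaxy/datatypes/registry.py):

class FakeData:
    # Hypothetical stand-in exposing only the attributes change_datatype touches.
    def __init__(self, extension):
        self.extension = extension
        self.meta_inits = 0

    def has_data(self):
        return True

    def set_size(self):
        pass

    def init_meta(self, copy_from=None):
        self.meta_inits += 1

def change_datatype(data, ext):
    # Mirrors the guarded logic above: metadata is only re-initialized when the extension changes.
    if data.extension != ext:
        data.extension = ext
        if data.has_data():
            data.set_size()
            data.init_meta(copy_from=data)
    return data

d = FakeData("txt")
change_datatype(d, "tabular")  # extension changes: size is reset and metadata re-initialized
change_datatype(d, "tabular")  # already "tabular": now a no-op rather than re-initializing metadata
assert d.meta_inits == 1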
10 changes: 10 additions & 0 deletions lib/galaxy/exceptions/__init__.py
@@ -151,6 +151,16 @@ class ToolInputsNotReadyException(MessageException):
error_code = error_codes_by_name["TOOL_INPUTS_NOT_READY"]


+ class ToolInputsNotOKException(MessageException):
+     def __init__(self, err_msg=None, type="info", *, src: str, id: int, **extra_error_info):
+         super().__init__(err_msg, type, **extra_error_info)
+         self.src = src
+         self.id = id
+
+     status_code = 400
+     error_code = error_codes_by_name["TOOL_INPUTS_NOT_OK"]
+
+
class RealUserRequiredException(MessageException):
status_code = 400
error_code = error_codes_by_name["REAL_USER_REQUIRED"]
5 changes: 5 additions & 0 deletions lib/galaxy/exceptions/error_codes.json
@@ -94,6 +94,11 @@
"code": 400016,
"message": "Only real users can make this request."
},
+ {
+     "name": "TOOL_INPUTS_NOT_OK",
+     "code": 400017,
+     "message": "Tool inputs not in required OK state."
+ },
{
"name": "USER_AUTHENTICATION_FAILED",
"code": 401001,
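For context, a short sketch of how the new exception and error code are meant to be used, following the hunks elsewhere in this diff (lib/galaxy/tools/__init__.py raises it, lib/galaxy/tools/execute.py catches it); FakeHDA is a hypothetical stand-in and the snippet assumes the galaxy package from this tree is importable:

from dataclasses import dataclass

from galaxy.exceptions import ToolInputsNotOKException

@dataclass
class FakeHDA:
    # hypothetical stand-in for a HistoryDatasetAssociation
    id: int
    state: str

def check_dataset_state(dataset):
    if dataset.state != "ok":
        raise ToolInputsNotOKException(
            f"Tool requires inputs to be in valid state, but dataset {dataset.id} is in state '{dataset.state}'",
            src="hda",
            id=dataset.id,
        )

try:
    check_dataset_state(FakeHDA(id=42, state="error"))
except ToolInputsNotOKException as e:
    # src and id identify the offending input; as a MessageException subclass this maps
    # to an HTTP 400 response with error code 400017 (TOOL_INPUTS_NOT_OK).
    print(e.src, e.id)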
4 changes: 4 additions & 0 deletions lib/galaxy/jobs/handler.py
@@ -1198,12 +1198,16 @@ def get_job_runner(self, job_wrapper, get_task_runner=False):
except KeyError:
log.error(f"({job_wrapper.job_id}) Invalid job runner: {runner_name}")
job_wrapper.fail(DEFAULT_JOB_RUNNER_FAILURE_MESSAGE)
+ return None
if get_task_runner and job_wrapper.can_split() and runner.runner_name != "PulsarJobRunner":
return self.job_runners["tasks"]
return runner

def put(self, job_wrapper):
runner = self.get_job_runner(job_wrapper, get_task_runner=True)
+ if runner is None:
+     # Something went wrong, we've already failed the job wrapper
+     return
if isinstance(job_wrapper, TaskWrapper):
# DBTODO Refactor
log.debug(f"({job_wrapper.job_id}) Dispatching task {job_wrapper.task_id} to task runner")
2 changes: 1 addition & 1 deletion lib/galaxy/metadata/set_metadata.py
@@ -448,7 +448,7 @@ def set_meta(new_dataset_instance, file_dict):
object_store_update_actions.append(
partial(push_if_necessary, object_store, dataset, external_filename)
)
- object_store_update_actions.append(partial(reset_external_filename, dataset))
+ object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(dataset.set_total_size))
object_store_update_actions.append(partial(export_store.add_dataset, dataset))
if dataset_instance_id not in unnamed_id_to_path:
13 changes: 12 additions & 1 deletion lib/galaxy/model/metadata.py
@@ -605,7 +605,18 @@ def wrap(self, value, session):
if isinstance(value, int):
return session.get(galaxy.model.MetadataFile, value)
else:
- return session.execute(select(galaxy.model.MetadataFile).filter_by(uuid=value)).scalar_one()
+ wrapped_value = session.execute(
+     select(galaxy.model.MetadataFile).filter_by(uuid=value)
+ ).scalar_one_or_none()
+ if wrapped_value:
+     return wrapped_value
+ else:
+     # If we've simultaneously copied the dataset and we've changed the datatype on the
+     # copy we may not have committed the MetadataFile yet, so we need to commit the session.
+     # TODO: It would be great if we can avoid the commit in the future.
+     with transaction(session):
+         session.commit()
+     return session.execute(select(galaxy.model.MetadataFile).filter_by(uuid=value)).scalar_one_or_none()

def make_copy(self, value, target_context: MetadataCollection, source_context):
session = target_context._object_session(target_context.parent)
6 changes: 5 additions & 1 deletion lib/galaxy/schema/invocation.py
@@ -51,7 +51,10 @@ def get(self, key: Any, default: Any = None) -> Any:
# Fetch the order_index when serializing for the API,
# which makes much more sense when pointing to steps.
if key == "workflow_step_id":
- return self._obj.workflow_step.order_index
+ if self._obj.workflow_step:
+     return self._obj.workflow_step.order_index
+ else:
+     return default
elif key == "dependent_workflow_step_id":
if self._obj.dependent_workflow_step_id:
return self._obj.dependent_workflow_step.order_index
@@ -132,6 +135,7 @@ class GenericInvocationFailureWhenNotBoolean(InvocationFailureMessageBase[Databa
class GenericInvocationUnexpectedFailure(InvocationMessageBase, Generic[DatabaseIdT]):
reason: Literal[FailureReason.unexpected_failure]
details: Optional[str] = Field(None, description="May contains details to help troubleshoot this problem.")
+ workflow_step_id: Optional[int] = Field(None, description="Workflow step id of step that failed.")


class GenericInvocationWarning(InvocationMessageBase, Generic[DatabaseIdT]):
33 changes: 20 additions & 13 deletions lib/galaxy/tools/__init__.py
@@ -39,7 +39,10 @@
exceptions,
model,
)
- from galaxy.exceptions import ToolInputsNotReadyException
+ from galaxy.exceptions import (
+     ToolInputsNotOKException,
+     ToolInputsNotReadyException,
+ )
from galaxy.job_execution import output_collect
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model import (
@@ -2875,7 +2878,7 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw
copy_object = input_dataset
break
if copy_object is None:
- raise Exception("Failed to find dataset output.")
+ raise exceptions.MessageException("Failed to find dataset output.")
out_data[key].copy_from(copy_object)

def parse_environment_variables(self, tool_source):
@@ -3217,8 +3220,10 @@ def check_dataset_state(state):

if self.require_dataset_ok:
if state != model.Dataset.states.OK:
- raise ValueError(
-     f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'"
+ raise ToolInputsNotOKException(
+     f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'",
+     src="hda",
+     id=input_dataset.id,
)

for input_dataset in input_datasets.values():
@@ -3347,7 +3352,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
elif how == "by_index":
extracted_element = collection[int(incoming["which"]["index"])]
else:
- raise Exception("Invalid tool parameters.")
+ raise exceptions.MessageException("Invalid tool parameters.")
extracted = extracted_element.element_object
extracted_o = extracted.copy(
copy_tags=extracted.tags, new_name=extracted_element.element_identifier, flush=False
@@ -3387,7 +3392,9 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
if element_identifier not in identifiers_map:
identifiers_map[element_identifier] = []
elif dupl_actions == "fail":
- raise Exception(f"Duplicate collection element identifiers found for [{element_identifier}]")
+ raise exceptions.MessageException(
+     f"Duplicate collection element identifiers found for [{element_identifier}]"
+ )
identifiers_map[element_identifier].append(input_num)

for copy, input_list in enumerate(input_lists):
@@ -3579,12 +3586,12 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
except KeyError:
hdca_history_name = f"{hdca.hid}: {hdca.name}"
message = f"List of element identifiers does not match element identifiers in collection '{hdca_history_name}'"
- raise Exception(message)
+ raise exceptions.MessageException(message)
else:
message = f"Number of lines must match number of list elements ({len(elements)}), but file has {data_lines} lines"
- raise Exception(message)
+ raise exceptions.MessageException(message)
else:
- raise Exception(f"Unknown sort_type '{sorttype}'")
+ raise exceptions.MessageException(f"Unknown sort_type '{sorttype}'")

if presort_elements is not None:
sorted_elements = [x[1] for x in sorted(presort_elements, key=lambda x: x[0])]
@@ -3616,7 +3623,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
def add_copied_value_to_new_elements(new_label, dce_object):
new_label = new_label.strip()
if new_label in new_elements:
- raise Exception(
+ raise exceptions.MessageException(
f"New identifier [{new_label}] appears twice in resulting collection, these values must be unique."
)
if getattr(dce_object, "history_content_type", None) == "dataset":
@@ -3629,7 +3636,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
with open(new_labels_path) as fh:
new_labels = fh.readlines(1024 * 1000000)
if strict and len(hdca.collection.elements) != len(new_labels):
- raise Exception("Relabel mapping file contains incorrect number of identifiers")
+ raise exceptions.MessageException("Relabel mapping file contains incorrect number of identifiers")
if how_type == "tabular":
# We have a tabular file, where the first column is an existing element identifier,
# and the second column is the new element identifier.
@@ -3641,7 +3648,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
default = None if strict else element_identifier
new_label = new_labels_dict.get(element_identifier, default)
if not new_label:
- raise Exception(f"Failed to find new label for identifier [{element_identifier}]")
+ raise exceptions.MessageException(f"Failed to find new label for identifier [{element_identifier}]")
add_copied_value_to_new_elements(new_label, dce_object)
else:
# If new_labels_dataset_assoc is not a two-column tabular dataset we label with the current line of the dataset
@@ -3650,7 +3657,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
add_copied_value_to_new_elements(new_labels[i], dce_object)
for key in new_elements.keys():
if not re.match(r"^[\w\- \.,]+$", key):
- raise Exception(f"Invalid new collection identifier [{key}]")
+ raise exceptions.MessageException(f"Invalid new collection identifier [{key}]")
self._add_datasets_to_history(history, new_elements.values())
output_collections.create_collection(
next(iter(self.outputs.values())), "output", elements=new_elements, propagate_hda_tags=False
20 changes: 12 additions & 8 deletions lib/galaxy/tools/execute.py
@@ -19,6 +19,7 @@
from boltons.iterutils import remap

from galaxy import model
+ from galaxy.exceptions import ToolInputsNotOKException
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.model.dataset_collections.structure import (
@@ -143,14 +144,17 @@ def execute_single_job(execution_slice, completed_job, skip=False):
if check_inputs_ready:
for params in execution_tracker.param_combinations:
# This will throw an exception if the tool is not ready.
- check_inputs_ready(
-     tool,
-     trans,
-     params,
-     history,
-     execution_cache=execution_cache,
-     collection_info=collection_info,
- )
+ try:
+     check_inputs_ready(
+         tool,
+         trans,
+         params,
+         history,
+         execution_cache=execution_cache,
+         collection_info=collection_info,
+     )
+ except ToolInputsNotOKException as e:
+     execution_tracker.record_error(e)

execution_tracker.ensure_implicit_collections_populated(history, mapping_params.param_template)
job_count = len(execution_tracker.param_combinations)
4 changes: 4 additions & 0 deletions lib/galaxy/webapps/galaxy/controllers/history.py
@@ -490,6 +490,10 @@ def view(self, trans, id=None, show_deleted=False, show_hidden=False, use_panels
history_is_current = history_to_view == trans.history
else:
history_to_view = trans.history
+ if not history_to_view:
+     raise exceptions.RequestParameterMissingException(
+         "No 'id' parameter provided for history, and user does not have a current history."
+     )
user_is_owner = True
history_is_current = True

10 changes: 10 additions & 0 deletions lib/galaxy/workflow/modules.py
@@ -2314,6 +2314,16 @@ def callback(input, prefixed_name, **kwargs):
if execution_tracker.execution_errors:
# TODO: formalize into InvocationFailure ?
message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
+ for error in execution_tracker.execution_errors:
+     # try first to raise a structured invocation error message
+     if isinstance(error, exceptions.ToolInputsNotOKException) and error.src == "hda":
+         raise FailWorkflowEvaluation(
+             why=InvocationFailureDatasetFailed(
+                 reason=FailureReason.dataset_failed,
+                 hda_id=error.id,
+                 workflow_step_id=step.id,
+             )
+         )
raise exceptions.MessageException(message)

return complete
10 changes: 9 additions & 1 deletion lib/galaxy/workflow/run.py
@@ -240,12 +240,20 @@ def invoke(self) -> Dict[int, Any]:
except modules.DelayedWorkflowEvaluation as de:
step_delayed = delayed_steps = True
self.progress.mark_step_outputs_delayed(step, why=de.why)
- except Exception:
+ except Exception as e:
log.exception(
"Failed to schedule %s, problem occurred on %s.",
self.workflow_invocation.workflow.log_str(),
step.log_str(),
)
+ if isinstance(e, MessageException):
+     # This is the highest level at which we can inject the step id
+     # to provide some more context to the exception.
+     raise modules.FailWorkflowEvaluation(
+         why=InvocationUnexpectedFailure(
+             reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id
+         )
+     )
raise

if not step_delayed:
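For context, a rough sketch of the structured message the new handler above produces, assuming FailureReason and the concrete InvocationUnexpectedFailure model are both importable from galaxy.schema.invocation (the hunks above reference both names); the details text and step id are illustrative:

from galaxy.schema.invocation import FailureReason, InvocationUnexpectedFailure

msg = InvocationUnexpectedFailure(
    reason=FailureReason.unexpected_failure,
    details="tool raised a MessageException while scheduling",
    workflow_step_id=2,
)
print(msg)

The client-side changes at the top of this diff render such a message with an "at step 3" fragment (workflow_step_id + 1) and omit that fragment when workflow_step_id is null or undefined.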