Skip to content

Commit

Permalink
Include failing step for unexpected failures, turn more exceptions into MessageExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Oct 25, 2023
1 parent 285db62 commit c351301
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ const workflow = computed(() => {
});
const workflowStep = computed(() => {
if ("workflow_step_id" in props.invocationMessage && workflow.value) {
if (
"workflow_step_id" in props.invocationMessage &&
props.invocationMessage.workflow_step_id !== undefined &&
props.invocationMessage.workflow_step_id !== null &&
workflow.value
) {
return workflow.value.steps[props.invocationMessage.workflow_step_id];
}
return undefined;
Expand Down Expand Up @@ -146,10 +151,14 @@ const infoString = computed(() => {
invocationMessage.workflow_step_id + 1
} is a conditional step and the result of the when expression is not a boolean type.`;
} else if (reason === "unexpected_failure") {
let atStep = "";
if (invocationMessage.workflow_step_id !== null && invocationMessage.workflow_step_id !== undefined) {
atStep = ` at step ${invocationMessage.workflow_step_id + 1}`;
}
if (invocationMessage.details) {
return `${failFragment} an unexpected failure occurred: '${invocationMessage.details}'`;
return `${failFragment} an unexpected failure occurred${atStep}: '${invocationMessage.details}'`;
}
return `${failFragment} an unexpected failure occurred.`;
return `${failFragment} an unexpected failure occurred${atStep}.`;
} else if (reason === "workflow_output_not_found") {
return `Defined workflow output '${invocationMessage.output_name}' was not found in step ${
invocationMessage.workflow_step_id + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export interface GenericInvocationFailureCollectionFailedEncodedDatabaseIdField
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
hdca_id?: string;
hdca_id: string;
/**
* Workflow step id of step that caused failure.
*/
Expand All @@ -92,7 +92,7 @@ export interface GenericInvocationFailureCollectionFailedInt {
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
hdca_id?: number;
hdca_id: number;
/**
* Workflow step id of step that caused failure.
*/
Expand Down Expand Up @@ -159,7 +159,7 @@ export interface GenericInvocationFailureJobFailedEncodedDatabaseIdField {
/**
* Job ID that relates to failure.
*/
job_id?: string;
job_id: string;
/**
* Workflow step id of step that caused failure.
*/
Expand All @@ -174,7 +174,7 @@ export interface GenericInvocationFailureJobFailedInt {
/**
* Job ID that relates to failure.
*/
job_id?: number;
job_id: number;
/**
* Workflow step id of step that caused failure.
*/
Expand Down Expand Up @@ -232,11 +232,19 @@ export interface GenericInvocationUnexpectedFailureEncodedDatabaseIdField {
* May contains details to help troubleshoot this problem.
*/
details?: string;
/**
* Workflow step id of step that failed.
*/
workflow_step_id?: number;
}
export interface GenericInvocationUnexpectedFailureInt {
reason: "unexpected_failure";
/**
* May contains details to help troubleshoot this problem.
*/
details?: string;
/**
* Workflow step id of step that failed.
*/
workflow_step_id?: number;
}
6 changes: 5 additions & 1 deletion lib/galaxy/schema/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def get(self, key: Any, default: Any = None) -> Any:
# Fetch the order_index when serializing for the API,
# which makes much more sense when pointing to steps.
if key == "workflow_step_id":
return self._obj.workflow_step.order_index
if self._obj.workflow_step:
return self._obj.workflow_step.order_index
else:
return default
elif key == "dependent_workflow_step_id":
if self._obj.dependent_workflow_step_id:
return self._obj.dependent_workflow_step.order_index
Expand Down Expand Up @@ -132,6 +135,7 @@ class GenericInvocationFailureWhenNotBoolean(InvocationFailureMessageBase[Databa
class GenericInvocationUnexpectedFailure(InvocationMessageBase, Generic[DatabaseIdT]):
reason: Literal[FailureReason.unexpected_failure]
    details: Optional[str] = Field(None, description="May contain details to help troubleshoot this problem.")
workflow_step_id: Optional[int] = Field(None, description="Workflow step id of step that failed.")


class GenericInvocationWarning(InvocationMessageBase, Generic[DatabaseIdT]):
Expand Down
22 changes: 12 additions & 10 deletions lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2861,7 +2861,7 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw
copy_object = input_dataset
break
if copy_object is None:
raise Exception("Failed to find dataset output.")
raise exceptions.MessageException("Failed to find dataset output.")
out_data[key].copy_from(copy_object)

def parse_environment_variables(self, tool_source):
Expand Down Expand Up @@ -3335,7 +3335,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
elif how == "by_index":
extracted_element = collection[int(incoming["which"]["index"])]
else:
raise Exception("Invalid tool parameters.")
raise exceptions.MessageException("Invalid tool parameters.")
extracted = extracted_element.element_object
extracted_o = extracted.copy(
copy_tags=extracted.tags, new_name=extracted_element.element_identifier, flush=False
Expand Down Expand Up @@ -3375,7 +3375,9 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
if element_identifier not in identifiers_map:
identifiers_map[element_identifier] = []
elif dupl_actions == "fail":
raise Exception(f"Duplicate collection element identifiers found for [{element_identifier}]")
raise exceptions.MessageException(
f"Duplicate collection element identifiers found for [{element_identifier}]"
)
identifiers_map[element_identifier].append(input_num)

for copy, input_list in enumerate(input_lists):
Expand Down Expand Up @@ -3567,12 +3569,12 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
except KeyError:
hdca_history_name = f"{hdca.hid}: {hdca.name}"
message = f"List of element identifiers does not match element identifiers in collection '{hdca_history_name}'"
raise Exception(message)
raise exceptions.MessageException(message)
else:
message = f"Number of lines must match number of list elements ({len(elements)}), but file has {data_lines} lines"
raise Exception(message)
raise exceptions.MessageException(message)
else:
raise Exception(f"Unknown sort_type '{sorttype}'")
raise exceptions.MessageException(f"Unknown sort_type '{sorttype}'")

if presort_elements is not None:
sorted_elements = [x[1] for x in sorted(presort_elements, key=lambda x: x[0])]
Expand Down Expand Up @@ -3604,7 +3606,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
def add_copied_value_to_new_elements(new_label, dce_object):
new_label = new_label.strip()
if new_label in new_elements:
raise Exception(
raise exceptions.MessageException(
f"New identifier [{new_label}] appears twice in resulting collection, these values must be unique."
)
if getattr(dce_object, "history_content_type", None) == "dataset":
Expand All @@ -3617,7 +3619,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
with open(new_labels_path) as fh:
new_labels = fh.readlines(1024 * 1000000)
if strict and len(hdca.collection.elements) != len(new_labels):
raise Exception("Relabel mapping file contains incorrect number of identifiers")
raise exceptions.MessageException("Relabel mapping file contains incorrect number of identifiers")
if how_type == "tabular":
# We have a tabular file, where the first column is an existing element identifier,
# and the second column is the new element identifier.
Expand All @@ -3629,7 +3631,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
default = None if strict else element_identifier
new_label = new_labels_dict.get(element_identifier, default)
if not new_label:
raise Exception(f"Failed to find new label for identifier [{element_identifier}]")
raise exceptions.MessageException(f"Failed to find new label for identifier [{element_identifier}]")
add_copied_value_to_new_elements(new_label, dce_object)
else:
# If new_labels_dataset_assoc is not a two-column tabular dataset we label with the current line of the dataset
Expand All @@ -3638,7 +3640,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
add_copied_value_to_new_elements(new_labels[i], dce_object)
for key in new_elements.keys():
if not re.match(r"^[\w\- \.,]+$", key):
raise Exception(f"Invalid new collection identifier [{key}]")
raise exceptions.MessageException(f"Invalid new collection identifier [{key}]")
self._add_datasets_to_history(history, new_elements.values())
output_collections.create_collection(
next(iter(self.outputs.values())), "output", elements=new_elements, propagate_hda_tags=False
Expand Down
10 changes: 9 additions & 1 deletion lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,20 @@ def invoke(self) -> Dict[int, Any]:
except modules.DelayedWorkflowEvaluation as de:
step_delayed = delayed_steps = True
self.progress.mark_step_outputs_delayed(step, why=de.why)
except Exception:
except Exception as e:
log.exception(
"Failed to schedule %s, problem occurred on %s.",
self.workflow_invocation.workflow.log_str(),
step.log_str(),
)
if isinstance(e, MessageException):
# This is the highest level at which we can inject the step id
# to provide some more context to the exception.
raise modules.FailWorkflowEvaluation(
why=InvocationUnexpectedFailure(
reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id
)
)
raise

if not step_delayed:
Expand Down

0 comments on commit c351301

Please sign in to comment.