merge release_23.1 into dev #16933

Merged: 19 commits, Oct 27, 2023
Commits
4246957
Provide error message instead of internal server error
mvdbeek Oct 24, 2023
81e9d52
Fix unbound runner variable when there is an error in the job config
mvdbeek Oct 24, 2023
7d9c230
Merge pull request #16906 from mvdbeek/fix_unbound_runner
mvdbeek Oct 24, 2023
575e05a
Merge pull request #16905 from mvdbeek/no_500_history
dannon Oct 24, 2023
856930e
Fix missing grep input in sort1 tool
bernt-matthias Dec 22, 2022
2bf0601
Merge pull request #16910 from mvdbeek/sort1_fix
nsoranzo Oct 24, 2023
285db62
Improve invocation error reporting when step requires datasets in ok …
mvdbeek Oct 25, 2023
c351301
Include failing step for unexpected failures, turn more exceptions in…
mvdbeek Oct 25, 2023
ce38206
Extend tests for new invocation failure messages
mvdbeek Oct 25, 2023
d411861
Revert always reset external filename
davelopez Oct 25, 2023
f620b11
Test link data with extended metadata
mvdbeek Oct 25, 2023
cd93598
Merge pull request #16917 from mvdbeek/improve_invocation_error_repor…
mvdbeek Oct 25, 2023
f2d9cb6
Merge pull request #16919 from davelopez/23.1_fix_library_import_path…
bgruening Oct 25, 2023
1dd28f6
Merge pull request #16921 from mvdbeek/23.1_fix_library_import_path_l…
davelopez Oct 26, 2023
ca5132d
Skip change_datatype things if we're not actually changing the extension
mvdbeek Oct 26, 2023
d7beda0
Commit if we've got outstanding MetadataFile instances
mvdbeek Oct 26, 2023
3e1d866
Merge pull request #16931 from mvdbeek/skip_datatype_change
martenson Oct 26, 2023
c7bd694
Merge branch 'release_23.1' into merge231in
martenson Oct 26, 2023
d92bbb1
unify statement format
martenson Oct 26, 2023
@@ -65,7 +65,12 @@ const workflow = computed(() => {
});
const workflowStep = computed(() => {
if ("workflow_step_id" in props.invocationMessage && workflow.value) {
if (
"workflow_step_id" in props.invocationMessage &&
props.invocationMessage.workflow_step_id !== undefined &&
props.invocationMessage.workflow_step_id !== null &&
workflow.value
) {
return workflow.value.steps[props.invocationMessage.workflow_step_id];
}
return undefined;
@@ -149,10 +154,14 @@ const infoString = computed(() => {
invocationMessage.workflow_step_id + 1
} is a conditional step and the result of the when expression is not a boolean type.`;
} else if (reason === "unexpected_failure") {
let atStep = "";
if (invocationMessage.workflow_step_id !== null && invocationMessage.workflow_step_id !== undefined) {
atStep = ` at step ${invocationMessage.workflow_step_id + 1}`;
}
if (invocationMessage.details) {
return `${failFragment} an unexpected failure occurred: '${invocationMessage.details}'`;
return `${failFragment} an unexpected failure occurred${atStep}: '${invocationMessage.details}'`;
}
return `${failFragment} an unexpected failure occurred.`;
return `${failFragment} an unexpected failure occurred${atStep}.`;
} else if (reason === "workflow_output_not_found") {
return `Defined workflow output '${invocationMessage.output_name}' was not found in step ${
invocationMessage.workflow_step_id + 1
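With the added guard, the step number is only appended to the failure text when a step id is actually present. For example, an unexpected_failure message with workflow_step_id 2 and details "tool error" now renders as "<failFragment> an unexpected failure occurred at step 3: 'tool error'", while a failure without a step id keeps the previous wording.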
@@ -77,7 +77,7 @@ export interface GenericInvocationFailureCollectionFailedEncodedDatabaseIdField
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
hdca_id?: string;
hdca_id: string;
/**
* Workflow step id of step that caused failure.
*/
@@ -92,7 +92,7 @@ export interface GenericInvocationFailureCollectionFailedInt {
/**
* HistoryDatasetCollectionAssociation ID that relates to failure.
*/
hdca_id?: number;
hdca_id: number;
/**
* Workflow step id of step that caused failure.
*/
@@ -159,7 +159,7 @@ export interface GenericInvocationFailureJobFailedEncodedDatabaseIdField {
/**
* Job ID that relates to failure.
*/
job_id?: string;
job_id: string;
/**
* Workflow step id of step that caused failure.
*/
@@ -174,7 +174,7 @@ export interface GenericInvocationFailureJobFailedInt {
/**
* Job ID that relates to failure.
*/
job_id?: number;
job_id: number;
/**
* Workflow step id of step that caused failure.
*/
@@ -232,11 +232,19 @@ export interface GenericInvocationUnexpectedFailureEncodedDatabaseIdField {
* May contains details to help troubleshoot this problem.
*/
details?: string;
/**
* Workflow step id of step that failed.
*/
workflow_step_id?: number;
}
export interface GenericInvocationUnexpectedFailureInt {
reason: "unexpected_failure";
/**
* May contains details to help troubleshoot this problem.
*/
details?: string;
/**
* Workflow step id of step that failed.
*/
workflow_step_id?: number;
}
15 changes: 8 additions & 7 deletions lib/galaxy/datatypes/registry.py
@@ -600,13 +600,14 @@ def get_datatype_by_extension(self, ext):
return self.datatypes_by_extension.get(ext, None)

def change_datatype(self, data, ext):
data.extension = ext
# call init_meta and copy metadata from itself. The datatype
# being converted *to* will handle any metadata copying and
# initialization.
if data.has_data():
data.set_size()
data.init_meta(copy_from=data)
if data.extension != ext:
data.extension = ext
# call init_meta and copy metadata from itself. The datatype
# being converted *to* will handle any metadata copying and
# initialization.
if data.has_data():
data.set_size()
data.init_meta(copy_from=data)
return data

def load_datatype_converters(self, toolbox, use_cached=False):
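A standalone sketch of the behaviour this guard gives change_datatype, using a made-up FakeDataset stand-in rather than a Galaxy datatype class; the point is that a same-extension call becomes a no-op:

class FakeDataset:
    def __init__(self, extension):
        self.extension = extension
        self.meta_inits = 0

    def has_data(self):
        return True

    def set_size(self):
        pass

    def init_meta(self, copy_from=None):
        self.meta_inits += 1

def change_datatype(data, ext):
    if data.extension != ext:  # new guard: skip everything when nothing changes
        data.extension = ext
        if data.has_data():
            data.set_size()
            data.init_meta(copy_from=data)
    return data

d = FakeDataset("txt")
change_datatype(d, "txt")      # same extension: size and metadata left untouched
change_datatype(d, "tabular")  # real change: metadata re-initialized once
assert d.meta_inits == 1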
10 changes: 10 additions & 0 deletions lib/galaxy/exceptions/__init__.py
@@ -151,6 +151,16 @@ class ToolInputsNotReadyException(MessageException):
error_code = error_codes_by_name["TOOL_INPUTS_NOT_READY"]


class ToolInputsNotOKException(MessageException):
def __init__(self, err_msg=None, type="info", *, src: str, id: int, **extra_error_info):
super().__init__(err_msg, type, **extra_error_info)
self.src = src
self.id = id

status_code = 400
error_code = error_codes_by_name["TOOL_INPUTS_NOT_OK"]


class RealUserRequiredException(MessageException):
status_code = 400
error_code = error_codes_by_name["REAL_USER_REQUIRED"]
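A minimal usage sketch, assuming a Galaxy environment where this module is importable; the dataset id is hypothetical. The keyword-only src and id arguments let callers identify the offending input:

from galaxy.exceptions import ToolInputsNotOKException

try:
    # mirrors the raise added in lib/galaxy/tools/__init__.py below
    raise ToolInputsNotOKException(
        "Tool requires inputs to be in valid state, but dataset 42 is in state 'error'",
        src="hda",
        id=42,
    )
except ToolInputsNotOKException as e:
    # src/id are consumed later (see lib/galaxy/workflow/modules.py) to build a
    # structured dataset_failed invocation message instead of a bare error string
    assert (e.src, e.id) == ("hda", 42)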
5 changes: 5 additions & 0 deletions lib/galaxy/exceptions/error_codes.json
@@ -94,6 +94,11 @@
"code": 400016,
"message": "Only real users can make this request."
},
{
"name": "TOOL_INPUTS_NOT_OK",
"code": 400017,
"message": "Tool inputs not in required OK state."
},
{
"name": "USER_AUTHENTICATION_FAILED",
"code": 401001,
4 changes: 4 additions & 0 deletions lib/galaxy/jobs/handler.py
@@ -1198,12 +1198,16 @@ def get_job_runner(self, job_wrapper, get_task_runner=False):
except KeyError:
log.error(f"({job_wrapper.job_id}) Invalid job runner: {runner_name}")
job_wrapper.fail(DEFAULT_JOB_RUNNER_FAILURE_MESSAGE)
return None
if get_task_runner and job_wrapper.can_split() and runner.runner_name != "PulsarJobRunner":
return self.job_runners["tasks"]
return runner

def put(self, job_wrapper):
runner = self.get_job_runner(job_wrapper, get_task_runner=True)
if runner is None:
# Something went wrong, we've already failed the job wrapper
return
if isinstance(job_wrapper, TaskWrapper):
# DBTODO Refactor
log.debug(f"({job_wrapper.job_id}) Dispatching task {job_wrapper.task_id} to task runner")
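The early return matters because of a classic unbound-local pitfall; a tiny standalone illustration (not Galaxy code) of the bug being fixed:

def get_runner(runners, name):
    try:
        runner = runners[name]
    except KeyError:
        print(f"Invalid job runner: {name}")
        return None  # without this, the next line raises UnboundLocalError
    return runner

assert get_runner({"local": object()}, "pulsar") is None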
2 changes: 1 addition & 1 deletion lib/galaxy/metadata/set_metadata.py
@@ -448,7 +448,7 @@ def set_meta(new_dataset_instance, file_dict):
object_store_update_actions.append(
partial(push_if_necessary, object_store, dataset, external_filename)
)
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(reset_external_filename, dataset))
object_store_update_actions.append(partial(dataset.set_total_size))
object_store_update_actions.append(partial(export_store.add_dataset, dataset))
if dataset_instance_id not in unnamed_id_to_path:
13 changes: 12 additions & 1 deletion lib/galaxy/model/metadata.py
@@ -605,7 +605,18 @@ def wrap(self, value, session):
if isinstance(value, int):
return session.get(galaxy.model.MetadataFile, value)
else:
return session.execute(select(galaxy.model.MetadataFile).filter_by(uuid=value)).scalar_one()
wrapped_value = session.execute(
select(galaxy.model.MetadataFile).filter_by(uuid=value)
).scalar_one_or_none()
if wrapped_value:
return wrapped_value
else:
# If we've simultaneously copied the dataset and we've changed the datatype on the
# copy we may not have committed the MetadataFile yet, so we need to commit the session.
# TODO: It would be great if we can avoid the commit in the future.
with transaction(session):
session.commit()
return session.execute(select(galaxy.model.MetadataFile).filter_by(uuid=value)).scalar_one_or_none()

def make_copy(self, value, target_context: MetadataCollection, source_context):
session = target_context._object_session(target_context.parent)
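The shape of the fix is a lookup-then-commit-and-retry. A rough standalone sketch under made-up names (not Galaxy or SQLAlchemy APIs):

class FakeSession:
    def __init__(self):
        self.pending = {"uuid-123": "metadata-file"}
        self.committed = {}

    def commit(self):
        self.committed.update(self.pending)
        self.pending.clear()

    def get_committed(self, key):
        return self.committed.get(key)

def find_with_retry(session, key):
    value = session.get_committed(key)
    if value is not None:
        return value
    session.commit()  # flush outstanding work that may hold the row we need
    return session.get_committed(key)

assert find_with_retry(FakeSession(), "uuid-123") == "metadata-file"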
6 changes: 5 additions & 1 deletion lib/galaxy/schema/invocation.py
@@ -51,7 +51,10 @@ def get(self, key: Any, default: Any = None) -> Any:
# Fetch the order_index when serializing for the API,
# which makes much more sense when pointing to steps.
if key == "workflow_step_id":
return self._obj.workflow_step.order_index
if self._obj.workflow_step:
return self._obj.workflow_step.order_index
else:
return default
elif key == "dependent_workflow_step_id":
if self._obj.dependent_workflow_step_id:
return self._obj.dependent_workflow_step.order_index
@@ -132,6 +135,7 @@ class GenericInvocationFailureWhenNotBoolean(InvocationFailureMessageBase[Databa
class GenericInvocationUnexpectedFailure(InvocationMessageBase, Generic[DatabaseIdT]):
reason: Literal[FailureReason.unexpected_failure]
details: Optional[str] = Field(None, description="May contains details to help troubleshoot this problem.")
workflow_step_id: Optional[int] = Field(None, description="Workflow step id of step that failed.")


class GenericInvocationWarning(InvocationMessageBase, Generic[DatabaseIdT]):
33 changes: 20 additions & 13 deletions lib/galaxy/tools/__init__.py
@@ -39,7 +39,10 @@
exceptions,
model,
)
from galaxy.exceptions import ToolInputsNotReadyException
from galaxy.exceptions import (
ToolInputsNotOKException,
ToolInputsNotReadyException,
)
from galaxy.job_execution import output_collect
from galaxy.metadata import get_metadata_compute_strategy
from galaxy.model import (
@@ -2875,7 +2878,7 @@ def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kw
copy_object = input_dataset
break
if copy_object is None:
raise Exception("Failed to find dataset output.")
raise exceptions.MessageException("Failed to find dataset output.")
out_data[key].copy_from(copy_object)

def parse_environment_variables(self, tool_source):
@@ -3217,8 +3220,10 @@ def check_dataset_state(state):

if self.require_dataset_ok:
if state != model.Dataset.states.OK:
raise ValueError(
f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'"
raise ToolInputsNotOKException(
f"Tool requires inputs to be in valid state, but dataset {input_dataset} is in state '{input_dataset.state}'",
src="hda",
id=input_dataset.id,
)

for input_dataset in input_datasets.values():
@@ -3347,7 +3352,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
elif how == "by_index":
extracted_element = collection[int(incoming["which"]["index"])]
else:
raise Exception("Invalid tool parameters.")
raise exceptions.MessageException("Invalid tool parameters.")
extracted = extracted_element.element_object
extracted_o = extracted.copy(
copy_tags=extracted.tags, new_name=extracted_element.element_identifier, flush=False
@@ -3387,7 +3392,9 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
if element_identifier not in identifiers_map:
identifiers_map[element_identifier] = []
elif dupl_actions == "fail":
raise Exception(f"Duplicate collection element identifiers found for [{element_identifier}]")
raise exceptions.MessageException(
f"Duplicate collection element identifiers found for [{element_identifier}]"
)
identifiers_map[element_identifier].append(input_num)

for copy, input_list in enumerate(input_lists):
@@ -3579,12 +3586,12 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
except KeyError:
hdca_history_name = f"{hdca.hid}: {hdca.name}"
message = f"List of element identifiers does not match element identifiers in collection '{hdca_history_name}'"
raise Exception(message)
raise exceptions.MessageException(message)
else:
message = f"Number of lines must match number of list elements ({len(elements)}), but file has {data_lines} lines"
raise Exception(message)
raise exceptions.MessageException(message)
else:
raise Exception(f"Unknown sort_type '{sorttype}'")
raise exceptions.MessageException(f"Unknown sort_type '{sorttype}'")

if presort_elements is not None:
sorted_elements = [x[1] for x in sorted(presort_elements, key=lambda x: x[0])]
@@ -3616,7 +3623,7 @@ def produce_outputs(self, trans, out_data, output_collections, incoming, history
def add_copied_value_to_new_elements(new_label, dce_object):
new_label = new_label.strip()
if new_label in new_elements:
raise Exception(
raise exceptions.MessageException(
f"New identifier [{new_label}] appears twice in resulting collection, these values must be unique."
)
if getattr(dce_object, "history_content_type", None) == "dataset":
@@ -3629,7 +3636,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
with open(new_labels_path) as fh:
new_labels = fh.readlines(1024 * 1000000)
if strict and len(hdca.collection.elements) != len(new_labels):
raise Exception("Relabel mapping file contains incorrect number of identifiers")
raise exceptions.MessageException("Relabel mapping file contains incorrect number of identifiers")
if how_type == "tabular":
# We have a tabular file, where the first column is an existing element identifier,
# and the second column is the new element identifier.
@@ -3641,7 +3648,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
default = None if strict else element_identifier
new_label = new_labels_dict.get(element_identifier, default)
if not new_label:
raise Exception(f"Failed to find new label for identifier [{element_identifier}]")
raise exceptions.MessageException(f"Failed to find new label for identifier [{element_identifier}]")
add_copied_value_to_new_elements(new_label, dce_object)
else:
# If new_labels_dataset_assoc is not a two-column tabular dataset we label with the current line of the dataset
@@ -3650,7 +3657,7 @@ def add_copied_value_to_new_elements(new_label, dce_object):
add_copied_value_to_new_elements(new_labels[i], dce_object)
for key in new_elements.keys():
if not re.match(r"^[\w\- \.,]+$", key):
raise Exception(f"Invalid new collection identifier [{key}]")
raise exceptions.MessageException(f"Invalid new collection identifier [{key}]")
self._add_datasets_to_history(history, new_elements.values())
output_collections.create_collection(
next(iter(self.outputs.values())), "output", elements=new_elements, propagate_hda_tags=False
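The repeated Exception-to-exceptions.MessageException swaps matter because Galaxy maps MessageException subclasses to structured, user-visible API errors, whereas an uncaught bare Exception surfaces as an internal server error (the "Provide error message instead of internal server error" commit above). A minimal sketch, assuming a Galaxy environment and the usual base-class defaults:

from galaxy import exceptions

err = exceptions.MessageException("Invalid tool parameters.")
# MessageException carries an HTTP status and error code the API layer can return
# to the user (400 for the base class), instead of a generic 500
print(err.status_code, err.err_msg)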
20 changes: 12 additions & 8 deletions lib/galaxy/tools/execute.py
@@ -19,6 +19,7 @@
from boltons.iterutils import remap

from galaxy import model
from galaxy.exceptions import ToolInputsNotOKException
from galaxy.model.base import transaction
from galaxy.model.dataset_collections.matching import MatchingCollections
from galaxy.model.dataset_collections.structure import (
@@ -143,14 +144,17 @@ def execute_single_job(execution_slice, completed_job, skip=False):
if check_inputs_ready:
for params in execution_tracker.param_combinations:
# This will throw an exception if the tool is not ready.
check_inputs_ready(
tool,
trans,
params,
history,
execution_cache=execution_cache,
collection_info=collection_info,
)
try:
check_inputs_ready(
tool,
trans,
params,
history,
execution_cache=execution_cache,
collection_info=collection_info,
)
except ToolInputsNotOKException as e:
execution_tracker.record_error(e)

execution_tracker.ensure_implicit_collections_populated(history, mapping_params.param_template)
job_count = len(execution_tracker.param_combinations)
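A standalone sketch (made-up Tracker class, not Galaxy's execution tracker) of the pattern introduced here: a readiness failure is recorded instead of escaping as a traceback, so it can be reported as a structured invocation message later:

class Tracker:
    def __init__(self):
        self.execution_errors = []

    def record_error(self, error):
        self.execution_errors.append(error)

def check_inputs_ready(params):
    raise ValueError(f"input {params['input']} is in state 'error'")

tracker = Tracker()
for params in [{"input": 1}, {"input": 2}]:
    try:
        check_inputs_ready(params)
    except ValueError as e:
        tracker.record_error(e)  # keep going; errors surface via the tracker

assert len(tracker.execution_errors) == 2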
4 changes: 4 additions & 0 deletions lib/galaxy/webapps/galaxy/controllers/history.py
@@ -490,6 +490,10 @@ def view(self, trans, id=None, show_deleted=False, show_hidden=False, use_panels
history_is_current = history_to_view == trans.history
else:
history_to_view = trans.history
if not history_to_view:
raise exceptions.RequestParameterMissingException(
"No 'id' parameter provided for history, and user does not have a current history."
)
user_is_owner = True
history_is_current = True

10 changes: 10 additions & 0 deletions lib/galaxy/workflow/modules.py
@@ -2314,6 +2314,16 @@ def callback(input, prefixed_name, **kwargs):
if execution_tracker.execution_errors:
# TODO: formalize into InvocationFailure ?
message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}"
for error in execution_tracker.execution_errors:
# try first to raise a structured invocation error message
if isinstance(error, exceptions.ToolInputsNotOKException) and error.src == "hda":
raise FailWorkflowEvaluation(
why=InvocationFailureDatasetFailed(
reason=FailureReason.dataset_failed,
hda_id=error.id,
workflow_step_id=step.id,
)
)
raise exceptions.MessageException(message)

return complete
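Taken together with the new ToolInputsNotOKException, this turns a job-creation failure caused by a non-OK input dataset into a structured dataset_failed invocation message pointing at the offending HDA and workflow step, rather than a generic MessageException string.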
10 changes: 9 additions & 1 deletion lib/galaxy/workflow/run.py
@@ -240,12 +240,20 @@ def invoke(self) -> Dict[int, Any]:
except modules.DelayedWorkflowEvaluation as de:
step_delayed = delayed_steps = True
self.progress.mark_step_outputs_delayed(step, why=de.why)
except Exception:
except Exception as e:
log.exception(
"Failed to schedule %s, problem occurred on %s.",
self.workflow_invocation.workflow.log_str(),
step.log_str(),
)
if isinstance(e, MessageException):
# This is the highest level at which we can inject the step id
# to provide some more context to the exception.
raise modules.FailWorkflowEvaluation(
why=InvocationUnexpectedFailure(
reason=FailureReason.unexpected_failure, details=str(e), workflow_step_id=step.id
)
)
raise

if not step_delayed:
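This is the change that feeds the client-side update at the top of the diff: any MessageException raised while scheduling a step is re-raised as a FailWorkflowEvaluation carrying an unexpected_failure message with the step's id and the exception text, which the invocation message component then renders as "... an unexpected failure occurred at step N: '<details>'".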