Skip to content

Commit

Permalink
Revise workflow failure around materializing URLs.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Sep 30, 2024
1 parent 3bf8a80 commit f80207d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 18 deletions.
5 changes: 4 additions & 1 deletion lib/galaxy/managers/hdas.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def create(
session.commit()
return hda

def materialize(self, request: MaterializeDatasetInstanceTaskRequest, in_place: bool = False) -> None:
def materialize(self, request: MaterializeDatasetInstanceTaskRequest, in_place: bool = False) -> bool:
request_user: RequestUser = request.user
materializer = materializer_factory(
True, # attached...
Expand All @@ -195,9 +195,12 @@ def materialize(self, request: MaterializeDatasetInstanceTaskRequest, in_place:
)
if not in_place:
history.add_dataset(new_hda, set_hid=True)
else:
new_hda.set_total_size()
session = self.session()
with transaction(session):
session.commit()
return new_hda.is_ok

def copy(
self, item: Any, history=None, hide_copy: bool = False, flush: bool = True, **kwargs: Any
Expand Down
17 changes: 14 additions & 3 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8573,6 +8573,12 @@ class InputWithRequest:
request: Dict[str, Any]


@dataclass
class InputToMaterialize:
    """Pair a workflow input dataset with the request association it came from.

    Instances are built by ``WorkflowInvocation.inputs_requiring_materialization``
    so that a caller materializing ``hda`` can still reach the originating
    association (for example its ``workflow_step``) when reporting a failure.
    """

    # The dataset instance referenced by the association (``input_dataset.dataset``).
    hda: "HistoryDatasetAssociation"
    # The invocation's request-to-input association that ``hda`` was taken from.
    input_dataset: "WorkflowRequestToInputDatasetAssociation"


class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializable):
__tablename__ = "workflow_invocation"

Expand Down Expand Up @@ -8896,14 +8902,19 @@ def input_associations(self):
inputs.append(input_dataset_collection_assoc)
return inputs

def inputs_requiring_materialization(self):
hdas_to_materialize = []
def inputs_requiring_materialization(self) -> List[InputToMaterialize]:
hdas_to_materialize: List[InputToMaterialize] = []
for input_dataset_assoc in self.input_datasets:
request = input_dataset_assoc.request
if request:
deferred = request.get("deferred", False)
if not deferred:
hdas_to_materialize.append(input_dataset_assoc.dataset)
hdas_to_materialize.append(
InputToMaterialize(
input_dataset_assoc.dataset,
input_dataset_assoc,
)
)
return hdas_to_materialize

def _serialize(self, id_encoder, serialization_options):
Expand Down
29 changes: 25 additions & 4 deletions lib/galaxy/workflow/scheduling_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from galaxy.exceptions import HandlerAssignmentError
from galaxy.jobs.handler import InvocationGrabber
from galaxy.model.base import transaction
from galaxy.schema.invocation import InvocationState
from galaxy.schema.invocation import (
FailureReason,
InvocationFailureDatasetFailed,
InvocationState,
InvocationUnexpectedFailure,
)
from galaxy.schema.tasks import (
MaterializeDatasetInstanceTaskRequest,
RequestUser,
Expand Down Expand Up @@ -335,8 +340,9 @@ def __schedule(self, workflow_scheduler_id, workflow_scheduler):

def __attempt_materialize(self, workflow_invocation, session) -> bool:
try:
hdas_to_materialize = workflow_invocation.inputs_requiring_materialization()
for hda in hdas_to_materialize:
inputs_to_materialize = workflow_invocation.inputs_requiring_materialization()
for input_to_materialize in inputs_to_materialize:
hda = input_to_materialize.hda
user = RequestUser(user_id=workflow_invocation.history.user_id)
task_request = MaterializeDatasetInstanceTaskRequest(
user=user,
Expand All @@ -345,7 +351,20 @@ def __attempt_materialize(self, workflow_invocation, session) -> bool:
content=hda.id,
validate_hashes=True,
)
self.app.hda_manager.materialize(task_request, in_place=True)
materialized_okay = self.app.hda_manager.materialize(task_request, in_place=True)
if not materialized_okay:
workflow_invocation.fail()
workflow_invocation.add_message(
InvocationFailureDatasetFailed(
workflow_step_id=input_to_materialize.input_dataset.workflow_step.id,
reason=FailureReason.dataset_failed,
hda_id=hda.id,
)
)
session.add(workflow_invocation)
session.commit()
return False

# place back into ready and let it proceed normally on next iteration?
workflow_invocation.set_state(model.WorkflowInvocation.states.READY)
session.add(workflow_invocation)
Expand All @@ -354,6 +373,8 @@ def __attempt_materialize(self, workflow_invocation, session) -> bool:
except Exception as e:
log.exception(f"Failed to materialize dataset for workflow {workflow_invocation.id} - {e}")
workflow_invocation.fail()
failure = InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure, details=str(e))
workflow_invocation.add_message(failure)
session.add(workflow_invocation)
session.commit()
return False
Expand Down
20 changes: 10 additions & 10 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,11 +1624,11 @@ def test_run_workflow_with_invalid_url_hashes(self):
invocation_id = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, request=workflow_request, assert_ok=False
).json()["id"]
invocation = self._invocation_details(workflow_id, invocation_id)
assert invocation["state"] == "scheduled", invocation
invocation_jobs = self.workflow_populator.get_invocation_jobs(invocation_id)
for job in invocation_jobs:
assert job["state"] == "paused"
invocation_details = self._invocation_details(workflow_id, invocation_id)
assert invocation_details["state"] == "failed"
assert len(invocation_details["messages"]) == 1
message = invocation_details["messages"][0]
assert message["reason"] == "dataset_failed"

@skip_without_tool("cat1")
def test_run_workflow_with_invalid_url(self):
Expand Down Expand Up @@ -1658,11 +1658,11 @@ def test_run_workflow_with_invalid_url(self):
invocation_id = self.workflow_populator.invoke_workflow_and_wait(
workflow_id, request=workflow_request, assert_ok=False
).json()["id"]
invocation = self._invocation_details(workflow_id, invocation_id)
assert invocation["state"] == "scheduled", invocation
invocation_jobs = self.workflow_populator.get_invocation_jobs(invocation_id)
for job in invocation_jobs:
assert job["state"] == "paused"
invocation_details = self._invocation_details(workflow_id, invocation_id)
assert invocation_details["state"] == "failed"
assert len(invocation_details["messages"]) == 1
message = invocation_details["messages"][0]
assert message["reason"] == "dataset_failed"

def __run_cat_workflow(self, inputs_by, history_id: Optional[str] = None):
workflow = self.workflow_populator.load_workflow(name="test_for_run")
Expand Down

0 comments on commit f80207d

Please sign in to comment.