From 31787fd06a33e150c4cd7786d3e9733c44ca5f48 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 17 Sep 2023 09:19:03 +0200 Subject: [PATCH 1/2] Track subworkflow invocations after main invocation is scheduled Otherwise we'd attempt to download results too early. That's how I discovered https://github.com/galaxyproject/galaxy/pull/16705. --- planemo/galaxy/activity.py | 84 +++++++++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 24 deletions(-) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 1ee3b77cc..3ab27eaf7 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -208,33 +208,25 @@ def _execute( # noqa C901 ctx.vlog("Waiting for invocation [%s]" % invocation_id) polling_backoff = kwds.get("polling_backoff", 0) - final_invocation_state = "new" - error_message = "" - try: - final_invocation_state = _wait_for_invocation( - ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff - ) - assert final_invocation_state == "scheduled" - except Exception: - ctx.vlog("Problem waiting on invocation...") - summarize_history(ctx, user_gi, history_id) - error_message = "Final invocation state is [%s]" % final_invocation_state - ctx.vlog("Final invocation state is [%s]" % final_invocation_state) + no_wait = kwds.get("no_wait", False) - if not kwds.get("no_wait"): - final_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) - if final_state not in ("ok", "skipped"): - msg = "Failed to run workflow final history state is [%s]." % final_state - error_message = msg if not error_message else f"{error_message}. {msg}" - ctx.vlog(msg) - summarize_history(ctx, user_gi, history_id) - else: - ctx.vlog("Final history state is 'ok'") + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=invocation_id, + history_id=history_id, + user_gi=user_gi, + no_wait=no_wait, + polling_backoff=polling_backoff, + ) + if final_invocation_state not in ("ok", "skipped"): + msg = f"Failed to run workflow, at least one job is in [{final_invocation_state}] state." + ctx.vlog(msg) + summarize_history(ctx, user_gi, history_id) response_kwds = { "workflow_id": workflow_id, "invocation_id": invocation_id, - "history_state": final_state if not kwds.get("no_wait") else None, + "history_state": job_state if not no_wait else None, "invocation_state": final_invocation_state, "error_message": error_message, } @@ -722,9 +714,53 @@ def get_dict_from_workflow(gi, workflow_id): return gi.workflows.export_workflow_dict(workflow_id) -def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0): +def wait_for_invocation_and_jobs( + ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int +): + ctx.vlog("Waiting for invocation [%s]" % invocation_id) + final_invocation_state = "new" + + # TODO: hook in invocation["messages"] + error_message = "" + job_state = "ok" + try: + final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff) + assert final_invocation_state == "scheduled" + except Exception as e: + ctx.vlog(f"Problem waiting on invocation: {str(e)}") + summarize_history(ctx, user_gi, history_id) + error_message = "Final invocation state is [%s]" % final_invocation_state + + ctx.vlog("Final invocation state is [%s]" % final_invocation_state) + + if not no_wait: + job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) + if job_state not in ("ok", "skipped"): + msg = "Failed to run workflow, at least one job is in [%s] state." % job_state + error_message = msg if not error_message else f"{error_message}. {msg}" + else: + # wait for possible subworkflow invocations + invocation = user_gi.invocations.show_invocation(invocation_id) + for step in invocation["steps"]: + if step.get("subworkflow_invocation_id") is not None: + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=step["subworkflow_invocation_id"], + history_id=history_id, + user_gi=user_gi, + no_wait=no_wait, + polling_backoff=polling_backoff, + ) + if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): + return final_invocation_state, job_state, error_message + + ctx.vlog("All invocation and subworkflow invocations states are 'ok'") + return final_invocation_state, job_state, error_message + + +def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0): def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id)) + return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id)) return _wait_on_state(state_func, polling_backoff) From dec0e6a264d426a9c2cb93e803b56b1e04a41529 Mon Sep 17 00:00:00 2001 From: Marius van den Beek Date: Mon, 18 Sep 2023 14:40:30 +0200 Subject: [PATCH 2/2] Improve log messages Co-authored-by: Nicola Soranzo --- planemo/galaxy/activity.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 3ab27eaf7..867b23120 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -219,7 +219,7 @@ def _execute( # noqa C901 polling_backoff=polling_backoff, ) if final_invocation_state not in ("ok", "skipped"): - msg = f"Failed to run workflow, at least one job is in [{final_invocation_state}] state." + msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." ctx.vlog(msg) summarize_history(ctx, user_gi, history_id) @@ -729,14 +729,14 @@ def wait_for_invocation_and_jobs( except Exception as e: ctx.vlog(f"Problem waiting on invocation: {str(e)}") summarize_history(ctx, user_gi, history_id) - error_message = "Final invocation state is [%s]" % final_invocation_state + error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]" - ctx.vlog("Final invocation state is [%s]" % final_invocation_state) + ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]") if not no_wait: job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) if job_state not in ("ok", "skipped"): - msg = "Failed to run workflow, at least one job is in [%s] state." % job_state + msg = f"Failed to run workflow, at least one job is in [{job_state}] state." error_message = msg if not error_message else f"{error_message}. {msg}" else: # wait for possible subworkflow invocations @@ -754,7 +754,7 @@ def wait_for_invocation_and_jobs( if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): return final_invocation_state, job_state, error_message - ctx.vlog("All invocation and subworkflow invocations states are 'ok'") + ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") return final_invocation_state, job_state, error_message