diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 1ee3b77cc..867b23120 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -208,33 +208,25 @@ def _execute( # noqa C901 ctx.vlog("Waiting for invocation [%s]" % invocation_id) polling_backoff = kwds.get("polling_backoff", 0) - final_invocation_state = "new" - error_message = "" - try: - final_invocation_state = _wait_for_invocation( - ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff - ) - assert final_invocation_state == "scheduled" - except Exception: - ctx.vlog("Problem waiting on invocation...") - summarize_history(ctx, user_gi, history_id) - error_message = "Final invocation state is [%s]" % final_invocation_state - ctx.vlog("Final invocation state is [%s]" % final_invocation_state) + no_wait = kwds.get("no_wait", False) - if not kwds.get("no_wait"): - final_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) - if final_state not in ("ok", "skipped"): - msg = "Failed to run workflow final history state is [%s]." % final_state - error_message = msg if not error_message else f"{error_message}. {msg}" - ctx.vlog(msg) - summarize_history(ctx, user_gi, history_id) - else: - ctx.vlog("Final history state is 'ok'") + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=invocation_id, + history_id=history_id, + user_gi=user_gi, + no_wait=no_wait, + polling_backoff=polling_backoff, + ) + if final_invocation_state not in ("ok", "skipped"): + msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." + ctx.vlog(msg) + summarize_history(ctx, user_gi, history_id) response_kwds = { "workflow_id": workflow_id, "invocation_id": invocation_id, - "history_state": final_state if not kwds.get("no_wait") else None, + "history_state": job_state if not no_wait else None, "invocation_state": final_invocation_state, "error_message": error_message, } @@ -722,9 +714,53 @@ def get_dict_from_workflow(gi, workflow_id): return gi.workflows.export_workflow_dict(workflow_id) -def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0): +def wait_for_invocation_and_jobs( + ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int +): + ctx.vlog("Waiting for invocation [%s]" % invocation_id) + final_invocation_state = "new" + + # TODO: hook in invocation["messages"] + error_message = "" + job_state = "ok" + try: + final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff) + assert final_invocation_state == "scheduled" + except Exception as e: + ctx.vlog(f"Problem waiting on invocation: {str(e)}") + summarize_history(ctx, user_gi, history_id) + error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]" + + ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]") + + if not no_wait: + job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) + if job_state not in ("ok", "skipped"): + msg = f"Failed to run workflow, at least one job is in [{job_state}] state." + error_message = msg if not error_message else f"{error_message}. {msg}" + else: + # wait for possible subworkflow invocations + invocation = user_gi.invocations.show_invocation(invocation_id) + for step in invocation["steps"]: + if step.get("subworkflow_invocation_id") is not None: + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=step["subworkflow_invocation_id"], + history_id=history_id, + user_gi=user_gi, + no_wait=no_wait, + polling_backoff=polling_backoff, + ) + if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): + return final_invocation_state, job_state, error_message + + ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") + return final_invocation_state, job_state, error_message + + +def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0): def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id)) + return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id)) return _wait_on_state(state_func, polling_backoff)