Skip to content

Commit

Permalink
Merge pull request #1389 from mvdbeek/track_subworkflow_invocation_jobs
Browse files Browse the repository at this point in the history
Track subworkflow invocations after main invocation is scheduled
  • Loading branch information
mvdbeek authored Sep 18, 2023
2 parents 2f79069 + dec0e6a commit 4b92085
Showing 1 changed file with 60 additions and 24 deletions.
84 changes: 60 additions & 24 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,25 @@ def _execute( # noqa C901

ctx.vlog("Waiting for invocation [%s]" % invocation_id)
polling_backoff = kwds.get("polling_backoff", 0)
final_invocation_state = "new"
error_message = ""
try:
final_invocation_state = _wait_for_invocation(
ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff
)
assert final_invocation_state == "scheduled"
except Exception:
ctx.vlog("Problem waiting on invocation...")
summarize_history(ctx, user_gi, history_id)
error_message = "Final invocation state is [%s]" % final_invocation_state
ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
no_wait = kwds.get("no_wait", False)

if not kwds.get("no_wait"):
final_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if final_state not in ("ok", "skipped"):
msg = "Failed to run workflow final history state is [%s]." % final_state
error_message = msg if not error_message else f"{error_message}. {msg}"
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)
else:
ctx.vlog("Final history state is 'ok'")
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=invocation_id,
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state not in ("ok", "skipped"):
msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
ctx.vlog(msg)
summarize_history(ctx, user_gi, history_id)

response_kwds = {
"workflow_id": workflow_id,
"invocation_id": invocation_id,
"history_state": final_state if not kwds.get("no_wait") else None,
"history_state": job_state if not no_wait else None,
"invocation_state": final_invocation_state,
"error_message": error_message,
}
Expand Down Expand Up @@ -722,9 +714,53 @@ def get_dict_from_workflow(gi, workflow_id):
return gi.workflows.export_workflow_dict(workflow_id)


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):
def wait_for_invocation_and_jobs(
ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
):
ctx.vlog("Waiting for invocation [%s]" % invocation_id)
final_invocation_state = "new"

# TODO: hook in invocation["messages"]
error_message = ""
job_state = "ok"
try:
final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
assert final_invocation_state == "scheduled"
except Exception as e:
ctx.vlog(f"Problem waiting on invocation: {str(e)}")
summarize_history(ctx, user_gi, history_id)
error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"

ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")

if not no_wait:
job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if job_state not in ("ok", "skipped"):
msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
error_message = msg if not error_message else f"{error_message}. {msg}"
else:
# wait for possible subworkflow invocations
invocation = user_gi.invocations.show_invocation(invocation_id)
for step in invocation["steps"]:
if step.get("subworkflow_invocation_id") is not None:
final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
ctx,
invocation_id=step["subworkflow_invocation_id"],
history_id=history_id,
user_gi=user_gi,
no_wait=no_wait,
polling_backoff=polling_backoff,
)
if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
return final_invocation_state, job_state, error_message

ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
return final_invocation_state, job_state, error_message


def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))
return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

return _wait_on_state(state_func, polling_backoff)

Expand Down

0 comments on commit 4b92085

Please sign in to comment.