Skip to content

Commit

Permalink
Fix Cancel Workflows (#960)
Browse files Browse the repository at this point in the history
* Fix Cancel Workflows - allows jobs already scheduled to run but cancels the rest of the workflow

* Archive cancelled workflows after scheduled/running tasks finish

---------

Co-authored-by: leahh <[email protected]>
  • Loading branch information
Leahh02 and leahh authored Dec 2, 2024
1 parent 414bcab commit a01b75a
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 3 deletions.
10 changes: 10 additions & 0 deletions beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ def workflow_completed(self):
:rtype: bool
"""

@abstractmethod
def cancelled_workflow_completed(self):
"""Determine if a cancelled workflow has completed.
A cancelled workflow has completed if each of its final tasks are not
'PENDING', 'RUNNING' 'COMPLETING'.
:rtype: bool
"""

@abstractmethod
def close(self):
"""Close the connection to the graph database."""
Expand Down
19 changes: 19 additions & 0 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,25 @@ def final_tasks_completed(tx, wf_id):
return bool(tx.run(not_completed_query, wf_id=wf_id).single() is None)


def cancelled_final_tasks_completed(tx, wf_id):
"""Return true if all a cancelled workflow's scheduled tasks have completed, else false.
All of the workflow's scheduled tasks are completed if each of the final task nodes
are not in states 'PENDING', 'RUNNING', or 'COMPLETING'.
:param wf_id: the workflow's id
:type wf_id: str
:rtype: bool
"""
active_states_query = ("MATCH (m:Metadata)-[:DESCRIBES]->(t:Task {workflow_id: $wf_id}) "
"WHERE NOT (t)<-[:DEPENDS_ON|:RESTARTED_FROM]-(:Task) "
"AND m.state IN ['PENDING', 'RUNNING', 'COMPLETING'] "
"RETURN t IS NOT NULL LIMIT 1")

# False if at least one task is in 'PENDING', 'RUNNING', or 'COMPLETING'
return bool(tx.run(active_states_query, wf_id=wf_id).single() is None)


def is_empty(tx):
"""Return true if the database is empty, else false.
Expand Down
11 changes: 11 additions & 0 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,17 @@ def workflow_completed(self, workflow_id):
"""
return self._read_transaction(tx.final_tasks_completed, wf_id=workflow_id)

def cancelled_workflow_completed(self, workflow_id):
"""Determine if a cancelled workflow has completed.
A cancelled workflow has completed if each of its final tasks are not
'PENDING', 'RUNNING' 'COMPLETING'.
:param workflow_id: the workflow id
:type workflow_id: str
:rtype: bool
"""
return self._read_transaction(tx.cancelled_final_tasks_completed, wf_id=workflow_id)

def close(self):
"""Close the connection to the Neo4j database."""
self._driver.close()
Expand Down
7 changes: 7 additions & 0 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,13 @@ def workflow_completed(self):
"""
return self._gdb_driver.workflow_completed(self._workflow_id)

def cancelled_workflow_completed(self):
"""Return true if all a cancelled workflow's scheduled tasks have completed, else false.
:rtype: bool
"""
return self._gdb_driver.cancelled_workflow_completed(self._workflow_id)

def export_graphml(self):
"""Export a BEE workflow as a graphml."""
self._gdb_driver.export_graphml(self._workflow_id)
6 changes: 4 additions & 2 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ def delete(self, wf_id):
db = connect_db(wfm_db, db_path)
if option == "cancel":
wfi = wf_utils.get_workflow_interface(wf_id)
wf_state = wfi.get_workflow_state()
# Remove all tasks currently in the database
wfi.set_workflow_state('Cancelled')
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info(f"Workflow {wf_id} cancelled")
# Archive cancelled workflow
archive_workflow(db, wf_id, final_state='Cancelled')
# Archive cancelled workflow if it was originally paused
if wf_state == 'PAUSED':
archive_workflow(db, wf_id, final_state='Cancelled')
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
log.info(f"Removing workflow {wf_id}.")
Expand Down
7 changes: 6 additions & 1 deletion beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,19 @@ def handle_state_change(self, state_update, task, wfi, db):
wfi.set_task_output(task, output.id, "temp")
tasks = wfi.finalize_task(task)
wf_state = wfi.get_workflow_state()
if tasks and wf_state != 'PAUSED':
if tasks and wf_state not in ('PAUSED', 'Cancelled'):
wf_utils.schedule_submit_tasks(state_update.wf_id, tasks)

if wfi.workflow_completed():
wf_id = wfi.workflow_id
log.info(f"Workflow {wf_id} Completed")
archive_workflow(db, state_update.wf_id)
log.info('Workflow Completed')
elif wf_state == 'Cancelled' and wfi.cancelled_workflow_completed():
wf_id = wfi.workflow_id
log.info(f"Scheduled tasks for cancelled workflow {wf_id} completed")
archive_workflow(db, wf_id, final_state=wf_state)
log.info('Workflow Archived')

# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
Expand Down

0 comments on commit a01b75a

Please sign in to comment.