Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Cancel Workflows #960

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions beeflow/common/gdb/gdb_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ def workflow_completed(self):
:rtype: bool
"""

@abstractmethod
def cancelled_workflow_completed(self):
"""Determine if a cancelled workflow has completed.

A cancelled workflow has completed if each of its final tasks are not
'PENDING', 'RUNNING' 'COMPLETING'.

:rtype: bool
"""

@abstractmethod
def close(self):
"""Close the connection to the graph database."""
Expand Down
19 changes: 19 additions & 0 deletions beeflow/common/gdb/neo4j_cypher.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,25 @@ def final_tasks_completed(tx, wf_id):
return bool(tx.run(not_completed_query, wf_id=wf_id).single() is None)


def cancelled_final_tasks_completed(tx, wf_id):
"""Return true if all a cancelled workflow's scheduled tasks have completed, else false.

All of the workflow's scheduled tasks are completed if each of the final task nodes
are not in states 'PENDING', 'RUNNING', or 'COMPLETING'.

:param wf_id: the workflow's id
:type wf_id: str
:rtype: bool
"""
active_states_query = ("MATCH (m:Metadata)-[:DESCRIBES]->(t:Task {workflow_id: $wf_id}) "
"WHERE NOT (t)<-[:DEPENDS_ON|:RESTARTED_FROM]-(:Task) "
"AND m.state IN ['PENDING', 'RUNNING', 'COMPLETING'] "
"RETURN t IS NOT NULL LIMIT 1")

# False if at least one task is in 'PENDING', 'RUNNING', or 'COMPLETING'
return bool(tx.run(active_states_query, wf_id=wf_id).single() is None)


def is_empty(tx):
"""Return true if the database is empty, else false.

Expand Down
11 changes: 11 additions & 0 deletions beeflow/common/gdb/neo4j_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,17 @@ def workflow_completed(self, workflow_id):
"""
return self._read_transaction(tx.final_tasks_completed, wf_id=workflow_id)

def cancelled_workflow_completed(self, workflow_id):
"""Determine if a cancelled workflow has completed.

A cancelled workflow has completed if each of its final tasks are not
'PENDING', 'RUNNING' 'COMPLETING'.
:param workflow_id: the workflow id
:type workflow_id: str
:rtype: bool
"""
return self._read_transaction(tx.cancelled_final_tasks_completed, wf_id=workflow_id)

def close(self):
"""Close the connection to the Neo4j database."""
self._driver.close()
Expand Down
7 changes: 7 additions & 0 deletions beeflow/common/wf_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,13 @@ def workflow_completed(self):
"""
return self._gdb_driver.workflow_completed(self._workflow_id)

def cancelled_workflow_completed(self):
"""Return true if all a cancelled workflow's scheduled tasks have completed, else false.

:rtype: bool
"""
return self._gdb_driver.cancelled_workflow_completed(self._workflow_id)

def export_graphml(self):
"""Export a BEE workflow as a graphml."""
self._gdb_driver.export_graphml(self._workflow_id)
6 changes: 4 additions & 2 deletions beeflow/wf_manager/resources/wf_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ def delete(self, wf_id):
db = connect_db(wfm_db, db_path)
if option == "cancel":
wfi = wf_utils.get_workflow_interface(wf_id)
wf_state = wfi.get_workflow_state()
# Remove all tasks currently in the database
wfi.set_workflow_state('Cancelled')
wf_utils.update_wf_status(wf_id, 'Cancelled')
db.workflows.update_workflow_state(wf_id, 'Cancelled')
log.info(f"Workflow {wf_id} cancelled")
# Archive cancelled workflow
archive_workflow(db, wf_id, final_state='Cancelled')
# Archive cancelled workflow if it was originally paused
if wf_state == 'PAUSED':
archive_workflow(db, wf_id, final_state='Cancelled')
resp = make_response(jsonify(status='Cancelled'), 202)
elif option == "remove":
log.info(f"Removing workflow {wf_id}.")
Expand Down
7 changes: 6 additions & 1 deletion beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,19 @@ def handle_state_change(self, state_update, task, wfi, db):
wfi.set_task_output(task, output.id, "temp")
tasks = wfi.finalize_task(task)
wf_state = wfi.get_workflow_state()
if tasks and wf_state != 'PAUSED':
if tasks and wf_state not in ('PAUSED', 'Cancelled'):
wf_utils.schedule_submit_tasks(state_update.wf_id, tasks)

if wfi.workflow_completed():
wf_id = wfi.workflow_id
log.info(f"Workflow {wf_id} Completed")
archive_workflow(db, state_update.wf_id)
log.info('Workflow Completed')
elif wf_state == 'Cancelled' and wfi.cancelled_workflow_completed():
wf_id = wfi.workflow_id
log.info(f"Scheduled tasks for cancelled workflow {wf_id} completed")
archive_workflow(db, wf_id, final_state=wf_state)
log.info('Workflow Archived')

# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
Expand Down
Loading