Skip to content

Commit

Permalink
deepcopy task data in subworkflows and joins
Browse files Browse the repository at this point in the history
  • Loading branch information
essweine committed Apr 16, 2024
1 parent 18a6350 commit e0e0022
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _on_cancel(self, my_task):

def copy_data(self, my_task, subworkflow):
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)
start.set_data(**deepcopy(my_task.data))

def update_data(self, my_task, subworkflow):
my_task.data = deepcopy(subworkflow.last_task.data)
Expand Down
16 changes: 10 additions & 6 deletions SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
from copy import deepcopy

from SpiffWorkflow.util.task import TaskState, TaskIterator
from SpiffWorkflow.specs.Join import Join


class UnstructuredJoin(Join):
"""
A helper subclass of Join that makes it work in a slightly friendlier way
Expand All @@ -29,7 +29,8 @@ class UnstructuredJoin(Join):
def _update_hook(self, my_task):

may_fire = self._check_threshold_unstructured(my_task)
other_tasks = [t for t in my_task.workflow.tasks.values() if t.task_spec == self and t != my_task and t.state is TaskState.WAITING]
other_tasks = [t for t in my_task.workflow.tasks.values()
if t.task_spec == self and t != my_task and t.state is TaskState.WAITING]
for task in other_tasks:
# By cancelling other waiting tasks immediately, we can prevent them from being updated repeeatedly and pointlessly
task.cancel()
Expand All @@ -43,16 +44,19 @@ def _update_hook(self, my_task):

def _run_hook(self, my_task):
other_tasks = filter(
lambda t: t.task_spec == self and t != my_task and t.has_state(TaskState.FINISHED_MASK) and not my_task.is_descendant_of(t),
lambda t: t.task_spec == self and t.has_state(TaskState.FINISHED_MASK) and not my_task.is_descendant_of(t),
my_task.workflow.tasks.values()
)
pass
for task in sorted(other_tasks, key=lambda t: t.last_state_change):
# By inheriting directly from parent tasks, we can avoid copying previouly merged data
my_task.data.update(task.parent.data)

my_task.set_data(**deepcopy(task.parent.data))
# This condition only applies when a workflow is reset inside a parallel branch.
# If reset to a branch that was originally cancelled, all the descendants of the previously completed branch will still
# appear in the tree, potentially corrupting the structure and data.
if task.has_state(TaskState.COMPLETED):
task._drop_children(force=True)
return True

# My task is not finished, so won't be included above.
my_task._inherit_data()
return True
2 changes: 1 addition & 1 deletion SpiffWorkflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _assign_new_thread_id(self, recursive=True):

def _inherit_data(self):
"""Copies the data from the parent."""
self.set_data(**self.parent.data)
self.set_data(**deepcopy(self.parent.data))

def _set_internal_data(self, **kwargs):
"""Defines the given attribute/value pairs in this task's internal data."""
Expand Down

0 comments on commit e0e0022

Please sign in to comment.