Merge branch 'main' into feature/receive-message-correlations
essweine committed Apr 25, 2024
2 parents 328a1b5 + 76eb9e6 commit ca2a50c
Showing 38 changed files with 715 additions and 417 deletions.
17 changes: 6 additions & 11 deletions SpiffWorkflow/bpmn/parser/node_parser.py
@@ -56,12 +56,12 @@ def bpmn_attributes(self):
def get_description(self):
return self.process_parser.parser.spec_descriptions.get(self.node.tag)

def xpath(self, xpath, extra_ns=None):
return self._xpath(self.node, xpath, extra_ns)
def xpath(self, xpath):
return self._xpath(self.node, xpath)

def doc_xpath(self, xpath, extra_ns=None):
def doc_xpath(self, xpath):
root = self.node.getroottree().getroot()
return self._xpath(root, xpath, extra_ns)
return self._xpath(root, xpath)

def attribute(self, attribute, namespace=None, node=None):
if node is None:
@@ -156,13 +156,8 @@ def _get_lane(self):
if noderef is not None:
return noderef.getparent().get('name')

def _xpath(self, node, xpath, extra_ns=None):
if extra_ns is not None:
nsmap = self.nsmap.copy()
nsmap.update(extra_ns)
else:
nsmap = self.nsmap
return node.xpath(xpath, namespaces=nsmap)
def _xpath(self, node, xpath):
return node.xpath(xpath, namespaces=self.nsmap)

def raise_validation_exception(self, message):
raise ValidationException(message, self.node, self.filename)
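
Illustrative note: the change above drops the per-call extra_ns override, so XPath prefixes now resolve only through the parser's own nsmap. A minimal sketch of that pattern with plain lxml; the nsmap contents and sample document are assumed for illustration.

from lxml import etree

# Assumed prefix -> URI mapping; SpiffWorkflow builds self.nsmap from the parsed document.
nsmap = {'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL'}

root = etree.fromstring(
    b'<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL">'
    b'<bpmn:process id="p1"/>'
    b'</bpmn:definitions>'
)

# Equivalent of the simplified _xpath: one shared namespace map, no per-call overrides.
processes = root.xpath('.//bpmn:process', namespaces=nsmap)
print([p.get('id') for p in processes])  # ['p1']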
30 changes: 14 additions & 16 deletions SpiffWorkflow/bpmn/specs/control.py
@@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState, TaskFilter, TaskIterator
from SpiffWorkflow.specs.StartTask import StartTask
from SpiffWorkflow.specs.Join import Join

@@ -72,7 +72,7 @@ class BoundaryEventJoin(Join, BpmnTaskSpec):
def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)

def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
raise WorkflowException(f'Split at {self.split_task} was not reached', task_spec=self)
@@ -97,15 +97,15 @@ def _check_threshold_structured(self, my_task, force=False):
cancel += [main]
else:
cancel = []
return force or finished, cancel
return finished, cancel


class StartEventJoin(Join, BpmnTaskSpec):

def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)

def _check_threshold_structured(self, my_task, force=False):
def _check_threshold_structured(self, my_task):

split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
@@ -118,23 +118,21 @@ def _check_threshold_structured(self, my_task, force=False):
else:
waiting.append(task)

return force or may_fire, waiting
return may_fire, waiting


class _EndJoin(UnstructuredJoin, BpmnTaskSpec):

def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all ready and waiting tasks (excluding
# ourself). The EndJoin waits for everyone!
waiting_tasks = []
for task in my_task.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING):
if task.thread_id != my_task.thread_id:
def _check_threshold_unstructured(self, my_task):
# Look at the tree to find all ready and waiting tasks (excluding ourself). The EndJoin waits for everyone!
for task in TaskIterator(my_task.workflow.task_tree, state=TaskState.NOT_FINISHED_MASK, end_at_spec=self.name):
if task == my_task:
continue
if task.task_spec == my_task.task_spec:
continue
waiting_tasks.append(task)

return force or len(waiting_tasks) == 0, waiting_tasks
may_fire = False
break
else:
may_fire = True
return may_fire

def _run_hook(self, my_task):
result = super(_EndJoin, self)._run_hook(my_task)
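Illustrative note: the rewritten _EndJoin check leans on Python's for/else, where the else branch runs only if the loop never hits break. A toy sketch of the same decision with hypothetical stand-in objects:

def end_join_may_fire(unfinished_tasks, me):
    # Stand-in for the TaskIterator loop above.
    for task in unfinished_tasks:
        if task is me or task.task_spec == me.task_spec:
            continue
        fire = False   # some other unfinished task exists, keep waiting
        break
    else:
        fire = True    # loop finished without a break, nothing left to wait for
    return fire
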
37 changes: 19 additions & 18 deletions SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py
@@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.MultiChoice import MultiChoice
from .unstructured_join import UnstructuredJoin

@@ -68,39 +68,40 @@ def test(self):
MultiChoice.test(self)
UnstructuredJoin.test(self)

def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all places where this task is used.
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
def _check_threshold_unstructured(self, my_task):
# Look at the tree to find all places where this task is used, as well as any unfinished tasks that may be its ancestors
# If there are any, we may have to check whether this gateway is reachable from any of them.
tasks, sources = [], []
for task in my_task.workflow.get_tasks(end_at_spec=self.name):
if task.task_spec == self:
tasks.append(task)
elif task.has_state(TaskState.READY|TaskState.WAITING):
sources.append(task.task_spec)

# Look up which tasks have parents completed.
completed_inputs = set([ task.parent.task_spec for task in tasks if task.parent.state == TaskState.COMPLETED ])

# Find waiting tasks
# Exclude tasks whose specs have already been completed
# A spec only has to complete once, even if on multiple paths
waiting_tasks = []
# If any parents of this join have not been finished, this task must wait.
# A parent spec only has to be completed once, even if it is on multiple paths
tasks_waiting = False
for task in tasks:
if task.parent.has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
waiting_tasks.append(task.parent)
tasks_waiting = True
break

if force:
# If force is true, complete the task
complete = True
elif len(waiting_tasks) > 0:
# If we have waiting tasks, we're obviously not done
if tasks_waiting:
complete = False
else:
# Handle the case where there are paths from active tasks that must go through waiting inputs
waiting_inputs = [i for i in self.inputs if i not in completed_inputs]
task_filter = TaskFilter(state=TaskState.READY|TaskState.WAITING)
sources = [t.task_spec for t in my_task.workflow.get_tasks(task_filter=task_filter)]

# This will go back through a task spec's ancestors and return the source, if applicable
def check(spec):
for parent in spec.inputs:
return parent if parent in sources else check(parent)

# If we can get to a completed input from this task, we don't have to wait for it
# Start with the completed inputs and recurse back through their ancestors, removing any waiting tasks that
# could reach one of them.
for spec in completed_inputs:
source = check(spec)
if source is not None:
Expand All @@ -115,7 +116,7 @@ def check(spec):

complete = len(unfinished_paths) == 0

return complete, waiting_tasks
return complete

def _run_hook(self, my_task):
outputs = self._get_matching_outputs(my_task)
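Illustrative note: the question the gateway answers above is one of reachability, namely whether a still-active task spec can reach this join only through an input that has not completed yet. A simplified sketch of a backwards walk over spec.inputs; this shows the general idea, not the exact helper used in the diff.

def reaches_active_source(spec, active_sources, seen=None):
    # Walk backwards through a spec's inputs looking for any still-active source.
    seen = seen if seen is not None else set()
    for parent in spec.inputs:
        if parent in seen:
            continue
        seen.add(parent)
        if parent in active_sources or reaches_active_source(parent, active_sources, seen):
            return True
    return False
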
26 changes: 13 additions & 13 deletions SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py
@@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from .unstructured_join import UnstructuredJoin


@@ -41,11 +41,9 @@ class ParallelGateway(UnstructuredJoin):
Essentially, this means that we must wait until we have a completed parent
task on each incoming sequence.
"""
def _check_threshold_unstructured(self, my_task, force=False):
def _check_threshold_unstructured(self, my_task):

tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
# Look up which tasks have parents completed.
waiting_tasks = []
tasks = my_task.workflow.get_tasks(spec_name=self.name)
waiting_inputs = set(self.inputs)

def remove_ancestor(task):
@@ -56,13 +54,15 @@ def remove_ancestor(task):
remove_ancestor(task.parent)

for task in tasks:
if task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)
# Do not wait for descendants of this task
elif task.is_descendant_of(my_task):
# Handle the case where the parallel gateway is part of a loop.
if task.is_descendant_of(my_task):
# This is the first iteration; we should not wait on this task, because it will not be reached
# until after this join completes
remove_ancestor(task)
# Ignore predicted tasks; we don't care about anything not definite
elif task.parent.has_state(TaskState.DEFINITE_MASK):
waiting_tasks.append(task.parent)
elif my_task.is_descendant_of(task):
# This is a subsequent iteration; we need to ignore the parents of previous iterations
continue
elif task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)

return force or len(waiting_inputs) == 0, waiting_tasks
return len(waiting_inputs) == 0
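
Illustrative note: a compact sketch of the bookkeeping in the loop above, with made-up flow names. Every incoming sequence starts out waiting, one is discharged each time a completed parent is seen on it, and the gateway may fire once the set is empty.

waiting_inputs = {'flow_a', 'flow_b'}    # hypothetical incoming task specs
completed_parent_specs = ['flow_a']      # parents of join tasks completed so far

for spec in completed_parent_specs:
    waiting_inputs.discard(spec)

may_fire = len(waiting_inputs) == 0      # False here, 'flow_b' is still outstanding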
18 changes: 10 additions & 8 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
@@ -41,14 +41,20 @@ def __init__(self, wf_spec, bpmn_id, subworkflow_spec, transaction=False, **kwar

def _on_subworkflow_completed(self, subworkflow, my_task):
self.update_data(my_task, subworkflow)
# I don't like manually moving back to ready, but don't want to run it
# Ideally, update hook would create the subprocess and return True, _run would start the subprocess and
# return None (so that the state would transition to started), and the completed event for this task
# could be used to run post-completed actions automatically.
# However, until I align the events with state transitions, I don't want to encourage external use of
# callback methods (though completed event is not going to change).
my_task._set_state(TaskState.READY)

def _update_hook(self, my_task):
subprocess = my_task.workflow.top_workflow.subprocesses.get(my_task.id)
if subprocess is None:
super()._update_hook(my_task)
self.create_workflow(my_task)
self.start_workflow(my_task)
my_task._set_state(TaskState.WAITING)
my_task._set_state(TaskState.STARTED)
else:
return subprocess.is_completed()

Expand All @@ -59,21 +65,17 @@ def _on_cancel(self, my_task):

def copy_data(self, my_task, subworkflow):
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)
start.set_data(**deepcopy(my_task.data))

def update_data(self, my_task, subworkflow):
my_task.data = deepcopy(subworkflow.last_task.data)

def create_workflow(self, my_task):
def start_workflow(self, my_task):
subworkflow = my_task.workflow.top_workflow.create_subprocess(my_task, self.spec)
subworkflow.completed_event.connect(self._on_subworkflow_completed, my_task)

def start_workflow(self, my_task):
subworkflow = my_task.workflow.top_workflow.get_subprocess(my_task)
self.copy_data(my_task, subworkflow)
start = subworkflow.get_next_task(spec_name='Start')
start.run()
my_task._set_state(TaskState.WAITING)


class CallActivity(SubWorkflowTask):
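Illustrative note: one small but consequential change above is seeding the subprocess start task with deepcopy(my_task.data) rather than the dict itself. A standard-library illustration of why, with made-up data:

from copy import deepcopy

parent_data = {'order': {'items': ['a']}}
child_data = deepcopy(parent_data)        # what copy_data now does
child_data['order']['items'].append('b')  # the subworkflow mutates its own copy
print(parent_data['order']['items'])      # ['a'], the parent task's data is untouched
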
68 changes: 31 additions & 37 deletions SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py
@@ -16,53 +16,47 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
from copy import deepcopy

from SpiffWorkflow.util.task import TaskState, TaskIterator
from SpiffWorkflow.specs.Join import Join


class UnstructuredJoin(Join):
"""
A helper subclass of Join that makes it work in a slightly friendlier way
for the BPMN style threading
"""
def _do_join(self, my_task):
split_task = self._get_split_task(my_task)
def _update_hook(self, my_task):

# Identify all corresponding task instances within the thread.
# Also remember which of those instances was most recently changed,
# because we are making this one the instance that will
# continue the thread of control. In other words, we will continue
# to build the task tree underneath the most recently changed task.
last_changed = None
thread_tasks = []
for task in TaskIterator(split_task, spec_name=self.name):
if task.thread_id != my_task.thread_id:
# Ignore tasks from other threads. (Do we need this condition?)
continue
if not task.parent.has_state(TaskState.FINISHED_MASK):
# For an inclusive join, this can happen - it's a future join
continue
if my_task.is_descendant_of(task):
# Skip ancestors (otherwise the branch this task is on will get dropped)
continue
# We have found a matching instance.
thread_tasks.append(task)
may_fire = self._check_threshold_unstructured(my_task)
other_tasks = [t for t in my_task.workflow.tasks.values()
if t.task_spec == self and t != my_task and t.state is TaskState.WAITING]
for task in other_tasks:
# By cancelling other waiting tasks immediately, we can prevent them from being updated repeatedly and pointlessly
task.cancel()
if not may_fire:
# Only the most recent instance of the spec needs to wait.
my_task._set_state(TaskState.WAITING)
else:
# Only copy the data to the task that will proceed
my_task._inherit_data()
return may_fire

# Check whether the state of the instance was recently changed.
changed = task.parent.last_state_change
if last_changed is None or changed > last_changed.parent.last_state_change:
last_changed = task
def _run_hook(self, my_task):
other_tasks = filter(
lambda t: t.task_spec == self and t.has_state(TaskState.FINISHED_MASK) and not my_task.is_descendant_of(t),
my_task.workflow.tasks.values()
)
for task in sorted(other_tasks, key=lambda t: t.last_state_change):
# By inheriting directly from parent tasks, we can avoid copying previously merged data

# Update data from all the same thread tasks.
thread_tasks.sort(key=lambda t: t.parent.last_state_change)
collected_data = {}
for task in thread_tasks:
collected_data.update(task.data)
my_task.set_data(**deepcopy(task.parent.data))
# This condition only applies when a workflow is reset inside a parallel branch.
# If reset to a branch that was originally cancelled, all the descendants of the previously completed branch will still
# appear in the tree, potentially corrupting the structure and data.
if task.has_state(TaskState.COMPLETED):
task._drop_children(force=True)

for task in thread_tasks:
if task != last_changed:
task._set_state(TaskState.CANCELLED)
task._drop_children()
else:
task.data.update(collected_data)
# My task is not finished, so won't be included above.
my_task._inherit_data()
return True
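
Illustrative note: the new _run_hook applies parent data in order of last_state_change, so branches that finished later overwrite colliding keys from earlier ones. A hedged sketch with simplified structures:

branches = [
    {'finished_at': 2, 'data': {'y': 'late', 'z': 3}},
    {'finished_at': 1, 'data': {'x': 1, 'y': 'early'}},
]

merged = {}
for branch in sorted(branches, key=lambda b: b['finished_at']):
    merged.update(branch['data'])

print(merged)  # {'x': 1, 'y': 'late', 'z': 3}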