diff --git a/SpiffWorkflow/bpmn/specs/control.py b/SpiffWorkflow/bpmn/specs/control.py index 40dc736b..24f91b4a 100644 --- a/SpiffWorkflow/bpmn/specs/control.py +++ b/SpiffWorkflow/bpmn/specs/control.py @@ -18,7 +18,7 @@ # 02110-1301 USA from SpiffWorkflow.exceptions import WorkflowException -from SpiffWorkflow.util.task import TaskState, TaskFilter +from SpiffWorkflow.util.task import TaskState, TaskFilter, TaskIterator from SpiffWorkflow.specs.StartTask import StartTask from SpiffWorkflow.specs.Join import Join @@ -72,7 +72,7 @@ class BoundaryEventJoin(Join, BpmnTaskSpec): def __init__(self, wf_spec, name, **kwargs): super().__init__(wf_spec, name, **kwargs) - def _check_threshold_structured(self, my_task, force=False): + def _check_threshold_structured(self, my_task): split_task = my_task.find_ancestor(self.split_task) if split_task is None: raise WorkflowException(f'Split at {self.split_task} was not reached', task_spec=self) @@ -97,7 +97,7 @@ def _check_threshold_structured(self, my_task, force=False): cancel += [main] else: cancel = [] - return force or finished, cancel + return finished, cancel class StartEventJoin(Join, BpmnTaskSpec): @@ -105,7 +105,7 @@ class StartEventJoin(Join, BpmnTaskSpec): def __init__(self, wf_spec, name, **kwargs): super().__init__(wf_spec, name, **kwargs) - def _check_threshold_structured(self, my_task, force=False): + def _check_threshold_structured(self, my_task): split_task = my_task.find_ancestor(self.split_task) if split_task is None: @@ -118,23 +118,21 @@ def _check_threshold_structured(self, my_task, force=False): else: waiting.append(task) - return force or may_fire, waiting + return may_fire, waiting class _EndJoin(UnstructuredJoin, BpmnTaskSpec): - def _check_threshold_unstructured(self, my_task, force=False): - # Look at the tree to find all ready and waiting tasks (excluding - # ourself). The EndJoin waits for everyone! - waiting_tasks = [] - for task in my_task.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING): - if task.thread_id != my_task.thread_id: + def _check_threshold_unstructured(self, my_task): + # Look at the tree to find all ready and waiting tasks (excluding ourself). The EndJoin waits for everyone! + for task in TaskIterator(my_task.workflow.task_tree, state=TaskState.NOT_FINISHED_MASK, end_at_spec=self.name): + if task == my_task: continue - if task.task_spec == my_task.task_spec: - continue - waiting_tasks.append(task) - - return force or len(waiting_tasks) == 0, waiting_tasks + may_fire = False + break + else: + may_fire = True + return may_fire def _run_hook(self, my_task): result = super(_EndJoin, self)._run_hook(my_task) diff --git a/SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py b/SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py index bb3d87be..ca5b5d72 100644 --- a/SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py +++ b/SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py @@ -18,7 +18,7 @@ # 02110-1301 USA from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException -from SpiffWorkflow.util.task import TaskState, TaskFilter +from SpiffWorkflow.util.task import TaskState from SpiffWorkflow.specs.MultiChoice import MultiChoice from .unstructured_join import UnstructuredJoin @@ -68,39 +68,40 @@ def test(self): MultiChoice.test(self) UnstructuredJoin.test(self) - def _check_threshold_unstructured(self, my_task, force=False): - # Look at the tree to find all places where this task is used. - tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name)) + def _check_threshold_unstructured(self, my_task): + # Look at the tree to find all places where this task is used and unfinished tasks that may be ancestors + # If there are any, we may have to check whether this gateway is reachable from any of them. + tasks, sources = [], [] + for task in my_task.workflow.get_tasks(end_at_spec=self.name): + if task.task_spec == self: + tasks.append(task) + elif task.has_state(TaskState.READY|TaskState.WAITING): + sources.append(task.task_spec) # Look up which tasks have parents completed. completed_inputs = set([ task.parent.task_spec for task in tasks if task.parent.state == TaskState.COMPLETED ]) - # Find waiting tasks - # Exclude tasks whose specs have already been completed - # A spec only has to complete once, even if on multiple paths - waiting_tasks = [] + # If any parents of this join have not been finished, this task must wait. + # A parent spec only has to be completed once, even it is on multiple paths + tasks_waiting = False for task in tasks: if task.parent.has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs: - waiting_tasks.append(task.parent) + tasks_waiting = True + break - if force: - # If force is true, complete the task - complete = True - elif len(waiting_tasks) > 0: - # If we have waiting tasks, we're obviously not done + if tasks_waiting: complete = False else: # Handle the case where there are paths from active tasks that must go through waiting inputs waiting_inputs = [i for i in self.inputs if i not in completed_inputs] - task_filter = TaskFilter(state=TaskState.READY|TaskState.WAITING) - sources = [t.task_spec for t in my_task.workflow.get_tasks(task_filter=task_filter)] # This will go back through a task spec's ancestors and return the source, if applicable def check(spec): for parent in spec.inputs: return parent if parent in sources else check(parent) - # If we can get to a completed input from this task, we don't have to wait for it + # Start with the completed inputs and recurse back through its ancestors, removing any waiting tasks that + # could reach one of them. for spec in completed_inputs: source = check(spec) if source is not None: @@ -115,7 +116,7 @@ def check(spec): complete = len(unfinished_paths) == 0 - return complete, waiting_tasks + return complete def _run_hook(self, my_task): outputs = self._get_matching_outputs(my_task) diff --git a/SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py b/SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py index 110fa8e0..b7fbf79b 100644 --- a/SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py +++ b/SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py @@ -17,7 +17,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301 USA -from SpiffWorkflow.util.task import TaskState, TaskFilter +from SpiffWorkflow.util.task import TaskState from .unstructured_join import UnstructuredJoin @@ -41,11 +41,9 @@ class ParallelGateway(UnstructuredJoin): Essentially, this means that we must wait until we have a completed parent task on each incoming sequence. """ - def _check_threshold_unstructured(self, my_task, force=False): + def _check_threshold_unstructured(self, my_task): - tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name)) - # Look up which tasks have parents completed. - waiting_tasks = [] + tasks = my_task.workflow.get_tasks(spec_name=self.name) waiting_inputs = set(self.inputs) def remove_ancestor(task): @@ -56,13 +54,15 @@ def remove_ancestor(task): remove_ancestor(task.parent) for task in tasks: - if task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs: - waiting_inputs.remove(task.parent.task_spec) - # Do not wait for descendants of this task - elif task.is_descendant_of(my_task): + # Handle the case where the parallel gateway is part of a loop. + if task.is_descendant_of(my_task): + # This is the first iteration; we should not wait on this task, because it will not be reached + # until after this join completes remove_ancestor(task) - # Ignore predicted tasks; we don't care about anything not definite - elif task.parent.has_state(TaskState.DEFINITE_MASK): - waiting_tasks.append(task.parent) + elif my_task.is_descendant_of(task): + # This is an subsequent iteration; we need to ignore the parents of previous iterations + continue + elif task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs: + waiting_inputs.remove(task.parent.task_spec) - return force or len(waiting_inputs) == 0, waiting_tasks + return len(waiting_inputs) == 0 diff --git a/SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py b/SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py index 80e11c57..b2392f71 100644 --- a/SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py +++ b/SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py @@ -41,6 +41,14 @@ def __init__(self, wf_spec, bpmn_id, subworkflow_spec, transaction=False, **kwar def _on_subworkflow_completed(self, subworkflow, my_task): self.update_data(my_task, subworkflow) + # I don't like manually moving back to ready, but don't want to run it + # Ideally, update hook would create the subprocess and return True, _run would start the subprocess and + # return None (so that the state would transition to started), and the completed event for this task + # could be used to run post-completed actions automatically. + # However, until I align the events with state transitions, I don't want to encourage external use of + # callback methods (though completed event is not going to change). + if my_task.state is TaskState.COMPLETED: + my_task._set_state(TaskState.READY) def _update_hook(self, my_task): subprocess = my_task.workflow.top_workflow.subprocesses.get(my_task.id) @@ -48,7 +56,7 @@ def _update_hook(self, my_task): super()._update_hook(my_task) self.create_workflow(my_task) self.start_workflow(my_task) - my_task._set_state(TaskState.WAITING) + my_task._set_state(TaskState.STARTED) else: return subprocess.is_completed() diff --git a/SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py b/SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py index c7f5e835..64be05b8 100644 --- a/SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py +++ b/SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py @@ -26,43 +26,33 @@ class UnstructuredJoin(Join): A helper subclass of Join that makes it work in a slightly friendlier way for the BPMN style threading """ - def _do_join(self, my_task): - split_task = self._get_split_task(my_task) + def _update_hook(self, my_task): - # Identify all corresponding task instances within the thread. - # Also remember which of those instances was most recently changed, - # because we are making this one the instance that will - # continue the thread of control. In other words, we will continue - # to build the task tree underneath the most recently changed task. - last_changed = None - thread_tasks = [] - for task in TaskIterator(split_task, spec_name=self.name): - if task.thread_id != my_task.thread_id: - # Ignore tasks from other threads. (Do we need this condition?) - continue - if not task.parent.has_state(TaskState.FINISHED_MASK): - # For an inclusive join, this can happen - it's a future join - continue - if my_task.is_descendant_of(task): - # Skip ancestors (otherwise the branch this task is on will get dropped) - continue - # We have found a matching instance. - thread_tasks.append(task) + may_fire = self._check_threshold_unstructured(my_task) + other_tasks = [t for t in my_task.workflow.tasks.values() if t.task_spec == self and t != my_task and t.state is TaskState.WAITING] + for task in other_tasks: + # By cancelling other waiting tasks immediately, we can prevent them from being updated repeeatedly and pointlessly + task.cancel() + if not may_fire: + # Only the most recent instance of the spec needs to wait. + my_task._set_state(TaskState.WAITING) + else: + # Only copy the data to the task that will proceed + my_task._inherit_data() + return may_fire - # Check whether the state of the instance was recently changed. - changed = task.parent.last_state_change - if last_changed is None or changed > last_changed.parent.last_state_change: - last_changed = task - - # Update data from all the same thread tasks. - thread_tasks.sort(key=lambda t: t.parent.last_state_change) - collected_data = {} - for task in thread_tasks: - collected_data.update(task.data) - - for task in thread_tasks: - if task != last_changed: - task._set_state(TaskState.CANCELLED) - task._drop_children() - else: - task.data.update(collected_data) + def _run_hook(self, my_task): + other_tasks = filter( + lambda t: t.task_spec == self and t != my_task and t.has_state(TaskState.FINISHED_MASK) and not my_task.is_descendant_of(t), + my_task.workflow.tasks.values() + ) + pass + for task in sorted(other_tasks, key=lambda t: t.last_state_change): + # By inheriting directly from parent tasks, we can avoid copying previouly merged data + my_task.data.update(task.parent.data) + # This condition only applies when a workflow is reset inside a parallel branch. + # If reset to a branch that was originally cancelled, all the descendants of the previously completed branch will still + # appear in the tree, potentially corrupting the structure and data. + if task.has_state(TaskState.COMPLETED): + task._drop_children(force=True) + return True \ No newline at end of file diff --git a/SpiffWorkflow/bpmn/util/task.py b/SpiffWorkflow/bpmn/util/task.py index d275c7a8..c167f8d5 100644 --- a/SpiffWorkflow/bpmn/util/task.py +++ b/SpiffWorkflow/bpmn/util/task.py @@ -17,7 +17,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301 USA -from SpiffWorkflow.util.task import TaskFilter, TaskIterator +from SpiffWorkflow.util.task import TaskFilter, TaskIterator, TaskState from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent class BpmnTaskFilter(TaskFilter): @@ -54,27 +54,28 @@ def _next(self): task = self.task_list.pop(0) subprocess = task.workflow.top_workflow.subprocesses.get(task.id) - if task.task_spec.name == self.end_at_spec: - self.task_list = [] - elif all([ + if all([ len(task._children) > 0 or subprocess is not None, task.state >= self.min_state or subprocess is not None, self.depth < self.max_depth, + task.task_spec.name != self.end_at_spec, ]): - if subprocess is None: - next_tasks = task.children - elif self.depth_first: - next_tasks = [subprocess.task_tree] + task.children + # Do not descend into a completed subprocess to look for unfinished tasks. + if subprocess is None or (task.state >= TaskState.FINISHED_MASK and self.task_filter.state <= TaskState.FINISHED_MASK): + subprocess_tasks = [] else: - next_tasks = task.children + [subprocess.task_tree] + subprocess_tasks = [subprocess.task_tree] if self.depth_first: + next_tasks = subprocess_tasks + task.children self.task_list = next_tasks + self.task_list else: + next_tasks = task.children + subprocess_tasks self.task_list.extend(next_tasks) + self._update_depth(task) elif self.depth_first and len(self.task_list) > 0: self._handle_leaf_depth(task) - return task \ No newline at end of file + return task diff --git a/SpiffWorkflow/specs/Join.py b/SpiffWorkflow/specs/Join.py index 3334a273..23b714ae 100644 --- a/SpiffWorkflow/specs/Join.py +++ b/SpiffWorkflow/specs/Join.py @@ -97,22 +97,41 @@ def __init__(self, self.threshold = threshold self.cancel_remaining = cancel - def _branch_is_complete(self, my_task): - # Determine whether that branch is now completed by checking whether - # it has any waiting items other than myself in it. - skip = None - for task in TaskIterator(my_task, state=TaskState.NOT_FINISHED_MASK): - # If the current task is a child of myself, ignore it. - if skip is not None and task.is_descendant_of(skip): - continue - if task.task_spec == self: - skip = task + def _check_threshold_unstructured(self, my_task): + # This method is extremely poorly named. It is called where there is no split task, but whether or not + # there is a known split is actually irrelevant. The distinction that actually needs to be made is + # "Do we have to look at unfinshed tasks to find out if any of the might pass through this task?" vs + # "Can we make a distinction solely by looking at our own completed inputs?" + + # The default threshold is the number of inputs. + threshold = valueof(my_task, self.threshold) + if threshold is None: + threshold = len(self.inputs) + + # Find all places where this task spec is used and check whether enough inputs have completed to meet the threshold + # Omit building the list of waiting tasks unless they need to be cancelled if the threshold is met + waiting_tasks = [] + completed = 0 + spec_names = [ts.name for ts in self.inputs] + for task in TaskIterator(my_task.workflow.task_tree, end_at_spec=self.name): + if not task.task_spec.name in spec_names: continue - return False - return True + if task.parent is None or task.has_state(TaskState.COMPLETED): + completed += 1 + elif not task.has_state(TaskState.FINISHED_MASK) and self.cancel_remaining: + waiting_tasks.append(task) + if completed >= threshold: + may_fire = True + if not self.cancel_remaining: + break + else: + may_fire = False + + # If the threshold was reached, get ready to fire. + return may_fire, waiting_tasks - def _branch_may_merge_at(self, task): - for child in task: + def _branch_may_merge(self, task): + for child in TaskIterator(task, end_at_spec=self.name): # Ignore tasks that were created by a trigger. if child.triggered: continue @@ -126,54 +145,19 @@ def _branch_may_merge_at(self, task): return True return False - def _get_split_task(self, my_task): - # One Join spec may have multiple corresponding Task objects:: - # - # - Due to the MultiInstance pattern. - # - Due to the ThreadSplit pattern. - # - # When using the MultiInstance pattern, we want to join across - # the resulting task instances. When using the ThreadSplit - # pattern, we only join within the same thread. (Both patterns - # may also be mixed.) - # - # We are looking for all task instances that must be joined. - # We limit our search by starting at the split point. - if self.split_task: - split_task = my_task.find_ancestor(self.split_task) - else: - split_task = my_task.workflow.task_tree - return split_task - - def _check_threshold_unstructured(self, my_task, force=False): - # The default threshold is the number of inputs. - threshold = valueof(my_task, self.threshold) - if threshold is None: - threshold = len(self.inputs) - - # Look at the tree to find all places where this task is used. - tasks = [] - for spec in self.inputs: - tasks.extend([t for t in my_task.workflow.get_tasks(spec_name=spec.name) if t.thread_id == my_task.thread_id]) - - # Look up which tasks have already completed. - waiting_tasks = [] - completed = 0 - for task in tasks: - if task.parent is None or task.has_state(TaskState.COMPLETED): - completed += 1 - elif not task.has_state(TaskState.FINISHED_MASK): - waiting_tasks.append(task) - - # If the threshold was reached, get ready to fire. - return force or completed >= threshold, waiting_tasks + def _branch_is_complete(self, task): + # Determine whether that branch is now completed by checking whether + # it has any waiting items other than myself in it. + for child in TaskIterator(task, state=TaskState.NOT_FINISHED_MASK, end_at_spec=self.name): + if child.task_spec != self: + return False + return True - def _check_threshold_structured(self, my_task, force=False): + def _check_threshold_structured(self, my_task): # Retrieve a list of all activated tasks from the associated task that did the conditional parallel split. split_task = my_task.find_ancestor(self.split_task) if split_task is None: - msg = 'Join with %s, which was not reached' % self.split_task - raise WorkflowException(msg, task_spec=self) + raise WorkflowException(f'Split task {self.split_task} which was not reached', task_spec=self) tasks = split_task.task_spec._get_activated_tasks(split_task, my_task) # The default threshold is the number of branches that were started. @@ -184,38 +168,27 @@ def _check_threshold_structured(self, my_task, force=False): # Look up which tasks have already completed. waiting_tasks = [] completed = 0 + for task in tasks: - if not self._branch_may_merge_at(task): + if self._branch_is_complete(task): completed += 1 - elif self._branch_is_complete(task): + elif not self._branch_may_merge(task): completed += 1 else: waiting_tasks.append(task) # If the threshold was reached, get ready to fire. - return force or completed >= threshold, waiting_tasks + return completed >= threshold, waiting_tasks - def _start(self, my_task, force=False): - """ - Checks whether the preconditions for going to READY state are met. - Returns True if the threshold was reached, False otherwise. - Also returns the list of tasks that yet need to be completed. - """ - if my_task.has_state(TaskState.FINISHED_MASK): - return False, None - if my_task.has_state(TaskState.READY): - return True, None + def _update_hook(self, my_task): - # Check whether we may fire. + # Check whether enough incoming branches have completed. + my_task._inherit_data() if self.split_task is None: - return self._check_threshold_unstructured(my_task, force) + may_fire, waiting_tasks = self._check_threshold_unstructured(my_task) else: - return self._check_threshold_structured(my_task, force) + may_fire, waiting_tasks = self._check_threshold_structured(my_task) - def _update_hook(self, my_task): - # Check whether enough incoming branches have completed. - my_task._inherit_data() - may_fire, waiting_tasks = self._start(my_task) if may_fire: # If this is a cancelling join, cancel all incoming branches except for the one that just completed. if self.cancel_remaining: @@ -223,16 +196,17 @@ def _update_hook(self, my_task): task.cancel() # Update the state of our child objects. self._do_join(my_task) - return True elif not my_task.has_state(TaskState.FINISHED_MASK): my_task._set_state(TaskState.WAITING) + return may_fire + def _find_tasks(self, my_task): - split_task = self._get_split_task(my_task) or my_task.workflow.task_tree + split_task = my_task.find_ancestor(self.split_task) or my_task.workflow.task_tree # Identify all corresponding task instances within the thread. thread_tasks = [] - for task in TaskIterator(split_task, spec_name=self.name): + for task in TaskIterator(split_task, spec_name=self.name, end_at_spec=self.name): # Ignore tasks from other threads. if task.thread_id != my_task.thread_id: continue diff --git a/SpiffWorkflow/specs/ThreadMerge.py b/SpiffWorkflow/specs/ThreadMerge.py index 8e026790..9e69d287 100644 --- a/SpiffWorkflow/specs/ThreadMerge.py +++ b/SpiffWorkflow/specs/ThreadMerge.py @@ -112,8 +112,7 @@ def _update_hook(self, my_task): if self.split_task and task.is_descendant_of(my_task): continue changed = task.parent.last_state_change - if last_changed is None \ - or changed > last_changed.parent.last_state_change: + if last_changed is None or changed > last_changed.parent.last_state_change: last_changed = task tasks.append(task) diff --git a/SpiffWorkflow/util/task.py b/SpiffWorkflow/util/task.py index 4c3378f0..857724e9 100644 --- a/SpiffWorkflow/util/task.py +++ b/SpiffWorkflow/util/task.py @@ -218,13 +218,11 @@ def _next(self): raise StopIteration() task = self.task_list.pop(0) - - if task.task_spec.name == self.end_at_spec: - self.task_list = [] - elif all([ + if all([ len(task._children) > 0, task.state >= self.min_state, self.depth < self.max_depth, + task.task_spec.name != self.end_at_spec, ]): if self.depth_first: self.task_list = task.children + self.task_list diff --git a/tests/SpiffWorkflow/bpmn/CollaborationTest.py b/tests/SpiffWorkflow/bpmn/CollaborationTest.py index 782f736a..6e834247 100644 --- a/tests/SpiffWorkflow/bpmn/CollaborationTest.py +++ b/tests/SpiffWorkflow/bpmn/CollaborationTest.py @@ -35,7 +35,7 @@ def testCollaboration(self): buddy = self.workflow.get_next_task(spec_name='process_buddy') self.assertIsInstance(buddy.task_spec, CallActivityMixin) self.assertEqual(buddy.task_spec.spec, 'process_buddy') - self.assertEqual(buddy.state, TaskState.WAITING) + self.assertEqual(buddy.state, TaskState.STARTED) def testBpmnMessage(self): diff --git a/tests/SpiffWorkflow/bpmn/IOSpecTest.py b/tests/SpiffWorkflow/bpmn/IOSpecTest.py index ce5030fb..61b7d066 100644 --- a/tests/SpiffWorkflow/bpmn/IOSpecTest.py +++ b/tests/SpiffWorkflow/bpmn/IOSpecTest.py @@ -61,9 +61,8 @@ def actual_test(self, save_restore=False): self.assertNotIn('unused', task.data) self.complete_subprocess() - # Refreshing causes the subprocess to become ready - self.workflow.refresh_waiting_tasks() - task = self.workflow.get_next_task(state=TaskState.READY) + # This is the subprocess + task = self.workflow.get_next_task(spec_name='Activity_1wdjypm') # Originals should not change self.assertEqual(task.data['in_1'], 1) self.assertEqual(task.data['in_2'], "hello world") @@ -74,11 +73,11 @@ def actual_test(self, save_restore=False): def advance_to_subprocess(self): # Once we enter the subworkflow it becomes a waiting task - waiting = self.workflow.get_tasks(state=TaskState.WAITING) - while len(waiting) == 0: + started = self.workflow.get_tasks(state=TaskState.STARTED) + while len(started) == 0: next_task = self.workflow.get_next_task(state=TaskState.READY) next_task.run() - waiting = self.workflow.get_tasks(state=TaskState.WAITING) + started = self.workflow.get_tasks(state=TaskState.STARTED) def complete_subprocess(self): # Complete the ready tasks in the subprocess diff --git a/tests/SpiffWorkflow/bpmn/NestedProcessesTest.py b/tests/SpiffWorkflow/bpmn/NestedProcessesTest.py index fe862620..46ec354a 100644 --- a/tests/SpiffWorkflow/bpmn/NestedProcessesTest.py +++ b/tests/SpiffWorkflow/bpmn/NestedProcessesTest.py @@ -68,7 +68,7 @@ def testResetToSubworkflow(self): self.workflow.do_engine_steps() self.assertEqual(len(self.workflow.subprocesses), 1) - self.assertEqual(task.state, TaskState.WAITING) + self.assertEqual(task.state, TaskState.STARTED) self.complete_task('Action2', True) self.complete_task('Action3', True) self.assertTrue(self.workflow.is_completed()) diff --git a/tests/SpiffWorkflow/bpmn/events/ActionManagementTest.py b/tests/SpiffWorkflow/bpmn/events/ActionManagementTest.py index 5b5036e7..a3d0df1a 100644 --- a/tests/SpiffWorkflow/bpmn/events/ActionManagementTest.py +++ b/tests/SpiffWorkflow/bpmn/events/ActionManagementTest.py @@ -40,7 +40,8 @@ def testRunThroughHappy(self): time.sleep(self.START_TIME_DELTA) self.workflow.refresh_waiting_tasks() self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step("Start Work") @@ -62,18 +63,21 @@ def testRunThroughOverdue(self): time.sleep(self.START_TIME_DELTA) self.workflow.refresh_waiting_tasks() self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step("Start Work") self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) - self.assertEqual('Finish Time', self.workflow.get_tasks(state=TaskState.WAITING)[1].task_spec.bpmn_name) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual('Finish Time', self.workflow.get_next_task(state=TaskState.WAITING).task_spec.bpmn_name) time.sleep(self.FINISH_TIME_DELTA) self.workflow.refresh_waiting_tasks() self.workflow.do_engine_steps() - self.assertEqual(3, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) self.assertNotEqual('Finish Time', self.workflow.get_tasks(state=TaskState.WAITING)[0].task_spec.bpmn_name) overdue_escalation_task = [ @@ -109,10 +113,12 @@ def testRunThroughCancelAfterWorkStarted(self): self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) + time.sleep(self.START_TIME_DELTA) self.workflow.refresh_waiting_tasks() self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step("Start Work") diff --git a/tests/SpiffWorkflow/bpmn/events/MessageInterruptsSpTest.py b/tests/SpiffWorkflow/bpmn/events/MessageInterruptsSpTest.py index 897ac851..d89a3e35 100644 --- a/tests/SpiffWorkflow/bpmn/events/MessageInterruptsSpTest.py +++ b/tests/SpiffWorkflow/bpmn/events/MessageInterruptsSpTest.py @@ -24,7 +24,8 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_exclusive_step('Do Something In a Subprocess') self.workflow.do_engine_steps() @@ -35,7 +36,7 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughInterruptSaveAndRestore(self): @@ -46,7 +47,8 @@ def testRunThroughInterruptSaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.workflow.do_engine_steps() @@ -57,4 +59,4 @@ def testRunThroughInterruptSaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) diff --git a/tests/SpiffWorkflow/bpmn/events/MessageInterruptsTest.py b/tests/SpiffWorkflow/bpmn/events/MessageInterruptsTest.py index 9b959699..7a587c78 100644 --- a/tests/SpiffWorkflow/bpmn/events/MessageInterruptsTest.py +++ b/tests/SpiffWorkflow/bpmn/events/MessageInterruptsTest.py @@ -24,7 +24,8 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_exclusive_step('Do Something That Takes A Long Time') self.save_restore() @@ -35,7 +36,7 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageInterruptSaveAndRestore(self): @@ -46,14 +47,15 @@ def testRunThroughMessageInterruptSaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.save_restore() self.workflow.do_engine_steps() self.save_restore() - self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_exclusive_step('Acknowledge Interrupt Message') @@ -61,7 +63,7 @@ def testRunThroughMessageInterruptSaveAndRestore(self): self.workflow.do_engine_steps() self.save_restore() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughHappy(self): @@ -70,7 +72,8 @@ def testRunThroughHappy(self): self.workflow.do_engine_steps() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_exclusive_step('Do Something That Takes A Long Time') @@ -78,7 +81,7 @@ def testRunThroughHappy(self): self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageInterrupt(self): @@ -87,15 +90,16 @@ def testRunThroughMessageInterrupt(self): self.workflow.do_engine_steps() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.workflow.do_engine_steps() - self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_exclusive_step('Acknowledge Interrupt Message') self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) diff --git a/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptTest.py b/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptTest.py index a9a68f48..386fd894 100644 --- a/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptTest.py +++ b/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptTest.py @@ -24,7 +24,8 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_exclusive_step('Do Something That Takes A Long Time') self.save_restore() @@ -35,7 +36,7 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageInterruptSaveAndRestore(self): @@ -46,13 +47,15 @@ def testRunThroughMessageInterruptSaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step('Acknowledge Non-Interrupt Message') @@ -68,7 +71,7 @@ def testRunThroughMessageInterruptSaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughHappy(self): @@ -77,7 +80,8 @@ def testRunThroughHappy(self): self.workflow.do_engine_steps() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_exclusive_step('Do Something That Takes A Long Time') @@ -85,7 +89,7 @@ def testRunThroughHappy(self): self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageInterrupt(self): @@ -94,24 +98,27 @@ def testRunThroughMessageInterrupt(self): self.workflow.do_engine_steps() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step('Acknowledge Non-Interrupt Message') self.workflow.do_engine_steps() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(3, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_named_step('Do Something That Takes A Long Time') self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageInterruptOtherOrder(self): @@ -120,12 +127,14 @@ def testRunThroughMessageInterruptOtherOrder(self): self.workflow.do_engine_steps() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step('Do Something That Takes A Long Time') @@ -136,7 +145,7 @@ def testRunThroughMessageInterruptOtherOrder(self): self.do_next_named_step('Acknowledge Non-Interrupt Message') self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageInterruptOtherOrderSaveAndRestore(self): @@ -148,13 +157,15 @@ def testRunThroughMessageInterruptOtherOrderSaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.READY))) self.do_next_named_step('Do Something That Takes A Long Time') @@ -167,4 +178,4 @@ def testRunThroughMessageInterruptOtherOrderSaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) diff --git a/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptsSpTest.py b/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptsSpTest.py index 8e94e2aa..df29f65a 100644 --- a/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptsSpTest.py +++ b/tests/SpiffWorkflow/bpmn/events/MessageNonInterruptsSpTest.py @@ -24,7 +24,8 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.do_next_exclusive_step('Do Something In a Subprocess') self.workflow.do_engine_steps() @@ -35,7 +36,7 @@ def testRunThroughHappySaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageSaveAndRestore(self): @@ -46,7 +47,8 @@ def testRunThroughMessageSaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) @@ -63,7 +65,7 @@ def testRunThroughMessageSaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageOrder2SaveAndRestore(self): @@ -74,7 +76,8 @@ def testRunThroughMessageOrder2SaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) self.do_next_named_step('Do Something In a Subprocess') @@ -90,7 +93,7 @@ def testRunThroughMessageOrder2SaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughMessageOrder3SaveAndRestore(self): @@ -101,7 +104,8 @@ def testRunThroughMessageOrder3SaveAndRestore(self): self.save_restore() self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.READY))) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) @@ -118,4 +122,4 @@ def testRunThroughMessageOrder3SaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) diff --git a/tests/SpiffWorkflow/bpmn/events/MessagesTest.py b/tests/SpiffWorkflow/bpmn/events/MessagesTest.py index a6fb08df..d6bba07b 100644 --- a/tests/SpiffWorkflow/bpmn/events/MessagesTest.py +++ b/tests/SpiffWorkflow/bpmn/events/MessagesTest.py @@ -21,7 +21,8 @@ def testRunThroughHappy(self): self.do_next_exclusive_step('Select Test', choice='Messages') self.workflow.do_engine_steps() self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY)) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Wrong Message'), {})) self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY)) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) @@ -30,7 +31,7 @@ def testRunThroughHappy(self): self.assertEqual('Test Message', self.workflow.get_tasks(state=TaskState.READY)[0].task_spec.bpmn_name) self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) def testRunThroughSaveAndRestore(self): @@ -41,7 +42,8 @@ def testRunThroughSaveAndRestore(self): self.save_restore() self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY)) - self.assertEqual(2, len(self.workflow.get_tasks(state=TaskState.WAITING))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.STARTED))) + self.assertEqual(1, len(self.workflow.get_tasks(state=TaskState.WAITING))) self.workflow.catch(BpmnEvent(MessageEventDefinition('Wrong Message'), {})) self.assertEqual([], self.workflow.get_tasks(state=TaskState.READY)) self.workflow.catch(BpmnEvent(MessageEventDefinition('Test Message'), {})) @@ -50,4 +52,4 @@ def testRunThroughSaveAndRestore(self): self.save_restore() self.workflow.do_engine_steps() - self.assertEqual(0, len(self.workflow.get_tasks(state=TaskState.READY|TaskState.WAITING))) + self.assertTrue(self.workflow.is_completed()) \ No newline at end of file diff --git a/tests/SpiffWorkflow/bpmn/events/TimerDurationBoundaryTest.py b/tests/SpiffWorkflow/bpmn/events/TimerDurationBoundaryTest.py index c7416fa4..2aa12889 100644 --- a/tests/SpiffWorkflow/bpmn/events/TimerDurationBoundaryTest.py +++ b/tests/SpiffWorkflow/bpmn/events/TimerDurationBoundaryTest.py @@ -22,13 +22,13 @@ def testThroughSaveRestore(self): def actual_test(self,save_restore = False): self.workflow.do_engine_steps() - ready_tasks = self.workflow.get_tasks(state=TaskState.READY) - ready_tasks[0].run() + ready_tasks = self.workflow.get_next_task(state=TaskState.READY) + ready_tasks.run() self.workflow.do_engine_steps() loopcount = 0 # test bpmn has a timeout of .03s; we should terminate loop before that. - while len(self.workflow.get_tasks(state=TaskState.WAITING)) == 2 and loopcount < 11: + while len(self.workflow.get_tasks(state=TaskState.WAITING)) == 1 and loopcount < 11: if save_restore: self.save_restore() time.sleep(0.01) diff --git a/tests/SpiffWorkflow/camunda/CallActivityMessageTest.py b/tests/SpiffWorkflow/camunda/CallActivityMessageTest.py index b074132e..6c95e3cf 100644 --- a/tests/SpiffWorkflow/camunda/CallActivityMessageTest.py +++ b/tests/SpiffWorkflow/camunda/CallActivityMessageTest.py @@ -30,9 +30,9 @@ def actual_test(self, save_restore=False): self.workflow.do_engine_steps() ready_tasks = self.workflow.get_tasks(state=TaskState.READY) - waiting_tasks = self.workflow.get_tasks(state=TaskState.WAITING) + started_tasks = self.workflow.get_tasks(state=TaskState.STARTED) self.assertEqual(1, len(ready_tasks),'Expected to have one ready task') - self.assertEqual(2, len(waiting_tasks), 'Expected to have two waiting tasks') + self.assertEqual(2, len(started_tasks), 'Expected to have two started tasks') for step in steps: current_task = ready_tasks[0] diff --git a/tests/SpiffWorkflow/core/IteratorTest.py b/tests/SpiffWorkflow/core/IteratorTest.py index b653e925..d8adbab6 100644 --- a/tests/SpiffWorkflow/core/IteratorTest.py +++ b/tests/SpiffWorkflow/core/IteratorTest.py @@ -43,7 +43,7 @@ def test_get_tasks_end_at(self): tasks = self.workflow.get_tasks(end_at_spec='c') self.assertEqual( [t.task_spec.name for t in tasks], - ['Start', 'a', 'a1', 'last', 'End', 'a2', 'last', 'End', 'c'] + ['Start', 'a', 'a1', 'last', 'End', 'a2', 'last', 'End', 'c', 'b', 'b1', 'last', 'End', 'b2', 'last', 'End'] ) def test_get_tasks_max_depth(self): @@ -67,7 +67,7 @@ def test_get_tasks_end_at(self): tasks = self.workflow.get_tasks(end_at_spec='c', depth_first=False) self.assertEqual( [t.task_spec.name for t in tasks], - ['Start', 'a', 'b', 'a1', 'a2', 'c'] + ['Start', 'a', 'b', 'a1', 'a2', 'c', 'b1', 'b2', 'last', 'last', 'last', 'last', 'End', 'End', 'End', 'End'] ) def test_get_tasks_max_depth(self):