Skip to content

Commit

Permalink
Merge pull request #352 from sartography/improvement/flexible-task-it…
Browse files Browse the repository at this point in the history
…eration

Improvement/flexible task iteration
  • Loading branch information
essweine authored Sep 20, 2023
2 parents f113aba + 518c834 commit 90159bd
Show file tree
Hide file tree
Showing 108 changed files with 1,417 additions and 1,568 deletions.
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/serializer/migration/version_1_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime, timedelta

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.bpmn.specs.event_definitions.timer import LOCALTZ

from .exceptions import VersionMigrationError
Expand Down
19 changes: 9 additions & 10 deletions SpiffWorkflow/bpmn/specs/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.specs.StartTask import StartTask
from SpiffWorkflow.specs.Join import Join

Expand All @@ -40,15 +40,15 @@ def _predict_hook(self, my_task):
# Events attached to the main task might occur
my_task._sync_children(self.outputs, state=TaskState.MAYBE)
# The main child's state is based on this task's state
state = TaskState.FUTURE if my_task._is_definite() else my_task.state
state = TaskState.FUTURE if my_task.has_state(TaskState.DEFINITE_MASK) else my_task.state
for child in my_task.children:
if not isinstance(child.task_spec, BoundaryEvent):
child._set_state(state)

def _update_hook(self, my_task):
super()._update_hook(my_task)
for task in my_task.children:
if isinstance(task.task_spec, BoundaryEvent) and task._is_predicted():
if isinstance(task.task_spec, BoundaryEvent) and task.has_state(TaskState.PREDICTED_MASK):
task._set_state(TaskState.WAITING)
task.task_spec._predict(task)
return True
Expand All @@ -61,9 +61,8 @@ def __init__(self, wf_spec, name, **kwargs):
super().__init__(wf_spec, name, **kwargs)

def _check_threshold_structured(self, my_task, force=False):
# Retrieve a list of all activated tasks from the associated
# task that did the conditional parallel split.
split_task = my_task._find_ancestor_from_name(self.split_task)
# Retrieve a list of all activated tasks from the associated task that did the conditional parallel split.
split_task = my_task.find_ancestor(self.split_task)
if split_task is None:
raise WorkflowException(f'Split at {self.split_task} was not reached', task_spec=self)

Expand All @@ -79,8 +78,8 @@ def _check_threshold_structured(self, my_task, force=False):
if main is None:
raise WorkflowException(f'No main task found', task_spec=self)

interrupt = any([t._has_state(TaskState.READY|TaskState.COMPLETED) for t in interrupting])
finished = main._is_finished() or interrupt
interrupt = any([t.has_state(TaskState.READY|TaskState.COMPLETED) for t in interrupting])
finished = main.has_state(TaskState.FINISHED_MASK) or interrupt
if finished:
cancel = [t for t in interrupting + noninterrupting if t.state == TaskState.WAITING]
if interrupt:
Expand All @@ -96,7 +95,7 @@ def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all ready and waiting tasks (excluding
# ourself). The EndJoin waits for everyone!
waiting_tasks = []
for task in my_task.workflow.get_tasks(TaskState.READY | TaskState.WAITING):
for task in my_task.workflow.get_tasks(task_filter=TaskFilter(state=TaskState.READY|TaskState.WAITING)):
if task.thread_id != my_task.thread_id:
continue
if task.task_spec == my_task.task_spec:
Expand All @@ -108,4 +107,4 @@ def _check_threshold_unstructured(self, my_task, force=False):
def _run_hook(self, my_task):
result = super(_EndJoin, self)._run_hook(my_task)
my_task.workflow.data.update(my_task.data)
return result
return result
4 changes: 2 additions & 2 deletions SpiffWorkflow/bpmn/specs/mixins/events/end_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.bpmn.specs.event_definitions.simple import TerminateEventDefinition, CancelEventDefinition
from .event_types import ThrowingEvent

Expand Down Expand Up @@ -49,7 +49,7 @@ def _on_complete_hook(self, my_task):

# We are finished. Set the workflow data and cancel all tasks
my_task.workflow.set_data(**my_task.data)
for task in my_task.workflow.get_tasks(TaskState.NOT_FINISHED_MASK):
for task in my_task.workflow.get_tasks(task_filter=TaskFilter(state=TaskState.NOT_FINISHED_MASK)):
task.cancel()

elif isinstance(self.event_definition, CancelEventDefinition):
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA
import time
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec

from SpiffWorkflow.bpmn.specs.event_definitions.simple import NoneEventDefinition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from .event_types import ThrowingEvent, CatchingEvent


Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/events/start_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from .event_types import CatchingEvent


Expand Down
9 changes: 5 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/inclusive_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# 02110-1301 USA

from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.specs.MultiChoice import MultiChoice
from .unstructured_join import UnstructuredJoin

Expand Down Expand Up @@ -70,7 +70,7 @@ def test(self):

def _check_threshold_unstructured(self, my_task, force=False):
# Look at the tree to find all places where this task is used.
tasks = my_task.workflow.get_tasks_from_spec_name(self.name)
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))

# Look up which tasks have parents completed.
completed_inputs = set([ task.parent.task_spec for task in tasks if task.parent.state == TaskState.COMPLETED ])
Expand All @@ -80,7 +80,7 @@ def _check_threshold_unstructured(self, my_task, force=False):
# A spec only has to complete once, even if on multiple paths
waiting_tasks = []
for task in tasks:
if task.parent._has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
if task.parent.has_state(TaskState.DEFINITE_MASK) and task.parent.task_spec not in completed_inputs:
waiting_tasks.append(task.parent)

if force:
Expand All @@ -92,7 +92,8 @@ def _check_threshold_unstructured(self, my_task, force=False):
else:
# Handle the case where there are paths from active tasks that must go through waiting inputs
waiting_inputs = [i for i in self.inputs if i not in completed_inputs]
sources = [t.task_spec for t in my_task.workflow.get_tasks(TaskState.READY | TaskState.WAITING)]
task_filter = TaskFilter(state=TaskState.READY|TaskState.WAITING)
sources = [t.task_spec for t in my_task.workflow.get_tasks(task_filter=task_filter)]

# This will go back through a task spec's ancestors and return the source, if applicable
def check(spec):
Expand Down
9 changes: 5 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/multiinstance_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from copy import deepcopy
from collections.abc import Iterable, Sequence, Mapping, MutableSequence, MutableMapping

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.util.deep_merge import DeepMerge
from SpiffWorkflow.bpmn.specs.bpmn_task_spec import BpmnTaskSpec
from SpiffWorkflow.bpmn.exceptions import WorkflowDataException
Expand All @@ -36,10 +37,10 @@ def process_children(self, my_task):
merged = self._merged_children(my_task)
child_running = False
for child in self._instances(my_task):
if child._has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
if child.has_state(TaskState.FINISHED_MASK) and str(child.id) not in merged:
self.child_completed_action(my_task, child)
merged.append(str(child.id))
elif not child._has_state(TaskState.FINISHED_MASK):
elif not child.has_state(TaskState.FINISHED_MASK):
child_running = True
my_task.internal_data['merged'] = merged
return child_running
Expand Down Expand Up @@ -134,7 +135,7 @@ def task_info(self, my_task):
for task in self._instances(my_task):
key_or_index = task.internal_data.get('key_or_index')
value = task.internal_data.get('item') if key_or_index is None else key_or_index
if task._has_state(TaskState.FINISHED_MASK):
if task.has_state(TaskState.FINISHED_MASK):
info['completed'].append(value)
else:
info['running'].append(value)
Expand Down
8 changes: 4 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/parallel_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskFilter
from .unstructured_join import UnstructuredJoin


Expand All @@ -43,7 +43,7 @@ class ParallelGateway(UnstructuredJoin):
"""
def _check_threshold_unstructured(self, my_task, force=False):

tasks = my_task.workflow.get_tasks_from_spec_name(self.name)
tasks = my_task.workflow.get_tasks(task_filter=TaskFilter(spec_name=self.name))
# Look up which tasks have parents completed.
waiting_tasks = []
waiting_inputs = set(self.inputs)
Expand All @@ -59,10 +59,10 @@ def remove_ancestor(task):
if task.parent.state == TaskState.COMPLETED and task.parent.task_spec in waiting_inputs:
waiting_inputs.remove(task.parent.task_spec)
# Do not wait for descendants of this task
elif task._is_descendant_of(my_task):
elif task.is_descendant_of(my_task):
remove_ancestor(task)
# Ignore predicted tasks; we don't care about anything not definite
elif task.parent._has_state(TaskState.DEFINITE_MASK):
elif task.parent.has_state(TaskState.DEFINITE_MASK):
waiting_tasks.append(task.parent)

return force or len(waiting_inputs) == 0, waiting_tasks
22 changes: 11 additions & 11 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from copy import deepcopy

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.specs.base import TaskSpec
from SpiffWorkflow.bpmn.exceptions import WorkflowDataException

Expand Down Expand Up @@ -64,8 +64,8 @@ def copy_data(self, my_task, subworkflow):
# But our data management is already hopelessly messed up and in dire needs of reconsideration
if len(subworkflow.spec.data_objects) > 0:
subworkflow.data = my_task.workflow.data
start = subworkflow.get_tasks_from_spec_name('Start')
start[0].set_data(**my_task.data)
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)

def update_data(self, my_task, subworkflow):
my_task.data = deepcopy(subworkflow.last_task.data)
Expand All @@ -77,8 +77,8 @@ def create_workflow(self, my_task):
def start_workflow(self, my_task):
subworkflow = my_task.workflow.top_workflow.get_subprocess(my_task)
self.copy_data(my_task, subworkflow)
for child in subworkflow.task_tree.children:
child.task_spec._update(child)
start = subworkflow.get_next_task(spec_name='Start')
start.run()
my_task._set_state(TaskState.WAITING)


Expand All @@ -89,10 +89,10 @@ def __init__(self, wf_spec, bpmn_id, subworkflow_spec, **kwargs):

def copy_data(self, my_task, subworkflow):

start = subworkflow.get_tasks_from_spec_name('Start')
start = subworkflow.get_next_task(spec_name='Start')
if subworkflow.spec.io_specification is None or len(subworkflow.spec.io_specification.data_inputs) == 0:
# Copy all task data into start task if no inputs specified
start[0].set_data(**my_task.data)
start.set_data(**my_task.data)
else:
# Otherwise copy only task data with the specified names
for var in subworkflow.spec.io_specification.data_inputs:
Expand All @@ -102,24 +102,24 @@ def copy_data(self, my_task, subworkflow):
task=my_task,
data_input=var,
)
start[0].data[var.bpmn_id] = my_task.data[var.bpmn_id]
start.data[var.bpmn_id] = my_task.data[var.bpmn_id]

def update_data(self, my_task, subworkflow):

if subworkflow.spec.io_specification is None or len(subworkflow.spec.io_specification.data_outputs) == 0:
# Copy all workflow data if no outputs are specified
my_task.data = deepcopy(subworkflow.last_task.data)
else:
end = subworkflow.get_tasks_from_spec_name('End')
end = subworkflow.get_next_task(spec_name='End')
# Otherwise only copy data with the specified names
for var in subworkflow.spec.io_specification.data_outputs:
if var.bpmn_id not in end[0].data:
if var.bpmn_id not in end.data:
raise WorkflowDataException(
"The Data Output was not available in the subprocess output.",
task=my_task,
data_output=var,
)
my_task.data[var.bpmn_id] = end[0].data[var.bpmn_id]
my_task.data[var.bpmn_id] = end.data[var.bpmn_id]


class TransactionSubprocess(SubWorkflowTask):
Expand Down
8 changes: 4 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/unstructured_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.task import TaskState, TaskIterator
from SpiffWorkflow.specs.Join import Join


Expand All @@ -36,14 +36,14 @@ def _do_join(self, my_task):
# to build the task tree underneath the most recently changed task.
last_changed = None
thread_tasks = []
for task in split_task._find_any(self):
for task in TaskIterator(split_task, spec_name=self.name):
if task.thread_id != my_task.thread_id:
# Ignore tasks from other threads. (Do we need this condition?)
continue
if not task.parent._is_finished():
if not task.parent.has_state(TaskState.FINISHED_MASK):
# For an inclusive join, this can happen - it's a future join
continue
if my_task._is_descendant_of(task):
if my_task.is_descendant_of(task):
# Skip ancestors (otherwise the branch this task is on will get dropped)
continue
# We have found a matching instance.
Expand Down
Loading

0 comments on commit 90159bd

Please sign in to comment.