Skip to content

Commit

Permalink
improvements to diff utils
Browse files Browse the repository at this point in the history
  • Loading branch information
essweine committed Jul 3, 2024
1 parent ed6c428 commit 95197b2
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 64 deletions.
187 changes: 133 additions & 54 deletions SpiffWorkflow/bpmn/util/diff.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,46 @@
from SpiffWorkflow import TaskState
from .task import BpmnTaskFilter

class SpecDiff:
"""This class is used to hold results for comparisons between two workflow specs.
def __init__(self, serializer, original, new):
"""This class is used to hold results for comparisons between two workflow specs.
Attributes:
added (list(`TaskSpec`)): task specs from the new version that cannot be aligned
alignment (dict): a mapping of old task spec to new
comparisons (dict): a mapping of old task spec to changed attributes
Attributes:
registry: a serializer's registry
unmatched (list(`TaskSpec`)): a list of task specs that cannot be aligned
alignment (dict): a mapping of old task spec to new
updated (dict): a mapping of old task spec to changed attributes
Properties:
removed (list of `TaskSpec`): specs from the original that cannot be aligned
changed (dict): a filtered version of comparisons that contains only changed items
The chief basis for alignment is `TaskSpec.name` (ie, the BPMN ID of the spec): if the IDs are identical,
it is assumed the task specs correspond. If a spec in the old version does not have an ID in the new,
some attempt to match based on inputs and outputs is made.
The general procedure is to attempt to align as many tasks based on ID as possible first, and then
attempt to match by other means while backing out of the traversal.
The chief basis for alignment is `TaskSpec.name` (ie, the BPMN ID of the spec): if the IDs are identical,
it is assumed the task specs correspond. If a spec in the old version does not have an ID in the new,
some attempt to match based on inputs and outputs is made.
Updates are organized primarily by the specs from the original version.
"""
The general procedure is to attempt to align as many tasks based on ID as possible first, and then
attempt to match by other means while backing out of the traversal.
Updates are organized primarily by the specs from the original version.
"""

def __init__(self, registry, original, new):
"""Constructor for a spec diff.
Args:
registry (`DictionaryConverter`): a serializer registry
original (`BpmnProcessSpec`): the original spec
new (`BpmnProcessSpec`): the spec to compare
self.registry = serializer.registry
self.unmatched = [spec for spec in new.task_specs.values() if spec.name not in original.task_specs]
Aligns specs from the original with specs from the new workflow and checks each aligned pair
for changes.
"""
self.added = [spec for spec in new.task_specs.values() if spec.name not in original.task_specs]
self.alignment = {}
self.updated = {}
self.comparisons = {}
self._registry = registry
self._align(original.start, new)

@property
def added(self):
"""Task specs from the new version that did not exist in the old"""
return self.unmatched

@property
def removed(self):
"""Task specs from the old version that were removed from the new"""
Expand All @@ -40,14 +49,14 @@ def removed(self):
@property
def changed(self):
"""Task specs with updated attributes"""
return dict((ts, changes) for ts, changes in self.updated.items() if changes)
return dict((ts, changes) for ts, changes in self.comparisons.items() if changes)

def _align(self, spec, new):

candidate = new.task_specs.get(spec.name)
self.alignment[spec] = candidate
if candidate is not None:
self.updated[spec] = self._compare_task_specs(spec, candidate)
self.comparisons[spec] = self._compare_task_specs(spec, candidate)

# Traverse the spec, prioritizing matching by name
# Without this starting point, alignment would be way too difficult
Expand All @@ -62,7 +71,7 @@ def _align(self, spec, new):
def _search_unmatched(self, spec):
# If any outputs were matched, we can use its unmatched inputs as candidates
for match in self._substitutions(spec.outputs):
for parent in [ts for ts in match.inputs if ts in self.unmatched]:
for parent in [ts for ts in match.inputs if ts in self.added]:
if self._aligned(spec.outputs, parent.outputs):
path = [parent] # We may need to check ancestor inputs as well as this spec's inputs
searched = [] # We have to keep track of what we've looked at in case of loops
Expand All @@ -72,15 +81,17 @@ def _find_ancestor(self, spec, path, searched):
if path[-1] not in searched:
searched.append(path[-1])
# Stop if we reach a previously matched spec or if an ancestor's inputs match
if path[-1] not in self.unmatched or self._aligned(spec.inputs, path[-1].inputs):
if path[-1] not in self.added or self._aligned(spec.inputs, path[-1].inputs):
self.alignment[spec] = path[0]
self.unmatched.remove(path[0])
if path[0] in self.added:
self.added.remove(path[0])
self.comparisons[spec] = self._compare_task_specs(spec, path[0])
else:
for parent in [ts for ts in path[-1].inputs if ts not in searched]:
self._find_ancestor(spec, path + [parent], searched)

def _substitutions(self, spec_list, skip_unaligned=True):
subs = [self.alignment[ts] for ts in spec_list]
subs = [self.alignment.get(ts) for ts in spec_list]
return [ts for ts in subs if ts is not None] if skip_unaligned else subs

def _aligned(self, original, candidates):
Expand All @@ -89,8 +100,8 @@ def _aligned(self, original, candidates):
all(first is not None and first.name == second.name for first, second in zip(subs, candidates))

def _compare_task_specs(self, spec, candidate):
s1 = self.registry.convert(spec)
s2 = self.registry.convert(candidate)
s1 = self._registry.convert(spec)
s2 = self._registry.convert(candidate)
if s1.get('typename') != s2.get('typename'):
return ['typename']
else:
Expand All @@ -101,41 +112,109 @@ class WorkflowDiff:
to its WorkflowSpec.
Attributes
workflow (`BpmnWorkflow`): a workflow instance
spec_diff (`SpecDiff`): the results of a comparison of two specs
removed (list(`Task`)): a list of tasks whose specs do not exist in the new version
changed (list(`Task`)): a list of tasks with aligned specs where attributes have changed
alignment (dict): a mapping of old task spec to new task spec
"""

def __init__(self, workflow, spec_diff):
self.workflow = workflow
self.spec_diff = spec_diff
self.removed = []
self.changed = []
self.alignment = {}
self._align()
self._align(workflow, spec_diff)

def filter_tasks(self, tasks, **kwargs):
"""Applies task filtering arguments to a list of tasks.
def _align(self, workflow, spec_diff):
for task in workflow.get_tasks(skip_subprocesses=True):
if task.task_spec in spec_diff.changed:
self.changed.append(task)
if task.task_spec in spec_diff.removed:
self.removed.append(task)
else:
self.alignment[task] = spec_diff.alignment[task.task_spec]

Args:
tasks (list(`Task`)): a list of tasks

Keyword Args:
any keyword arg that may be passed to `BpmnTaskFilter`
def diff_dependencies(registry, original, new):
"""Helper method for comparing sets of spec dependencies.
Returns:
a list containing tasks matching the filter
"""
task_filter = BpmnTaskFilter(**kwargs)
return [t for t in tasks if task_filter.matches(t)]
Args:
registry (`DictionaryConverter`): a serializer registry
original (dict): the name -> `BpmnProcessSpec` mapping for the original spec
new (dict): the name -> `BpmnProcessSpec` mapping for the updated spec
def _align(self):
for task in self.workflow.get_tasks(skip_subprocesses=True):
if task.task_spec in self.spec_diff.changed:
self.changed.append(task)
if task.task_spec in self.spec_diff.removed:
self.removed.append(task)
else:
self.alignment[task] = self.spec_diff.alignment[task.task_spec]
Returns:
a tuple of:
mapping from name -> `SpecDiff` (or None) for each original dependency
list of names of specs in the new dependencies that did not previously exist
"""
result = {}
subprocesses = {}
for name, spec in original.items():
if name in new:
result[name] = SpecDiff(registry, spec, new[name])
else:
result[name] = None

return result, [name for name in new if name not in original]


def diff_workflow(registry, workflow, new_spec, new_dependencies):
    """Diff a workflow, and each of its existing subprocesses, against new specs.

    The top level workflow is compared with `new_spec`; every subworkflow
    currently present in `workflow.subprocesses` is compared with the entry of
    `new_dependencies` that has the same spec name.

    Args:
        registry (`DictionaryConverter`): a serializer registry
        workflow (`BpmnWorkflow`): a workflow instance
        new_spec (`BpmnProcessSpec`): the new top level spec
        new_dependencies (dict): a dictionary of name -> `BpmnProcessSpec`

    Returns:
        tuple of `WorkflowDiff` and mapping of subworkflow id -> `WorkflowDiff`;
        the mapped value is None when the subworkflow's spec name does not
        appear in `new_dependencies`
    """

    def _diff_subworkflow(subworkflow):
        # A subworkflow whose spec has no counterpart cannot be diffed
        name = subworkflow.spec.name
        if name not in new_dependencies:
            return None
        dependency_diff = SpecDiff(registry, subworkflow.spec, new_dependencies[name])
        return WorkflowDiff(subworkflow, dependency_diff)

    top_level = WorkflowDiff(workflow, SpecDiff(registry, workflow.spec, new_spec))
    by_subworkflow_id = {
        subworkflow_id: _diff_subworkflow(subworkflow)
        for subworkflow_id, subworkflow in workflow.subprocesses.items()
    }
    return top_level, by_subworkflow_id

def filter_tasks(tasks, **kwargs):
    """Return the tasks accepted by a `BpmnTaskFilter` built from `kwargs`.

    Args:
        tasks (list(`Task`)): a list of tasks

    Keyword Args:
        any keyword arg that may be passed to `BpmnTaskFilter`

    Returns:
        a list containing tasks matching the filter
    """
    # Build the filter once and reuse its bound `matches` as the predicate
    accepts = BpmnTaskFilter(**kwargs).matches
    return list(filter(accepts, tasks))

def migrate_workflow(diff, workflow, spec, reset_mask=None):
    """Switch a workflow over to a new spec and reset its pending tasks.

    Args:
        diff (`WorkflowDiff`): the diff of this workflow and spec
        workflow (`BpmnWorkflow` or `BpmnSubWorkflow`): the workflow
        spec (`BpmnProcessSpec`): the new spec

    Keyword Args:
        reset_mask (`TaskState`): reset and repredict tasks in this state

    The default reset_mask is TaskState.READY|TaskState.WAITING but can be overridden.
    """
    workflow.spec = spec

    # Point every task at the spec it was aligned with in the new version.
    # NOTE(review): a task with no entry in `diff.alignment` is assigned None
    # here -- confirm callers only migrate workflows without removed tasks.
    for task in workflow.get_tasks():
        task.task_spec = diff.alignment.get(task)

    fallback_mask = TaskState.READY | TaskState.WAITING
    mask = fallback_mask if not reset_mask else reset_mask

    # Collect the matching tasks up front (presumably because resetting a
    # branch alters the task tree being iterated -- TODO confirm).
    pending = list(workflow.get_tasks(state=mask))
    for task in pending:
        task.reset_branch(None)
31 changes: 21 additions & 10 deletions tests/SpiffWorkflow/bpmn/DiffUtilTest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from SpiffWorkflow import TaskState
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.util.diff import SpecDiff, WorkflowDiff
from SpiffWorkflow.bpmn.util.diff import SpecDiff, WorkflowDiff, diff_workflow

from .BpmnWorkflowTestCase import BpmnWorkflowTestCase

Expand All @@ -9,7 +9,7 @@ class CompareSpecTest(BpmnWorkflowTestCase):
def test_tasks_added(self):
v1_spec, v1_sp_specs = self.load_workflow_spec('diff/v1.bpmn', 'Process')
v2_spec, v2_sp_specs = self.load_workflow_spec('diff/v2.bpmn', 'Process')
result = SpecDiff(self.serializer, v1_spec, v2_spec)
result = SpecDiff(self.serializer.registry, v1_spec, v2_spec)
self.assertEqual(len(result.added), 3)
self.assertIn(v2_spec.task_specs.get('Gateway_1618q26'), result.added)
self.assertIn(v2_spec.task_specs.get('Activity_1ds7clb'), result.added)
Expand All @@ -18,7 +18,7 @@ def test_tasks_added(self):
def test_tasks_removed(self):
v1_spec, v1_sp_specs = self.load_workflow_spec('diff/v1.bpmn', 'Process')
v2_spec, v2_sp_specs = self.load_workflow_spec('diff/v2.bpmn', 'Process')
result = SpecDiff(self.serializer, v2_spec, v1_spec)
result = SpecDiff(self.serializer.registry, v2_spec, v1_spec)
self.assertEqual(len(result.removed), 3)
self.assertIn(v2_spec.task_specs.get('Gateway_1618q26'), result.removed)
self.assertIn(v2_spec.task_specs.get('Activity_1ds7clb'), result.removed)
Expand All @@ -27,7 +27,7 @@ def test_tasks_removed(self):
def test_tasks_changed(self):
v2_spec, v2_sp_specs = self.load_workflow_spec('diff/v2.bpmn', 'Process')
v3_spec, v3_sp_specs = self.load_workflow_spec('diff/v3.bpmn', 'Process')
result = SpecDiff(self.serializer, v2_spec, v3_spec)
result = SpecDiff(self.serializer.registry, v2_spec, v3_spec)
# The default output was changed and the conditional output was converted to a subprocess
self.assertListEqual(
result.changed.get(v2_spec.task_specs.get('Gateway_1618q26')),
Expand All @@ -42,7 +42,7 @@ def test_tasks_changed(self):
def test_alignment(self):
v2_spec, v2_sp_specs = self.load_workflow_spec('diff/v2.bpmn', 'Process')
v3_spec, v3_sp_specs = self.load_workflow_spec('diff/v3.bpmn', 'Process')
result = SpecDiff(self.serializer, v2_spec, v3_spec)
result = SpecDiff(self.serializer.registry, v2_spec, v3_spec)
old_end_event = v2_spec.task_specs.get('Event_0rilo47')
new_end_event = v3_spec.task_specs.get('Event_18osyv3')
self.assertEqual(result.alignment[old_end_event], new_end_event)
Expand All @@ -53,7 +53,7 @@ def test_alignment(self):
def test_multiple(self):
v4_spec, v4_sp_specs = self.load_workflow_spec('diff/v4.bpmn', 'Process')
v5_spec, v5_sp_specs = self.load_workflow_spec('diff/v5.bpmn', 'Process')
result = SpecDiff(self.serializer, v4_spec, v5_spec)
result = SpecDiff(self.serializer.registry, v4_spec, v5_spec)
self.assertEqual(len(result.removed), 4)
self.assertEqual(len(result.changed), 4)
self.assertIn(v4_spec.task_specs.get('Gateway_0z1qhgl'), result.removed)
Expand All @@ -78,9 +78,9 @@ class CompareWorkflowTest(BpmnWorkflowTestCase):
def test_changed(self):
v3_spec, v3_sp_specs = self.load_workflow_spec('diff/v3.bpmn', 'Process')
v4_spec, v4_sp_specs = self.load_workflow_spec('diff/v4.bpmn', 'Process')
spec_diff = SpecDiff(self.serializer, v3_spec, v4_spec)
spec_diff = SpecDiff(self.serializer.registry, v3_spec, v4_spec)
sp_spec_diff = SpecDiff(
self.serializer,
self.serializer.registry,
v3_sp_specs['Activity_1ds7clb'],
v4_sp_specs['Activity_1ds7clb']
)
Expand All @@ -102,9 +102,9 @@ def test_changed(self):
def test_removed(self):
v4_spec, v4_sp_specs = self.load_workflow_spec('diff/v4.bpmn', 'Process')
v5_spec, v5_sp_specs = self.load_workflow_spec('diff/v5.bpmn', 'Process')
spec_diff = SpecDiff(self.serializer, v4_spec, v5_spec)
spec_diff = SpecDiff(self.serializer.registry, v4_spec, v5_spec)
sp_spec_diff = SpecDiff(
self.serializer,
self.serializer.registry,
v4_sp_specs['Activity_1ds7clb'],
v5_sp_specs['Activity_1ds7clb']
)
Expand All @@ -123,3 +123,14 @@ def test_removed(self):
self.assertIn(workflow.get_next_task(spec_name='Activity_11gnihu'), wf_diff.removed)
self.assertIn(workflow.get_next_task(spec_name='Gateway_1acqedb'), wf_diff.removed)

def test_subprocess_changed(self):
    """A started subprocess must get an entry in the subprocess diff mapping."""
    old_spec, old_dependencies = self.load_workflow_spec('diff/v3.bpmn', 'Process')
    new_spec, new_dependencies = self.load_workflow_spec('diff/v4.bpmn', 'Process')
    workflow = BpmnWorkflow(old_spec, old_dependencies)

    # Run automatic tasks until none are ready
    while True:
        ready = workflow.get_next_task(state=TaskState.READY, manual=False)
        if ready is None:
            break
        ready.run()

    result, sp_result = diff_workflow(self.serializer.registry, workflow, new_spec, new_dependencies)
    sp_task = workflow.get_next_task(spec_name='Activity_1ds7clb')
    self.assertIn(sp_task.id, sp_result)

0 comments on commit 95197b2

Please sign in to comment.