From e9591122cd5d669df009e6826365ee5d93d3108c Mon Sep 17 00:00:00 2001 From: Elizabeth Esswein Date: Tue, 16 Jul 2024 15:02:23 -0400 Subject: [PATCH] split workflow & task logs, add state and update time to task logs --- SpiffWorkflow/bpmn/specs/data_spec.py | 6 +++--- SpiffWorkflow/bpmn/util/diff.py | 1 - SpiffWorkflow/operators.py | 6 +++--- SpiffWorkflow/specs/Transform.py | 4 ++-- SpiffWorkflow/task.py | 26 +++++++++++------------ SpiffWorkflow/workflow.py | 30 +++++++++++++-------------- 6 files changed, 35 insertions(+), 38 deletions(-) diff --git a/SpiffWorkflow/bpmn/specs/data_spec.py b/SpiffWorkflow/bpmn/specs/data_spec.py index bb9a0af5..4150c641 100644 --- a/SpiffWorkflow/bpmn/specs/data_spec.py +++ b/SpiffWorkflow/bpmn/specs/data_spec.py @@ -22,7 +22,7 @@ from SpiffWorkflow.bpmn.exceptions import WorkflowDataException -data_log = logging.getLogger('spiff.data') +logger = logging.getLogger('spiff.data') class BpmnDataSpecification: @@ -73,7 +73,7 @@ def get(self, my_task): raise WorkflowDataException(message, my_task, data_input=self) my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id]) - data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info()) + logger.info(f'Read workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id})) def set(self, my_task): """Copy a value from the task data to the workflow data""" @@ -88,7 +88,7 @@ def set(self, my_task): wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id]) del my_task.data[self.bpmn_id] - data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info()) + logger.info(f'Set workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id})) def delete(self, my_task): my_task.data.pop(self.bpmn_id, None) diff --git a/SpiffWorkflow/bpmn/util/diff.py b/SpiffWorkflow/bpmn/util/diff.py index 8c8a0c5e..0f0d1b6a 100644 --- a/SpiffWorkflow/bpmn/util/diff.py +++ b/SpiffWorkflow/bpmn/util/diff.py @@ -112,7 +112,6 @@ def _aligned(self, original, candidates): all(first is not None and first.name == second.name for first, second in zip(subs, candidates)) def _compare_task_specs(self, spec, candidate): - s1 = self._registry.convert(spec) s2 = self._registry.convert(candidate) if s1.get('typename') != s2.get('typename'): diff --git a/SpiffWorkflow/operators.py b/SpiffWorkflow/operators.py index 2071861d..0049c7b6 100644 --- a/SpiffWorkflow/operators.py +++ b/SpiffWorkflow/operators.py @@ -20,7 +20,7 @@ import logging import re -logger = logging.getLogger('spiff') +logger = logging.getLogger('spiff.task') class Term(object): @@ -182,7 +182,7 @@ def valueof(scope, op, default=None): return default elif isinstance(op, Attrib): if op.name not in scope.data: - logger.debug("Attrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data})) + logger.debug("Attrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data})) return scope.get_data(op.name, default) elif isinstance(op, PathAttrib): if not op.path: @@ -191,7 +191,7 @@ def valueof(scope, op, default=None): data = scope.data for part in parts: if part not in data: - logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data})) + logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data})) return default data = data[part] # move down the path return data diff --git a/SpiffWorkflow/specs/Transform.py b/SpiffWorkflow/specs/Transform.py index 585fdcb4..5921f88e 100644 --- a/SpiffWorkflow/specs/Transform.py +++ b/SpiffWorkflow/specs/Transform.py @@ -21,7 +21,7 @@ from .base import TaskSpec -logger = logging.getLogger('spiff') +logger = logging.getLogger('spiff.task') class Transform(TaskSpec): @@ -56,7 +56,7 @@ def _update_hook(self, my_task): if self.transforms: for transform in self.transforms: - logger.debug('Execute transform', extra=my_task.log_info({'transform': transform})) + logger.debug('Execute transform', extra=my_task.collect_log_extras({'transform': transform})) exec(transform) return True diff --git a/SpiffWorkflow/task.py b/SpiffWorkflow/task.py index 1d09540f..2478f12a 100644 --- a/SpiffWorkflow/task.py +++ b/SpiffWorkflow/task.py @@ -27,9 +27,7 @@ from .util.deep_merge import DeepMerge from .exceptions import WorkflowException -logger = logging.getLogger('spiff') -metrics = logging.getLogger('spiff.metrics') -data_log = logging.getLogger('spiff.data') +logger = logging.getLogger('spiff.task') class Task(object): @@ -100,7 +98,6 @@ def state(self): @state.setter def state(self, value): - if value < self._state: raise WorkflowException( 'state went from %s to %s!' % (TaskState.get_name(self._state), TaskState.get_name(value)), @@ -173,6 +170,7 @@ def reset_branch(self, data): Returns: list(`Task`): tasks removed from the tree """ + logger.info(f'Branch reset', extra=self.collect_log_extras()) self.internal_data = {} self.data = deepcopy(self.parent.data) if data is None else data descendants = [t for t in self] @@ -294,11 +292,11 @@ def _set_state(self, value): """Force set the state on a task""" if value != self.state: - logger.info(f'State change to {TaskState.get_name(value)}', extra=self.log_info()) self.last_state_change = time.time() self._state = value + logger.info(f'State changed to {TaskState.get_name(value)}', extra=self.collect_log_extras()) else: - logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.log_info()) + logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.collect_log_extras()) def _assign_new_thread_id(self, recursive=True): """Assigns a new thread id to the task.""" @@ -347,11 +345,6 @@ def run(self): """ start = time.time() retval = self.task_spec._run(self) - extra = self.log_info({ - 'action': 'Complete', - 'elapsed': time.time() - start - }) - metrics.debug('', extra=extra) if retval is None: self._set_state(TaskState.STARTED) elif retval is False: @@ -388,7 +381,7 @@ def trigger(self, *args): """ self.task_spec._on_trigger(self, *args) - def log_info(self, dct=None): + def collect_log_extras(self, dct=None): """Return logging details for this task""" extra = dct or {} extra.update({ @@ -396,9 +389,14 @@ def log_info(self, dct=None): 'task_spec': self.task_spec.name, 'task_id': self.id, 'task_type': self.task_spec.__class__.__name__, - 'data': self.data if logger.level < 20 else None, - 'internal_data': self.internal_data if logger.level <= 10 else None, + 'state': TaskState.get_name(self._state), + 'last_state_change': self.last_state_change, }) + if logger.level < 20: + extra.update({ + 'data': self.data if logger.level < 20 else None, + 'internal_data': self.internal_data if logger.level < 20 else None, + }) return extra def __iter__(self): diff --git a/SpiffWorkflow/workflow.py b/SpiffWorkflow/workflow.py index 50615155..3e98a666 100644 --- a/SpiffWorkflow/workflow.py +++ b/SpiffWorkflow/workflow.py @@ -25,7 +25,7 @@ from .util.event import Event from .exceptions import TaskNotFoundException, WorkflowException -logger = logging.getLogger('spiff') +logger = logging.getLogger('spiff.workflow') class Workflow(object): @@ -64,8 +64,8 @@ def __init__(self, workflow_spec, deserializing=False): if not deserializing: self.task_tree = Task(self, self.spec.start, state=TaskState.FUTURE) self.task_tree.task_spec._predict(self.task_tree, mask=TaskState.NOT_FINISHED_MASK) + logger.info('Initialized workflow', extra=self.collect_log_extras()) self.task_tree._ready() - logger.info('Initialize', extra=self.log_info()) def is_completed(self): """Checks whether the workflow is complete. @@ -215,13 +215,13 @@ def cancel(self, success=False): list(`Task`): the cancelled tasks """ self.success = success - cancel = [] + logger.info(f'Workflow cancelled', extra=self.collect_log_extras()) + cancelled = [] for task in TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK): - cancel.append(task) - for task in cancel: + cancelled.append(task) + for task in cancelled: task.cancel() - logger.info(f'Cancel with {len(cancel)} remaining', extra=self.log_info()) - return cancel + return cancelled def set_data(self, **kwargs): """Defines the given attribute/value pairs.""" @@ -254,16 +254,16 @@ def reset_from_task_id(self, task_id, data=None): self.last_task = task.parent return task.reset_branch(data) - def log_info(self, dct=None): + def collect_log_extras(self, dct=None): """Return logging details for this workflow""" extra = dct or {} - extra.update({ - 'workflow_spec': self.spec.name, - 'task_spec': None, - 'task_type': None, - 'task_id': None, - 'data': None, - }) + extra.update({'workflow_spec': self.spec.name}) + if logger.level < 20: + extra.update({ + 'finished': len([t for t in self.tasks.values() if t.has_state(TaskState.FINISHED_MASK)]), + 'definite': len([t for t in self.tasks.values() if t.has_state(TaskState.DEFINITE_MASK)]), + 'predicted': len([t for t in self.tasks.values() if t.has_state(TaskState.PREDICTED_MASK)]), + }) return extra def _predict(self, mask=TaskState.NOT_FINISHED_MASK):