Skip to content

Commit

Permalink
Merge pull request #427 from sartography/feature/improved-loggers
Browse files Browse the repository at this point in the history
split workflow & task logs, add state and update time to task logs
  • Loading branch information
essweine authored Jul 25, 2024
2 parents 8e000c0 + c232b04 commit 14c84aa
Show file tree
Hide file tree
Showing 59 changed files with 198 additions and 184 deletions.
2 changes: 2 additions & 0 deletions SpiffWorkflow/bpmn/serializer/default/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ def to_dict(self, workflow):
'correlations': workflow.correlations,
'last_task': str(workflow.last_task.id) if workflow.last_task is not None else None,
'success': workflow.success,
'completed': workflow.completed,
'tasks': self.mapping_to_dict(workflow.tasks),
'root': str(workflow.task_tree.id),
}

def set_default_attributes(self, workflow, dct):
workflow.success = dct['success']
workflow.completed = dct.get('completed', False)
workflow.correlations = dct.pop('correlations', {})
if isinstance(dct['last_task'], str):
workflow.last_task = workflow.tasks.get(UUID(dct['last_task']))
Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/bpmn/specs/data_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from SpiffWorkflow.bpmn.exceptions import WorkflowDataException

data_log = logging.getLogger('spiff.data')
logger = logging.getLogger('spiff.data')


class BpmnDataSpecification:
Expand Down Expand Up @@ -73,7 +73,7 @@ def get(self, my_task):
raise WorkflowDataException(message, my_task, data_input=self)

my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info())
logger.info(f'Read workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))

def set(self, my_task):
"""Copy a value from the task data to the workflow data"""
Expand All @@ -88,7 +88,7 @@ def set(self, my_task):

wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
del my_task.data[self.bpmn_id]
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())
logger.info(f'Set workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))

def delete(self, my_task):
my_task.data.pop(self.bpmn_id, None)
Expand Down
7 changes: 3 additions & 4 deletions SpiffWorkflow/bpmn/specs/mixins/events/end_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301 USA

from SpiffWorkflow.util.task import TaskState, TaskFilter
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.bpmn.specs.event_definitions.simple import TerminateEventDefinition, CancelEventDefinition
from .event_types import ThrowingEvent

Expand Down Expand Up @@ -46,11 +46,10 @@ def _on_complete_hook(self, my_task):
super(EndEvent, self)._on_complete_hook(my_task)

if isinstance(self.event_definition, TerminateEventDefinition):

# We are finished. Set the workflow data and cancel all tasks
my_task.workflow.set_data(**my_task.data)
for task in my_task.workflow.get_tasks(task_filter=TaskFilter(state=TaskState.NOT_FINISHED_MASK)):
for task in my_task.workflow.get_tasks(state=TaskState.NOT_FINISHED_MASK):
task.cancel()
my_task.workflow._mark_complete(my_task)

elif isinstance(self.event_definition, CancelEventDefinition):
my_task.workflow.cancel()
1 change: 0 additions & 1 deletion SpiffWorkflow/bpmn/specs/mixins/events/event_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def catch(self, my_task, event):
definition, at which point we can update our task's state.
"""
self.event_definition.catch(my_task, event)
my_task.last_state_change = time.time()
my_task._set_state(TaskState.WAITING)

def _update_hook(self, my_task):
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _update_hook(self, my_task):
self.start_workflow(my_task)
my_task._set_state(TaskState.STARTED)
else:
return subprocess.is_completed()
return subprocess.completed

def _on_cancel(self, my_task):
subworkflow = my_task.workflow.top_workflow.get_subprocess(my_task)
Expand Down
1 change: 0 additions & 1 deletion SpiffWorkflow/bpmn/util/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def _aligned(self, original, candidates):
all(first is not None and first.name == second.name for first, second in zip(subs, candidates))

def _compare_task_specs(self, spec, candidate):

s1 = self._registry.convert(spec)
s2 = self._registry.convert(candidate)
if s1.get('typename') != s2.get('typename'):
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/util/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def get_tasks_iterator(self, first_task=None, **kwargs):
class BpmnSubWorkflow(BpmnBaseWorkflow):

def __init__(self, spec, parent_task_id, top_workflow, **kwargs):
super().__init__(spec, **kwargs)
self.parent_task_id = parent_task_id
self.top_workflow = top_workflow
self.correlations = {}
self.depth = self._calculate_depth()
super().__init__(spec, **kwargs)

@property
def script_engine(self):
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def delete_subprocess(self, my_task):
return tasks

def get_active_subprocesses(self):
return [sp for sp in self.subprocesses.values() if not sp.is_completed()]
return [sp for sp in self.subprocesses.values() if not sp.completed]

def catch(self, event):
"""
Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import re

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.task')


class Term(object):
Expand Down Expand Up @@ -182,7 +182,7 @@ def valueof(scope, op, default=None):
return default
elif isinstance(op, Attrib):
if op.name not in scope.data:
logger.debug("Attrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data}))
logger.debug("Attrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data}))
return scope.get_data(op.name, default)
elif isinstance(op, PathAttrib):
if not op.path:
Expand All @@ -191,7 +191,7 @@ def valueof(scope, op, default=None):
data = scope.data
for part in parts:
if part not in data:
logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data}))
logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data}))
return default
data = data[part] # move down the path
return data
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/specs/SubWorkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def _update_hook(self, my_task):
subworkflow = my_task._get_internal_data('subworkflow')
if subworkflow is None:
self._create_subworkflow(my_task)
elif subworkflow.is_completed():
elif subworkflow.completed:
my_task.complete()

def _on_subworkflow_completed(self, subworkflow, my_task):
Expand Down
4 changes: 2 additions & 2 deletions SpiffWorkflow/specs/Transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from .base import TaskSpec

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.task')


class Transform(TaskSpec):
Expand Down Expand Up @@ -56,7 +56,7 @@ def _update_hook(self, my_task):

if self.transforms:
for transform in self.transforms:
logger.debug('Execute transform', extra=my_task.log_info({'transform': transform}))
logger.debug('Execute transform', extra=my_task.collect_log_extras({'transform': transform}))
exec(transform)
return True

Expand Down
40 changes: 22 additions & 18 deletions SpiffWorkflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
from .util.deep_merge import DeepMerge
from .exceptions import WorkflowException

logger = logging.getLogger('spiff')
metrics = logging.getLogger('spiff.metrics')
data_log = logging.getLogger('spiff.data')
logger = logging.getLogger('spiff.task')


class Task(object):
Expand Down Expand Up @@ -100,7 +98,6 @@ def state(self):

@state.setter
def state(self, value):

if value < self._state:
raise WorkflowException(
'state went from %s to %s!' % (TaskState.get_name(self._state), TaskState.get_name(value)),
Expand Down Expand Up @@ -173,6 +170,7 @@ def reset_branch(self, data):
Returns:
list(`Task`): tasks removed from the tree
"""
logger.info(f'Branch reset', extra=self.collect_log_extras())
self.internal_data = {}
self.data = deepcopy(self.parent.data) if data is None else data
descendants = [t for t in self]
Expand Down Expand Up @@ -294,11 +292,15 @@ def _set_state(self, value):
"""Force set the state on a task"""

if value != self.state:
logger.info(f'State change to {TaskState.get_name(value)}', extra=self.log_info())
elapsed = time.time() - self.last_state_change
self.last_state_change = time.time()
self._state = value
logger.info(
f'State changed to {TaskState.get_name(value)}',
extra=self.collect_log_extras({'elapsed': elapsed})
)
else:
logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.log_info())
logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.collect_log_extras())

def _assign_new_thread_id(self, recursive=True):
"""Assigns a new thread id to the task."""
Expand Down Expand Up @@ -347,11 +349,6 @@ def run(self):
"""
start = time.time()
retval = self.task_spec._run(self)
extra = self.log_info({
'action': 'Complete',
'elapsed': time.time() - start
})
metrics.debug('', extra=extra)
if retval is None:
self._set_state(TaskState.STARTED)
elif retval is False:
Expand All @@ -374,7 +371,6 @@ def complete(self):
"""Marks this task complete."""
self._set_state(TaskState.COMPLETED)
self.task_spec._on_complete(self)
self.workflow.last_task = self

def error(self):
"""Marks this task as error."""
Expand All @@ -388,17 +384,25 @@ def trigger(self, *args):
"""
self.task_spec._on_trigger(self, *args)

def log_info(self, dct=None):
def collect_log_extras(self, dct=None):
"""Return logging details for this task"""
extra = dct or {}
extra.update({
extra = {
'workflow_spec': self.workflow.spec.name,
'task_spec': self.task_spec.name,
'task_id': self.id,
'task_type': self.task_spec.__class__.__name__,
'data': self.data if logger.level < 20 else None,
'internal_data': self.internal_data if logger.level <= 10 else None,
})
'state': TaskState.get_name(self._state),
'last_state_change': self.last_state_change,
'elapsed': 0,
'parent': None if self.parent is None else self.parent.id,
}
if dct is not None:
extra.update(dct)
if logger.level < 20:
extra.update({
'data': self.data if logger.level < 20 else None,
'internal_data': self.internal_data if logger.level < 20 else None,
})
return extra

def __iter__(self):
Expand Down
57 changes: 31 additions & 26 deletions SpiffWorkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .util.event import Event
from .exceptions import TaskNotFoundException, WorkflowException

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.workflow')


class Workflow(object):
Expand Down Expand Up @@ -64,24 +64,22 @@ def __init__(self, workflow_spec, deserializing=False):
if not deserializing:
self.task_tree = Task(self, self.spec.start, state=TaskState.FUTURE)
self.task_tree.task_spec._predict(self.task_tree, mask=TaskState.NOT_FINISHED_MASK)
logger.info('Initialized workflow', extra=self.collect_log_extras())
self.task_tree._ready()
logger.info('Initialize', extra=self.log_info())

def is_completed(self):
"""Checks whether the workflow is complete.
Returns:
bool: True if the workflow has no unfinished tasks
"""
if self.completed:
return True
iter = TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK)
try:
next(iter)
except StopIteration:
self.completed = True
return True
return False
if not self.completed:
iter = TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK)
try:
next(iter)
except StopIteration:
self.completed = True
return self.completed

def manual_input_required(self):
"""Checks whether the workflow requires manual input.
Expand Down Expand Up @@ -215,13 +213,14 @@ def cancel(self, success=False):
list(`Task`): the cancelled tasks
"""
self.success = success
cancel = []
self.completed = True
logger.info(f'Workflow cancelled', extra=self.collect_log_extras())
cancelled = []
for task in TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK):
cancel.append(task)
for task in cancel:
cancelled.append(task)
for task in cancelled:
task.cancel()
logger.info(f'Cancel with {len(cancel)} remaining', extra=self.log_info())
return cancel
return cancelled

def set_data(self, **kwargs):
"""Defines the given attribute/value pairs."""
Expand All @@ -247,23 +246,23 @@ def reset_from_task_id(self, task_id, data=None):
task_id: the id of the task to reset to
data (dict): optionally replace the data (if None, data will be copied from the parent task)
Returns:
list(`Task`): tasks removed from the tree
"""
task = self.get_task_from_id(task_id)
self.last_task = task.parent
return task.reset_branch(data)

def log_info(self, dct=None):
def collect_log_extras(self, dct=None):
"""Return logging details for this workflow"""
extra = dct or {}
extra.update({
'workflow_spec': self.spec.name,
'task_spec': None,
'task_type': None,
'task_id': None,
'data': None,
'success': self.success,
'completed': self.completed,
})
if logger.level < 20:
extra.update({'tasks': [t.id for t in Workflow.get_tasks(self)]})
return extra

def _predict(self, mask=TaskState.NOT_FINISHED_MASK):
Expand All @@ -273,12 +272,13 @@ def _predict(self, mask=TaskState.NOT_FINISHED_MASK):

def _task_completed_notify(self, task):
"""Called whenever a task completes"""
self.last_task = task
if task.task_spec.name == 'End':
self.data.update(task.data)
self.update_waiting_tasks()
if self.completed_event.n_subscribers() > 0 and self.is_completed():
# Since is_completed() is expensive it makes sense to bail out if calling it is not necessary.
self._mark_complete(task)
if self.completed:
self.completed_event(self)
else:
self.update_waiting_tasks()

def _remove_task(self, task_id):
task = self.tasks[task_id]
Expand All @@ -287,6 +287,11 @@ def _remove_task(self, task_id):
task.parent._children.remove(task.id)
self.tasks.pop(task_id)

def _mark_complete(self, task):
    """Mark this workflow complete.

    Merges the final task's data into the workflow data and sets the
    `completed` flag before logging, so that the emitted log record's
    extras (which include the workflow's `completed` state) reflect the
    workflow as actually completed rather than still in progress.

    Args:
        task: the task whose completion finishes the workflow; its data
            is merged into the workflow-level data
    """
    # Update state first: collect_log_extras() reads self.completed,
    # so logging must happen after the flag is set to avoid a record
    # that says 'Workflow completed' while reporting completed=False.
    self.data.update(task.data)
    self.completed = True
    logger.info('Workflow completed', extra=self.collect_log_extras())

def _get_mutex(self, name):
"""Get or create a mutex"""
if name not in self.locks:
Expand Down
Loading

0 comments on commit 14c84aa

Please sign in to comment.