Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split workflow & task logs, add state and update time to task logs #427

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions SpiffWorkflow/bpmn/specs/data_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from SpiffWorkflow.bpmn.exceptions import WorkflowDataException

data_log = logging.getLogger('spiff.data')
logger = logging.getLogger('spiff.data')


class BpmnDataSpecification:
Expand Down Expand Up @@ -73,7 +73,7 @@ def get(self, my_task):
raise WorkflowDataException(message, my_task, data_input=self)

my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info())
logger.info(f'Read workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))

def set(self, my_task):
"""Copy a value from the task data to the workflow data"""
Expand All @@ -88,7 +88,7 @@ def set(self, my_task):

wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
del my_task.data[self.bpmn_id]
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())
logger.info(f'Set workflow variable', extra=my_task.collect_log_extras({'bpmn_id': self.bpmn_id}))

def delete(self, my_task):
my_task.data.pop(self.bpmn_id, None)
Expand Down
1 change: 0 additions & 1 deletion SpiffWorkflow/bpmn/util/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def _aligned(self, original, candidates):
all(first is not None and first.name == second.name for first, second in zip(subs, candidates))

def _compare_task_specs(self, spec, candidate):

s1 = self._registry.convert(spec)
s2 = self._registry.convert(candidate)
if s1.get('typename') != s2.get('typename'):
Expand Down
6 changes: 3 additions & 3 deletions SpiffWorkflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import logging
import re

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.task')


class Term(object):
Expand Down Expand Up @@ -182,7 +182,7 @@ def valueof(scope, op, default=None):
return default
elif isinstance(op, Attrib):
if op.name not in scope.data:
logger.debug("Attrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data}))
logger.debug("Attrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data}))
return scope.get_data(op.name, default)
elif isinstance(op, PathAttrib):
if not op.path:
Expand All @@ -191,7 +191,7 @@ def valueof(scope, op, default=None):
data = scope.data
for part in parts:
if part not in data:
logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.log_info({'data': scope.data}))
logger.debug(f"PathAttrib('{op.name}') not present in task data", extra=scope.collect_log_extras({'data': scope.data}))
return default
data = data[part] # move down the path
return data
Expand Down
4 changes: 2 additions & 2 deletions SpiffWorkflow/specs/Transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from .base import TaskSpec

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.task')


class Transform(TaskSpec):
Expand Down Expand Up @@ -56,7 +56,7 @@ def _update_hook(self, my_task):

if self.transforms:
for transform in self.transforms:
logger.debug('Execute transform', extra=my_task.log_info({'transform': transform}))
logger.debug('Execute transform', extra=my_task.collect_log_extras({'transform': transform}))
exec(transform)
return True

Expand Down
26 changes: 12 additions & 14 deletions SpiffWorkflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
from .util.deep_merge import DeepMerge
from .exceptions import WorkflowException

logger = logging.getLogger('spiff')
metrics = logging.getLogger('spiff.metrics')
data_log = logging.getLogger('spiff.data')
logger = logging.getLogger('spiff.task')


class Task(object):
Expand Down Expand Up @@ -100,7 +98,6 @@ def state(self):

@state.setter
def state(self, value):

if value < self._state:
raise WorkflowException(
'state went from %s to %s!' % (TaskState.get_name(self._state), TaskState.get_name(value)),
Expand Down Expand Up @@ -173,6 +170,7 @@ def reset_branch(self, data):
Returns:
list(`Task`): tasks removed from the tree
"""
logger.info(f'Branch reset', extra=self.collect_log_extras())
self.internal_data = {}
self.data = deepcopy(self.parent.data) if data is None else data
descendants = [t for t in self]
Expand Down Expand Up @@ -294,11 +292,11 @@ def _set_state(self, value):
"""Force set the state on a task"""

if value != self.state:
logger.info(f'State change to {TaskState.get_name(value)}', extra=self.log_info())
self.last_state_change = time.time()
self._state = value
logger.info(f'State changed to {TaskState.get_name(value)}', extra=self.collect_log_extras())
else:
logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.log_info())
logger.debug(f'State set to {TaskState.get_name(value)}', extra=self.collect_log_extras())

def _assign_new_thread_id(self, recursive=True):
"""Assigns a new thread id to the task."""
Expand Down Expand Up @@ -347,11 +345,6 @@ def run(self):
"""
start = time.time()
retval = self.task_spec._run(self)
extra = self.log_info({
'action': 'Complete',
'elapsed': time.time() - start
})
metrics.debug('', extra=extra)
if retval is None:
self._set_state(TaskState.STARTED)
elif retval is False:
Expand Down Expand Up @@ -388,17 +381,22 @@ def trigger(self, *args):
"""
self.task_spec._on_trigger(self, *args)

def log_info(self, dct=None):
def collect_log_extras(self, dct=None):
"""Return logging details for this task"""
extra = dct or {}
extra.update({
'workflow_spec': self.workflow.spec.name,
'task_spec': self.task_spec.name,
'task_id': self.id,
'task_type': self.task_spec.__class__.__name__,
'data': self.data if logger.level < 20 else None,
'internal_data': self.internal_data if logger.level <= 10 else None,
'state': TaskState.get_name(self._state),
'last_state_change': self.last_state_change,
})
if logger.level < 20:
extra.update({
'data': self.data if logger.level < 20 else None,
'internal_data': self.internal_data if logger.level < 20 else None,
})
return extra

def __iter__(self):
Expand Down
30 changes: 15 additions & 15 deletions SpiffWorkflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .util.event import Event
from .exceptions import TaskNotFoundException, WorkflowException

logger = logging.getLogger('spiff')
logger = logging.getLogger('spiff.workflow')


class Workflow(object):
Expand Down Expand Up @@ -64,8 +64,8 @@ def __init__(self, workflow_spec, deserializing=False):
if not deserializing:
self.task_tree = Task(self, self.spec.start, state=TaskState.FUTURE)
self.task_tree.task_spec._predict(self.task_tree, mask=TaskState.NOT_FINISHED_MASK)
logger.info('Initialized workflow', extra=self.collect_log_extras())
self.task_tree._ready()
logger.info('Initialize', extra=self.log_info())

def is_completed(self):
"""Checks whether the workflow is complete.
Expand Down Expand Up @@ -215,13 +215,13 @@ def cancel(self, success=False):
list(`Task`): the cancelled tasks
"""
self.success = success
cancel = []
logger.info(f'Workflow cancelled', extra=self.collect_log_extras())
cancelled = []
for task in TaskIterator(self.task_tree, state=TaskState.NOT_FINISHED_MASK):
cancel.append(task)
for task in cancel:
cancelled.append(task)
for task in cancelled:
task.cancel()
logger.info(f'Cancel with {len(cancel)} remaining', extra=self.log_info())
return cancel
return cancelled

def set_data(self, **kwargs):
"""Defines the given attribute/value pairs."""
Expand Down Expand Up @@ -254,16 +254,16 @@ def reset_from_task_id(self, task_id, data=None):
self.last_task = task.parent
return task.reset_branch(data)

def log_info(self, dct=None):
def collect_log_extras(self, dct=None):
"""Return logging details for this workflow"""
extra = dct or {}
extra.update({
'workflow_spec': self.spec.name,
'task_spec': None,
'task_type': None,
'task_id': None,
'data': None,
})
extra.update({'workflow_spec': self.spec.name})
if logger.level < 20:
extra.update({
'finished': len([t for t in self.tasks.values() if t.has_state(TaskState.FINISHED_MASK)]),
'definite': len([t for t in self.tasks.values() if t.has_state(TaskState.DEFINITE_MASK)]),
'predicted': len([t for t in self.tasks.values() if t.has_state(TaskState.PREDICTED_MASK)]),
})
return extra

def _predict(self, mask=TaskState.NOT_FINISHED_MASK):
Expand Down
Loading