Skip to content

Commit

Permalink
Merge pull request #36 from JonatanMartens/development
Browse files Browse the repository at this point in the history
v1.2.3
  • Loading branch information
JonatanMartens authored Sep 27, 2020
2 parents 4bb3975 + b537164 commit f555e5c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 15 deletions.
13 changes: 8 additions & 5 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,26 @@ def _create_task_handler(self, task: Task) -> Callable[[TaskContext], TaskContex

def task_handler(context: TaskContext) -> TaskContext:
context = before_decorator_runner(context)
context = self.run_task_inner_function(task, context)
context, task_succeeded = self._run_task_inner_function(task, context)
context = after_decorator_runner(context)
self.complete_job(context)
if task_succeeded:
self._complete_job(context)
return context

return task_handler

def run_task_inner_function(self, task: Task, context: TaskContext) -> TaskContext:
def _run_task_inner_function(self, task: Task, context: TaskContext) -> Tuple[TaskContext, bool]:
task_succeeded = False
try:
context.variables = task.inner_function(**context.variables)
task_succeeded = True
except Exception as e:
logging.debug(f"Failed job: {context}. Error: {e}.")
task.exception_handler(e, context, TaskStatusController(context, self.zeebe_adapter))
finally:
return context
return context, task_succeeded

def complete_job(self, context: TaskContext) -> None:
def _complete_job(self, context: TaskContext) -> None:
try:
logging.debug(f"Completing job: {context}")
self.zeebe_adapter.complete_job(job_key=context.key, variables=context.variables)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="pyzeebe",
version="1.2.2",
version="1.2.3",
author="Jonatan Martens",
author_email="[email protected]",
description="Zeebe client api",
Expand Down
16 changes: 7 additions & 9 deletions tests/unit/worker/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ def decorator(context: TaskContext) -> TaskContext:
return context


def failing_decorator(context: TaskContext) -> TaskContext:
raise Exception()


@pytest.fixture(scope="module")
def grpc_add_to_server():
from pyzeebe.grpc_internals.zeebe_pb2_grpc import add_GatewayServicer_to_server
Expand Down Expand Up @@ -101,12 +97,14 @@ def test_after_task_decorator_called():
def test_decorator_failed():
context = random_task_context(task)

zeebe_worker.before(failing_decorator)
zeebe_worker.after(failing_decorator)
zeebe_worker.add_task(task)
with patch("pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter.complete_job") as grpc_mock:
with patch("tests.unit.worker.worker_test.decorator") as decorator_mock:
decorator_mock.side_effect = Exception()
zeebe_worker.before(decorator)
zeebe_worker.after(decorator)
zeebe_worker.add_task(task)

assert isinstance(task.handler(context), TaskContext)
grpc_mock.assert_called_with(job_key=context.key, variables=context.variables)
assert decorator_mock.call_count == 2


def test_task_exception_handler_called():
Expand Down

0 comments on commit f555e5c

Please sign in to comment.