Skip to content

Commit

Permalink
Merge pull request #402 from dimastbk/worker-level-exception-handler
Browse files Browse the repository at this point in the history
add worker level exception_handler
  • Loading branch information
dimastbk authored May 24, 2024
2 parents 4170934 + ba57cd0 commit 796c5a5
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 24 deletions.
7 changes: 7 additions & 0 deletions docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError`
.. autoexception:: pyzeebe.errors.InvalidOAuthCredentialsError

.. autoexception:: pyzeebe.errors.InvalidCamundaCloudCredentialsError


=================
Exception Handler
=================

.. autofunction:: pyzeebe.default_exception_handler
5 changes: 5 additions & 0 deletions docs/worker_tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ Now every time ``my_task`` is called (and then fails), ``my_exception_handler``

This tells Zeebe that the job failed. The job will then be retried (if configured in process definition).

.. note::
The exception handler can also be set via :py:class:`pyzeebe.ZeebeWorker` or :py:class:`pyzeebe.ZeebeTaskRouter`.
Pyzeebe will try to find the exception handler in the following order:
``Worker`` -> ``Router`` -> ``Task`` -> :py:func:`pyzeebe.default_exception_handler`


Task timeout
------------
Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from pyzeebe.client.sync_client import SyncZeebeClient # type: ignore
from pyzeebe.job.job import Job
from pyzeebe.job.job_status import JobStatus
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler
from pyzeebe.task.task_config import TaskConfig
from pyzeebe.task.types import TaskDecorator
from pyzeebe.worker.task_router import ZeebeTaskRouter, default_exception_handler
from pyzeebe.worker.task_router import ZeebeTaskRouter
from pyzeebe.worker.worker import ZeebeWorker
12 changes: 12 additions & 0 deletions pyzeebe/task/exception_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import logging
from typing import Awaitable, Callable

from pyzeebe.errors.pyzeebe_errors import BusinessError
from pyzeebe.job.job import Job

logger = logging.getLogger(__name__)

ExceptionHandler = Callable[[Exception, Job], Awaitable]


async def default_exception_handler(e: Exception, job: Job) -> None:
logger.warning("Task type: %s - failed job %s. Error: %s.", job.type, job, e)
if isinstance(e, BusinessError):
await job.set_error_status(f"Failed job. Recoverable error: {e}", error_code=e.error_code)
else:
await job.set_failure_status(f"Failed job. Error: {e}")
4 changes: 3 additions & 1 deletion pyzeebe/task/task_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pyzeebe.function_tools.dict_tools import convert_to_dict_function
from pyzeebe.function_tools.parameter_tools import get_job_parameter_name
from pyzeebe.job.job import create_copy
from pyzeebe.task.exception_handler import default_exception_handler
from pyzeebe.task.task import Task
from pyzeebe.task.task_config import TaskConfig
from pyzeebe.task.types import AsyncTaskDecorator, DecoratorRunner, JobHandler
Expand Down Expand Up @@ -65,7 +66,8 @@ async def run_original_task_function(
return returned_value, True
except Exception as e:
logger.debug("Failed job: %s. Error: %s.", job, e)
await task_config.exception_handler(e, job)
exception_handler = task_config.exception_handler or default_exception_handler
await exception_handler(e, job)
return job.variables, False


Expand Down
2 changes: 1 addition & 1 deletion pyzeebe/task/task_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TaskConfig:
def __init__(
self,
type: str,
exception_handler: ExceptionHandler,
exception_handler: Optional[ExceptionHandler],
timeout_ms: int,
max_jobs_to_activate: int,
max_running_jobs: int,
Expand Down
29 changes: 15 additions & 14 deletions pyzeebe/worker/task_router.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
from typing import Callable, List, Optional, Tuple

from pyzeebe.errors import BusinessError, DuplicateTaskTypeError, TaskNotFoundError
from pyzeebe.errors import DuplicateTaskTypeError, TaskNotFoundError
from pyzeebe.function_tools import parameter_tools
from pyzeebe.job.job import Job
from pyzeebe.task import task_builder
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task import Task
Expand All @@ -13,27 +12,20 @@
logger = logging.getLogger(__name__)


async def default_exception_handler(e: Exception, job: Job) -> None:
logger.warning("Task type: %s - failed job %s. Error: %s.", job.type, job, e)
if isinstance(e, BusinessError):
await job.set_error_status(f"Failed job. Recoverable error: {e}", error_code=e.error_code)
else:
await job.set_failure_status(f"Failed job. Error: {e}")


class ZeebeTaskRouter:
def __init__(
self,
before: Optional[List[TaskDecorator]] = None,
after: Optional[List[TaskDecorator]] = None,
exception_handler: ExceptionHandler = default_exception_handler,
exception_handler: Optional[ExceptionHandler] = None,
):
"""
Args:
before (List[TaskDecorator]): Decorators to be performed before each task
after (List[TaskDecorator]): Decorators to be performed after each task
exception_handler (ExceptionHandler): Handler that will be called when a job fails.
"""
self._default_exception_handler = exception_handler
self._exception_handler = exception_handler
self._before: List[TaskDecorator] = before or []
self._after: List[TaskDecorator] = after or []
self.tasks: List[Task] = []
Expand Down Expand Up @@ -73,7 +65,7 @@ def task(
DuplicateTaskTypeError: If a task from the router already exists in the worker
NoVariableNameGivenError: When single_value is set, but no variable_name is given
"""
_exception_handler = exception_handler or self._default_exception_handler
_exception_handler = exception_handler or self._exception_handler

def task_wrapper(task_function: Callable):
config = TaskConfig(
Expand Down Expand Up @@ -103,7 +95,7 @@ def _add_task(self, task: Task):
def _add_decorators_to_config(self, config: TaskConfig) -> TaskConfig:
new_task_config = TaskConfig(
type=config.type,
exception_handler=config.exception_handler,
exception_handler=config.exception_handler or self._exception_handler,
timeout_ms=config.timeout_ms,
max_jobs_to_activate=config.max_jobs_to_activate,
max_running_jobs=config.max_running_jobs,
Expand Down Expand Up @@ -140,6 +132,15 @@ def after(self, *decorators: TaskDecorator) -> None:
"""
self._after.extend(decorators)

def exception_handler(self, exception_handler: ExceptionHandler) -> None:
"""
Add exception handler to be called when a job fails
Args:
exception_handler (ExceptionHandler): Handler that will be called when a job fails.
"""
self._exception_handler = exception_handler

def remove_task(self, task_type: str) -> Task:
"""
Remove a task
Expand Down
5 changes: 4 additions & 1 deletion pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pyzeebe import TaskDecorator
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.task import task_builder
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.worker.job_executor import JobExecutor
from pyzeebe.worker.job_poller import JobPoller
from pyzeebe.worker.task_router import ZeebeTaskRouter
Expand All @@ -30,6 +31,7 @@ def __init__(
watcher_max_errors_factor: int = 3,
poll_retry_delay: int = 5,
tenant_ids: Optional[List[str]] = None,
exception_handler: Optional[ExceptionHandler] = None,
):
"""
Args:
Expand All @@ -38,12 +40,13 @@ def __init__(
request_timeout (int): Longpolling timeout for getting tasks from zeebe. If 0 default value is used
before (List[TaskDecorator]): Decorators to be performed before each task
after (List[TaskDecorator]): Decorators to be performed after each task
exception_handler (ExceptionHandler): Handler that will be called when a job fails.
max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1
watcher_max_errors_factor (int): Number of consecutive errors for a task watcher will accept before raising MaxConsecutiveTaskThreadError
poll_retry_delay (int): The number of seconds to wait before attempting to poll again when reaching max amount of running jobs
tenant_ids (List[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3.
"""
super().__init__(before, after)
super().__init__(before, after, exception_handler)
self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries)
self.name = name or socket.gethostname()
self.request_timeout = request_timeout
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ async def simple_decorator(job: Job) -> Job:
return AsyncMock(wraps=simple_decorator)


@pytest.fixture
def exception_handler():
async def simple_exception_handler(e: Exception, job: Job) -> None:
return None

return AsyncMock(wraps=simple_exception_handler)


@pytest.fixture(scope="module")
def grpc_add_to_server():
from zeebe_grpc.gateway_pb2_grpc import add_GatewayServicer_to_server
Expand Down
21 changes: 17 additions & 4 deletions tests/unit/worker/task_router_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from pyzeebe import TaskDecorator
from pyzeebe.errors import BusinessError, DuplicateTaskTypeError, TaskNotFoundError
from pyzeebe.job.job import Job
from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler
from pyzeebe.task.task import Task
from pyzeebe.worker.task_router import ZeebeTaskRouter, default_exception_handler
from pyzeebe.worker.task_router import ZeebeTaskRouter
from tests.unit.utils.random_utils import randint


Expand All @@ -20,7 +21,7 @@ def test_get_task(router: ZeebeTaskRouter, task: Task):


def test_task_inherits_exception_handler(router: ZeebeTaskRouter, task: Task):
router._default_exception_handler = str
router._exception_handler = str
router.task(task.type)(task.original_function)

found_task = router.get_task(task.type)
Expand Down Expand Up @@ -100,6 +101,12 @@ def test_add_after_decorator(router: ZeebeTaskRouter, decorator: TaskDecorator):
assert len(router._after) == 1


def test_set_exception_handler(router: ZeebeTaskRouter, exception_handler: ExceptionHandler):
router.exception_handler(exception_handler)

assert router._exception_handler is exception_handler


def test_add_before_decorator_through_constructor(decorator: TaskDecorator):
router = ZeebeTaskRouter(before=[decorator])

Expand All @@ -112,9 +119,15 @@ def test_add_after_decorator_through_constructor(decorator: TaskDecorator):
assert len(router._after) == 1


def test_set_exception_handler_through_constructor(exception_handler: ExceptionHandler):
router = ZeebeTaskRouter(exception_handler=exception_handler)

assert router._exception_handler is exception_handler


@pytest.mark.asyncio
async def test_default_exception_handler_logs_a_warning(mocked_job_with_adapter: Job):
with mock.patch("pyzeebe.worker.task_router.logger.warning") as logging_mock:
with mock.patch("pyzeebe.task.exception_handler.logger.warning") as logging_mock:
await default_exception_handler(Exception(), mocked_job_with_adapter)

mocked_job_with_adapter.set_failure_status.assert_called()
Expand All @@ -132,7 +145,7 @@ async def test_default_exception_handler_uses_business_error(job_without_adapter

@pytest.mark.asyncio
async def test_default_exception_handler_warns_of_job_failure(job_without_adapter):
with mock.patch("pyzeebe.worker.task_router.logger.warning") as logging_mock:
with mock.patch("pyzeebe.task.exception_handler.logger.warning") as logging_mock:
with mock.patch("pyzeebe.job.job.Job.set_error_status"):
exception = BusinessError("custom-error-code")
await default_exception_handler(exception, job_without_adapter)
Expand Down
74 changes: 73 additions & 1 deletion tests/unit/worker/worker_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import List
from unittest.mock import AsyncMock
from uuid import uuid4

import grpc
import pytest

from pyzeebe import TaskDecorator, ZeebeTaskRouter
from pyzeebe import ExceptionHandler, TaskDecorator, ZeebeTaskRouter
from pyzeebe.errors import DuplicateTaskTypeError
from pyzeebe.job.job import Job
from pyzeebe.task.task import Task
Expand Down Expand Up @@ -55,6 +56,10 @@ def test_add_after_decorator(self, zeebe_worker: ZeebeWorker, decorator: TaskDec
assert len(zeebe_worker._after) == 1
assert decorator in zeebe_worker._after

def test_set_exception_handler(self, zeebe_worker: ZeebeWorker, exception_handler: ExceptionHandler):
zeebe_worker.exception_handler(exception_handler)
assert exception_handler is zeebe_worker._exception_handler

def test_add_constructor_before_decorator(self, aio_grpc_channel: grpc.aio.Channel, decorator: TaskDecorator):
zeebe_worker = ZeebeWorker(aio_grpc_channel, before=[decorator])
assert len(zeebe_worker._before) == 1
Expand All @@ -65,6 +70,12 @@ def test_add_constructor_after_decorator(self, aio_grpc_channel: grpc.aio.Channe
assert len(zeebe_worker._after) == 1
assert decorator in zeebe_worker._after

def test_set_constructor_exception_handler(
self, aio_grpc_channel: grpc.aio.Channel, exception_handler: ExceptionHandler
):
zeebe_worker = ZeebeWorker(aio_grpc_channel, exception_handler=exception_handler)
assert exception_handler is zeebe_worker._exception_handler


class TestIncludeRouter:
def test_include_router_adds_task(self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str):
Expand Down Expand Up @@ -100,6 +111,21 @@ async def test_router_after_decorator(

decorator.assert_called_once()

@pytest.mark.asyncio
async def test_router_with_exception_handler(
self,
zeebe_worker: ZeebeWorker,
router: ZeebeTaskRouter,
exception_handler: ExceptionHandler,
mocked_job_with_adapter: Job,
):
router.exception_handler(exception_handler)
task = self.include_router_with_task_error(zeebe_worker, router)

await task.job_handler(mocked_job_with_adapter)

exception_handler.assert_called_once()

@pytest.mark.asyncio
async def test_worker_with_before_decorator(
self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, decorator: TaskDecorator, mocked_job_with_adapter: Job
Expand All @@ -122,6 +148,39 @@ async def test_worker_with_after_decorator(

decorator.assert_called_once()

@pytest.mark.asyncio
async def test_worker_with_exception_handler(
self,
zeebe_worker: ZeebeWorker,
router: ZeebeTaskRouter,
exception_handler: ExceptionHandler,
mocked_job_with_adapter: Job,
):
zeebe_worker.exception_handler(exception_handler)
task = self.include_router_with_task_error(zeebe_worker, router)

await task.job_handler(mocked_job_with_adapter)

exception_handler.assert_called_once()

@pytest.mark.asyncio
async def test_worker_and_router_with_exception_handler(
self,
zeebe_worker: ZeebeWorker,
router: ZeebeTaskRouter,
mocked_job_with_adapter: Job,
):
exception_handler_router = AsyncMock()
exception_handler_worker = AsyncMock()
router.exception_handler(exception_handler_router)
zeebe_worker.exception_handler(exception_handler_worker)
task = self.include_router_with_task_error(zeebe_worker, router)

await task.job_handler(mocked_job_with_adapter)

exception_handler_router.assert_called_once()
exception_handler_worker.assert_not_called()

@staticmethod
def include_router_with_task(zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str = None) -> Task:
task_type = task_type or str(uuid4())
Expand All @@ -132,3 +191,16 @@ def dummy_function():

zeebe_worker.include_router(router)
return zeebe_worker.get_task(task_type)

@staticmethod
def include_router_with_task_error(
zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str = None
) -> Task:
task_type = task_type or str(uuid4())

@router.task(task_type)
def dummy_function():
raise Exception()

zeebe_worker.include_router(router)
return zeebe_worker.get_task(task_type)

0 comments on commit 796c5a5

Please sign in to comment.