From 4408be84a5d97e086cd2807cc17b18456a82fa50 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 24 May 2024 10:52:28 +0500 Subject: [PATCH 1/4] add worker level exception_handler --- pyzeebe/__init__.py | 4 +- pyzeebe/task/exception_handler.py | 12 ++++++ pyzeebe/task/task_builder.py | 4 +- pyzeebe/task/task_config.py | 2 +- pyzeebe/worker/task_router.py | 29 +++++++------- pyzeebe/worker/worker.py | 5 ++- tests/unit/conftest.py | 8 ++++ tests/unit/worker/task_router_test.py | 21 ++++++++-- tests/unit/worker/worker_test.py | 55 ++++++++++++++++++++++++++- 9 files changed, 116 insertions(+), 24 deletions(-) diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index 7149c5ad..d962eef1 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -6,8 +6,8 @@ from pyzeebe.client.sync_client import SyncZeebeClient # type: ignore from pyzeebe.job.job import Job from pyzeebe.job.job_status import JobStatus -from pyzeebe.task.exception_handler import ExceptionHandler +from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler from pyzeebe.task.task_config import TaskConfig from pyzeebe.task.types import TaskDecorator -from pyzeebe.worker.task_router import ZeebeTaskRouter, default_exception_handler +from pyzeebe.worker.task_router import ZeebeTaskRouter from pyzeebe.worker.worker import ZeebeWorker diff --git a/pyzeebe/task/exception_handler.py b/pyzeebe/task/exception_handler.py index 83226137..4f762c58 100644 --- a/pyzeebe/task/exception_handler.py +++ b/pyzeebe/task/exception_handler.py @@ -1,5 +1,17 @@ +import logging from typing import Awaitable, Callable +from pyzeebe.errors.pyzeebe_errors import BusinessError from pyzeebe.job.job import Job +logger = logging.getLogger(__name__) + ExceptionHandler = Callable[[Exception, Job], Awaitable] + + +async def default_exception_handler(e: Exception, job: Job) -> None: + logger.warning("Task type: %s - failed job %s. Error: %s.", job.type, job, e) + if isinstance(e, BusinessError): + await job.set_error_status(f"Failed job. Recoverable error: {e}", error_code=e.error_code) + else: + await job.set_failure_status(f"Failed job. Error: {e}") diff --git a/pyzeebe/task/task_builder.py b/pyzeebe/task/task_builder.py index d420a96a..c684ff3d 100644 --- a/pyzeebe/task/task_builder.py +++ b/pyzeebe/task/task_builder.py @@ -8,6 +8,7 @@ from pyzeebe.function_tools.dict_tools import convert_to_dict_function from pyzeebe.function_tools.parameter_tools import get_job_parameter_name from pyzeebe.job.job import create_copy +from pyzeebe.task.exception_handler import default_exception_handler from pyzeebe.task.task import Task from pyzeebe.task.task_config import TaskConfig from pyzeebe.task.types import AsyncTaskDecorator, DecoratorRunner, JobHandler @@ -65,7 +66,8 @@ async def run_original_task_function( return returned_value, True except Exception as e: logger.debug("Failed job: %s. Error: %s.", job, e) - await task_config.exception_handler(e, job) + exception_handler = task_config.exception_handler or default_exception_handler + await exception_handler(e, job) return job.variables, False diff --git a/pyzeebe/task/task_config.py b/pyzeebe/task/task_config.py index c93a8341..1c8c4b43 100644 --- a/pyzeebe/task/task_config.py +++ b/pyzeebe/task/task_config.py @@ -13,7 +13,7 @@ class TaskConfig: def __init__( self, type: str, - exception_handler: ExceptionHandler, + exception_handler: Optional[ExceptionHandler], timeout_ms: int, max_jobs_to_activate: int, max_running_jobs: int, diff --git a/pyzeebe/worker/task_router.py b/pyzeebe/worker/task_router.py index 111b46c5..16dd4712 100644 --- a/pyzeebe/worker/task_router.py +++ b/pyzeebe/worker/task_router.py @@ -1,9 +1,8 @@ import logging from typing import Callable, List, Optional, Tuple -from pyzeebe.errors import BusinessError, DuplicateTaskTypeError, TaskNotFoundError +from pyzeebe.errors import DuplicateTaskTypeError, TaskNotFoundError from pyzeebe.function_tools import parameter_tools -from pyzeebe.job.job import Job from pyzeebe.task import task_builder from pyzeebe.task.exception_handler import ExceptionHandler from pyzeebe.task.task import Task @@ -13,27 +12,20 @@ logger = logging.getLogger(__name__) -async def default_exception_handler(e: Exception, job: Job) -> None: - logger.warning("Task type: %s - failed job %s. Error: %s.", job.type, job, e) - if isinstance(e, BusinessError): - await job.set_error_status(f"Failed job. Recoverable error: {e}", error_code=e.error_code) - else: - await job.set_failure_status(f"Failed job. Error: {e}") - - class ZeebeTaskRouter: def __init__( self, before: Optional[List[TaskDecorator]] = None, after: Optional[List[TaskDecorator]] = None, - exception_handler: ExceptionHandler = default_exception_handler, + exception_handler: Optional[ExceptionHandler] = None, ): """ Args: before (List[TaskDecorator]): Decorators to be performed before each task after (List[TaskDecorator]): Decorators to be performed after each task + exception_handler (ExceptionHandler): Handler that will be called when a job fails. """ - self._default_exception_handler = exception_handler + self._exception_handler = exception_handler self._before: List[TaskDecorator] = before or [] self._after: List[TaskDecorator] = after or [] self.tasks: List[Task] = [] @@ -73,7 +65,7 @@ def task( DuplicateTaskTypeError: If a task from the router already exists in the worker NoVariableNameGivenError: When single_value is set, but no variable_name is given """ - _exception_handler = exception_handler or self._default_exception_handler + _exception_handler = exception_handler or self._exception_handler def task_wrapper(task_function: Callable): config = TaskConfig( @@ -103,7 +95,7 @@ def _add_task(self, task: Task): def _add_decorators_to_config(self, config: TaskConfig) -> TaskConfig: new_task_config = TaskConfig( type=config.type, - exception_handler=config.exception_handler, + exception_handler=config.exception_handler or self._exception_handler, timeout_ms=config.timeout_ms, max_jobs_to_activate=config.max_jobs_to_activate, max_running_jobs=config.max_running_jobs, @@ -140,6 +132,15 @@ def after(self, *decorators: TaskDecorator) -> None: """ self._after.extend(decorators) + def exception_handler(self, exception_handler: ExceptionHandler) -> None: + """ + Add exception handler to be called when a job fails + + Args: + exception_handler (ExceptionHandler): Handler that will be called when a job fails. + """ + self._exception_handler = exception_handler + def remove_task(self, task_type: str) -> Task: """ Remove a task diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index 76a0e4b8..f66f3855 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -8,6 +8,7 @@ from pyzeebe import TaskDecorator from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.task import task_builder +from pyzeebe.task.exception_handler import ExceptionHandler from pyzeebe.worker.job_executor import JobExecutor from pyzeebe.worker.job_poller import JobPoller from pyzeebe.worker.task_router import ZeebeTaskRouter @@ -30,6 +31,7 @@ def __init__( watcher_max_errors_factor: int = 3, poll_retry_delay: int = 5, tenant_ids: Optional[List[str]] = None, + exception_handler: Optional[ExceptionHandler] = None, ): """ Args: @@ -38,12 +40,13 @@ def __init__( request_timeout (int): Longpolling timeout for getting tasks from zeebe. If 0 default value is used before (List[TaskDecorator]): Decorators to be performed before each task after (List[TaskDecorator]): Decorators to be performed after each task + exception_handler (ExceptionHandler): Handler that will be called when a job fails. max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1 watcher_max_errors_factor (int): Number of consecutive errors for a task watcher will accept before raising MaxConsecutiveTaskThreadError poll_retry_delay (int): The number of seconds to wait before attempting to poll again when reaching max amount of running jobs tenant_ids (List[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3. """ - super().__init__(before, after) + super().__init__(before, after, exception_handler) self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries) self.name = name or socket.gethostname() self.request_timeout = request_timeout diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index e3973347..23a8bdb6 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -145,6 +145,14 @@ async def simple_decorator(job: Job) -> Job: return AsyncMock(wraps=simple_decorator) +@pytest.fixture +def exception_handler(): + async def simple_exception_handler(e: Exception, job: Job) -> None: + return None + + return AsyncMock(wraps=simple_exception_handler) + + @pytest.fixture(scope="module") def grpc_add_to_server(): from zeebe_grpc.gateway_pb2_grpc import add_GatewayServicer_to_server diff --git a/tests/unit/worker/task_router_test.py b/tests/unit/worker/task_router_test.py index dc7d0e94..632a8030 100644 --- a/tests/unit/worker/task_router_test.py +++ b/tests/unit/worker/task_router_test.py @@ -6,8 +6,9 @@ from pyzeebe import TaskDecorator from pyzeebe.errors import BusinessError, DuplicateTaskTypeError, TaskNotFoundError from pyzeebe.job.job import Job +from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler from pyzeebe.task.task import Task -from pyzeebe.worker.task_router import ZeebeTaskRouter, default_exception_handler +from pyzeebe.worker.task_router import ZeebeTaskRouter from tests.unit.utils.random_utils import randint @@ -20,7 +21,7 @@ def test_get_task(router: ZeebeTaskRouter, task: Task): def test_task_inherits_exception_handler(router: ZeebeTaskRouter, task: Task): - router._default_exception_handler = str + router._exception_handler = str router.task(task.type)(task.original_function) found_task = router.get_task(task.type) @@ -100,6 +101,12 @@ def test_add_after_decorator(router: ZeebeTaskRouter, decorator: TaskDecorator): assert len(router._after) == 1 +def test_set_exception_handler(router: ZeebeTaskRouter, exception_handler: ExceptionHandler): + router.exception_handler(exception_handler) + + assert router._exception_handler is exception_handler + + def test_add_before_decorator_through_constructor(decorator: TaskDecorator): router = ZeebeTaskRouter(before=[decorator]) @@ -112,9 +119,15 @@ def test_add_after_decorator_through_constructor(decorator: TaskDecorator): assert len(router._after) == 1 +def test_set_exception_handler_through_constructor(exception_handler: ExceptionHandler): + router = ZeebeTaskRouter(exception_handler=exception_handler) + + assert router._exception_handler is exception_handler + + @pytest.mark.asyncio async def test_default_exception_handler_logs_a_warning(mocked_job_with_adapter: Job): - with mock.patch("pyzeebe.worker.task_router.logger.warning") as logging_mock: + with mock.patch("pyzeebe.task.exception_handler.logger.warning") as logging_mock: await default_exception_handler(Exception(), mocked_job_with_adapter) mocked_job_with_adapter.set_failure_status.assert_called() @@ -132,7 +145,7 @@ async def test_default_exception_handler_uses_business_error(job_without_adapter @pytest.mark.asyncio async def test_default_exception_handler_warns_of_job_failure(job_without_adapter): - with mock.patch("pyzeebe.worker.task_router.logger.warning") as logging_mock: + with mock.patch("pyzeebe.task.exception_handler.logger.warning") as logging_mock: with mock.patch("pyzeebe.job.job.Job.set_error_status"): exception = BusinessError("custom-error-code") await default_exception_handler(exception, job_without_adapter) diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index 1d39e70c..2393ea89 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -4,7 +4,7 @@ import grpc import pytest -from pyzeebe import TaskDecorator, ZeebeTaskRouter +from pyzeebe import ExceptionHandler, TaskDecorator, ZeebeTaskRouter from pyzeebe.errors import DuplicateTaskTypeError from pyzeebe.job.job import Job from pyzeebe.task.task import Task @@ -55,6 +55,10 @@ def test_add_after_decorator(self, zeebe_worker: ZeebeWorker, decorator: TaskDec assert len(zeebe_worker._after) == 1 assert decorator in zeebe_worker._after + def test_set_exception_handler(self, zeebe_worker: ZeebeWorker, exception_handler: ExceptionHandler): + zeebe_worker.exception_handler(exception_handler) + assert exception_handler is zeebe_worker._exception_handler + def test_add_constructor_before_decorator(self, aio_grpc_channel: grpc.aio.Channel, decorator: TaskDecorator): zeebe_worker = ZeebeWorker(aio_grpc_channel, before=[decorator]) assert len(zeebe_worker._before) == 1 @@ -65,6 +69,12 @@ def test_add_constructor_after_decorator(self, aio_grpc_channel: grpc.aio.Channe assert len(zeebe_worker._after) == 1 assert decorator in zeebe_worker._after + def test_set_constructor_exception_handler( + self, aio_grpc_channel: grpc.aio.Channel, exception_handler: ExceptionHandler + ): + zeebe_worker = ZeebeWorker(aio_grpc_channel, exception_handler=exception_handler) + assert exception_handler is zeebe_worker._exception_handler + class TestIncludeRouter: def test_include_router_adds_task(self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str): @@ -100,6 +110,21 @@ async def test_router_after_decorator( decorator.assert_called_once() + @pytest.mark.asyncio + async def test_router_with_exception_handler( + self, + zeebe_worker: ZeebeWorker, + router: ZeebeTaskRouter, + exception_handler: ExceptionHandler, + mocked_job_with_adapter: Job, + ): + router.exception_handler(exception_handler) + task = self.include_router_with_task_error(zeebe_worker, router) + + await task.job_handler(mocked_job_with_adapter) + + exception_handler.assert_called_once() + @pytest.mark.asyncio async def test_worker_with_before_decorator( self, zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, decorator: TaskDecorator, mocked_job_with_adapter: Job @@ -122,6 +147,21 @@ async def test_worker_with_after_decorator( decorator.assert_called_once() + @pytest.mark.asyncio + async def test_worker_with_exception_handler( + self, + zeebe_worker: ZeebeWorker, + router: ZeebeTaskRouter, + exception_handler: ExceptionHandler, + mocked_job_with_adapter: Job, + ): + zeebe_worker.exception_handler(exception_handler) + task = self.include_router_with_task_error(zeebe_worker, router) + + await task.job_handler(mocked_job_with_adapter) + + exception_handler.assert_called_once() + @staticmethod def include_router_with_task(zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str = None) -> Task: task_type = task_type or str(uuid4()) @@ -132,3 +172,16 @@ def dummy_function(): zeebe_worker.include_router(router) return zeebe_worker.get_task(task_type) + + @staticmethod + def include_router_with_task_error( + zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str = None + ) -> Task: + task_type = task_type or str(uuid4()) + + @router.task(task_type) + def dummy_function(): + raise Exception() + + zeebe_worker.include_router(router) + return zeebe_worker.get_task(task_type) From f9b852c5250d1ff4856fdd1201a8d891a0e93278 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 24 May 2024 12:00:16 +0500 Subject: [PATCH 2/4] add docs --- docs/errors.rst | 7 +++++++ docs/worker_tasks.rst | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/docs/errors.rst b/docs/errors.rst index 9b1c125e..8fea5223 100644 --- a/docs/errors.rst +++ b/docs/errors.rst @@ -43,3 +43,10 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError` .. autoexception:: pyzeebe.errors.InvalidOAuthCredentialsError .. autoexception:: pyzeebe.errors.InvalidCamundaCloudCredentialsError + + +================= +Exception Handler +================= + +.. autofunction:: pyzeebe.default_exception_handler diff --git a/docs/worker_tasks.rst b/docs/worker_tasks.rst index b5b10ddc..8e56eda4 100644 --- a/docs/worker_tasks.rst +++ b/docs/worker_tasks.rst @@ -85,6 +85,11 @@ Now every time ``my_task`` is called (and then fails), ``my_exception_handler`` This tells Zeebe that the job failed. The job will then be retried (if configured in process definition). +.. note:: + The exception handler can also be set via :py:class:`pyzeebe.ZeebeWorker` or :py:class:`pyzeebe.ZeebeTaskRouter`. + Pyzeebe will try to find the exception handler in the following order: + ``Worker`` -> ``Router`` -> ``Task`` -> :py:func:`pyzeebe.default_exception_handler` + Task timeout ------------ From 7cb847c11ea2946c9d0a6c12f2f052412cd51dfd Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 24 May 2024 13:30:03 +0500 Subject: [PATCH 3/4] add test --- tests/unit/worker/worker_test.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index 2393ea89..93a92e7c 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -2,6 +2,7 @@ from uuid import uuid4 import grpc +from mock import AsyncMock import pytest from pyzeebe import ExceptionHandler, TaskDecorator, ZeebeTaskRouter @@ -162,6 +163,24 @@ async def test_worker_with_exception_handler( exception_handler.assert_called_once() + @pytest.mark.asyncio + async def test_worker_and_router_with_exception_handler( + self, + zeebe_worker: ZeebeWorker, + router: ZeebeTaskRouter, + mocked_job_with_adapter: Job, + ): + exception_handler_router = AsyncMock() + exception_handler_worker = AsyncMock() + router.exception_handler(exception_handler_router) + zeebe_worker.exception_handler(exception_handler_worker) + task = self.include_router_with_task_error(zeebe_worker, router) + + await task.job_handler(mocked_job_with_adapter) + + exception_handler_router.assert_called_once() + exception_handler_worker.assert_not_called() + @staticmethod def include_router_with_task(zeebe_worker: ZeebeWorker, router: ZeebeTaskRouter, task_type: str = None) -> Task: task_type = task_type or str(uuid4()) From ba57cd0c423d51b11c2154efed49050f183f41a0 Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 24 May 2024 13:58:30 +0500 Subject: [PATCH 4/4] fix merge conflict --- tests/unit/worker/worker_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index 93a92e7c..720773b0 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -1,8 +1,8 @@ from typing import List +from unittest.mock import AsyncMock from uuid import uuid4 import grpc -from mock import AsyncMock import pytest from pyzeebe import ExceptionHandler, TaskDecorator, ZeebeTaskRouter