From 1e42f6a32bde348db144ef865de40c882a4c860a Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Fri, 6 Dec 2024 18:24:22 +0500 Subject: [PATCH] fix: add logging a exception in job execution task --- pyproject.toml | 3 +++ pyzeebe/worker/job_executor.py | 10 ++++++-- tests/integration/cancel_process_test.py | 3 +++ tests/integration/publish_message_test.py | 3 +++ tests/integration/run_process_test.py | 3 +++ tests/unit/worker/job_executor_test.py | 29 +++++++++++++++++++++++ 6 files changed, 49 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bbc80af8..1636b052 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,9 @@ profile = "black" [tool.pytest.ini_options] asyncio_mode = "auto" +markers = [ + "e2e: end to end tests", +] [tool.ruff] target-version = "py39" diff --git a/pyzeebe/worker/job_executor.py b/pyzeebe/worker/job_executor.py index 6318f89a..ed34b497 100644 --- a/pyzeebe/worker/job_executor.py +++ b/pyzeebe/worker/job_executor.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging from typing import Callable @@ -14,7 +16,7 @@ class JobExecutor: - def __init__(self, task: Task, jobs: "asyncio.Queue[Job]", task_state: TaskState, zeebe_adapter: ZeebeAdapter): + def __init__(self, task: Task, jobs: asyncio.Queue[Job], task_state: TaskState, zeebe_adapter: ZeebeAdapter): self.task = task self.jobs = jobs self.task_state = task_state @@ -45,7 +47,11 @@ async def stop(self) -> None: def create_job_callback(job_executor: JobExecutor, job: Job) -> AsyncTaskCallback: - def callback(_: "asyncio.Future[None]") -> None: + def callback(fut: asyncio.Future[None]) -> None: + err = fut.done() and not fut.cancelled() and fut.exception() + if err: + logger.exception("Error in job executor. Task: %s. Error: %s.", job.type, err, exc_info=err) + job_executor.jobs.task_done() job_executor.task_state.remove(job) diff --git a/tests/integration/cancel_process_test.py b/tests/integration/cancel_process_test.py index b969222e..39b190ec 100644 --- a/tests/integration/cancel_process_test.py +++ b/tests/integration/cancel_process_test.py @@ -1,6 +1,9 @@ +import pytest + from pyzeebe import ZeebeClient +@pytest.mark.e2e async def test_cancel_process(zeebe_client: ZeebeClient, process_name: str, process_variables: dict): response = await zeebe_client.run_process(process_name, process_variables) diff --git a/tests/integration/publish_message_test.py b/tests/integration/publish_message_test.py index 29583a31..71405998 100644 --- a/tests/integration/publish_message_test.py +++ b/tests/integration/publish_message_test.py @@ -1,8 +1,11 @@ +import pytest + from pyzeebe import ZeebeClient from tests.integration.utils import ProcessStats from tests.integration.utils.wait_for_process import wait_for_process_with_variables +@pytest.mark.e2e async def test_publish_message(zeebe_client: ZeebeClient, process_stats: ProcessStats, process_variables: dict): initial_amount_of_processes = process_stats.get_process_runs() diff --git a/tests/integration/run_process_test.py b/tests/integration/run_process_test.py index 3c25e3f7..3c1cb106 100644 --- a/tests/integration/run_process_test.py +++ b/tests/integration/run_process_test.py @@ -9,6 +9,7 @@ PROCESS_TIMEOUT_IN_MS = 60_000 +@pytest.mark.e2e async def test_run_process( zeebe_client: ZeebeClient, process_name: str, process_variables: dict, process_stats: ProcessStats ): @@ -20,11 +21,13 @@ async def test_run_process( assert process_stats.get_process_runs() == initial_amount_of_processes + 1 +@pytest.mark.e2e async def test_non_existent_process(zeebe_client: ZeebeClient): with pytest.raises(ProcessDefinitionNotFoundError): await zeebe_client.run_process(str(uuid4())) +@pytest.mark.e2e async def test_run_process_with_result(zeebe_client: ZeebeClient, process_name: str, process_variables: dict): response = await zeebe_client.run_process_with_result( process_name, process_variables, timeout=PROCESS_TIMEOUT_IN_MS diff --git a/tests/unit/worker/job_executor_test.py b/tests/unit/worker/job_executor_test.py index 2cfe4abe..8b822ffd 100644 --- a/tests/unit/worker/job_executor_test.py +++ b/tests/unit/worker/job_executor_test.py @@ -1,4 +1,5 @@ import asyncio +import json from unittest.mock import AsyncMock, Mock import pytest @@ -80,3 +81,31 @@ def test_signals_that_job_is_done(self, job_executor: JobExecutor, job_from_task task_done_mock.assert_called_once() remove_from_task_state_mock.assert_called_once_with(job_from_task) + + def test_signals_that_job_is_done_with_exception( + self, job_executor: JobExecutor, job_from_task: Job, caplog: pytest.LogCaptureFixture + ): + task_done_mock = Mock() + remove_from_task_state_mock = Mock() + job_executor.jobs.task_done = task_done_mock + job_executor.task_state.remove = remove_from_task_state_mock + + callback = create_job_callback(job_executor, job_from_task) + + exception = None + try: + json.dumps({"foo": object}) + except TypeError as err: + exception = err + + assert exception + + fut = asyncio.Future() + fut.set_exception(exception) + callback(fut) + + task_done_mock.assert_called_once() + remove_from_task_state_mock.assert_called_once_with(job_from_task) + + assert len(caplog.records) == 1 + assert caplog.records[0].getMessage().startswith("Error in job executor. Task:")