Skip to content

Commit

Permalink
fix: add logging a exception in job execution task
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Dec 6, 2024
1 parent 3533397 commit 1e42f6a
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ profile = "black"

[tool.pytest.ini_options]
asyncio_mode = "auto"
markers = [
"e2e: end to end tests",
]

[tool.ruff]
target-version = "py39"
Expand Down
10 changes: 8 additions & 2 deletions pyzeebe/worker/job_executor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import logging
from typing import Callable
Expand All @@ -14,7 +16,7 @@


class JobExecutor:
def __init__(self, task: Task, jobs: "asyncio.Queue[Job]", task_state: TaskState, zeebe_adapter: ZeebeAdapter):
def __init__(self, task: Task, jobs: asyncio.Queue[Job], task_state: TaskState, zeebe_adapter: ZeebeAdapter):
self.task = task
self.jobs = jobs
self.task_state = task_state
Expand Down Expand Up @@ -45,7 +47,11 @@ async def stop(self) -> None:


def create_job_callback(job_executor: JobExecutor, job: Job) -> AsyncTaskCallback:
def callback(_: "asyncio.Future[None]") -> None:
def callback(fut: asyncio.Future[None]) -> None:
err = fut.done() and not fut.cancelled() and fut.exception()
if err:
logger.exception("Error in job executor. Task: %s. Error: %s.", job.type, err, exc_info=err)

job_executor.jobs.task_done()
job_executor.task_state.remove(job)

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/cancel_process_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import pytest

from pyzeebe import ZeebeClient


@pytest.mark.e2e
async def test_cancel_process(zeebe_client: ZeebeClient, process_name: str, process_variables: dict):
response = await zeebe_client.run_process(process_name, process_variables)

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/publish_message_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import pytest

from pyzeebe import ZeebeClient
from tests.integration.utils import ProcessStats
from tests.integration.utils.wait_for_process import wait_for_process_with_variables


@pytest.mark.e2e
async def test_publish_message(zeebe_client: ZeebeClient, process_stats: ProcessStats, process_variables: dict):
initial_amount_of_processes = process_stats.get_process_runs()

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/run_process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
PROCESS_TIMEOUT_IN_MS = 60_000


@pytest.mark.e2e
async def test_run_process(
zeebe_client: ZeebeClient, process_name: str, process_variables: dict, process_stats: ProcessStats
):
Expand All @@ -20,11 +21,13 @@ async def test_run_process(
assert process_stats.get_process_runs() == initial_amount_of_processes + 1


@pytest.mark.e2e
async def test_non_existent_process(zeebe_client: ZeebeClient):
with pytest.raises(ProcessDefinitionNotFoundError):
await zeebe_client.run_process(str(uuid4()))


@pytest.mark.e2e
async def test_run_process_with_result(zeebe_client: ZeebeClient, process_name: str, process_variables: dict):
response = await zeebe_client.run_process_with_result(
process_name, process_variables, timeout=PROCESS_TIMEOUT_IN_MS
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/worker/job_executor_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
from unittest.mock import AsyncMock, Mock

import pytest
Expand Down Expand Up @@ -80,3 +81,31 @@ def test_signals_that_job_is_done(self, job_executor: JobExecutor, job_from_task

task_done_mock.assert_called_once()
remove_from_task_state_mock.assert_called_once_with(job_from_task)

def test_signals_that_job_is_done_with_exception(
self, job_executor: JobExecutor, job_from_task: Job, caplog: pytest.LogCaptureFixture
):
task_done_mock = Mock()
remove_from_task_state_mock = Mock()
job_executor.jobs.task_done = task_done_mock
job_executor.task_state.remove = remove_from_task_state_mock

callback = create_job_callback(job_executor, job_from_task)

exception = None
try:
json.dumps({"foo": object})
except TypeError as err:
exception = err

assert exception

fut = asyncio.Future()
fut.set_exception(exception)
callback(fut)

task_done_mock.assert_called_once()
remove_from_task_state_mock.assert_called_once_with(job_from_task)

assert len(caplog.records) == 1
assert caplog.records[0].getMessage().startswith("Error in job executor. Task:")

0 comments on commit 1e42f6a

Please sign in to comment.