Skip to content

Commit

Permalink
Merge pull request #489 from dimastbk/cancel-all-executors
Browse files Browse the repository at this point in the history
Cancel all executors on error
  • Loading branch information
dimastbk authored Sep 26, 2024
2 parents 614559d + 702722c commit a2e44fd
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 54 deletions.
57 changes: 34 additions & 23 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ packages = [
python = "^3.9"
oauthlib = "^3.1.0"
requests-oauthlib = ">=1.3.0,<3.0.0"
aiofiles = ">=0.7,<25"
zeebe-grpc = "^8.4.0"
typing-extensions = "^4.11.0"
anyio = "^4.6.0"

[tool.poetry.group.dev.dependencies]
pytest = ">=7.4,<9.0"
Expand All @@ -41,7 +41,6 @@ sphinx-rtd-theme = ">=1.2.2,<3.0.0"
sphinx = ">=6,<8"

[tool.poetry.group.stubs.dependencies]
types-aiofiles = ">=0.7,<25"
types-oauthlib = "^3.1.0"
types-requests-oauthlib = ">=1.3.0,<3.0.0"

Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/grpc_internals/zeebe_process_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from typing import Callable, Dict, Iterable, List, NoReturn, Optional, Union

import aiofiles
import anyio
import grpc
from zeebe_grpc.gateway_pb2 import (
CancelProcessInstanceRequest,
Expand Down Expand Up @@ -225,5 +225,5 @@ def _create_form_from_raw_form(response: FormMetadata) -> DeployResourceResponse


async def _create_resource_request(resource_file_path: str) -> Resource:
async with aiofiles.open(resource_file_path, "rb") as file:
async with await anyio.open_file(resource_file_path, "rb") as file:
return Resource(name=os.path.basename(resource_file_path), content=await file.read())
55 changes: 30 additions & 25 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import socket
from typing import List, Optional

import anyio
import grpc

from pyzeebe import TaskDecorator
Expand Down Expand Up @@ -55,22 +56,11 @@ def __init__(
self._watcher_thread = None
self.poll_retry_delay = poll_retry_delay
self.tenant_ids = tenant_ids
self._work_task: "Optional[asyncio.Future[List[None]]]" = None
self._job_pollers: List[JobPoller] = []
self._job_executors: List[JobExecutor] = []
self._stop_event = anyio.Event()

async def work(self) -> None:
"""
Start the worker. The worker will poll zeebe for jobs of each task in a different thread.
Raises:
ActivateJobsRequestInvalidError: If one of the worker's task has invalid types
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
def _init_tasks(self) -> None:
self._job_executors, self._job_pollers = [], []

for task in self.tasks:
Expand All @@ -88,34 +78,49 @@ async def work(self) -> None:
self.tenant_ids,
)
executor = JobExecutor(task, jobs_queue, task_state, self.zeebe_adapter)

self._job_pollers.append(poller)
self._job_executors.append(executor)

coroutines = [poller.poll() for poller in self._job_pollers] + [
executor.execute() for executor in self._job_executors
]
async def work(self) -> None:
"""
Start the worker. The worker will poll zeebe for jobs of each task in a different thread.
Raises:
ActivateJobsRequestInvalidError: If one of the worker's task has invalid types
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
self._work_task = asyncio.gather(*coroutines)
"""
self._init_tasks()

async with anyio.create_task_group() as tg:
for poller in self._job_pollers:
tg.start_soon(poller.poll)

for executor in self._job_executors:
tg.start_soon(executor.execute)

await self._stop_event.wait()

try:
await self._work_task
except asyncio.CancelledError:
logger.info("Zeebe worker was stopped")
return
tg.cancel_scope.cancel()

logger.info("Zeebe worker was stopped")

async def stop(self) -> None:
"""
Stop the worker. This will emit a signal asking tasks to complete the current task and stop polling for new.
"""
if self._work_task is not None:
self._work_task.cancel()

for poller in self._job_pollers:
await poller.stop()

for executor in self._job_executors:
await executor.stop()

self._stop_event.set()

def include_router(self, *routers: ZeebeTaskRouter) -> None:
"""
Adds all router's tasks to the worker.
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ async def aio_grpc_channel(aio_create_grpc_channel):
yield channel


@pytest.fixture()
def aio_grpc_channel_mock():
return AsyncMock(spec_set=grpc.aio.Channel)


@pytest.fixture
def task_state() -> TaskState:
return TaskState()
2 changes: 1 addition & 1 deletion tests/unit/grpc_internals/zeebe_process_adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def mocked_aiofiles_open():
file_mock = AsyncMock()
file_mock.__aenter__.return_value.read = read_mock

with patch("pyzeebe.grpc_internals.zeebe_process_adapter.aiofiles.open", return_value=file_mock) as open_mock:
with patch("pyzeebe.grpc_internals.zeebe_process_adapter.anyio.open_file", return_value=file_mock) as open_mock:
yield open_mock


Expand Down
71 changes: 70 additions & 1 deletion tests/unit/worker/worker_test.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import asyncio
from typing import List
from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, Mock
from uuid import uuid4

import anyio.abc
import grpc
import pytest

from pyzeebe import ExceptionHandler, TaskDecorator, ZeebeTaskRouter
from pyzeebe.errors import DuplicateTaskTypeError
from pyzeebe.job.job import Job, JobController
from pyzeebe.task.task import Task
from pyzeebe.worker.job_executor import JobExecutor
from pyzeebe.worker.job_poller import JobPoller
from pyzeebe.worker.worker import ZeebeWorker


Expand Down Expand Up @@ -227,3 +231,68 @@ def dummy_function():

zeebe_worker.include_router(router)
return zeebe_worker.get_task(task_type)


class TestWorker:
@pytest.fixture()
def zeebe_worker(self, aio_grpc_channel_mock):
return ZeebeWorker(grpc_channel=aio_grpc_channel_mock)

@staticmethod
async def wait_for_channel_ready(*, task_status: anyio.abc.TaskStatus = anyio.TASK_STATUS_IGNORED):
task_status.started()

async def test_start_stop(self, zeebe_worker: ZeebeWorker):
zeebe_worker._stop_event = AsyncMock(spec_set=anyio.Event)

await zeebe_worker.work()
zeebe_worker._stop_event.wait.assert_awaited_once()

await zeebe_worker.stop()
zeebe_worker._stop_event.set.assert_called_once()

async def test_poller_stoped(self, zeebe_worker: ZeebeWorker):
zeebe_worker._init_tasks = Mock()
zeebe_worker._stop_event = AsyncMock(spec_set=anyio.Event)

poller_mock = AsyncMock(spec_set=JobPoller)
zeebe_worker._job_pollers = [poller_mock]

await zeebe_worker.work()
poller_mock.poll.assert_awaited_once()

await zeebe_worker.stop()
poller_mock.stop.assert_awaited_once()

async def test_poller_failed(self, zeebe_worker: ZeebeWorker):
zeebe_worker._init_tasks = Mock()

poller_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(side_effect=[Exception("test_exception")]))
zeebe_worker._job_pollers = [poller_mock]

with pytest.raises(Exception, match=r"unhandled errors in a TaskGroup \(1 sub-exception\)") as err:
await zeebe_worker.work()

poller_mock.poll.assert_awaited_once()

async def test_second_poller_should_cancel(self, zeebe_worker: ZeebeWorker):
zeebe_worker._init_tasks = Mock()

poller2_cancel_event = anyio.Event()

async def poll2():
try:
await anyio.Event().wait()
except anyio.get_cancelled_exc_class():
poller2_cancel_event.set()

poller_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(side_effect=[Exception("test_exception")]))
poller2_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(wraps=poll2))
zeebe_worker._job_pollers = [poller_mock, poller2_mock]

with pytest.raises(Exception, match=r"unhandled errors in a TaskGroup \(1 sub-exception\)") as err:
await zeebe_worker.work()

poller_mock.poll.assert_awaited_once()
poller2_mock.poll.assert_awaited_once()
assert poller2_cancel_event.is_set()

0 comments on commit a2e44fd

Please sign in to comment.