From b733a38351a67ee102126e1500b56577b784675a Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sat, 9 Sep 2023 12:37:57 +0400 Subject: [PATCH 1/2] Added timeouts. (#196) --- docs/guide/getting-started.md | 33 ++++++++++++++++++++++++ taskiq/receiver/receiver.py | 13 +++++++--- tests/receiver/test_receiver.py | 45 +++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/docs/guide/getting-started.md b/docs/guide/getting-started.md index a575d49..a12da7e 100644 --- a/docs/guide/getting-started.md +++ b/docs/guide/getting-started.md @@ -217,3 +217,36 @@ Returned value: 2 ``` Continue reading to get more information about taskiq internals. + + +## Timeouts + +If you want to restrict amount of time you want to run task, +just add timeout label to the task. + +You can do it either with decorator or when calling the task. + +::: tabs + +@tab decorator + +```python +@broker.task(timeout=0.1) +async def mytask(): + await asyncio.sleep(2) +``` + +@tab when calling + +```python +await my_task.kicker().with_labels(timeout=0.3).kiq() +``` + +::: + +::: danger Cool alert + +We use [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) method to run sync functions. Timeouts will raise a TimeoutException, but +synchronous function may not stop from execution. This is a constraint of python. + +::: diff --git a/taskiq/receiver/receiver.py b/taskiq/receiver/receiver.py index b68118c..db23bbe 100644 --- a/taskiq/receiver/receiver.py +++ b/taskiq/receiver/receiver.py @@ -218,20 +218,27 @@ async def run_task( # noqa: C901, WPS210 kwargs = await dep_ctx.resolve_kwargs() # We udpate kwargs with kwargs from network. kwargs.update(message.kwargs) - + is_coroutine = True # If the function is a coroutine, we await it. if asyncio.iscoroutinefunction(target): - returned = await target(*message.args, **kwargs) + target_future = target(*message.args, **kwargs) else: + is_coroutine = False # If this is a synchronous function, we # run it in executor. - returned = await loop.run_in_executor( + target_future = loop.run_in_executor( self.executor, _run_sync, target, message.args, kwargs, ) + timeout = message.labels.get("timeout") + if timeout is not None: + if not is_coroutine: + logger.warning("Timeouts for sync tasks don't work in python well.") + target_future = asyncio.wait_for(target_future, float(timeout)) + returned = await target_future except NoResultError as no_res_exc: found_exception = no_res_exc logger.warning( diff --git a/tests/receiver/test_receiver.py b/tests/receiver/test_receiver.py index 5567197..97cd82f 100644 --- a/tests/receiver/test_receiver.py +++ b/tests/receiver/test_receiver.py @@ -1,4 +1,5 @@ import asyncio +import time from concurrent.futures import ThreadPoolExecutor from typing import Any, AsyncGenerator, List, Optional, TypeVar @@ -117,6 +118,50 @@ def test_func() -> None: assert result.is_err +@pytest.mark.anyio +async def test_run_timeouts() -> None: + async def test_func() -> None: + await asyncio.sleep(2) + + receiver = get_receiver() + + result = await receiver.run_task( + test_func, + TaskiqMessage( + task_id="", + task_name="", + labels={"timeout": "0.3"}, + args=[], + kwargs={}, + ), + ) + assert result.return_value is None + assert result.execution_time < 2 + assert result.is_err + + +@pytest.mark.anyio +async def test_run_timeouts_sync() -> None: + def test_func() -> None: + time.sleep(2) + + receiver = get_receiver() + + result = await receiver.run_task( + test_func, + TaskiqMessage( + task_id="", + task_name="", + labels={"timeout": "0.3"}, + args=[], + kwargs={}, + ), + ) + assert result.return_value is None + assert result.execution_time < 2 + assert result.is_err + + @pytest.mark.anyio async def test_run_task_exception_middlewares() -> None: """Tests that run_task can run sync tasks.""" From 1a3f3d353612081b22aa2cfac4bd8e68f670bebc Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sat, 9 Sep 2023 12:38:28 +0400 Subject: [PATCH 2/2] Version bumped to 0.8.8 Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 85c669c..a670b5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.8.7" +version = "0.8.8" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "]