Skip to content

Commit

Permalink
Merge branch 'release/0.8.8'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Sep 9, 2023
2 parents 1731c4a + 1a3f3d3 commit 1d2f0b2
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 4 deletions.
33 changes: 33 additions & 0 deletions docs/guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,36 @@ Returned value: 2
```

Continue reading to get more information about taskiq internals.


## Timeouts

If you want to restrict amount of time you want to run task,
just add timeout label to the task.

You can do it either with decorator or when calling the task.

::: tabs

@tab decorator

```python
@broker.task(timeout=0.1)
async def mytask():
await asyncio.sleep(2)
```

@tab when calling

```python
await my_task.kicker().with_labels(timeout=0.3).kiq()
```

:::

::: danger Cool alert

We use [run_in_executor](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor) method to run sync functions. Timeouts will raise a TimeoutException, but
synchronous function may not stop from execution. This is a constraint of python.

:::
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq"
version = "0.8.7"
version = "0.8.8"
description = "Distributed task queue with full async support"
authors = ["Pavel Kirilin <[email protected]>"]
maintainers = ["Pavel Kirilin <[email protected]>"]
Expand Down
13 changes: 10 additions & 3 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,27 @@ async def run_task( # noqa: C901, WPS210
kwargs = await dep_ctx.resolve_kwargs()
# We udpate kwargs with kwargs from network.
kwargs.update(message.kwargs)

is_coroutine = True
# If the function is a coroutine, we await it.
if asyncio.iscoroutinefunction(target):
returned = await target(*message.args, **kwargs)
target_future = target(*message.args, **kwargs)
else:
is_coroutine = False
# If this is a synchronous function, we
# run it in executor.
returned = await loop.run_in_executor(
target_future = loop.run_in_executor(
self.executor,
_run_sync,
target,
message.args,
kwargs,
)
timeout = message.labels.get("timeout")
if timeout is not None:
if not is_coroutine:
logger.warning("Timeouts for sync tasks don't work in python well.")
target_future = asyncio.wait_for(target_future, float(timeout))
returned = await target_future
except NoResultError as no_res_exc:
found_exception = no_res_exc
logger.warning(
Expand Down
45 changes: 45 additions & 0 deletions tests/receiver/test_receiver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncGenerator, List, Optional, TypeVar

Expand Down Expand Up @@ -117,6 +118,50 @@ def test_func() -> None:
assert result.is_err


@pytest.mark.anyio
async def test_run_timeouts() -> None:
async def test_func() -> None:
await asyncio.sleep(2)

receiver = get_receiver()

result = await receiver.run_task(
test_func,
TaskiqMessage(
task_id="",
task_name="",
labels={"timeout": "0.3"},
args=[],
kwargs={},
),
)
assert result.return_value is None
assert result.execution_time < 2
assert result.is_err


@pytest.mark.anyio
async def test_run_timeouts_sync() -> None:
def test_func() -> None:
time.sleep(2)

receiver = get_receiver()

result = await receiver.run_task(
test_func,
TaskiqMessage(
task_id="",
task_name="",
labels={"timeout": "0.3"},
args=[],
kwargs={},
),
)
assert result.return_value is None
assert result.execution_time < 2
assert result.is_err


@pytest.mark.anyio
async def test_run_task_exception_middlewares() -> None:
"""Tests that run_task can run sync tasks."""
Expand Down

0 comments on commit 1d2f0b2

Please sign in to comment.