Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sync decorators #343

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,16 @@ async def run_task( # noqa: C901, PLR0912, PLR0915
if timeout is not None:
if not is_coroutine:
logger.warning("Timeouts for sync tasks don't work in python well.")
target_future = asyncio.wait_for(target_future, float(timeout))
returned = await target_future

with anyio.fail_after(float(timeout)):
while inspect.isawaitable(target_future):
Sobes76rus marked this conversation as resolved.
Show resolved Hide resolved
target_future = await target_future

else:
while inspect.isawaitable(target_future):
target_future = await target_future

returned = target_future
except NoResultError as no_res_exc:
found_exception = no_res_exc
logger.warning(
Expand Down
65 changes: 65 additions & 0 deletions tests/receiver/test_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import random
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from typing import Any, ClassVar, List, Optional

import pytest
Expand Down Expand Up @@ -472,3 +473,67 @@ async def task_no_result() -> str:
assert resp.return_value is None
assert not broker._running_tasks
assert isinstance(resp.error, ValueError)


@pytest.mark.anyio
async def test_sync_decorator_on_async_function() -> None:
broker = InMemoryBroker()
wrapper_call = False

def wrapper(f: Any) -> Any:
@wraps(f)
def wrapper_impl(*args: Any, **kwargs: Any) -> Any:
nonlocal wrapper_call

wrapper_call = True
return f(*args, **kwargs)

return wrapper_impl

@broker.task
@wrapper
async def task_no_result() -> str:
return "some value"

task = await task_no_result.kiq()
resp = await task.wait_result(timeout=1)

assert resp.return_value == "some value"
assert not broker._running_tasks
assert wrapper_call is True


@pytest.mark.anyio
async def test_sync_decorator_on_async_function_with_timeout() -> None:
wrapper_call = False

def wrapper(f: Any) -> Any:
@wraps(f)
def wrapper_impl(*args: Any, **kwargs: Any) -> Any:
nonlocal wrapper_call

wrapper_call = True
return f(*args, **kwargs)

return wrapper_impl

@wrapper
async def test_func() -> None:
await asyncio.sleep(2)

receiver = get_receiver()

result = await receiver.run_task(
test_func,
TaskiqMessage(
task_id="",
task_name="",
labels={"timeout": "0.3"},
args=[],
kwargs={},
),
)
assert result.return_value is None
assert result.execution_time < 2
assert result.is_err
assert wrapper_call is True
Loading