Skip to content

Commit

Permalink
Merge branch 'release/0.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Mar 13, 2023
2 parents 820287a + 4055588 commit 6ef1a22
Show file tree
Hide file tree
Showing 11 changed files with 895 additions and 838 deletions.
16 changes: 5 additions & 11 deletions docs/examples/extending/broker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
from typing import Any, Callable, Coroutine, Optional, TypeVar
from typing import AsyncGenerator, Callable, Optional, TypeVar

from taskiq import AsyncBroker, AsyncResultBackend, BrokerMessage

Expand Down Expand Up @@ -29,14 +28,9 @@ async def kick(self, message: BrokerMessage) -> None:
# Send a message.
pass

async def listen(
self,
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
) -> None:
loop = asyncio.get_event_loop()
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
while True:
# Get new message.
# new_message = ...
# Create a new task to execute.
# loop.create_task(callback(new_message))
pass
new_message: BrokerMessage = ... # type: ignore
# Yield it!
yield new_message
2 changes: 1 addition & 1 deletion docs/extending-taskiq/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ These rules are optional, and it's ok if your broker doesn't implement them.
1. If the message has the `delay` label with int or float number, this task's `execution` must be delayed
with the same number of seconds as in the delay label.
2. If the message has the `priority` label, this message must be sent with priority. Tasks with
higher priorities are executed faster.
higher priorities are executed sooner.
1,637 changes: 844 additions & 793 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskiq"
version = "0.1.8"
version = "0.2.0"
description = "Distributed task queue with full async support"
authors = ["Pavel Kirilin <[email protected]>"]
maintainers = ["Pavel Kirilin <[email protected]>"]
Expand Down
8 changes: 3 additions & 5 deletions taskiq/abc/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import ( # noqa: WPS235
TYPE_CHECKING,
Any,
AsyncGenerator,
Awaitable,
Callable,
Coroutine,
Expand Down Expand Up @@ -149,17 +150,14 @@ async def kick(
"""

@abstractmethod
async def listen(
self,
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
) -> None:
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
"""
This function listens to new messages and yields them.
This it the main point for workers.
This function is used to get new tasks from the network.
:param callback: function to call when message received.
:yield: incoming messages.
:return: nothing.
"""

Expand Down
8 changes: 2 additions & 6 deletions taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import inspect
from collections import OrderedDict
from typing import Any, Callable, Coroutine, Optional, TypeVar, get_type_hints
from typing import Any, AsyncGenerator, Callable, Optional, TypeVar, get_type_hints

from taskiq_dependencies import DependencyGraph

Expand Down Expand Up @@ -143,17 +143,13 @@ async def kick(self, message: BrokerMessage) -> None:

await self.receiver.callback(message=message)

async def listen(
self,
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
) -> None:
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
"""
Inmemory broker cannot listen.
This method throws RuntimeError if you call it.
Because inmemory broker cannot really listen to any of tasks.
:param callback: message callback.
:raises RuntimeError: if this method is called.
"""
raise RuntimeError("Inmemory brokers cannot listen.")
Expand Down
8 changes: 2 additions & 6 deletions taskiq/brokers/shared_broker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Coroutine, Optional, TypeVar
from typing import AsyncGenerator, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
from taskiq.decor import AsyncTaskiqDecoratedTask
Expand Down Expand Up @@ -59,16 +59,12 @@ async def kick(self, message: BrokerMessage) -> None:
"without setting the default_broker.",
)

async def listen(
self,
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
) -> None: # type: ignore
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
"""
Shared broker cannot listen to tasks.
This method will throw an exception.
:param callback: message callback.
:raises TaskiqError: if called.
"""
raise TaskiqError("Shared broker cannot listen")
Expand Down
13 changes: 4 additions & 9 deletions taskiq/brokers/zmq_broker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
from logging import getLogger
from typing import Any, Callable, Coroutine, Optional, TypeVar
from typing import AsyncGenerator, Callable, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
Expand Down Expand Up @@ -62,16 +61,12 @@ async def kick(self, message: BrokerMessage) -> None:
with self.socket.connect(self.sub_host) as sock:
await sock.send_string(message.json())

async def listen(
self,
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
) -> None:
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
"""
Start accepting new messages.
:param callback: function to call when message received.
:yields: incoming messages.
"""
loop = asyncio.get_event_loop()
with self.socket.connect(self.sub_host) as sock:
while True:
received_str = await sock.recv_string()
Expand All @@ -80,4 +75,4 @@ async def listen(
except ValueError:
logger.warning("Cannot parse received message %s", received_str)
continue
loop.create_task(callback(broker_msg))
yield broker_msg
9 changes: 9 additions & 0 deletions taskiq/cli/worker/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class WorkerArgs:
shutdown_timeout: float = 5
reload: bool = False
no_gitignore: bool = False
max_async_tasks: int = 100

@classmethod
def from_cli( # noqa: WPS213
Expand Down Expand Up @@ -123,6 +124,14 @@ def from_cli( # noqa: WPS213
dest="no_gitignore",
help="Do not use gitignore to check for updated files.",
)
parser.add_argument(
"--max-async-tasks",
type=int,
dest="max_async_tasks",
default=100,
help="Maximum simultaneous async tasks per worker process. "
+ "Infinite if less than 1",
)

namespace = parser.parse_args(args)
return WorkerArgs(**namespace.__dict__)
23 changes: 22 additions & 1 deletion taskiq/cli/worker/async_task_runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from logging import getLogger

from taskiq.abc.broker import AsyncBroker
Expand Down Expand Up @@ -25,4 +26,24 @@ async def async_listen_messages(
logger.info("Inicialized receiver.")
receiver = Receiver(broker, cli_args)
logger.info("Listening started.")
await broker.listen(receiver.callback)
tasks = set()
async for message in broker.listen():
task = asyncio.create_task(
receiver.callback(message=message, raise_err=False),
)
tasks.add(task)

# We want the task to remove itself from the set when it's done.
#
# Because python's GC can silently cancel task
# and it considered to be Hisenbug.
# https://textual.textualize.io/blog/2023/02/11/the-heisenbug-lurking-in-your-async-code/
task.add_done_callback(tasks.discard)

# If we have finite number of maximum simultanious tasks,
# we await them when we reached the limit.
# But we don't await all of them, we await only first completed task,
# and then continue.
if 1 <= cli_args.max_async_tasks <= len(tasks):
_, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
tasks = pending
7 changes: 2 additions & 5 deletions tests/abc/test_broker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Coroutine
from typing import AsyncGenerator

from taskiq.abc.broker import AsyncBroker
from taskiq.decor import AsyncTaskiqDecoratedTask
Expand All @@ -17,10 +17,7 @@ async def kick(self, message: BrokerMessage) -> None:
:param message: message to lost.
"""

async def listen(
self,
callback: Callable[[BrokerMessage], Coroutine[Any, Any, None]],
) -> None:
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
"""
This method is not implemented.
Expand Down

0 comments on commit 6ef1a22

Please sign in to comment.