Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiple queues #16

Open
vvanglro opened this issue May 8, 2023 · 6 comments
Open

multiple queues #16

vvanglro opened this issue May 8, 2023 · 6 comments

Comments

@vvanglro
Copy link
Contributor

vvanglro commented May 8, 2023

Like this, I just give a simple example, or there may be other more elegant ways to create multiple queues.

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker(queue_name="q1")
broker.add_queue(queue_name="q2")

# The first way to specify the queue name
@broker.task(queue="q1")
async def test() -> None:
    print("nothing")

@broker.task(queue="q2")
async def test2() -> None:
    print("nothing")    

# The second way to specify the queue name
@broker.task()
async def test3() -> None:
    print("nothing")

await test3.kicker().with_queue(queue="q1").kiq()
@s3rius
Copy link
Member

s3rius commented May 8, 2023

Hi and thanks for creating an issue and PR. But unfortunately, currently I'm not sure about the PR and the issue. So, here's the way you can achieve desired behavior right now:

  1. You defined different brokers for different queues. And explicitly set queue name for each of them.
  2. You define tasks with default brokers for them. For you example it goes like this:
from taskiq import async_shared_broker

broker1 = AioPikaBroker(queue_name="q1")
broker2 = AioPikaBroker(queue_name="q2")

@broker1.task
async def test() -> None:
    print("nothing")

@broker2.task
async def test2() -> None:
    print("nothing")    

@async_shared_broker.task
async def test3() -> None:
    print("nothing")

But in this example test3 doesn't know on which broker it should be called and you must specify it explicitly when calling using kicker.

  1. When you want to send a task that was created for a different broker, you use kicker. For example if you have a task, that was created for broker1:
@broker1.task
async def target_task():
	...

You may call it with another broker like this:

task = await target_task.kicker().with_broker(broker2).kiq()

Adding queue names slow down message processing and I'm wondering how it can be useful in different scenarios. Can you please provide examples where it would be useful?

@vvanglro
Copy link
Contributor Author

vvanglro commented May 8, 2023

For example, if I have background tasks of different durations and types, I can put tasks of different durations into different queues, and tasks of different categories into different queues.

  1. You defined different brokers for different queues. And explicitly set queue name for each of them.
  2. You define tasks with default brokers for them. For you example it goes like this:
from taskiq import async_shared_broker

broker1 = AioPikaBroker(queue_name="q1")
broker2 = AioPikaBroker(queue_name="q2")

@broker1.task
async def test() -> None:
    print("nothing")

@broker2.task
async def test2() -> None:
    print("nothing")    

@async_shared_broker.task
async def test3() -> None:
    print("nothing")

This can be done by defining different brokers, but the management and use may be confusing. For example, if I need 5 queues, then I have to define 5 brokers, and I need to execute it 5 times when starting through the command line

taskiq worker1:broker

@s3rius
Copy link
Member

s3rius commented May 8, 2023

Different durations for tasks should not be a problem, since we execute our tasks in async way. Even if the function is sync, we use threadPool executor to make it async.

If you're really want this functionality, probably I would suggest you to inherit from the AioPikaBroker and get queue name from task labels. Then you'll be able to define tasks exactly as in your example.

I have to ask @taskiq-python/core-team about this feature, because currently I'm not sure into implementing it.

@vvanglro
Copy link
Contributor Author

vvanglro commented May 8, 2023

This is an optional feature. If the user add_queue, multiple queues can be used, and there is almost no intrusion.

@osttra-o-rotel
Copy link

osttra-o-rotel commented Jul 24, 2024

Hej :)

I'm trying this approach with two brokers... and FastAPI involved, so I have this broker declaration:

brokers

repo_process_broker = AioPikaBroker(env.message_broker_url, queue_name="repo_updates_queue").with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)

admin_broker = AioPikaBroker(env.message_broker_url, queue_name="admin_queue").with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)

brokers = [repo_process_broker, admin_broker]

for broker in brokers:
    taskiq_fastapi.init(broker, "sparrow.distributed_app.fastapi_app:get_app")

These in tasks.py:

@repo_process_broker.task()
async def process_repo_target(
    # some args
    db: Session = TaskiqDepends(get_db),
    deps: Dependencies = TaskiqDepends(get_deps),
):
    ...

@admin_broker.task()
async def fetch_sparrow_repo_config(
    # other args
    db: Session = TaskiqDepends(get_db),
    deps: Dependencies = TaskiqDepends(get_deps),
) -> dict[RepoTargetId, RepoTargetSettings]:

And when I trigger either of them. Example

# in a FastApi route:
await fetch_sparrow_repo_config.kiq(project_key=project_key, repo_name=repo_name)

I can see in rabbitMQ management, that the message is placed on both repo_updates_queue and admin_queue.

And when I start two worker processes (one with each broker).

Both brokers get the message, one of them process the task correctly (the admin), but the other one prints:

WARNING:	2024-07-24 23:11:51,380::taskiq.receiver.receiver: task "app.tasks:fetch_sparrow_repo_config" is not found. Maybe you forgot to import it?

This is how rabbit looks when I start the FastApi app
Screenshot 2024-07-24 at 23 17 43

This one after a kik() the task from FastApi
Screenshot 2024-07-24 at 23 19 33

After I start the admin broker, and executes the task properly.
Screenshot 2024-07-24 at 23 21 16

And when I start the repo_process one, which is when I get the warning fot the task not being defined, the message ends up un acked (I guess because I have the default ack on save, and that doesn't happen here).
Screenshot 2024-07-24 at 23 21 38

So every time I restart the repo_process_broker worker, It asks for those unacked ones and the warnings accumulate with each new message.

Side note, I really like the project! Has very nice usage!

Thanks for any info... let me know if you need more details to reproduce...

Saludos!

@osttra-o-rotel
Copy link

After playing a bit more, and looking at taskiq_aio_pika/broker.py, I saw that declaring the brokers with an exchange name, did the trick:

repo_process_broker = AioPikaBroker(
    env.message_broker_url,
    queue_name="repo_updates_queue",
    exchange_name="repo_updates_exchange",
).with_result_backend(
    RedisAsyncResultBackend(env.result_backend_url)
)

:)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants