
Support multiple producers #36

Open
josevalim opened this issue Feb 13, 2019 · 24 comments

Comments

@josevalim
Member

This issue is open to collect feedback and use cases.

@lessless

lessless commented Mar 8, 2019

Hi folks,

Thanks for pushing boundaries. Broadway has the potential to make our lives much simpler, and as I currently understand it, having multiple producers is key here.

In our case, latency and correctness are the two top priorities. Say we need to consume 1k messages at the same time (1k producers?) and be sure that each of them will end up in the same processing unit (a partitioned demand dispatcher?).

Our events have a lifecycle, and to avoid queuing we are OK with spawning a process per event and shutting it down at the end of the event's lifecycle or after a timeout measured in days.

@josevalim
Member Author

josevalim commented Mar 8, 2019 via email

@josevalim
Member Author

> Our events have a lifecycle, and to avoid queuing we are OK with spawning a process per event and shutting it down at the end of the event's lifecycle or after a timeout measured in days.

Follow up question: why is it important to go to the same unit? Do you keep intermediate results in memory?

@lessless

lessless commented Mar 9, 2019

  1. Each message has a unique event_id field

  2. Nope, it's about sequencing. Some messages processed in parallel may update the exact same entity, and in our system all updates should happen in exactly the same order as they appeared in the queue.
     Similar to the problem with the word "are" in the word-counting example ;)
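For context, later Broadway versions grew a :partition_by option that addresses exactly this ordering concern: messages that hash to the same partition always go to the same processor. A sketch, with hypothetical module names and assuming each message's data carries the event_id field mentioned above:

```elixir
Broadway.start_link(MyApp.Pipeline,
  name: MyApp.Pipeline,
  producer: [module: {MyApp.Producer, []}, concurrency: 2],
  processors: [
    default: [
      concurrency: 10,
      # Messages with the same event_id always hash to the same
      # processor, so updates to one entity keep their queue order.
      partition_by: fn message -> :erlang.phash2(message.data.event_id) end
    ]
  ]
)
```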

@josevalim
Member Author

Alright, it is more about ordering than locality. Thank you!

@mgwidmann

The Broadway SQS producer is very difficult to tune without this feature. If I cannot separate download concurrency from processing concurrency (by making a separate stage for downloaders and a separate stage for processors), then I cannot scale them independently.

The best workaround currently is to have the processor spawn a few tasks to do the download, but that leaves the processor waiting instead of processing a message that is already available (say, from another downloader).
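The workaround described above looks roughly like this (a sketch to be placed inside a Broadway pipeline module; download/1 is a hypothetical function standing in for whatever fetches the payload):

```elixir
# The processor hands the download off to a task but then blocks
# on Task.await/2, so it cannot pick up another message while the
# download is in flight.
@impl true
def handle_message(_processor, message, _context) do
  task = Task.async(fn -> download(message.data) end)
  payload = Task.await(task, 30_000)
  Broadway.Message.update_data(message, fn _old -> payload end)
end
```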

@msaraiva
Collaborator

Hi @mgwidmann!

> If I cannot separate download concurrency from processing concurrency (by making a separate stage for downloaders and a separate stage for processors) then I cannot scale them independently.

They are already independent. You can define the concurrency level of the producers or the processors individually by setting the :stages option on each. This issue is about supporting multiple different kinds of producers/sources simultaneously, such as consuming data from different queues, or even from SQS and RabbitMQ at the same time.
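For reference, a minimal sketch of tuning those levels independently. Option names follow the Broadway API as it was at the time of this comment (:stages; later versions renamed this to :concurrency), and the queue name is a placeholder:

```elixir
defmodule MyApp.SQSPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # One kind of producer; its concurrency (:stages) is set
      # independently from the processors below.
      producers: [
        default: [
          module: {BroadwaySQS.Producer, queue_name: "my-queue"},
          stages: 2
        ]
      ],
      processors: [
        default: [stages: 50]
      ]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    message
  end
end
```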

@mgwidmann

Sorry I misread the title, thought this was about multiple processors! Is there a separate issue for that?

@josevalim
Member Author

@mgwidmann the issue right above this one. :D #39 I have some comments on this, so please copy and paste your original comment there and we can discuss solutions.

@whatyouhide
Collaborator

We have a use case for multiple producers: different RabbitMQ producers consuming from different RabbitMQ connections but producing the same kind of messages, which we want to process in the same way. I think it might not be such a unique use case, so it might be worth adding this :) As always, I volunteer to help if we want to go through with this at some point.

@josevalim
Member Author

In this case you can share the code using modules. I think we won't get this feature in, because we are adding support for a producer to change the topology, so multiple producers could change the topology in conflicting ways.
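A sketch of the module-sharing approach, using the single-producer API and hypothetical module names: put the processing logic in one module and call it from each pipeline, varying only the producer options:

```elixir
defmodule MyApp.Handling do
  # Shared processing logic, used by every pipeline.
  def process(message) do
    # ...transform the message...
    message
  end
end

defmodule MyApp.PipelineA do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           queue: "events",
           connection: [host: "rabbit-a.internal"]}
      ],
      processors: [default: []]
    )
  end

  @impl true
  def handle_message(_processor, message, _context),
    do: MyApp.Handling.process(message)
end

# MyApp.PipelineB would be identical except for its :connection options.
```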

@whatyouhide
Collaborator

@josevalim I can share the code, yep, but I need to start two different Broadway pipelines with basically the same set of options except for the producer. It's fine, and it's what we do now, but since I saw the open issue I thought I'd bring it up. I'm a bit concerned that the current Broadway API suggests multiple producers/processors should be supported (for example, passing a list of producers/processors, passing the producer name in handle_message, and so on). So maybe it would be worth deprecating that part of the API at some point.

@whatyouhide
Collaborator

@josevalim btw, can you expand on the feature of a producer changing the topology?

@josevalim
Member Author

It is issue #100.

@stavro

stavro commented Jan 24, 2023

I have a situation where we have ~40 SQS queues that need to be consumed from.

I have concerns about setting each SQS queue up in an isolated BroadwaySQS pipeline, with the major concern being that tuning the global concurrency of message handling isn't possible.

In an ideal scenario I would be able to merge the messages from all queues into a singular pipeline, of which a limited number of processors would handle messages across all 40 queues (perhaps with custom priority logic).

With each BroadwaySQS pipeline in isolation, each of the 40 isolated pipelines would have a fixed number of processors, and under heavy load could overwhelm the system.

@josevalim
Member Author

I have two suggestions:

  1. Change BroadwaySQS to allow multiple queues
  2. Allow the queue/queues themselves to be customised per producer index (BroadwayRabbitMQ already has this feature)

Then the idea is that you start X producers with Y queues. This is better than 40 producers because demand is always individual between producer/processor, and not shared.
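One way suggestion 2 could look from the producer's side. This is a sketch, not the actual BroadwaySQS/BroadwayRabbitMQ API; it assumes Broadway passes each producer its index under the :broadway key of the init options:

```elixir
defmodule MyApp.MultiQueueProducer do
  use GenStage

  @behaviour Broadway.Producer

  @impl true
  def init(opts) do
    queues = Keyword.fetch!(opts, :queues)
    # Assumption: the producer's index is available in the
    # Broadway-injected options.
    index = opts[:broadway][:index] || 0
    # With X producers and Y queues, producer i serves queue rem(i, Y).
    queue = Enum.at(queues, rem(index, length(queues)))
    {:producer, %{queue: queue}}
  end

  @impl true
  def handle_demand(_demand, state) do
    # Fetch up to the demanded number of messages from state.queue here.
    {:noreply, [], state}
  end
end
```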

@josevalim
Member Author

Pull requests are welcome! :)

@xandervr

xandervr commented Mar 10, 2023

Is someone working on this?

My use case would be to handle multiple sources of information in one pipeline and thus needing multiple types of producers in one pipeline.

@whatyouhide
Collaborator

@xandervr do you mean the SQS case that José mentioned above?

@xandervr

xandervr commented Mar 16, 2023

Not in particular; in general I just want Broadway to be able to have multiple types of producers in one pipeline. SQS, RabbitMQ... it does not really matter, any type of GenStage producer should be supported.

Let me know if I misunderstood your question.

@whatyouhide

@josevalim
Member Author

I cleaned up the thread a bit and reopened it.

@atomkirk
Contributor

atomkirk commented Aug 5, 2023

My use case is that I have a bunch of GCP dead-letter topics that I'd like to consume from and process all in exactly the same way: store them in a table for review and possible retry. It would be nice not to have to set up a pipeline for each one.

@ekosz

ekosz commented Jul 18, 2024

Hi there! Thank you again for the amazing library. I figured I would explain our use case for multiple producers. We're an application that works with content creators. As creators sign up with our service, we need to listen to incoming events from either YouTube or Twitch for that new user, transform them, then both rebroadcast them to our own event system and store the (batched) events in our data lake. We were thinking each creator/source tuple would be its own producer, as they each have their own API credentials to manage as well as different ways of fetching events.

If we can only have a single producer, would the suggestion then be to create a single process that all of our creator-specific producers send their events to, which then acts as the source for the rest of the pipeline? I'm also not sure how we would shard or dynamically add and remove producers.
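For what it's worth, the fan-in idea in the last paragraph can be sketched as a hand-rolled GenStage producer that the creator-specific processes push into, with Broadway pulling from it via demand. All names are hypothetical, and a real Broadway producer would also implement the Broadway.Producer behaviour:

```elixir
defmodule MyApp.FanInProducer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts, name: __MODULE__)

  # Called by the creator-specific fetcher processes.
  def push(events), do: GenStage.cast(__MODULE__, {:push, events})

  @impl true
  def init(_opts), do: {:producer, {:queue.new(), 0}}

  @impl true
  def handle_cast({:push, events}, {queue, demand}) do
    # Buffer incoming events, then serve any outstanding demand.
    queue = Enum.reduce(events, queue, &:queue.in/2)
    dispatch(queue, demand)
  end

  @impl true
  def handle_demand(incoming, {queue, demand}) do
    dispatch(queue, demand + incoming)
  end

  defp dispatch(queue, demand) do
    {events, queue, demand} = take(queue, demand, [])
    {:noreply, events, {queue, demand}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, queue} -> take(queue, demand - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), queue, demand}
    end
  end
end
```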

@josevalim
Member Author

I think you only need the batching part of Broadway. We had plans to extract it out, but we never completed them. Still, you should be able to roll your own batcher process that accumulates items and then starts a task to publish them to your storage.
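A minimal version of such a hand-rolled batcher might look like this. The batch size, timeout, and flush callback are placeholders; it flushes whenever the batch is full or the timer fires:

```elixir
defmodule MyApp.SimpleBatcher do
  use GenServer

  @max_size 100
  @timeout 5_000

  # `flush_fun` receives each completed batch, e.g. a data-lake writer.
  def start_link(flush_fun), do: GenServer.start_link(__MODULE__, flush_fun, name: __MODULE__)

  def add(item), do: GenServer.cast(__MODULE__, {:add, item})

  @impl true
  def init(flush_fun) do
    schedule()
    {:ok, %{items: [], flush: flush_fun}}
  end

  @impl true
  def handle_cast({:add, item}, state) do
    state = %{state | items: [item | state.items]}
    # Flush early when the batch is full.
    if length(state.items) >= @max_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    schedule()
    {:noreply, flush(state)}
  end

  defp schedule, do: Process.send_after(self(), :flush, @timeout)

  defp flush(%{items: []} = state), do: state

  defp flush(state) do
    batch = Enum.reverse(state.items)
    # Publish in a task so the batcher keeps accepting new items.
    Task.start(fn -> state.flush.(batch) end)
    %{state | items: []}
  end
end
```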
