Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for consumer event listener #259

Open
guilload opened this issue Feb 12, 2023 · 2 comments
Open

Support for consumer event listener #259

guilload opened this issue Feb 12, 2023 · 2 comments

Comments

@guilload
Copy link
Contributor

We're interested in adding support for this feature that already exists in the Java client. I have quickly looked at the code and most of the logic to handle active/inactive events is already in place. What remains to be implemented is surfacing the event to the user.

First, I have one question:
Is there any guarantee that when a consumer becomes active for a topic, it always receives a CommandActiveConsumerChange before the first message?

I see a few options regarding the public API:

  1. Add a new method at the consumer level that returns a stream of events. The user is in charge of polling both futures.
tokio::select! {
  biased;
  event = consumer.events.next() => {
  }
  
  message = consumer.next() => {
  }
}
  1. Add a new method at the consumer level that returns a stream of events where the event object is an enum:
pub enum PulsarEvent {
  BecameActive(...),
  BecameInactive(...),
  Message(...)
}

 while let Some(event) = consumer.events.try_next().await?

The latter is, IMHO, easier to consume for the end user.

Regarding the implementation strategy, we can reuse the main message channel with an enum, or handle events in a separate channel.

Thoughts?

@BewareMyPower
Copy link

Is there any guarantee that when a consumer becomes active for a topic, it always receives a CommandActiveConsumerChange before the first message?

Yes, you can see the following code in PersistentDispatcherSingleActiveConsumer:

            notifyActiveConsumerChanged(activeConsumer);
            readMoreEntries(activeConsumer);

Only after the ACTIVE_CONSUMER_CHANGE command was sent to the consumer would the dispatcher read entries.

The latter is, IMHO, easier to consume for the end user.

I prefer the latter one as well.. The first one is more Java-style, which is not easy to use IMO.

@FlorentinDUBOIS
Copy link
Collaborator

I am agree with @BewareMyPower, the latter is better. To me, this syntax is more idiomatic than the first one.

Regarding the implementation strategy, we can reuse the main message channel with an enum, or handle events in a separate channel.

I think it is better to use the main channel with an enum.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants