Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18071: Avoid event to refresh regex if no pattern subscription #17917

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1765,12 +1765,14 @@ private void sendPrefetches(Timer timer) {
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
offsetCommitCallbackInvoker.executeCallbacks();
try {
applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException e) {
return false;
} finally {
timer.update();
if (subscriptions.hasPatternSubscription()) {
try {
applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException e) {
return false;
} finally {
timer.update();
}
}
processBackgroundEvents();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,12 @@ private void process(final TopicPatternSubscriptionChangeEvent event) {
* This will make the consumer send the updated subscription on the next poll.
*/
private void process(final UpdatePatternSubscriptionEvent event) {
if (!subscriptions.hasPatternSubscription()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test really necessary? This event is only sent if the application thread knows there's a pattern subscription.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

truly it's not needed with the current usage of this event from the app thread. I just left it for extra protection in this thread really (ok to remove it if we prefer)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking my understanding. It looks like the remains of an earlier thought :)

return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
if (subscriptions.hasPatternSubscription()) {
updatePatternSubscription(metadata.fetch());
}
updatePatternSubscription(metadata.fetch());
}
event.future().complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -1829,6 +1830,27 @@ public void testSeekToEnd() {
assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy());
}

@Test
public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() {
consumer = newConsumer();
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
completeAssignmentChangeEventSuccessfully();
completeTopicPatternSubscriptionChangeEventSuccessfully();
completeUnsubscribeApplicationEventSuccessfully();

consumer.assign(singleton(new TopicPartition("topic1", 0)));
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));

consumer.unsubscribe();

consumer.subscribe(Pattern.compile("t*"));
consumer.poll(Duration.ZERO);
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
}

private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down Expand Up @@ -1913,6 +1935,7 @@ private void completeFetchedCommittedOffsetApplicationEventExceptionally(Excepti
private void completeUnsubscribeApplicationEventSuccessfully() {
doAnswer(invocation -> {
UnsubscribeEvent event = invocation.getArgument(0);
consumer.subscriptions().unsubscribe();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,10 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV
UpdatePatternSubscriptionEvent event1 = new UpdatePatternSubscriptionEvent(12345);

setupProcessor(true);

when(subscriptionState.hasPatternSubscription()).thenReturn(true);
when(metadata.updateVersion()).thenReturn(0);

processor.process(event1);
verify(subscriptionState, never()).hasPatternSubscription();
assertDoesNotThrow(() -> event1.future().get());

Cluster cluster = mock(Cluster.class);
Expand All @@ -377,7 +376,6 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV
UpdatePatternSubscriptionEvent event2 = new UpdatePatternSubscriptionEvent(12345);
processor.process(event2);
verify(metadata).requestUpdateForNewTopics();
verify(subscriptionState).hasPatternSubscription();
verify(subscriptionState).subscribeFromPattern(topics);
assertEquals(1, processor.metadataVersionSnapshot());
verify(membershipManager).onSubscriptionUpdated();
Expand Down
Loading