diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java index 63d26dd9ae..bf6c68564e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AbstractKafkaConsumerService.java @@ -21,7 +21,11 @@ public abstract SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition( public abstract void unsubscribeAll(PubSubTopic versionTopic); - public abstract void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition); + public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { + unSubscribe(versionTopic, pubSubTopicPartition, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS); + } + + public abstract void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs); public abstract void batchUnsubscribe(PubSubTopic versionTopic, Set topicPartitionsToUnSub); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java index 628b93b510..9c80e6674e 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/AggKafkaConsumerService.java @@ -364,8 +364,15 @@ void resetOffsetFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPa } public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { + unsubscribeConsumerFor(versionTopic, pubSubTopicPartition, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS); + } + + public void 
unsubscribeConsumerFor( + PubSubTopic versionTopic, + PubSubTopicPartition pubSubTopicPartition, + long timeoutMs) { for (AbstractKafkaConsumerService consumerService: kafkaServerToConsumerServiceMap.values()) { - consumerService.unSubscribe(versionTopic, pubSubTopicPartition); + consumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java index b7d76bb930..24110dbea6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java @@ -243,15 +243,15 @@ public void unsubscribeAll(PubSubTopic versionTopic) { * Stop specific subscription associated with the given version topic. */ @Override - public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { - PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); + public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) { + SharedKafkaConsumer consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition); if (consumer != null) { /** * Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by * setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer. 
*/ synchronized (consumer) { - consumer.unSubscribe(pubSubTopicPartition); + consumer.unSubscribe(pubSubTopicPartition, timeoutMs); removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition); } versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java index 8947726ad4..d018a000a0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegator.java @@ -153,10 +153,10 @@ public void unsubscribeAll(PubSubTopic versionTopic) { } @Override - public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) { + public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) { KafkaConsumerService kafkaConsumerService = getKafkaConsumerService(versionTopic, pubSubTopicPartition); if (kafkaConsumerService != null) { - kafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition); + kafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs); topicPartitionToConsumerService.remove(new TopicPartitionForIngestion(versionTopic, pubSubTopicPartition)); } else { LOGGER.warn( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java index 735554a5e0..21f82134ad 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java @@ -20,6 
+20,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -39,8 +40,15 @@ * TODO: move this logic inside consumption task, this class does not need to be sub-class of {@link PubSubConsumerAdapter} */ class SharedKafkaConsumer implements PubSubConsumerAdapter { + public static final long DEFAULT_MAX_WAIT_MS = TimeUnit.SECONDS.toMillis(10); + /** + * Increase the max wait during state transitions to ensure that it waits for the messages to finish processing. A + * poll() indicates that all previous inflight messages under the previous state were processed, so there can't be a + * state mismatch. The consumer_records_producing_to_write_buffer_latency metric suggests how long the wait should be. + */ + public static final long STATE_TRANSITION_MAX_WAIT_MS = TimeUnit.MINUTES.toMillis(30); + private static final Logger LOGGER = LogManager.getLogger(SharedKafkaConsumer.class); - private long nextPollTimeOutSeconds = 10; protected final PubSubConsumerAdapter delegate; @@ -61,6 +69,8 @@ class SharedKafkaConsumer implements PubSubConsumerAdapter { */ private final AtomicBoolean waitingForPoll = new AtomicBoolean(false); + private long timeoutMsOverride = -1; // for unit test purposes + private final Time time; /** @@ -145,20 +155,23 @@ synchronized void subscribe( updateCurrentAssignment(delegate.getAssignment()); } + public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) { + unSubscribe(pubSubTopicPartition, DEFAULT_MAX_WAIT_MS); + } + /** * There is an additional goal of this function which is to make sure that all the records consumed for this {topic,partition} prior to * calling unsubscribe here is produced to drainer service. 
{@link ConsumptionTask#run()} ends up calling * {@link SharedKafkaConsumer#poll(long)} and produceToStoreBufferService sequentially. So waiting for at least one more * invocation of {@link SharedKafkaConsumer#poll(long)} achieves the above objective. */ - @Override - public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition) { + public synchronized void unSubscribe(PubSubTopicPartition pubSubTopicPartition, long timeoutMs) { unSubscribeAction(() -> { this.delegate.unSubscribe(pubSubTopicPartition); PubSubTopic versionTopic = subscribedTopicPartitionToVersionTopic.remove(pubSubTopicPartition); unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition); return Collections.singleton(pubSubTopicPartition); - }); + }, timeoutMs); } @Override @@ -170,7 +183,7 @@ public synchronized void batchUnsubscribe(Set pubSubTopicP unsubscriptionListener.call(this, versionTopic, pubSubTopicPartition); } return pubSubTopicPartitionSet; - }); + }, DEFAULT_MAX_WAIT_MS); } /** @@ -179,7 +192,7 @@ public synchronized void batchUnsubscribe(Set pubSubTopicP * * @param supplier which performs the unsubscription and returns a set of partitions which were unsubscribed */ - protected synchronized void unSubscribeAction(Supplier> supplier) { + protected synchronized void unSubscribeAction(Supplier> supplier, long timeoutMs) { long currentPollTimes = pollTimes; Set topicPartitions = supplier.get(); long startTime = System.currentTimeMillis(); @@ -191,28 +204,44 @@ protected synchronized void unSubscribeAction(Supplier topicPartitions, elapsedTime); updateCurrentAssignment(delegate.getAssignment()); - waitAfterUnsubscribe(currentPollTimes, topicPartitions); + waitAfterUnsubscribe(currentPollTimes, topicPartitions, timeoutMs); } - protected void waitAfterUnsubscribe(long currentPollTimes, Set topicPartitions) { + protected void waitAfterUnsubscribe( + long currentPollTimes, + Set topicPartitions, + long timeoutMs) { + // This clause is only for unit test purposes, 
for when the timeout needs to be set to 0. + if (timeoutMsOverride != -1) { + timeoutMs = timeoutMsOverride; + } + currentPollTimes++; waitingForPoll.set(true); // Wait for the next poll or maximum 10 seconds. Interestingly wait api does not provide any indication if wait // returned // due to timeout. So an explicit time check is necessary. - long timeoutMs = (time.getNanoseconds() / Time.NS_PER_MS) + nextPollTimeOutSeconds * Time.MS_PER_SECOND; + final long startTimeMs = time.getMilliseconds(); + final long endTimeMs = startTimeMs + timeoutMs; try { while (currentPollTimes > pollTimes) { - long waitMs = timeoutMs - (time.getNanoseconds() / Time.NS_PER_MS); + final long waitMs = endTimeMs - time.getMilliseconds(); if (waitMs <= 0) { LOGGER.warn( - "Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} seconds", + "Wait for poll request after unsubscribe topic partition(s) ({}) timed out after {} milliseconds", topicPartitions, - nextPollTimeOutSeconds); + timeoutMs); break; } wait(waitMs); } + final long elapsedMs = time.getMilliseconds() - startTimeMs; + if (elapsedMs > TimeUnit.SECONDS.toMillis(15)) { + LOGGER.warn( + "Wait for poll request after unsubscribe topic partition(s) ({}) took {} milliseconds", + topicPartitions, + elapsedMs); + } // no action to take actually, just return; } catch (InterruptedException e) { LOGGER.info("Wait for poll request in `unsubscribe` function got interrupted."); @@ -221,8 +250,8 @@ protected void waitAfterUnsubscribe(long currentPollTimes, Set partitionsFor(PubSubTopic topic) { throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer"); } - - // Test only - public void setNextPollTimeOutSeconds(long seconds) { - this.nextPollTimeOutSeconds = seconds; - } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java 
b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 8ac571b5d1..bcf6dc4444 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -3405,7 +3405,12 @@ public void consumerUnSubscribe(PubSubTopic topic, PartitionConsumptionState par Instant startTime = Instant.now(); int partitionId = partitionConsumptionState.getPartition(); PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, partitionId); - aggKafkaConsumerService.unsubscribeConsumerFor(versionTopic, topicPartition); + /** + * Use an increased timeout for waitAfterUnsubscribe() of up to 30 minutes according to the maximum value of + * the metric consumer_records_producing_to_write_buffer_latency + */ + aggKafkaConsumerService + .unsubscribeConsumerFor(versionTopic, topicPartition, SharedKafkaConsumer.STATE_TRANSITION_MAX_WAIT_MS); LOGGER.info( "Consumer unsubscribed to topic-partition: {} for replica: {}. 
Took {} ms", topicPartition, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java index cc9f932467..4ddba90b06 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerServiceDelegatorTest.java @@ -65,11 +65,41 @@ public static Object[][] methodList() { { "unSubscribe" }, { "getOffsetLagBasedOnMetrics" }, { "getLatestOffsetBasedOnMetrics" } }; } + private void invokeAndVerify( + KafkaConsumerServiceDelegator delegator, + KafkaConsumerService invokedConsumerService, + KafkaConsumerService unusedConsumerService, + PubSubTopic versionTopic, + PubSubTopicPartition topicPartition, + String methodName) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + boolean includeLongParam = methodName.equals("unSubscribe"); + if (includeLongParam) { + Method testMethod = KafkaConsumerServiceDelegator.class + .getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class, long.class); + Method verifyMethod = + KafkaConsumerService.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class, long.class); + testMethod.invoke(delegator, versionTopic, topicPartition, 0L); + verifyMethod.invoke(verify(invokedConsumerService), versionTopic, topicPartition, 0L); + verifyMethod.invoke(verify(unusedConsumerService, never()), versionTopic, topicPartition, 0L); + } else { + Method testMethod = + KafkaConsumerServiceDelegator.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); + Method verifyMethod = + KafkaConsumerService.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); + testMethod.invoke(delegator, versionTopic, topicPartition); + 
verifyMethod.invoke(verify(invokedConsumerService), versionTopic, topicPartition); + verifyMethod.invoke(verify(unusedConsumerService, never()), versionTopic, topicPartition); + } + + reset(invokedConsumerService); + reset(unusedConsumerService); + } + @Test(dataProvider = "Method-List") public void chooseConsumerServiceTest(String methodName) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - KafkaConsumerService mockDefaultConsumerService = mock(KafkaConsumerService.class); - KafkaConsumerService mockDedicatedConsumerService = mock(KafkaConsumerService.class); + KafkaConsumerService defaultMockService = mock(KafkaConsumerService.class); + KafkaConsumerService dedicatedMockService = mock(KafkaConsumerService.class); VeniceServerConfig mockConfig = mock(VeniceServerConfig.class); doReturn(true).when(mockConfig).isDedicatedConsumerPoolForAAWCLeaderEnabled(); doReturn(KafkaConsumerServiceDelegator.ConsumerPoolStrategyType.AA_OR_WC_LEADER_DEDICATED).when(mockConfig) @@ -78,8 +108,8 @@ public void chooseConsumerServiceTest(String methodName) Function isAAWCStoreFunc = vt -> true; KafkaConsumerServiceDelegator.KafkaConsumerServiceBuilder consumerServiceBuilder = (ignored, poolType) -> poolType.equals(ConsumerPoolType.REGULAR_POOL) - ? mockDefaultConsumerService - : mockDedicatedConsumerService; + ? 
defaultMockService + : dedicatedMockService; KafkaConsumerServiceDelegator delegator = new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc); @@ -105,35 +135,16 @@ public void chooseConsumerServiceTest(String methodName) PartitionReplicaIngestionContext.WorkloadType.NON_AA_OR_WRITE_COMPUTE); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForRT, 0, dataReceiver); - Method testMethod = - KafkaConsumerServiceDelegator.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); - Method verifyMethod = - KafkaConsumerService.class.getMethod(methodName, PubSubTopic.class, PubSubTopicPartition.class); - - testMethod.invoke(delegator, versionTopic, topicPartitionForVT); - verifyMethod.invoke(verify(mockDefaultConsumerService), versionTopic, topicPartitionForVT); - verifyMethod.invoke(verify(mockDedicatedConsumerService, never()), versionTopic, topicPartitionForVT); - reset(mockDefaultConsumerService); - reset(mockDedicatedConsumerService); - testMethod.invoke(delegator, versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDedicatedConsumerService), versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDefaultConsumerService, never()), versionTopic, topicPartitionForRT); - reset(mockDefaultConsumerService); - reset(mockDedicatedConsumerService); + invokeAndVerify(delegator, defaultMockService, dedicatedMockService, versionTopic, topicPartitionForVT, methodName); + invokeAndVerify(delegator, dedicatedMockService, defaultMockService, versionTopic, topicPartitionForRT, methodName); isAAWCStoreFunc = vt -> false; delegator = new KafkaConsumerServiceDelegator(mockConfig, consumerServiceBuilder, isAAWCStoreFunc); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForVT, 0, dataReceiver); delegator.startConsumptionIntoDataReceiver(topicPartitionIngestionContextForRT, 0, dataReceiver); - testMethod.invoke(delegator, versionTopic, topicPartitionForVT); - 
verifyMethod.invoke(verify(mockDefaultConsumerService), versionTopic, topicPartitionForVT); - verifyMethod.invoke(verify(mockDedicatedConsumerService, never()), versionTopic, topicPartitionForVT); - reset(mockDefaultConsumerService); - reset(mockDedicatedConsumerService); - testMethod.invoke(delegator, versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDefaultConsumerService), versionTopic, topicPartitionForRT); - verifyMethod.invoke(verify(mockDedicatedConsumerService, never()), versionTopic, topicPartitionForRT); + invokeAndVerify(delegator, defaultMockService, dedicatedMockService, versionTopic, topicPartitionForVT, methodName); + invokeAndVerify(delegator, defaultMockService, dedicatedMockService, versionTopic, topicPartitionForRT, methodName); } @Test @@ -321,7 +332,8 @@ public void consumerAssignmentStickiness() { // Change the AAWC flag retValueForIsAAWCStoreFunc.set(true); delegator.unSubscribe(versionTopic, topicPartitionForRT); - verify(mockDefaultConsumerService).unSubscribe(versionTopic, topicPartitionForRT); + verify(mockDefaultConsumerService) + .unSubscribe(versionTopic, topicPartitionForRT, SharedKafkaConsumer.DEFAULT_MAX_WAIT_MS); verify(mockDedicatedConsumerService, never()).unSubscribe(versionTopic, topicPartitionForRT); } @@ -614,7 +626,7 @@ private Runnable getResubscriptionRunnableFor( consumerServiceDelegator .startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, 0, consumedDataReceiver); // Avoid wait time here to increase the chance for race condition. 
- consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition).setNextPollTimeOutSeconds(0); + consumerServiceDelegator.assignConsumerFor(versionTopic, pubSubTopicPartition).setTimeoutMsOverride(0L); int versionNum = Version.parseVersionFromKafkaTopicName(partitionReplicaIngestionContext.getVersionTopic().getName()); if (versionNum % 3 == 0) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java index fd0fd52d9e..897cf4f121 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumerTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -72,7 +73,7 @@ public void testSubscriptionEmptyPoll() { private void setUpSharedConsumer() { consumerAdapter = mock(PubSubConsumerAdapter.class); - AggKafkaConsumerServiceStats stats = mock(AggKafkaConsumerServiceStats.class); + stats = mock(AggKafkaConsumerServiceStats.class); Runnable assignmentChangeListener = mock(Runnable.class); SharedKafkaConsumer.UnsubscriptionListener unsubscriptionListener = mock(SharedKafkaConsumer.UnsubscriptionListener.class); @@ -91,13 +92,11 @@ private void setUpSharedConsumer() { public void testWaitAfterUnsubscribe() { setUpSharedConsumer(); Supplier> supplier = () -> topicPartitions; - - long poolTimesBeforeUnsubscribe = sharedKafkaConsumer.getPollTimes(); - sharedKafkaConsumer.setNextPollTimeoutSeconds(1); - sharedKafkaConsumer.unSubscribeAction(supplier); + long pollTimesBeforeUnsubscribe = sharedKafkaConsumer.getPollTimes(); + sharedKafkaConsumer.unSubscribeAction(supplier, 
TimeUnit.SECONDS.toMillis(1)); // This is to test that if the poll time is not incremented when the consumer is unsubscribed the correct log can // be found in the logs. - Assert.assertEquals(poolTimesBeforeUnsubscribe, sharedKafkaConsumer.getPollTimes()); + Assert.assertEquals(pollTimesBeforeUnsubscribe, sharedKafkaConsumer.getPollTimes()); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 4966daf8dc..53e5b75089 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -1185,6 +1185,27 @@ private void prepareAggKafkaConsumerServiceMock() { return null; }).when(aggKafkaConsumerService).unsubscribeConsumerFor(any(), any()); + doAnswer(invocation -> { + PubSubTopic versionTopic = invocation.getArgument(0, PubSubTopic.class); + PubSubTopicPartition pubSubTopicPartition = invocation.getArgument(1, PubSubTopicPartition.class); + Long timeoutMs = invocation.getArgument(2, Long.class); + /** + * The internal {@link SharedKafkaConsumer} has special logic for unsubscription to avoid some race condition + * between the fast unsubscribe and re-subscribe. + * Please check {@link SharedKafkaConsumer#unSubscribe} to find more details. + * + * We shouldn't use {@link #mockLocalKafkaConsumer} or {@link #mockRemoteKafkaConsumer} here since + * they don't have the proper synchronization.
+ */ + if (inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) { + localKafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs.longValue()); + } + if (inMemoryRemoteKafkaConsumer.hasSubscription(pubSubTopicPartition)) { + remoteKafkaConsumerService.unSubscribe(versionTopic, pubSubTopicPartition, timeoutMs.longValue()); + } + return null; + }).when(aggKafkaConsumerService).unsubscribeConsumerFor(any(), any(), anyLong()); + doAnswer(invocation -> { PubSubTopicPartition pubSubTopicPartition = invocation.getArgument(1, PubSubTopicPartition.class); if (inMemoryLocalKafkaConsumer.hasSubscription(pubSubTopicPartition)) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 82624dfaae..e590896f07 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -118,6 +118,11 @@ private ConfigKeys() { public static final String PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE = "pubsub.topic.manager.metadata.fetcher.thread.pool.size"; + /** + * How long to wait for the next poll request after unsubscribing, indicating that old messages were processed. + */ + public static final String SERVER_WAIT_AFTER_UNSUBSCRIBE_TIMEOUT_MS = "server.wait.after.unsubscribe.timeout.ms"; + // Cluster specific configs for controller public static final String CONTROLLER_NAME = "controller.name";