diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 597cc69424..96812de7f7 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -1075,7 +1075,8 @@ public boolean containsRunningConsumption(String topic) { public boolean isPartitionConsuming(String topic, int partitionId) { try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) { StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic); - return ingestionTask != null && ingestionTask.isRunning() && ingestionTask.isPartitionConsuming(partitionId); + return ingestionTask != null && ingestionTask.isRunning() + && ingestionTask.isPartitionConsumingOrHasPendingIngestionAction(partitionId); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index e5001ca11b..dfd976b203 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -1776,7 +1776,6 @@ protected void processCommonConsumerAction( } else { LOGGER.info("{} Unsubscribed to: {}", consumerTaskId, topicPartition); } - break; case RESET_OFFSET: resetOffset(partition, topicPartition, false); @@ -3322,8 +3321,11 @@ public void disableMetricsEmission() { /** * To check whether the given partition is still consuming message from Kafka */ - public boolean isPartitionConsuming(int userPartition) { - return amplificationFactorAdapter.meetsAny(userPartition, partitionConsumptionStateMap::containsKey); + public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition) { + boolean subPartitionConsumptionStateExist = + amplificationFactorAdapter.meetsAny(userPartition, partitionConsumptionStateMap::containsKey); + boolean pendingPartitionIngestionAction = hasPendingPartitionIngestionAction(userPartition); + return pendingPartitionIngestionAction || subPartitionConsumptionStateExist; } /**