Changing ingestion consumption check avoiding race condition. (linked…
…in#635)

Previously, when an OFFLINE => DROP transition happened, we decided whether the partition could be removed from disk based only on whether its partition consumption state existed. However, there may also be pending state transitions in the consumer action queue of the StoreIngestionTask. If we drop the partition while a SUBSCRIBE action is still pending in that queue, data persistence fails because the partition no longer exists on disk. To avoid this, the partition consumption check now also looks for pending queued actions for the partition that is about to be deleted.
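The race described above can be illustrated with a minimal, self-contained sketch. It is not the Venice implementation: the class `DropCheckSketch`, its fields, and all of its method names are hypothetical stand-ins for the StoreIngestionTask's state map and action queue, and the sketch assumes that a queued action only updates the state map once it is processed.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of an ingestion task: a state map plus a queue of
// actions that have been requested but not yet processed.
class DropCheckSketch {
  private static final class PendingAction {
    final boolean subscribe; // true = SUBSCRIBE, false = UNSUBSCRIBE
    final int partition;

    PendingAction(boolean subscribe, int partition) {
      this.subscribe = subscribe;
      this.partition = partition;
    }
  }

  private final Map<Integer, String> consumptionStates = new ConcurrentHashMap<>();
  private final Queue<PendingAction> pendingActions = new ConcurrentLinkedQueue<>();

  void enqueueSubscribe(int partition) {
    pendingActions.add(new PendingAction(true, partition));
  }

  void enqueueUnsubscribe(int partition) {
    pendingActions.add(new PendingAction(false, partition));
  }

  // Processes one queued action; only at this point does the state map change.
  void processNextAction() {
    PendingAction action = pendingActions.poll();
    if (action == null) {
      return;
    }
    if (action.subscribe) {
      consumptionStates.put(action.partition, "CONSUMING");
    } else {
      consumptionStates.remove(action.partition);
    }
  }

  // Old-style check: looks at the state map only.
  boolean hasConsumptionState(int partition) {
    return consumptionStates.containsKey(partition);
  }

  // Scans the queue for any not-yet-processed action on this partition.
  boolean hasPendingAction(int partition) {
    return pendingActions.stream().anyMatch(a -> a.partition == partition);
  }

  // New-style check: the partition counts as busy if either condition holds.
  boolean isConsumingOrHasPendingAction(int partition) {
    return hasPendingAction(partition) || hasConsumptionState(partition);
  }

  public static void main(String[] args) {
    DropCheckSketch task = new DropCheckSketch();
    task.enqueueSubscribe(7); // SUBSCRIBE requested but not yet processed
    System.out.println("state-only check: " + task.hasConsumptionState(7));
    System.out.println("state-or-pending check: " + task.isConsumingOrHasPendingAction(7));
  }
}
```

With a SUBSCRIBE queued but unprocessed, the state-only check reports `false` (so a caller would consider the partition safe to drop), while the combined check reports `true`. The sketch does not make the two lookups atomic; how the real code guards that window is outside what this commit shows.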

Co-authored-by: Hao Xu <[email protected]>
haoxu07 and Hao Xu authored Oct 12, 2023
1 parent e4d7a79 commit 94c59de
Showing 2 changed files with 7 additions and 4 deletions.
@@ -1075,7 +1075,8 @@ public boolean containsRunningConsumption(String topic) {
   public boolean isPartitionConsuming(String topic, int partitionId) {
     try (AutoCloseableLock ignore = topicLockManager.getLockForResource(topic)) {
       StoreIngestionTask ingestionTask = topicNameToIngestionTaskMap.get(topic);
-      return ingestionTask != null && ingestionTask.isRunning() && ingestionTask.isPartitionConsuming(partitionId);
+      return ingestionTask != null && ingestionTask.isRunning()
+          && ingestionTask.isPartitionConsumingOrHasPendingIngestionAction(partitionId);
     }
   }

@@ -1776,7 +1776,6 @@ protected void processCommonConsumerAction(
         } else {
           LOGGER.info("{} Unsubscribed to: {}", consumerTaskId, topicPartition);
         }
-
         break;
       case RESET_OFFSET:
         resetOffset(partition, topicPartition, false);
@@ -3322,8 +3321,11 @@ public void disableMetricsEmission() {
   /**
    * To check whether the given partition is still consuming message from Kafka
    */
-  public boolean isPartitionConsuming(int userPartition) {
-    return amplificationFactorAdapter.meetsAny(userPartition, partitionConsumptionStateMap::containsKey);
+  public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition) {
+    boolean subPartitionConsumptionStateExist =
+        amplificationFactorAdapter.meetsAny(userPartition, partitionConsumptionStateMap::containsKey);
+    boolean pendingPartitionIngestionAction = hasPendingPartitionIngestionAction(userPartition);
+    return pendingPartitionIngestionAction || subPartitionConsumptionStateExist;
   }
 
   /**