From a4b050c1e39634f30d4a46310510e1368dc3e34e Mon Sep 17 00:00:00 2001 From: Kai-Sern Lim Date: Thu, 17 Oct 2024 15:08:59 -0700 Subject: [PATCH] =?UTF-8?q?Using=20`AutoCloseableLock`=20instead=20of=20ma?= =?UTF-8?q?nually=20using=20`lock()`=20and=20`unlock()`,=20which=20can=20b?= =?UTF-8?q?e=20error-prone.=20=F0=9F=92=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consumer/PartitionConsumptionState.java | 36 +++--- .../kafka/consumer/StoreIngestionTask.java | 116 +++++++++--------- 2 files changed, 78 insertions(+), 74 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java index a8ab3b51dd..eb5a3365f3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java @@ -16,6 +16,8 @@ import com.linkedin.venice.storage.protocol.ChunkedValueManifest; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.utils.locks.AutoCloseableLock; +import com.linkedin.venice.utils.locks.AutoCloseableSingleLock; import com.linkedin.venice.writer.LeaderCompleteState; import com.linkedin.venice.writer.VeniceWriter; import java.nio.ByteBuffer; @@ -70,31 +72,29 @@ public class PartitionConsumptionState { * lengthy acquisition of the lock. */ final class LeaderFollowerState { - LeaderFollowerStateType state; - final ReadWriteLock rwLock; + private LeaderFollowerStateType state; + private final ReadWriteLock rwLock; LeaderFollowerState() { this.state = LeaderFollowerStateType.STANDBY; this.rwLock = new ReentrantReadWriteLock(); } - void set(LeaderFollowerStateType state) { - rwLock.writeLock().lock(); - try { + void setState(LeaderFollowerStateType state) { + try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(rwLock.writeLock())) { this.state = state; - } finally { - rwLock.writeLock().unlock(); } } - LeaderFollowerStateType get() { - rwLock.readLock().lock(); - try { + LeaderFollowerStateType getState() { + try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(rwLock.readLock())) { return this.state; - } finally { - rwLock.readLock().unlock(); } } + + ReadWriteLock getLock() { + return rwLock; + } } private final LeaderFollowerState leaderFollowerState; @@ -428,7 +428,7 @@ public String toString() { .append(", processedRecordSizeSinceLastSync=") .append(processedRecordSizeSinceLastSync) .append(", leaderFollowerState=") - .append(leaderFollowerState.get()) + .append(leaderFollowerState.getState()) .append("}") .toString(); } @@ -446,15 +446,15 @@ public void resetProcessedRecordSizeSinceLastSync() { } public void setLeaderFollowerState(LeaderFollowerStateType state) { - this.leaderFollowerState.set(state); + this.leaderFollowerState.setState(state); } - public final LeaderFollowerStateType getLeaderFollowerState() { - return this.leaderFollowerState.get(); + public LeaderFollowerStateType getLeaderFollowerState() { + return this.leaderFollowerState.getState(); } - public final ReadWriteLock getLeaderFollowerStateLock() { - return this.leaderFollowerState.rwLock; + public ReadWriteLock getLeaderFollowerStateLock() { + return this.leaderFollowerState.getLock(); } public void setLastLeaderPersistFuture(Future future) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 5cb4a87393..a1cccbe481 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -112,6 +112,8 @@ import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.lazy.Lazy; +import com.linkedin.venice.utils.locks.AutoCloseableLock; +import com.linkedin.venice.utils.locks.AutoCloseableSingleLock; import it.unimi.dsi.fastutil.objects.Object2IntMap; import java.io.Closeable; import java.io.IOException; @@ -143,6 +145,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -1180,8 +1183,8 @@ protected void produceToStoreBufferServiceOrKafka( * which this message is processed. This lock protects against any state transitions where the SIT needs to * acquire the write lock to modify the leader-follower state, since it would need to wait for this to finish. */ - partitionConsumptionState.getLeaderFollowerStateLock().readLock().lock(); - try { + final ReadWriteLock leaderFollowerStateLock = partitionConsumptionState.getLeaderFollowerStateLock(); + try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(leaderFollowerStateLock.readLock())) { if (!shouldProcessRecord(record)) { partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset()); return; @@ -1200,8 +1203,6 @@ protected void produceToStoreBufferServiceOrKafka( beforeProcessingBatchRecordsTimestampMs, metricsEnabled, elapsedTimeForPuttingIntoQueue); - } finally { - partitionConsumptionState.getLeaderFollowerStateLock().readLock().unlock(); } } @@ -1230,6 +1231,12 @@ protected void produceToStoreBufferServiceOrKafkaInBatch( PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId) throws InterruptedException { + if (partitionConsumptionState == null) { + throw new VeniceException( + "PartitionConsumptionState should present for store version: " + kafkaVersionTopic + ", partition: " + + topicPartition.getPartitionNumber()); + } + long totalBytesRead = 0; ValueHolder elapsedTimeForPuttingIntoQueue = new ValueHolder<>(0d); boolean metricsEnabled = emitMetrics.get(); @@ -1241,18 +1248,19 @@ protected void produceToStoreBufferServiceOrKafkaInBatch( List>> batches = new ArrayList<>(); List> ongoingBatch = new ArrayList<>(batchSize); Iterator> iter = records.iterator(); - while (iter.hasNext()) { - PubSubMessage record = iter.next(); - if (partitionConsumptionState != null) { + + /* + * Acquire the read lock to ensure that the result of shouldProcessRecord() holds for the entire duration for + * which this message is processed. This lock protects against any state transitions where the SIT needs to + * acquire the write lock to modify the leader-follower state, since it would need to wait for this to finish. + */ + final ReadWriteLock leaderFollowerStateLock = partitionConsumptionState.getLeaderFollowerStateLock(); + try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(leaderFollowerStateLock.readLock())) { + while (iter.hasNext()) { + PubSubMessage record = iter.next(); partitionConsumptionState.setLatestPolledMessageTimestampInMs(beforeProcessingBatchRecordsTimestampMs); - partitionConsumptionState.getLeaderFollowerStateLock().readLock().lock(); - // see comment in produceToStoreBufferServiceOrKafka about why the lock is needed here - } - try { if (!shouldProcessRecord(record)) { - if (partitionConsumptionState != null) { - partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset()); - } + partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset()); continue; } waitReadyToProcessRecord(record); @@ -1261,54 +1269,50 @@ protected void produceToStoreBufferServiceOrKafkaInBatch( batches.add(ongoingBatch); ongoingBatch = new ArrayList<>(batchSize); } - } finally { - if (partitionConsumptionState != null) { - partitionConsumptionState.getLeaderFollowerStateLock().readLock().unlock(); - } } - } - if (!ongoingBatch.isEmpty()) { - batches.add(ongoingBatch); - } - if (batches.isEmpty()) { - return; - } - IngestionBatchProcessor ingestionBatchProcessor = getIngestionBatchProcessor(); - if (ingestionBatchProcessor == null) { - throw new VeniceException( - "IngestionBatchProcessor object should present for store version: " + kafkaVersionTopic); - } - /** - * Process records batch by batch. - */ - for (List> batch: batches) { - NavigableMap keyLockMap = ingestionBatchProcessor.lockKeys(batch); - try { - long beforeProcessingPerRecordTimestampNs = System.nanoTime(); - List> processedResults = - ingestionBatchProcessor.process( - batch, + if (!ongoingBatch.isEmpty()) { + batches.add(ongoingBatch); + } + if (batches.isEmpty()) { + return; + } + IngestionBatchProcessor ingestionBatchProcessor = getIngestionBatchProcessor(); + if (ingestionBatchProcessor == null) { + throw new VeniceException( + "IngestionBatchProcessor object should present for store version: " + kafkaVersionTopic); + } + /** + * Process records batch by batch. + */ + for (List> batch: batches) { + NavigableMap keyLockMap = ingestionBatchProcessor.lockKeys(batch); + try { + long beforeProcessingPerRecordTimestampNs = System.nanoTime(); + List> processedResults = + ingestionBatchProcessor.process( + batch, + partitionConsumptionState, + topicPartition.getPartitionNumber(), + kafkaUrl, + kafkaClusterId, + beforeProcessingPerRecordTimestampNs, + beforeProcessingBatchRecordsTimestampMs); + + for (PubSubMessageProcessedResultWrapper processedRecord: processedResults) { + totalBytesRead += handleSingleMessage( + processedRecord, + topicPartition, partitionConsumptionState, - topicPartition.getPartitionNumber(), kafkaUrl, kafkaClusterId, beforeProcessingPerRecordTimestampNs, - beforeProcessingBatchRecordsTimestampMs); - - for (PubSubMessageProcessedResultWrapper processedRecord: processedResults) { - totalBytesRead += handleSingleMessage( - processedRecord, - topicPartition, - partitionConsumptionState, - kafkaUrl, - kafkaClusterId, - beforeProcessingPerRecordTimestampNs, - beforeProcessingBatchRecordsTimestampMs, - metricsEnabled, - elapsedTimeForPuttingIntoQueue); + beforeProcessingBatchRecordsTimestampMs, + metricsEnabled, + elapsedTimeForPuttingIntoQueue); + } + } finally { + ingestionBatchProcessor.unlockKeys(keyLockMap); } - } finally { - ingestionBatchProcessor.unlockKeys(keyLockMap); } }