Commit
Using AutoCloseableLock instead of manually using lock() and `unlock()`, which can be error-prone. 💹
KaiSernLim committed Oct 31, 2024
1 parent e20d24c commit a4b050c
Showing 2 changed files with 78 additions and 74 deletions.
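The commit replaces manual lock()/unlock() pairs with AutoCloseableLock, so that try-with-resources releases the lock on every exit path, including exceptions and early returns. Below is a minimal sketch of how such a wrapper works — an illustration of the idiom only, not the actual Venice AutoCloseableSingleLock implementation, whose details may differ:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for Venice's AutoCloseableSingleLock: acquire the lock
// on construction, release it in close(), so try-with-resources guarantees the
// unlock() runs on every exit path.
final class CloseableLockSketch implements AutoCloseable {
  private final Lock lock;

  private CloseableLockSketch(Lock lock) {
    this.lock = lock;
    lock.lock(); // the matching unlock() happens in close()
  }

  static CloseableLockSketch of(Lock lock) {
    return new CloseableLockSketch(lock);
  }

  @Override
  public void close() {
    lock.unlock();
  }
}

class CounterDemo {
  private final Lock lock = new ReentrantLock();
  private int count;

  void increment() {
    // Error-prone manual form: lock.lock(); try { count++; } finally { lock.unlock(); }
    try (CloseableLockSketch ignore = CloseableLockSketch.of(lock)) {
      count++; // the lock is held for exactly this scope
    }
  }
}
```

With the manual form, an early return or exception between lock() and unlock() leaks the lock; with the wrapper, the compiler-generated finally of try-with-resources makes that mistake impossible.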
@@ -16,6 +16,8 @@
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import com.linkedin.venice.utils.lazy.Lazy;
+import com.linkedin.venice.utils.locks.AutoCloseableLock;
+import com.linkedin.venice.utils.locks.AutoCloseableSingleLock;
 import com.linkedin.venice.writer.LeaderCompleteState;
 import com.linkedin.venice.writer.VeniceWriter;
 import java.nio.ByteBuffer;
@@ -70,31 +72,29 @@ public class PartitionConsumptionState {
    * lengthy acquisition of the lock.
    */
   final class LeaderFollowerState {
-    LeaderFollowerStateType state;
-    final ReadWriteLock rwLock;
+    private LeaderFollowerStateType state;
+    private final ReadWriteLock rwLock;
 
     LeaderFollowerState() {
       this.state = LeaderFollowerStateType.STANDBY;
       this.rwLock = new ReentrantReadWriteLock();
     }
 
-    void set(LeaderFollowerStateType state) {
-      rwLock.writeLock().lock();
-      try {
+    void setState(LeaderFollowerStateType state) {
+      try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(rwLock.writeLock())) {
         this.state = state;
-      } finally {
-        rwLock.writeLock().unlock();
       }
     }
 
-    LeaderFollowerStateType get() {
-      rwLock.readLock().lock();
-      try {
+    LeaderFollowerStateType getState() {
+      try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(rwLock.readLock())) {
         return this.state;
-      } finally {
-        rwLock.readLock().unlock();
       }
     }
+
+    ReadWriteLock getLock() {
+      return rwLock;
+    }
   }
 
   private final LeaderFollowerState leaderFollowerState;
@@ -428,7 +428,7 @@ public String toString() {
         .append(", processedRecordSizeSinceLastSync=")
         .append(processedRecordSizeSinceLastSync)
         .append(", leaderFollowerState=")
-        .append(leaderFollowerState.get())
+        .append(leaderFollowerState.getState())
         .append("}")
         .toString();
   }
@@ -446,15 +446,15 @@ public void resetProcessedRecordSizeSinceLastSync() {
   }
 
   public void setLeaderFollowerState(LeaderFollowerStateType state) {
-    this.leaderFollowerState.set(state);
+    this.leaderFollowerState.setState(state);
   }
 
-  public final LeaderFollowerStateType getLeaderFollowerState() {
-    return this.leaderFollowerState.get();
+  public LeaderFollowerStateType getLeaderFollowerState() {
+    return this.leaderFollowerState.getState();
   }
 
-  public final ReadWriteLock getLeaderFollowerStateLock() {
-    return this.leaderFollowerState.rwLock;
+  public ReadWriteLock getLeaderFollowerStateLock() {
+    return this.leaderFollowerState.getLock();
   }
 
   public void setLastLeaderPersistFuture(Future<Void> future) {
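LeaderFollowerState above pairs a plain field with a ReentrantReadWriteLock: getState() and setState() hold the read or write lock for a single access, and getLock() exposes the lock so callers can hold the read lock across a longer critical section, as the ingestion path in the next file does. A self-contained illustration of that read/write split — the names here are hypothetical, not Venice APIs:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch, not Venice code: why the ingestion path holds the READ
// lock across a whole check-then-act sequence while state transitions take the
// WRITE lock. Readers run concurrently with each other; a writer must wait
// until no reader holds the lock.
final class LeaderStateSketch {
  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
  private boolean leader = false;

  // Analogous to a state transition taking the write lock.
  void promoteToLeader() {
    rwLock.writeLock().lock();
    try {
      leader = true; // no record is mid-processing while this runs
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  // Analogous to shouldProcessRecord() plus processing under the read lock.
  void processRecord(String record) {
    rwLock.readLock().lock();
    try {
      if (!leader) { // the decision...
        // ...and the work it guards see a consistent state: the flag cannot
        // flip between the check above and this point.
        System.out.println("processing as follower: " + record);
      }
    } finally {
      rwLock.readLock().unlock();
    }
  }
}
```

Any number of record-processing threads can hold the read lock concurrently; a state transition takes the write lock and therefore waits until no record is mid-flight, which is exactly the guarantee the comment in the next file describes.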
@@ -112,6 +112,8 @@
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import com.linkedin.venice.utils.lazy.Lazy;
+import com.linkedin.venice.utils.locks.AutoCloseableLock;
+import com.linkedin.venice.utils.locks.AutoCloseableSingleLock;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import java.io.Closeable;
 import java.io.IOException;
@@ -143,6 +145,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
@@ -1180,8 +1183,8 @@ protected void produceToStoreBufferServiceOrKafka(
        * which this message is processed. This lock protects against any state transitions where the SIT needs to
        * acquire the write lock to modify the leader-follower state, since it would need to wait for this to finish.
        */
-      partitionConsumptionState.getLeaderFollowerStateLock().readLock().lock();
-      try {
+      final ReadWriteLock leaderFollowerStateLock = partitionConsumptionState.getLeaderFollowerStateLock();
+      try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(leaderFollowerStateLock.readLock())) {
         if (!shouldProcessRecord(record)) {
           partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset());
           return;
@@ -1200,8 +1203,6 @@ protected void produceToStoreBufferServiceOrKafka(
             beforeProcessingBatchRecordsTimestampMs,
             metricsEnabled,
             elapsedTimeForPuttingIntoQueue);
-      } finally {
-        partitionConsumptionState.getLeaderFollowerStateLock().readLock().unlock();
       }
     }
@@ -1230,6 +1231,12 @@ protected void produceToStoreBufferServiceOrKafkaInBatch(
       PartitionConsumptionState partitionConsumptionState,
       String kafkaUrl,
       int kafkaClusterId) throws InterruptedException {
+    if (partitionConsumptionState == null) {
+      throw new VeniceException(
+          "PartitionConsumptionState should present for store version: " + kafkaVersionTopic + ", partition: "
+              + topicPartition.getPartitionNumber());
+    }
+
     long totalBytesRead = 0;
     ValueHolder<Double> elapsedTimeForPuttingIntoQueue = new ValueHolder<>(0d);
     boolean metricsEnabled = emitMetrics.get();
@@ -1241,18 +1248,19 @@
     List<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> batches = new ArrayList<>();
     List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> ongoingBatch = new ArrayList<>(batchSize);
     Iterator<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> iter = records.iterator();
-    while (iter.hasNext()) {
-      PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = iter.next();
-      if (partitionConsumptionState != null) {
+
+    /*
+     * Acquire the read lock to ensure that the result of shouldProcessRecord() holds for the entire duration for
+     * which this message is processed. This lock protects against any state transitions where the SIT needs to
+     * acquire the write lock to modify the leader-follower state, since it would need to wait for this to finish.
+     */
+    final ReadWriteLock leaderFollowerStateLock = partitionConsumptionState.getLeaderFollowerStateLock();
+    try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(leaderFollowerStateLock.readLock())) {
+      while (iter.hasNext()) {
+        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record = iter.next();
         partitionConsumptionState.setLatestPolledMessageTimestampInMs(beforeProcessingBatchRecordsTimestampMs);
-        partitionConsumptionState.getLeaderFollowerStateLock().readLock().lock();
-        // see comment in produceToStoreBufferServiceOrKafka about why the lock is needed here
-      }
-      try {
         if (!shouldProcessRecord(record)) {
-          if (partitionConsumptionState != null) {
-            partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset());
-          }
+          partitionConsumptionState.updateLatestIgnoredUpstreamRTOffset(kafkaUrl, record.getOffset());
           continue;
         }
         waitReadyToProcessRecord(record);
@@ -1261,54 +1269,50 @@
           batches.add(ongoingBatch);
           ongoingBatch = new ArrayList<>(batchSize);
         }
-      } finally {
-        if (partitionConsumptionState != null) {
-          partitionConsumptionState.getLeaderFollowerStateLock().readLock().unlock();
-        }
-      }
-    }
-    if (!ongoingBatch.isEmpty()) {
-      batches.add(ongoingBatch);
-    }
-    if (batches.isEmpty()) {
-      return;
-    }
-    IngestionBatchProcessor ingestionBatchProcessor = getIngestionBatchProcessor();
-    if (ingestionBatchProcessor == null) {
-      throw new VeniceException(
-          "IngestionBatchProcessor object should present for store version: " + kafkaVersionTopic);
-    }
-    /**
-     * Process records batch by batch.
-     */
-    for (List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> batch: batches) {
-      NavigableMap<ByteArrayKey, ReentrantLock> keyLockMap = ingestionBatchProcessor.lockKeys(batch);
-      try {
-        long beforeProcessingPerRecordTimestampNs = System.nanoTime();
-        List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> processedResults =
-            ingestionBatchProcessor.process(
-                batch,
-                partitionConsumptionState,
-                topicPartition.getPartitionNumber(),
-                kafkaUrl,
-                kafkaClusterId,
-                beforeProcessingPerRecordTimestampNs,
-                beforeProcessingBatchRecordsTimestampMs);
-
-        for (PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> processedRecord: processedResults) {
-          totalBytesRead += handleSingleMessage(
-              processedRecord,
-              topicPartition,
-              partitionConsumptionState,
-              kafkaUrl,
-              kafkaClusterId,
-              beforeProcessingPerRecordTimestampNs,
-              beforeProcessingBatchRecordsTimestampMs,
-              metricsEnabled,
-              elapsedTimeForPuttingIntoQueue);
-        }
-      } finally {
-        ingestionBatchProcessor.unlockKeys(keyLockMap);
-      }
-    }
+      }
+
+      if (!ongoingBatch.isEmpty()) {
+        batches.add(ongoingBatch);
+      }
+      if (batches.isEmpty()) {
+        return;
+      }
+      IngestionBatchProcessor ingestionBatchProcessor = getIngestionBatchProcessor();
+      if (ingestionBatchProcessor == null) {
+        throw new VeniceException(
+            "IngestionBatchProcessor object should present for store version: " + kafkaVersionTopic);
+      }
+      /**
+       * Process records batch by batch.
+       */
+      for (List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> batch: batches) {
+        NavigableMap<ByteArrayKey, ReentrantLock> keyLockMap = ingestionBatchProcessor.lockKeys(batch);
+        try {
+          long beforeProcessingPerRecordTimestampNs = System.nanoTime();
+          List<PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long>> processedResults =
+              ingestionBatchProcessor.process(
+                  batch,
+                  partitionConsumptionState,
+                  topicPartition.getPartitionNumber(),
+                  kafkaUrl,
+                  kafkaClusterId,
+                  beforeProcessingPerRecordTimestampNs,
+                  beforeProcessingBatchRecordsTimestampMs);
+
+          for (PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> processedRecord: processedResults) {
+            totalBytesRead += handleSingleMessage(
+                processedRecord,
+                topicPartition,
+                partitionConsumptionState,
+                kafkaUrl,
+                kafkaClusterId,
+                beforeProcessingPerRecordTimestampNs,
+                beforeProcessingBatchRecordsTimestampMs,
+                metricsEnabled,
+                elapsedTimeForPuttingIntoQueue);
+          }
+        } finally {
+          ingestionBatchProcessor.unlockKeys(keyLockMap);
+        }
+      }
+    }
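Beyond the leader-follower read lock, the batch path keeps its existing per-key locking: ingestionBatchProcessor.lockKeys(batch) returns a NavigableMap<ByteArrayKey, ReentrantLock> that is released via unlockKeys(keyLockMap) in a finally block. A sorted map yields a deterministic lock-acquisition order, which is presumably why a NavigableMap is used here: acquiring locks in a consistent order is the standard guard against deadlock when threads lock overlapping key sets. A hedged sketch of that idea — lockKeys/lockFor below are simplified stand-ins, not the IngestionBatchProcessor API:

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for per-key batch locking: acquire in sorted key order,
// release in a finally block, mirroring the lockKeys()/unlockKeys() usage above.
final class KeyLockingSketch {
  private final NavigableMap<String, ReentrantLock> locks = new TreeMap<>();

  private synchronized ReentrantLock lockFor(String key) {
    return locks.computeIfAbsent(key, k -> new ReentrantLock());
  }

  NavigableMap<String, ReentrantLock> lockKeys(List<String> keys) {
    NavigableMap<String, ReentrantLock> acquired = new TreeMap<>();
    for (String key: keys) {
      acquired.put(key, lockFor(key));
    }
    // TreeMap iterates in ascending key order, so every thread that locks an
    // overlapping key set acquires the locks in the same order.
    acquired.values().forEach(ReentrantLock::lock);
    return acquired;
  }

  void unlockKeys(NavigableMap<String, ReentrantLock> acquired) {
    acquired.values().forEach(ReentrantLock::unlock);
  }
}
```

The real code wraps the process() call in try { ... } finally { ingestionBatchProcessor.unlockKeys(keyLockMap); }, matching this shape.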
