[server] Fixed NPE in recent AASIT refactoring (linkedin#765)
This fixes a regression introduced in PR linkedin#735: Lazy.of(() -> oldValueProvider.get().value()) throws a NullPointerException when the old-value provider yields null (i.e., the key has no prior value); the new unwrapByteBufferFromOldValueProvider helper null-checks the record before unwrapping it.
FelixGV authored and m-nagarajan committed Nov 28, 2023
1 parent: c139a39 · commit: 70c5f22
Showing 2 changed files with 34 additions and 5 deletions.
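Before reading the diff: the failure mode can be reconstructed from the change itself. The refactored code built Lazy.of(() -> oldValueProvider.get().value()), which dereferences the old-value record unconditionally, so materializing the lazy for a key with no prior value threw a NullPointerException. The sketch below reproduces the before/after behavior in isolation. The Lazy and ByteBufferValueRecord classes in it are simplified stand-ins written for this sketch (the real Venice Lazy also memoizes its supplier, and the real ByteBufferValueRecord lives in com.linkedin.davinci.store.record, per the test's import below); treat it as illustrative, not as Venice code.

import java.nio.ByteBuffer;
import java.util.function.Supplier;

public class OldValueUnwrapSketch {
  // Simplified stand-in for Venice's Lazy (no memoization; illustrative only).
  static final class Lazy<T> {
    private final Supplier<T> supplier;
    private Lazy(Supplier<T> supplier) { this.supplier = supplier; }
    static <T> Lazy<T> of(Supplier<T> supplier) { return new Lazy<>(supplier); }
    T get() { return supplier.get(); }
  }

  // Simplified stand-in for com.linkedin.davinci.store.record.ByteBufferValueRecord.
  record ByteBufferValueRecord(ByteBuffer value, int writerSchemaId) {}

  public static void main(String[] args) {
    // A key with no prior value: the old-value provider yields null.
    Lazy<ByteBufferValueRecord> oldValueProvider = Lazy.of(() -> null);

    // Before the fix: unconditional dereference -> NPE when the lazy is materialized.
    Lazy<ByteBuffer> before = Lazy.of(() -> oldValueProvider.get().value());
    try {
      before.get();
    } catch (NullPointerException e) {
      System.out.println("before the fix: NPE");
    }

    // After the fix: null-check the record and yield null instead of throwing.
    Lazy<ByteBuffer> after = Lazy.of(() -> {
      ByteBufferValueRecord rec = oldValueProvider.get();
      return rec == null ? null : rec.value();
    });
    System.out.println("after the fix: " + after.get()); // prints "after the fix: null"
  }
}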
ActiveActiveStoreIngestionTask.java:

@@ -433,10 +433,12 @@ protected void processMessageAndMaybeProduceToKafka(
 
     aggVersionedIngestionStats.recordTotalDCR(storeName, versionNumber);
 
+    Lazy<ByteBuffer> oldValueByteBufferProvider = unwrapByteBufferFromOldValueProvider(oldValueProvider);
+
     switch (msgType) {
       case PUT:
         mergeConflictResult = mergeConflictResolver.put(
-            Lazy.of(() -> oldValueProvider.get().value()),
+            unwrapByteBufferFromOldValueProvider(oldValueProvider),
             rmdWithValueSchemaID,
             ((Put) kafkaValue.payloadUnion).putValue,
             writeTimestamp,
@@ -451,7 +453,7 @@ protected void processMessageAndMaybeProduceToKafka(
 
       case DELETE:
         mergeConflictResult = mergeConflictResolver.delete(
-            Lazy.of(() -> oldValueProvider.get().value()),
+            oldValueByteBufferProvider,
             rmdWithValueSchemaID,
             writeTimestamp,
             sourceOffset,
@@ -461,7 +463,7 @@ protected void processMessageAndMaybeProduceToKafka(
 
       case UPDATE:
         mergeConflictResult = mergeConflictResolver.update(
-            Lazy.of(() -> oldValueProvider.get().value()),
+            oldValueByteBufferProvider,
             rmdWithValueSchemaID,
             ((Update) kafkaValue.payloadUnion).updateValue,
             incomingValueSchemaId,
@@ -512,14 +514,16 @@ protected void processMessageAndMaybeProduceToKafka(
       int index = 0;
       // The first future is for the previous write to VT
      viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
+      ByteBuffer oldValueBB = oldValueByteBufferProvider.get();
+      int oldValueSchemaId = oldValueBB == null ? -1 : oldValueProvider.get().writerSchemaId();
       for (VeniceViewWriter writer: viewWriters.values()) {
         viewWriterFutures[index++] = writer.processRecord(
             mergeConflictResult.getNewValue(),
-            oldValueProvider.get().value(),
+            oldValueBB,
             keyBytes,
             versionNumber,
             mergeConflictResult.getValueSchemaId(),
-            oldValueProvider.get().writerSchemaId(),
+            oldValueSchemaId,
             mergeConflictResult.getRmdRecord());
       }
       CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
@@ -561,7 +565,17 @@ protected void processMessageAndMaybeProduceToKafka(
             rmdWithValueSchemaID == null ? null : rmdWithValueSchemaID.getRmdManifest());
       }
     }
   }
 
+  /**
+   * Package private for testing purposes.
+   */
+  static Lazy<ByteBuffer> unwrapByteBufferFromOldValueProvider(
+      Lazy<ByteBufferValueRecord<ByteBuffer>> oldValueProvider) {
+    return Lazy.of(() -> {
+      ByteBufferValueRecord<ByteBuffer> bbValueRecord = oldValueProvider.get();
+      return bbValueRecord == null ? null : bbValueRecord.value();
+    });
+  }
 
   private long getWriteTimestampFromKME(KafkaMessageEnvelope kme) {
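Two aspects of the change above are easy to miss. First, the DELETE and UPDATE branches share the single oldValueByteBufferProvider built at the top of the method, while the PUT branch builds its own unwrapped lazy inline. Second, the view-writer loop previously called oldValueProvider.get().value() and .writerSchemaId() directly, which could hit the same NPE; it now materializes the unwrapped lazy once into oldValueBB and substitutes -1 as the schema-id sentinel when there is no old value. Below is a small self-contained sketch of that guard, reusing the same simplified stand-ins as the earlier sketch (again illustrative only; the -1 sentinel comes from the diff, everything else is assumed scaffolding):

import java.nio.ByteBuffer;
import java.util.function.Supplier;

public class ViewWriterGuardSketch {
  // Same simplified stand-ins as in the earlier sketch, not the real Venice classes.
  static final class Lazy<T> {
    private final Supplier<T> supplier;
    private Lazy(Supplier<T> supplier) { this.supplier = supplier; }
    static <T> Lazy<T> of(Supplier<T> supplier) { return new Lazy<>(supplier); }
    T get() { return supplier.get(); }
  }

  record ByteBufferValueRecord(ByteBuffer value, int writerSchemaId) {}

  public static void main(String[] args) {
    // A key with no prior value, unwrapped via the null-checking pattern from the fix.
    Lazy<ByteBufferValueRecord> oldValueProvider = Lazy.of(() -> null);
    Lazy<ByteBuffer> oldValueByteBufferProvider = Lazy.of(() -> {
      ByteBufferValueRecord rec = oldValueProvider.get();
      return rec == null ? null : rec.value();
    });

    // The fixed pattern: materialize once, derive the schema id only if an old value exists.
    ByteBuffer oldValueBB = oldValueByteBufferProvider.get();
    int oldValueSchemaId = oldValueBB == null ? -1 : oldValueProvider.get().writerSchemaId();

    // Prints: oldValueBB = null, schemaId = -1
    System.out.println("oldValueBB = " + oldValueBB + ", schemaId = " + oldValueSchemaId);
  }
}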
ActiveActiveStoreIngestionTaskTest.java:

@@ -15,6 +15,8 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 import com.github.luben.zstd.Zstd;
 import com.linkedin.davinci.config.VeniceServerConfig;
@@ -28,6 +30,7 @@
 import com.linkedin.davinci.storage.chunking.ChunkingUtils;
 import com.linkedin.davinci.store.AbstractStorageEngine;
 import com.linkedin.davinci.store.blackhole.BlackHoleStorageEngine;
+import com.linkedin.davinci.store.record.ByteBufferValueRecord;
 import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.compression.CompressorFactory;
 import com.linkedin.venice.compression.NoopCompressor;
@@ -526,6 +529,18 @@ public void testReadingChunkedRmdFromStorage() {
     Assert.assertEquals(result3, expectedChunkedValue2);
   }
 
+  @Test
+  public void testUnwrapByteBufferFromOldValueProvider() {
+    Lazy<ByteBuffer> lazyBB = ActiveActiveStoreIngestionTask.unwrapByteBufferFromOldValueProvider(Lazy.of(() -> null));
+    assertNotNull(lazyBB);
+    assertNull(lazyBB.get());
+
+    lazyBB = ActiveActiveStoreIngestionTask.unwrapByteBufferFromOldValueProvider(
+        Lazy.of(() -> new ByteBufferValueRecord<>(ByteBuffer.wrap(new byte[1]), 1)));
+    assertNotNull(lazyBB);
+    assertNotNull(lazyBB.get());
+  }
+
   private VeniceCompressor getCompressor(CompressionStrategy strategy) {
     if (Objects.requireNonNull(strategy) == CompressionStrategy.ZSTD_WITH_DICT) {
       byte[] dictionary = ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData();
