[changelog][server] Fix incoming write schema id (#735)

This fixes a write-compute (WC)/schema evolution bug. Tests to follow. The change capture should contain the schema of the merge result, which should be the superset schema, not the write-compute one (a sketch of the key change follows the file summary below).
ZacAttack authored Nov 6, 2023
1 parent d03f184 commit 353a18a
Showing 6 changed files with 154 additions and 44 deletions.
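
In essence, a minimal sketch of the key change (variable names are taken from the diff below and are assumed to be in scope; this is not the verbatim patch):

    // Before (per the diff): the view writer received incomingValueSchemaId plus a valueSchemaId
    // derived from the replication metadata, so the change capture could end up tagged with the
    // write-compute schema of the incoming update.
    // After: it receives the schema id of the merge result (the superset schema) and the schema id
    // that the old value was actually written with.
    viewWriterFutures[index++] = writer.processRecord(
        mergeConflictResult.getNewValue(),
        oldValueProvider.get().value(),
        keyBytes,
        versionNumber,
        mergeConflictResult.getValueSchemaId(),
        oldValueProvider.get().writerSchemaId(),
        mergeConflictResult.getRmdRecord());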
Changed file 1 of 6
@@ -609,18 +609,6 @@ protected <T> T bufferAndAssembleRecordChangeEvent(
null),
pubSubTopicPartition,
readerSchemaId);

// assembledRecord = chunkingAdapter.get(
// inMemoryStorageEngine,
// pubSubTopicPartition.getPartitionNumber(),
// ByteBuffer.wrap(keyBytes),
// false,
// null,
// null,
// null,
// readerSchemaId,
// deserializerCache,
// compressor, null);
} catch (Exception ex) {
// We might get an exception if we haven't persisted all the chunks for a given key. This
// can actually happen if the client seeks to the middle of a chunked record either by
Changed file 2 of 6
@@ -18,6 +18,7 @@
import com.linkedin.davinci.storage.chunking.RawBytesChunkingAdapter;
import com.linkedin.davinci.storage.chunking.SingleGetChunkingAdapter;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.utils.ByteArrayKey;
@@ -409,7 +410,7 @@ protected void processMessageAndMaybeProduceToKafka(
consumerTaskId + " : Invalid/Unrecognized operation type submitted: " + kafkaValue.messageType);
}
final ChunkedValueManifestContainer valueManifestContainer = new ChunkedValueManifestContainer();
Lazy<ByteBuffer> oldValueProvider = Lazy.of(
Lazy<ByteBufferValueRecord<ByteBuffer>> oldValueProvider = Lazy.of(
() -> getValueBytesForKey(
partitionConsumptionState,
keyBytes,
@@ -435,7 +436,7 @@
switch (msgType) {
case PUT:
mergeConflictResult = mergeConflictResolver.put(
oldValueProvider,
Lazy.of(() -> oldValueProvider.get().value()),
rmdWithValueSchemaID,
((Put) kafkaValue.payloadUnion).putValue,
writeTimestamp,
@@ -450,7 +451,7 @@

case DELETE:
mergeConflictResult = mergeConflictResolver.delete(
oldValueProvider,
Lazy.of(() -> oldValueProvider.get().value()),
rmdWithValueSchemaID,
writeTimestamp,
sourceOffset,
@@ -460,7 +461,7 @@

case UPDATE:
mergeConflictResult = mergeConflictResolver.update(
oldValueProvider,
Lazy.of(() -> oldValueProvider.get().value()),
rmdWithValueSchemaID,
((Update) kafkaValue.payloadUnion).updateValue,
incomingValueSchemaId,
@@ -495,8 +496,6 @@ protected void processMessageAndMaybeProduceToKafka(
// only extension of IngestionTask which does a read from disk before applying the record. This makes the
// following function call in this context much less obtrusive; however, it implies that all views can only
// work for AA stores.
int valueSchemaId =
rmdWithValueSchemaID != null ? rmdWithValueSchemaID.getValueSchemaId() : incomingValueSchemaId;

// Write to views
if (this.viewWriters.size() > 0) {
@@ -516,11 +515,11 @@
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = writer.processRecord(
mergeConflictResult.getNewValue(),
oldValueProvider.get(),
oldValueProvider.get().value(),
keyBytes,
versionNumber,
incomingValueSchemaId,
valueSchemaId,
mergeConflictResult.getValueSchemaId(),
oldValueProvider.get().writerSchemaId(),
mergeConflictResult.getRmdRecord());
}
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
@@ -616,13 +615,13 @@ private void validatePostOperationResultsAndRecord(
* @param topicPartition The {@link PubSubTopicPartition} from which the incoming record was consumed
* @return
*/
private ByteBuffer getValueBytesForKey(
private ByteBufferValueRecord<ByteBuffer> getValueBytesForKey(
PartitionConsumptionState partitionConsumptionState,
byte[] key,
PubSubTopicPartition topicPartition,
ChunkedValueManifestContainer valueManifestContainer,
long currentTimeForMetricsMs) {
ByteBuffer originalValue = null;
ByteBufferValueRecord<ByteBuffer> originalValue = null;
// Find the existing value. If a value for this key is found from the transient map then use that value, otherwise
// get it from DB.
PartitionConsumptionState.TransientRecord transientRecord = partitionConsumptionState.getTransientRecord(key);
@@ -631,16 +630,13 @@ private ByteBuffer getValueBytesForKey(
ReusableObjects reusableObjects = threadLocalReusableObjects.get();
ByteBuffer reusedRawValue = reusableObjects.reusedByteBuffer;
BinaryDecoder binaryDecoder = reusableObjects.binaryDecoder;

originalValue = RawBytesChunkingAdapter.INSTANCE.get(
originalValue = RawBytesChunkingAdapter.INSTANCE.getWithSchemaId(
storageEngine,
getSubPartitionId(key, topicPartition),
ByteBuffer.wrap(key),
isChunked,
reusedRawValue,
binaryDecoder,
null,
schemaRepository.getSupersetOrLatestValueSchema(storeName).getId(),
RawBytesStoreDeserializerCache.getInstance(),
compressor.get(),
valueManifestContainer);
@@ -654,7 +650,9 @@
if (valueManifestContainer != null) {
valueManifestContainer.setManifest(transientRecord.getValueManifest());
}
originalValue = getCurrentValueFromTransientRecord(transientRecord);
originalValue = new ByteBufferValueRecord<>(
getCurrentValueFromTransientRecord(transientRecord),
transientRecord.getValueSchemaId());
}
}
return originalValue;
Changed file 3 of 6
@@ -2,6 +2,7 @@

import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compression.CompressionStrategy;
@@ -127,6 +128,32 @@ public T get(
manifestContainer);
}

public ByteBufferValueRecord<T> getWithSchemaId(
AbstractStorageEngine store,
int partition,
ByteBuffer key,
boolean isChunked,
T reusedValue,
BinaryDecoder reusedDecoder,
StoreDeserializerCache<T> storeDeserializerCache,
VeniceCompressor compressor,
ChunkedValueManifestContainer manifestContainer) {
if (isChunked) {
key = ChunkingUtils.KEY_WITH_CHUNKING_SUFFIX_SERIALIZER.serializeNonChunkedKey(key);
}
return ChunkingUtils.getValueAndSchemaIdFromStorage(
this,
store,
partition,
key,
reusedValue,
reusedDecoder,
storeDeserializerCache,
compressor,
false,
manifestContainer);
}

public T get(
AbstractStorageEngine store,
int partition,
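
A hedged usage sketch of the new adapter entry point. The local variable names (storageEngine, subPartitionId, keyBytes, isChunked, reusedRawValue, binaryDecoder, compressor, valueManifestContainer) mirror the ingestion-path call in this commit and are assumptions, not part of the adapter API; the point is that the reader-schema argument is gone and the stored writer schema id comes back with the value.

    // Minimal sketch: fetch the raw bytes plus the schema id they were written with,
    // instead of deserializing against the superset/latest value schema.
    ByteBufferValueRecord<ByteBuffer> valueAndSchemaId = RawBytesChunkingAdapter.INSTANCE.getWithSchemaId(
        storageEngine,                                // AbstractStorageEngine for this store-version
        subPartitionId,                               // partition to read from
        ByteBuffer.wrap(keyBytes),                    // serialized key
        isChunked,                                    // whether the value may be chunked
        reusedRawValue,                               // reusable ByteBuffer, may be null
        binaryDecoder,                                // reusable BinaryDecoder, may be null
        RawBytesStoreDeserializerCache.getInstance(), // raw-bytes deserializer cache
        compressor,                                   // VeniceCompressor for this store-version
        valueManifestContainer);                      // receives the chunked value manifest, if any
    ByteBuffer rawValue = valueAndSchemaId.value();
    int writerSchemaId = valueAndSchemaId.writerSchemaId();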
Changed file 4 of 6
@@ -3,6 +3,7 @@
import com.linkedin.davinci.callback.BytesStreamingCallback;
import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compression.VeniceCompressor;
@@ -242,6 +243,39 @@ static <VALUE, CHUNKS_CONTAINER> VALUE getFromStorage(
manifestContainer);
}

static <VALUE, CHUNKS_CONTAINER> ByteBufferValueRecord<VALUE> getValueAndSchemaIdFromStorage(
ChunkingAdapter<CHUNKS_CONTAINER, VALUE> adapter,
AbstractStorageEngine store,
int partition,
ByteBuffer keyBuffer,
VALUE reusedValue,
BinaryDecoder reusedDecoder,
StoreDeserializerCache<VALUE> storeDeserializerCache,
VeniceCompressor compressor,
boolean isRmdValue,
ChunkedValueManifestContainer manifestContainer) {
byte[] value =
isRmdValue ? store.getReplicationMetadata(partition, keyBuffer.array()) : store.get(partition, keyBuffer);
int writerSchemaId = value == null ? 0 : ValueRecord.parseSchemaId(value);
return new ByteBufferValueRecord<>(
getFromStorage(
value,
(value == null ? 0 : value.length),
0,
adapter,
store,
partition,
null,
reusedValue,
reusedDecoder,
-1,
storeDeserializerCache,
compressor,
isRmdValue,
manifestContainer),
writerSchemaId);
}

public static ChunkedValueManifest getChunkValueManifestFromStorage(
byte[] key,
int partition,
Changed file 5 of 6
@@ -0,0 +1,34 @@
package com.linkedin.davinci.store.record;

/**
 * This class encapsulates a value from Venice storage accompanied by the schema
 * id that was used to serialize the value.
 *
 * TODO: This class should probably be superseded by {@link ValueRecord}. Unfortunately,
 * MANY interfaces in the ingestion path rely on ByteBuffer, whereas ValueRecord relies on ByteBuf. Until
 * we rectify that, this is our stand-in.
 */
public final class ByteBufferValueRecord<T> {
private final T value;
private final int writerSchemaId;

/**
 * @param value the stored value (typically serialized bytes)
 * @param writerSchemaId the id of the schema the value was written with
 */
public ByteBufferValueRecord(T value, int writerSchemaId) {
this.value = value;
this.writerSchemaId = writerSchemaId;
}

public T value() {
return value;
}

public int writerSchemaId() {
return writerSchemaId;
}

@Override
public String toString() {
return "ByteBufferValueRecord[" + "value=" + value + ", " + "writerSchemaId=" + writerSchemaId + ']';
}
}
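
A trivial usage sketch of the new record type (the literal values below are illustrative only):

    // Pairs a stored value with the schema id it was written with.
    ByteBufferValueRecord<ByteBuffer> record =
        new ByteBufferValueRecord<>(ByteBuffer.wrap(new byte[] { 1, 2, 3 }), 5);
    ByteBuffer value = record.value();      // the wrapped value
    int schemaId = record.writerSchemaId(); // 5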
Changed file 6 of 6
@@ -7,6 +7,7 @@
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
@@ -159,7 +160,8 @@ private void runTest(
GenericRecord record,
AbstractAvroChunkingAdapter chunkingAdapter,
boolean rawBytesStoreDeserializerCache,
Function<Object, Void> assertions) {
Function<Object, Void> assertions,
boolean getWithSchemaId) {
int partition = 9;
String storeName = "test";
byte[] keyBytes = ByteUtils.fromHexString("040647454ff4baf2630a5449544c45440010494d504c49434954");
@@ -220,19 +222,33 @@
new StorageEngineBackedCompressorFactory(mock(StorageMetadataService.class))) {
VeniceCompressor compressor =
compressorFactory.getCompressor(CompressionStrategy.NO_OP, storageEngine.getStoreName());
assertions.apply(
chunkingAdapter.get(
storageEngine,
partition,
ByteBuffer.wrap(keyBytes),
true,
null,
null,
null,
readerSchemaId,
storeDeserializerCache,
compressor,
null));
Object retrievedObject;
if (getWithSchemaId) {
retrievedObject = chunkingAdapter.getWithSchemaId(
storageEngine,
partition,
ByteBuffer.wrap(keyBytes),
true,
null,
null,
storeDeserializerCache,
compressor,
null);
} else {
retrievedObject = chunkingAdapter.get(
storageEngine,
partition,
ByteBuffer.wrap(keyBytes),
true,
null,
null,
null,
readerSchemaId,
storeDeserializerCache,
compressor,
null);
}
assertions.apply(retrievedObject);
}
}

@@ -242,7 +258,7 @@ public void testGenericRecordChunkingAdapter(GenericRecord record) {
Assert.assertTrue(valueFromStorageEngine instanceof GenericRecord);
Assert.assertEquals(valueFromStorageEngine, record);
return null;
});
}, false);
}

@Test(dataProvider = "recordProvider")
@@ -253,6 +269,19 @@ public void testRawBytesChunkingAdapter(GenericRecord record) {
Assert.assertTrue(valueFromStorageEngine instanceof ByteBuffer);
Assert.assertEquals(ByteUtils.extractByteArray((ByteBuffer) valueFromStorageEngine), serializedRecord);
return null;
});
}, false);
}

@Test(dataProvider = "recordProvider")
public void testRawBytesChunkingAdapterWithSchemaId(GenericRecord record) {
byte[] serializedRecord =
SerializerDeserializerFactory.getAvroGenericSerializer(record.getSchema()).serialize(record);
runTest(record, RawBytesChunkingAdapter.INSTANCE, true, (valueFromStorageEngine) -> {
Assert.assertTrue(valueFromStorageEngine instanceof ByteBufferValueRecord<?>);
Object value = ((ByteBufferValueRecord) valueFromStorageEngine).value();
Assert.assertTrue(value instanceof ByteBuffer);
Assert.assertEquals(ByteUtils.extractByteArray((ByteBuffer) value), serializedRecord);
return null;
}, true);
}
}
