[dvc] Decompress/assemble record before transformation (#818)
Events consumed from the version topic may be compressed and/or chunked, so we need to decompress and reassemble them before they are passed to the record transformer.
kvargha authored Feb 2, 2024
1 parent 314c8dc commit a3ee761
Showing 7 changed files with 169 additions and 47 deletions.
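At a high level, the change gives StoreIngestionTask a ChunkAssembler and routes every PUT through it before the DaVinciRecordTransformer runs: chunks are buffered until the record is complete, nothing is written to the storage engine while chunks are still outstanding, and the assembled value is decompressed and deserialized before transformation. The standalone Java sketch below illustrates that buffer-and-assemble-before-transform pattern under simplified assumptions; the class and method names (ChunkThenTransformSketch, onPut, isLastChunk) are invented for illustration and do not mirror Venice's ChunkAssembler API.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Illustrative sketch only: buffer incoming value chunks per key and run the
 * record transformer once the value is fully assembled. All names here are
 * hypothetical and do not reflect Venice's actual ChunkAssembler API.
 */
public class ChunkThenTransformSketch {
  private final Map<String, List<ByteBuffer>> bufferedChunks = new HashMap<>();
  private final Function<String, String> transformer;

  public ChunkThenTransformSketch(Function<String, String> transformer) {
    this.transformer = transformer;
  }

  /**
   * Returns the transformed value when {@code isLastChunk} completes the record,
   * or null while the record is still partially assembled (mirroring how the
   * ingestion path skips storage-engine writes for incomplete chunks).
   */
  public String onPut(String key, ByteBuffer chunk, boolean isLastChunk) {
    bufferedChunks.computeIfAbsent(key, k -> new ArrayList<>()).add(chunk);
    if (!isLastChunk) {
      return null; // still waiting for the remaining chunks
    }
    List<ByteBuffer> chunks = bufferedChunks.remove(key);
    int totalSize = chunks.stream().mapToInt(ByteBuffer::remaining).sum();
    ByteBuffer assembled = ByteBuffer.allocate(totalSize);
    for (ByteBuffer c: chunks) {
      assembled.put(c);
    }
    assembled.flip();
    // In the real path, decompression happens here before deserialization.
    String value = StandardCharsets.UTF_8.decode(assembled).toString();
    return transformer.apply(value);
  }

  public static void main(String[] args) {
    ChunkThenTransformSketch sketch = new ChunkThenTransformSketch(v -> v + "Transformed");
    ByteBuffer part1 = StandardCharsets.UTF_8.encode("Sample");
    ByteBuffer part2 = StandardCharsets.UTF_8.encode("Value");
    System.out.println(sketch.onPut("fooKey", part1, false)); // null: record incomplete
    System.out.println(sketch.onPut("fooKey", part2, true)); // SampleValueTransformed
  }
}

The real path additionally tracks the consumer record's topic-partition and offset and hands the compressor from StorageEngineBackedCompressorFactory to the assembler, as the first file in the diff below shows.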
@@ -29,6 +29,7 @@
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.utils.ChunkAssembler;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
@@ -297,6 +298,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected final StatusReportAdapter statusReportAdapter;

protected final ChunkAssembler chunkAssembler;
private final Optional<ObjectCacheBackend> cacheBackend;
private final DaVinciRecordTransformer recordTransformer;
private final Runnable runnableForKillIngestionTasksForMissingSOP;
@@ -435,6 +437,7 @@ public StoreIngestionTask(

this.runnableForKillIngestionTasksForMissingSOP = () -> waitForStateVersion(kafkaVersionTopic);
this.missingSOPCheckExecutor.execute(runnableForKillIngestionTasksForMissingSOP);
this.chunkAssembler = new ChunkAssembler(storeName);
this.cacheBackend = cacheBackend;
this.recordTransformer = recordTransformer;
if (recordTransformer != null) {
@@ -3104,15 +3107,37 @@ private int processKafkaDataMessage(
// update checksum for this PUT message if needed.
partitionConsumptionState.maybeUpdateExpectedChecksum(keyBytes, put);

// Do transorfmation recompute key, value and partition
// Check if put.getSchemaId is positive, if not default to 1
int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1;

// Do transformation recompute key, value and partition
if (recordTransformer != null) {
long recordTransformStartTime = System.currentTimeMillis();
ByteBuffer valueBytes = put.getPutValue();
Schema valueSchema = schemaRepository.getValueSchema(storeName, putSchemaId).getSchema();

// Decompress/assemble record
Object assembledObject = chunkAssembler.bufferAndAssembleRecord(
consumerRecord.getTopicPartition(),
putSchemaId,
keyBytes,
valueBytes,
consumerRecord.getOffset(),
Lazy.of(() -> new AvroGenericDeserializer<>(valueSchema, valueSchema)),
putSchemaId,
compressorFactory.getCompressor(compressionStrategy, kafkaVersionTopic));

// Current record is a chunk. We only write to the storage engine for fully assembled records
if (assembledObject == null) {
return 0;
}

SchemaEntry keySchema = schemaRepository.getKeySchema(storeName);
SchemaEntry valueSchema = schemaRepository.getValueSchema(storeName, put.schemaId);
Lazy<Object> lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema));
Lazy<Object> lazyValue = Lazy.of(() -> deserializeAvroObjectAndReturn(put.putValue, valueSchema));
Lazy<Object> lazyValue = Lazy.of(() -> assembledObject);
TransformedRecord transformedRecord = recordTransformer.put(lazyKey, lazyValue);
ByteBuffer transformedBytes = transformedRecord.getValueBytes(recordTransformer.getValueOutputSchema());

put.putValue = transformedBytes;
versionedIngestionStats.recordTransformerLatency(
storeName,
@@ -3133,8 +3158,8 @@
}
// grab the positive schema id (actual value schema id) to be used in schema warm-up value schema id.
// for hybrid use case in read compute store in future we need revisit this as we can have multiple schemas.
if (put.schemaId > 0) {
valueSchemaId = put.schemaId;
if (putSchemaId > 0) {
valueSchemaId = putSchemaId;
}
break;

@@ -518,7 +518,10 @@ public void recordTransformerLatency(double value, long currentTimeMs) {
}

public void registerTransformerLatencySensor() {
transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY);
// Check to make sure there isn't already a registered transformerLatencySensor
if (transformerLatencySensor == null) {
transformerLatencySensor = new WritePathLatencySensor(localMetricRepository, METRIC_CONFIG, TRANSFORMER_LATENCY);
}
}

public static double unAvailableToZero(double value) {
@@ -40,8 +40,17 @@
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;
import static com.linkedin.venice.writer.VeniceWriter.generateHeartbeatMessage;
import static com.linkedin.venice.writer.VeniceWriter.getHeartbeatKME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.longThat;
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyDouble;
@@ -73,6 +82,7 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
@@ -94,6 +104,7 @@
import com.linkedin.davinci.store.StoragePartitionConfig;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.davinci.transformer.TestStringRecordTransformer;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.MemoryLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
@@ -626,7 +637,8 @@ private void runTest(
aaConfig,
1,
Collections.emptyMap(),
storeVersionConfigOverride -> {});
storeVersionConfigOverride -> {},
null);
}

private void runTest(
@@ -646,15 +658,17 @@ private void runTest(
aaConfig,
1,
Collections.emptyMap(),
storeVersionConfigOverride);
storeVersionConfigOverride,
null);
}

private void runTest(
PollStrategy pollStrategy,
Set<Integer> partitions,
Runnable beforeStartingConsumption,
Runnable assertions,
AAConfig aaConfig) throws Exception {
AAConfig aaConfig,
DaVinciRecordTransformer recordTransformer) throws Exception {
runTest(
pollStrategy,
partitions,
@@ -666,7 +680,8 @@ private void runTest(
aaConfig,
1,
Collections.emptyMap(),
storeVersionConfigOverride -> {});
storeVersionConfigOverride -> {},
recordTransformer);
}

private void runTest(
@@ -691,7 +706,8 @@ private void runTest(
aaConfig,
amplificationFactor,
extraServerProperties,
storeVersionConfigOverride -> {});
storeVersionConfigOverride -> {},
null);
}

/**
@@ -723,7 +739,8 @@ private void runTest(
AAConfig aaConfig,
int amplificationFactor,
Map<String, Object> extraServerProperties,
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) throws Exception {
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride,
DaVinciRecordTransformer recordTransformer) throws Exception {

int partitionCount = PARTITION_COUNT / amplificationFactor;
VenicePartitioner partitioner = getVenicePartitioner(1); // Only get base venice partitioner
@@ -764,7 +781,7 @@ private void runTest(
leaderSubPartition,
false,
Optional.empty(),
null);
recordTransformer);

Future testSubscribeTaskFuture = null;
try {
@@ -1212,7 +1229,7 @@ public void testVeniceMessagesProcessing(AAConfig aaConfig) throws Exception {

verify(mockVersionedStorageIngestionStats, timeout(TEST_TIMEOUT_MS).atLeast(3))
.recordConsumedRecordEndToEndProcessingLatency(any(), eq(1), anyDouble(), anyLong());
}, aaConfig);
}, aaConfig, null);

// verify the shared consumer should be detached when the ingestion task is closed.
verify(aggKafkaConsumerService).unsubscribeAll(pubSubTopic);
@@ -1325,7 +1342,7 @@ public void testMissingMessagesForTopicWithLogCompactionEnabled(AAConfig aaConfi
OffsetRecord expectedOffsetRecordForLastMessage = getOffsetRecord(putMetadata4.getOffset());
verify(mockStorageMetadataService, timeout(TEST_TIMEOUT_MS))
.put(topic, PARTITION_FOO, expectedOffsetRecordForLastMessage);
}, aaConfig);
}, aaConfig, null);
}

@Test(dataProvider = "aaConfigProvider")
@@ -1611,7 +1628,7 @@ public void testDetectionOfMissingRecord(AAConfig aaConfig) throws Exception {

verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO);
verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_BAR);
}, aaConfig);
}, aaConfig, null);
}

/**
@@ -1643,7 +1660,7 @@ public void testSkippingOfDuplicateRecord(AAConfig aaConfig) throws Exception {

verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_FOO);
verify(mockLogNotifier, atLeastOnce()).started(topic, PARTITION_BAR);
}, aaConfig);
}, aaConfig, null);
}

@Test(dataProvider = "aaConfigProvider")
@@ -1656,7 +1673,7 @@ public void testThrottling(AAConfig aaConfig) throws Exception {
// START_OF_SEGMENT, START_OF_PUSH, PUT, DELETE
verify(mockRecordsThrottler, timeout(TEST_TIMEOUT_MS).times(4)).maybeThrottle(1);
verify(mockBandwidthThrottler, timeout(TEST_TIMEOUT_MS).times(4)).maybeThrottle(anyDouble());
}, aaConfig);
}, aaConfig, null);
}

/**
@@ -1843,7 +1860,7 @@ public void testDIVErrorMessagesNotFailFastAfterEOP(AAConfig aaConfig) throws Ex
args[0].equals(topic) && args[1].equals(PARTITION_FOO) && ((String) args[2]).length() > 0
&& args[3] instanceof FatalDataValidationException);
}
}, aaConfig);
}, aaConfig, null);
}

/**
@@ -2159,7 +2176,7 @@ public void testDataValidationCheckPointing(SortedInput sortedInput, AAConfig aa
PartitionConsumptionState pcs = storeIngestionTaskUnderTest.getPartitionConsumptionState(partition);
Assert.assertTrue(pcs.getLatestProcessedUpstreamRTOffsetMap().isEmpty());
});
}, aaConfig);
}, aaConfig, null);
}

@Test(dataProvider = "aaConfigProvider")
@@ -2189,7 +2206,7 @@ public void testNeverReportProgressBeforeStart(AAConfig aaConfig) throws Excepti
// of messages in bytes, since control message is being counted as 0 bytes (no data persisted in disk),
// then no progress will be reported during start, but only for processed messages.
verify(mockLogNotifier, after(TEST_TIMEOUT_MS).never()).progress(any(), anyInt(), anyInt());
}, aaConfig);
}, aaConfig, null);
}

@Test(dataProvider = "aaConfigProvider")
@@ -4243,6 +4260,49 @@ public void testMaybeSendIngestionHeartbeatWithHBSuccessOrFailure() throws Inter
});
}

@Test(dataProvider = "aaConfigProvider")
public void testStoreIngestionRecordTransformer(AAConfig aaConfig) throws Exception {
localVeniceWriter.broadcastStartOfPush(new HashMap<>());
PubSubProduceResult putMetadata = (PubSubProduceResult) localVeniceWriter
.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null)
.get();

Queue<AbstractPollStrategy> pollStrategies = new LinkedList<>();
pollStrategies.add(new RandomPollStrategy());

// We re-deliver the old put out of order, so we can make sure it's ignored.
Queue<PubSubTopicPartitionOffset> pollDeliveryOrder = new LinkedList<>();
pollDeliveryOrder.add(getTopicPartitionOffsetPair(putMetadata));
pollStrategies.add(new ArbitraryOrderingPollStrategy(pollDeliveryOrder));

PollStrategy pollStrategy = new CompositePollStrategy(pollStrategies);

TestStringRecordTransformer recordTransformer = new TestStringRecordTransformer();

VenicePartitioner partitioner = getVenicePartitioner(1);
int targetPartitionPutKeyFoo = partitioner.getPartitionId(putKeyFoo, PARTITION_COUNT);

runTest(pollStrategy, Utils.setOf(PARTITION_FOO), () -> {}, () -> {
Schema keySchema = Schema.create(Schema.Type.INT);
SchemaEntry keySchemaEntry = mock(SchemaEntry.class);
when(keySchemaEntry.getSchema()).thenReturn(keySchema);
when(mockSchemaRepo.getKeySchema(storeNameWithoutVersionInfo)).thenReturn(keySchemaEntry);

Schema valueSchema = Schema.create(Schema.Type.STRING);
SchemaEntry valueSchemaEntry = mock(SchemaEntry.class);
when(valueSchemaEntry.getSchema()).thenReturn(valueSchema);
when(mockSchemaRepo.getValueSchema(eq(storeNameWithoutVersionInfo), anyInt())).thenReturn(valueSchemaEntry);

mockAbstractStorageEngine.put(
targetPartitionPutKeyFoo,
putKeyFoo,
ByteBuffer.wrap(ValueRecord.create(EXISTING_SCHEMA_ID, putValue).serialize()));
}, aaConfig, recordTransformer);

// verify the shared consumer should be detached when the ingestion task is closed.
verify(aggKafkaConsumerService).unsubscribeAll(pubSubTopic);
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
// mock the store config
@@ -49,28 +49,10 @@ private Schema createSampleSchema() {
return new Schema.Parser().parse(schemaString);
}

public class TestRecordTransformer
implements DaVinciRecordTransformer<Integer, String, TransformedRecord<Integer, String>> {
public Schema getKeyOutputSchema() {
return Schema.create(Schema.Type.INT);
}

public Schema getValueOutputSchema() {
return Schema.create(Schema.Type.STRING);
}

public TransformedRecord<Integer, String> put(Lazy<Integer> key, Lazy<String> value) {
TransformedRecord<Integer, String> transformedRecord = new TransformedRecord<>();
transformedRecord.setKey(key.get());
transformedRecord.setValue(value.get() + "Transformed");
return transformedRecord;
}
}

@Test
public void testRecordTransformer() {
DaVinciRecordTransformer<Integer, String, TransformedRecord<Integer, String>> recordTransformer =
new TestRecordTransformer();
DaVinciRecordTransformer<Integer, Object, TransformedRecord<Integer, Object>> recordTransformer =
new TestStringRecordTransformer();

Schema keyOutputSchema = recordTransformer.getKeyOutputSchema();
assertEquals(keyOutputSchema.getType(), Schema.Type.INT);
@@ -79,13 +61,13 @@ public void testRecordTransformer() {
assertEquals(valueOutputSchema.getType(), Schema.Type.STRING);

Lazy<Integer> lazyKey = Lazy.of(() -> 42);
Lazy<String> lazyValue = Lazy.of(() -> "SampleValue");
TransformedRecord<Integer, String> transformedRecord = recordTransformer.put(lazyKey, lazyValue);
Lazy<Object> lazyValue = Lazy.of(() -> "SampleValue");
TransformedRecord<Integer, Object> transformedRecord = recordTransformer.put(lazyKey, lazyValue);
assertEquals(Optional.ofNullable(transformedRecord.getKey()), Optional.ofNullable(42));
assertEquals(transformedRecord.getValue(), "SampleValueTransformed");

Lazy<Integer> lazyDeleteKey = Lazy.of(() -> 99);
TransformedRecord<Integer, String> deletedRecord = recordTransformer.delete(lazyDeleteKey);
TransformedRecord<Integer, Object> deletedRecord = recordTransformer.delete(lazyDeleteKey);
assertNull(deletedRecord);
}

@@ -0,0 +1,25 @@
package com.linkedin.davinci.transformer;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.TransformedRecord;
import com.linkedin.venice.utils.lazy.Lazy;
import org.apache.avro.Schema;


public class TestStringRecordTransformer
implements DaVinciRecordTransformer<Integer, Object, TransformedRecord<Integer, Object>> {
public Schema getKeyOutputSchema() {
return Schema.create(Schema.Type.INT);
}

public Schema getValueOutputSchema() {
return Schema.create(Schema.Type.STRING);
}

public TransformedRecord<Integer, Object> put(Lazy<Integer> key, Lazy<Object> value) {
TransformedRecord<Integer, Object> transformedRecord = new TransformedRecord<>();
transformedRecord.setKey(key.get());
transformedRecord.setValue(value.get() + "Transformed");
return transformedRecord;
}
}