diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
index 367fb476e5f..168fdb68f5f 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
@@ -12,6 +12,7 @@
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
 import com.linkedin.venice.utils.PartitionUtils;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
@@ -442,6 +443,15 @@ public void maybeUpdateExpectedChecksum(byte[] key, Put put) {
     if (this.expectedSSTFileChecksum == null) {
       return;
     }
+    /**
+     * 1. For a regular value or a value chunk, take the value payload.
+     * 2. When A/A partial update is enabled and RMD chunking is turned on, skip RMD chunks entirely.
+     * 3. For a chunk manifest, the RMD manifest (when RMD chunking is enabled) is carried in the RMD payload; whether
+     * RMD chunking is enabled or not, only the value manifest part should be checksummed.
+     */
+    if (put.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() && put.putValue.remaining() == 0) {
+      return;
+    }
     this.expectedSSTFileChecksum.update(key);
     ByteBuffer putValue = put.putValue;
     this.expectedSSTFileChecksum.update(put.schemaId);
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
index a9c0264b3cc..08369d60385 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
@@ -2,8 +2,14 @@
 
 import static org.mockito.Mockito.mock;
 
+import com.linkedin.venice.kafka.protocol.Put;
+import com.linkedin.venice.kafka.validation.checksum.CheckSum;
+import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
 import com.linkedin.venice.offsets.OffsetRecord;
 import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+import com.linkedin.venice.writer.WriterChunkingHelper;
+import java.nio.ByteBuffer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -12,6 +18,58 @@
 
 
 public class PartitionConsumptionStateTest {
+  @Test
+  public void testUpdateChecksum() {
+    PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false);
+    pcs.initializeExpectedChecksum();
+    byte[] rmdPayload = new byte[] { 127 };
+    byte[] key1 = new byte[] { 1 };
+    byte[] key2 = new byte[] { 2 };
+    byte[] key3 = new byte[] { 3 };
+    byte[] key4 = new byte[] { 4 };
+    byte[] valuePayload1 = new byte[] { 10 };
+    byte[] valuePayload3 = new byte[] { 11 };
+    byte[] valuePayload4 = new byte[] { 12 };
+
+    Put put = new Put();
+    // Update with a value chunk (should only update the checksum with the value payload)
+    put.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion();
+    put.putValue = ByteBuffer.wrap(valuePayload1);
+    put.replicationMetadataPayload = WriterChunkingHelper.EMPTY_BYTE_BUFFER;
+    pcs.maybeUpdateExpectedChecksum(key1, put);
+    // Update with an RMD chunk (should not update the checksum at all)
+    put.putValue = WriterChunkingHelper.EMPTY_BYTE_BUFFER;
+    put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
+    pcs.maybeUpdateExpectedChecksum(key2, put);
+    // Update with a regular value plus RMD (should only update the checksum with the value payload)
+    put.schemaId = 1;
+    put.putValue = ByteBuffer.wrap(valuePayload3);
+    put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
+    pcs.maybeUpdateExpectedChecksum(key3, put);
+    // Update with a chunk manifest (should only update the checksum with the value payload)
+    put.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
+    put.putValue = ByteBuffer.wrap(valuePayload4);
+    put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
+    pcs.maybeUpdateExpectedChecksum(key4, put);
+
+    byte[] checksum = pcs.getExpectedChecksum();
+    Assert.assertNotNull(checksum);
+
+    // Calculate the expected checksum; key2 (the RMD chunk) contributes nothing.
+    CheckSum expectedChecksum = CheckSum.getInstance(CheckSumType.MD5);
+    expectedChecksum.update(key1);
+    expectedChecksum.update(AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion());
+    expectedChecksum.update(valuePayload1, 0, valuePayload1.length);
+    expectedChecksum.update(key3);
+    expectedChecksum.update(1);
+    expectedChecksum.update(valuePayload3, 0, valuePayload3.length);
+    expectedChecksum.update(key4);
+    expectedChecksum.update(AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion());
+    expectedChecksum.update(valuePayload4, 0, valuePayload4.length);
+
+    Assert.assertEquals(expectedChecksum.getCheckSum(), checksum);
+  }
+
   /**
    * Test the different transientRecordMap operations.
   */
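Reviewer note: the new guard in maybeUpdateExpectedChecksum can be read in isolation. A record is an RMD-only chunk exactly when it carries the CHUNK schema id but an empty value payload (the bytes live in the RMD payload instead), and such records must not feed the expected SST file checksum. A minimal sketch of that decision, using a hypothetical PutRecord as a stand-in for com.linkedin.venice.kafka.protocol.Put and a placeholder chunk schema id (Venice derives the real one from AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()):

```java
import java.nio.ByteBuffer;

// Stand-in for the three checksum-relevant fields of com.linkedin.venice.kafka.protocol.Put.
class PutRecord {
  int schemaId;
  ByteBuffer putValue;
  ByteBuffer replicationMetadataPayload;
}

final class ChecksumGuard {
  // Placeholder for AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); the value here is illustrative.
  static final int CHUNK_SCHEMA_ID = -10;

  // Mirrors the new guard: an RMD-only chunk is CHUNK-typed with an empty value
  // payload, and it must be skipped when updating the expected SST file checksum.
  static boolean shouldSkipForChecksum(PutRecord put) {
    return put.schemaId == CHUNK_SCHEMA_ID && put.putValue.remaining() == 0;
  }
}
```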
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java
index 8de2ed7a506..9292aabf32f 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/ServiceFactory.java
@@ -66,7 +66,7 @@ public class ServiceFactory {
    */
   static {
     TestUtils.preventSystemExit();
-
+    Utils.thisIsLocalhost();
     StringBuilder sb;
     try {
       String[] cmd = { "/bin/bash", "-c", "ulimit -a" };
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java
index 95e6b82eec8..795ae3b1b37 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceClusterWrapper.java
@@ -1,5 +1,6 @@
 package com.linkedin.venice.integration.utils;
 
+import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.*;
 import static com.linkedin.venice.integration.utils.VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER;
 import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SERVER_ALLOW_LIST;
@@ -259,6 +260,8 @@ static ServiceProvider<VeniceClusterWrapper> generateService(VeniceClusterCreate
       featureProperties.setProperty(ENABLE_GRPC_READ_SERVER, Boolean.toString(options.isGrpcEnabled()));
       // Half of servers on each mode, with 1 server clusters aligning with the default (true)
       featureProperties.setProperty(STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED, Boolean.toString(i % 2 == 0));
+      // Half of the servers will be in PlainTable mode, with 1-server clusters aligning with the default (block-based mode)
+      featureProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, Boolean.toString(i % 2 != 0));
 
       if (!veniceRouterWrappers.isEmpty()) {
         ClientConfig clientConfig = new ClientConfig().setVeniceURL(zkAddress)
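Worth spelling out the alternation idiom both feature flags in VeniceClusterWrapper use: with Boolean.toString(i % 2 != 0), server 0 stays on the default block-based format, so 1-server clusters keep exercising the default path while multi-server clusters cover both formats. A small sketch of the pattern under those assumptions (the literal property key is illustrative; the wrapper uses the ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED constant):

```java
import java.util.Properties;

// Sketch: alternate a boolean feature across servers so that server 0 (and thus
// any 1-server cluster) keeps the default, while larger clusters cover both modes.
public class FeatureAlternationDemo {
  public static void main(String[] args) {
    int numberOfServers = 4; // assumed cluster size for the demo
    for (int i = 0; i < numberOfServers; i++) {
      Properties featureProperties = new Properties();
      // PlainTable defaults to disabled, so i % 2 != 0 leaves server 0 on the default.
      featureProperties.setProperty("rocksdb.plain.table.format.enabled", Boolean.toString(i % 2 != 0));
      System.out.println("server-" + i + " -> " + featureProperties);
    }
  }
}
```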
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java
index 003e5af46d0..7b00c050d97 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java
@@ -20,6 +20,7 @@
 import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS;
 import static com.linkedin.venice.ConfigKeys.PUB_SUB_CONSUMER_ADAPTER_FACTORY_CLASS;
 import static com.linkedin.venice.ConfigKeys.PUB_SUB_PRODUCER_ADAPTER_FACTORY_CLASS;
+import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
 import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
@@ -196,6 +197,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
     boolean ssl = Boolean.parseBoolean(featureProperties.getProperty(SERVER_ENABLE_SSL, "false"));
     boolean isAutoJoin = Boolean.parseBoolean(featureProperties.getProperty(SERVER_IS_AUTO_JOIN, "false"));
     boolean isGrpcEnabled = Boolean.parseBoolean(featureProperties.getProperty(ENABLE_GRPC_READ_SERVER, "false"));
+    boolean isPlainTableEnabled =
+        Boolean.parseBoolean(featureProperties.getProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false"));
     int numGrpcWorkerThreads = Integer.parseInt(
         featureProperties.getProperty(
             GRPC_SERVER_WORKER_THREAD_COUNT,
@@ -224,8 +227,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
         .put(MAX_ONLINE_OFFLINE_STATE_TRANSITION_THREAD_NUMBER, 100)
         .put(SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 0)
         .put(PERSISTENCE_TYPE, ROCKS_DB)
-        .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true)
-        .put(ROCKSDB_OPTIONS_USE_DIRECT_READS, false) // Required by PlainTable format
+        .put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, true)
         .put(SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS, 0)
         .put(PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS, 1000)
         .put(KAFKA_READ_CYCLE_DELAY_MS, 50)
@@ -248,8 +250,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
         .put(
             PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS,
             pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName())
-        .put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
-        .put(configProperties);
+        .put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000);
     if (sslToKafka) {
       serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name);
       serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
@@ -261,6 +262,11 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
       serverPropsBuilder.put(GRPC_SERVER_WORKER_THREAD_COUNT, numGrpcWorkerThreads);
     }
 
+    if (isPlainTableEnabled) {
+      serverPropsBuilder.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true);
+      serverPropsBuilder.put(ROCKSDB_OPTIONS_USE_DIRECT_READS, false); // Required by PlainTable format
+    }
+
     // Add additional config from PubSubBrokerWrapper to server.properties iff the key is not already present
     Map<String, String> brokerDetails =
         PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper));
@@ -272,6 +278,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
       serverPropsBuilder.putIfAbsent(entry.getKey(), entry.getValue());
     }
 
+    // Apply the integration test config overrides at the end, so they take precedence over the defaults above.
+    serverPropsBuilder.put(configProperties);
     VeniceProperties serverProps = serverPropsBuilder.build();
 
     File serverConfigFile = new File(configDirectory, VeniceConfigLoader.SERVER_PROPERTIES_FILE);
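The VeniceServerWrapper change is largely about ordering: `.put(configProperties)` used to sit in the middle of the builder chain, so wrapper defaults added afterwards (e.g. SERVER_INGESTION_HEARTBEAT_INTERVAL_MS) could clobber a test's override. Moving it to the end makes the test-supplied config the last writer. A small sketch of that last-write-wins behavior, with java.util.Properties standing in for the property builder used in the diff and an illustrative key string:

```java
import java.util.Properties;

// Demonstrates why override ordering matters: the last write for a key wins.
public class OverrideOrderingDemo {
  public static void main(String[] args) {
    Properties defaults = new Properties();
    defaults.setProperty("server.ingestion.heartbeat.interval.ms", "5000"); // wrapper default

    Properties testOverrides = new Properties();
    testOverrides.setProperty("server.ingestion.heartbeat.interval.ms", "100"); // test wants a fast heartbeat

    Properties serverProps = new Properties();
    serverProps.putAll(defaults);
    serverProps.putAll(testOverrides); // applied last, as in the reordered builder

    // Prints 100: the test override survives because it was applied last.
    System.out.println(serverProps.getProperty("server.ingestion.heartbeat.interval.ms"));
  }
}
```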
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java
index 69fda308552..17b318b9d33 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java
@@ -87,6 +87,8 @@ public void setUp() throws VeniceClientException {
     serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
     serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
     serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
+    // Checksum verification has to be disabled here; otherwise ingestion fails when deleteSSTFiles is true.
+    serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "false");
     serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
     serverProperties.setProperty(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath());
     veniceCluster.addVeniceServer(new Properties(), serverProperties);
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java
index 48a553425fd..cfe7837dc8e 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java
@@ -3,6 +3,7 @@
 import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX;
 import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
+import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
 import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_KEY_FIELD_PROP;
 import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_VALUE_FIELD_PROP;
@@ -108,6 +109,8 @@ public void setUp() throws Exception {
     Properties serverProperties = new Properties();
     serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
     serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
+    // Checksum verification has to be disabled here; otherwise ingestion fails when deleteSSTFiles is true.
+    serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, false);
     serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
     serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
     serverProperties.put(
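Taken together, the restart tests disable SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED because deliberately deleting SST files invalidates the running checksum that the new unit test exercises. For reviewers who want the checksum mechanics in one place: for each kept record, the expected checksum folds in the key bytes, the schema id, then the value payload, and RMD-only chunks contribute nothing. A self-contained sketch with java.security MD5 standing in for Venice's CheckSum class (the 4-byte big-endian schema-id encoding is an assumption of this sketch, not necessarily what CheckSum does; the update ordering is the point):

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of the running-checksum idea behind maybeUpdateExpectedChecksum:
// each non-skipped record contributes key, schema id, then value bytes.
public class RunningChecksumSketch {
  private final MessageDigest md5;

  public RunningChecksumSketch() throws NoSuchAlgorithmException {
    this.md5 = MessageDigest.getInstance("MD5");
  }

  // Assumed encoding: schema id as a 4-byte big-endian int (illustrative choice).
  public void update(byte[] key, int schemaId, ByteBuffer value) {
    md5.update(key);
    md5.update(ByteBuffer.allocate(Integer.BYTES).putInt(schemaId).array());
    md5.update(value.duplicate()); // duplicate() so the caller's buffer position is untouched
  }

  public byte[] digest() {
    return md5.digest();
  }
}
```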