[server][test] Fix checksum calculation bug in Active/Active Partial Update RMD chunking (linkedin#723)

This PR fixes a repush failure in A/A partial update stores.

The current checksum calculation takes every PUT message's key, schemaId, and value into account. However, an RMD chunk is also a chunk PUT message, so it is folded into the running checksum, while the expected checksum, which iterates the SST file, does not contain it (the RMD chunk is written to the RMD column family, not the value column family). This is why repush failed checksum verification.
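A minimal sketch of the detection logic, mirroring the fix in the diff below (names taken from the diff): an RMD chunk is written as a chunk PUT whose value payload is empty, because its bytes travel in the replicationMetadataPayload field, so the running checksum can recognize and skip it:

if (put.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() && put.putValue.remaining() == 0) {
  // RMD chunk: CHUNK schema id but an empty value payload. Skip it, since the
  // expected checksum iterates the SST file, which only holds value CF entries.
  return;
}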

During the investigation, we also found that the default integration test config setup was incorrect: it enabled PlainTable mode by default and disabled checksum verification by default, which is inconsistent with current production behavior. This PR fixes that as well.
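With the corrected defaults (block-based format and checksum verification enabled, matching production), a test that still wants PlainTable has to opt in explicitly. A minimal sketch, reusing the property keys from this diff inside a hypothetical test setup:

Properties serverProperties = new Properties();
// Opt in to PlainTable; the integration-test default is now the block-based format.
serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "true");
// Direct reads must be turned off, as required by the PlainTable format.
serverProperties.setProperty(ROCKSDB_OPTIONS_USE_DIRECT_READS, "false");
veniceCluster.addVeniceServer(new Properties(), serverProperties);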
sixpluszero authored and elijahgrimaldi committed Nov 1, 2023
1 parent 5e75415 commit 5e21148
Showing 7 changed files with 89 additions and 5 deletions.
@@ -12,6 +12,7 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
@@ -442,6 +443,15 @@ public void maybeUpdateExpectedChecksum(byte[] key, Put put) {
if (this.expectedSSTFileChecksum == null) {
return;
}
/**
 * 1. For a regular value or a value chunk, we take the value payload.
 * 2. When A/A partial update is enabled and RMD chunking is turned on, we should skip the RMD chunk.
 * 3. For a chunk manifest, if RMD chunking is enabled, the RMD manifest will be in the RMD payload. Whether RMD
 *    chunking is used or not, we should only take the value manifest part.
 */
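// A chunk PUT with an empty value payload is an RMD chunk (its bytes live in
// replicationMetadataPayload); it never lands in the value CF, so skip it.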
if (put.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() && put.putValue.remaining() == 0) {
return;
}
this.expectedSSTFileChecksum.update(key);
ByteBuffer putValue = put.putValue;
this.expectedSSTFileChecksum.update(put.schemaId);
@@ -2,8 +2,14 @@

import static org.mockito.Mockito.mock;

import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.validation.checksum.CheckSum;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.writer.WriterChunkingHelper;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -12,6 +18,58 @@


public class PartitionConsumptionStateTest {
@Test
public void testUpdateChecksum() {
PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false);
pcs.initializeExpectedChecksum();
byte[] rmdPayload = new byte[] { 127 };
byte[] key1 = new byte[] { 1 };
byte[] key2 = new byte[] { 2 };
byte[] key3 = new byte[] { 3 };
byte[] key4 = new byte[] { 4 };
byte[] valuePayload1 = new byte[] { 10 };
byte[] valuePayload3 = new byte[] { 11 };
byte[] valuePayload4 = new byte[] { 12 };

Put put = new Put();
// Try to update a value chunk (should only update value payload)
put.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion();
put.putValue = ByteBuffer.wrap(valuePayload1);
put.replicationMetadataPayload = WriterChunkingHelper.EMPTY_BYTE_BUFFER;
pcs.maybeUpdateExpectedChecksum(key1, put);
// Try to update an RMD chunk (the checksum should not be updated)
put.putValue = WriterChunkingHelper.EMPTY_BYTE_BUFFER;
put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
pcs.maybeUpdateExpectedChecksum(key2, put);
// Try to update a regular value with RMD (should only update value payload)
put.schemaId = 1;
put.putValue = ByteBuffer.wrap(valuePayload3);
put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
pcs.maybeUpdateExpectedChecksum(key3, put);
// Try to update a manifest (should only update value payload)
put.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
put.putValue = ByteBuffer.wrap(valuePayload4);
put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
pcs.maybeUpdateExpectedChecksum(key4, put);

byte[] checksum = pcs.getExpectedChecksum();
Assert.assertNotNull(checksum);

// Calculate expected checksum.
CheckSum expectedChecksum = CheckSum.getInstance(CheckSumType.MD5);
expectedChecksum.update(key1);
expectedChecksum.update(AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion());
expectedChecksum.update(valuePayload1, 0, valuePayload1.length);
expectedChecksum.update(key3);
expectedChecksum.update(1);
expectedChecksum.update(valuePayload3, 0, valuePayload3.length);
expectedChecksum.update(key4);
expectedChecksum.update(AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion());
expectedChecksum.update(valuePayload4, 0, valuePayload4.length);

Assert.assertEquals(expectedChecksum.getCheckSum(), checksum);
}

/**
* Test the different transientRecordMap operations.
*/
@@ -66,7 +66,7 @@ public class ServiceFactory {
*/
static {
TestUtils.preventSystemExit();

Utils.thisIsLocalhost();
StringBuilder sb;
try {
String[] cmd = { "/bin/bash", "-c", "ulimit -a" };
@@ -1,5 +1,6 @@
package com.linkedin.venice.integration.utils;

import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SERVER_ALLOW_LIST;
@@ -259,6 +260,8 @@ static ServiceProvider<VeniceClusterWrapper> generateService(VeniceClusterCreate
featureProperties.setProperty(ENABLE_GRPC_READ_SERVER, Boolean.toString(options.isGrpcEnabled()));
// Half of the servers in each mode, with 1-server clusters aligning with the default (true)
featureProperties.setProperty(STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED, Boolean.toString(i % 2 == 0));
// Half of the servers will be in PlainTable mode, with 1-server clusters aligning with the default (block-based mode)
featureProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, Boolean.toString(i % 2 != 0));

if (!veniceRouterWrappers.isEmpty()) {
ClientConfig clientConfig = new ClientConfig().setVeniceURL(zkAddress)
@@ -20,6 +20,7 @@
import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_CONSUMER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_PRODUCER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
@@ -196,6 +197,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
boolean ssl = Boolean.parseBoolean(featureProperties.getProperty(SERVER_ENABLE_SSL, "false"));
boolean isAutoJoin = Boolean.parseBoolean(featureProperties.getProperty(SERVER_IS_AUTO_JOIN, "false"));
boolean isGrpcEnabled = Boolean.parseBoolean(featureProperties.getProperty(ENABLE_GRPC_READ_SERVER, "false"));
boolean isPlainTableEnabled =
Boolean.parseBoolean(featureProperties.getProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false"));
int numGrpcWorkerThreads = Integer.parseInt(
featureProperties.getProperty(
GRPC_SERVER_WORKER_THREAD_COUNT,
@@ -224,8 +227,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
.put(MAX_ONLINE_OFFLINE_STATE_TRANSITION_THREAD_NUMBER, 100)
.put(SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 0)
.put(PERSISTENCE_TYPE, ROCKS_DB)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true)
.put(ROCKSDB_OPTIONS_USE_DIRECT_READS, false) // Required by PlainTable format
.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, true)
.put(SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS, 0)
.put(PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS, 1000)
.put(KAFKA_READ_CYCLE_DELAY_MS, 50)
@@ -248,8 +250,7 @@
.put(
PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS,
pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName())
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
.put(configProperties);
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000);
if (sslToKafka) {
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name);
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
@@ -261,6 +262,11 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
serverPropsBuilder.put(GRPC_SERVER_WORKER_THREAD_COUNT, numGrpcWorkerThreads);
}

if (isPlainTableEnabled) {
serverPropsBuilder.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true);
serverPropsBuilder.put(ROCKSDB_OPTIONS_USE_DIRECT_READS, false); // Required by PlainTable format
}

// Add additional config from PubSubBrokerWrapper to server.properties iff the key is not already present
Map<String, String> brokerDetails =
PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper));
@@ -272,6 +278,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
serverPropsBuilder.putIfAbsent(entry.getKey(), entry.getValue());
}

// Add integration test config overrides at the end.
serverPropsBuilder.put(configProperties);
VeniceProperties serverProps = serverPropsBuilder.build();

File serverConfigFile = new File(configDirectory, VeniceConfigLoader.SERVER_PROPERTIES_FILE);
@@ -87,6 +87,8 @@ public void setUp() throws VeniceClientException {
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
// Checksum verification has to be disabled, otherwise it will fail when deleteSSTFiles is true.
serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "false");
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.setProperty(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath());
veniceCluster.addVeniceServer(new Properties(), serverProperties);
@@ -3,6 +3,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_VALUE_FIELD_PROP;
@@ -108,6 +109,8 @@ public void setUp() throws Exception {
Properties serverProperties = new Properties();
serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
// Checksum verification has to be disabled, otherwise it will fail when deleteSSTFiles is true.
serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, false);
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.put(
