[server][test] Fix checksum calculation bug in Active/Active Partial Update RMD chunking #723

Merged: 8 commits, Nov 1, 2023
@@ -12,6 +12,7 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
@@ -442,6 +443,15 @@ public void maybeUpdateExpectedChecksum(byte[] key, Put put) {
if (this.expectedSSTFileChecksum == null) {
return;
}
/**
 * 1. For a regular value or a value chunk, we take the value payload.
 * 2. When A/A partial update is enabled and RMD chunking is turned on, we should skip RMD chunks.
 * 3. For a chunk manifest, if RMD chunking is enabled, the RMD manifest will be in the RMD payload. Whether RMD
 *    chunking is enabled or not, we should only take the value manifest part.
 */
if (put.schemaId == AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion() && put.putValue.remaining() == 0) {
return;
}
this.expectedSSTFileChecksum.update(key);
ByteBuffer putValue = put.putValue;
this.expectedSSTFileChecksum.update(put.schemaId);
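
For context, here is a minimal standalone sketch of the rule the hunk above implements. It uses java.security.MessageDigest's MD5 in place of Venice's CheckSum abstraction, and a hypothetical CHUNK_SCHEMA_ID constant standing in for AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion(); everything else is illustrative. The key point: an RMD chunk carries the CHUNK schema id together with an empty value payload and is skipped, so the expected checksum only ever covers key, schema id, and value bytes.

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class RmdChunkChecksumSketch {
  // Hypothetical stand-in for AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion().
  private static final int CHUNK_SCHEMA_ID = -10;

  private final MessageDigest md5;

  public RmdChunkChecksumSketch() throws NoSuchAlgorithmException {
    this.md5 = MessageDigest.getInstance("MD5");
  }

  // Mirrors the fixed maybeUpdateExpectedChecksum: RMD chunks (CHUNK schema id
  // with an empty value payload) are skipped entirely; every other Put
  // contributes key + schema id + value payload to the running checksum.
  public void maybeUpdate(byte[] key, int schemaId, ByteBuffer putValue) {
    if (schemaId == CHUNK_SCHEMA_ID && putValue.remaining() == 0) {
      return; // RMD chunk: the value checksum never covers it
    }
    md5.update(key);
    md5.update(ByteBuffer.allocate(Integer.BYTES).putInt(schemaId).array());
    ByteBuffer value = putValue.duplicate(); // don't disturb the caller's position
    byte[] valueBytes = new byte[value.remaining()];
    value.get(valueBytes);
    md5.update(valueBytes);
  }

  public byte[] digest() {
    return md5.digest();
  }
}
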
@@ -2,8 +2,14 @@

import static org.mockito.Mockito.mock;

import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.validation.checksum.CheckSum;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.writer.WriterChunkingHelper;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -12,6 +18,58 @@


public class PartitionConsumptionStateTest {
@Test
public void testUpdateChecksum() {
PartitionConsumptionState pcs = new PartitionConsumptionState(0, 1, mock(OffsetRecord.class), false);
pcs.initializeExpectedChecksum();
byte[] rmdPayload = new byte[] { 127 };
byte[] key1 = new byte[] { 1 };
byte[] key2 = new byte[] { 2 };
byte[] key3 = new byte[] { 3 };
byte[] key4 = new byte[] { 4 };
byte[] valuePayload1 = new byte[] { 10 };
byte[] valuePayload3 = new byte[] { 11 };
byte[] valuePayload4 = new byte[] { 12 };

Put put = new Put();
// Try to update a value chunk (should only update value payload)
put.schemaId = AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion();
put.putValue = ByteBuffer.wrap(valuePayload1);
put.replicationMetadataPayload = WriterChunkingHelper.EMPTY_BYTE_BUFFER;
pcs.maybeUpdateExpectedChecksum(key1, put);
// Try to update an RMD chunk (checksum should not be updated)
put.putValue = WriterChunkingHelper.EMPTY_BYTE_BUFFER;
put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
pcs.maybeUpdateExpectedChecksum(key2, put);
// Try to update a regular value with RMD (should only update value payload)
put.schemaId = 1;
put.putValue = ByteBuffer.wrap(valuePayload3);
put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
pcs.maybeUpdateExpectedChecksum(key3, put);
// Try to update a manifest (should only update value payload)
put.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
put.putValue = ByteBuffer.wrap(valuePayload4);
put.replicationMetadataPayload = ByteBuffer.wrap(rmdPayload);
pcs.maybeUpdateExpectedChecksum(key4, put);

byte[] checksum = pcs.getExpectedChecksum();
Assert.assertNotNull(checksum);

// Calculate expected checksum.
CheckSum expectedChecksum = CheckSum.getInstance(CheckSumType.MD5);
expectedChecksum.update(key1);
expectedChecksum.update(AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion());
expectedChecksum.update(valuePayload1, 0, valuePayload1.length);
expectedChecksum.update(key3);
expectedChecksum.update(1);
expectedChecksum.update(valuePayload3, 0, valuePayload3.length);
expectedChecksum.update(key4);
expectedChecksum.update(AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion());
expectedChecksum.update(valuePayload4, 0, valuePayload4.length);

Assert.assertEquals(expectedChecksum.getCheckSum(), checksum);
}

/**
* Test the different transientRecordMap operations.
*/
@@ -66,7 +66,7 @@ public class ServiceFactory {
*/
static {
TestUtils.preventSystemExit();

Utils.thisIsLocalhost();
StringBuilder sb;
try {
String[] cmd = { "/bin/bash", "-c", "ulimit -a" };
@@ -1,5 +1,6 @@
package com.linkedin.venice.integration.utils;

import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SERVER_ALLOW_LIST;
@@ -259,6 +260,8 @@ static ServiceProvider<VeniceClusterWrapper> generateService(VeniceClusterCreate
featureProperties.setProperty(ENABLE_GRPC_READ_SERVER, Boolean.toString(options.isGrpcEnabled()));
// Half of the servers run in each mode, with 1-server clusters aligning with the default (true)
featureProperties.setProperty(STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED, Boolean.toString(i % 2 == 0));
// Half of the servers will be in PlainTable mode, with 1-server clusters aligning with the default (block-based mode)
featureProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, Boolean.toString(i % 2 != 0));

if (!veniceRouterWrappers.isEmpty()) {
ClientConfig clientConfig = new ClientConfig().setVeniceURL(zkAddress)
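
Purely as an illustration of the alternating pattern above (the property keys here are illustrative strings, not Venice's actual config keys): even-indexed servers keep the default buffer-after-leader behavior, odd-indexed servers opt into PlainTable, and a one-server cluster (only i == 0) therefore lands on both defaults.

import java.util.Properties;

public class AlternatingFeatureSketch {
  public static void main(String[] args) {
    int numberOfServers = 4; // illustrative cluster size
    for (int i = 0; i < numberOfServers; i++) {
      Properties featureProperties = new Properties();
      // Even-indexed servers keep buffer-after-leader enabled (the default, true).
      featureProperties.setProperty("store.writer.buffer.after.leader.enabled", Boolean.toString(i % 2 == 0));
      // Odd-indexed servers opt into PlainTable, so a 1-server cluster (i == 0 only)
      // stays on the default block-based format.
      featureProperties.setProperty("rocksdb.plain.table.format.enabled", Boolean.toString(i % 2 != 0));
      System.out.println("server-" + i + " -> " + featureProperties);
    }
  }
}
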
@@ -20,6 +20,7 @@
import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_CONSUMER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_PRODUCER_ADAPTER_FACTORY_CLASS;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
@@ -196,6 +197,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
boolean ssl = Boolean.parseBoolean(featureProperties.getProperty(SERVER_ENABLE_SSL, "false"));
boolean isAutoJoin = Boolean.parseBoolean(featureProperties.getProperty(SERVER_IS_AUTO_JOIN, "false"));
boolean isGrpcEnabled = Boolean.parseBoolean(featureProperties.getProperty(ENABLE_GRPC_READ_SERVER, "false"));
boolean isPlainTableEnabled =
Boolean.parseBoolean(featureProperties.getProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false"));
int numGrpcWorkerThreads = Integer.parseInt(
featureProperties.getProperty(
GRPC_SERVER_WORKER_THREAD_COUNT,
@@ -224,8 +227,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
.put(MAX_ONLINE_OFFLINE_STATE_TRANSITION_THREAD_NUMBER, 100)
.put(SERVER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 0)
.put(PERSISTENCE_TYPE, ROCKS_DB)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true)
.put(ROCKSDB_OPTIONS_USE_DIRECT_READS, false) // Required by PlainTable format
.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, true)
.put(SERVER_PARTITION_GRACEFUL_DROP_DELAY_IN_SECONDS, 0)
.put(PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS, 1000)
.put(KAFKA_READ_CYCLE_DELAY_MS, 50)
@@ -248,8 +250,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
.put(
PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS,
pubSubBrokerWrapper.getPubSubClientsFactory().getAdminAdapterFactory().getClass().getName())
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
.put(configProperties);
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000);
if (sslToKafka) {
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, SecurityProtocol.SSL.name);
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
@@ -261,6 +262,11 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
serverPropsBuilder.put(GRPC_SERVER_WORKER_THREAD_COUNT, numGrpcWorkerThreads);
}

if (isPlainTableEnabled) {
serverPropsBuilder.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, true);
serverPropsBuilder.put(ROCKSDB_OPTIONS_USE_DIRECT_READS, false); // Required by PlainTable format
}

// Add additional config from PubSubBrokerWrapper to server.properties iff the key is not already present
Map<String, String> brokerDetails =
PubSubBrokerWrapper.getBrokerDetailsForClients(Collections.singletonList(pubSubBrokerWrapper));
@@ -272,6 +278,8 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
serverPropsBuilder.putIfAbsent(entry.getKey(), entry.getValue());
}

// Apply the integration test config override at the end so it takes precedence.
serverPropsBuilder.put(configProperties);
VeniceProperties serverProps = serverPropsBuilder.build();

File serverConfigFile = new File(configDirectory, VeniceConfigLoader.SERVER_PROPERTIES_FILE);
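
The reordering above matters because later puts win: by moving the configProperties override to the very end of the builder chain, per-test settings (such as disabling PlainTable or checksum verification) take precedence over the wrapper's defaults. A minimal sketch of that last-write-wins behavior, using java.util.Properties and an illustrative key in place of Venice's builder and config constants:

import java.util.Properties;

public class OverrideOrderSketch {
  public static void main(String[] args) {
    Properties serverProps = new Properties();
    serverProps.setProperty("rocksdb.plain.table.format.enabled", "true"); // wrapper default, applied first

    Properties testOverride = new Properties();
    testOverride.setProperty("rocksdb.plain.table.format.enabled", "false"); // per-test override

    serverProps.putAll(testOverride); // applied last, so the override wins
    System.out.println(serverProps.getProperty("rocksdb.plain.table.format.enabled")); // prints "false"
  }
}
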
@@ -87,6 +87,8 @@ public void setUp() throws VeniceClientException {
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
// Checksum verification has to be disabled; otherwise it will fail when deleteSSTFiles is true.
serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "false");
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.setProperty(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath());
veniceCluster.addVeniceServer(new Properties(), serverProperties);
@@ -3,6 +3,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CHILD_DATA_CENTER_KAFKA_URL_PREFIX;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJob.DEFAULT_VALUE_FIELD_PROP;
@@ -108,6 +109,8 @@ public void setUp() throws Exception {
Properties serverProperties = new Properties();
serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
// Checksum verification has to be disabled; otherwise it will fail when deleteSSTFiles is true.
serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, false);
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.put(