diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
index 17f811f934..9436ab48a1 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java
@@ -41,7 +41,10 @@ public class PartitionConsumptionState {
 
   private String leaderHostId;
 
-  /** whether the ingestion of current partition is deferred-write. */
+  /**
+   * Whether the ingestion of the current partition is deferred-write.
+   * Refer to {@link com.linkedin.davinci.store.rocksdb.RocksDBStoragePartition#deferredWrite}
+   */
   private boolean deferredWrite;
   private boolean errorReported;
   private boolean lagCaughtUp;
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/StoragePartitionConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/StoragePartitionConfig.java
index 1a2830fbb9..0fd53f4a6e 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/StoragePartitionConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/StoragePartitionConfig.java
@@ -10,6 +10,9 @@ public class StoragePartitionConfig {
 
   private final String storeName;
   private final int partitionId;
+  /**
+   * Refer to {@link com.linkedin.davinci.store.rocksdb.RocksDBStoragePartition#deferredWrite}
+   */
   private boolean deferredWrite;
   private boolean readOnly;
   private boolean writeOnlyConfig;
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriter.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriter.java
index 7ebba532a2..514dcfd2bd 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriter.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriter.java
@@ -225,6 +225,10 @@ public void close() {
     }
   }
 
+  /**
+   * Closes currentSSTFileWriter, updates lastCheckPointedSSTFileNum to the current SST file number,
+   * validates the checksum of that SST file, and returns the updated checkpointingInfo with this lastCheckPointedSSTFileNum.
+   */
   public Map<String, String> sync() {
     try {
       /**
@@ -352,13 +356,13 @@ private String composeFullPathForSSTFile(int sstFileNo) {
    * This function calculates checksum of all the key/value pair stored in the input sstFilePath. It then
    * verifies if the checksum matches with the input checksumToMatch and return the result.
    * A SstFileReader handle is used to perform bulk scan through the entire SST file. fillCache option is
-   * explicitely disabled to not pollute the rocksdb internal block caches. And also implicit checksum verification
+   * explicitly disabled to not pollute the rocksdb internal block caches. And also implicit checksum verification
    * is disabled to reduce latency of the entire operation.
    *
    * @param sstFilePath the full absolute path of the SST file
    * @param expectedRecordNumInSSTFile expected number of key/value pairs in the SST File
    * @param checksumToMatch pre-calculated checksum to match against.
-   * @return true if the the sstFile checksum matches with the provided checksum.
+   * @return true if the sstFile checksum matches with the provided checksum.
    */
   private boolean verifyChecksum(String sstFilePath, long expectedRecordNumInSSTFile, byte[] checksumToMatch) {
     SstFileReader sstFileReader = null;
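Note on the verification path above: verifyChecksum() amounts to a bulk scan of the finished SST file plus a digest comparison. Below is a minimal, standalone RocksJava sketch of that kind of scan; it is illustrative only, not Venice's actual helper (the class and method names are hypothetical, and plain MessageDigest stands in for Venice's CheckSum abstraction):

    import java.security.MessageDigest;

    import org.rocksdb.Options;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.SstFileReader;
    import org.rocksdb.SstFileReaderIterator;

    public final class SstFileChecksumScan {
      /** Hypothetical helper mirroring the verifyChecksum() contract described above. */
      public static boolean checksumMatches(String sstFilePath, byte[] checksumToMatch) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options();
            SstFileReader reader = new SstFileReader(options);
            // fillCache=false keeps the bulk scan from polluting the RocksDB block cache;
            // verifyChecksums=false skips RocksDB's per-block checksums to reduce latency.
            ReadOptions readOptions = new ReadOptions().setFillCache(false).setVerifyChecksums(false)) {
          reader.open(sstFilePath);
          MessageDigest md5 = MessageDigest.getInstance("MD5");
          try (SstFileReaderIterator iterator = reader.newIterator(readOptions)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
              // Digest key bytes then value bytes, matching the order used when writing.
              md5.update(iterator.key());
              md5.update(iterator.value());
            }
          }
          return MessageDigest.isEqual(md5.digest(), checksumToMatch);
        }
      }
    }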
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java
index e2969e73d0..558b554ce8 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java
@@ -114,7 +114,10 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
   private final RocksDBStorageEngineFactory factory;
   private final RocksDBThrottler rocksDBThrottler;
   /**
-   * Whether the input is sorted or not.
+   * Whether the input is sorted or not.
+   * deferredWrite = sortedInput => ingested via batch push, which is sorted in VPJ; can use {@link RocksDBSstFileWriter} to ingest
+   * the input data into RocksDB.
+   * !deferredWrite = !sortedInput => cannot use RocksDBSstFileWriter for ingestion.
    */
   protected final boolean deferredWrite;
 
@@ -739,9 +742,8 @@ public synchronized Map<String, String> sync() {
       LOGGER.debug("Unexpected sync in RocksDB read-only mode");
     } else {
       try {
-        // Since Venice RocksDB database disables WAL, flush will be triggered for every 'sync' to avoid data loss
-        // during
-        // crash recovery
+        // Since Venice RocksDB database disables WAL, flush will be triggered for every 'sync' to
+        // avoid data loss during crash recovery
         rocksDB.flush(WAIT_FOR_FLUSH_OPTIONS, columnFamilyHandleList);
       } catch (RocksDBException e) {
         checkAndThrowMemoryLimitException(e);
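To make the deferredWrite distinction above concrete: RocksDB's SstFileWriter only accepts keys in strictly ascending order, which is why the sorted (batch push) input can be staged into SST files while unsorted input cannot. A self-contained RocksJava sketch of building one sorted SST file, with hypothetical paths and none of Venice's actual wiring:

    import org.rocksdb.EnvOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.SstFileWriter;

    public final class SortedSstFileBuildDemo {
      public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (EnvOptions envOptions = new EnvOptions();
            Options options = new Options();
            SstFileWriter writer = new SstFileWriter(envOptions, options)) {
          writer.open("/tmp/demo_sst_file_0"); // hypothetical temp SST path
          // Keys must arrive in strictly ascending order, so this path is only
          // usable when the input is sorted (deferredWrite == true) ...
          writer.put("key1".getBytes(), "value1".getBytes());
          writer.put("key2".getBytes(), "value2".getBytes());
          // ... whereas putting "key0" at this point would fail with an out-of-order error.
          writer.finish(); // seals the file so it can later be ingested into RocksDB
        }
      }
    }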
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriterTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriterTest.java
index e7a2f44dba..7882957f54 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriterTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBSstFileWriterTest.java
@@ -1,16 +1,20 @@
 package com.linkedin.davinci.store.rocksdb;
 
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.kafka.validation.checksum.CheckSum;
+import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.io.FileUtils;
 import org.rocksdb.EnvOptions;
 import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -232,7 +236,7 @@ public void testOpenWithMissingFile() throws IOException {
   }
 
   @Test
-  public void testOpenWithAllValidFiles() throws IOException {
+  public void testOpenAndSyncWithAllValidFiles() throws IOException {
     RocksDBSstFileWriter rocksDBSstFileWriter = null;
     try {
       rocksDBSstFileWriter = new RocksDBSstFileWriter(
@@ -255,6 +259,124 @@ public void testOpenWithAllValidFiles() throws IOException {
       // check that only 7 files exists (0-5 after cleanup by checkDatabaseIntegrity)
       // and 6 after rocksDBSstFileWriter.open opens a new file
       Assert.assertEquals(getNumberOfFilesInTempDirectory(), 7);
+      rocksDBSstFileWriter.sync();
     } finally {
       if (rocksDBSstFileWriter != null) {
         rocksDBSstFileWriter.close();
       }
       deleteTempDatabaseDir();
     }
   }
+
+  @Test
+  public void testSyncWithCorrectChecksum() throws IOException, RocksDBException {
+    RocksDBSstFileWriter rocksDBSstFileWriter = null;
+    try {
+      rocksDBSstFileWriter = new RocksDBSstFileWriter(
+          STORE_NAME,
+          PARTITION_ID,
+          "",
+          new EnvOptions(),
+          new Options(),
+          DB_DIR,
+          IS_RMD,
+          ROCKS_DB_SERVER_CONFIG);
+      Map<String, String> checkpointedInfo = new HashMap<>();
+
+      // Checkpoint that 1 SST file should be found
+      checkpointedInfo.put(rocksDBSstFileWriter.getLastCheckPointedSSTFileNum(), "0");
+      // create 5 SST files
+      createSstFiles(5);
+
+      rocksDBSstFileWriter.open(checkpointedInfo, Optional.of(() -> {
+        CheckSum sstFileFinalCheckSum = CheckSum.getInstance(CheckSumType.MD5);
+        sstFileFinalCheckSum.update("key".getBytes());
+        sstFileFinalCheckSum.update("value".getBytes());
+        return sstFileFinalCheckSum.getCheckSum();
+      }));
+      // check that only 2 files exist: "0" after cleanup by checkDatabaseIntegrity,
+      // and "1" after rocksDBSstFileWriter.open opens a new file
+      Assert.assertEquals(getNumberOfFilesInTempDirectory(), 2);
+      rocksDBSstFileWriter.put("key".getBytes(), ByteBuffer.wrap("value".getBytes()));
+      // call sync to verify checksum
+      rocksDBSstFileWriter.sync();
+    } finally {
+      if (rocksDBSstFileWriter != null) {
+        rocksDBSstFileWriter.close();
+      }
+      deleteTempDatabaseDir();
+    }
+  }
+
+  @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "verifyChecksum: failure. last sstFile checksum didn't match for store.*")
+  public void testSyncWithIncorrectChecksum() throws IOException, RocksDBException {
+    RocksDBSstFileWriter rocksDBSstFileWriter = null;
+    try {
+      rocksDBSstFileWriter = new RocksDBSstFileWriter(
+          STORE_NAME,
+          PARTITION_ID,
+          "",
+          new EnvOptions(),
+          new Options(),
+          DB_DIR,
+          IS_RMD,
+          ROCKS_DB_SERVER_CONFIG);
+      Map<String, String> checkpointedInfo = new HashMap<>();
+
+      // Checkpoint that 1 SST file should be found
+      checkpointedInfo.put(rocksDBSstFileWriter.getLastCheckPointedSSTFileNum(), "0");
+      // create 5 SST files
+      createSstFiles(5);
+
+      rocksDBSstFileWriter.open(checkpointedInfo, Optional.of(() -> {
+        CheckSum sstFileFinalCheckSum = CheckSum.getInstance(CheckSumType.MD5);
+        sstFileFinalCheckSum.update("wrong_key".getBytes());
+        sstFileFinalCheckSum.update("wrong_value".getBytes());
+        return sstFileFinalCheckSum.getCheckSum();
+      }));
+      // check that only 2 files exist: "0" after cleanup by checkDatabaseIntegrity,
+      // and "1" after rocksDBSstFileWriter.open opens a new file
+      Assert.assertEquals(getNumberOfFilesInTempDirectory(), 2);
+      rocksDBSstFileWriter.put("key".getBytes(), ByteBuffer.wrap("value".getBytes()));
+      // call sync to verify checksum
+      rocksDBSstFileWriter.sync();
+    } finally {
+      if (rocksDBSstFileWriter != null) {
+        rocksDBSstFileWriter.close();
+      }
+      deleteTempDatabaseDir();
+    }
+  }
+
+  @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Checksum mismatch in SST files.*")
+  public void testSyncWithMissingFile() throws IOException, RocksDBException {
+    RocksDBSstFileWriter rocksDBSstFileWriter = null;
+    try {
+      rocksDBSstFileWriter = new RocksDBSstFileWriter(
+          STORE_NAME,
+          PARTITION_ID,
+          "",
+          new EnvOptions(),
+          new Options(),
+          DB_DIR,
+          IS_RMD,
+          ROCKS_DB_SERVER_CONFIG);
+      Map<String, String> checkpointedInfo = new HashMap<>();
+
+      // Checkpoint that 6 SST files should be found
+      checkpointedInfo.put(rocksDBSstFileWriter.getLastCheckPointedSSTFileNum(), "5");
+      // create 10 SST files
+      createSstFiles(10);
+
+      rocksDBSstFileWriter.open(checkpointedInfo, Optional.of(() -> "1".getBytes()));
+      // check that only 7 files exist (0-5 after cleanup by checkDatabaseIntegrity,
+      // and 6 after rocksDBSstFileWriter.open opens a new file)
+      Assert.assertEquals(getNumberOfFilesInTempDirectory(), 7);
+      rocksDBSstFileWriter.put("key".getBytes(), ByteBuffer.wrap("value".getBytes()));
+      deleteAllSstFiles(getNumberOfFilesInTempDirectory());
+      // call sync after deleting all the files to mimic the exception thrown during a graceful shutdown of
+      // servers with missing SST files
+      rocksDBSstFileWriter.sync();
+    } finally {
+      if (rocksDBSstFileWriter != null) {
+        rocksDBSstFileWriter.close();
+      }
+      deleteTempDatabaseDir();
+    }
+  }
@@ -286,6 +408,12 @@ private void deleteSstFile(int fileNumber) throws IOException {
     FileUtils.delete(new File(DB_DIR + "/sst_file_" + fileNumber));
   }
 
+  private void deleteAllSstFiles(int maxFileNumber) throws IOException {
+    for (int i = 0; i < maxFileNumber; i++) {
+      FileUtils.delete(new File(DB_DIR + "/sst_file_" + i));
+    }
+  }
+
   private void createSstFiles(int numberOfFiles) throws IOException {
     getTempDatabaseDir();
     for (int i = 0; i < numberOfFiles; i++) {
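The new tests above exercise the open()/put()/sync() contract: open() receives previously checkpointed info plus a supplier of the running checksum, every put() must also feed that checksum, and sync() verifies the finished SST file against it. An illustrative assembly of that flow (identifiers taken from the diff; the wrapper method itself is hypothetical):

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    import com.linkedin.davinci.store.rocksdb.RocksDBSstFileWriter;
    import com.linkedin.venice.kafka.validation.checksum.CheckSum;
    import com.linkedin.venice.kafka.validation.checksum.CheckSumType;

    final class ChecksumSupplierFlow {
      /** Hypothetical wrapper showing the order of operations the tests rely on. */
      static Map<String, String> writeOneRecord(RocksDBSstFileWriter writer) throws Exception {
        CheckSum runningChecksum = CheckSum.getInstance(CheckSumType.MD5);
        Map<String, String> checkpointedInfo = new HashMap<>();

        // open() takes the previously checkpointed info plus a supplier of the
        // running checksum; sync() later pulls from that supplier to validate
        // the finished SST file.
        writer.open(checkpointedInfo, Optional.of(runningChecksum::getCheckSum));

        byte[] key = "key".getBytes();
        byte[] value = "value".getBytes();
        // Every record written into the SST file must also feed the running checksum,
        // otherwise sync() throws VeniceException ("verifyChecksum: failure ...").
        runningChecksum.update(key);
        runningChecksum.update(value);
        writer.put(key, ByteBuffer.wrap(value));

        // sync() closes the current SST file, verifies it against the supplied
        // checksum, and returns updated checkpointing info for the OffsetRecord.
        return writer.sync();
      }
    }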
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java
index 17b318b9d3..4a767beaf1 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFiles.java
@@ -87,8 +87,7 @@ public void setUp() throws VeniceClientException {
     serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
     serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
     serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
-    // Has to disable checksum verification, otherwise it will fail when deleteSSTFiles is true.
-    serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "false");
+    serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "true");
     serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
     serverProperties.setProperty(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath());
     veniceCluster.addVeniceServer(new Properties(), serverProperties);
@@ -135,7 +134,7 @@ public void testWithServerRestart() throws Exception {
     VeniceServerWrapper server = veniceCluster.getVeniceServers().get(0);
 
     // restart the venice servers: Mimic Process crash and restart
-    restartVeniceServer(server);
+    restartServerByDeletingSSTFiles(server, false, null);
 
     endIngestion();
@@ -174,22 +173,7 @@ public void testWithServerRestartWithDeletedSSTFiles() throws Exception {
       Assert.assertEquals(totalIngestedKeys, numKeys);
     });
 
-    // Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
-    LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
-    rocksDBStorageEngine.getPartitionIds().forEach(id -> {
-      ReadWriteLock rwLock = rocksDBStorageEngine.getRWLockForPartitionOrThrow(id);
-      try {
-        rwLock.writeLock().lock();
-        RocksDBStoragePartition partition = (RocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(id);
-        partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
-      } finally {
-        rwLock.writeLock().unlock();
-      }
-    });
-
-    // restart the venice servers: Mimic Process crash and restart after ingestExternalFile()
-    // completes but before EOP was synced to OffsetRecord
-    restartVeniceServer(server);
+    restartServerByDeletingSSTFiles(server, true, rocksDBStorageEngine);
 
     endIngestion();
     verifyIngestion();
@@ -254,8 +238,36 @@ private void verifyIngestion() throws ExecutionException, InterruptedException {
     }
   }
 
-  private void restartVeniceServer(VeniceServerWrapper server) {
+  /**
+   * Mimic a non-graceful shutdown of the server in the midst of SST files being moved to RocksDB: mimic a process crash
+   * and restart after ingestExternalFile() completes but before EOP is synced to the OffsetRecord.
+   *
+   * 1. Stop the server
+   * 2. Delete the SST files (deleting before a graceful shutdown will not help
+   *    mimic this scenario, as there will be a sync during the graceful shutdown)
+   * 3. Start the server
+   */
+  private void restartServerByDeletingSSTFiles(
+      VeniceServerWrapper server,
+      boolean deleteSSTFiles,
+      RocksDBStorageEngine rocksDBStorageEngine) {
     veniceCluster.stopVeniceServer(server.getPort());
+
+    if (deleteSSTFiles) {
+      // Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
+      LOGGER.info("Delete the sst files");
+      rocksDBStorageEngine.getPartitionIds().forEach(id -> {
+        ReadWriteLock rwLock = rocksDBStorageEngine.getRWLockForPartitionOrThrow(id);
+        try {
+          rwLock.writeLock().lock();
+          RocksDBStoragePartition partition = (RocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(id);
+          partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
+        } finally {
+          rwLock.writeLock().unlock();
+        }
+      });
+    }
+
     veniceCluster.restartVeniceServer(server.getPort());
   }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java
index 2be003f36b..5e39531cc6 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/restart/TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion.java
@@ -119,8 +119,7 @@ public void setUp() throws Exception {
     Properties serverProperties = new Properties();
     serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
     serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
-    // Has to disable checksum verification, otherwise it will fail when deleteSSTFiles is true.
-    serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, false);
+    serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, true);
     serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
     serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
     serverProperties.put(
@@ -236,13 +235,13 @@ private void getPartitionForTopic(
     int server;
     int numSelectedServers = 0;
     for (server = 0; server < numServers; server++) {
-      VeniceServerWrapper _serverWrapper = clusterWrappers.get(colo).getVeniceServers().get(server);
-      if (_serverWrapper.getVeniceServer()
+      VeniceServerWrapper serverWrapper = clusterWrappers.get(colo).getVeniceServers().get(server);
+      if (serverWrapper.getVeniceServer()
           .getStorageService()
           .getStorageEngineRepository()
           .getLocalStorageEngine(topic) != null) {
         LOGGER.info("selected server is: {} in colo {}", server, colo);
-        TestVeniceServer testVeniceServer = _serverWrapper.getVeniceServer();
+        TestVeniceServer testVeniceServer = serverWrapper.getVeniceServer();
         StorageService storageService = testVeniceServer.getStorageService();
         RocksDBStorageEngine rocksDBStorageEngine =
             (RocksDBStorageEngine) storageService.getStorageEngineRepository().getLocalStorageEngine(topic);
@@ -250,7 +249,7 @@ private void getPartitionForTopic(
         assertEquals(rocksDBStorageEngine.getNumberOfPartitions(), NUMBER_OF_PARTITIONS);
         rocksDBStoragePartitions.get(colo)
             .add((ReplicationMetadataRocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(PARTITION_ID));
-        serverWrappers.get(colo).add(_serverWrapper);
+        serverWrappers.get(colo).add(serverWrapper);
 
         if (++numSelectedServers == NUMBER_OF_REPLICAS) {
           break;
@@ -264,12 +263,11 @@
   /**
    * This test include below steps:
    * 1. Batch Push data without EOP (100 keys)
-   * 2. Delete SST files (based on params)
-   * 3. restart servers
-   * 4. Validate whether the data is ingested
-   * 5. Incremental push data (10 keys (90-100 of the batch push))
-   * 6. Validate whether the data is ingested
-   * 7. Validate whether all the data from RT is ingested to the new versions as well.
+   * 2. Stop servers, delete SST files, start servers (based on params)
+   * 3. Validate whether the data is ingested
+   * 4. Incremental push data (10 keys (90-100 of the batch push))
+   * 5. Validate whether the data is ingested
+   * 6. Validate whether all the data from RT is ingested to the new versions as well.
    */
  @Test(timeOut = TEST_TIMEOUT, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
  public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles, boolean deleteRMDSSTFiles)
@@ -352,12 +350,21 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
       }
     });
 
-    // Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
+    /**
+     * Mimic a non-graceful shutdown of the servers in the midst of SST files being moved to RocksDB:
+     * 1. Stop the server
+     * 2. Delete the SST files (deleting before a graceful shutdown will not help
+     *    mimic this scenario, as there will be a sync during the graceful shutdown)
+     * 3. Start the server
+     */
     // TBD: Deleting files and restarting servers from more than one colo and/or n-1 replicas
     // results in flaky tests/failures.
     LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
     for (int colo = 0; colo < 1; colo++) {
       for (int replica = 0; replica < 1; replica++) {
+        VeniceServerWrapper serverWrapper = serverWrappers.get(colo).get(replica);
+        clusterWrappers.get(colo).stopVeniceServer(serverWrapper.getPort());
+
         ReplicationMetadataRocksDBStoragePartition partition = rocksDBStoragePartitions.get(colo).get(replica);
         if (deleteSSTFiles) {
           partition.deleteFilesInDirectory(partition.getValueFullPathForTempSSTFileDir());
@@ -365,14 +372,7 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
         }
         if (deleteRMDSSTFiles) {
           partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
         }
-      }
-    }
-
-    for (int colo = 0; colo < 1; colo++) {
-      // Restart server
-      for (int replica = 0; replica < 1; replica++) {
-        VeniceServerWrapper serverWrapper = serverWrappers.get(colo).get(replica);
-        clusterWrappers.get(colo).stopVeniceServer(serverWrapper.getPort());
+
         clusterWrappers.get(colo).restartVeniceServer(serverWrapper.getPort());
       }
     }
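Background for the delete-then-restart choreography in both integration tests: ingestExternalFile() with move_files moves the temp SST files into the database, so a crash after ingestion but before the EOP offset is synced leaves a checkpoint pointing at files that no longer exist on disk; deleting the files while the server is stopped fabricates exactly that state. A hedged RocksJava sketch of the move semantics (paths hypothetical, not Venice's actual code path):

    import java.util.Collections;

    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;

    public final class IngestExternalFileMoveDemo {
      public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
            RocksDB db = RocksDB.open(options, "/tmp/demo_rocksdb"); // hypothetical DB path
            IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions().setMoveFiles(true)) {
          // With move_files enabled, RocksDB moves (rather than copies) the SST file
          // into the database directory, so the temp file is gone after this call.
          // A crash after ingestion but before the EOP offset is synced therefore
          // leaves checkpointed state referencing SST files missing from the temp dir.
          db.ingestExternalFile(Collections.singletonList("/tmp/demo_sst_file_0"), ingestOptions);
        }
      }
    }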