Skip to content

Commit

Permalink
[test] fix restartServerByDeletingSSTFiles tests with checksum enabled (
Browse files Browse the repository at this point in the history
#825)

This test originally had the following flow:
1. Delete the SST files.
2. Restart the servers to simulate a crash and restart in the midst of SST file move.

However, this flow introduced an issue when checksum was enabled: Graceful shutdown of the server after SST file deletion triggers a final offset sync, which throws an exception when files are missing during checksum calculations.

Fix:
1. Modified the test to: Stop the server -> Delete the SST files -> Start the server.
2. Added some unit test around sync and checksum calculation.
  • Loading branch information
m-nagarajan authored Feb 1, 2024
1 parent 25874b6 commit 4f11a4a
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ public class PartitionConsumptionState {

private String leaderHostId;

/** whether the ingestion of current partition is deferred-write. */
/**
* whether the ingestion of current partition is deferred-write.
* Refer {@link com.linkedin.davinci.store.rocksdb.RocksDBStoragePartition#deferredWrite}
*/
private boolean deferredWrite;
private boolean errorReported;
private boolean lagCaughtUp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
public class StoragePartitionConfig {
private final String storeName;
private final int partitionId;
/**
* Refer {@link com.linkedin.davinci.store.rocksdb.RocksDBStoragePartition#deferredWrite}
*/
private boolean deferredWrite;
private boolean readOnly;
private boolean writeOnlyConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public void close() {
}
}

/**
* Closes currentSSTFileWriter, update lastCheckPointedSSTFileNum with the current SST file number,
* validates checksum on this SST file and return updated checkpointingInfo with this lastCheckPointedSSTFileNum.
*/
public Map<String, String> sync() {
try {
/**
Expand Down Expand Up @@ -352,13 +356,13 @@ private String composeFullPathForSSTFile(int sstFileNo) {
* This function calculates checksum of all the key/value pair stored in the input sstFilePath. It then
* verifies if the checksum matches with the input checksumToMatch and return the result.
* A SstFileReader handle is used to perform bulk scan through the entire SST file. fillCache option is
* explicitely disabled to not pollute the rocksdb internal block caches. And also implicit checksum verification
* explicitly disabled to not pollute the rocksdb internal block caches. And also implicit checksum verification
* is disabled to reduce latency of the entire operation.
*
* @param sstFilePath the full absolute path of the SST file
* @param expectedRecordNumInSSTFile expected number of key/value pairs in the SST File
* @param checksumToMatch pre-calculated checksum to match against.
* @return true if the the sstFile checksum matches with the provided checksum.
* @return true if the sstFile checksum matches with the provided checksum.
*/
private boolean verifyChecksum(String sstFilePath, long expectedRecordNumInSSTFile, byte[] checksumToMatch) {
SstFileReader sstFileReader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
private final RocksDBStorageEngineFactory factory;
private final RocksDBThrottler rocksDBThrottler;
/**
* Whether the input is sorted or not.
* Whether the input is sorted or not. <br>
* deferredWrite = sortedInput => ingested via batch push which is sorted in VPJ, can use {@link RocksDBSstFileWriter} to ingest
* the input data to RocksDB <br>
* !deferredWrite = !sortedInput => can not use RocksDBSstFileWriter for ingestion
*/
protected final boolean deferredWrite;

Expand Down Expand Up @@ -739,9 +742,8 @@ public synchronized Map<String, String> sync() {
LOGGER.debug("Unexpected sync in RocksDB read-only mode");
} else {
try {
// Since Venice RocksDB database disables WAL, flush will be triggered for every 'sync' to avoid data loss
// during
// crash recovery
// Since Venice RocksDB database disables WAL, flush will be triggered for every 'sync' to
// avoid data loss during crash recovery
rocksDB.flush(WAIT_FOR_FLUSH_OPTIONS, columnFamilyHandleList);
} catch (RocksDBException e) {
checkAndThrowMemoryLimitException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package com.linkedin.davinci.store.rocksdb;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.validation.checksum.CheckSum;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.rocksdb.EnvOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -232,7 +236,7 @@ public void testOpenWithMissingFile() throws IOException {
}

@Test
public void testOpenWithAllValidFiles() throws IOException {
public void testOpenAndSyncWithAllValidFiles() throws IOException {
RocksDBSstFileWriter rocksDBSstFileWriter = null;
try {
rocksDBSstFileWriter = new RocksDBSstFileWriter(
Expand All @@ -255,6 +259,124 @@ public void testOpenWithAllValidFiles() throws IOException {
// check that only 7 files exists (0-5 after cleanup by checkDatabaseIntegrity)
// and 6 after rocksDBSstFileWriter.open opens a new file
Assert.assertEquals(getNumberOfFilesInTempDirectory(), 7);
rocksDBSstFileWriter.sync();
} finally {
if (rocksDBSstFileWriter != null) {
rocksDBSstFileWriter.close();
}
deleteTempDatabaseDir();
}
}

@Test
public void testSyncWithCorrectChecksum() throws IOException, RocksDBException {
RocksDBSstFileWriter rocksDBSstFileWriter = null;
try {
rocksDBSstFileWriter = new RocksDBSstFileWriter(
STORE_NAME,
PARTITION_ID,
"",
new EnvOptions(),
new Options(),
DB_DIR,
IS_RMD,
ROCKS_DB_SERVER_CONFIG);
Map<String, String> checkpointedInfo = new HashMap<>();

// Checkpoint that 1 sst file should be found
checkpointedInfo.put(rocksDBSstFileWriter.getLastCheckPointedSSTFileNum(), "0");
// create 5 sst file
createSstFiles(5);

rocksDBSstFileWriter.open(checkpointedInfo, Optional.of(() -> {
CheckSum sstFileFinalCheckSum = CheckSum.getInstance(CheckSumType.MD5);
sstFileFinalCheckSum.update("key".getBytes());
sstFileFinalCheckSum.update("value".getBytes());
return sstFileFinalCheckSum.getCheckSum();
}));
// check that only 1 files exists ("0" after cleanup by checkDatabaseIntegrity)
// and "1" after rocksDBSstFileWriter.open opens a new file
Assert.assertEquals(getNumberOfFilesInTempDirectory(), 2);
rocksDBSstFileWriter.put("key".getBytes(), ByteBuffer.wrap("value".getBytes()));
// call sync to verify checksum
rocksDBSstFileWriter.sync();
} finally {
if (rocksDBSstFileWriter != null) {
rocksDBSstFileWriter.close();
}
deleteTempDatabaseDir();
}
}

@Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "verifyChecksum: failure. last sstFile checksum didn't match for store.*")
public void testSyncWithInCorrectChecksum() throws IOException, RocksDBException {
RocksDBSstFileWriter rocksDBSstFileWriter = null;
try {
rocksDBSstFileWriter = new RocksDBSstFileWriter(
STORE_NAME,
PARTITION_ID,
"",
new EnvOptions(),
new Options(),
DB_DIR,
IS_RMD,
ROCKS_DB_SERVER_CONFIG);
Map<String, String> checkpointedInfo = new HashMap<>();

// Checkpoint that 1 sst file should be found
checkpointedInfo.put(rocksDBSstFileWriter.getLastCheckPointedSSTFileNum(), "0");
// create 10 sst file
createSstFiles(5);

rocksDBSstFileWriter.open(checkpointedInfo, Optional.of(() -> {
CheckSum sstFileFinalCheckSum = CheckSum.getInstance(CheckSumType.MD5);
sstFileFinalCheckSum.update("wrong_key".getBytes());
sstFileFinalCheckSum.update("wrong_value".getBytes());
return sstFileFinalCheckSum.getCheckSum();
}));
// check that only 2 files exists ("0" after cleanup by checkDatabaseIntegrity)
// and "1" after rocksDBSstFileWriter.open opens a new file
Assert.assertEquals(getNumberOfFilesInTempDirectory(), 2);
rocksDBSstFileWriter.put("key".getBytes(), ByteBuffer.wrap("value".getBytes()));
// call sync to verify checksum
rocksDBSstFileWriter.sync();
} finally {
if (rocksDBSstFileWriter != null) {
rocksDBSstFileWriter.close();
}
deleteTempDatabaseDir();
}
}

@Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Checksum mismatch in SST files.*")
public void testSyncWithMissingFile() throws IOException, RocksDBException {
RocksDBSstFileWriter rocksDBSstFileWriter = null;
try {
rocksDBSstFileWriter = new RocksDBSstFileWriter(
STORE_NAME,
PARTITION_ID,
"",
new EnvOptions(),
new Options(),
DB_DIR,
IS_RMD,
ROCKS_DB_SERVER_CONFIG);
Map<String, String> checkpointedInfo = new HashMap<>();

// Checkpoint that 6 sst file should be found
checkpointedInfo.put(rocksDBSstFileWriter.getLastCheckPointedSSTFileNum(), "5");
// create 10 sst file
createSstFiles(10);

rocksDBSstFileWriter.open(checkpointedInfo, Optional.of(() -> "1".getBytes()));
// check that only 7 files exists (0-5 after cleanup by checkDatabaseIntegrity)
// and 6 after rocksDBSstFileWriter.open opens a new file
Assert.assertEquals(getNumberOfFilesInTempDirectory(), 7);
rocksDBSstFileWriter.put("key".getBytes(), ByteBuffer.wrap("value".getBytes()));
deleteAllSstFiles(getNumberOfFilesInTempDirectory());
// call sync after deleting all the files to mimic exception thrown during graceful shutdown of servers with
// missing sst files
rocksDBSstFileWriter.sync();
} finally {
if (rocksDBSstFileWriter != null) {
rocksDBSstFileWriter.close();
Expand Down Expand Up @@ -286,6 +408,12 @@ private void deleteSstFile(int fileNumber) throws IOException {
FileUtils.delete(new File(DB_DIR + "/sst_file_" + fileNumber));
}

private void deleteAllSstFiles(int maxFileNumber) throws IOException {
for (int i = 0; i < maxFileNumber; i++) {
FileUtils.delete(new File(DB_DIR + "/sst_file_" + i));
}
}

private void createSstFiles(int numberOfFiles) throws IOException {
getTempDatabaseDir();
for (int i = 0; i < numberOfFiles; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public void setUp() throws VeniceClientException {
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
// Has to disable checksum verification, otherwise it will fail when deleteSSTFiles is true.
serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "false");
serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "true");
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.setProperty(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath());
veniceCluster.addVeniceServer(new Properties(), serverProperties);
Expand Down Expand Up @@ -135,7 +134,7 @@ public void testWithServerRestart() throws Exception {
VeniceServerWrapper server = veniceCluster.getVeniceServers().get(0);

// restart the venice servers: Mimic Process crash and restart
restartVeniceServer(server);
restartServerByDeletingSSTFiles(server, false, null);

endIngestion();

Expand Down Expand Up @@ -174,22 +173,7 @@ public void testWithServerRestartWithDeletedSSTFiles() throws Exception {
Assert.assertEquals(totalIngestedKeys, numKeys);
});

// Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
rocksDBStorageEngine.getPartitionIds().forEach(id -> {
ReadWriteLock rwLock = rocksDBStorageEngine.getRWLockForPartitionOrThrow(id);
try {
rwLock.writeLock().lock();
RocksDBStoragePartition partition = (RocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(id);
partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
} finally {
rwLock.writeLock().unlock();
}
});

// restart the venice servers: Mimic Process crash and restart after ingestExternalFile()
// completes but before EOP was synced to OffsetRecord
restartVeniceServer(server);
restartServerByDeletingSSTFiles(server, true, rocksDBStorageEngine);

endIngestion();
verifyIngestion();
Expand Down Expand Up @@ -254,8 +238,36 @@ private void verifyIngestion() throws ExecutionException, InterruptedException {
}
}

private void restartVeniceServer(VeniceServerWrapper server) {
/**
* Mimic non-graceful shutdown of the servers in the midst of sst files being moved to RocksDB: Mimic process crash
* and restart after ingestExternalFile() completes but before EOP was synced to OffsetRecord
*
* 1. Stop the server
* 2. delete the SST files (Deleting before graceful shutdown will not help
* mimic this scenario as there will be a sync during the graceful shutdown)
* 3. start the server
*/
private void restartServerByDeletingSSTFiles(
VeniceServerWrapper server,
boolean deleteSSTFiles,
RocksDBStorageEngine rocksDBStorageEngine) {
veniceCluster.stopVeniceServer(server.getPort());

if (deleteSSTFiles) {
// Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
LOGGER.info("Delete the sst files");
rocksDBStorageEngine.getPartitionIds().forEach(id -> {
ReadWriteLock rwLock = rocksDBStorageEngine.getRWLockForPartitionOrThrow(id);
try {
rwLock.writeLock().lock();
RocksDBStoragePartition partition = (RocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(id);
partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
} finally {
rwLock.writeLock().unlock();
}
});
}

veniceCluster.restartVeniceServer(server.getPort());
}
}
Loading

0 comments on commit 4f11a4a

Please sign in to comment.