Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] fix restartServerByDeletingSSTFiles tests with checksum enabled #825

Merged
merged 2 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ public class PartitionConsumptionState {

private String leaderHostId;

/** whether the ingestion of current partition is deferred-write. */
/**
* whether the ingestion of current partition is deferred-write.
* Refer {@link com.linkedin.davinci.store.rocksdb.RocksDBStoragePartition#deferredWrite}
*/
private boolean deferredWrite;
private boolean errorReported;
private boolean lagCaughtUp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
public class StoragePartitionConfig {
private final String storeName;
private final int partitionId;
/**
* Refer {@link com.linkedin.davinci.store.rocksdb.RocksDBStoragePartition#deferredWrite}
*/
private boolean deferredWrite;
private boolean readOnly;
private boolean writeOnlyConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public void close() {
}
}

/**
* Closes currentSSTFileWriter, update lastCheckPointedSSTFileNum with the current SST file number,
* validates checksum on this SST file and return updated checkpointingInfo with this lastCheckPointedSSTFileNum.
*/
public Map<String, String> sync() {
try {
/**
Expand Down Expand Up @@ -352,13 +356,13 @@ private String composeFullPathForSSTFile(int sstFileNo) {
* This function calculates checksum of all the key/value pair stored in the input sstFilePath. It then
* verifies if the checksum matches with the input checksumToMatch and return the result.
* A SstFileReader handle is used to perform bulk scan through the entire SST file. fillCache option is
* explicitely disabled to not pollute the rocksdb internal block caches. And also implicit checksum verification
* explicitly disabled to not pollute the rocksdb internal block caches. And also implicit checksum verification
* is disabled to reduce latency of the entire operation.
*
* @param sstFilePath the full absolute path of the SST file
* @param expectedRecordNumInSSTFile expected number of key/value pairs in the SST File
* @param checksumToMatch pre-calculated checksum to match against.
* @return true if the the sstFile checksum matches with the provided checksum.
* @return true if the sstFile checksum matches with the provided checksum.
*/
private boolean verifyChecksum(String sstFilePath, long expectedRecordNumInSSTFile, byte[] checksumToMatch) {
SstFileReader sstFileReader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
private final RocksDBStorageEngineFactory factory;
private final RocksDBThrottler rocksDBThrottler;
/**
* Whether the input is sorted or not.
* Whether the input is sorted or not. <br>
* deferredWrite = sortedInput => ingested via batch push which is sorted in VPJ, can use {@link RocksDBSstFileWriter} to ingest
* the input data to RocksDB <br>
* !deferredWrite = !sortedInput => can not use RocksDBSstFileWriter for ingestion
*/
protected final boolean deferredWrite;

Expand Down Expand Up @@ -739,9 +742,8 @@ public synchronized Map<String, String> sync() {
LOGGER.debug("Unexpected sync in RocksDB read-only mode");
} else {
try {
// Since Venice RocksDB database disables WAL, flush will be triggered for every 'sync' to avoid data loss
// during
// crash recovery
// Since Venice RocksDB database disables WAL, flush will be triggered for every 'sync' to
// avoid data loss during crash recovery
rocksDB.flush(WAIT_FOR_FLUSH_OPTIONS, columnFamilyHandleList);
} catch (RocksDBException e) {
checkAndThrowMemoryLimitException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public void setUp() throws VeniceClientException {
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1L));
serverProperties.setProperty(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, "false");
// Has to disable checksum verification, otherwise it will fail when deleteSSTFiles is true.
serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "false");
serverProperties.setProperty(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, "true");
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.setProperty(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath());
veniceCluster.addVeniceServer(new Properties(), serverProperties);
Expand Down Expand Up @@ -135,7 +134,7 @@ public void testWithServerRestart() throws Exception {
VeniceServerWrapper server = veniceCluster.getVeniceServers().get(0);

// restart the venice servers: Mimic Process crash and restart
restartVeniceServer(server);
restartServerByDeletingSSTFiles(server, false, null);

endIngestion();

Expand Down Expand Up @@ -174,22 +173,7 @@ public void testWithServerRestartWithDeletedSSTFiles() throws Exception {
Assert.assertEquals(totalIngestedKeys, numKeys);
});

// Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
rocksDBStorageEngine.getPartitionIds().forEach(id -> {
ReadWriteLock rwLock = rocksDBStorageEngine.getRWLockForPartitionOrThrow(id);
try {
rwLock.writeLock().lock();
RocksDBStoragePartition partition = (RocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(id);
partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
} finally {
rwLock.writeLock().unlock();
}
});

// restart the venice servers: Mimic Process crash and restart after ingestExternalFile()
// completes but before EOP was synced to OffsetRecord
restartVeniceServer(server);
restartServerByDeletingSSTFiles(server, true, rocksDBStorageEngine);

endIngestion();
verifyIngestion();
Expand Down Expand Up @@ -254,8 +238,36 @@ private void verifyIngestion() throws ExecutionException, InterruptedException {
}
}

private void restartVeniceServer(VeniceServerWrapper server) {
/**
* Mimic non-graceful shutdown of the servers in the midst of sst files being moved to RocksDB: Mimic process crash
* and restart after ingestExternalFile() completes but before EOP was synced to OffsetRecord
*
* 1. Stop the server
* 2. delete the SST files (Deleting before graceful shutdown will not help
* mimic this scenario as there will be a sync during the graceful shutdown)
* 3. start the server
*/
private void restartServerByDeletingSSTFiles(
VeniceServerWrapper server,
boolean deleteSSTFiles,
RocksDBStorageEngine rocksDBStorageEngine) {
veniceCluster.stopVeniceServer(server.getPort());

if (deleteSSTFiles) {
// Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
LOGGER.info("Delete the sst files");
rocksDBStorageEngine.getPartitionIds().forEach(id -> {
ReadWriteLock rwLock = rocksDBStorageEngine.getRWLockForPartitionOrThrow(id);
try {
rwLock.writeLock().lock();
RocksDBStoragePartition partition = (RocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(id);
partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
} finally {
rwLock.writeLock().unlock();
}
});
}

veniceCluster.restartVeniceServer(server.getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public void setUp() throws Exception {
Properties serverProperties = new Properties();
serverProperties.setProperty(ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, Long.toString(1));
serverProperties.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, false);
// Has to disable checksum verification, otherwise it will fail when deleteSSTFiles is true.
serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, false);
serverProperties.put(SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED, true);
serverProperties.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB);
serverProperties.setProperty(SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE, "300");
serverProperties.put(
Expand Down Expand Up @@ -236,21 +235,21 @@ private void getPartitionForTopic(
int server;
int numSelectedServers = 0;
for (server = 0; server < numServers; server++) {
VeniceServerWrapper _serverWrapper = clusterWrappers.get(colo).getVeniceServers().get(server);
if (_serverWrapper.getVeniceServer()
VeniceServerWrapper serverWrapper = clusterWrappers.get(colo).getVeniceServers().get(server);
if (serverWrapper.getVeniceServer()
.getStorageService()
.getStorageEngineRepository()
.getLocalStorageEngine(topic) != null) {
LOGGER.info("selected server is: {} in colo {}", server, colo);
TestVeniceServer testVeniceServer = _serverWrapper.getVeniceServer();
TestVeniceServer testVeniceServer = serverWrapper.getVeniceServer();
StorageService storageService = testVeniceServer.getStorageService();
RocksDBStorageEngine rocksDBStorageEngine =
(RocksDBStorageEngine) storageService.getStorageEngineRepository().getLocalStorageEngine(topic);
assertNotNull(rocksDBStorageEngine);
assertEquals(rocksDBStorageEngine.getNumberOfPartitions(), NUMBER_OF_PARTITIONS);
rocksDBStoragePartitions.get(colo)
.add((ReplicationMetadataRocksDBStoragePartition) rocksDBStorageEngine.getPartitionOrThrow(PARTITION_ID));
serverWrappers.get(colo).add(_serverWrapper);
serverWrappers.get(colo).add(serverWrapper);

if (++numSelectedServers == NUMBER_OF_REPLICAS) {
break;
Expand All @@ -264,12 +263,11 @@ private void getPartitionForTopic(
/**
* This test include below steps:
* 1. Batch Push data without EOP (100 keys)
* 2. Delete SST files (based on params)
* 3. restart servers
* 4. Validate whether the data is ingested
* 5. Incremental push data (10 keys (90-100 of the batch push))
* 6. Validate whether the data is ingested
* 7. Validate whether all the data from RT is ingested to the new versions as well.
* 2. stop servers, delete SST files, start servers (based on params)
* 3. Validate whether the data is ingested
* 4. Incremental push data (10 keys (90-100 of the batch push))
* 5. Validate whether the data is ingested
* 6. Validate whether all the data from RT is ingested to the new versions as well.
*/
@Test(timeOut = TEST_TIMEOUT, dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles, boolean deleteRMDSSTFiles)
Expand Down Expand Up @@ -352,27 +350,29 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
}
});

// Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
/**
* Mimic non-graceful shutdown of the servers in the midst of sst files being moved to RocksDB.
* 1. Stop the server
* 2. delete the SST files (Deleting before graceful shutdown will not help
* mimic this scenario as there will be a sync during the graceful shutdown)
* 3. start the server
*/
// TBD: Deleting files and restarting servers from more than one colo and/or n-1 replicas
// results in flaky tests/failures.
LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
for (int colo = 0; colo < 1; colo++) {
for (int replica = 0; replica < 1; replica++) {
VeniceServerWrapper serverWrapper = serverWrappers.get(colo).get(replica);
clusterWrappers.get(colo).stopVeniceServer(serverWrapper.getPort());

ReplicationMetadataRocksDBStoragePartition partition = rocksDBStoragePartitions.get(colo).get(replica);
if (deleteSSTFiles) {
partition.deleteFilesInDirectory(partition.getValueFullPathForTempSSTFileDir());
}
if (deleteRMDSSTFiles) {
partition.deleteFilesInDirectory(partition.getFullPathForTempSSTFileDir());
}
}
}

for (int colo = 0; colo < 1; colo++) {
// Restart server
for (int replica = 0; replica < 1; replica++) {
VeniceServerWrapper serverWrapper = serverWrappers.get(colo).get(replica);
clusterWrappers.get(colo).stopVeniceServer(serverWrapper.getPort());
clusterWrappers.get(colo).restartVeniceServer(serverWrapper.getPort());
}
}
Expand Down