minor cleanup

m-nagarajan committed Nov 8, 2023
1 parent 5976b8b commit f1f4a0a
Showing 1 changed file with 45 additions and 43 deletions.
@@ -105,8 +105,8 @@ public class TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion {
private static final int SOURCE_COLO = 0;
// non-source colo to consume data from, to validate kafka contents
private static final int NON_SOURCE_COLO = 1;
private List<VeniceClusterWrapper> clusterWrappers = new ArrayList<>(NUMBER_OF_COLOS);
private Map<Integer, List<VeniceServerWrapper>> serverWrappers = new HashMap<>(NUMBER_OF_COLOS);
private final List<VeniceClusterWrapper> clusterWrappers = new ArrayList<>(NUMBER_OF_COLOS);
private final Map<Integer, List<VeniceServerWrapper>> serverWrappers = new HashMap<>(NUMBER_OF_COLOS);
private ControllerClient parentControllerClient;
private AvroSerializer serializer;
private final int NUMBER_OF_KEYS = 100;
@@ -118,8 +118,7 @@ public class TestRestartServerAfterDeletingSstFilesWithActiveActiveIngestion {
private final String KEY_PREFIX = "key";
private final String VALUE_PREFIX = "value";
private final String VALUE_PREFIX_INC_PUSH = "value-inc";
private final String METADATA_PREFIX = "metadata";
private String storeName = Utils.getUniqueString("store");
private final String STORE_NAME = Utils.getUniqueString("store");
private final int numServers = 5;
List<Integer> allIncPushKeys = new ArrayList<>(); // all keys ingested via incremental push
List<Integer> allNonIncPushKeysUntilLastVersion = new ArrayList<>(); // all keys ingested only via batch push
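A quick note on the two hunks above: the cleanup marks the fixture collections `final` and folds the store name into a constant-style `STORE_NAME` field. A minimal sketch of that convention, with illustrative names that are not from the Venice codebase:

```java
import java.util.ArrayList;
import java.util.List;

public class FixtureFields {
    // 'final' documents that the reference is assigned exactly once;
    // the list contents stay mutable for the test to populate.
    private final List<String> clusterNames = new ArrayList<>();

    // assigned once per instance, hence the constant-style UPPER_SNAKE_CASE name;
    // it cannot be 'static' because the value is unique to each instance
    private final String STORE_NAME = "store_" + System.nanoTime();

    public void addCluster(String name) {
        clusterNames.add(name); // mutating contents compiles; reassigning the field would not
    }

    public String storeName() {
        return STORE_NAME;
    }
}
```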
@@ -167,12 +166,12 @@ public void setUp() throws Exception {
true,
VeniceUserStoreType.INCREMENTAL_PUSH.toString(),
Optional.empty()));
// create a active-active enabled store
// create an active-active enabled store
File inputDir = getTempDataDirectory();
Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir);
String inputDirPath = "file:" + inputDir.getAbsolutePath();
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, STORE_NAME);
String keySchemaStr = recordSchema.getField(DEFAULT_KEY_FIELD_PROP).schema().toString();
String valueSchemaStr = recordSchema.getField(DEFAULT_VALUE_FIELD_PROP).schema().toString();
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setActiveActiveReplicationEnabled(true)
@@ -188,7 +187,7 @@ public void setUp() throws Exception {

@AfterClass
public void cleanUp() {
parentControllerClient.disableAndDeleteStore(storeName);
parentControllerClient.disableAndDeleteStore(STORE_NAME);
multiRegionMultiClusterWrapper.close();
TestView.resetCounters();
}
@@ -220,6 +219,7 @@ private Map<byte[], Pair<byte[], byte[]>> generateInputWithMetadata(
}
for (int i = startIndex; i < endIndex; ++i) {
String value = isIncPush ? VALUE_PREFIX_INC_PUSH + i : VALUE_PREFIX + i;
String METADATA_PREFIX = "metadata";
String metadata = METADATA_PREFIX + i;
records.put(
serializer.serialize(KEY_PREFIX + i),
@@ -290,7 +290,7 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
VersionCreationResponse versionCreationResponse;
versionCreationResponse = TestUtils.assertCommand(
parentControllerClient.requestTopicForWrites(
storeName,
STORE_NAME,
1024 * 1024,
Version.PushType.BATCH,
System.currentTimeMillis() + "_test_server_restart_push",
@@ -307,7 +307,8 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
assertEquals(newVersion + 1, versionToBePushed);
newVersion = versionToBePushed;

String topic = versionCreationResponse.getKafkaTopic();
final String topic = versionCreationResponse.getKafkaTopic();
assertNotNull(topic);
PubSubBrokerWrapper pubSubBrokerWrapper = clusterWrappers.get(SOURCE_COLO).getPubSubBrokerWrapper();
PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
pubSubBrokerWrapper.getPubSubClientsFactory().getProducerAdapterFactory();
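The `final String topic` above is what allows later hunks to drop the `String finalTopic = topic;` copies: Java lambdas may only capture local variables that are final or effectively final. A minimal sketch of the rule, with illustrative names:

```java
import java.util.function.Supplier;

public class EffectivelyFinalExample {
    public static void main(String[] args) {
        final String topic = "store_v1"; // assigned exactly once

        // OK: 'topic' is final, so the lambda captures it directly and
        // no 'finalTopic' copy is needed
        Supplier<String> readTopic = () -> "consuming from " + topic;
        System.out.println(readTopic.get());

        String mutated = "v1";
        mutated = "v2"; // the reassignment makes 'mutated' not effectively final
        // Supplier<String> bad = () -> mutated; // would not compile: lambdas require
        // captured locals to be final or effectively final
    }
}
```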
Expand Down Expand Up @@ -338,35 +339,34 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
}

Map<Integer, List<ReplicationMetadataRocksDBStoragePartition>> rocksDBStoragePartitions = new HashMap<>();
String finalTopic = topic;
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
getPartitionForTopic(finalTopic, rocksDBStoragePartitions);
getPartitionForTopic(topic, rocksDBStoragePartitions);
for (int colo = 0; colo < NUMBER_OF_COLOS; colo++) {
for (int replica = 0; replica < 1; replica++) {
for (int replica = 0; replica < NUMBER_OF_REPLICAS; replica++) {
assertNotNull(rocksDBStoragePartitions.get(colo).get(replica).getValueRocksDBSstFileWriter());
assertNotNull(rocksDBStoragePartitions.get(colo).get(replica).getRocksDBSstFileWriter());
}
}
});

// verify the total number of records ingested
// TBD: setting colo < NUMBER_OF_COLOS and/or replica < NUMBER_OF_REPLICAS
// for the below operations results in flaky tests/failures.
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
for (int colo = 0; colo < NUMBER_OF_COLOS; colo++) {
AtomicInteger totalIngestedKeys = new AtomicInteger();
AtomicInteger totalIngestedRMDKeys = new AtomicInteger();
for (int replica = 0; replica < 1; replica++) {
for (int replica = 0; replica < NUMBER_OF_REPLICAS; replica++) {
ReplicationMetadataRocksDBStoragePartition partition = rocksDBStoragePartitions.get(colo).get(replica);
totalIngestedKeys.addAndGet((int) partition.getValueRocksDBSstFileWriter().getRecordNumInAllSSTFiles());
totalIngestedRMDKeys.addAndGet((int) partition.getRocksDBSstFileWriter().getRecordNumInAllSSTFiles());
}
assertEquals(totalIngestedKeys.get(), NUMBER_OF_KEYS);
assertEquals(totalIngestedRMDKeys.get(), NUMBER_OF_KEYS);
assertEquals(totalIngestedKeys.get(), NUMBER_OF_KEYS * NUMBER_OF_REPLICAS);
assertEquals(totalIngestedRMDKeys.get(), NUMBER_OF_KEYS * NUMBER_OF_REPLICAS);
}
});

// Delete the sst files to mimic how ingestExternalFile() moves them to RocksDB.
// TBD: Deleting files and restarting servers from more than one colo and/or n-1 replicas
// results in flaky tests/failures.
LOGGER.info("Finished Ingestion of all data to SST Files: Delete the sst files");
for (int colo = 0; colo < 1; colo++) {
for (int replica = 0; replica < 1; replica++) {
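The `waitForNonDeterministicAssertion` calls above retry an assertion until a timeout so the test can tolerate asynchronous ingestion. Here is a rough sketch of that retry pattern, not Venice's actual `TestUtils` implementation:

```java
import java.util.concurrent.TimeUnit;

public final class RetryAssert {
    // Re-run 'assertion' until it stops throwing or the timeout elapses;
    // rethrow the last failure once time runs out.
    public static void waitForAssertion(long timeout, TimeUnit unit, Runnable assertion)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            try {
                assertion.run();
                return; // assertion finally passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // out of time: surface the last failure
                }
                Thread.sleep(100); // brief backoff before the next attempt
            }
        }
    }
}
```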
@@ -393,23 +393,25 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
}

// Wait for the push to be completed.
String finalTopic1 = topic;
TestUtils.waitForNonDeterministicAssertion(120, TimeUnit.SECONDS, () -> {
for (int colo = 0; colo < NUMBER_OF_COLOS; colo++) {
assertEquals(
clusterWrappers.get(colo)
.getLeaderVeniceController()
.getVeniceAdmin()
.getOffLinePushStatus(clusterWrappers.get(colo).getClusterName(), finalTopic1)
.getOffLinePushStatus(clusterWrappers.get(colo).getClusterName(), topic)
.getExecutionStatus(),
ExecutionStatus.COMPLETED);
}
});

if (!deleteSSTFiles && !deleteRMDSSTFiles) {
// create a kafka consumer to check for the messages with leaderCompletedStatus.
// testing this for just 1 case should be enough as deleting sst and restarting
// servers doesn't affect these as we are consuming from kafka
/** creating a {@link pubSubConsumer} to check for messages carrying the
* {@link VENICE_LEADER_COMPLETION_STATUS_HEADER} header. Testing just one case
* should be enough: deleting sst files and restarting servers doesn't affect this
* generic check, since we only consume from kafka to see whether the new header
* is present. More specific tests can be added for those cases if needed.
*/
Properties properties = new Properties();
properties.setProperty(
ConfigKeys.KAFKA_BOOTSTRAP_SERVERS,
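The block above builds consumer properties to verify that produced messages carry the leader-completion header. A hedged sketch of how such a header check can look with the plain Apache Kafka consumer API; the topic name, group id, and the exact header key are assumptions for illustration, since the test itself goes through Venice's pub-sub adapters:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class HeaderCheckSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "header-check");            // assumption: throwaway group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("store_v1")); // assumed topic name
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<byte[], byte[]> record: records) {
                for (Header header: record.headers()) {
                    // look for the leader-completion header by key
                    if (header.key().contains("LEADER_COMPLETION")) { // assumed key fragment
                        System.out.println("found header: " + header.key());
                    }
                }
            }
        }
    }
}
```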
@@ -455,7 +457,7 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
.getStore(
clusterWrappers.get(colo).getLeaderVeniceController().getControllerUrl(),
clusterWrappers.get(colo).getClusterName(),
storeName)
STORE_NAME)
.getStore()
.getCurrentVersion();
LOGGER.info("colo {} currentVersion {}, pushVersion {}", colo, currentVersion, newVersion);
@@ -473,23 +475,23 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
D2Client d2Client = D2TestUtils.getD2Client(clusterWrappers.get(NON_SOURCE_COLO).getZk().getAddress(), false);
D2ClientUtils.startClient(d2Client);
storeClient = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName)
ClientConfig.defaultGenericClientConfig(STORE_NAME)
.setForceClusterDiscoveryAtStartTime(true)
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setD2Client(d2Client)
.setVeniceURL(clusterWrappers.get(NON_SOURCE_COLO).getRandomRouterURL())
.setSslFactory(SslUtils.getVeniceLocalSslFactory())
.setRetryOnAllErrors(true));

// 1. invalid keys: all the keys pushed before this version and not repushed via incremental push
// invalid keys: all the keys pushed before this version and not re-pushed via incremental push
AvroGenericStoreClient<String, Object> finalStoreClient = storeClient;
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int key: allNonIncPushKeysUntilLastVersion) {
assertNull(finalStoreClient.get(KEY_PREFIX + key).get());
}
});

// 2. all valid keys
// all valid keys
currKey = startKey;
while (currKey < endKey) {
int finalCurrKey = currKey;
@@ -508,7 +510,7 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,

String incPushVersion = System.currentTimeMillis() + "_test_inc_push";
versionCreationResponse = parentControllerClient.requestTopicForWrites(
storeName,
STORE_NAME,
1024 * 1024,
Version.PushType.INCREMENTAL,
incPushVersion,
@@ -521,13 +523,13 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
false,
-1);
assertFalse(versionCreationResponse.isError());
topic = versionCreationResponse.getKafkaTopic();
assertNotNull(topic);
String rtTopic = versionCreationResponse.getKafkaTopic();
assertNotNull(rtTopic);

// incremental push: the last 10 keys from the batch push
int incPushStartKey = startKey + 90;
try (VeniceWriter<byte[], byte[], byte[]> veniceWriter =
veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic).build())) {
veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(rtTopic).build())) {
veniceWriter.broadcastStartOfIncrementalPush(incPushVersion, new HashMap<>());

// generate and insert data into the new version
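For context on the writer usage above: an incremental push brackets its records between start and end-of-incremental-push control messages. A hedged sketch of that shape, reusing the test's `veniceWriterFactory`, `serializer`, `rtTopic`, and `incPushVersion`; the `broadcastEndOfIncrementalPush` counterpart and the value schema id `1` are assumptions here:

```java
try (VeniceWriter<byte[], byte[], byte[]> writer =
    veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(rtTopic).build())) {
  writer.broadcastStartOfIncrementalPush(incPushVersion, new HashMap<>());
  byte[] key = serializer.serialize("key0");         // placeholder record
  byte[] value = serializer.serialize("value-inc0"); // placeholder value
  writer.put(key, value, 1);                         // 1 = assumed value schema id
  writer.broadcastEndOfIncrementalPush(incPushVersion, new HashMap<>());
}
```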
@@ -553,40 +555,40 @@ public void testActiveActiveStoreWithRMDAndRestartServer(boolean deleteSSTFiles,
D2Client d2Client = D2TestUtils.getD2Client(clusterWrappers.get(NON_SOURCE_COLO).getZk().getAddress(), false);
D2ClientUtils.startClient(d2Client);
storeClient = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName)
ClientConfig.defaultGenericClientConfig(STORE_NAME)
.setForceClusterDiscoveryAtStartTime(true)
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setD2Client(d2Client)
.setVeniceURL(clusterWrappers.get(NON_SOURCE_COLO).getRandomRouterURL())
.setSslFactory(SslUtils.getVeniceLocalSslFactory())
.setRetryOnAllErrors(true));
// validate the ingested data
// 1. first 90 keys which should still have original data pushed via full push
// first 90 keys which should still have original data pushed via full push
currKey = startKey;
while (currKey < incPushStartKey) {
assertEquals(storeClient.get(KEY_PREFIX + currKey).get().toString(), VALUE_PREFIX + currKey);
currKey++;
}

// 2. last 10 keys should be from incremental push
// last 10 keys should be from incremental push
AvroGenericStoreClient<String, Object> finalStoreClient = storeClient;
while (currKey < endKey) {
int finalCurrKey = currKey;
AvroGenericStoreClient<String, Object> finalStoreClient1 = storeClient;
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
assertEquals(
finalStoreClient1.get(KEY_PREFIX + finalCurrKey).get().toString(),
VALUE_PREFIX_INC_PUSH + finalCurrKey);
});
TestUtils.waitForNonDeterministicAssertion(
30,
TimeUnit.SECONDS,
() -> assertEquals(
finalStoreClient.get(KEY_PREFIX + finalCurrKey).get().toString(),
VALUE_PREFIX_INC_PUSH + finalCurrKey));
currKey++;
}

// also check all the incremental push data so far: New versions should get this from RT
// check setHybridRewindSeconds() config in setup
for (int key: allIncPushKeys) {
AvroGenericStoreClient<String, Object> finalStoreClient2 = storeClient;
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
assertNotNull(finalStoreClient2.get(KEY_PREFIX + key).get());
assertEquals(finalStoreClient2.get(KEY_PREFIX + key).get().toString(), VALUE_PREFIX_INC_PUSH + key);
assertNotNull(finalStoreClient.get(KEY_PREFIX + key).get());
assertEquals(finalStoreClient.get(KEY_PREFIX + key).get().toString(), VALUE_PREFIX_INC_PUSH + key);
});
}
} finally {
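On the rewind note above: incremental-push data shows up in new versions because the store is hybrid, so each new version rewinds into the real-time topic. A hedged sketch of the store configuration that enables this; `setActiveActiveReplicationEnabled` and `setHybridRewindSeconds` appear in this test, while `setIncrementalPushEnabled` and `setHybridOffsetLagThreshold` are assumed setters and the values are illustrative:

```java
UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
    .setActiveActiveReplicationEnabled(true) // as in setUp() above
    .setIncrementalPushEnabled(true)         // assumed: allow incremental pushes
    .setHybridRewindSeconds(86400L)          // 1-day rewind window for new versions
    .setHybridOffsetLagThreshold(2);         // assumed: lag threshold to become ready-to-serve
```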