Skip to content

Commit

Permalink
[test] Stabilize the flaky test 'testStuckConsumerRepair' (#841)
Browse files Browse the repository at this point in the history
This change aims to stabilize the flaky test 'testStuckConsumerRepair' by the following modifications:

1. Reduce the number of venice servers in cluster from 2 to 1.
2. Avoid pushing the 2nd data version as it will achieve the same testing purpose.
  • Loading branch information
lluwm authored Jan 31, 2024
1 parent 9f00526 commit 6131ed6
Showing 1 changed file with 19 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setUp() {
private static VeniceClusterWrapper setUpCluster() {
Properties extraProperties = new Properties();
extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "5");
VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 1, 2, 1000000, false, false, extraProperties);
VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 1, 1, 1000000, false, false, extraProperties);

// Add Venice Router
Properties routerProperties = new Properties();
Expand Down Expand Up @@ -107,7 +107,6 @@ private static VeniceClusterWrapper setUpCluster() {
SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY,
KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name());
cluster.addVeniceServer(new Properties(), serverProperties);
cluster.addVeniceServer(new Properties(), serverProperties);

return cluster;
}
Expand All @@ -132,6 +131,19 @@ private void checkLargeRecord(AvroGenericStoreClient client, int index)
}
}

/**
* This test verifies that the stuck consumer repair logic kicks in when the consumer is stuck. It does the following
* steps:
* 1. Set up a Venice cluster with 1 controller, 1 router, and 1 server.
* 2. Create a hybrid store with rewind seconds set to 120 seconds and offset lag threshold set to 2.
* 3. Run a VPJ job to push 100 records to the store.
* 4. Verify that the records are pushed successfully.
* 5. Write 10 streaming records to the store's rt topic.
* 6. Verify that the streaming records are pushed successfully by reading and verifying all of them.
* 7. Delete the v1 topic to simulate the producer stuck issue.
* 8. Write 80 more streaming records to the store's rt topic. Consumer thread will be blocked when processing them.
* 9. Verify that the stuck consumer repair logic kicks in and the stuck consumer is repaired.
*/
@Test(timeOut = 120 * Time.MS_PER_SECOND)
public void testStuckConsumerRepair() throws Exception {
SystemProducer veniceProducer = null;
Expand Down Expand Up @@ -195,19 +207,11 @@ public void testStuckConsumerRepair() throws Exception {
sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE);
}

// Run one more VPJ
runVPJ(vpjProperties, 2, controllerClient);

// Verify streaming record in second version
checkLargeRecord(client, 2);
assertEquals(client.get("19").get().toString(), "test_name_19");

for (int i = 10; i <= 20; i++) {
sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE);
}
TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> {
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
try {
checkLargeRecord(client, 19);
for (int i = 1; i <= 10; ++i) {
checkLargeRecord(client, i);
}
} catch (Exception e) {
throw new VeniceException(e);
}
Expand All @@ -217,23 +221,12 @@ public void testStuckConsumerRepair() throws Exception {
String topicForV1 = Version.composeKafkaTopic(storeName, 1);
topicManager.ensureTopicIsDeletedAndBlock(sharedVenice.getPubSubTopicRepository().getTopic(topicForV1));
LOGGER.info("Topic: {} has been deleted", topicForV1);
Utils.sleep(10000); // 10 seconds to let Kafka client get the topic deletion signal

// Start sending more streaming records
// Sending more streaming records, as it will block the consumer thread when processing them.
for (int i = 20; i <= 100; i++) {
sendCustomSizeStreamingRecord(veniceProducer, storeName, i, 1024);
}

TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
try {
for (int i = 20; i <= 100; ++i) {
checkLargeRecord(client, i);
}
} catch (Exception e) {
throw new VeniceException(e);
}
});

// Verify that the stuck consumer repair logic does kick in
List<MetricsRepository> serverMetricRepos = new ArrayList<>();
for (VeniceServerWrapper server: venice.getVeniceServers()) {
Expand Down

0 comments on commit 6131ed6

Please sign in to comment.