diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java index 10127a43c7..8ed72dc1c7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStuckConsumerRepair.java @@ -79,7 +79,7 @@ public void setUp() { private static VeniceClusterWrapper setUpCluster() { Properties extraProperties = new Properties(); extraProperties.setProperty(DEFAULT_MAX_NUMBER_OF_PARTITIONS, "5"); - VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 1, 2, 1000000, false, false, extraProperties); + VeniceClusterWrapper cluster = ServiceFactory.getVeniceCluster(1, 0, 1, 1, 1000000, false, false, extraProperties); // Add Venice Router Properties routerProperties = new Properties(); @@ -107,7 +107,6 @@ private static VeniceClusterWrapper setUpCluster() { SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY, KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name()); cluster.addVeniceServer(new Properties(), serverProperties); - cluster.addVeniceServer(new Properties(), serverProperties); return cluster; } @@ -132,6 +131,19 @@ private void checkLargeRecord(AvroGenericStoreClient client, int index) } } + /** + * This test verifies that the stuck consumer repair logic kicks in when the consumer is stuck. It does the following + * steps: + * 1. Set up a Venice cluster with 1 controller, 1 router, and 1 server. + * 2. Create a hybrid store with rewind seconds set to 120 seconds and offset lag threshold set to 2. + * 3. Run a VPJ job to push 100 records to the store. + * 4. Verify that the records are pushed successfully. + * 5. Write 10 streaming records to the store's rt topic. + * 6. Verify that the streaming records are pushed successfully by reading and verifying all of them. + * 7. Delete the v1 topic to simulate the producer stuck issue. + * 8. Write 80 more streaming records to the store's rt topic. Consumer thread will be blocked when processing them. + * 9. Verify that the stuck consumer repair logic kicks in and the stuck consumer is repaired. + */ @Test(timeOut = 120 * Time.MS_PER_SECOND) public void testStuckConsumerRepair() throws Exception { SystemProducer veniceProducer = null; @@ -195,19 +207,11 @@ public void testStuckConsumerRepair() throws Exception { sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE); } - // Run one more VPJ - runVPJ(vpjProperties, 2, controllerClient); - - // Verify streaming record in second version - checkLargeRecord(client, 2); - assertEquals(client.get("19").get().toString(), "test_name_19"); - - for (int i = 10; i <= 20; i++) { - sendCustomSizeStreamingRecord(veniceProducer, storeName, i, STREAMING_RECORD_SIZE); - } - TestUtils.waitForNonDeterministicAssertion(15, TimeUnit.SECONDS, () -> { + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { try { - checkLargeRecord(client, 19); + for (int i = 1; i <= 10; ++i) { + checkLargeRecord(client, i); + } } catch (Exception e) { throw new VeniceException(e); } @@ -217,23 +221,12 @@ public void testStuckConsumerRepair() throws Exception { String topicForV1 = Version.composeKafkaTopic(storeName, 1); topicManager.ensureTopicIsDeletedAndBlock(sharedVenice.getPubSubTopicRepository().getTopic(topicForV1)); LOGGER.info("Topic: {} has been deleted", topicForV1); - Utils.sleep(10000); // 10 seconds to let Kafka client get the topic deletion signal - // Start sending more streaming records + // Sending more streaming records, as it will block the consumer thread when processing them. for (int i = 20; i <= 100; i++) { sendCustomSizeStreamingRecord(veniceProducer, storeName, i, 1024); } - TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> { - try { - for (int i = 20; i <= 100; ++i) { - checkLargeRecord(client, i); - } - } catch (Exception e) { - throw new VeniceException(e); - } - }); - // Verify that the stuck consumer repair logic does kick in List serverMetricRepos = new ArrayList<>(); for (VeniceServerWrapper server: venice.getVeniceServers()) {