diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 420d681dce..b7734cd84f 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -4252,7 +4252,8 @@ public void testBatchOnlyStoreDataRecovery() { } /** - * Tests that the following scenario no longer occurs after adding a lock on {@link LeaderFollowerStateType}: + * Simulate the consumer thread processing a batch of polled messages while another thread modifies the leader-follower + * state in PCS. The following scenario must no longer occur after adding a lock on {@link LeaderFollowerStateType}: * 1. Consumer thread calls {@link StoreIngestionTask#produceToStoreBufferServiceOrKafka} to process polled messages * 2. Execution reaches {@link LeaderFollowerStoreIngestionTask#shouldProcessRecord}, which compares the message's * topic with the topic that should be consumed from, according to {@link PartitionConsumptionState} @@ -4271,11 +4272,19 @@ public void testBatchOnlyStoreDataRecovery() { */ @Test(dataProvider = "aaConfigProvider") public void testShouldProcessRecord(AAConfig aaConfig) throws Exception { + // Create a batch of polled messages for the consumer thread to process. Only one message is necessary PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO); PubSubHelper.MutablePubSubMessage pubSubMessage = PubSubHelper.getDummyPubSubMessage(false); pubSubMessage.setTopicPartition(topicPartition); List> consumedMessages = Arrays.asList(pubSubMessage); + + // Set the RT topic stuff on the PCS along with the leader state to eliminate the scary exceptions due to test setup PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeNameWithoutVersionInfo)); + TopicSwitch topicSwitch = new TopicSwitch(); + topicSwitch.sourceKafkaServers = Arrays.asList(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer()); + topicSwitch.sourceTopicName = rtTopic.getName(); + TopicSwitchWrapper topicSwitchWrapper = + new TopicSwitchWrapper(topicSwitch, pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString())); runTest(Collections.singleton(PARTITION_FOO), () -> { TestUtils.waitForNonDeterministicAssertion( @@ -4288,8 +4297,8 @@ public void testShouldProcessRecord(AAConfig aaConfig) throws Exception { Thread stateTransitionThread = new Thread(() -> { /* - * Waits for the main test thread to go past shouldProcessRecord() to wait at waitUntilValueSchemaAvailable() - * inside waitReadyToProcessRecord(), then frees the main test thread by making the value schema available + * Wait for the main test thread to go past shouldProcessRecord() and reach waitUntilValueSchemaAvailable() + * inside waitReadyToProcessRecord(), then free the main test thread by making the value schema available */ Utils.sleep(1000L); doReturn(true).when(mockSchemaRepo).hasValueSchema(storeNameWithoutVersionInfo, 0); @@ -4300,6 +4309,7 @@ public void testShouldProcessRecord(AAConfig aaConfig) throws Exception { * done. Otherwise, validateRecordBeforeProducingToLocalKafka() would fail and induce an ingestion exception. */ PartitionConsumptionState pcs = storeIngestionTaskUnderTest.getPartitionConsumptionState(PARTITION_FOO); + pcs.setTopicSwitch(topicSwitchWrapper); pcs.getOffsetRecord().setLeaderTopic(rtTopic); pcs.setLeaderFollowerState(LeaderFollowerStateType.LEADER); }); @@ -4317,7 +4327,7 @@ public void testShouldProcessRecord(AAConfig aaConfig) throws Exception { throw new RuntimeException(e); } - // This exception appears if the PCS is in leader state, but the processed message is from local VT + // Created in validateRecordBeforeProducingToLocalKafka() if the message is from local VT, but we're the leader Assert.assertNull(storeIngestionTaskUnderTest.getPartitionIngestionExceptionList().get(PARTITION_FOO)); }, aaConfig); }