Skip to content

Commit

Permalink
Did a bit more cleaning up. 🧹
Browse files Browse the repository at this point in the history
  • Loading branch information
KaiSernLim committed Oct 31, 2024
1 parent af7fd9a commit e20d24c
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4252,7 +4252,8 @@ public void testBatchOnlyStoreDataRecovery() {
}

/**
* Tests that the following scenario no longer occurs after adding a lock on {@link LeaderFollowerStateType}:
* Simulate the consumer thread processing a batch of polled messages while another thread modifies the leader-follower
* state in PCS. The following scenario must no longer occur after adding a lock on {@link LeaderFollowerStateType}:
* 1. Consumer thread calls {@link StoreIngestionTask#produceToStoreBufferServiceOrKafka} to process polled messages
* 2. Execution reaches {@link LeaderFollowerStoreIngestionTask#shouldProcessRecord}, which compares the message's
* topic with the topic that should be consumed from, according to {@link PartitionConsumptionState}
Expand All @@ -4271,11 +4272,19 @@ public void testBatchOnlyStoreDataRecovery() {
*/
@Test(dataProvider = "aaConfigProvider")
public void testShouldProcessRecord(AAConfig aaConfig) throws Exception {
// Create a batch of polled messages for the consumer thread to process. Only one message is necessary
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(pubSubTopic, PARTITION_FOO);
PubSubHelper.MutablePubSubMessage pubSubMessage = PubSubHelper.getDummyPubSubMessage(false);
pubSubMessage.setTopicPartition(topicPartition);
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> consumedMessages = Arrays.asList(pubSubMessage);

// Set the RT topic stuff on the PCS along with the leader state to eliminate the scary exceptions due to test setup
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeNameWithoutVersionInfo));
TopicSwitch topicSwitch = new TopicSwitch();
topicSwitch.sourceKafkaServers = Arrays.asList(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
topicSwitch.sourceTopicName = rtTopic.getName();
TopicSwitchWrapper topicSwitchWrapper =
new TopicSwitchWrapper(topicSwitch, pubSubTopicRepository.getTopic(topicSwitch.sourceTopicName.toString()));

runTest(Collections.singleton(PARTITION_FOO), () -> {
TestUtils.waitForNonDeterministicAssertion(
Expand All @@ -4288,8 +4297,8 @@ public void testShouldProcessRecord(AAConfig aaConfig) throws Exception {

Thread stateTransitionThread = new Thread(() -> {
/*
* Waits for the main test thread to go past shouldProcessRecord() to wait at waitUntilValueSchemaAvailable()
* inside waitReadyToProcessRecord(), then frees the main test thread by making the value schema available
* Wait for the main test thread to go past shouldProcessRecord() and reach waitUntilValueSchemaAvailable()
* inside waitReadyToProcessRecord(), then free the main test thread by making the value schema available
*/
Utils.sleep(1000L);
doReturn(true).when(mockSchemaRepo).hasValueSchema(storeNameWithoutVersionInfo, 0);
Expand All @@ -4300,6 +4309,7 @@ public void testShouldProcessRecord(AAConfig aaConfig) throws Exception {
* done. Otherwise, validateRecordBeforeProducingToLocalKafka() would fail and induce an ingestion exception.
*/
PartitionConsumptionState pcs = storeIngestionTaskUnderTest.getPartitionConsumptionState(PARTITION_FOO);
pcs.setTopicSwitch(topicSwitchWrapper);
pcs.getOffsetRecord().setLeaderTopic(rtTopic);
pcs.setLeaderFollowerState(LeaderFollowerStateType.LEADER);
});
Expand All @@ -4317,7 +4327,7 @@ public void testShouldProcessRecord(AAConfig aaConfig) throws Exception {
throw new RuntimeException(e);
}

// This exception appears if the PCS is in leader state, but the processed message is from local VT
// Created in validateRecordBeforeProducingToLocalKafka() if the message is from local VT, but we're the leader
Assert.assertNull(storeIngestionTaskUnderTest.getPartitionIngestionExceptionList().get(PARTITION_FOO));
}, aaConfig);
}
Expand Down

0 comments on commit e20d24c

Please sign in to comment.