diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index d71db70c16d11..df643ea449372 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -668,9 +668,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { val sameMoveTp = new TopicPartition("orders", 2) // Throttle to ensure we minimize race conditions and test flakiness - throttle(Seq("orders"), throttleSettingForSeconds(10), Map( - sameMoveTp -> Seq(0, 1, 2) - )) + throttle(Seq("orders"), throttleSettingForSeconds(10), Set(sameMoveTp)) servers.foreach(_.shutdown()) adminClient.close() @@ -758,8 +756,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { produceMessages(tp0.topic, 500, acks = -1, valueLength = 100 * 1000) - TestUtils.throttleAllBrokersReplication(adminClient, Seq(101), throttleBytes = 1) - TestUtils.assignThrottledPartitionReplicas(adminClient, Map(tp0 -> Seq(101))) + TestUtils.setReplicationThrottleForPartitions(adminClient, Seq(101), Set(tp0), throttleBytes = 1) adminClient.alterPartitionReassignments( Map(reassignmentEntry(tp0, Seq(100, 101))).asJava @@ -778,8 +775,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { assertTrue(isAssignmentInProgress(tp0)) - TestUtils.resetBrokersThrottle(adminClient, Seq(101)) - TestUtils.removePartitionReplicaThrottles(adminClient, Set(tp0)) + TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp0)) waitForAllReassignmentsToComplete() assertEquals(Seq(100, 101), zkClient.getReplicasForPartition(tp0)) @@ -802,10 +798,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { assertTrue(adminClient.listPartitionReassignments().reassignments().get().isEmpty) // Throttle to ensure we minimize race conditions and test flakiness - throttle(Seq(topicName), throttleSettingForSeconds(10), Map( - tp0 -> Seq(100, 101), - tp2 -> Seq(100, 101) - )) + throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp2)) + adminClient.alterPartitionReassignments( Map(reassignmentEntry(tp0, Seq(101)), reassignmentEntry(tp2, Seq(101))).asJava @@ -832,10 +826,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = servers) // Throttle to ensure we minimize race conditions and test flakiness - throttle(Seq(topicName), throttleSettingForSeconds(10), Map( - tp0 -> Seq(100, 101, 102), - tp1 -> Seq(100, 101, 102) - )) + throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1)) adminClient.alterPartitionReassignments( Map(reassignmentEntry(tp0, Seq(100, 101, 102)), @@ -880,10 +871,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { // Throttle to avoid race conditions val throttleSetting = throttleSettingForSeconds(10) - throttle(Seq(topicName), throttleSetting, Map( - tp0 -> Seq(100, 101, 102), - tp1 -> Seq(100, 101, 102) - )) + throttle(Seq(topicName), throttleSetting, Set(tp0, tp1)) // API reassignment to 101 for both partitions adminClient.alterPartitionReassignments( @@ -938,11 +926,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = servers) // Throttle to avoid race conditions - throttle(Seq("A", "B"), throttleSettingForSeconds(10), Map( - tpA0 -> Seq(100, 101, 102), - tpA1 -> Seq(100, 101, 102), - tpB0 -> Seq(100, 101, 102) - )) + throttle(Seq("A", "B"), throttleSettingForSeconds(10), Set(tpA0, tpA1, tpB0)) // 1. znode reassignment to 101 for TP A-0, A-1 val topicJson = executeAssignmentJson(Seq( @@ -985,9 +969,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { adminClient = createAdminClient(servers) createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100, 101)), servers = servers) // Throttle to ensure we minimize race conditions and test flakiness - throttle(Seq(topicName), throttleSettingForSeconds(10), Map( - tp0 -> Seq(100, 101, 102) - )) + throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0)) // move to [102, 101] adminClient.alterPartitionReassignments( @@ -1016,10 +998,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = servers) // Throttle to avoid race conditions - throttle(Seq(topicName), throttleSettingForSeconds(10), Map( - tp0 -> Seq(100, 101), - tp1 -> Seq(100, 101) - )) + throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1)) val move = Map( tp0 -> Seq(101), @@ -1060,12 +1039,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = servers) // Throttle to avoid race conditions - throttle(Seq(topicName), throttleSettingForSeconds(10), Map( - tp0 -> Seq(100, 101, 102), - tp1 -> Seq(100, 101, 102), - tp2 -> Seq(100, 101, 102), - tp3 -> Seq(100, 101, 102) - )) + throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1, tp2, tp3)) // API reassignment to 101 for tp0 and tp1 adminClient.alterPartitionReassignments( @@ -1115,11 +1089,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = servers) // Throttle to avoid race conditions - throttle(Seq("A", "B"), throttleSettingForSeconds(10), Map( - tpA0 -> Seq(100, 101, 102), - tpA1 -> Seq(100, 101, 102), - tpB0 -> Seq(100, 101, 102) - )) + throttle(Seq("A", "B"), throttleSettingForSeconds(10), Set(tpA0, tpA1, tpB0)) adminClient.alterPartitionReassignments(Map(reassignmentEntry(tpA0, Seq(101))).asJava).all().get() val apiReassignmentsInProgress1 = adminClient.listPartitionReassignments().reassignments().get() @@ -1180,10 +1150,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { servers = servers) // Throttle to avoid race conditions - throttle(Seq(topicName), throttleSettingForSeconds(10), Map( - tp0 -> Seq(100, 101), - tp1 -> Seq(100, 101) - )) + throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1)) // Alter `topicName` partition reassignment adminClient.alterPartitionReassignments( @@ -1308,13 +1275,12 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { ThrottleSetting(throttle.toString, messagesPerSecond * secondsDuration, messageSize) } - def throttle(topics: Seq[String], throttle: ThrottleSetting, replicasToThrottle: Map[TopicPartition, Seq[Int]]): Unit = { + def throttle(topics: Seq[String], throttle: ThrottleSetting, partitions: Set[TopicPartition]): Unit = { val messagesPerTopic = throttle.numMessages / topics.size for (topic <- topics) { produceMessages(topic, numMessages = messagesPerTopic, acks = 0, valueLength = throttle.messageSizeBytes) } - TestUtils.throttleAllBrokersReplication(adminClient, brokerIds, throttle.throttleBytes.toInt) - TestUtils.assignThrottledPartitionReplicas(adminClient, replicasToThrottle) + TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, partitions, throttle.throttleBytes.toInt) } private def produceMessages(topic: String, numMessages: Int, acks: Int, valueLength: Int): Unit = { diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index 288b72f15e432..c0d46851226ce 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -26,6 +26,7 @@ import kafka.utils.{Exit, Logging, TestUtils} import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig} import org.apache.kafka.common.internals.Topic @@ -54,7 +55,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"), numPartitions = numPartitions, defaultReplicationFactor = defaultReplicationFactor - ).map(KafkaConfig.fromProps) + ).map(KafkaConfig.fromProps) private val numPartitions = 1 private val defaultReplicationFactor = 1.toShort @@ -93,7 +94,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = { val finishTime = System.currentTimeMillis() + timeout var result = false - while (System.currentTimeMillis() < finishTime || !result) { + while (System.currentTimeMillis() < finishTime && !result) { val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get() result = topics.contains(topicName) Thread.sleep(100) @@ -655,31 +656,29 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin val configMap = new java.util.HashMap[String, String]() val replicationFactor: Short = 1 val partitions = 1 + val tp = new TopicPartition(testTopicName, 0) adminClient.createTopics( Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get() waitForTopicCreated(testTopicName) - - for (msg <- 0 to 10) { - TestUtils.produceMessage(servers, testTopicName, s"$msg") - } + TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1) val brokerIds = servers.map(_.config.brokerId) - TestUtils.throttleAllBrokersReplication(adminClient, brokerIds, throttleBytes = 1) + TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1) val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).all().get().get(testTopicName) val firstPartition = testTopicDesc.partitions().asScala.head - val firstTopicPartition = new TopicPartition(testTopicName, firstPartition.partition()) + val replicasOfFirstPartition = firstPartition.replicas().asScala.map(_.id()) val targetReplica = brokerIds.diff(replicasOfFirstPartition).head - adminClient.alterPartitionReassignments(Collections.singletonMap(firstTopicPartition, + adminClient.alterPartitionReassignments(Collections.singletonMap(tp, Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))) // let's wait until the LAIR is propagated TestUtils.waitUntilTrue(() => { - val reassignments = adminClient.listPartitionReassignments(Collections.singleton(firstTopicPartition)).reassignments().get() - !reassignments.get(firstTopicPartition).addingReplicas().isEmpty + val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() + !reassignments.get(tp).addingReplicas().isEmpty }, "Reassignment didn't add the second node") // describe the topic and test if it's under-replicated @@ -693,7 +692,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions")))) assertEquals("--under-replicated-partitions shouldn't return anything", "", underReplicatedOutput) - TestUtils.resetBrokersThrottle(adminClient, brokerIds) + TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp)) TestUtils.waitForAllReassignmentsToComplete(adminClient) } @@ -793,4 +792,5 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin assertTrue(output.contains(testTopicName)) assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)) } + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0f34268b31b79..8fda346960e0c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1631,6 +1631,29 @@ object TestUtils extends Logging { } } + /** + * Set broker replication quotas and enable throttling for a set of partitions. This + * will override any previous replication quotas, but will leave the throttling status + * of other partitions unaffected. + */ + def setReplicationThrottleForPartitions(admin: Admin, + brokerIds: Seq[Int], + partitions: Set[TopicPartition], + throttleBytes: Int): Unit = { + throttleAllBrokersReplication(admin, brokerIds, throttleBytes) + assignThrottledPartitionReplicas(admin, partitions.map(_ -> brokerIds).toMap) + } + + /** + * Remove a set of throttled partitions and reset the overall replication quota. + */ + def removeReplicationThrottleForPartitions(admin: Admin, + brokerIds: Seq[Int], + partitions: Set[TopicPartition]): Unit = { + removePartitionReplicaThrottles(admin, partitions) + resetBrokersThrottle(admin, brokerIds) + } + /** * Throttles all replication across the cluster. * @param adminClient is the adminClient to use for making connection with the cluster