MINOR: Fix throttle usage in reassignment test case (apache#7822)
The replication throttle in `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress` was not setting the quota on the partition correctly, so the test case was not working as expected. This patch fixes the problem and also fixes a bug in `waitForTopicCreated` which caused the function to always wait for the full timeout.

Reviewers: Ismael Juma <[email protected]>
Jason Gustafson authored Dec 17, 2019
1 parent b50d925 commit fd7991a
Showing 3 changed files with 50 additions and 61 deletions.
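Before the diff, a minimal sketch of the `waitForTopicCreated` bug described in the commit message. The object and method names below are hypothetical, and the real method polls the admin client's topic list rather than a generic condition; the sketch only mirrors the loop shape to show why the old `||` condition always ran for the full timeout while the `&&` form in the diff returns as soon as the topic appears.

object WaitLoopSketch {
  // Simplified stand-in for waitForTopicCreated: poll `condition` every 100 ms
  // until it holds or until `timeoutMs` elapses.
  def poll(condition: () => Boolean, timeoutMs: Long = 10000L): Boolean = {
    val finishTime = System.currentTimeMillis() + timeoutMs
    var result = false
    // Buggy form: `while (System.currentTimeMillis() < finishTime || !result)`
    // keeps looping until the deadline even after `result` turns true.
    // The fixed form below exits as soon as the condition holds.
    while (System.currentTimeMillis() < finishTime && !result) {
      result = condition()
      Thread.sleep(100)
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    poll(() => true)
    // With `&&` this takes roughly one poll interval; with `||` it would run ~10 seconds.
    println(s"elapsed: ${System.currentTimeMillis() - start} ms")
  }
}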
@@ -668,9 +668,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
val sameMoveTp = new TopicPartition("orders", 2)

// Throttle to ensure we minimize race conditions and test flakiness
throttle(Seq("orders"), throttleSettingForSeconds(10), Map(
sameMoveTp -> Seq(0, 1, 2)
))
throttle(Seq("orders"), throttleSettingForSeconds(10), Set(sameMoveTp))

servers.foreach(_.shutdown())
adminClient.close()
@@ -758,8 +756,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {

produceMessages(tp0.topic, 500, acks = -1, valueLength = 100 * 1000)

- TestUtils.throttleAllBrokersReplication(adminClient, Seq(101), throttleBytes = 1)
- TestUtils.assignThrottledPartitionReplicas(adminClient, Map(tp0 -> Seq(101)))
+ TestUtils.setReplicationThrottleForPartitions(adminClient, Seq(101), Set(tp0), throttleBytes = 1)

adminClient.alterPartitionReassignments(
Map(reassignmentEntry(tp0, Seq(100, 101))).asJava
@@ -778,8 +775,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {

assertTrue(isAssignmentInProgress(tp0))

- TestUtils.resetBrokersThrottle(adminClient, Seq(101))
- TestUtils.removePartitionReplicaThrottles(adminClient, Set(tp0))
+ TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp0))

waitForAllReassignmentsToComplete()
assertEquals(Seq(100, 101), zkClient.getReplicasForPartition(tp0))
@@ -802,10 +798,8 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
assertTrue(adminClient.listPartitionReassignments().reassignments().get().isEmpty)

// Throttle to ensure we minimize race conditions and test flakiness
- throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
-   tp0 -> Seq(100, 101),
-   tp2 -> Seq(100, 101)
- ))
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp2))

adminClient.alterPartitionReassignments(
Map(reassignmentEntry(tp0, Seq(101)),
reassignmentEntry(tp2, Seq(101))).asJava
@@ -832,10 +826,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)

// Throttle to ensure we minimize race conditions and test flakiness
- throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
-   tp0 -> Seq(100, 101, 102),
-   tp1 -> Seq(100, 101, 102)
- ))
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1))

adminClient.alterPartitionReassignments(
Map(reassignmentEntry(tp0, Seq(100, 101, 102)),
@@ -880,10 +871,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {

// Throttle to avoid race conditions
val throttleSetting = throttleSettingForSeconds(10)
- throttle(Seq(topicName), throttleSetting, Map(
-   tp0 -> Seq(100, 101, 102),
-   tp1 -> Seq(100, 101, 102)
- ))
+ throttle(Seq(topicName), throttleSetting, Set(tp0, tp1))

// API reassignment to 101 for both partitions
adminClient.alterPartitionReassignments(
@@ -938,11 +926,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)

// Throttle to avoid race conditions
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Map(
tpA0 -> Seq(100, 101, 102),
tpA1 -> Seq(100, 101, 102),
tpB0 -> Seq(100, 101, 102)
))
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Set(tpA0, tpA1, tpB0))

// 1. znode reassignment to 101 for TP A-0, A-1
val topicJson = executeAssignmentJson(Seq(
@@ -985,9 +969,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
adminClient = createAdminClient(servers)
createTopic(zkClient, topicName, Map(tp0.partition() -> Seq(100, 101)), servers = servers)
// Throttle to ensure we minimize race conditions and test flakiness
- throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
-   tp0 -> Seq(100, 101, 102)
- ))
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0))

// move to [102, 101]
adminClient.alterPartitionReassignments(
@@ -1016,10 +998,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)

// Throttle to avoid race conditions
- throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
-   tp0 -> Seq(100, 101),
-   tp1 -> Seq(100, 101)
- ))
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1))

val move = Map(
tp0 -> Seq(101),
@@ -1060,12 +1039,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)

// Throttle to avoid race conditions
- throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
-   tp0 -> Seq(100, 101, 102),
-   tp1 -> Seq(100, 101, 102),
-   tp2 -> Seq(100, 101, 102),
-   tp3 -> Seq(100, 101, 102)
- ))
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1, tp2, tp3))

// API reassignment to 101 for tp0 and tp1
adminClient.alterPartitionReassignments(
@@ -1115,11 +1089,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)

// Throttle to avoid race conditions
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Map(
tpA0 -> Seq(100, 101, 102),
tpA1 -> Seq(100, 101, 102),
tpB0 -> Seq(100, 101, 102)
))
throttle(Seq("A", "B"), throttleSettingForSeconds(10), Set(tpA0, tpA1, tpB0))

adminClient.alterPartitionReassignments(Map(reassignmentEntry(tpA0, Seq(101))).asJava).all().get()
val apiReassignmentsInProgress1 = adminClient.listPartitionReassignments().reassignments().get()
@@ -1180,10 +1150,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
servers = servers)

// Throttle to avoid race conditions
- throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
-   tp0 -> Seq(100, 101),
-   tp1 -> Seq(100, 101)
- ))
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Set(tp0, tp1))

// Alter `topicName` partition reassignment
adminClient.alterPartitionReassignments(
@@ -1308,13 +1275,12 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
ThrottleSetting(throttle.toString, messagesPerSecond * secondsDuration, messageSize)
}

- def throttle(topics: Seq[String], throttle: ThrottleSetting, replicasToThrottle: Map[TopicPartition, Seq[Int]]): Unit = {
+ def throttle(topics: Seq[String], throttle: ThrottleSetting, partitions: Set[TopicPartition]): Unit = {
val messagesPerTopic = throttle.numMessages / topics.size
for (topic <- topics) {
produceMessages(topic, numMessages = messagesPerTopic, acks = 0, valueLength = throttle.messageSizeBytes)
}
- TestUtils.throttleAllBrokersReplication(adminClient, brokerIds, throttle.throttleBytes.toInt)
- TestUtils.assignThrottledPartitionReplicas(adminClient, replicasToThrottle)
+ TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, partitions, throttle.throttleBytes.toInt)
}

private def produceMessages(topic: String, numMessages: Int, acks: Int, valueLength: Int): Unit = {
@@ -26,6 +26,7 @@ import kafka.utils.{Exit, Logging, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
+ import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
import org.apache.kafka.common.internals.Topic
@@ -54,7 +55,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor
- ).map(KafkaConfig.fromProps)
+ ).map(KafkaConfig.fromProps)

private val numPartitions = 1
private val defaultReplicationFactor = 1.toShort
@@ -93,7 +94,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
def waitForTopicCreated(topicName: String, timeout: Int = 10000): Unit = {
val finishTime = System.currentTimeMillis() + timeout
var result = false
- while (System.currentTimeMillis() < finishTime || !result) {
+ while (System.currentTimeMillis() < finishTime && !result) {
val topics = adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get()
result = topics.contains(topicName)
Thread.sleep(100)
@@ -655,31 +656,29 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
val configMap = new java.util.HashMap[String, String]()
val replicationFactor: Short = 1
val partitions = 1
+ val tp = new TopicPartition(testTopicName, 0)

adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
waitForTopicCreated(testTopicName)

- for (msg <- 0 to 10) {
-   TestUtils.produceMessage(servers, testTopicName, s"$msg")
- }
+ TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)

val brokerIds = servers.map(_.config.brokerId)
- TestUtils.throttleAllBrokersReplication(adminClient, brokerIds, throttleBytes = 1)
+ TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)

val testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).all().get().get(testTopicName)
val firstPartition = testTopicDesc.partitions().asScala.head
- val firstTopicPartition = new TopicPartition(testTopicName, firstPartition.partition())

val replicasOfFirstPartition = firstPartition.replicas().asScala.map(_.id())
val targetReplica = brokerIds.diff(replicasOfFirstPartition).head

- adminClient.alterPartitionReassignments(Collections.singletonMap(firstTopicPartition,
+ adminClient.alterPartitionReassignments(Collections.singletonMap(tp,
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica)))))

// let's wait until the LAIR is propagated
TestUtils.waitUntilTrue(() => {
- val reassignments = adminClient.listPartitionReassignments(Collections.singleton(firstTopicPartition)).reassignments().get()
- !reassignments.get(firstTopicPartition).addingReplicas().isEmpty
+ val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get()
+ !reassignments.get(tp).addingReplicas().isEmpty
}, "Reassignment didn't add the second node")

// describe the topic and test if it's under-replicated
@@ -693,7 +692,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
assertEquals("--under-replicated-partitions shouldn't return anything", "", underReplicatedOutput)

- TestUtils.resetBrokersThrottle(adminClient, brokerIds)
+ TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp))
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}

@@ -793,4 +792,5 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
assertTrue(output.contains(testTopicName))
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}

}
23 changes: 23 additions & 0 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1631,6 +1631,29 @@ object TestUtils extends Logging {
}
}

+ /**
+  * Set broker replication quotas and enable throttling for a set of partitions. This
+  * will override any previous replication quotas, but will leave the throttling status
+  * of other partitions unaffected.
+  */
+ def setReplicationThrottleForPartitions(admin: Admin,
+                                         brokerIds: Seq[Int],
+                                         partitions: Set[TopicPartition],
+                                         throttleBytes: Int): Unit = {
+   throttleAllBrokersReplication(admin, brokerIds, throttleBytes)
+   assignThrottledPartitionReplicas(admin, partitions.map(_ -> brokerIds).toMap)
+ }
+
+ /**
+  * Remove a set of throttled partitions and reset the overall replication quota.
+  */
+ def removeReplicationThrottleForPartitions(admin: Admin,
+                                            brokerIds: Seq[Int],
+                                            partitions: Set[TopicPartition]): Unit = {
+   removePartitionReplicaThrottles(admin, partitions)
+   resetBrokersThrottle(admin, brokerIds)
+ }
+
/**
* Throttles all replication across the cluster.
* @param adminClient is the adminClient to use for making connection with the cluster
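For reference, a minimal usage sketch of the two helpers added above, mirroring the call pattern in the test changes earlier in this diff. The object and method names are hypothetical, and `admin` and `brokerIds` are placeholders for values the real tests obtain from the test harness.

import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.TopicPartition

object ThrottleHelperUsageSketch {
  // Hypothetical wrapper; in the real tests `admin` and `brokerIds` come from the harness.
  def throttleWhileReassigning(admin: Admin, brokerIds: Seq[Int]): Unit = {
    val tp = new TopicPartition("orders", 0)
    // Apply the broker-level replication quota and mark the partition's replicas as
    // throttled in one call, the combination the commit message says was previously missing.
    TestUtils.setReplicationThrottleForPartitions(admin, brokerIds, Set(tp), throttleBytes = 1)
    // ... trigger the reassignment and run the assertions under test ...
    // Clear the per-partition throttle markers and reset the broker-level quota.
    TestUtils.removeReplicationThrottleForPartitions(admin, brokerIds, Set(tp))
  }
}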
