From 27cc188e9d467fa81ce2477beea935008e9921b9 Mon Sep 17 00:00:00 2001
From: "Joseph (Ting-Chou) Lin"
Date: Thu, 23 Feb 2023 09:48:51 -0800
Subject: [PATCH] [LI-HOTFIX] Add a hook for transforming rack ID used by
 rack-aware replica assignment (#438)

TICKET = LIKAFKA-49537, LIKAFKA-50843
EXIT_CRITERIA = N/A, but will need significant porting for KRaft
LI_DESCRIPTION = This is the first step toward enabling fault-domain-aware
replica assignment. The full implementation will encode the fault domain into
the rack ID (so that we don't break the open-source CC interface) and let CC
take that encoded fault domain into consideration when performing a rebalance.

To prevent flaws in the assignment logic during the rollout phase of encoded
rack IDs, the broker must be able to handle the intermediate state in which
encoded and unencoded rack IDs are mixed within the cluster. To achieve this,
we start by adding a hook interface, RackAwareReplicaAssignmentRackIdMapper,
through which the LI-internal `kafka-server` can inject the logic for
processing encoded rack IDs.

Since this logic is critical, an integration test is also added to verify that
a forged (test) mapper class is actually incorporated into the replica
assignment flow; see the sketch below for the intended mapper contract.
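As a minimal, self-contained illustration of that contract (a sketch only, not
shipped in this patch; the "<faultDomain>::" prefix encoding and the
RackIdMapperSketch name are assumptions for the example), a mapper normalizes a
mixed cluster of encoded and unencoded rack IDs:

    import kafka.admin.RackAwareReplicaAssignmentRackIdMapper

    object RackIdMapperSketch extends App {
      // SAM-converted implementation of the new @FunctionalInterface.
      // Assumed encoding: "<faultDomain>::<rack>"; unencoded IDs pass through.
      val mapper: RackAwareReplicaAssignmentRackIdMapper =
        (rackId: String) => rackId.split("::", 2).last

      // Intermediate rollout state: encoded and unencoded rack IDs coexist.
      Seq("fd1::rack1", "rack1", "fd2::rack2")
        .map(mapper.apply)
        .foreach(println) // prints: rack1, rack1, rack2
    }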
---
 ...ackAwareReplicaAssignmentRackIdMapper.java |  33 +++++
 .../main/scala/kafka/admin/AdminUtils.scala   |  10 +-
 .../admin/ReassignPartitionsCommand.scala     |   4 +
 .../kafka/controller/KafkaController.scala    |  10 +-
 .../main/scala/kafka/server/KafkaConfig.scala |  10 ++
 .../scala/kafka/server/ZkAdminManager.scala   |   7 +-
 .../main/scala/kafka/zk/AdminZkClient.scala   |  47 ++++---
 .../kafka/admin/IgnorePrefixRackIdMapper.java |  31 +++++
 ...areReplicaAssignmentRackIdMapperTest.scala | 122 ++++++++++++++++++
 .../unit/kafka/server/KafkaConfigTest.scala   |  15 +++
 10 files changed, 264 insertions(+), 25 deletions(-)
 create mode 100644 core/src/main/java/kafka/admin/RackAwareReplicaAssignmentRackIdMapper.java
 create mode 100644 core/src/test/java/kafka/admin/IgnorePrefixRackIdMapper.java
 create mode 100644 core/src/test/scala/integration/kafka/api/RackAwareReplicaAssignmentRackIdMapperTest.scala

diff --git a/core/src/main/java/kafka/admin/RackAwareReplicaAssignmentRackIdMapper.java b/core/src/main/java/kafka/admin/RackAwareReplicaAssignmentRackIdMapper.java
new file mode 100644
index 0000000000000..fb0b95d1ecacd
--- /dev/null
+++ b/core/src/main/java/kafka/admin/RackAwareReplicaAssignmentRackIdMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin;
+
+/**
+ * A transformer for mapping a rack ID to a different value.
+ *
+ * This can be used to customize rack ID interpretation for extra encoding; it is specifically
+ * meant for the broker-side partition assignment code path. For other code paths, such as
+ * {@link TopicCommand} invoked from kafka-*.sh, extra effort would be required to inject the mapper.
+ */
+@FunctionalInterface
+public interface RackAwareReplicaAssignmentRackIdMapper {
+    /**
+     * @param rackId The raw rack ID as configured on the broker
+     * @return The transformed rack ID derived from the raw rack ID
+     */
+    String apply(String rackId);
+}
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a37e3d68e6456..bc5337cc58b86 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -104,7 +104,8 @@ object AdminUtils extends Logging {
                               nPartitions: Int,
                               replicationFactor: Int,
                               fixedStartIndex: Int = -1,
-                              startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
+                              startPartitionId: Int = -1,
+                              rackIdMapperForBrokerAssignment: RackAwareReplicaAssignmentRackIdMapper = identity): Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
       throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
     if (replicationFactor <= 0)
@@ -118,7 +119,7 @@ object AdminUtils extends Logging {
       if (brokerMetadatas.exists(_.rack.isEmpty))
         throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
       assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
-        startPartitionId)
+        startPartitionId, rackIdMapperForBrokerAssignment)
     }
   }
 
@@ -149,9 +150,10 @@ object AdminUtils extends Logging {
                                                replicationFactor: Int,
                                                brokerMetadatas: Iterable[BrokerMetadata],
                                                fixedStartIndex: Int,
-                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
+                                               startPartitionId: Int,
+                                               rackIdMapperForBrokerAssignment: RackAwareReplicaAssignmentRackIdMapper): Map[Int, Seq[Int]] = {
     val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
-      id -> rack
+      id -> rackIdMapperForBrokerAssignment.apply(rack)
     }.toMap
     val numRacks = brokerRackMap.values.toSet.size
     val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 186a4313af08e..da82e40dcbbf4 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -606,6 +606,10 @@ object ReassignPartitionsCommand extends Logging {
     val proposedAssignments = mutable.Map[TopicPartition, Seq[Int]]()
     groupedByTopic.forKeyValue { (topic, assignment) =>
       val (_, replicas) = assignment.head
+      // TODO: This is the code path for `./kafka-reassign-partitions.sh --generate`, an "external
+      // operation" that we don't use at LI. If, in the future, we want to use it and make it compliant
+      // with our new rackIdMapperForBrokerAssignment, it will need to be modified and repackaged into
+      // a new kafka-utils package that supports passing the mapper in.
       val assignedReplicas = AdminUtils.
         assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
       proposedAssignments ++= assignedReplicas.map { case (partition, replicas) =>
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2c5db98c1709a..052d206fdbd81 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1268,7 +1268,9 @@ class KafkaController(val config: KafkaConfig,
     val numReplica = partitionMap.head._2.replicas.size
     val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq
-    val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds.toSet, numPartitions, numReplica)
+    val replicaAssignment =
+      adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds.toSet, numPartitions, numReplica,
+        rackIdMapperForRackAwareReplicaAssignment = config.rackIdMapperForRackAwareReplicaAssignment)
     adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true)
     info(s"Rearrange partition and replica assignment for topic [$topic]")
   }
@@ -3001,8 +3003,10 @@ class KafkaController(val config: KafkaConfig,
     val topicId = topicsIdReplicaAssignment.topicId
     val numPartitions = topicsIdReplicaAssignment.assignment.size
     val assignment =
-      adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds, numPartitions, replicationFactor)
-        .map { case(partition, replicas) => (new TopicPartition(topic, partition), ReplicaAssignment(replicas))}
+      adminZkClient
+        .assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokerIds, numPartitions, replicationFactor,
+          rackIdMapperForRackAwareReplicaAssignment = config.rackIdMapperForRackAwareReplicaAssignment)
+        .map { case(partition, replicas) => (new TopicPartition(topic, partition), ReplicaAssignment(replicas)) }
     zkClient.setTopicAssignment(topic, topicId, assignment, controllerContext.epochZkVersion)
     info(s"Updated topic [$topic] with $assignment for replica assignment")
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 43b87da67df34..37d1e8c8b614a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,8 @@
 package kafka.server
 
+import kafka.admin.RackAwareReplicaAssignmentRackIdMapper
+
 import java.io.File
 import java.util
 import java.util.{Collections, Locale, Properties}
@@ -332,6 +334,7 @@ object Defaults {
   val LiDropCorruptedFilesEnabled = false
   val LiConsumerFetchSampleRatio = 0.01
   val LiZookeeperPaginationEnable = false
+  val LiRackIdMapperClassNameForRackAwareReplicaAssignment: String = null
 }
 
 object KafkaConfig {
@@ -452,6 +455,7 @@ object KafkaConfig {
   val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable"
   val LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio"
   val LiZookeeperPaginationEnableProp = "li.zookeeper.pagination.enable"
+  val LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp = "li.rack.aware.assignment.rack.id.mapper.class"
   val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
   val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable"
   val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl"
@@ -796,6 +800,7 @@
   val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover."
   val LiLogCleanerFineGrainedLockEnableDoc = "Specifies whether the log cleaner should use fine grained locks when calculating the filthiest log to clean"
   val LiZookeeperPaginationEnableDoc = "Specifies whether Zookeeper pagination should be used when listing the /brokers/topics znode. Required when sum of all topic-name lengths in the cluster exceeds ZK response-size limit (1 MB by default)."
+  val LiRackIdMapperClassNameForRackAwareReplicaAssignmentDoc = "The name of a mapper class used to translate rack IDs when assigning replicas to brokers in a rack-aware manner. The class should implement kafka.admin.RackAwareReplicaAssignmentRackIdMapper."
   // Although AllowPreferredControllerFallback is expected to be configured dynamically at per cluster level, providing a static configuration entry
   // here allows its value to be obtained without holding the dynamic broker configuration lock.
   val AllowPreferredControllerFallbackDoc = "Specifies whether a non-preferred controller node (broker) is allowed to become the controller."
@@ -1242,6 +1247,7 @@
       .define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc)
       .define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc)
       .define(LiZookeeperPaginationEnableProp, BOOLEAN, Defaults.LiZookeeperPaginationEnable, LOW, LiZookeeperPaginationEnableDoc)
+      .define(LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp, STRING, Defaults.LiRackIdMapperClassNameForRackAwareReplicaAssignment, LOW, LiRackIdMapperClassNameForRackAwareReplicaAssignmentDoc)
       .define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc)
       .define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc)
       .define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc)
@@ -1831,6 +1837,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   /***************** rack configuration **************/
   val rack = Option(getString(KafkaConfig.RackProp))
   val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp))
+  val rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper =
+    Option(getString(KafkaConfig.LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp))
+      .map(className => CoreUtils.createObject[RackAwareReplicaAssignmentRackIdMapper](className))
+      .getOrElse((rackId: String) => rackId)
 
   /** ********* Log Configuration ***********/
   def autoCreateTopicsEnable: java.lang.Boolean = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
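A brief sketch of how the new property resolves to a mapper instance (hedged:
the property value here simply reuses the test mapper shipped with this patch):

    // server.properties (illustrative):
    //   li.rack.aware.assignment.rack.id.mapper.class=kafka.admin.IgnorePrefixRackIdMapper
    import kafka.admin.{IgnorePrefixRackIdMapper, RackAwareReplicaAssignmentRackIdMapper}
    import kafka.utils.CoreUtils

    val mapper = CoreUtils.createObject[RackAwareReplicaAssignmentRackIdMapper](
      classOf[IgnorePrefixRackIdMapper].getName)
    assert(mapper.apply("A::rack1") == "rack1")
    // When the property is unset, KafkaConfig falls back to the identity mapper,
    // so raw rack IDs are used unchanged.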
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 47d23646534e7..5e017c4742dc1 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -178,7 +178,8 @@ class ZkAdminManager(val config: KafkaConfig,
       val assignments = if (topic.assignments.isEmpty) {
         adminZkClient.assignReplicasToAvailableBrokers(
           brokers, controller.partitionUnassignableBrokerIds.toSet,
-          resolvedNumPartitions, resolvedReplicationFactor
+          resolvedNumPartitions, resolvedReplicationFactor,
+          rackIdMapperForRackAwareReplicaAssignment = config.rackIdMapperForRackAwareReplicaAssignment
         )
       } else {
         val assignments = new mutable.HashMap[Int, Seq[Int]]
@@ -352,7 +353,9 @@ class ZkAdminManager(val config: KafkaConfig,
       val assignmentForNewPartitions = adminZkClient.createNewPartitionsAssignment(
         topic, existingAssignment, allBrokers, newPartition.count, newPartitionsAssignment,
-        noNewPartitionBrokerIds = controller.partitionUnassignableBrokerIds.toSet)
+        noNewPartitionBrokerIds = controller.partitionUnassignableBrokerIds.toSet,
+        config.rackIdMapperForRackAwareReplicaAssignment
+      )
 
       if (validateOnly) {
         CreatePartitionsMetadata(topic, (existingAssignment ++ assignmentForNewPartitions).keySet)
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 65c2963bf94ef..7dc4fe8722a20 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -17,8 +17,7 @@
 package kafka.zk
 
 import java.util.Properties
-
-import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
+import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode, RackAwareReplicaAssignmentRackIdMapper}
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.ReplicaAssignment
 import kafka.log.LogConfig
@@ -48,16 +47,19 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
    * @param topicConfig topic configs
    * @param rackAwareMode rack aware mode for replica assignment
    * @param usesTopicId Boolean indicating whether the topic ID will be created
+   * @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping a rack ID to a different value, allowing customized interpretation of rack IDs.
    */
   def createTopic(topic: String,
                   partitions: Int,
                   replicationFactor: Int,
                   topicConfig: Properties = new Properties,
                   rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
-                  usesTopicId: Boolean = false): Unit = {
+                  usesTopicId: Boolean = false,
+                  rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper = identity): Unit = {
     val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
     val noNewPartitionBrokerIds = getMaintenanceBrokerList()
-    val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor)
+    val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor,
+      rackIdMapperForRackAwareReplicaAssignment = rackIdMapperForRackAwareReplicaAssignment)
     createTopicWithAssignment(topic, topicConfig, replicaAssignment, usesTopicId = usesTopicId)
   }
@@ -216,6 +218,9 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
    * @param replicationFactor
    * @param fixedStartIndex
    * @param startPartitionId
+   * @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping a rack ID to a different value.
+   *                                                  This can be used to customize rack ID interpretation for extra encoding;
+   *                                                  it is specifically meant for the broker-side partition assignment code path.
    * @return
    */
   def assignReplicasToAvailableBrokers(brokerMetadatas: Iterable[BrokerMetadata],
@@ -223,20 +228,23 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                                        noNewPartitionBrokerIds: Set[Int],
                                        nPartitions: Int,
                                        replicationFactor: Int,
                                        fixedStartIndex: Int = -1,
-                                       startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
+                                       startPartitionId: Int = -1,
+                                       rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper): Map[Int, Seq[Int]] = {
     val availableBrokerMetadata = brokerMetadatas.filter {
-      brokerMetadata =>
-        if (noNewPartitionBrokerIds.contains(brokerMetadata.id)) false
-        else true
+      brokerMetadata => !noNewPartitionBrokerIds.contains(brokerMetadata.id)
     }
-    if (replicationFactor > availableBrokerMetadata.size) {
+    val shouldOnlyUseAvailableBrokers: Boolean = replicationFactor <= availableBrokerMetadata.size
+    if (!shouldOnlyUseAvailableBrokers) {
       info(s"Using all brokers for replica assignment since replicationFactor[$replicationFactor] " +
         s"is larger than the number of nonMaintenanceBroker[${availableBrokerMetadata.size}]")
-      AdminUtils.assignReplicasToBrokers(brokerMetadatas, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
-    } else
-      AdminUtils.assignReplicasToBrokers(availableBrokerMetadata, nPartitions, replicationFactor, fixedStartIndex, startPartitionId)
+    }
+    AdminUtils.assignReplicasToBrokers(
+      if (shouldOnlyUseAvailableBrokers) availableBrokerMetadata else brokerMetadatas,
+      nPartitions, replicationFactor, fixedStartIndex, startPartitionId,
+      rackIdMapperForRackAwareReplicaAssignment
+    )
   }
 
   /**
@@ -249,6 +257,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
    * @param numPartitions Number of partitions to be set
    * @param replicaAssignment Manual replica assignment, or none
    * @param validateOnly If true, validate the parameters without actually adding the partitions
+   * @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping a rack ID to a different value, allowing customized interpretation of rack IDs.
    * @return the updated replica assignment
    */
   def addPartitions(topic: String,
@@ -256,7 +265,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                     allBrokers: Seq[BrokerMetadata],
                     numPartitions: Int = 1,
                     replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
-                    validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+                    validateOnly: Boolean = false,
+                    rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper = identity): Map[Int, Seq[Int]] = {
 
     val proposedAssignmentForNewPartitions = createNewPartitionsAssignment(
       topic,
@@ -264,7 +274,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
       existingAssignment,
       allBrokers,
       numPartitions,
       replicaAssignment,
-      getMaintenanceBrokerList().toSet
+      getMaintenanceBrokerList().toSet,
+      rackIdMapperForRackAwareReplicaAssignment
     )
 
     if (validateOnly) {
@@ -286,6 +297,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
    * @param numPartitions Number of partitions to be set
    * @param replicaAssignment Manual replica assignment, or none
    * @param noNewPartitionBrokerIds Brokers that do not take new partitions
+   * @param rackIdMapperForRackAwareReplicaAssignment A transformer for mapping a rack ID to a different value, allowing customized interpretation of rack IDs.
+   *
    * @return the assignment for the new partitions
    */
   def createNewPartitionsAssignment(topic: String,
@@ -293,7 +306,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
                                     allBrokers: Seq[BrokerMetadata],
                                     numPartitions: Int = 1,
                                     replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
-                                    noNewPartitionBrokerIds: Set[Int] = Set.empty[Int]): Map[Int, ReplicaAssignment] = {
+                                    noNewPartitionBrokerIds: Set[Int] = Set.empty[Int],
+                                    rackIdMapperForRackAwareReplicaAssignment: RackAwareReplicaAssignmentRackIdMapper): Map[Int, ReplicaAssignment] = {
     val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
       throw new AdminOperationException(
         s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
@@ -314,7 +328,8 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
       val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
       assignReplicasToAvailableBrokers(allBrokers, noNewPartitionBrokerIds, partitionsToAdd,
-        existingAssignmentPartition0.size, startIndex, existingAssignment.size)
+        existingAssignmentPartition0.size, startIndex, existingAssignment.size,
+        rackIdMapperForRackAwareReplicaAssignment = rackIdMapperForRackAwareReplicaAssignment)
     }
 
     proposedAssignmentForNewPartitions.map { case (tp, replicas) =>
diff --git a/core/src/test/java/kafka/admin/IgnorePrefixRackIdMapper.java b/core/src/test/java/kafka/admin/IgnorePrefixRackIdMapper.java
new file mode 100644
index 0000000000000..93e6a8f275ce3
--- /dev/null
+++ b/core/src/test/java/kafka/admin/IgnorePrefixRackIdMapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package kafka.admin;
+
+public class IgnorePrefixRackIdMapper implements RackAwareReplicaAssignmentRackIdMapper {
+    /**
+     * A simple transformer that returns everything after the first "::".
+     * For example, "::123", "123", and "AA::123" would all return "123", while "A::B::123" would return "B::123".
+     */
+    @Override
+    public String apply(String rackId) {
+        final String[] split = rackId.split("::", 2);
+        return split[split.length - 1];
+    }
+}
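A quick standalone check of the splitting semantics above (a sketch, not part
of the patch):

    // String.split with limit 2 consumes only the first "::".
    val mapper = new kafka.admin.IgnorePrefixRackIdMapper
    assert(mapper.apply("::123") == "123")
    assert(mapper.apply("123") == "123")          // no delimiter: passes through
    assert(mapper.apply("AA::123") == "123")
    assert(mapper.apply("A::B::123") == "B::123") // nested prefix kept intact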
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareReplicaAssignmentRackIdMapperTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareReplicaAssignmentRackIdMapperTest.scala
new file mode 100644
index 0000000000000..e236d8fde8640
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/RackAwareReplicaAssignmentRackIdMapperTest.scala
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package integration.kafka.api
+
+import kafka.admin.{IgnorePrefixRackIdMapper, RackAwareReplicaAssignmentRackIdMapper}
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.CoreUtils
+import kafka.utils.TestUtils.{adminClientSecurityConfigs, waitUntilTrue}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.{AfterEach, Assertions, Test}
+
+import java.util.Properties
+import scala.collection.JavaConverters._
+
+class RackAwareReplicaAssignmentRackIdMapperTest extends IntegrationTestHarness {
+  val rackIdMapperClassName = classOf[IgnorePrefixRackIdMapper].getCanonicalName
+  val rackIdMapper = CoreUtils.createObject[RackAwareReplicaAssignmentRackIdMapper](rackIdMapperClassName)
+  val brokerRacks = List(
+    "A::rack1",
+    "B::rack1",
+    "B::rack2",
+    "C::rack3",
+  )
+
+  var client: Admin = _
+
+  override val brokerCount = brokerRacks.size
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (client != null) {
+      Utils.closeQuietly(client, "AdminClient")
+      client = null
+    }
+    super.tearDown()
+  }
+
+  override def modifyConfigs(configs: Seq[Properties]): Unit = {
+    super.modifyConfigs(configs)
+    // Specify a customized rack.id for each broker
+    configs.zip(brokerRacks).foreach { case (cfgForEachBroker, rackId) =>
+      cfgForEachBroker.put(KafkaConfig.RackProp, rackId)
+      cfgForEachBroker.put(KafkaConfig.LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp, rackIdMapperClassName)
+    }
+  }
+
+  def createConfig: java.util.Map[String, Object] = {
+    val config = new java.util.HashMap[String, Object]
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
+    val securityProps: java.util.Map[Object, Object] =
+      adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
+    securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) }
+    config
+  }
+
+  def waitForTopics(client: Admin, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
+    waitUntilTrue(() => {
+      val topics = client.listTopics.names.get()
+      expectedPresent.forall(topicName => topics.contains(topicName)) &&
+        expectedMissing.forall(topicName => !topics.contains(topicName))
+    }, "timed out waiting for topics")
+  }
+
+
+  /**
+   * The rack is prefixed with some extra info, and the mapper should "ignore" the prefix.
+   * This simulates the case where we want to a) encode datacenter cages and b) group racks across cages.
+   *
+   * If the mapper is not honored, with 8 partitions & RF=3, the underlying logic would place
+   *   8 (partitions) * 3 (RF) / 4 (brokers) = 6 replicas
+   * on each broker, because all of the brokers are viewed as being on different racks.
+   * The "mapped rack" rack1 has 2 brokers (0 & 1), which would hold 12 replicas in that situation,
+   * and by the pigeonhole principle, at least 1 partition would have multiple replicas on the "mapped rack" rack1.
+   *
+   * On the other hand, if the mapper is honored, the broker will treat brokers 0 & 1 as if they're on the same rack
+   * and avoid placing the same partition on both of them.
+   */
+  @Test
+  def testCreateTopicWithExtraRackIdMapper(): Unit = {
+    val topicName = "mytopic"
+    val newTopics = Seq(new NewTopic(topicName, 8, 3.toShort))
+
+    client = Admin.create(createConfig)
+
+    client.createTopics(newTopics.asJava).all.get()
+    waitForTopics(client, Seq(topicName), Seq())
+
+    val assignment =
+      client.describeTopics(Seq(topicName).asJava).values().get(topicName).get()
+        .partitions().asScala
+        // Flatten each replica node in every partition as (Node -> Partition)
+        .flatMap(tp => tp.replicas().asScala.map(_ -> tp.partition()))
+        // Group by mapped rack.id, giving mappedRackId -> [(node, partition) placements on that rack]
+        .groupBy(x => rackIdMapper(x._1.rack))
+
+
+    for ((mappedRack, placementOnRack) <- assignment) { // For each rack
+      Assertions.assertEquals(
+        placementOnRack.size, // the number of placements should remain the same
+        placementOnRack.map(_._2).toSet.size, // after extracting the partition of each placement and deduplicating by partition
+        s"Mapped rack ${mappedRack} holds multiple replicas of the same partition. Mapper not honored or assignment logic flaw.\n" +
+          s"On rack: ${placementOnRack}\n" +
+          s"Full assignment: ${assignment}"
+      )
+    }
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 0d0edff63b910..f72fdb017bec6 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -17,6 +17,7 @@
 package kafka.server
 
+import kafka.admin.IgnorePrefixRackIdMapper
 import kafka.api.{ApiVersion, KAFKA_0_8_2, KAFKA_3_0_IV1}
 import kafka.cluster.EndPoint
 import kafka.log.LogConfig
@@ -1277,4 +1278,18 @@ class KafkaConfigTest {
     props.put(KafkaConfig.LogDirsProp, "/tmp/a")
     KafkaConfig.fromProps(props)
   }
+
+  @Test
+  def testRackAwareReplicaAssignmentRackIdMapperGeneratedFromClassName(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    props.put(KafkaConfig.LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp, classOf[IgnorePrefixRackIdMapper].getCanonicalName)
+    assertTrue(KafkaConfig.fromProps(props).rackIdMapperForRackAwareReplicaAssignment.isInstanceOf[IgnorePrefixRackIdMapper])
+  }
+
+  @Test
+  def testRackAwareReplicaAssignmentRackIdMapperDefaultsToIdentityIfNotProvided(): Unit = {
+    val testRackId = "A::123"
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    assertEquals(testRackId, KafkaConfig.fromProps(props).rackIdMapperForRackAwareReplicaAssignment(testRackId))
+  }
 }
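Finally, an end-to-end sketch of the assignment path the integration test
exercises (hedged: the broker metadata below mirrors the test fixtures; this is
not an exact excerpt from the patch):

    import kafka.admin.{AdminUtils, BrokerMetadata, IgnorePrefixRackIdMapper}

    // Encoded rack IDs as configured via broker.rack; CC still sees them unchanged.
    val brokers = Seq(
      BrokerMetadata(0, Some("A::rack1")),
      BrokerMetadata(1, Some("B::rack1")), // same mapped rack as broker 0
      BrokerMetadata(2, Some("B::rack2")),
      BrokerMetadata(3, Some("C::rack3")))

    // Only the mapped value participates in rack-aware placement.
    val assignment = AdminUtils.assignReplicasToBrokers(
      brokers, nPartitions = 8, replicationFactor = 3,
      rackIdMapperForBrokerAssignment = new IgnorePrefixRackIdMapper)

    // With the mapper honored, brokers 0 and 1 share "rack1", so no partition
    // should have replicas on both of them.
    assert(assignment.values.forall(rs => !(rs.contains(0) && rs.contains(1))))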