From 81dd4a6a1a90777e7c7e0a564b7deec935359983 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 21 Sep 2022 10:05:18 -0700 Subject: [PATCH] [LI-HOTFIX] Add API to support electing recommended leaders for partitions (#390) TICKET = LIKAFKA-45529 LI_DESCRIPTION = Sometimes, a leader host can become too busy to serve Fetch requests from followers within the default request timeout of 10s. The impact is that all followers experience the timeout and fall out of the ISR, resulting in UnderMinISR partitions. A proposed solution to this problem is to let the leader host request a leadership switch for the affected partitions. The leader host already knows what followers have been in the ISR, and thus can recommend a new leader to the controller. This PR is the 1st step toward that goal by adding a new API to support electing recommended leaders for partitions. Specifically, the PR includes the following changes adding support in the controller for electing the recommended leaders. The recommended leader must be alive and already in the ISR; changing the ElectLeadersRequest.json to add a new optional field that indicates the recommended leaders; exposing a new API in the KafkaAdminClient. EXIT_CRITERIA = We will likely propose this change as a KIP in upstream Kafka. Thus this hotfix can exit when the change is merged upstream and pulled in as part of a major release. --- .../org/apache/kafka/clients/admin/Admin.java | 40 ++++++++++ .../kafka/clients/admin/KafkaAdminClient.java | 20 ++++- .../kafka/clients/admin/NoOpAdminClient.java | 7 ++ .../org/apache/kafka/common/ElectionType.java | 4 +- .../common/requests/ElectLeadersRequest.java | 23 +++++- .../common/message/ElectLeadersRequest.json | 11 ++- .../kafka/clients/admin/MockAdminClient.java | 8 ++ core/src/main/scala/kafka/api/package.scala | 13 ++++ .../scala/kafka/controller/Election.scala | 34 +++++++- .../kafka/controller/KafkaController.scala | 50 +++++++++--- .../controller/PartitionStateMachine.scala | 7 ++ .../main/scala/kafka/server/KafkaApis.scala | 1 + .../scala/kafka/server/ReplicaManager.scala | 3 +- .../ControllerIntegrationTest.scala | 2 +- .../MockPartitionStateMachine.scala | 2 + .../RecommendedLeaderElectionTest.scala | 77 +++++++++++++++++++ 16 files changed, 285 insertions(+), 17 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/server/RecommendedLeaderElectionTest.scala diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index c313894c2303c..4f87f7bdc0113 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1002,6 +1002,46 @@ ElectLeadersResult electLeaders( ElectLeadersOptions options); + default ElectLeadersResult electRecommendedLeaders( + Map partitionsWithRecommendedLeaders) { + return electRecommendedLeaders(partitionsWithRecommendedLeaders, new ElectLeadersOptions()); + } + + /** + * Switch the leadership of the given {@code partitions} to the corresponding recommending replicas. + *

+ * This operation is not transactional, so it may succeed for some partitions while fail for others. + *

+ * It may take several seconds after this method returns success for all the brokers in the cluster + * to become aware that the partitions have new leaders. During this time, + * {@link #describeTopics(Collection)} may not return information about the partitions' + * new leaders. + *

+ * The following exceptions can be anticipated when calling {@code get()} on the future obtained + * from the returned {@link ElectLeadersResult}: + *

+ * + * @param partitionsWithRecommendedLeaders The map from partitions to their corresponding recommended new leaders + * @param options The options to use when electing the leaders. + * @return The ElectLeadersResult. + */ + ElectLeadersResult electRecommendedLeaders( + Map partitionsWithRecommendedLeaders, + ElectLeadersOptions options); + /** * Change the reassignments for one or more partitions. * Providing an empty Optional (e.g via {@link Optional#empty()}) will revert the reassignment for the associated partition. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 74da6e4cc1eab..66ba1ab3afca1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3438,11 +3438,29 @@ void handleFailure(Throwable throwable) { return new MoveControllerResult(future); } + @Override + public ElectLeadersResult electRecommendedLeaders( + Map partitionsWithRecommendedLeaders, + ElectLeadersOptions options) { + return electLeaders(ElectionType.RECOMMENDED, partitionsWithRecommendedLeaders.keySet(), + partitionsWithRecommendedLeaders, options); + } + + @Override public ElectLeadersResult electLeaders( final ElectionType electionType, final Set topicPartitions, ElectLeadersOptions options) { + return electLeaders(electionType, topicPartitions, Collections.emptyMap(), options); + } + + + private ElectLeadersResult electLeaders( + final ElectionType electionType, + final Set topicPartitions, + final Map recommendedLeaders, + ElectLeadersOptions options) { final KafkaFutureImpl>> electionFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.timeoutMs()), @@ -3450,7 +3468,7 @@ public ElectLeadersResult electLeaders( @Override public ElectLeadersRequest.Builder createRequest(int timeoutMs) { - return new ElectLeadersRequest.Builder(electionType, topicPartitions, timeoutMs); + return new ElectLeadersRequest.Builder(electionType, topicPartitions, recommendedLeaders, timeoutMs); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java index 0d61ad91a2ee8..6c430d4127840 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java @@ -182,6 +182,13 @@ public ElectLeadersResult electLeaders(ElectionType electionType, Set partitionsWithRecommendedLeaders, + ElectLeadersOptions options) { + return null; + } + @Override public AlterPartitionReassignmentsResult alterPartitionReassignments( Map> reassignments, diff --git a/clients/src/main/java/org/apache/kafka/common/ElectionType.java b/clients/src/main/java/org/apache/kafka/common/ElectionType.java index 55331c5ea9ecd..5c34416506152 100644 --- a/clients/src/main/java/org/apache/kafka/common/ElectionType.java +++ b/clients/src/main/java/org/apache/kafka/common/ElectionType.java @@ -29,7 +29,7 @@ */ @InterfaceStability.Evolving public enum ElectionType { - PREFERRED((byte) 0), UNCLEAN((byte) 1); + PREFERRED((byte) 0), UNCLEAN((byte) 1), RECOMMENDED((byte) 2); public final byte value; @@ -42,6 +42,8 @@ public static ElectionType valueOf(byte value) { return PREFERRED; } else if (value == UNCLEAN.value) { return UNCLEAN; + } else if (value == RECOMMENDED.value) { + return RECOMMENDED; } else { throw new IllegalArgumentException( String.format("Value %s must be one of %s", value, Arrays.asList(ElectionType.values()))); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java index 1bc888af6050c..faf0731cbac3e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java @@ -20,7 +20,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Optional; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -36,12 +39,21 @@ public class ElectLeadersRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { private final ElectionType electionType; private final Collection topicPartitions; + private final Map recommendedLeaders; private final int timeoutMs; - public Builder(ElectionType electionType, Collection topicPartitions, int timeoutMs) { + public Builder(ElectionType electionType, Collection topicPartitions, + int timeoutMs) { + this(electionType, topicPartitions, Collections.emptyMap(), timeoutMs); + } + + public Builder(ElectionType electionType, Collection topicPartitions, + Map recommendedLeaders, + int timeoutMs) { super(ApiKeys.ELECT_LEADERS); this.electionType = electionType; this.topicPartitions = topicPartitions; + this.recommendedLeaders = recommendedLeaders; this.timeoutMs = timeoutMs; } @@ -75,6 +87,15 @@ private ElectLeadersRequestData toRequestData(short version) { data.topicPartitions().add(tps); } tps.partitions().add(tp.partition()); + + // check if there is a recommended leader + Optional recommendedLeader = Optional.ofNullable(recommendedLeaders.get(tp)); + if (recommendedLeader.isPresent()) { + tps.recommendedPartitionLeaders().add(new ElectLeadersRequestData.RecommendedPartitionLeaderState() + .setPartitionIndex(tp.partition()) + .setRecommendedLeader(recommendedLeader.get()) + ); + } }); } else { data.setTopicPartitions(null); diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json index dd9fa21641585..2e402c5054c7a 100644 --- a/clients/src/main/resources/common/message/ElectLeadersRequest.json +++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json @@ -32,7 +32,16 @@ { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The name of a topic." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", - "about": "The partitions of this topic whose leader should be elected." } + "about": "The partitions of this topic whose leader should be elected." }, + { "name": "RecommendedPartitionLeaders", "type": "[]RecommendedPartitionLeaderState", "versions": "2+", "tag": 0, "taggedVersions": "2+", + "about": "The recommended new leaders for partitions", + "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "2+", + "about": "The partition index." }, + { "name": "RecommendedLeader", "type": "int32", "versions": "2+", + "about": "The recommended leader." } + ] + } ] }, { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 9486be5273e00..c5d213fd4ccc6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -534,6 +534,14 @@ synchronized public ElectLeadersResult electLeaders( throw new UnsupportedOperationException("Not implemented yet"); } + @Override + synchronized public ElectLeadersResult electRecommendedLeaders( + Map partitionsWithRecommendedLeaders, + ElectLeadersOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override synchronized public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/core/src/main/scala/kafka/api/package.scala b/core/src/main/scala/kafka/api/package.scala index e0678f810ff4c..24d2ad04d351b 100644 --- a/core/src/main/scala/kafka/api/package.scala +++ b/core/src/main/scala/kafka/api/package.scala @@ -35,6 +35,19 @@ package object api { } } + def partitionRecommendedLeaders: Map[TopicPartition, Int] = { + if (self.data.topicPartitions == null) { + Map.empty + } else { + self.data.topicPartitions.asScala.iterator.flatMap { topicPartition => + topicPartition.recommendedPartitionLeaders().asScala.map { recommendedPartitionLeaderState => + new TopicPartition(topicPartition.topic, recommendedPartitionLeaderState.partitionIndex()) -> + recommendedPartitionLeaderState.recommendedLeader() + } + }.toMap + } + } + def electionType: ElectionType = { if (self.version == 0) { ElectionType.PREFERRED diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala index 3b6d8daf89054..fdb9c60bf6a43 100644 --- a/core/src/main/scala/kafka/controller/Election.scala +++ b/core/src/main/scala/kafka/controller/Election.scala @@ -19,8 +19,7 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.utils.Logging import org.apache.kafka.common.TopicPartition - -import scala.collection.Seq +import scala.collection.{Map, Seq} case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int]) @@ -103,6 +102,37 @@ object Election extends Logging { } } + private def leaderForRecommendation(partition: TopicPartition, + leaderAndIsr: LeaderAndIsr, + recommendedLeader: Option[Int], + controllerContext: ControllerContext, + controllerContextSnapshot: ControllerContextSnapshot): ElectionResult = { + val assignment = controllerContext.partitionReplicaAssignment(partition) + val liveReplicas = assignment.filter(replica => controllerContextSnapshot.isReplicaOnline(replica, partition)) + val isr = leaderAndIsr.isr + val leaderOpt = PartitionLeaderElectionAlgorithms.recommendedPartitionLeaderElection(recommendedLeader, isr, liveReplicas.toSet) + val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader)) + ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas) + } + + /** + * Elect leaders for partitions that have a recommended leader. + * + * @param controllerContext Context with the current state of the cluster + * @param leaderAndIsrs A sequence of tuples representing the partitions that need election + * and their respective leader/ISR states + * @param recommendedLeaders A map from each partition to its recommended leader + * @return The election results + */ + def leaderForRecommendation(controllerContext: ControllerContext, + leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)], + recommendedLeaders: Map[TopicPartition, Int]): Seq[ElectionResult] = { + val controllerContextSnapshot = ControllerContextSnapshot(controllerContext) + leaderAndIsrs.map { case (partition, leaderAndIsr) => + leaderForRecommendation(partition, leaderAndIsr, recommendedLeaders.get(partition), controllerContext, controllerContextSnapshot) + } + } + private def leaderForPreferredReplica(partition: TopicPartition, leaderAndIsr: LeaderAndIsr, controllerContext: ControllerContext): ElectionResult = { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 040016147eb0d..33ed7e53da216 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -367,7 +367,7 @@ class KafkaController(val config: KafkaConfig, initializePartitionReassignments() topicDeletionManager.tryTopicDeletion() val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections() - onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered) + onReplicaElection(pendingPreferredReplicaElections, Map.empty, ElectionType.PREFERRED, ZkTriggered) info(s"Starting the controller scheduler ${KafkaController.timing(-1, failoverStartMs, time)}") kafkaScheduler.startup() if (config.autoLeaderRebalanceEnable) { @@ -962,6 +962,7 @@ class KafkaController(val config: KafkaConfig, */ private[this] def onReplicaElection( partitions: Set[TopicPartition], + partitionRecommendedLeaders: Map[TopicPartition, Int], electionType: ElectionType, electionTrigger: ElectionTrigger ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = { @@ -974,6 +975,10 @@ class KafkaController(val config: KafkaConfig, * triggered by the admin client */ OfflinePartitionLeaderElectionStrategy(allowUnclean = electionTrigger == AdminClientTriggered) + case ElectionType.RECOMMENDED => + val effectivePartitionsWithRecommendedLeaders = partitionRecommendedLeaders.filter{ case (tp, _) => + partitions.contains(tp)} + RecommendedLeaderElectionStrategy(effectivePartitionsWithRecommendedLeaders) } val results = partitionStateMachine.handleStateChanges( @@ -1434,7 +1439,7 @@ class KafkaController(val config: KafkaConfig, if (!isTriggeredByAutoRebalance) { zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion) // Ensure we detect future preferred replica leader elections - eventManager.put(ReplicaLeaderElection(None, ElectionType.PREFERRED, ZkTriggered)) + eventManager.put(ReplicaLeaderElection(None, None, ElectionType.PREFERRED, ZkTriggered)) } } @@ -1534,7 +1539,7 @@ class KafkaController(val config: KafkaConfig, controllerContext.allTopics.contains(tp.topic) && canPreferredReplicaBeLeader(tp, controllerContextSnapshot) ) - onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered) + onReplicaElection(candidatePartitions.toSet, Map.empty, ElectionType.PREFERRED, AutoTriggered) } } } @@ -2579,10 +2584,11 @@ class KafkaController(val config: KafkaConfig, def electLeaders( partitions: Set[TopicPartition], + partitionRecommendedLeaders: Map[TopicPartition, Int], electionType: ElectionType, callback: ElectLeadersCallback ): Unit = { - eventManager.put(ReplicaLeaderElection(Some(partitions), electionType, AdminClientTriggered, callback)) + eventManager.put(ReplicaLeaderElection(Some(partitions), Some(partitionRecommendedLeaders), electionType, AdminClientTriggered, callback)) } def listPartitionReassignments(partitions: Option[Set[TopicPartition]], @@ -2602,6 +2608,7 @@ class KafkaController(val config: KafkaConfig, private def processReplicaLeaderElection( partitionsFromAdminClientOpt: Option[Set[TopicPartition]], + partitionRecommendedLeadersFromAdminClientOpt: Option[Map[TopicPartition, Int]], electionType: ElectionType, electionTrigger: ElectionTrigger, callback: ElectLeadersCallback @@ -2625,7 +2632,21 @@ class KafkaController(val config: KafkaConfig, info(s"Skipping replica leader election ($electionType) for partition $p by $electionTrigger since it doesn't exist.") } - val (partitionsBeingDeleted, livePartitions) = knownPartitions.partition(partition => + val (knownPartitionsWithValidRequest, knownPartitionsWithInvalidRequest) = if (electionType == ElectionType.RECOMMENDED) { + partitionRecommendedLeadersFromAdminClientOpt match { + case Some(partitionRecommendedLeaders) => knownPartitions.partition(partitionRecommendedLeaders.contains(_)) + case None => (Set.empty[TopicPartition], knownPartitions) // all known partitions are invalid + } + } else { + (knownPartitions, Set.empty) + } + + if (knownPartitionsWithInvalidRequest.nonEmpty) { + warn(s"Skipping replica leader election ($electionType) for partitions $knownPartitionsWithInvalidRequest " + + s"by $electionTrigger since there is no recommended leader") + } + + val (partitionsBeingDeleted, livePartitions) = knownPartitionsWithValidRequest.partition(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) if (partitionsBeingDeleted.nonEmpty) { warn(s"Skipping replica leader election ($electionType) for partitions $partitionsBeingDeleted " + @@ -2644,10 +2665,17 @@ class KafkaController(val config: KafkaConfig, case ElectionType.UNCLEAN => val currentLeader = controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader currentLeader == LeaderAndIsr.NoLeader || !controllerContext.liveBrokerIds.contains(currentLeader) + case ElectionType.RECOMMENDED => + val currentLeader = controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader + // Due to the checks for knownPartitionsWithValidRequest, we know for sure that there is a + // recommended leader for the partition in the partitionRecommendedLeadersFromAdminClientOpt argument + val recommendedLeader = partitionRecommendedLeadersFromAdminClientOpt.get.get(partition).get + currentLeader != recommendedLeader } } - val results = onReplicaElection(electablePartitions, electionType, electionTrigger).map { + val results = onReplicaElection(electablePartitions, partitionRecommendedLeadersFromAdminClientOpt.getOrElse(Map.empty), + electionType, electionTrigger).map { case (k, Left(ex)) => if (ex.isInstanceOf[StateChangeFailedException]) { val error = if (electionType == ElectionType.PREFERRED) { @@ -2665,6 +2693,9 @@ class KafkaController(val config: KafkaConfig, partitionsBeingDeleted.map( _ -> Left(new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ) ++ + knownPartitionsWithInvalidRequest.map( + _ -> Left(new ApiError(Errors.INVALID_REQUEST, "No recommended leader provided for the partition")) + ) ++ unknownPartitions.map( _ -> Left(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.")) ) @@ -2960,8 +2991,8 @@ class KafkaController(val config: KafkaConfig, error("Received a ShutdownEventThread event. This type of event is supposed to be handle by ControllerEventThread") case AutoPreferredReplicaLeaderElection => processAutoPreferredReplicaLeaderElection() - case ReplicaLeaderElection(partitions, electionType, electionTrigger, callback) => - processReplicaLeaderElection(partitions, electionType, electionTrigger, callback) + case ReplicaLeaderElection(partitions, partitionRecommendedLeaders, electionType, electionTrigger, callback) => + processReplicaLeaderElection(partitions, partitionRecommendedLeaders, electionType, electionTrigger, callback) case UncleanLeaderElectionEnable => processUncleanLeaderElectionEnable() case TopicUncleanLeaderElectionEnable(topic) => @@ -3130,7 +3161,7 @@ object IsrChangeNotificationHandler { class PreferredReplicaElectionHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler { override val path: String = PreferredReplicaElectionZNode.path - override def handleCreation(): Unit = eventManager.put(ReplicaLeaderElection(None, ElectionType.PREFERRED, ZkTriggered)) + override def handleCreation(): Unit = eventManager.put(ReplicaLeaderElection(None, None, ElectionType.PREFERRED, ZkTriggered)) } class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler { @@ -3328,6 +3359,7 @@ case class TopicDeletionFlagChange(reset: Boolean = false) extends ControllerEve case class ReplicaLeaderElection( partitionsFromAdminClientOpt: Option[Set[TopicPartition]], + partitionRecommendedLeadersFromAdminClientOpt: Option[Map[TopicPartition, Int]], electionType: ElectionType, electionTrigger: ElectionTrigger, callback: ElectLeadersCallback = _ => {} diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index eb527b1937d38..0efb738673735 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -419,6 +419,8 @@ class ZkPartitionStateMachine(config: KafkaConfig, leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) + case RecommendedLeaderElectionStrategy(recommendedLeaders) => + leaderForRecommendation(controllerContext, validLeaderAndIsrs, recommendedLeaders).partition(_.leaderAndIsr.isEmpty) } partitionsWithoutLeaders.foreach { electionResult => val partition = electionResult.topicPartition @@ -540,6 +542,10 @@ object PartitionLeaderElectionAlgorithms { reassignment.find(id => liveReplicas.contains(id) && isr.contains(id)) } + def recommendedPartitionLeaderElection(recommendedLeader: Option[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = { + recommendedLeader.find(id => liveReplicas.contains(id) && isr.contains(id)) + } + def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = { assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id)) } @@ -554,6 +560,7 @@ final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean) e final case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy final case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy final case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy +final case class RecommendedLeaderElectionStrategy(recommendedLeaders: Map[TopicPartition, Int]) extends PartitionLeaderElectionStrategy sealed trait PartitionState { def state: Byte diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index b966399b118c8..b8ec9597ded97 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3098,6 +3098,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.electLeaders( zkSupport.controller, partitions, + electionRequest.partitionRecommendedLeaders, electionRequest.electionType, sendResponseCallback(ApiError.NONE), electionRequest.data.timeoutMs diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c60c7bea2a6a9..d0879f012fbee 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2220,6 +2220,7 @@ class ReplicaManager(val config: KafkaConfig, def electLeaders( controller: KafkaController, partitions: Set[TopicPartition], + partitionRecommendedLeaders: Map[TopicPartition, Int], electionType: ElectionType, responseCallback: Map[TopicPartition, ApiError] => Unit, requestTimeout: Int @@ -2255,7 +2256,7 @@ class ReplicaManager(val config: KafkaConfig, } } - controller.electLeaders(partitions, electionType, electionCallback) + controller.electLeaders(partitions, partitionRecommendedLeaders, electionType, electionCallback) } def activeProducerState(requestPartition: TopicPartition): DescribeProducersResponseData.PartitionResponse = { diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index dfaae2f1b317c..1baebd8f52278 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -1051,7 +1051,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { val tp0 = new TopicPartition("t", 0) val tp1 = new TopicPartition("t", 1) val partitions = Set(tp0, tp1) - val event1 = ReplicaLeaderElection(Some(partitions), ElectionType.PREFERRED, ZkTriggered, partitionsMap => { + val event1 = ReplicaLeaderElection(Some(partitions), Some(Map.empty), ElectionType.PREFERRED, ZkTriggered, partitionsMap => { for (partition <- partitionsMap) { partition._2 match { case Left(e) => assertEquals(Errors.NOT_CONTROLLER, e.error()) diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala index b9a4d04198da0..a2d38beeee87e 100644 --- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala +++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala @@ -108,6 +108,8 @@ class MockPartitionStateMachine(controllerContext: ControllerContext, leaderForPreferredReplica(controllerContext, validLeaderAndIsrs) case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(controllerContext, validLeaderAndIsrs) + case RecommendedLeaderElectionStrategy(recommendedLeaders) => + leaderForRecommendation(controllerContext, validLeaderAndIsrs, recommendedLeaders) } val results: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = electionResults.map { electionResult => diff --git a/core/src/test/scala/unit/kafka/server/RecommendedLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/RecommendedLeaderElectionTest.scala new file mode 100644 index 0000000000000..bbc2b2d9d94db --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/RecommendedLeaderElectionTest.scala @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.integration.KafkaServerTestHarness +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} +import org.apache.kafka.common.TopicPartition +import org.junit.jupiter.api.Test + +import java.util +import java.util.Properties + +class RecommendedLeaderElectionTest extends KafkaServerTestHarness { + val numNodes = 2 + val overridingProps = new Properties + override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) + + @Test + def testRecommendedLeaderElection(): Unit = { + val topic = "test" + val tp = new TopicPartition(topic, 0) + createTopic(topic, 1, 2) + + val currentLeader = zkClient.getTopicPartitionState(tp).get.leaderAndIsr.leader + val currentFollower = if (currentLeader == 0) { + 1 + } else { + 0 + } + val newLeader = currentFollower + + // elect the recommended leader for the partition + val adminClient = createAdminClient() + val partitionWithRecommendedLeaders = new util.HashMap[TopicPartition, Integer]() + partitionWithRecommendedLeaders.put(tp, newLeader) + adminClient.electRecommendedLeaders(partitionWithRecommendedLeaders).all().get() + adminClient.close() + + // wait until the leader has changed to the recommended one + TestUtils.waitUntilTrue(() => { + zkClient.getTopicPartitionState(tp).get.leaderAndIsr.leader == newLeader + }, s"The leader cannot be changed to the recommended one $newLeader") + } + + @Test + def testRecommendedLeaderElectionWithoutLeaderInRequest(): Unit = { + val topic = "test" + createTopic(topic, 1, 2) + + val adminClient = createAdminClient() + val partitionWithRecommendedLeaders = new util.HashMap[TopicPartition, Integer]() + // elect with empty map should result in no exceptions + adminClient.electRecommendedLeaders(partitionWithRecommendedLeaders).all().get() + adminClient.close() + } + + def createAdminClient(props: Properties = new Properties): Admin = { + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + Admin.create(props) + } +}