[LI-HOTFIX] Add API to support electing recommended leaders for partitions (#390)

TICKET = LIKAFKA-45529
LI_DESCRIPTION =
Sometimes, a leader host can become too busy to serve Fetch requests from followers within the default request timeout of 10s. The impact is that all followers experience the timeout and fall out of the ISR, resulting in UnderMinISR partitions.

A proposed solution to this problem is to let the leader host request a leadership switch for the affected partitions.
The leader host already knows which followers have been in the ISR, and thus can recommend a new leader to the controller.
This PR is the first step toward that goal: it adds a new API to support electing recommended leaders for partitions.
Specifically, the PR includes the following changes:

- adding support in the controller for electing the recommended leaders. The recommended leader must be alive and already in the ISR;
- changing the ElectLeadersRequest.json to add a new optional field that indicates the recommended leaders;
- exposing a new API in the KafkaAdminClient.
EXIT_CRITERIA = We will likely propose this change as a KIP in upstream Kafka. Thus this hotfix can exit when the change is merged upstream and pulled in as part of a major release.
gitlw authored Sep 21, 2022
1 parent 17f549b commit 81dd4a6
Showing 16 changed files with 285 additions and 17 deletions.
40 changes: 40 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1002,6 +1002,46 @@ ElectLeadersResult electLeaders(
ElectLeadersOptions options);


default ElectLeadersResult electRecommendedLeaders(
Map<TopicPartition, Integer> partitionsWithRecommendedLeaders) {
return electRecommendedLeaders(partitionsWithRecommendedLeaders, new ElectLeadersOptions());
}

/**
* Switch the leadership of the given {@code partitions} to the corresponding recommended replicas.
* <p>
* This operation is not transactional, so it may succeed for some partitions while failing for others.
* <p>
* It may take several seconds after this method returns success for all the brokers in the cluster
* to become aware that the partitions have new leaders. During this time,
* {@link #describeTopics(Collection)} may not return information about the partitions'
* new leaders.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future obtained
* from the returned {@link ElectLeadersResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* if the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException}
* if the topic or partition did not exist within the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidTopicException}
* if the topic was already queued for deletion.</li>
* <li>{@link org.apache.kafka.common.errors.NotControllerException}
* if the request was sent to a broker that was not the controller for the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request timed out before the election was complete.</li>
* <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException}
* if the recommended leader was not alive or not in the ISR.</li>
* </ul>
*
* @param partitionsWithRecommendedLeaders The map from partitions to their corresponding recommended new leaders
* @param options The options to use when electing the leaders.
* @return The ElectLeadersResult.
*/
ElectLeadersResult electRecommendedLeaders(
Map<TopicPartition, Integer> partitionsWithRecommendedLeaders,
ElectLeadersOptions options);

/**
* Change the reassignments for one or more partitions.
* Providing an empty Optional (e.g via {@link Optional#empty()}) will <bold>revert</bold> the reassignment for the associated partition.
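
Because the Javadoc above states that the operation is not transactional, a caller has to inspect the outcome partition by partition. The following is a minimal sketch, assuming the result resolves to a map from partition to an optional error (the shape of `electionFuture` in KafkaAdminClient). Partitions are modelled as plain strings, and `PartialElectionResultSketch` and `failedPartitions` are hypothetical names, not part of the Admin API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for reading a per-partition election outcome; not part of Kafka. */
final class PartialElectionResultSketch {
    private PartialElectionResultSketch() { }

    /** Returns the sorted names of partitions whose election carried an error. */
    static List<String> failedPartitions(Map<String, Optional<Throwable>> results) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Optional<Throwable>> entry : results.entrySet()) {
            // An empty Optional means the election succeeded for that partition.
            if (entry.getValue().isPresent()) {
                failed.add(entry.getKey());
            }
        }
        Collections.sort(failed);
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Optional<Throwable>> results = new HashMap<>();
        results.put("orders-0", Optional.<Throwable>empty());
        results.put("orders-1", Optional.<Throwable>of(
                new IllegalStateException("recommended leader not in ISR")));
        System.out.println("failed: " + failedPartitions(results));
    }
}
```

A caller could retry only the failed partitions, possibly with a different recommended leader.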
@@ -3438,19 +3438,37 @@ void handleFailure(Throwable throwable) {
return new MoveControllerResult(future);
}

@Override
public ElectLeadersResult electRecommendedLeaders(
Map<TopicPartition, Integer> partitionsWithRecommendedLeaders,
ElectLeadersOptions options) {
return electLeaders(ElectionType.RECOMMENDED, partitionsWithRecommendedLeaders.keySet(),
partitionsWithRecommendedLeaders, options);
}


@Override
public ElectLeadersResult electLeaders(
final ElectionType electionType,
final Set<TopicPartition> topicPartitions,
ElectLeadersOptions options) {
return electLeaders(electionType, topicPartitions, Collections.<TopicPartition, Integer>emptyMap(), options);
}


private ElectLeadersResult electLeaders(
final ElectionType electionType,
final Set<TopicPartition> topicPartitions,
final Map<TopicPartition, Integer> recommendedLeaders,
ElectLeadersOptions options) {
final KafkaFutureImpl<Map<TopicPartition, Optional<Throwable>>> electionFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("electLeaders", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {

@Override
public ElectLeadersRequest.Builder createRequest(int timeoutMs) {
- return new ElectLeadersRequest.Builder(electionType, topicPartitions, timeoutMs);
+ return new ElectLeadersRequest.Builder(electionType, topicPartitions, recommendedLeaders, timeoutMs);
}

@Override
@@ -182,6 +182,13 @@ public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicParti
return null;
}

@Override
public ElectLeadersResult electRecommendedLeaders(
Map<TopicPartition, Integer> partitionsWithRecommendedLeaders,
ElectLeadersOptions options) {
return null;
}

@Override
public AlterPartitionReassignmentsResult alterPartitionReassignments(
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
@@ -29,7 +29,7 @@
*/
@InterfaceStability.Evolving
public enum ElectionType {
- PREFERRED((byte) 0), UNCLEAN((byte) 1);
+ PREFERRED((byte) 0), UNCLEAN((byte) 1), RECOMMENDED((byte) 2);

public final byte value;

@@ -42,6 +42,8 @@ public static ElectionType valueOf(byte value) {
return PREFERRED;
} else if (value == UNCLEAN.value) {
return UNCLEAN;
} else if (value == RECOMMENDED.value) {
return RECOMMENDED;
} else {
throw new IllegalArgumentException(
String.format("Value %s must be one of %s", value, Arrays.asList(ElectionType.values())));
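
The byte mapping above can be exercised with a small round-trip check. The sketch below is a stand-in rather than the Kafka class: `ElectionKindDemo`, `ElectionKind`, and `fromValue` are hypothetical names, and the lookup scans `values()` instead of using the if/else chain in the diff. Only the byte codes 0, 1, and 2 are taken from the commit.

```java
import java.util.Arrays;

/** Hypothetical stand-in that mirrors the byte codes shown in the diff. */
final class ElectionKindDemo {
    private ElectionKindDemo() { }

    enum ElectionKind {
        PREFERRED((byte) 0), UNCLEAN((byte) 1), RECOMMENDED((byte) 2);

        final byte value;

        ElectionKind(byte value) {
            this.value = value;
        }

        /** Finds the constant for a wire value by scanning all constants. */
        static ElectionKind fromValue(byte value) {
            for (ElectionKind kind : values()) {
                if (kind.value == value) {
                    return kind;
                }
            }
            throw new IllegalArgumentException(
                    String.format("Value %s must be one of %s", value, Arrays.asList(values())));
        }
    }

    public static void main(String[] args) {
        // Every constant should survive a trip through its byte value.
        for (ElectionKind kind : ElectionKind.values()) {
            ElectionKind decoded = ElectionKind.fromValue(kind.value);
            System.out.println(kind.value + " -> " + decoded);
        }
    }
}
```

A scan keeps the lookup correct when a constant is added, at the cost of a loop over a handful of values.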
@@ -20,7 +20,10 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -36,12 +39,21 @@ public class ElectLeadersRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ElectLeadersRequest> {
private final ElectionType electionType;
private final Collection<TopicPartition> topicPartitions;
private final Map<TopicPartition, Integer> recommendedLeaders;
private final int timeoutMs;

- public Builder(ElectionType electionType, Collection<TopicPartition> topicPartitions, int timeoutMs) {
+ public Builder(ElectionType electionType, Collection<TopicPartition> topicPartitions,
+ int timeoutMs) {
this(electionType, topicPartitions, Collections.emptyMap(), timeoutMs);
}

public Builder(ElectionType electionType, Collection<TopicPartition> topicPartitions,
Map<TopicPartition, Integer> recommendedLeaders,
int timeoutMs) {
super(ApiKeys.ELECT_LEADERS);
this.electionType = electionType;
this.topicPartitions = topicPartitions;
this.recommendedLeaders = recommendedLeaders;
this.timeoutMs = timeoutMs;
}

@@ -75,6 +87,15 @@ private ElectLeadersRequestData toRequestData(short version) {
data.topicPartitions().add(tps);
}
tps.partitions().add(tp.partition());

// check if there is a recommended leader
Optional<Integer> recommendedLeader = Optional.ofNullable(recommendedLeaders.get(tp));
if (recommendedLeader.isPresent()) {
tps.recommendedPartitionLeaders().add(new ElectLeadersRequestData.RecommendedPartitionLeaderState()
.setPartitionIndex(tp.partition())
.setRecommendedLeader(recommendedLeader.get())
);
}
});
} else {
data.setTopicPartitions(null);
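
The hunk above groups partitions by topic and attaches a recommendation only where the caller supplied one. A self-contained sketch of that grouping, under stated assumptions: `Map.Entry<String, Integer>` stands in for `TopicPartition`, and `RequestGroupingSketch`, `TopicEntry`, and `group` are hypothetical names, not the generated `ElectLeadersRequestData` classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the per-topic grouping; not the generated request classes. */
final class RequestGroupingSketch {
    private RequestGroupingSketch() { }

    /** Per-topic payload: every partition to elect, plus recommendations for some of them. */
    static final class TopicEntry {
        final List<Integer> partitions = new ArrayList<>();
        // partition index -> recommended leader broker id
        final Map<Integer, Integer> recommendedLeaders = new LinkedHashMap<>();
    }

    /** Groups (topic, partition) pairs by topic, keeping insertion order. */
    static Map<String, TopicEntry> group(
            Collection<Map.Entry<String, Integer>> topicPartitions,
            Map<Map.Entry<String, Integer>, Integer> recommendedLeaders) {
        Map<String, TopicEntry> byTopic = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> tp : topicPartitions) {
            TopicEntry entry = byTopic.computeIfAbsent(tp.getKey(), topic -> new TopicEntry());
            entry.partitions.add(tp.getValue());
            // Attach a recommendation only when one exists for this partition.
            Integer leader = recommendedLeaders.get(tp);
            if (leader != null) {
                entry.recommendedLeaders.put(tp.getValue(), leader);
            }
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> tps = new ArrayList<>();
        tps.add(Map.entry("orders", 0));
        tps.add(Map.entry("orders", 1));
        Map<Map.Entry<String, Integer>, Integer> recommended = new LinkedHashMap<>();
        recommended.put(Map.entry("orders", 1), 5);
        TopicEntry orders = group(tps, recommended).get("orders");
        System.out.println(orders.partitions + " " + orders.recommendedLeaders);
    }
}
```

Partitions without a recommendation still appear in the partition list, which matches how the builder passes an empty map for the existing election types.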
@@ -32,7 +32,16 @@
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The name of a topic." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
- "about": "The partitions of this topic whose leader should be elected." }
+ "about": "The partitions of this topic whose leader should be elected." },
{ "name": "RecommendedPartitionLeaders", "type": "[]RecommendedPartitionLeaderState", "versions": "2+", "tag": 0, "taggedVersions": "2+",
"about": "The recommended new leaders for partitions",
"fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "2+",
"about": "The partition index." },
{ "name": "RecommendedLeader", "type": "int32", "versions": "2+",
"about": "The recommended leader." }
]
}
]
},
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
@@ -534,6 +534,14 @@ synchronized public ElectLeadersResult electLeaders(
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
synchronized public ElectLeadersResult electRecommendedLeaders(
Map<TopicPartition, Integer> partitionsWithRecommendedLeaders,
ElectLeadersOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}


@Override
synchronized public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
13 changes: 13 additions & 0 deletions core/src/main/scala/kafka/api/package.scala
@@ -35,6 +35,19 @@ package object api {
}
}

def partitionRecommendedLeaders: Map[TopicPartition, Int] = {
if (self.data.topicPartitions == null) {
Map.empty
} else {
self.data.topicPartitions.asScala.iterator.flatMap { topicPartition =>
topicPartition.recommendedPartitionLeaders().asScala.map { recommendedPartitionLeaderState =>
new TopicPartition(topicPartition.topic, recommendedPartitionLeaderState.partitionIndex()) ->
recommendedPartitionLeaderState.recommendedLeader()
}
}.toMap
}
}

def electionType: ElectionType = {
if (self.version == 0) {
ElectionType.PREFERRED
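
On the broker side, `partitionRecommendedLeaders` flattens the per-topic entries back into a single map keyed by partition. A rough Java equivalent, assuming the request is modelled as topic -> (partition index -> recommended leader); `FlattenRecommendedLeadersSketch` and `flatten` are hypothetical names, and `Map.Entry<String, Integer>` again stands in for `TopicPartition`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical Java counterpart of the Scala helper; not part of Kafka. */
final class FlattenRecommendedLeadersSketch {
    private FlattenRecommendedLeadersSketch() { }

    /**
     * Flattens topic -> (partition index -> leader) into (topic, partition) -> leader.
     * A null input models a request that names no partitions and yields an empty map.
     */
    static Map<Map.Entry<String, Integer>, Integer> flatten(
            Map<String, Map<Integer, Integer>> perTopic) {
        Map<Map.Entry<String, Integer>, Integer> flat = new LinkedHashMap<>();
        if (perTopic == null) {
            return flat; // mirrors the null check in the Scala helper
        }
        for (Map.Entry<String, Map<Integer, Integer>> topic : perTopic.entrySet()) {
            for (Map.Entry<Integer, Integer> leader : topic.getValue().entrySet()) {
                flat.put(Map.entry(topic.getKey(), leader.getKey()), leader.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> orders = new LinkedHashMap<>();
        orders.put(1, 5);
        Map<String, Map<Integer, Integer>> perTopic = new LinkedHashMap<>();
        perTopic.put("orders", orders);
        System.out.println(flatten(perTopic));
        System.out.println(flatten(null).isEmpty());
    }
}
```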
34 changes: 32 additions & 2 deletions core/src/main/scala/kafka/controller/Election.scala
@@ -19,8 +19,7 @@ package kafka.controller
import kafka.api.LeaderAndIsr
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition

- import scala.collection.Seq
+ import scala.collection.{Map, Seq}

case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[LeaderAndIsr], liveReplicas: Seq[Int])

@@ -103,6 +102,37 @@ object Election extends Logging {
}
}

private def leaderForRecommendation(partition: TopicPartition,
leaderAndIsr: LeaderAndIsr,
recommendedLeader: Option[Int],
controllerContext: ControllerContext,
controllerContextSnapshot: ControllerContextSnapshot): ElectionResult = {
val assignment = controllerContext.partitionReplicaAssignment(partition)
val liveReplicas = assignment.filter(replica => controllerContextSnapshot.isReplicaOnline(replica, partition))
val isr = leaderAndIsr.isr
val leaderOpt = PartitionLeaderElectionAlgorithms.recommendedPartitionLeaderElection(recommendedLeader, isr, liveReplicas.toSet)
val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)
}

/**
* Elect leaders for partitions that have a recommended leader.
*
* @param controllerContext Context with the current state of the cluster
* @param leaderAndIsrs A sequence of tuples representing the partitions that need election
* and their respective leader/ISR states
* @param recommendedLeaders A map from each partition to its recommended leader
* @return The election results
*/
def leaderForRecommendation(controllerContext: ControllerContext,
leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)],
recommendedLeaders: Map[TopicPartition, Int]): Seq[ElectionResult] = {
val controllerContextSnapshot = ControllerContextSnapshot(controllerContext)
leaderAndIsrs.map { case (partition, leaderAndIsr) =>
leaderForRecommendation(partition, leaderAndIsr, recommendedLeaders.get(partition), controllerContext, controllerContextSnapshot)
}
}

private def leaderForPreferredReplica(partition: TopicPartition,
leaderAndIsr: LeaderAndIsr,
controllerContext: ControllerContext): ElectionResult = {
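
The body of `PartitionLeaderElectionAlgorithms.recommendedPartitionLeaderElection` is not shown in this view. Going only by the commit description (the recommended leader must be alive and already in the ISR), the rule might be sketched as follows. This is an assumption about the behaviour, not the controller's actual code, and `RecommendedLeaderRule` and `elect` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the recommended-leader rule; not the controller's code. */
final class RecommendedLeaderRule {
    private RecommendedLeaderRule() { }

    /**
     * Returns the recommended broker id only when one was supplied and it is both
     * in the ISR and among the live replicas; otherwise returns empty.
     */
    static Optional<Integer> elect(Optional<Integer> recommendedLeader,
                                   List<Integer> isr,
                                   Set<Integer> liveReplicas) {
        return recommendedLeader
                .filter(isr::contains)
                .filter(liveReplicas::contains);
    }

    public static void main(String[] args) {
        List<Integer> isr = Arrays.asList(1, 2);
        Set<Integer> live = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(elect(Optional.of(2), isr, live));     // in ISR and live
        System.out.println(elect(Optional.of(3), isr, live));     // live but not in ISR
        System.out.println(elect(Optional.empty(), isr, live));   // no recommendation
    }
}
```

An empty result corresponds to `leaderOpt` being `None` in the Scala code, which leaves the `ElectionResult` without a new `LeaderAndIsr`.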