Cancellation of reassignments allowed if the resulting ISR is at min-ISR (#442)

The previous implementation of cancellation was more restrictive: if any broker in the original replica list was down, cancellation was disallowed. That is overly restrictive when one of the original brokers is down for maintenance while enough replicas in the original list remain alive to handle reads and writes at min-ISR.

This implementation raises an error (and disallows the cancellation) only if the resulting ISR, consisting of the subset of original brokers that are still alive, would fall below min-ISR.
sutambe authored Mar 1, 2023
1 parent 27cc188 commit 28cca3e
Showing 3 changed files with 67 additions and 21 deletions.
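
The essence of the new rule fits in a few lines. Below is a minimal, self-contained sketch of the precondition (a hypothetical helper, not the controller code; originReplicas, liveBrokerIds, and minOriginalAliveReplicas stand in for the controller context and the new li.min.original.alive.replicas config introduced by this commit):

// Sketch only: models the cancellation precondition added by this commit.
def replicasAfterCancellation(originReplicas: Seq[Int],
                              liveBrokerIds: Set[Int],
                              minOriginalAliveReplicas: Int): Either[String, Seq[Int]] = {
  val aliveOriginals = originReplicas.filter(liveBrokerIds.contains)
  if (aliveOriginals.size == originReplicas.size)
    Right(originReplicas)                 // all originals alive: restore the full original list
  else if (aliveOriginals.size >= minOriginalAliveReplicas)
    Right(aliveOriginals)                 // enough survivors to serve reads/writes at min-ISR
  else
    Left(s"only ${aliveOriginals.size} of ${originReplicas.size} original replicas are alive")
}

// For example: originals Seq(0, 1, 2) with broker 2 down and a floor of 2
// yields Right(Seq(0, 1)); with brokers 1 and 2 down it yields a Left.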
43 changes: 29 additions & 14 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -2281,14 +2281,14 @@ class KafkaController(val config: KafkaConfig,
     val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
 
     reassignments.forKeyValue { (tp, targetReplicas) =>
-      val maybeApiError = validateReplicas(tp, targetReplicas)
-      maybeApiError match {
-        case None =>
-          maybeBuildReassignment(tp, targetReplicas) match {
+      val apiErrorOrNewTargetReplicas = validateReplicas(tp, targetReplicas)
+      apiErrorOrNewTargetReplicas match {
+        case Right(newTargetReplicas) =>
+          maybeBuildReassignment(tp, Some(newTargetReplicas)) match {
             case Some(context) => partitionsToReassign.put(tp, context)
             case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
           }
-        case Some(err) =>
+        case Left(err) =>
           reassignmentResults.put(tp, err)
       }
     }
@@ -2302,21 +2302,36 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
-  private def validateReplicas(topicPartition: TopicPartition, replicas: Option[Seq[Int]]): Option[ApiError] = {
+  private def validateReplicas(topicPartition: TopicPartition, replicas: Option[Seq[Int]]): Either[ApiError, Seq[Int]] = {
     replicas match {
-      case Some(targetReplicas) => validateTargetReplicas(topicPartition, targetReplicas)
+      case Some(targetReplicas) => {
+        validateTargetReplicas(topicPartition, targetReplicas)
+        Right(targetReplicas)
+      }
       case None => {
         // this is trying to cancel an existing replica reassignment
         val replicaAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
         if (replicaAssignment.isBeingReassigned) {
-          val originalReplicasAlive = replicaAssignment.originReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
-          if (!originalReplicasAlive)
-            Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
-              s"Replica assignment cancellation has original brokers that are not alive. " +
-              "Replica list: " + s"${replicaAssignment.originReplicas}, live broker list: ${controllerContext.liveBrokerIds}"))
-          else None
+          val allOriginalReplicasAlive = replicaAssignment.originReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
+          if (allOriginalReplicasAlive) {
+            // Simply fall back to the original set of replicas
+            Right(replicaAssignment.originReplicas)
+          } else {
+            val aliveOriginalReplicas = replicaAssignment.originReplicas.toSet.intersect(controllerContext.liveBrokerIds)
+            if (aliveOriginalReplicas.size >= config.liMinOriginalAliveReplicas) {
+              // If enough original replicas are alive (to maintain min-ISR), we allow the cancellation to go through
+              Right(aliveOriginalReplicas.toSeq)
+            } else {
+              // If there are not enough alive replicas in the original set, we don't allow the cancellation to go through.
+              Left(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
+                s"Replica assignment cancellation precondition failed. Don't have enough alive replicas in the original replica set: " +
+                s"replica list: ${replicaAssignment.originReplicas}, " +
+                s"live broker list: ${controllerContext.liveBrokerIds}, " +
+                s"${KafkaConfig.LiMinOriginalAliveReplicasProp}: ${config.liMinOriginalAliveReplicas}"))
+            }
+          }
         } else {
-          Some(new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
+          Left(new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
         }
       }
     }
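For context, the replicas == None branch above corresponds to a cancellation request arriving through the Admin API. A hedged sketch of what such a request looks like from a client (it assumes a configured Admin instance; this helper is not part of the commit):

import java.util.Optional
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{Admin, NewPartitionReassignment}
import org.apache.kafka.common.TopicPartition

// An empty Optional per partition means "cancel the in-flight reassignment".
// With this patch the controller accepts the cancellation as long as enough
// original replicas are alive; otherwise it returns INVALID_REPLICA_ASSIGNMENT.
def cancelReassignment(admin: Admin, topic: String, partition: Int): Unit = {
  val cancellation = Map(
    new TopicPartition(topic, partition) -> Optional.empty[NewPartitionReassignment]()
  ).asJava
  admin.alterPartitionReassignments(cancellation).all().get()
}
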
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -30,6 +30,7 @@ import kafka.log.LogConfig
 import kafka.log.LogConfig.MessageFormatVersion
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
 import kafka.security.authorizer.AuthorizerUtils
+import kafka.server.KafkaConfig.MinInSyncReplicasProp
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
 import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
@@ -566,6 +567,7 @@ object KafkaConfig {
   val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
   val InterBrokerListenerNameProp = "inter.broker.listener.name"
   val ReplicaSelectorClassProp = "replica.selector.class"
+  val LiMinOriginalAliveReplicasProp = "li.min.original.alive.replicas"
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
@@ -1042,6 +1044,7 @@ object KafkaConfig {
   val TransactionsTopicSegmentBytesDoc = "The transaction topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
   val TransactionsAbortTimedOutTransactionsIntervalMsDoc = "The interval at which to rollback transactions that have timed out"
   val TransactionsRemoveExpiredTransactionsIntervalMsDoc = "The interval at which to remove transactions that have expired due to <code>transactional.id.expiration.ms</code> passing"
+  val liMinOriginalAliveReplicasDoc = "The minimum number of alive replicas in the original replica set as a precondition to cancellation of a partition reassignment (which restores the original replica set)"
 
   /** ********* Fetch Configuration **************/
   val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of incremental fetch sessions that we will maintain."
@@ -1355,6 +1358,7 @@ object KafkaConfig {
       .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
       .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
       .define(ReplicaSelectorClassProp, STRING, null, MEDIUM, ReplicaSelectorClassDoc)
+      .define(LiMinOriginalAliveReplicasProp, INT, Defaults.MinInSyncReplicas, LOW, liMinOriginalAliveReplicasDoc)
 
       /** ********* Controlled shutdown configuration ***********/
       .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
@@ -1911,6 +1915,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
   val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
   val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
+  val liMinOriginalAliveReplicas = getInt(KafkaConfig.LiMinOriginalAliveReplicasProp)
   def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
 
   // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
@@ -2287,5 +2292,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     require(principalBuilderClass != null, s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
     require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
       s"${KafkaConfig.PrincipalBuilderClassProp} must implement KafkaPrincipalSerde")
+    require(liMinOriginalAliveReplicas >= Defaults.MinInSyncReplicas, KafkaConfig.liMinOriginalAliveReplicasDoc)
   }
 }
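
A quick sketch of how the new broker property is supplied and validated (a hedged example, not from the patch; fromProps also needs the rest of a broker's mandatory settings, abbreviated here to zookeeper.connect):

import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // plus the rest of your broker config
props.put("min.insync.replicas", "2")
props.put("li.min.original.alive.replicas", "2") // defaults to Defaults.MinInSyncReplicas when unset

val config = KafkaConfig.fromProps(props)
assert(config.liMinOriginalAliveReplicas == 2)

Note that the new require(...) above rejects values below Defaults.MinInSyncReplicas at startup, so the floor can never be configured lower than the stock min.insync.replicas default.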
39 changes: 32 additions & 7 deletions ReassignPartitionsIntegrationTest.scala
@@ -371,14 +371,13 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing)
   }
 
-  @Test
-  def testInvalidCancellation(): Unit = {
-    cluster = new ReassignPartitionsTestCluster(zkConnect)
+  def setupPartitionMovementBeforeCancellation(cluster: ReassignPartitionsTestCluster): String = {
     cluster.setup()
     cluster.produceMessages("foo", 0, 200)
-    val assignment = """{"version":1,"partitions":""" +
-      """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]}""" +
-      """]}"""
+    val assignment =
+      """{"version":1,"partitions":""" +
+        """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]}""" +
+        """]}"""
     assertEquals(unthrottledBrokerConfigs,
       describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
     val interBrokerThrottle = 1L
@@ -392,10 +391,36 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
       new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false)),
       true, Map(), false))
 
+    assignment
+  }
+
+  @Test
+  def testAtOrAboveMinISRCancellationSucceeds(): Unit = {
+    cluster = new ReassignPartitionsTestCluster(zkConnect)
+    val assignment = setupPartitionMovementBeforeCancellation(cluster)
+
     // shutdown one of the original replicas
     cluster.servers.find(_.config.brokerId == 2).get.shutdown()
 
-    // Cancel the reassignment should result in an exception
+    // Cancelling the reassignment should succeed because enough replicas
+    // in the original list are alive: {0, 1} are alive.
+    assertTrue(() => {
+      runCancelAssignment(cluster.adminClient, assignment, true)
+      true
+    })
+  }
+
+  @Test
+  def testBelowMinISRCancellationFails(): Unit = {
+    cluster = new ReassignPartitionsTestCluster(zkConnect, Map(KafkaConfig.LiMinOriginalAliveReplicasProp -> "2"))
+    val assignment = setupPartitionMovementBeforeCancellation(cluster)
+
+    // shutdown two of the original replicas.
+    cluster.servers.find(_.config.brokerId == 1).get.shutdown()
+    cluster.servers.find(_.config.brokerId == 2).get.shutdown()
+
+    // Cancelling the reassignment should result in an exception because
+    // not enough replicas in the original list are alive: only {0} is alive.
     assertThrows(classOf[TerseReassignmentFailureException], () => runCancelAssignment(cluster.adminClient, assignment, true))
   }
 
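Taken together, the two tests pin down the boundary behavior. A summary of the scenarios they exercise (the original replica set {0, 1, 2} for foo-0 is an assumption based on the test fixture; the default floor equals Defaults.MinInSyncReplicas, i.e. 1):

// foo-0: original replicas assumed {0, 1, 2}, reassigning to {0, 1, 3}
//   floor = 1 (default), broker 2 down     -> alive originals {0, 1}, 2 >= 1: cancellation succeeds
//   floor = 2,           brokers 1, 2 down -> alive originals {0},    1 <  2: cancellation fails
//                                             (TerseReassignmentFailureException)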
