
Commit

[LI-HOTFIX] [Delayed Election PR - Part 1] When a corrupted broker joins the cluster, it should register with ZK (#429)

* [LI-HOTFIX] When a corrupted broker joins the cluster, it should register with ZK.

TICKET = LIKAFKA-49941
LI_DESCRIPTION =
[Delayed Election PR - Part 1]
When a corrupted broker joins a Kafka cluster, it needs special treatment before it can be elected leader for the partitions it hosts. If the "delayed election" feature is enabled, we want to wait for a period of time before electing leaders for the partitions on this broker, and base the decision on whichever replica has the highest broker epoch and offset.
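As a rough illustration of the selection rule described above (the actual election logic lands in the follow-up PR), the controller would pick the candidate replica with the highest (broker epoch, log end offset) pair. A minimal, self-contained Scala sketch; CandidateReplica and DelayedElectionRule are illustrative names, not Kafka APIs:

// Illustrative only: models the "highest broker epoch and offset wins" rule described above.
case class CandidateReplica(brokerId: Int, brokerEpoch: Long, logEndOffset: Long)

object DelayedElectionRule {
  // Order candidates by broker epoch first, then by log end offset, and pick the best one.
  def pickLeader(candidates: Seq[CandidateReplica]): Option[CandidateReplica] =
    if (candidates.isEmpty) None
    else Some(candidates.maxBy(r => (r.brokerEpoch, r.logEndOffset)))
}

// Example: broker 3 wins because it has the higher broker epoch, even with a smaller offset.
// DelayedElectionRule.pickLeader(Seq(
//   CandidateReplica(1, brokerEpoch = 10, logEndOffset = 500),
//   CandidateReplica(3, brokerEpoch = 12, logEndOffset = 480)))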

To achieve this, the first step is for the controller to learn about the corrupted broker. This PR achieves that by having the broker register itself under /brokers/corrupted/<id>. Once the controller observes this znode, it removes the broker from the ISR of all partitions hosted on it. This ensures that a preferred leader election does not accidentally make this broker the leader while the delayed election is in progress.
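For illustration, here is a toy model of that registration handshake. The znode layout (/brokers/corrupted/<id>) and the clearedFromIsrs flag mirror the diff below; InMemoryZk is a hypothetical stand-in for the real KafkaZkClient, not part of this change:

import scala.collection.mutable

case class CorruptedBroker(brokerId: Int, clearedFromIsrs: Boolean)

// Toy stand-in for ZooKeeper, keyed by znode path.
class InMemoryZk {
  private val znodes = mutable.Map.empty[String, CorruptedBroker]

  // Broker side: on startup with li.drop.corrupted.files.enable=true, register under /brokers/corrupted/<id>.
  def registerCorruptedBrokerId(brokerId: Int): Unit =
    znodes(s"/brokers/corrupted/$brokerId") = CorruptedBroker(brokerId, clearedFromIsrs = false)

  // Controller side: after removing the broker from all ISRs, mark the znode as cleaned.
  def markCleared(brokerId: Int): Unit =
    znodes(s"/brokers/corrupted/$brokerId") = CorruptedBroker(brokerId, clearedFromIsrs = true)

  // What the controller loads into its context: brokerId -> clearedFromIsrs.
  def corruptedBrokers: Map[Int, Boolean] =
    znodes.values.map(b => b.brokerId -> b.clearedFromIsrs).toMap
}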

This PR will be followed by another PR in which the controller actually prevents this broker from immediately becoming the leader for the OfflineReplicas hosted on it, and instead initiates a delayed election.

EXIT_CRITERIA = Until this change (corrupted brokers delayed election) is merged into upstream Kafka.
wyuka authored Mar 23, 2023
1 parent 3fa25a9 commit 91a3559
Showing 9 changed files with 372 additions and 14 deletions.
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -81,6 +81,7 @@ class ControllerContext {
private val liveBrokers = mutable.Set.empty[Broker]
private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
private val leaderAndIsrRequestSent = mutable.Map.empty[Int, Boolean]
val corruptedBrokers = mutable.Map.empty[Int, Boolean]
var epoch: Int = KafkaController.InitialControllerEpoch
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion

@@ -259,6 +260,11 @@ class ControllerContext {
livePreferredControllerIds = preferredControllerIds
}

def setCorruptedBrokers(brokers: Map[Int, Boolean]): Unit = {
corruptedBrokers.clear()
corruptedBrokers ++= brokers
}

def markLeaderAndIsrSent(brokerId: Int): Unit = {
leaderAndIsrRequestSent.put(brokerId, true)
}
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerState.scala
@@ -130,6 +130,10 @@ object ControllerState {
def value = 101
}

case object CorruptedBrokersChange extends ControllerState {
def value = 102
}

val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
AlterPartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange,
LeaderAndIsrResponseReceived, LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable,
142 changes: 139 additions & 3 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, Allocate
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, ListOffsetsResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.ProducerIdsBlock
import org.apache.kafka.server.policy.CreateTopicPolicy
@@ -170,6 +170,7 @@ class KafkaController(val config: KafkaConfig,
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
private val topicChangeHandler = new TopicChangeHandler(eventManager)
private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
private val corruptedBrokersHandler = new CorruptedBrokersHandler(eventManager)
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
@@ -331,7 +332,7 @@ class KafkaController(val config: KafkaConfig,

// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,
isrChangeNotificationHandler)
isrChangeNotificationHandler, corruptedBrokersHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)

val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler, topicDeletionFlagHandler)
@@ -357,6 +358,8 @@ class KafkaController(val config: KafkaConfig,
// PERF TODO: add controllerContextSnapshot as optional (defaultable) 3rd arg:
sendUpdateMetadataRequest(controllerContextSnapshot.liveOrShuttingDownBrokerIds.toSeq, Set.empty)

maybeCleanIsrsForCorruptedBrokers(controllerContext.corruptedBrokers)

info(s"Starting replica state machine ${KafkaController.timing(-1, failoverStartMs, time)}") // much slower than ZK partition scan
replicaStateMachine.startup(controllerContextSnapshot, failoverStartMs)
info(s"Starting partition state machine ${KafkaController.timing(-1, failoverStartMs, time)}")
@@ -578,6 +581,7 @@ class KafkaController(val config: KafkaConfig,
// shutdown partition state machine
partitionStateMachine.shutdown()
zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
zkClient.unregisterZNodeChildChangeHandler(corruptedBrokersHandler.path)
unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
zkClient.unregisterZNodeChangeHandler(topicDeletionFlagHandler.path)
@@ -1091,6 +1095,11 @@ class KafkaController(val config: KafkaConfig,
}
info(s"Finished recursing ${allPartitions.size} partitions in ZK and setting up LAI cache ${KafkaController.timing(taskStartMs, failoverStartMs, time)}")

// Load corrupted brokers list from ZK
taskStartMs = time.milliseconds()
controllerContext.setCorruptedBrokers(zkClient.getCorruptedBrokers.mapValues(_.clearedFromIsrs))
info(s"Finished loading corrupted partitions from ZK into context ${KafkaController.timing(taskStartMs, failoverStartMs, time)}")

// start the channel manager
controllerChannelManager.startup()
info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
@@ -2952,6 +2961,119 @@ class KafkaController(val config: KafkaConfig,
allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
}

def processCorruptedBrokersChange(): Unit = {
if (!isActive) {
return
}
val corruptedBrokers = zkClient.getCorruptedBrokers.mapValues(_.clearedFromIsrs)
info(s"Received corrupted brokers change = $corruptedBrokers")
if (corruptedBrokers == controllerContext.corruptedBrokers) {
return
}
maybeCleanIsrsForCorruptedBrokers(corruptedBrokers)
}

private def maybeCleanIsrsForCorruptedBrokers(originalCorruptedBrokers: Map[Int, Boolean]): Unit = {
// If delayed election is not enabled, do not clear corrupted broker from ISR. If it is removed, it
// may cause problems with elections when unclean leader election is disabled.
if (!config.isDelayedElectionEnabled) {
return
}

def updateCorruptedBrokerToCleanedInZk(brokerId: Int): Try[Unit] = Try {
info(s"Updating zk node for corrupted broker $brokerId to cleaned")
val corruptedBroker = CorruptedBroker(brokerId, clearedFromIsrs = true)
zkClient.updateCorruptedBroker(corruptedBroker)
}

val corruptedBrokersToClearFromIsr = originalCorruptedBrokers
.filterNot { case (_, isrCleared) => isrCleared }.keys
info(s"Clearing corrupted brokers $corruptedBrokersToClearFromIsr from ISR")
val corruptedBrokersIsrClearResults = corruptedBrokersToClearFromIsr.map { corruptedBrokerId =>
val tryResult = removeBrokerIdFromPartitionsIsrs(corruptedBrokerId).flatMap { _ =>
updateCorruptedBrokerToCleanedInZk(corruptedBrokerId)
}
corruptedBrokerId -> tryResult
}

val (isrCleanSuccessful, isrCleanUnsuccessful) = corruptedBrokersIsrClearResults.partition(_._2.isSuccess)
val corruptedBrokersWithUnsuccessfulClean = isrCleanUnsuccessful.map(isrAndTryResult => isrAndTryResult._1)
if (corruptedBrokersWithUnsuccessfulClean.nonEmpty) {
error(s"Could not clear corrupted brokers $corruptedBrokersWithUnsuccessfulClean from ISR")
}

val modifiedCorruptedBrokers = isrCleanSuccessful
.map{ case (brokerId, tryResult) => brokerId -> true }
.toMap

controllerContext.setCorruptedBrokers(originalCorruptedBrokers ++ modifiedCorruptedBrokers)
}

def removeBrokerIdFromPartitionsIsrs(brokerId: Int): Try[Unit] = Try {
val partitionsOnBroker = controllerContext.partitionsOnBroker(brokerId).toSeq
var remaining = partitionsOnBroker
var success = true
while (remaining.nonEmpty) {
val (finishedRemoval, removalsToRetry) = doRemoveBrokerIdFromPartitionsIsrs(brokerId, remaining)
remaining = removalsToRetry

finishedRemoval.foreach {
case (partition, Left(e)) =>
logger.error(s"Could not remove corrupted brokerId $brokerId from partition $partition", e)
success = false
case _ =>
}
}

if (!success) {
throw new KafkaException(s"Unexpected error trying to remove brokerId $brokerId from ISRs")
}
}

private def doRemoveBrokerIdFromPartitionsIsrs(brokerId: Int, partitions: Iterable[TopicPartition])
: (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = {
val leaderAndIsrs = partitions
.map(tp => tp -> controllerContext.partitionLeadershipInfo(tp))
.toMap
.filterNot { case (_, leaderAndIsrAndControllerEpochOption) => leaderAndIsrAndControllerEpochOption.isEmpty }
.mapValues(_.get)
.filter { case (_, leaderAndIsrAndControllerEpoch) => leaderAndIsrAndControllerEpoch.leaderAndIsr.isr.contains(brokerId) }
.mapValues(_.leaderAndIsr)

val updatedLeaderAndIsrs = leaderAndIsrs.map { case(tp, leaderAndIsr) =>
// Validate that brokerId is not present in ISR for any partition that has a leader
// The two checks below are like assertions, and they are never expected to occur.
if (leaderAndIsr.leader != LeaderAndIsr.NoLeader) {
logger.warn(
s"""Corruption-recovery: Unexpected entry $brokerId in ISR for partition
| $tp, leaderAndIsr = $leaderAndIsr. This should never happen, please investigate.""".stripMargin)
}
else if (leaderAndIsr.isr.length > 1) {
logger.warn(
s"""Corruption-recovery: Unexpected multiple entries in ISR for leaderless partition
| $tp, leaderAndIsr = $leaderAndIsr. This should never happen, please investigate.""".stripMargin)
}

val newLeader = if (brokerId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
val adjustedIsr = leaderAndIsr.isr.filter(_ != brokerId)
tp -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
}

val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) =
zkClient.updateLeaderAndIsr(updatedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)

val leaderIsrAndControllerEpochs = finishedPartitions.map { case (partition, result) =>
(partition, result.map { leaderAndIsr =>
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
// Add updated LeaderAndIsr with controller epoch to cache
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
leaderIsrAndControllerEpoch
})
}

(leaderIsrAndControllerEpochs, updatesToRetry)
}

def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
// Handle a few short-circuits
if (!isActive) {
@@ -3129,6 +3251,8 @@ class KafkaController(val config: KafkaConfig,
processStartup()
case SkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) =>
processSkipControlledShutdownSafetyCheck(id, brokerEpoch, callback)
case CorruptedBrokersChange =>
processCorruptedBrokersChange()
}
} catch {
case e: ControllerMovedException =>
@@ -3154,6 +3278,14 @@ class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
}
}

class CorruptedBrokersHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = CorruptedBrokersZNode.path

override def handleChildChange(): Unit = {
eventManager.put(CorruptedBrokersChange)
}
}

class PreferredControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = PreferredControllersZNode.path

@@ -3378,6 +3510,11 @@ case object PreferredControllerChange extends ControllerEvent {
override def preempt(): Unit = {}
}

case object CorruptedBrokersChange extends ControllerEvent {
override def state: ControllerState = ControllerState.CorruptedBrokersChange
override def preempt(): Unit = {}
}

case class BrokerModifications(brokerId: Int) extends ControllerEvent {
override def state: ControllerState = ControllerState.BrokerChange
override def preempt(): Unit = {}
@@ -3470,7 +3607,6 @@ case class AllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit) extends ControllerEvent {
override def preempt(): Unit = {}
}


// Used only in test cases
abstract class MockEvent(val state: ControllerState) extends ControllerEvent {
def process(): Unit
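For reference, removeBrokerIdFromPartitionsIsrs above keeps retrying the ZooKeeper leader-and-ISR updates returned in updatesToRetry until none remain, and only records hard failures. A self-contained sketch of that retry pattern; retryUntilDone and attemptBatch are hypothetical names, not controller APIs:

// Minimal sketch of the retry loop used by removeBrokerIdFromPartitionsIsrs: attempt a batch
// of updates, retry the ones that hit transient conflicts, and record hard failures.
def retryUntilDone[P](
    partitions: Seq[P],
    attemptBatch: Seq[P] => (Map[P, Either[Exception, Unit]], Seq[P])): Boolean = {
  var remaining = partitions
  var success = true
  while (remaining.nonEmpty) {
    val (finished, toRetry) = attemptBatch(remaining)
    finished.foreach {
      case (p, Left(e)) => println(s"failed to update $p: $e"); success = false
      case _            => // update applied successfully
    }
    remaining = toRetry
  }
  success
}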
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -333,6 +333,7 @@ object Defaults {
val LiLogCleanerFineGrainedLockEnabled = true
val LiDropCorruptedFilesEnabled = false
val LiConsumerFetchSampleRatio = 0.01
val LiLeaderElectionOnCorruptionWaitMs = 0L
val LiZookeeperPaginationEnable = false
val LiRackIdMapperClassNameForRackAwareReplicaAssignment: String = null
}
@@ -454,6 +455,7 @@ object KafkaConfig {
val LiLogCleanerFineGrainedLockEnableProp = "li.log.cleaner.fine.grained.lock.enable"
val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable"
val LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio"
val LiLeaderElectionOnCorruptionWaitMsProp = "li.leader.election.on.corruption.wait.ms"
val LiZookeeperPaginationEnableProp = "li.zookeeper.pagination.enable"
val LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp = "li.rack.aware.assignment.rack.id.mapper.class"
val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
@@ -796,6 +798,10 @@ object KafkaConfig {
val LiUpdateMetadataDelayMsDoc = "Specifies how long a UpdateMetadata request with partitions should be delayed before its processing can start. This config is purely for testing the LiCombinedControl feature and should not be enabled in a production environment."
val LiDropCorruptedFilesEnableDoc = "Specifies whether the broker should delete corrupted files during startup."
val LiConsumerFetchSampleRatioDoc = "Specifies the ratio of consumer Fetch requests to sample, which must be a number in the range [0.0, 1.0]. For now, the sampling is used to derive the age of consumed data."
val LiLeaderElectionOnCorruptionWaitMsDoc =
"""Specifies how long (in milliseconds) the controller should wait for other replicas to come up before electing a broker that has data corruption as leader.
|Setting this value to 0 disables delayed leader election on corruption.
|Increasing this number increases the unavailability of impacted topics during corrupted-broker startup, so operators should be careful about setting it to a high value.""".stripMargin
val LiDropFetchFollowerEnableDoc = "Specifies whether a leader should drop Fetch requests from followers. This config is used to simulate a slow leader and test the leader initiated leadership transfer"
val LiDenyAlterIsrDoc = "Test only config, and never enable this in a real cluster. Specifies whether controller should deny the AlterISRRequest."
val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover."
@@ -1248,6 +1254,7 @@ object KafkaConfig {
.define(LiLogCleanerFineGrainedLockEnableProp, BOOLEAN, Defaults.LiLogCleanerFineGrainedLockEnabled, LOW, LiLogCleanerFineGrainedLockEnableDoc)
.define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc)
.define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc)
.define(LiLeaderElectionOnCorruptionWaitMsProp, LONG, Defaults.LiLeaderElectionOnCorruptionWaitMs, atLeast(0), HIGH, LiLeaderElectionOnCorruptionWaitMsDoc)
.define(LiZookeeperPaginationEnableProp, BOOLEAN, Defaults.LiZookeeperPaginationEnable, LOW, LiZookeeperPaginationEnableDoc)
.define(LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp, STRING, Defaults.LiRackIdMapperClassNameForRackAwareReplicaAssignment, LOW, LiRackIdMapperClassNameForRackAwareReplicaAssignmentDoc)
.define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc)
@@ -1795,6 +1802,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def liLogCleanerFineGrainedLockEnable = getBoolean(KafkaConfig.LiLogCleanerFineGrainedLockEnableProp)
val liDropCorruptedFilesEnable = getBoolean(KafkaConfig.LiDropCorruptedFilesEnableProp)
val liConsumerFetchSampleRatio = getDouble(KafkaConfig.LiConsumerFetchSampleRatioProp)
val liLeaderElectionOnCorruptionWaitMs = getLong(KafkaConfig.LiLeaderElectionOnCorruptionWaitMsProp)

def liZookeeperPaginationEnable = getBoolean(KafkaConfig.LiZookeeperPaginationEnableProp)
def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp)
def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp)
@@ -2162,6 +2171,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def usesTopicId: Boolean =
usesSelfManagedQuorum || interBrokerProtocolVersion >= KAFKA_2_8_IV0

def isDelayedElectionEnabled: Boolean = liLeaderElectionOnCorruptionWaitMs > 0
def isRemoteLogStorageEnabled: Boolean =
getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP)

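For reference, the isDelayedElectionEnabled gate added above is just a predicate on the new wait-time config. A standalone sketch of that relationship; only the property name and the ">0 enables" semantics come from this diff, the rest is hypothetical:

// Minimal sketch: delayed election is considered enabled whenever
// li.leader.election.on.corruption.wait.ms is set to a positive value; 0 disables it.
object DelayedElectionConfig {
  val LiLeaderElectionOnCorruptionWaitMsProp = "li.leader.election.on.corruption.wait.ms"

  def isDelayedElectionEnabled(props: java.util.Properties): Boolean =
    Option(props.getProperty(LiLeaderElectionOnCorruptionWaitMsProp))
      .map(_.toLong)
      .exists(_ > 0L)
}

// Example: setting the property to 300000 (5 minutes) turns the feature on.
// val p = new java.util.Properties()
// p.setProperty("li.leader.election.on.corruption.wait.ms", "300000")
// DelayedElectionConfig.isDelayedElectionEnabled(p)  // true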
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/KafkaServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledSh
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils, PoisonPill}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, PoisonPill, Time, Utils}
import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
@@ -322,6 +322,10 @@ class KafkaServer(
autoTopicCreationChannel = Some(clientToControllerChannelManager)
}

if (config.liDropCorruptedFilesEnable) {
zkClient.registerCorruptedBrokerId(config.brokerId)
}

val apiVersionManager = ApiVersionManager(
ListenerType.ZK_BROKER,
config,
