From 91a355924d643498ee4632309b70ca2fd61ce060 Mon Sep 17 00:00:00 2001
From: Tirtha Chatterjee
Date: Thu, 23 Mar 2023 15:30:49 -0700
Subject: [PATCH] [LI-HOTFIX] [Delayed Election PR - Part 1] When a corrupted broker joins the cluster, it should register with ZK (#429)

* [LI-HOTFIX] When a corrupted broker joins the cluster, it should register with ZK.

TICKET = LIKAFKA-49941
LI_DESCRIPTION = [Delayed Election PR - Part 1]
When a corrupted broker joins a Kafka cluster, it needs special treatment
before it can be elected leader for the partitions hosted on it. If the
"delayed election" feature is enabled, we wait for a period of time before
electing the leader for partitions on this broker, and base the decision on
whichever replica has the highest broker epoch and offset.
To achieve this, the first step is for the controller to learn about the
corrupted broker. This PR achieves that by having the broker register itself
under /brokers/corrupted/. The controller then learns about this broker and,
as a first step, removes it from the ISR of all partitions hosted on it. This
is done to make sure that a preferred leader election does not accidentally
make this broker the leader while the delayed election is taking place.
This PR will be followed by another PR in which the controller actually
prevents this broker from immediately becoming the leader for the
OfflineReplicas on it and initiates a delayed election.

EXIT_CRITERIA = Until this change (corrupted brokers delayed election) is merged into upstream Kafka.
---
 .../kafka/controller/ControllerContext.scala  |   6 +
 .../kafka/controller/ControllerState.scala    |   4 +
 .../kafka/controller/KafkaController.scala    | 142 +++++++++++++++++-
 .../main/scala/kafka/server/KafkaConfig.scala |  10 ++
 .../main/scala/kafka/server/KafkaServer.scala |   6 +-
 .../main/scala/kafka/zk/KafkaZkClient.scala   |  34 +++++
 core/src/main/scala/kafka/zk/ZkData.scala     |  43 ++++++
 .../kafka/api/CorruptedBrokersTest.scala      | 117 +++++++++++++++
 .../integration/KafkaServerTestHarness.scala  |  24 +--
 9 files changed, 372 insertions(+), 14 deletions(-)
 create mode 100644 core/src/test/scala/integration/kafka/api/CorruptedBrokersTest.scala

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 1f128152c0c7c..73790869be8c3 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -81,6 +81,7 @@ class ControllerContext {
   private val liveBrokers = mutable.Set.empty[Broker]
   private val liveBrokerEpochs = mutable.Map.empty[Int, Long]
   private val leaderAndIsrRequestSent = mutable.Map.empty[Int, Boolean]
+  val corruptedBrokers = mutable.Map.empty[Int, Boolean]

   var epoch: Int = KafkaController.InitialControllerEpoch
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
@@ -259,6 +260,11 @@ class ControllerContext {
     livePreferredControllerIds = preferredControllerIds
   }

+  def setCorruptedBrokers(brokers: Map[Int, Boolean]): Unit = {
+    corruptedBrokers.clear()
+    corruptedBrokers ++= brokers
+  }
+
   def markLeaderAndIsrSent(brokerId: Int): Unit = {
     leaderAndIsrRequestSent.put(brokerId, true)
   }
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index d890d6eaa212b..bd33b82f85e88 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -130,6 +130,10 @@ object 
ControllerState { def value = 101 } + case object CorruptedBrokersChange extends ControllerState { + def value = 102 + } + val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, AlterPartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived, LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index add7694688aa1..060a87a2deabb 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -40,7 +40,7 @@ import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, Allocate import org.apache.kafka.common.feature.{Features, FinalizedVersionRange} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse} +import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, ListOffsetsResponse, UpdateFeaturesRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.server.common.ProducerIdsBlock import org.apache.kafka.server.policy.CreateTopicPolicy @@ -170,6 +170,7 @@ class KafkaController(val config: KafkaConfig, private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty private val topicChangeHandler = new TopicChangeHandler(eventManager) private val topicDeletionHandler = new TopicDeletionHandler(eventManager) + private val corruptedBrokersHandler = new CorruptedBrokersHandler(eventManager) private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager) private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager) @@ -331,7 +332,7 @@ class KafkaController(val config: KafkaConfig, // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler, - isrChangeNotificationHandler) + isrChangeNotificationHandler, corruptedBrokersHandler) childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler) val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler, topicDeletionFlagHandler) @@ -357,6 +358,8 @@ class KafkaController(val config: KafkaConfig, // PERF TODO: add controllerContextSnapshot as optional (defaultable) 3rd arg: sendUpdateMetadataRequest(controllerContextSnapshot.liveOrShuttingDownBrokerIds.toSeq, Set.empty) + maybeCleanIsrsForCorruptedBrokers(controllerContext.corruptedBrokers) + info(s"Starting replica state machine ${KafkaController.timing(-1, failoverStartMs, time)}") // much slower than ZK partition scan replicaStateMachine.startup(controllerContextSnapshot, failoverStartMs) info(s"Starting partition state machine ${KafkaController.timing(-1, failoverStartMs, time)}") @@ -578,6 +581,7 @@ class KafkaController(val config: KafkaConfig, // shutdown partition state machine partitionStateMachine.shutdown() 
zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
+    zkClient.unregisterZNodeChildChangeHandler(corruptedBrokersHandler.path)
     unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
     zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
     zkClient.unregisterZNodeChangeHandler(topicDeletionFlagHandler.path)
@@ -1091,6 +1095,11 @@ class KafkaController(val config: KafkaConfig,
     }
     info(s"Finished recursing ${allPartitions.size} partitions in ZK and setting up LAI cache ${KafkaController.timing(taskStartMs, failoverStartMs, time)}")

+    // Load corrupted brokers list from ZK
+    taskStartMs = time.milliseconds()
+    controllerContext.setCorruptedBrokers(zkClient.getCorruptedBrokers.mapValues(_.clearedFromIsrs))
+    info(s"Finished loading corrupted brokers from ZK into context ${KafkaController.timing(taskStartMs, failoverStartMs, time)}")
+
     // start the channel manager
     controllerChannelManager.startup()
     info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
@@ -2952,6 +2961,119 @@ class KafkaController(val config: KafkaConfig,
       allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
   }

+  def processCorruptedBrokersChange(): Unit = {
+    if (!isActive) {
+      return
+    }
+    val corruptedBrokers = zkClient.getCorruptedBrokers.mapValues(_.clearedFromIsrs)
+    info(s"Received corrupted brokers change = $corruptedBrokers")
+    if (corruptedBrokers == controllerContext.corruptedBrokers) {
+      return
+    }
+    maybeCleanIsrsForCorruptedBrokers(corruptedBrokers)
+  }
+
+  private def maybeCleanIsrsForCorruptedBrokers(originalCorruptedBrokers: Map[Int, Boolean]): Unit = {
+    // If delayed election is not enabled, do not clear the corrupted broker from the ISR. If it is removed, it
+    // may cause problems with elections when unclean leader election is disabled.
+ if (!config.isDelayedElectionEnabled) { + return + } + + def updateCorruptedBrokerToCleanedInZk(brokerId: Int): Try[Unit] = Try { + info(s"Updating zk node for corrupted broker $brokerId to cleaned") + val corruptedBroker = CorruptedBroker(brokerId, clearedFromIsrs = true) + zkClient.updateCorruptedBroker(corruptedBroker) + } + + val corruptedBrokersToClearFromIsr = originalCorruptedBrokers + .filterNot { case (_, isrCleared) => isrCleared }.keys + info(s"Clearing corrupted brokers $corruptedBrokersToClearFromIsr from ISR") + val corruptedBrokersIsrClearResults = corruptedBrokersToClearFromIsr.map { corruptedBrokerId => + val tryResult = removeBrokerIdFromPartitionsIsrs(corruptedBrokerId).flatMap { _ => + updateCorruptedBrokerToCleanedInZk(corruptedBrokerId) + } + corruptedBrokerId -> tryResult + } + + val (isrCleanSuccessful, isrCleanUnsuccessful) = corruptedBrokersIsrClearResults.partition(_._2.isSuccess) + val corruptedBrokersWithUnsuccessfulClean = isrCleanUnsuccessful.map(isrAndTryResult => isrAndTryResult._1) + if (corruptedBrokersWithUnsuccessfulClean.nonEmpty) { + error(s"Could not clear corrupted brokers $corruptedBrokersWithUnsuccessfulClean from ISR") + } + + val modifiedCorruptedBrokers = isrCleanSuccessful + .map{ case (brokerId, tryResult) => brokerId -> true } + .toMap + + controllerContext.setCorruptedBrokers(originalCorruptedBrokers ++ modifiedCorruptedBrokers) + } + + def removeBrokerIdFromPartitionsIsrs(brokerId: Int): Try[Unit] = Try { + val partitionsOnBroker = controllerContext.partitionsOnBroker(brokerId).toSeq + var remaining = partitionsOnBroker + var success = true + while (remaining.nonEmpty) { + val (finishedRemoval, removalsToRetry) = doRemoveBrokerIdFromPartitionsIsrs(brokerId, remaining) + remaining = removalsToRetry + + finishedRemoval.foreach { + case (partition, Left(e)) => + logger.error(s"Could not remove corrupted brokerId $brokerId from partition $partition", e) + success = false + case _ => + } + } + + if (!success) { + throw new KafkaException(s"Unexpected error trying to remove brokerId $brokerId from ISRs") + } + } + + private def doRemoveBrokerIdFromPartitionsIsrs(brokerId: Int, partitions: Iterable[TopicPartition]) + : (Map[TopicPartition, Either[Exception, LeaderIsrAndControllerEpoch]], Seq[TopicPartition]) = { + val leaderAndIsrs = partitions + .map(tp => tp -> controllerContext.partitionLeadershipInfo(tp)) + .toMap + .filterNot { case (_, leaderAndIsrAndControllerEpochOption) => leaderAndIsrAndControllerEpochOption.isEmpty } + .mapValues(_.get) + .filter { case (_, leaderAndIsrAndControllerEpoch) => leaderAndIsrAndControllerEpoch.leaderAndIsr.isr.contains(brokerId) } + .mapValues(_.leaderAndIsr) + + val updatedLeaderAndIsrs = leaderAndIsrs.map { case(tp, leaderAndIsr) => + // Validate that brokerId is not present in ISR for any partition that has a leader + // The two checks below are like assertions, and they are never expected to occur. + if (leaderAndIsr.leader != LeaderAndIsr.NoLeader) { + logger.warn( + s"""Corruption-recovery: Unexpected entry $brokerId in ISR for partition + | $tp, leaderAndIsr = $leaderAndIsr. This should never happen, please investigate.""".stripMargin) + } + else if (leaderAndIsr.isr.length > 1) { + logger.warn( + s"""Corruption-recovery: Unexpected multiple entries in ISR for leaderless partition + | $tp, leaderAndIsr = $leaderAndIsr. 
This should never happen, please investigate.""".stripMargin) + } + + val newLeader = if (brokerId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader + val adjustedIsr = leaderAndIsr.isr.filter(_ != brokerId) + tp -> leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr) + } + + val UpdateLeaderAndIsrResult(finishedPartitions, updatesToRetry) = + zkClient.updateLeaderAndIsr(updatedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) + + val leaderIsrAndControllerEpochs = finishedPartitions.map { case (partition, result) => + (partition, result.map { leaderAndIsr => + val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) + // Add updated LeaderAndIsr with controller epoch to cache + controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch) + leaderIsrAndControllerEpoch + }) + } + + (leaderIsrAndControllerEpochs, updatesToRetry) + } + def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = { // Handle a few short-circuits if (!isActive) { @@ -3129,6 +3251,8 @@ class KafkaController(val config: KafkaConfig, processStartup() case SkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) => processSkipControlledShutdownSafetyCheck(id, brokerEpoch, callback) + case CorruptedBrokersChange => + processCorruptedBrokersChange() } } catch { case e: ControllerMovedException => @@ -3154,6 +3278,14 @@ class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChi } } +class CorruptedBrokersHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler { + override val path: String = CorruptedBrokersZNode.path + + override def handleChildChange(): Unit = { + eventManager.put(CorruptedBrokersChange) + } +} + class PreferredControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler { override val path: String = PreferredControllersZNode.path @@ -3378,6 +3510,11 @@ case object PreferredControllerChange extends ControllerEvent { override def preempt(): Unit = {} } +case object CorruptedBrokersChange extends ControllerEvent { + override def state: ControllerState = ControllerState.CorruptedBrokersChange + override def preempt(): Unit = {} +} + case class BrokerModifications(brokerId: Int) extends ControllerEvent { override def state: ControllerState = ControllerState.BrokerChange override def preempt(): Unit = {} @@ -3470,7 +3607,6 @@ case class AllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Eithe override def preempt(): Unit = {} } - // Used only in test cases abstract class MockEvent(val state: ControllerState) extends ControllerEvent { def process(): Unit diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 070fe29c41ba8..5134c42b4619a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -333,6 +333,7 @@ object Defaults { val LiLogCleanerFineGrainedLockEnabled = true val LiDropCorruptedFilesEnabled = false val LiConsumerFetchSampleRatio = 0.01 + val LiLeaderElectionOnCorruptionWaitMs = 0L val LiZookeeperPaginationEnable = false val LiRackIdMapperClassNameForRackAwareReplicaAssignment: String = null } @@ -454,6 +455,7 @@ object KafkaConfig { val LiLogCleanerFineGrainedLockEnableProp = "li.log.cleaner.fine.grained.lock.enable" val LiDropCorruptedFilesEnableProp = "li.drop.corrupted.files.enable" val 
LiConsumerFetchSampleRatioProp = "li.consumer.fetch.sample.ratio"
+  val LiLeaderElectionOnCorruptionWaitMsProp = "li.leader.election.on.corruption.wait.ms"
   val LiZookeeperPaginationEnableProp = "li.zookeeper.pagination.enable"
   val LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp = "li.rack.aware.assignment.rack.id.mapper.class"
   val AllowPreferredControllerFallbackProp = "allow.preferred.controller.fallback"
@@ -796,6 +798,10 @@ object KafkaConfig {
   val LiUpdateMetadataDelayMsDoc = "Specifies how long a UpdateMetadata request with partitions should be delayed before its processing can start. This config is purely for testing the LiCombinedControl feature and should not be enabled in a production environment."
   val LiDropCorruptedFilesEnableDoc = "Specifies whether the broker should delete corrupted files during startup."
   val LiConsumerFetchSampleRatioDoc = "Specifies the ratio of consumer Fetch requests to sample, which must be a number in the range [0.0, 1.0]. For now, the sampling is used to derive the age of consumed data."
+  val LiLeaderElectionOnCorruptionWaitMsDoc =
+    """Specifies how long (in milliseconds) the controller should wait for other replicas to come up before electing a broker that has data corruption as leader.
+      |Setting this value to 0 disables delayed leader election on corruption.
+      |Increasing this value increases the unavailability of impacted topics during corrupted broker startup, so operators should be careful about setting it to a high value.""".stripMargin
   val LiDropFetchFollowerEnableDoc = "Specifies whether a leader should drop Fetch requests from followers. This config is used to simulate a slow leader and test the leader initiated leadership transfer"
   val LiDenyAlterIsrDoc = "Test only config, and never enable this in a real cluster. Specifies whether controller should deny the AlterISRRequest."
   val LiNumControllerInitThreadsDoc = "Number of threads (and Zookeeper clients + connections) to be used while recursing the topic-partitions tree in Zookeeper during controller startup/failover." 
@@ -1248,6 +1254,7 @@ object KafkaConfig { .define(LiLogCleanerFineGrainedLockEnableProp, BOOLEAN, Defaults.LiLogCleanerFineGrainedLockEnabled, LOW, LiLogCleanerFineGrainedLockEnableDoc) .define(LiDropCorruptedFilesEnableProp, BOOLEAN, Defaults.LiDropCorruptedFilesEnabled, HIGH, LiDropCorruptedFilesEnableDoc) .define(LiConsumerFetchSampleRatioProp, DOUBLE, Defaults.LiConsumerFetchSampleRatio, between(0.0, 1.0), LOW, LiConsumerFetchSampleRatioDoc) + .define(LiLeaderElectionOnCorruptionWaitMsProp, LONG, Defaults.LiLeaderElectionOnCorruptionWaitMs, atLeast(0), HIGH, LiLeaderElectionOnCorruptionWaitMsDoc) .define(LiZookeeperPaginationEnableProp, BOOLEAN, Defaults.LiZookeeperPaginationEnable, LOW, LiZookeeperPaginationEnableDoc) .define(LiRackIdMapperClassNameForRackAwareReplicaAssignmentProp, STRING, Defaults.LiRackIdMapperClassNameForRackAwareReplicaAssignment, LOW, LiRackIdMapperClassNameForRackAwareReplicaAssignmentDoc) .define(AllowPreferredControllerFallbackProp, BOOLEAN, Defaults.AllowPreferredControllerFallback, HIGH, AllowPreferredControllerFallbackDoc) @@ -1795,6 +1802,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def liLogCleanerFineGrainedLockEnable = getBoolean(KafkaConfig.LiLogCleanerFineGrainedLockEnableProp) val liDropCorruptedFilesEnable = getBoolean(KafkaConfig.LiDropCorruptedFilesEnableProp) val liConsumerFetchSampleRatio = getDouble(KafkaConfig.LiConsumerFetchSampleRatioProp) + val liLeaderElectionOnCorruptionWaitMs = getLong(KafkaConfig.LiLeaderElectionOnCorruptionWaitMsProp) + def liZookeeperPaginationEnable = getBoolean(KafkaConfig.LiZookeeperPaginationEnableProp) def unofficialClientLoggingEnable = getBoolean(KafkaConfig.UnofficialClientLoggingEnableProp) def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp) @@ -2162,6 +2171,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def usesTopicId: Boolean = usesSelfManagedQuorum || interBrokerProtocolVersion >= KAFKA_2_8_IV0 + def isDelayedElectionEnabled: Boolean = liLeaderElectionOnCorruptionWaitMs > 0 def isRemoteLogStorageEnabled: Boolean = getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index dbb3e3d884235..a6ce3adf306d2 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledSh import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} -import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils, PoisonPill} +import org.apache.kafka.common.utils.{AppInfoParser, LogContext, PoisonPill, Time, Utils} import org.apache.kafka.common.{Endpoint, Node, TopicPartition} import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.authorizer.Authorizer @@ -322,6 +322,10 @@ class KafkaServer( autoTopicCreationChannel = Some(clientToControllerChannelManager) } + if (config.liDropCorruptedFilesEnable) { + zkClient.registerCorruptedBrokerId(config.brokerId) + } + val apiVersionManager = ApiVersionManager( ListenerType.ZK_BROKER, config, diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index c6da8166f1710..3a50aea309211 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -111,6 +111,40 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, info(s"Registered preferred controller ${id} at path $path with czxid (preferred controller epoch): ${stat.getCzxid}") } + def registerCorruptedBrokerId(brokerId: Int): Unit = { + val corruptedBroker = CorruptedBroker(brokerId) + val path = corruptedBroker.path + checkedEphemeralCreate(path, corruptedBroker.toJsonBytes) + info(s"Registered corrupted broker ${corruptedBroker.brokerId} at path $path") + } + + def updateCorruptedBroker(corruptedBroker: CorruptedBroker): Unit = { + val path = corruptedBroker.path + val setDataRequest = SetDataRequest(path, corruptedBroker.toJsonBytes, ZkVersion.MatchAnyVersion) + val response = retryRequestUntilConnected(setDataRequest) + response.maybeThrow() + info(s"Updated corrupted broker ${corruptedBroker.brokerId} at path $path, clearedFromIsrs = ${corruptedBroker.clearedFromIsrs}") + } + + def getCorruptedBrokers: Map[Int, CorruptedBroker] = { + val brokerIds = getChildren(CorruptedBrokersZNode.path).map(_.toInt).sorted + + val getDataRequests = brokerIds.map(brokerId => GetDataRequest( + CorruptedBrokerIdZNode.path(brokerId), + ctx = Some(brokerId))) + + val getDataResponses = retryRequestsUntilConnected(getDataRequests) + getDataResponses.flatMap { getDataResponse => + val brokerId = getDataResponse.ctx.get.asInstanceOf[Int] + getDataResponse.resultCode match { + case Code.OK => + Some(brokerId, CorruptedBrokerIdZNode.decode(brokerId, getDataResponse.data)) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get + } + }.toMap + } + /** * Registers a given broker in zookeeper as the controller and increments controller epoch. * @param controllerId the id of the broker that is to be registered as the controller. 
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index f24100c2259e1..7a41dd57c13cf 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -84,6 +84,48 @@ object PreferredControllerIdZNode { def path(id: Int) = s"${BrokersZNode.path}/preferred_controllers/$id" } +object CorruptedBrokersZNode { + def path = s"${BrokersZNode.path}/corrupted" + def encode: Array[Byte] = null +} + +object CorruptedBroker { + def apply(brokerId: Int, clearedFromIsrs: Boolean = false): CorruptedBroker = { + val version = 0 + new CorruptedBroker(brokerId, version, clearedFromIsrs) + } +} + +case class CorruptedBroker(brokerId: Int, version: Int, clearedFromIsrs: Boolean) { + val path: String = CorruptedBrokerIdZNode.path(brokerId) + def toJsonBytes: Array[Byte] = CorruptedBrokerIdZNode.encode(this) +} + +object CorruptedBrokerIdZNode { + private val VersionKey = "version" + private val ClearedFromIsrsKey = "clearedFromIsrs" + + def path(id: Int) = s"${CorruptedBrokersZNode.path}/$id" + def encode(corruptedBroker: CorruptedBroker): Array[Byte] = { + val jsonMap = Map( + VersionKey -> corruptedBroker.version, + ClearedFromIsrsKey -> corruptedBroker.clearedFromIsrs) + Json.encodeAsBytes(jsonMap.asJava) + } + def decode(id: Int, jsonBytes: Array[Byte]): CorruptedBroker = { + Json.tryParseBytes(jsonBytes) match { + case Right(json) => + val corruptedBrokerJson = json.asJsonObject + val version = corruptedBrokerJson(VersionKey).to[Int] + val clearedFromIsrs = corruptedBrokerJson(ClearedFromIsrsKey).to[Boolean] + new CorruptedBroker(id, version, clearedFromIsrs) + case Left(e) => + throw new KafkaException(s"Failed to parse Zookeeper data for corrupted broker Id $id: " + + s"${new String(jsonBytes, UTF_8)}", e) + } + } +} + object BrokerIdsZNode { def path = s"${BrokersZNode.path}/ids" def encode: Array[Byte] = null @@ -1011,6 +1053,7 @@ object ZkData { BrokerIdsZNode.path, BrokerShutdownNode.path, PreferredControllersZNode.path, + CorruptedBrokersZNode.path, TopicsZNode.path, ConfigEntityChangeNotificationZNode.path, DeleteTopicsZNode.path, diff --git a/core/src/test/scala/integration/kafka/api/CorruptedBrokersTest.scala b/core/src/test/scala/integration/kafka/api/CorruptedBrokersTest.scala new file mode 100644 index 0000000000000..3bc573776a59b --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/CorruptedBrokersTest.scala @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package integration.kafka.api + +import kafka.api.{IntegrationTestHarness, LeaderAndIsr} +import kafka.log.Log.offsetFromFile +import kafka.log.{LogSegment} +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Time +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +import java.util.Properties + +class CorruptedBrokersTest extends IntegrationTestHarness { + val brokerCount = 3 + val controllerId = 0 + val broker1Id = 1 + val broker2Id = 2 + val topic = "topic1" + val partition = 0 + val topicPartition = new TopicPartition(topic, partition) + + serverConfig.setProperty(KafkaConfig.LiDropCorruptedFilesEnableProp, "true") + serverConfig.setProperty(KafkaConfig.LiLeaderElectionOnCorruptionWaitMsProp, "30000") + serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") + serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "1") + serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true") + + producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0") + producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0") + producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") + + override def modifyConfigs(props: Seq[Properties]): Unit = { + super.modifyConfigs(props) + // Make broker 0 controller + props.head.setProperty(KafkaConfig.PreferredControllerProp, "true") + } + + @Test + def isrCleanupOnCorruptedBrokerStartup(): Unit = { + val electedControllerId = TestUtils.waitUntilControllerElected(zkClient) + val controller = getController().kafkaController + assertTrue(electedControllerId == controllerId) + + createTopic(topic, Map(partition -> Seq(broker1Id, broker2Id))) + + val producer = createProducer() + sendRecords(producer, 5) + + killBroker(broker1Id) + TestUtils.waitUntilTrue(() => zkClient.getBroker(broker1Id).isEmpty, "Broker 1 failed to shut down.") + + killBroker(broker2Id) + TestUtils.waitUntilTrue(() => zkClient.getBroker(broker2Id).isEmpty, "Broker 2 failed to shut down.") + + val partitionState = zkClient.getTopicPartitionState(topicPartition) + assertTrue(partitionState.exists(state => + state.leaderAndIsr.isr == List(2) && + state.leaderAndIsr.leader == LeaderAndIsr.NoLeader)) + + truncateLog(broker1Id, 3) + truncateLog(broker2Id, 2) + + restartDeadBroker(broker2Id) + + TestUtils.waitUntilTrue( + () => { + zkClient.getCorruptedBrokers.get(2).exists(_.clearedFromIsrs) + }, "Broker 2 did not get cleaned from ISRs") + + restartDeadBroker(broker1Id) + + TestUtils.waitUntilTrue( + () => { + zkClient.getCorruptedBrokers.get(1).exists(_.clearedFromIsrs) + }, "Broker 1 did not get cleaned from ISRs") + } + + def truncateLog(brokerId: Int, truncationOffset: Int): Unit = { + val server = serverForId(brokerId).get + val log = server.logManager.getLog(topicPartition).get + val lastSegment = log.segments.lastSegment.get + val segmentFile = lastSegment.log.file + + val baseOffset = offsetFromFile(segmentFile) + val s2 = LogSegment.open(segmentFile.getParentFile, baseOffset, log.config, Time.SYSTEM) + s2.truncateTo(truncationOffset) + s2.close() + } + + def sendRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], recordCount: Int) { + val futures = (0 until recordCount).map { index => + val record = new ProducerRecord(topic, partition, index.toString.getBytes, index.toString.getBytes) + 
producer.send(record) + } + futures.foreach(_.get) + } +} diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 34676b03760ed..823695280cee8 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -170,19 +170,23 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { instanceConfigs = null } for(i <- servers.indices if !alive(i)) { - if (reconfigure) { - servers(i) = TestUtils.createServer( - configs(i), - time = brokerTime(configs(i).brokerId), - threadNamePrefix = None, - enableForwarding - ) - } - servers(i).startup() - alive(i) = true + restartDeadBroker(i, reconfigure) } } + def restartDeadBroker(brokerIndex: Int, reconfigure: Boolean = false): Unit = { + if (reconfigure) { + servers(brokerIndex) = TestUtils.createServer( + configs(brokerIndex), + time = brokerTime(configs(brokerIndex).brokerId), + threadNamePrefix = None, + enableForwarding + ) + } + servers(brokerIndex).startup() + alive(brokerIndex) = true + } + def waitForUserScramCredentialToAppearOnAllBrokers(clientPrincipal: String, mechanismName: String): Unit = { servers.foreach { server => val cache = server.credentialProvider.credentialCache.cache(mechanismName, classOf[ScramCredential])