cherry-pick KAFKA-8002; Log dir reassignment stalls if future replica has different segment base offset (apache#6346)

This patch fixes a bug in log dir reassignment where Partition.maybeReplaceCurrentWithFutureReplica would compare the entire LogEndOffsetMetadata of each replica to determine whether the reassignment had completed. If the active segments of the two replicas have different base offsets (for example, if the current replica had previously been cleaned and the future replica rolled segments at different points), the reassignment never completes. The fix is to compare only the LogEndOffsetMetadata.messageOffset of each replica. Tested with a unit test that simulates the compacted-current-replica case.
backported to 2.0-li
Reviewers: Anna Povzner <[email protected]>, Jason Gustafson <[email protected]>
bob-barrett authored and xiowu0 committed Mar 12, 2019
1 parent 68fdc1c commit a99b4f8
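
A minimal sketch of the comparison change described in the commit message above. The classes below are simplified stand-ins, not Kafka's real Replica and LogOffsetMetadata, and the caught-up check paraphrases Partition.maybeReplaceCurrentWithFutureReplica (not shown in the excerpt below) rather than copying it:

object FutureReplicaCatchUpSketch {
  // Stand-in for the offset metadata carried by each replica.
  case class LogOffsetMetadata(messageOffset: Long,
                               segmentBaseOffset: Long,
                               relativePositionInSegment: Int)

  case class Replica(logEndOffsetMetadata: LogOffsetMetadata) {
    def logEndOffset: Long = logEndOffsetMetadata.messageOffset
  }

  // Before the patch: structural equality over the full metadata. Segment base offset and
  // physical position differ whenever the two replicas rolled (or compacted) segments at
  // different points, so this check may never become true and the reassignment stalls.
  def caughtUpOld(current: Replica, future: Replica): Boolean =
    current.logEndOffsetMetadata == future.logEndOffsetMetadata

  // After the patch: compare only the message offsets, which do converge once the
  // future replica has fetched everything from the current replica.
  def caughtUpNew(current: Replica, future: Replica): Boolean =
    current.logEndOffset == future.logEndOffset
}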
Showing 14 changed files with 130 additions and 73 deletions.
21 changes: 12 additions & 9 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -294,7 +294,7 @@ class Partition(val topic: String,
inSyncReplicas = newInSyncReplicas

info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from " +
s"offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
s"offset ${getReplica().get.logEndOffset}. Previous Leader Epoch was: $leaderEpoch")

//We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
@@ -304,7 +304,7 @@ class Partition(val topic: String,
val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true)

val leaderReplica = getReplica().get
val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
val curLeaderLogEndOffset = leaderReplica.logEndOffset
val curTimeMs = time.milliseconds
// initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
(assignedReplicas - leaderReplica).foreach { replica =>
@@ -412,7 +412,7 @@ class Partition(val topic: String,
val leaderHW = leaderReplica.highWatermark
if (!inSyncReplicas.contains(replica) &&
assignedReplicas.map(_.brokerId).contains(replicaId) &&
replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
replica.logEndOffsetMetadata.offsetDiff(leaderHW) >= 0) {
val newInSyncReplicas = inSyncReplicas + replica
info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
@@ -443,9 +443,9 @@ class Partition(val topic: String,
val curInSyncReplicas = inSyncReplicas

if (isTraceEnabled) {
def logEndOffsetString(r: Replica) = s"broker ${r.brokerId}: ${r.logEndOffset.messageOffset}"
def logEndOffsetString(r: Replica) = s"broker ${r.brokerId}: ${r.logEndOffset}"
val (ackedReplicas, awaitingReplicas) = curInSyncReplicas.partition { replica =>
replica.logEndOffset.messageOffset >= requiredOffset
replica.logEndOffset >= requiredOffset
}
trace(s"Progress awaiting ISR acks for offset $requiredOffset: acked: ${ackedReplicas.map(logEndOffsetString)}, " +
s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
@@ -489,7 +489,8 @@ class Partition(val topic: String,
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
val allLogEndOffsets = assignedReplicas.filter { replica =>
curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
}.map(_.logEndOffset)
}.map(_.logEndOffsetMetadata)

val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
val oldHighWatermark = leaderReplica.highWatermark

@@ -501,7 +502,7 @@ class Partition(val topic: String,
debug(s"High watermark updated to $newHighWatermark")
true
} else {
def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffset}"
def logEndOffsetString(r: Replica) = s"replica ${r.brokerId}: ${r.logEndOffsetMetadata}"
debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark. " +
s"All current LEOs are ${assignedReplicas.map(logEndOffsetString)}")
false
@@ -542,6 +543,7 @@ class Partition(val topic: String,
assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
newInSyncReplicas.map(_.brokerId).mkString(",")))

// update ISR in zk and in cache
updateIsr(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
@@ -577,7 +579,7 @@ class Partition(val topic: String,
val candidateReplicas = inSyncReplicas - leaderReplica

val laggingReplicas = candidateReplicas.filter(r =>
r.logEndOffset.messageOffset != leaderReplica.logEndOffset.messageOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
r.logEndOffset != leaderReplica.logEndOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
if (laggingReplicas.nonEmpty)
debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

@@ -609,7 +611,8 @@ class Partition(val topic: String,
} catch {
case e: UnexpectedAppendOffsetException =>
val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
val logEndOffset = replica.logEndOffset.messageOffset
val logEndOffset = replica.logEndOffset

if (logEndOffset == replica.logStartOffset &&
e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
// This may happen if the log start offset on the leader (or current replica) falls in
17 changes: 10 additions & 7 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -33,7 +33,7 @@ class Replica(val brokerId: Int,
@volatile private[this] var highWatermarkMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
// the log end offset value, kept in all replicas;
// for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
@volatile private[this] var _logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
// the log start offset value, kept in all replicas;
// for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var _logStartOffset = Log.UnknownLogStartOffset
@@ -78,7 +78,7 @@ class Replica(val brokerId: Int,
_lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

logStartOffset = logReadResult.followerLogStartOffset
logEndOffset = logReadResult.info.fetchOffsetMetadata
logEndOffsetMetadata = logReadResult.info.fetchOffsetMetadata
lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
lastFetchTimeMs = logReadResult.fetchTimeMs
}
@@ -89,20 +89,23 @@ class Replica(val brokerId: Int,
_lastCaughtUpTimeMs = lastCaughtUpTimeMs
}

private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
private def logEndOffsetMetadata_=(newLogEndOffset: LogOffsetMetadata) {
if (isLocal) {
throw new KafkaException(s"Should not set log end offset on partition $topicPartition's local replica $brokerId")
} else {
logEndOffsetMetadata = newLogEndOffset
trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [$logEndOffsetMetadata]")
_logEndOffsetMetadata = newLogEndOffset
trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [${_logEndOffsetMetadata}]")
}
}

def logEndOffset: LogOffsetMetadata =
def logEndOffsetMetadata: LogOffsetMetadata =
if (isLocal)
log.get.logEndOffsetMetadata
else
logEndOffsetMetadata
_logEndOffsetMetadata

def logEndOffset: Long =
logEndOffsetMetadata.messageOffset

/**
* Increment the log start offset if the new offset is greater than the previous log start offset. The replica
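
The Replica.scala hunks above split the old accessor in two: logEndOffsetMetadata returns the full LogOffsetMetadata, while logEndOffset now returns only the message offset as a Long; the remaining files in this commit switch their call sites accordingly. A self-contained sketch of the resulting pattern, using stub classes rather than the real ones (the offsetDiff stub only mirrors message-offset subtraction):

object LogEndOffsetAccessorSketch extends App {
  // Stub metadata type mirroring only the shape used below.
  case class LogOffsetMetadata(messageOffset: Long, segmentBaseOffset: Long) {
    def offsetDiff(that: LogOffsetMetadata): Long = messageOffset - that.messageOffset
  }

  class Replica(private val endOffsetMetadata: LogOffsetMetadata) {
    def logEndOffsetMetadata: LogOffsetMetadata = endOffsetMetadata // full metadata, for position-aware logic
    def logEndOffset: Long = endOffsetMetadata.messageOffset        // plain message offset, for comparisons
  }

  val replica  = new Replica(LogOffsetMetadata(messageOffset = 42L, segmentBaseOffset = 30L))
  val leaderHW = LogOffsetMetadata(messageOffset = 40L, segmentBaseOffset = 0L)

  // Offset comparisons and arithmetic (fetch-offset checks, lag checks, log lines) use the Long accessor:
  assert(replica.logEndOffset == 42L)

  // Position-aware logic, such as the ISR-expansion offsetDiff check in Partition.scala, keeps the metadata:
  assert(replica.logEndOffsetMetadata.offsetDiff(leaderHW) >= 0)
}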
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -340,7 +340,7 @@ abstract class AbstractFetcherThread(name: String,
// (version 0 of OffsetForLeaderEpoch request/response)
warn(s"Leader or $followerName is on protocol version where leader epoch is not considered in the OffsetsForLeaderEpoch response. " +
s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation in ${replica.topicPartition}.")
OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true)
OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset), truncationCompleted = true)
} else {
// get (leader epoch, end offset) pair that corresponds to the largest leader epoch
// less than or equal to the requested epoch.
@@ -353,19 +353,19 @@ abstract class AbstractFetcherThread(name: String,
warn(s"Based on $followerName's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
s"below any $followerName's tracked epochs for ${replica.topicPartition}. " +
s"The leader's offset only ${leaderEpochOffset.endOffset} will be used for truncation.")
OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true)
OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset), truncationCompleted = true)
} else if (followerEpoch != leaderEpochOffset.leaderEpoch) {
// the follower does not know about the epoch that leader replied with
// we truncate to the end offset of the largest epoch that is smaller than the
// epoch the leader replied with, and send another offset for leader epoch request
val intermediateOffsetToTruncateTo = min(followerEndOffset, replica.logEndOffset.messageOffset)
val intermediateOffsetToTruncateTo = min(followerEndOffset, replica.logEndOffset)
info(s"Based on $followerName's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
s"unknown to the $followerName for ${replica.topicPartition}. " +
s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.")
OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false)
} else {
val offsetToTruncateTo = min(followerEndOffset, leaderEpochOffset.endOffset)
OffsetTruncationState(min(offsetToTruncateTo, replica.logEndOffset.messageOffset), truncationCompleted = true)
OffsetTruncationState(min(offsetToTruncateTo, replica.logEndOffset), truncationCompleted = true)
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -89,7 +89,7 @@ class DelayedFetch(delayMs: Long,
else if (fetchMetadata.fetchOnlyCommitted)
replica.highWatermark
else
replica.logEndOffset
replica.logEndOffsetMetadata

// Go directly to the check for Case D if the message offsets are the same. If the log segment
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
19 changes: 10 additions & 9 deletions core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -94,12 +94,13 @@ class ReplicaAlterLogDirsThread(name: String,
val partition = replicaMgr.getPartition(topicPartition).get
val records = partitionData.toRecords

if (fetchOffset != futureReplica.logEndOffset.messageOffset)
if (fetchOffset != futureReplica.logEndOffset)
throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
topicPartition, fetchOffset, futureReplica.logEndOffset))

partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = true)
val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
val futureReplicaHighWatermark = futureReplica.logEndOffset.min(partitionData.highWatermark)

futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)

@@ -113,20 +114,20 @@ class ReplicaAlterLogDirsThread(name: String,
val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
val currentReplica = replicaMgr.getReplicaOrException(topicPartition)
val partition = replicaMgr.getPartition(topicPartition).get
val logEndOffset: Long = currentReplica.logEndOffset.messageOffset
val logEndOffset: Long = currentReplica.logEndOffset

if (logEndOffset < futureReplica.logEndOffset.messageOffset) {
if (logEndOffset < futureReplica.logEndOffset) {
warn("Future replica for partition %s reset its fetch offset from %d to current replica's latest offset %d"
.format(topicPartition, futureReplica.logEndOffset.messageOffset, logEndOffset))
.format(topicPartition, futureReplica.logEndOffset, logEndOffset))
partition.truncateTo(logEndOffset, isFuture = true)
logEndOffset
} else {
val currentReplicaStartOffset: Long = currentReplica.logStartOffset
warn("Future replica for partition %s reset its fetch offset from %d to current replica's start offset %d"
.format(topicPartition, futureReplica.logEndOffset.messageOffset, currentReplicaStartOffset))
val offsetToFetch = Math.max(currentReplicaStartOffset, futureReplica.logEndOffset.messageOffset)
.format(topicPartition, futureReplica.logEndOffset, currentReplicaStartOffset))
val offsetToFetch = Math.max(currentReplicaStartOffset, futureReplica.logEndOffset)
// Only truncate the log when current replica's log start offset is greater than future replica's log end offset.
if (currentReplicaStartOffset > futureReplica.logEndOffset.messageOffset)
if (currentReplicaStartOffset > futureReplica.logEndOffset)
partition.truncateFullyAndStartAt(currentReplicaStartOffset, isFuture = true)
offsetToFetch
}
22 changes: 11 additions & 11 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -130,21 +130,21 @@ class ReplicaFetcherThread(name: String,

maybeWarnIfOversizedRecords(records, topicPartition)

if (fetchOffset != replica.logEndOffset.messageOffset)
if (fetchOffset != replica.logEndOffset)
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
topicPartition, fetchOffset, replica.logEndOffset))

if (isTraceEnabled)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
.format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

// Append the leader's messages to the log
partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

if (isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
.format(replica.logEndOffset, records.sizeInBytes, topicPartition))
val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)
val leaderLogStartOffset = partitionData.logStartOffset
// for the follower replica, we do not need to keep
// its segment base offset the physical position,
@@ -189,7 +189,7 @@ class ReplicaFetcherThread(name: String,
*/
val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)

if (leaderEndOffset < replica.logEndOffset.messageOffset) {
if (leaderEndOffset < replica.logEndOffset) {
// Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
// we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
@@ -198,11 +198,11 @@ class ReplicaFetcherThread(name: String,
ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset.messageOffset}")
s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset}")
throw new FatalExitError
}

warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset} to current " +
s"leader's latest offset $leaderEndOffset")
partition.truncateTo(leaderEndOffset, isFuture = false)
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, topicPartition, leaderEndOffset)
@@ -231,11 +231,11 @@ class ReplicaFetcherThread(name: String,
*
*/
val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset} to current " +
s"leader's start offset $leaderStartOffset")
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset)
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
if (leaderStartOffset > replica.logEndOffset.messageOffset) {
if (leaderStartOffset > replica.logEndOffset) {
partition.truncateFullyAndStartAt(leaderStartOffset, isFuture = false)
}
offsetToFetch
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -688,7 +688,7 @@ class ReplicaManager(val config: KafkaConfig,
getReplica(topicPartition) match {
case Some(replica) =>
if (isFuture)
replica.logEndOffset.messageOffset - logEndOffset
replica.logEndOffset - logEndOffset
else
math.max(replica.highWatermark.messageOffset - logEndOffset, 0)
case None =>
@@ -928,7 +928,7 @@ class ReplicaManager(val config: KafkaConfig,
* where data gets appended to the log immediately after the replica has consumed from it
* This can cause a replica to always be out of sync.
*/
val initialLogEndOffset = localReplica.logEndOffset.messageOffset
val initialLogEndOffset = localReplica.logEndOffset
val initialLogStartOffset = localReplica.logStartOffset
val fetchTimeMs = time.milliseconds
val logReadInfo = localReplica.log match {
@@ -1397,7 +1397,7 @@ class ReplicaManager(val config: KafkaConfig,
nonOfflinePartitionsIterator.filter(_.leaderReplicaIfLocal.isDefined)

def getLogEndOffset(topicPartition: TopicPartition): Option[Long] =
nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset.messageOffset))
nonOfflinePartition(topicPartition).flatMap(_.leaderReplicaIfLocal.map(_.logEndOffset))

// Flushes the highwatermark value for all partitions to the highwatermark file
def checkpointHighWatermarks() {