Log info if replication takes more than longTailProduceRequestLogThresholdMs (#475)

LI_DESCRIPTION=https://jira01.corp.linkedin.com:8443/browse/LIKAFKA-54083

Description:
This PR raises a critical trace-level log line to info level when the produce delay is longer than longTailProduceRequestLogThresholdMs.

The log should be able to tell (a sample line is shown below the list):

- which partition is getting slow
- which follower broker is the root cause of slow replication
- how long the DelayedProduce has been waiting for replication
- how often the check is triggered
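
For illustration, here is roughly what such a line would look like; the topic, offsets, broker ids, and wait time are invented, and the exact format comes from Partition.scala in the diff below:

    Progress awaiting ISR acks for partition topic-A-3 offset 123456: acked: Set(broker 1001: 123480), awaiting Set(broker 1002: 123390). Already waited for 5321 milliseconds so far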
CCisGG authored Aug 25, 2023
1 parent ed9b33c commit cc778ba
Showing 5 changed files with 58 additions and 24 deletions.
69 changes: 51 additions & 18 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -104,7 +104,9 @@ object Partition extends KafkaMetricsGroup {
metadataCache = replicaManager.metadataCache,
logManager = replicaManager.logManager,
alterIsrManager = replicaManager.alterIsrManager,
- transferLeaderManager = replicaManager.transferLeaderManager)
+ transferLeaderManager = replicaManager.transferLeaderManager,
+ logSlowReplicationThresholdMs = replicaManager.config.longTailProduceRequestLogThresholdMs
+ )
}

def removeMetrics(topicPartition: TopicPartition): Unit = {
@@ -228,7 +230,8 @@ class Partition(val topicPartition: TopicPartition,
metadataCache: MetadataCache,
logManager: LogManager,
alterIsrManager: AlterIsrManager,
- transferLeaderManager: TransferLeaderManager) extends Logging with KafkaMetricsGroup {
+ transferLeaderManager: TransferLeaderManager,
+ logSlowReplicationThresholdMs: Long = Long.MaxValue) extends Logging with KafkaMetricsGroup {

def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
@@ -248,6 +251,12 @@ class Partition(val topicPartition: TopicPartition,
@volatile var leaderReplicaIdOpt: Option[Int] = None
@volatile private[cluster] var isrState: IsrState = CommittedIsr(Set.empty)
@volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty)
@volatile private var logSlowReplicationBucketStartTime: Long = 0
@volatile private var logSlowReplicationBucketCounter: Int = 0

// The bucket length is 1 minute with at most 300 logs per bucket, so the log rate is capped at about 5 logs per second, which should be acceptable.
val logSlowReplicationBucketLengthMs: Long = 60000
val logSlowReplicationBucketMaxLogCount: Int = 300

// Logs belonging to this partition. Majority of time it will be only one log, but if log directory
// is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
@@ -821,26 +830,12 @@ class Partition(val topicPartition: TopicPartition,
* fully caught up to the (local) leader's offset corresponding to this produce request before we acknowledge the
* produce request.
*/
- def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
+ def checkEnoughReplicasReachOffset(requiredOffset: Long, requestCreationTime: Long): (Boolean, Errors) = {
leaderLogIfLocal match {
case Some(leaderLog) =>
// keep the current immutable replica list reference
val curMaximalIsr = isrState.maximalIsr

- if (isTraceEnabled) {
- def logEndOffsetString: ((Int, Long)) => String = {
- case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
- }
-
- val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
- val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset))
- val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
- val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { _._2 >= requiredOffset}
-
- trace(s"Progress awaiting ISR acks for offset $requiredOffset: " +
- s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
- s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
- }
+ maybeLogSlowReplication(curMaximalIsr, requiredOffset, requestCreationTime, leaderLog.topicPartition)

val minIsr = leaderLog.config.minInSyncReplicas
if (leaderLog.highWatermark >= requiredOffset) {
@@ -859,6 +854,44 @@
}
}

def maybeLogSlowReplication(curMaximalIsr: Set[Int], requiredOffset: Long, requestCreationTime: Long, topicPartition: TopicPartition): Unit = {
val now = time.milliseconds()
val timeElapsedMs = time.milliseconds() - requestCreationTime

if (timeElapsedMs < logSlowReplicationThresholdMs) {
// Only slow replication is worth logging
return
}

// Check and maybe create new time bucket for log slow replication
if (now - logSlowReplicationBucketStartTime > logSlowReplicationBucketLengthMs) {
logSlowReplicationBucketCounter = 0
logSlowReplicationBucketStartTime = now
}

if (logSlowReplicationBucketCounter > logSlowReplicationBucketMaxLogCount) {
// Stop logging if there are already many logs in a time bucket. This is to avoid logs overwhelming the host.
return
}

def logEndOffsetString: ((Int, Long)) => String = {
case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
}

val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset))
val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition {
_._2 >= requiredOffset
}

info(s"Progress awaiting ISR acks for partition ${topicPartition} offset $requiredOffset: " +
s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
s"awaiting ${awaitingReplicas.map(logEndOffsetString)}. " +
s"Already waited for ${timeElapsedMs} milliseconds so far")
logSlowReplicationBucketCounter += 1
}

/**
* Check and maybe increment the high watermark of the partition;
* this function can be triggered when
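As an aside, the rate limiting in maybeLogSlowReplication above follows a simple time-bucket pattern: reset a counter whenever a new bucket starts, and stop logging once the counter reaches the bucket's cap. The following self-contained Scala sketch illustrates that pattern outside of the Kafka code; the BucketedLogThrottle class and its shouldLog method are invented for this example and are not part of the commit.

    import java.util.concurrent.TimeUnit

    // Hypothetical helper, not part of the commit: allows at most maxPerBucket events
    // per bucketLengthMs window, resetting the counter whenever a new window starts.
    // This mirrors how logSlowReplicationBucketStartTime and logSlowReplicationBucketCounter
    // are used in maybeLogSlowReplication.
    final class BucketedLogThrottle(bucketLengthMs: Long, maxPerBucket: Int) {
      private var bucketStartTimeMs: Long = 0L
      private var counter: Int = 0

      def shouldLog(nowMs: Long): Boolean = {
        if (nowMs - bucketStartTimeMs > bucketLengthMs) {
          // A new bucket begins: reset the counter and remember the start time.
          bucketStartTimeMs = nowMs
          counter = 0
        }
        if (counter >= maxPerBucket) {
          // Bucket is full: drop the log line to avoid overwhelming the host.
          false
        } else {
          counter += 1
          true
        }
      }
    }

    object BucketedLogThrottleExample extends App {
      // 60 s buckets with at most 300 logs each, i.e. an average cap of about 5 logs per second.
      val throttle = new BucketedLogThrottle(bucketLengthMs = TimeUnit.MINUTES.toMillis(1), maxPerBucket = 300)
      val now = System.currentTimeMillis()
      // 1000 candidate log events arriving within a single bucket: only the first 300 pass.
      val emitted = (1 to 1000).count(_ => throttle.shouldLog(now))
      println(s"emitted $emitted of 1000 candidate log lines")
    }

Note that the sketch uses a >= comparison for the cap, whereas the committed code uses a strict >, so it can emit one extra line per bucket before the cap kicks in; the intent is the same.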
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -54,7 +54,8 @@ class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
- lockOpt: Option[Lock] = None)
+ lockOpt: Option[Lock] = None,
+ creationTime: Long = System.currentTimeMillis())
extends DelayedOperation(delayMs, lockOpt) {

import DelayedOperation._
@@ -95,7 +96,7 @@ class DelayedProduce(delayMs: Long,
(false, err)

case Right(partition) =>
- partition.checkEnoughReplicasReachOffset(status.requiredOffset)
+ partition.checkEnoughReplicasReachOffset(status.requiredOffset, creationTime)
}

// Case B || C.1 || C.2
@@ -166,7 +166,7 @@ public void setup() throws IOException {
TransferLeaderManager transferLeaderManager = Mockito.mock(TransferLeaderManager.class);
Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
0, Time.SYSTEM, isrChangeListener, new DelayedOperationsMock(tp),
- Mockito.mock(MetadataCache.class), logManager, isrChannelManager, transferLeaderManager);
+ Mockito.mock(MetadataCache.class), logManager, isrChannelManager, transferLeaderManager, 0L);

partition.makeFollower(partitionState, offsetCheckpoints, topicId);
pool.put(tp, partition);
@@ -275,7 +275,7 @@ public boolean isThrottled(TopicPartition topicPartition) {
}
},
Option.empty());

pool = partitions;
}

@@ -133,7 +133,7 @@ public void setup() throws IOException {
partition = new Partition(tp, 100,
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
isrChangeListener, delayedOperations,
- Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager);
+ Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager, 0L);
partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId);
executorService.submit((Runnable) () -> {
SimpleRecord[] simpleRecords = new SimpleRecord[] {
@@ -130,7 +130,7 @@ public void setUp() {
partition = new Partition(topicPartition, 100,
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
isrChangeListener, delayedOperations,
- Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager);
+ Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager, 0L);
partition.makeLeader(partitionState, offsetCheckpoints, topicId);
}
