diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 8b718bcd1605d..b546d638c6202 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -104,7 +104,9 @@ object Partition extends KafkaMetricsGroup { metadataCache = replicaManager.metadataCache, logManager = replicaManager.logManager, alterIsrManager = replicaManager.alterIsrManager, - transferLeaderManager = replicaManager.transferLeaderManager) + transferLeaderManager = replicaManager.transferLeaderManager, + logSlowReplicationThresholdMs = replicaManager.config.longTailProduceRequestLogThresholdMs + ) } def removeMetrics(topicPartition: TopicPartition): Unit = { @@ -228,7 +230,8 @@ class Partition(val topicPartition: TopicPartition, metadataCache: MetadataCache, logManager: LogManager, alterIsrManager: AlterIsrManager, - transferLeaderManager: TransferLeaderManager) extends Logging with KafkaMetricsGroup { + transferLeaderManager: TransferLeaderManager, + logSlowReplicationThresholdMs: Long = Long.MaxValue) extends Logging with KafkaMetricsGroup { def topic: String = topicPartition.topic def partitionId: Int = topicPartition.partition @@ -248,6 +251,12 @@ class Partition(val topicPartition: TopicPartition, @volatile var leaderReplicaIdOpt: Option[Int] = None @volatile private[cluster] var isrState: IsrState = CommittedIsr(Set.empty) @volatile var assignmentState: AssignmentState = SimpleAssignmentState(Seq.empty) + @volatile private var logSlowReplicationBucketStartTime: Long = 0 + @volatile private var logSlowReplicationBucketCounter: Int = 0 + + // The bucket size is set to 1 minute, with max log number 300. The logs will hit max QPS at 5, which should be acceptable. + val logSlowReplicationBucketLengthMs: Long = 60000 + val logSlowReplicationBucketMaxLogCount: Int = 300 // Logs belonging to this partition. Majority of time it will be only one log, but if log directory // is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy @@ -821,26 +830,12 @@ class Partition(val topicPartition: TopicPartition, * fully caught up to the (local) leader's offset corresponding to this produce request before we acknowledge the * produce request. */ - def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = { + def checkEnoughReplicasReachOffset(requiredOffset: Long, requestCreationTime: Long): (Boolean, Errors) = { leaderLogIfLocal match { case Some(leaderLog) => // keep the current immutable replica list reference val curMaximalIsr = isrState.maximalIsr - - if (isTraceEnabled) { - def logEndOffsetString: ((Int, Long)) => String = { - case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset" - } - - val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica) - val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset)) - val localLogInfo = (localBrokerId, localLogOrException.logEndOffset) - val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { _._2 >= requiredOffset} - - trace(s"Progress awaiting ISR acks for offset $requiredOffset: " + - s"acked: ${ackedReplicas.map(logEndOffsetString)}, " + - s"awaiting ${awaitingReplicas.map(logEndOffsetString)}") - } + maybeLogSlowReplication(curMaximalIsr, requiredOffset, requestCreationTime, leaderLog.topicPartition) val minIsr = leaderLog.config.minInSyncReplicas if (leaderLog.highWatermark >= requiredOffset) { @@ -859,6 +854,44 @@ class Partition(val topicPartition: TopicPartition, } } + def maybeLogSlowReplication(curMaximalIsr: Set[Int], requiredOffset: Long, requestCreationTime: Long, topicPartition: TopicPartition): Unit = { + val now = time.milliseconds() + val timeElapsedMs = time.milliseconds() - requestCreationTime + + if (timeElapsedMs < logSlowReplicationThresholdMs) { + // Only slow replications is worth logging + return + } + + // Check and maybe create new time bucket for log slow replication + if (now - logSlowReplicationBucketStartTime > logSlowReplicationBucketLengthMs) { + logSlowReplicationBucketCounter = 0 + logSlowReplicationBucketStartTime = now + } + + if (logSlowReplicationBucketCounter > logSlowReplicationBucketMaxLogCount) { + // Stop logging if there are already many logs in a time bucket. This is to avoid logs overwhelming the host. + return + } + + def logEndOffsetString: ((Int, Long)) => String = { + case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset" + } + + val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica) + val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset)) + val localLogInfo = (localBrokerId, localLogOrException.logEndOffset) + val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { + _._2 >= requiredOffset + } + + info(s"Progress awaiting ISR acks for partition ${topicPartition} offset $requiredOffset: " + + s"acked: ${ackedReplicas.map(logEndOffsetString)}, " + + s"awaiting ${awaitingReplicas.map(logEndOffsetString)}. " + + s"Already waited for ${timeElapsedMs} milliseconds so far") + logSlowReplicationBucketCounter += 1 + } + /** * Check and maybe increment the high watermark of the partition; * this function can be triggered when diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index ced8d5f53a358..112da481370f7 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -54,7 +54,8 @@ class DelayedProduce(delayMs: Long, produceMetadata: ProduceMetadata, replicaManager: ReplicaManager, responseCallback: Map[TopicPartition, PartitionResponse] => Unit, - lockOpt: Option[Lock] = None) + lockOpt: Option[Lock] = None, + creationTime: Long = System.currentTimeMillis()) extends DelayedOperation(delayMs, lockOpt) { import DelayedOperation._ @@ -95,7 +96,7 @@ class DelayedProduce(delayMs: Long, (false, err) case Right(partition) => - partition.checkEnoughReplicasReachOffset(status.requiredOffset) + partition.checkEnoughReplicasReachOffset(status.requiredOffset, creationTime) } // Case B || C.1 || C.2 diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 88c45cfd877f5..dcf711f6152e1 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -166,7 +166,7 @@ public void setup() throws IOException { TransferLeaderManager transferLeaderManager = Mockito.mock(TransferLeaderManager.class); Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, isrChangeListener, new DelayedOperationsMock(tp), - Mockito.mock(MetadataCache.class), logManager, isrChannelManager, transferLeaderManager); + Mockito.mock(MetadataCache.class), logManager, isrChannelManager, transferLeaderManager, 0L); partition.makeFollower(partitionState, offsetCheckpoints, topicId); pool.put(tp, partition); @@ -275,7 +275,7 @@ public boolean isThrottled(TopicPartition topicPartition) { } }, Option.empty()); - + pool = partitions; } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java index 459fd010fc7fc..c8a5521e3b6a5 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java @@ -133,7 +133,7 @@ public void setup() throws IOException { partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, isrChangeListener, delayedOperations, - Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager); + Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager, 0L); partition.createLogIfNotExists(true, false, offsetCheckpoints, topicId); executorService.submit((Runnable) () -> { SimpleRecord[] simpleRecords = new SimpleRecord[] { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java index 3a6e4c854098f..98b28bea4c96a 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java @@ -130,7 +130,7 @@ public void setUp() { partition = new Partition(topicPartition, 100, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, isrChangeListener, delayedOperations, - Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager); + Mockito.mock(MetadataCache.class), logManager, alterIsrManager, transferLeaderManager, 0L); partition.makeLeader(partitionState, offsetCheckpoints, topicId); }