Skip to content

Commit

Permalink
MINOR: Serialize state change logs for handling LeaderAndIsr and Stop…
Browse files Browse the repository at this point in the history
…Replica requests (apache#8493)

This patch moves the state change logger logs for handling a LeaderAndIsr/StopReplica request inside the replicaStateChangeLock in order to serialize the logs. This helps to tell apart per-partition actions of concurrent LAIR/StopReplica requests in cases where requests pile up waiting on the lock.

Reviewer: Jun Rao <[email protected]>
  • Loading branch information
stanislavkozlovski authored Apr 15, 2020
1 parent 0a50973 commit bd42734
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,16 @@ class ReplicaManager(val config: KafkaConfig,
brokerEpoch: Long,
partitionStates: Map[TopicPartition, StopReplicaPartitionState]
): (mutable.Map[TopicPartition, Errors], Errors) = {
stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " +
s"$controllerId for ${partitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
partitionStates.foreach { case (topicPartition, partitionState) =>
stateChangeLogger.trace(s"Received StopReplica request $partitionState " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for partition $topicPartition")
}

replicaStateChangeLock synchronized {
stateChangeLogger.info(s"Handling StopReplica request correlationId $correlationId from controller " +
s"$controllerId for ${partitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
partitionStates.foreach { case (topicPartition, partitionState) =>
stateChangeLogger.trace(s"Received StopReplica request $partitionState " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch $controllerEpoch for partition $topicPartition")
}

val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
if (controllerEpoch < this.controllerEpoch) {
stateChangeLogger.warn(s"Ignoring StopReplica request from " +
Expand Down Expand Up @@ -1237,18 +1237,18 @@ class ReplicaManager(val config: KafkaConfig,
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
val controllerId = leaderAndIsrRequest.controllerId
val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller " +
s"$controllerId for ${requestPartitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
requestPartitionStates.foreach { partitionState =>
stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch ${leaderAndIsrRequest.controllerEpoch}")
}

replicaStateChangeLock synchronized {
val controllerId = leaderAndIsrRequest.controllerId
val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
stateChangeLogger.info(s"Handling LeaderAndIsr request correlationId $correlationId from controller " +
s"$controllerId for ${requestPartitionStates.size} partitions")
if (stateChangeLogger.isTraceEnabled)
requestPartitionStates.foreach { partitionState =>
stateChangeLogger.trace(s"Received LeaderAndIsr request $partitionState " +
s"correlation id $correlationId from controller $controllerId " +
s"epoch ${leaderAndIsrRequest.controllerEpoch}")
}

if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
Expand Down

0 comments on commit bd42734

Please sign in to comment.