Skip to content

Commit

Permalink
[LI-HOTFIX] Mark FetchSession cache misses - 3.0-li (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtwitty authored Sep 2, 2022
1 parent 634e3d2 commit 0e7ab47
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions core/src/main/scala/kafka/server/FetchSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ object FetchSession {
val NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED = "NumIncrementalFetchPartitionsCached"
val INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC = "IncrementalFetchSessionEvictionsPerSec"
val EVICTIONS = "evictions"
val INCREMENTAL_FETCH_SESSION_CACHE_MISSES_PER_SEC = "IncrementalFetchSessionCacheMissesPerSec"
val CACHE_MISSES = "cacheMisses"

def partitionsToLogString(partitions: util.Collection[TopicPartition], traceEnabled: Boolean): String = {
if (traceEnabled) {
Expand Down Expand Up @@ -564,6 +566,10 @@ class FetchSessionCache(private val maxEntries: Int,
private[server] val evictionsMeter = newMeter(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC,
FetchSession.EVICTIONS, TimeUnit.SECONDS, Map.empty)

removeMetric(FetchSession.INCREMENTAL_FETCH_SESSION_CACHE_MISSES_PER_SEC)
val cacheMissesMeter = newMeter(FetchSession.INCREMENTAL_FETCH_SESSION_CACHE_MISSES_PER_SEC,
FetchSession.CACHE_MISSES, TimeUnit.SECONDS, Map.empty)

/**
* Get a session by session ID.
*
Expand All @@ -574,6 +580,13 @@ class FetchSessionCache(private val maxEntries: Int,
sessions.get(sessionId)
}

/**
* Records the event that the cache did not have a session it might have been expected to have (ie. a cache miss).
*/
def markCacheMiss() = synchronized {
cacheMissesMeter.mark();
}

/**
* Get the number of entries currently in the fetch session cache.
*/
Expand Down Expand Up @@ -767,6 +780,7 @@ class FetchManager(private val time: Time,
cache.get(reqMetadata.sessionId) match {
case None => {
debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
cache.markCacheMiss();
new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
}
case Some(session) => session.synchronized {
Expand Down

0 comments on commit 0e7ab47

Please sign in to comment.