Skip to content

Commit

Permalink
[LI-HOTFIX] Return valid data during throttling (#514)
Browse files Browse the repository at this point in the history
In KIP-219, people decided to changed the throttling behavior from "wait for throttleTimeMs before sending back response" to "return immediately with empty response and mute the channel by throttleTimeMs". While it is reasonable in terms of it strictly limit the Fetch Bytes Rate to below the quota value, it introduces a new problem that the request already takes the broker system resource but the consumer does not get any useful data. As a result, the broker could be busy with handling throttled requests and keep throttle incoming request, while none of the request has data and consumers get stuck.

To mitigate it, we propose to actually return the data in the throttled response. By doing it, the consumer can still slowly proceed even though all the requests are throttled.
  • Loading branch information
CCisGG authored Jun 6, 2024
1 parent 42e6fae commit d4f6f91
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,16 @@ public boolean handleResponse(FetchResponse response) {
return false;
}
if (nextMetadata.isFull()) {
if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) {
// Normally, an empty full fetch response would be invalid. However, KIP-219
// specifies that if the broker wants to throttle the client, it will respond
// to a full fetch request with an empty response and a throttleTimeMs
// value set. We don't want to log this with a warning, since it's not an error.
// However, the empty full fetch response can't be processed, so it's still appropriate
// to return false here.
if (response.throttleTimeMs() > 0) {
// [LIKAFKA-59133] To avoid stuck consumer, we made a server side change to return valid fetch responses
// even when the request is throttled. To honor the server side change, we log the throttling and still
// handle the fetch response.
if (log.isDebugEnabled()) {
log.debug("Node {} sent a empty full fetch response to indicate that this " +
"client should be throttled for {} ms.", node, response.throttleTimeMs());
log.debug("Node {} sent a response indicate the request is throttled for {} ms.", node,
response.throttleTimeMs());
}
nextMetadata = FetchMetadata.INITIAL;
return false;
}

String problem = verifyFullFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid full fetch response with {}", node, problem);
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -990,24 +990,23 @@ class KafkaApis(val requestChannel: RequestChannel,
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)

val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)

// [LIKAFKA-59133] We made a change here to actually fill in the data to the fetch response even when throttling happens.
// This prevents the consumers completely getting stuck when throttling happens intensively.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)

// [LIKAFKA-45345] even if the throttleTimeMs is 0, we still record it so that
// the throttle-time sensor does not expire before the byte-rate sensor in quotas.fetch
// or the request-time sensor in quotas.request.
val (effectiveBandwidthThrottleTime, effectiveRequestThrottleTime) = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
// If throttling is required, return an empty response.
unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)

if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
(bandwidthThrottleTimeMs, 0)
} else {
(0, requestThrottleTimeMs)
}
} else {
// Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
trace(s"Sending Fetch response with partitions.size=$responseSize, metadata=${unconvertedFetchResponse.sessionId}")
(0, 0)
}
Expand Down

0 comments on commit d4f6f91

Please sign in to comment.