Skip to content

Commit

Permalink
[LI-HOTFIX] back-port the fix for KAFKA-8950 - KafkaConsumer stops fe…
Browse files Browse the repository at this point in the history
…tching (#78)

TICKET = KAFKA-8950

    LI_DESCRIPTION = we got this bug when we cherry picked KAFKA-8052 from 2.1

    EXIT_CRITERIA = MANUAL (when future rebases pick up upstream code and this is no longer required)
  • Loading branch information
radai-rosenblatt authored Apr 15, 2020
1 parent 6bfd49b commit 09d704e
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,14 @@ public synchronized int sendFetches() {
if (log.isDebugEnabled()) {
log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
}
client.send(fetchTarget, request)

RequestFuture<ClientResponse> future = client.send(fetchTarget, request);
// We add the node to the set of nodes with pending fetch requests before adding the
// listener because the future may have been fulfilled on another thread (e.g. during a
// disconnection being handled by the heartbeat thread) which will mean the listener
// will be invoked synchronously.
this.nodesWithPendingFetchRequests.add(entry.getKey().id());
future
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
Expand Down Expand Up @@ -343,8 +350,6 @@ public void onFailure(RuntimeException e) {
}
}
});

this.nodesWithPendingFetchRequests.add(entry.getKey().id());
}
return fetchRequestMap.size();
}
Expand Down

0 comments on commit 09d704e

Please sign in to comment.