From 4253d3941fff4999e000c85388554c6ffc8c115c Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Fri, 22 Dec 2023 15:45:31 -0800 Subject: [PATCH] [LI-HOTFIX] Make the msgSendLatencySensor non-static (#500) TICKET = LIKAFKA-56148 LI_DESCRIPTION = The msgSendLatencySensor sensor is used to add the following metrics together with client-id tags to a single KafkaProducer: MetricName [name=message-produce-latency-avg, group=producer-metrics, description=The average latency between record queuing and get acknowledged in ms, tags={client-id=client-id1}], MetricName [name=message-produce-latency-max, group=producer-metrics, description=The average latency between record queuing and get acknowledged in ms, tags={client-id=client-id1}], The current implematation uses a static variable for the msgSendLatencySensor field, which means all KafkaProducer objects in the same process share the same sensor, and hence can interfere with each other. This PR fixes the problem by making the msgSendLatencySensor a non-static field. EXIT_CRITERIA = When this patch is merged in upstream kafka. --- .../kafka/clients/producer/KafkaProducer.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 21486978f8290..1445daa240a82 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -240,7 +240,7 @@ public class KafkaProducer implements Producer { private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics"; - private static Sensor msgSendLatencySensor = null; + private final Sensor msgSendLatencySensor; private final String clientId; // Visible for testing @@ -982,7 +982,7 @@ private Future doSend(ProducerRecord record, Callback call log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); } // producer callback will make sure to call both 'callback' and interceptor callback - Callback interceptCallback = new InterceptorCallback<>(callback, startTime, time, this.interceptors, tp); + Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, startTime, time, this.msgSendLatencySensor); if (transactionManager != null && transactionManager.isTransactional()) { transactionManager.failIfNotReadyForSend(); @@ -999,7 +999,7 @@ private Future doSend(ProducerRecord record, Callback call log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition); } // producer callback will make sure to call both 'callback' and interceptor callback - interceptCallback = new InterceptorCallback<>(callback, startTime, time, this.interceptors, tp); + interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, startTime, time, this.msgSendLatencySensor); result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); @@ -1366,9 +1366,6 @@ private static class ClusterAndWaitTime { } } - public static void recordMsgProducingLatency(long start, long now) { - msgSendLatencySensor.record(now - start, now); - } private static class FutureFailure implements Future { @@ -1416,16 +1413,20 @@ private static class InterceptorCallback implements Callback { private final Time time; private final long startTime; - private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp) { - this(userCallback, Long.MIN_VALUE, null, interceptors, tp); - } + private final Sensor msgSendLatencySensor; + - private InterceptorCallback(Callback userCallback, long startTime, Time time, ProducerInterceptors interceptors, TopicPartition tp) { + private InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors, TopicPartition tp, long startTime, Time time, Sensor msgSendLatencySensor) { this.userCallback = userCallback; this.interceptors = interceptors; this.tp = tp; this.time = time; this.startTime = startTime; + this.msgSendLatencySensor = msgSendLatencySensor; + } + + public void recordMsgProducingLatency(long start, long now) { + msgSendLatencySensor.record(now - start, now); } public void onCompletion(RecordMetadata metadata, Exception exception) {