From 9a142e54d438a44c6d1450a382ea15523b1a0ecd Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 21:17:47 +0200 Subject: [PATCH 01/15] consistency - use utility method for sleeping --- .../monitor/apps/SingleClusterMonitor.java | 10 ++++------ .../monitor/services/ConsumeService.java | 6 +++--- .../xinfra/monitor/XinfraMonitorTest.java | 5 ++++- .../monitor/services/ConsumeServiceTest.java | 18 +++++++----------- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java index a44b0827..ccac7e9d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java @@ -24,6 +24,8 @@ import com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig; import com.linkedin.xinfra.monitor.services.configs.TopicManagementServiceConfig; import com.linkedin.xinfra.monitor.services.metrics.ClusterTopicManipulationMetrics; + +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -39,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.xinfra.monitor.common.Utils.delay; import static com.linkedin.xinfra.monitor.common.Utils.prettyPrint; /* @@ -94,14 +97,9 @@ public void start() throws Exception { _topicManagementService.start(); CompletableFuture topicPartitionResult = _topicManagementService.topicPartitionResult(); - try { /* Delay 2 second to reduce the chance that produce and consumer thread has race condition with TopicManagementService and MultiClusterTopicManagementService */ - long threadSleepMs = TimeUnit.SECONDS.toMillis(2); - Thread.sleep(threadSleepMs); - } catch (InterruptedException e) { - throw new Exception("Interrupted while sleeping the thread", e); - } + delay(Duration.ofSeconds(2)); CompletableFuture topicPartitionFuture = topicPartitionResult.thenRun(() -> { for (Service service : _allServices) { if (!service.isRunning()) { diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 63caff1c..2dce737f 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -117,9 +117,9 @@ public ConsumeService(String name, topicPartitionFuture.get(); } - private void consume() throws Exception { + private void consume() { /* Delay 1 second to reduce the chance that consumer creates topic before TopicManagementService */ - Thread.sleep(1000); + Utils.delay(Duration.ofSeconds(1)); Map nextIndexes = new HashMap<>(); @@ -132,7 +132,7 @@ record = _baseConsumer.receive(); LOG.warn(_name + "/ConsumeService failed to receive record", e); /* Avoid busy while loop */ //noinspection BusyWait - Thread.sleep(CONSUME_THREAD_SLEEP_MS); + Utils.delay(Duration.ofMillis(CONSUME_THREAD_SLEEP_MS)); continue; } diff --git a/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java b/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java index 9867718d..14874bc6 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java @@ -10,8 +10,11 @@ package com.linkedin.xinfra.monitor; +import com.linkedin.xinfra.monitor.common.Utils; import com.linkedin.xinfra.monitor.services.ServiceFactory; import com.linkedin.xinfra.monitor.services.Service; + +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -71,7 +74,7 @@ public void run() { t.start(); xinfraMonitor.start(); - Thread.sleep(100); + Utils.delay(Duration.ofMillis(100)); xinfraMonitor.stop(); t.join(500); org.testng.Assert.assertFalse(t.isAlive()); diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 8d11fd04..77a89598 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -14,6 +14,8 @@ import com.linkedin.xinfra.monitor.consumer.BaseConsumerRecord; import com.linkedin.xinfra.monitor.consumer.KMBaseConsumer; import com.linkedin.xinfra.monitor.services.metrics.CommitLatencyMetrics; + +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -83,12 +85,9 @@ public void commitAvailabilityTest() throws Exception { consumeService.startConsumeThreadForTesting(); Assert.assertTrue(consumeService.isRunning()); - /* in milliseconds */ - long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY_SECONDS); - - /* Thread.sleep safe to do here instead of ScheduledExecutorService + /* delay safe to do here instead of ScheduledExecutorService * We want to sleep current thread so that consumeService can start running for enough seconds. */ - Thread.sleep(threadStartDelay); + Utils.delay(Duration.ofSeconds(THREAD_START_DELAY_SECONDS)); Assert.assertNotNull(metrics.metrics().get(metrics.metricName("offsets-committed-total", METRIC_GROUP_NAME, tags)).metricValue()); Assert.assertNotNull(metrics.metrics().get(metrics.metricName("failed-commit-offsets-total", METRIC_GROUP_NAME, tags)).metricValue()); @@ -112,12 +111,9 @@ public void commitLatencyTest() throws Exception { consumeService.startConsumeThreadForTesting(); Assert.assertTrue(consumeService.isRunning()); - /* in milliseconds */ - long threadStartDelay = TimeUnit.SECONDS.toMillis(THREAD_START_DELAY_SECONDS); - - /* Thread.sleep safe to do here instead of ScheduledExecutorService + /* delay safe to do here instead of ScheduledExecutorService * We want to sleep current thread so that consumeService can start running for enough seconds. */ - Thread.sleep(threadStartDelay); + Utils.delay(Duration.ofSeconds(THREAD_START_DELAY_SECONDS)); shutdownConsumeService(consumeService); } @@ -190,7 +186,7 @@ public void run() { thread.start(); consumeService.startConsumeThreadForTesting(); - Thread.sleep(100); + Utils.delay(Duration.ofMillis(100)); consumeService.stop(); thread.join(500); From 7a44bf25a0f7da4425a3fd96f1436c0b3144abb2 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 21:32:26 +0200 Subject: [PATCH 02/15] no raw types --- .../com/linkedin/xinfra/monitor/XinfraMonitor.java | 7 +++---- .../linkedin/xinfra/monitor/XinfraMonitorTest.java | 12 +++++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java index d516b076..daecad30 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java @@ -60,11 +60,11 @@ public class XinfraMonitor { */ @SuppressWarnings({"rawtypes"}) - public XinfraMonitor(Map allClusterProps) throws Exception { + public XinfraMonitor(Map> allClusterProps) throws Exception { _apps = new ConcurrentHashMap<>(); _services = new ConcurrentHashMap<>(); - for (Map.Entry clusterProperty : allClusterProps.entrySet()) { + for (Map.Entry> clusterProperty : allClusterProps.entrySet()) { String clusterName = clusterProperty.getKey(); Map props = clusterProperty.getValue(); if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG)) @@ -165,7 +165,6 @@ public void awaitShutdown() { service.awaitShutdown(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } - @SuppressWarnings("rawtypes") public static void main(String[] args) throws Exception { if (args.length <= 0) { LOG.info("USAGE: java [options] " + XinfraMonitor.class.getName() + " config/xinfra-monitor.properties"); @@ -182,7 +181,7 @@ public static void main(String[] args) throws Exception { } @SuppressWarnings("unchecked") - Map props = new ObjectMapper().readValue(buffer.toString(), Map.class); + Map> props = new ObjectMapper().readValue(buffer.toString(), Map.class); XinfraMonitor xinfraMonitor = new XinfraMonitor(props); xinfraMonitor.start(); LOG.info("Xinfra Monitor has started."); diff --git a/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java b/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java index 14874bc6..ab7b2586 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/XinfraMonitorTest.java @@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + +import org.testng.Assert; import org.testng.annotations.Test; @@ -77,13 +79,13 @@ public void run() { Utils.delay(Duration.ofMillis(100)); xinfraMonitor.stop(); t.join(500); - org.testng.Assert.assertFalse(t.isAlive()); - org.testng.Assert.assertEquals(error.get(), null); + Assert.assertFalse(t.isAlive()); + Assert.assertNull(error.get()); } private XinfraMonitor xinfraMonitor() throws Exception { FakeService.clearCounters(); - Map config = new HashMap<>(); + Map> config = new HashMap<>(); Map fakeServiceConfig = new HashMap<>(); fakeServiceConfig.put(XinfraMonitorConstants.CLASS_NAME_CONFIG, FakeService.class.getName()); @@ -109,7 +111,7 @@ public FakeServiceFactory(Map config, String serviceInstanceName) { @SuppressWarnings("unchecked") @Override - public Service createService() throws Exception { + public Service createService() { return new XinfraMonitorTest.FakeService(_config, _serviceInstanceName); @@ -123,7 +125,7 @@ static final class FakeService implements Service { private final AtomicBoolean _isRunning = new AtomicBoolean(); /** required */ - public FakeService(Map config, String serviceInstanceName) { + public FakeService(Map> config, String serviceInstanceName) { } From dab0a20e8fb7de96402c6ab1733b36b9baad8ce4 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 21:33:16 +0200 Subject: [PATCH 03/15] dead code --- .../com/linkedin/xinfra/monitor/XinfraMonitor.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java index daecad30..d59486b8 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java @@ -16,7 +16,6 @@ import com.linkedin.xinfra.monitor.services.ServiceFactory; import java.io.BufferedReader; import java.io.FileReader; -import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -94,15 +93,6 @@ public XinfraMonitor(Map> allClusterProps) throws Ex (config, now) -> _offlineRunnables.size()); } - private boolean constructorContainsClass(Constructor[] constructors, Class classObject) { - for (int n = 0; n < constructors[0].getParameterTypes().length; ++n) { - if (constructors[0].getParameterTypes()[n].equals(classObject)) { - return true; - } - } - return false; - } - public synchronized void start() throws Exception { if (!_isRunning.compareAndSet(false, true)) { return; From 10f8d3e3b9b539b97011bb9a4f4d1d752638ae39 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 21:46:41 +0200 Subject: [PATCH 04/15] dead code --- .../monitor/services/ClusterTopicManipulationService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java index 56c3ddc2..549cbed3 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java @@ -67,7 +67,6 @@ public class ClusterTopicManipulationService implements Service { private final ClusterTopicManipulationMetrics _clusterTopicManipulationMetrics; private final TopicFactory _topicFactory; - private final String _zkConnect; public ClusterTopicManipulationService(String name, AdminClient adminClient, Map props) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, @@ -96,7 +95,6 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap(); _clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags); - _zkConnect = config.getString(TopicManagementServiceConfig.ZOOKEEPER_CONNECT_CONFIG); _topicFactory = (TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig); } From f9c41b9ec3edcb43a31e52755ad952c4d384c4dc Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 21:56:03 +0200 Subject: [PATCH 05/15] deprecated API usage --- .../java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java b/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java index e958d43c..17059b44 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java +++ b/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java @@ -85,7 +85,7 @@ public void commitAsync(OffsetCommitCallback callback) { @Override public OffsetAndMetadata committed(TopicPartition tp) { - return _consumer.committed(tp); + return _consumer.committed(Collections.singleton(tp)).get(tp); } @Override From 60621c5e086785df918286523656605391aedc23 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 21:57:07 +0200 Subject: [PATCH 06/15] unnecessary toString() --- .../xinfra/monitor/services/DefaultMetricsReporterService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java index 64a62bf1..88d305f1 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java @@ -84,6 +84,6 @@ private void reportMetrics() { builder.append("\n"); } } - LOG.info("{}\n{}", LOG_DIVIDER, builder.toString()); + LOG.info("{}\n{}", LOG_DIVIDER, builder); } } From 56656cd7f4275814f9e79c2b3ae8087b7d937ad7 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:07:47 +0200 Subject: [PATCH 07/15] unused type parameters --- .../monitor/services/SignalFxMetricsReporterService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java index e84f1200..bf95a08a 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java @@ -52,8 +52,8 @@ public SignalFxMetricsReporterService(Map props, String name) th _executor = Executors.newSingleThreadScheduledExecutor(); _metricRegistry = new MetricRegistry(); - _metricMap = new HashMap(); - _dimensionsMap = new HashMap(); + _metricMap = new HashMap<>(); + _dimensionsMap = new HashMap<>(); if (props.containsKey(SignalFxMetricsReporterServiceConfig.SIGNALFX_METRIC_DIMENSION)) { _dimensionsMap = (Map) props.get(SignalFxMetricsReporterServiceConfig.SIGNALFX_METRIC_DIMENSION); } From 7f92b6eaf2e9a1bb322857bd20583cefa4c73c2f Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:16:30 +0200 Subject: [PATCH 08/15] inline --- src/main/java/com/linkedin/xinfra/monitor/common/Utils.java | 4 +--- .../linkedin/xinfra/monitor/services/ConsumeServiceTest.java | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java index d920437d..c9b50ce1 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java +++ b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java @@ -117,9 +117,7 @@ public static List replicaIdentifiers(Set brokers) { Collections.shuffle(brokerMetadataList); // Get broker ids for replica list - List replicaList = brokerMetadataList.stream().map(m -> m.id()).collect(Collectors.toList()); - - return replicaList; + return brokerMetadataList.stream().map(m -> m.id()).collect(Collectors.toList()); } /** diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 77a89598..aef53449 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -203,8 +203,7 @@ public void run() { */ private Metrics consumeServiceMetrics(ConsumeService consumeService) { setup(); - Metrics metrics = consumeService.metrics(); - return metrics; + return consumeService.metrics(); } /** From 522ee8381c57c476cfea91166ce9745c41e928d2 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:27:08 +0200 Subject: [PATCH 09/15] unused constants --- src/main/java/com/linkedin/xinfra/monitor/common/Utils.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java index c9b50ce1..ce2988ed 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java +++ b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java @@ -55,8 +55,6 @@ */ public class Utils { private static final Logger LOG = LoggerFactory.getLogger(Utils.class); - public static final int ZK_CONNECTION_TIMEOUT_MS = 30_000; - public static final int ZK_SESSION_TIMEOUT_MS = 30_000; private static final long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = 60000L; private static final int LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = 3; private static final String LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG = "list.partition.reassignment.timeout.ms"; From faad4fd184375f04b40c9665bcd682b3f04748ee Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:27:31 +0200 Subject: [PATCH 10/15] utility class --- .../com/linkedin/xinfra/monitor/XinfraMonitorConstants.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java index f22c63c9..9db078b9 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java +++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitorConstants.java @@ -15,9 +15,7 @@ */ public class XinfraMonitorConstants { - public XinfraMonitorConstants() { - - } + private XinfraMonitorConstants() {} public static final String TAGS_NAME = "name"; From 7113e3f95f5b99c3d40d1c1f5dbb86a8cedcd142 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:31:29 +0200 Subject: [PATCH 11/15] use lambdas --- .../monitor/services/ConsumeService.java | 17 ++++----- .../GraphiteMetricsReporterService.java | 17 ++++----- .../monitor/services/OffsetCommitService.java | 7 +--- .../metrics/OffsetCommitServiceMetrics.java | 37 +++++++++---------- .../monitor/services/ConsumeServiceTest.java | 15 +++----- 5 files changed, 38 insertions(+), 55 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 2dce737f..05a1e2c3 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -152,16 +152,13 @@ record = _baseConsumer.receive(); int partition = record.partition(); /* Commit availability and commit latency service */ /* Call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183) */ - OffsetCommitCallback commitCallback = new OffsetCommitCallback() { - @Override - public void onComplete(Map topicPartitionOffsetAndMetadataMap, Exception kafkaException) { - if (kafkaException != null) { - LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException); - _commitAvailabilityMetrics._failedCommitOffsets.record(); - } else { - _commitAvailabilityMetrics._offsetsCommitted.record(); - _commitLatencyMetrics.recordCommitComplete(); - } + OffsetCommitCallback commitCallback = (topicPartitionOffsetAndMetadataMap, kafkaException) -> { + if (kafkaException != null) { + LOG.error("Exception while trying to perform an asynchronous commit.", kafkaException); + _commitAvailabilityMetrics._failedCommitOffsets.record(); + } else { + _commitAvailabilityMetrics._offsetsCommitted.record(); + _commitLatencyMetrics.recordCommitComplete(); } }; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java index 24512d7d..6f99ce3a 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java @@ -52,17 +52,14 @@ public GraphiteMetricsReporterService(Map props, String name) @Override public synchronized void start() { - _executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - GraphiteMetricsReporterService.this.reportMetrics(); - } catch (Exception e) { - LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics", - e); - } + _executor.scheduleAtFixedRate(() -> { + try { + GraphiteMetricsReporterService.this.reportMetrics(); + } catch (Exception e) { + LOG.error(_name + "/GraphiteMetricsReporterService failed to report metrics", + e); } - }, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS + }, _reportIntervalSec, _reportIntervalSec, TimeUnit.SECONDS ); LOG.info("{}/GraphiteMetricsReporterService started", _name); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java index 18c31803..22d1dc34 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java @@ -134,12 +134,7 @@ public class OffsetCommitService implements Service { _consumerNetworkClient = new ConsumerNetworkClient(logContext, kafkaClient, metadata, _time, retryBackoffMs, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs); - ThreadFactory threadFactory = new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - return new Thread(runnable, serviceName + SERVICE_SUFFIX); - } - }; + ThreadFactory threadFactory = runnable -> new Thread(runnable, serviceName + SERVICE_SUFFIX); _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); LOGGER.info("OffsetCommitService's ConsumerConfig - {}", Utils.prettyPrint(config.values())); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java index b6a6e753..c823b94c 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java @@ -59,27 +59,24 @@ public OffsetCommitServiceMetrics(final Metrics metrics, final Map 0 ? offsetCommitSuccessRate / ( - offsetCommitSuccessRate + offsetCommitFailureRate) : 0; + Measurable measurable = (config, now) -> { + double offsetCommitSuccessRate = (double) metrics.metrics() + .get(metrics.metricName(SUCCESS_RATE_METRIC, METRIC_GROUP_NAME, tags)) + .metricValue(); + double offsetCommitFailureRate = (double) metrics.metrics() + .get(metrics.metricName(FAILURE_RATE_METRIC, METRIC_GROUP_NAME, tags)) + .metricValue(); + + if (new Double(offsetCommitSuccessRate).isNaN()) { + offsetCommitSuccessRate = 0; } + + if (new Double(offsetCommitFailureRate).isNaN()) { + offsetCommitFailureRate = 0; + } + + return offsetCommitSuccessRate + offsetCommitFailureRate > 0 ? offsetCommitSuccessRate / ( + offsetCommitSuccessRate + offsetCommitFailureRate) : 0; }; metrics.addMetric(new MetricName("offset-commit-availability-avg", METRIC_GROUP_NAME, diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index aef53449..5eafd371 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -144,16 +144,13 @@ private ConsumeService consumeService() throws Exception { /* define return value */ Mockito.when(kmBaseConsumer.lastCommitted()).thenReturn(MOCK_LAST_COMMITTED_OFFSET); Mockito.when(kmBaseConsumer.committed(Mockito.any())).thenReturn(new OffsetAndMetadata(FIRST_OFFSET)); - Mockito.doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) { - OffsetCommitCallback callback = invocationOnMock.getArgument(0); - Map committedOffsets = new HashMap<>(); - committedOffsets.put(new TopicPartition(TOPIC, PARTITION), new OffsetAndMetadata(FIRST_OFFSET)); - callback.onComplete(committedOffsets, null); + Mockito.doAnswer((Answer) invocationOnMock -> { + OffsetCommitCallback callback = invocationOnMock.getArgument(0); + Map committedOffsets = new HashMap<>(); + committedOffsets.put(new TopicPartition(TOPIC, PARTITION), new OffsetAndMetadata(FIRST_OFFSET)); + callback.onComplete(committedOffsets, null); - return null; - } + return null; }).when(kmBaseConsumer).commitAsync(Mockito.any(OffsetCommitCallback.class)); From a26d410ad06e69414c492aadad78c9b9ab4353be Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:32:02 +0200 Subject: [PATCH 12/15] simplify assertion --- .../linkedin/xinfra/monitor/services/ConsumeServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 5eafd371..2a53104a 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -189,7 +189,7 @@ public void run() { thread.join(500); Assert.assertFalse(thread.isAlive()); - Assert.assertEquals(error.get(), null); + Assert.assertNull(error.get()); } From ce9d5bad55a312912cab484e274d52e49308bde9 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:35:27 +0200 Subject: [PATCH 13/15] simplify iteration --- .../xinfra/monitor/services/OffsetCommitServiceFactory.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java index 87bd4f88..dc0352db 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitServiceFactory.java @@ -74,9 +74,7 @@ private Properties prepareConfigs(Map props) { Map customProps = (Map) props.get(CommonServiceConfig.CONSUMER_PROPS_CONFIG); if (customProps != null) { - for (Map.Entry entry : customProps.entrySet()) { - consumerProps.put(entry.getKey(), entry.getValue()); - } + consumerProps.putAll(customProps); } return consumerProps; From b46f575490911c99ce5754117c4ee4a10c2c574e Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 22:37:34 +0200 Subject: [PATCH 14/15] checkstyle --- .../com/linkedin/xinfra/monitor/services/ConsumeService.java | 2 -- .../monitor/services/metrics/OffsetCommitServiceMetrics.java | 1 - 2 files changed, 3 deletions(-) diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 05a1e2c3..81448776 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -29,10 +29,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java index c823b94c..16127b48 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/metrics/OffsetCommitServiceMetrics.java @@ -13,7 +13,6 @@ import java.util.Map; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; From 53f532f87408463e2e02d9d74ec7799a625e8c25 Mon Sep 17 00:00:00 2001 From: Maik Toepfer Date: Sun, 24 Apr 2022 23:06:57 +0200 Subject: [PATCH 15/15] checkstyle --- .../com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java index 2a53104a..370028f5 100644 --- a/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java +++ b/src/test/java/com/linkedin/xinfra/monitor/services/ConsumeServiceTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;