diff --git a/src/main/java/io/statnett/k3a/lagexporter/ClusterLagCollector.java b/src/main/java/io/statnett/k3a/lagexporter/ClusterLagCollector.java index 0efda84..9a3a159 100644 --- a/src/main/java/io/statnett/k3a/lagexporter/ClusterLagCollector.java +++ b/src/main/java/io/statnett/k3a/lagexporter/ClusterLagCollector.java @@ -10,10 +10,13 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListConsumerGroupsResult; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,13 +48,14 @@ public ClusterLagCollector(final String clusterName, this.adminConfig = adminConfig; } - public ClusterData collect() { + public ClusterData collectClusterData() { final ClusterData clusterData = new ClusterData(clusterName); final Set topicPartitions = new HashSet<>(); final boolean isFirstRun = admin == null || consumer == null; final long startMs = System.currentTimeMillis(); final Set allConsumerGroupIds = findAllConsumerGroupIds(getAdmin()); findConsumerGroupOffsets(getAdmin(), allConsumerGroupIds, clusterData, topicPartitions); + findReplicaCounts(getAdmin(), clusterData, topicPartitions); findEndOffsetsAndUpdateLag(getConsumer(), topicPartitions, clusterData); final long pollTimeMs = System.currentTimeMillis() - startMs; if (!isFirstRun) { @@ -119,20 +123,61 @@ private static Map toMapForAllOffsets(fina return map; } + private void findReplicaCounts(final Admin admin, final ClusterData clusterData, final Set topicPartitions) { + final Set topics = new HashSet<>(); + for (final TopicPartition topicPartition : topicPartitions) { + topics.add(topicPartition.topic()); + } + try { + final Collection topicDescriptions = admin.describeTopics(topics).allTopicNames().get().values(); + for (final TopicDescription topicDescription : topicDescriptions) { + for (final TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) { + final TopicPartition topicPartition = new TopicPartition(topicDescription.name(), topicPartitionInfo.partition()); + clusterData.findTopicPartitionData(topicPartition).setNumReplicas(topicPartitionInfo.replicas().size()); + } + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + private void findEndOffsetsAndUpdateLag(final Consumer consumer, final Set topicPartitions, final ClusterData clusterData) { long t = System.currentTimeMillis(); - final Map endOffsets = consumer.endOffsets(topicPartitions); - for (final Map.Entry entry : endOffsets.entrySet()) { - final TopicPartition partition = entry.getKey(); - final Long offset = entry.getValue(); - final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(partition); - topicPartitionData.setEndOffset(offset == null ? -1 : offset); - topicPartitionData.calculateLags(); + final Set multiReplicaPartitions = new HashSet<>(); + final Set singleReplicaPartitions = new HashSet<>(); + for (final TopicPartition topicPartition : topicPartitions) { + if (clusterData.findTopicPartitionData(topicPartition).getNumReplicas() > 1) { + multiReplicaPartitions.add(topicPartition); + } else { + singleReplicaPartitions.add(topicPartition); + } } + findEndOffsetsAndUpdateLagImpl(consumer, multiReplicaPartitions, clusterData); + findEndOffsetsAndUpdateLagImpl(consumer, singleReplicaPartitions, clusterData); t = System.currentTimeMillis() - t; LOG.debug("Found end offsets in " + t + " ms"); } + private void findEndOffsetsAndUpdateLagImpl(final Consumer consumer, final Set topicPartitions, final ClusterData clusterData) { + try { + final Map endOffsets = consumer.endOffsets(topicPartitions); + for (final Map.Entry entry : endOffsets.entrySet()) { + final TopicPartition partition = entry.getKey(); + final Long offset = entry.getValue(); + final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(partition); + topicPartitionData.setEndOffset(offset == null ? -1 : offset); + topicPartitionData.calculateLags(); + } + } catch (final TimeoutException e) { + LOG.warn("Got timeout while querying end offsets. Some partitions may be offline."); + for (final TopicPartition topicPartition : topicPartitions) { + final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(topicPartition); + topicPartitionData.setEndOffset(-1); + topicPartitionData.calculateLags(); + } + } + } + private Admin getAdmin() { if (admin == null) { admin = AdminClient.create(adminConfig); diff --git a/src/main/java/io/statnett/k3a/lagexporter/MainLagExporterLoop.java b/src/main/java/io/statnett/k3a/lagexporter/MainLagExporterLoop.java index dbad214..4eb5f29 100644 --- a/src/main/java/io/statnett/k3a/lagexporter/MainLagExporterLoop.java +++ b/src/main/java/io/statnett/k3a/lagexporter/MainLagExporterLoop.java @@ -20,7 +20,7 @@ public void runLoop() { Conf.getConsumerConfig(), Conf.getAdminConfig()); for (;;) { long t = System.currentTimeMillis(); - final ClusterData clusterData = collector.collect(); + final ClusterData clusterData = collector.collectClusterData(); prometheusReporter.publish(clusterData); t = System.currentTimeMillis() - t; try { diff --git a/src/main/java/io/statnett/k3a/lagexporter/PrometheusReporter.java b/src/main/java/io/statnett/k3a/lagexporter/PrometheusReporter.java index c06badf..c2da881 100644 --- a/src/main/java/io/statnett/k3a/lagexporter/PrometheusReporter.java +++ b/src/main/java/io/statnett/k3a/lagexporter/PrometheusReporter.java @@ -47,10 +47,14 @@ private void publishConsumerGroupLag(final String clusterName, final ClusterData final String topic = topicPartitionData.getTopicPartition().topic(); final String partition = String.valueOf(topicPartitionData.getTopicPartition().partition()); for (final ConsumerGroupData consumerGroupData : topicPartitionData.getConsumerGroupDataMap().values()) { + final long lag = consumerGroupData.getLag(); + if (lag < 0) { + continue; + } final String consumerGroupId = consumerGroupData.getConsumerGroupId(); consumerGroupLagGauge .labelValues(clusterName, consumerGroupId, topic, partition) - .set(consumerGroupData.getLag()); + .set(lag); } } } diff --git a/src/main/java/io/statnett/k3a/lagexporter/model/TopicPartitionData.java b/src/main/java/io/statnett/k3a/lagexporter/model/TopicPartitionData.java index 9fa7d06..da1b418 100644 --- a/src/main/java/io/statnett/k3a/lagexporter/model/TopicPartitionData.java +++ b/src/main/java/io/statnett/k3a/lagexporter/model/TopicPartitionData.java @@ -9,6 +9,7 @@ public final class TopicPartitionData { private final TopicPartition topicPartition; private long endOffset = -1; + private int numReplicas = -1; private final Map consumerGroupDataMap = new HashMap<>(); public TopicPartitionData(final TopicPartition topicPartition) { @@ -27,6 +28,14 @@ public void setEndOffset(final long endOffset) { this.endOffset = endOffset; } + public int getNumReplicas() { + return numReplicas; + } + + public void setNumReplicas(final int numReplicas) { + this.numReplicas = numReplicas; + } + public Map getConsumerGroupDataMap() { return consumerGroupDataMap; } @@ -40,7 +49,11 @@ public ConsumerGroupData findConsumerGroupData(final String consumerGroupId) { public void calculateLags() { synchronized (consumerGroupDataMap) { for (final ConsumerGroupData consumerGroupData : consumerGroupDataMap.values()) { - consumerGroupData.setLag(Math.max(0, endOffset - consumerGroupData.getOffset())); + if (endOffset < 0) { + consumerGroupData.setLag(-1); + } else { + consumerGroupData.setLag(Math.max(0, endOffset - consumerGroupData.getOffset())); + } } } } diff --git a/src/test/java/io/statnett/k3a/lagexporter/itest/K3aLagExporterIT.java b/src/test/java/io/statnett/k3a/lagexporter/itest/K3aLagExporterIT.java index f3c4f50..69e9827 100644 --- a/src/test/java/io/statnett/k3a/lagexporter/itest/K3aLagExporterIT.java +++ b/src/test/java/io/statnett/k3a/lagexporter/itest/K3aLagExporterIT.java @@ -77,7 +77,7 @@ public void shouldDetectLag() { } private void assertLag(final int expected) { - final ClusterData clusterData = lagCollector.collect(); + final ClusterData clusterData = lagCollector.collectClusterData(); final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(new TopicPartition(TOPIC, 0)); assertNotNull(topicPartitionData); final ConsumerGroupData consumerGroupData = topicPartitionData.findConsumerGroupData(CONSUMER_GROUP_ID);