Skip to content

Commit

Permalink
feat: group by multi/single replica partitions (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
sverrehu authored Dec 4, 2023
1 parent 1571367 commit 263715c
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 12 deletions.
61 changes: 53 additions & 8 deletions src/main/java/io/statnett/k3a/lagexporter/ClusterLagCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,13 +48,14 @@ public ClusterLagCollector(final String clusterName,
this.adminConfig = adminConfig;
}

public ClusterData collect() {
public ClusterData collectClusterData() {
final ClusterData clusterData = new ClusterData(clusterName);
final Set<TopicPartition> topicPartitions = new HashSet<>();
final boolean isFirstRun = admin == null || consumer == null;
final long startMs = System.currentTimeMillis();
final Set<String> allConsumerGroupIds = findAllConsumerGroupIds(getAdmin());
findConsumerGroupOffsets(getAdmin(), allConsumerGroupIds, clusterData, topicPartitions);
findReplicaCounts(getAdmin(), clusterData, topicPartitions);
findEndOffsetsAndUpdateLag(getConsumer(), topicPartitions, clusterData);
final long pollTimeMs = System.currentTimeMillis() - startMs;
if (!isFirstRun) {
Expand Down Expand Up @@ -119,20 +123,61 @@ private static Map<String, ListConsumerGroupOffsetsSpec> toMapForAllOffsets(fina
return map;
}

/**
 * Looks up the replication factor of every topic referenced by the given
 * partitions and records it on the matching TopicPartitionData in
 * clusterData. The replica counts are later used to group partitions into
 * multi-replica and single-replica sets before querying end offsets.
 *
 * @param admin           admin client used for {@code describeTopics}
 * @param clusterData     accumulator owning the per-partition data
 * @param topicPartitions partitions whose topics should be described
 * @throws RuntimeException wrapping any failure from the admin call
 */
private void findReplicaCounts(final Admin admin, final ClusterData clusterData, final Set<TopicPartition> topicPartitions) {
    // Collect the distinct topic names; describeTopics works per topic, not per partition.
    final Set<String> topics = new HashSet<>();
    for (final TopicPartition topicPartition : topicPartitions) {
        topics.add(topicPartition.topic());
    }
    try {
        final Collection<TopicDescription> topicDescriptions = admin.describeTopics(topics).allTopicNames().get().values();
        for (final TopicDescription topicDescription : topicDescriptions) {
            for (final TopicPartitionInfo topicPartitionInfo : topicDescription.partitions()) {
                final TopicPartition topicPartition = new TopicPartition(topicDescription.name(), topicPartitionInfo.partition());
                clusterData.findTopicPartitionData(topicPartition).setNumReplicas(topicPartitionInfo.replicas().size());
            }
        }
    } catch (final Exception e) {
        // The blocking get() may throw InterruptedException; restore the
        // interrupt flag so callers further up can still observe it.
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException(e);
    }
}

private void findEndOffsetsAndUpdateLag(final Consumer<?, ?> consumer, final Set<TopicPartition> topicPartitions, final ClusterData clusterData) {
long t = System.currentTimeMillis();
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
final TopicPartition partition = entry.getKey();
final Long offset = entry.getValue();
final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(partition);
topicPartitionData.setEndOffset(offset == null ? -1 : offset);
topicPartitionData.calculateLags();
final Set<TopicPartition> multiReplicaPartitions = new HashSet<>();
final Set<TopicPartition> singleReplicaPartitions = new HashSet<>();
for (final TopicPartition topicPartition : topicPartitions) {
if (clusterData.findTopicPartitionData(topicPartition).getNumReplicas() > 1) {
multiReplicaPartitions.add(topicPartition);
} else {
singleReplicaPartitions.add(topicPartition);
}
}
findEndOffsetsAndUpdateLagImpl(consumer, multiReplicaPartitions, clusterData);
findEndOffsetsAndUpdateLagImpl(consumer, singleReplicaPartitions, clusterData);
t = System.currentTimeMillis() - t;
LOG.debug("Found end offsets in " + t + " ms");
}

/**
 * Fetches end offsets for the given partitions and recalculates consumer
 * group lags. If the end-offset query times out (some partitions may be
 * offline), every partition in this batch gets its end offset set to -1 and
 * its lags recalculated accordingly.
 *
 * @param consumer        consumer used to fetch end offsets
 * @param topicPartitions the batch of partitions to update
 * @param clusterData     accumulator owning the per-partition data
 */
private void findEndOffsetsAndUpdateLagImpl(final Consumer<?, ?> consumer, final Set<TopicPartition> topicPartitions, final ClusterData clusterData) {
    try {
        consumer.endOffsets(topicPartitions).forEach((partition, offset) -> {
            final TopicPartitionData data = clusterData.findTopicPartitionData(partition);
            // A null offset means the broker did not return one; use -1 as "unknown".
            data.setEndOffset(offset == null ? -1 : offset);
            data.calculateLags();
        });
    } catch (final TimeoutException e) {
        LOG.warn("Got timeout while querying end offsets. Some partitions may be offline.");
        for (final TopicPartition partition : topicPartitions) {
            final TopicPartitionData data = clusterData.findTopicPartitionData(partition);
            data.setEndOffset(-1);
            data.calculateLags();
        }
    }
}

private Admin getAdmin() {
if (admin == null) {
admin = AdminClient.create(adminConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void runLoop() {
Conf.getConsumerConfig(), Conf.getAdminConfig());
for (;;) {
long t = System.currentTimeMillis();
final ClusterData clusterData = collector.collect();
final ClusterData clusterData = collector.collectClusterData();
prometheusReporter.publish(clusterData);
t = System.currentTimeMillis() - t;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ private void publishConsumerGroupLag(final String clusterName, final ClusterData
final String topic = topicPartitionData.getTopicPartition().topic();
final String partition = String.valueOf(topicPartitionData.getTopicPartition().partition());
for (final ConsumerGroupData consumerGroupData : topicPartitionData.getConsumerGroupDataMap().values()) {
final long lag = consumerGroupData.getLag();
if (lag < 0) {
continue;
}
final String consumerGroupId = consumerGroupData.getConsumerGroupId();
consumerGroupLagGauge
.labelValues(clusterName, consumerGroupId, topic, partition)
.set(consumerGroupData.getLag());
.set(lag);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public final class TopicPartitionData {

private final TopicPartition topicPartition;
private long endOffset = -1;
private int numReplicas = -1;
private final Map<String, ConsumerGroupData> consumerGroupDataMap = new HashMap<>();

public TopicPartitionData(final TopicPartition topicPartition) {
Expand All @@ -27,6 +28,14 @@ public void setEndOffset(final long endOffset) {
this.endOffset = endOffset;
}

/** Returns this partition's replica count, or -1 if not yet determined. */
public int getNumReplicas() {
    return numReplicas;
}

/** Sets this partition's replica count (as reported by topic description). */
public void setNumReplicas(final int numReplicas) {
    this.numReplicas = numReplicas;
}

/**
 * Returns the internal map of consumer group id to per-group data.
 * NOTE(review): this exposes the mutable internal map; callers appear to
 * iterate its values — external iteration is not synchronized, unlike
 * calculateLags which locks on this map.
 */
public Map<String, ConsumerGroupData> getConsumerGroupDataMap() {
    return consumerGroupDataMap;
}
Expand All @@ -40,7 +49,11 @@ public ConsumerGroupData findConsumerGroupData(final String consumerGroupId) {
/**
 * Recomputes the lag for every consumer group tracked on this partition.
 * Lag is {@code endOffset - groupOffset}, floored at 0. When the end offset
 * is unknown (negative, e.g. the partition was offline when end offsets were
 * queried), lag is set to -1 so reporters can skip publishing it.
 *
 * NOTE(review): the pasted diff left the removed pre-image setLag line in
 * front of the new if/else; it is dropped here to match the post-commit code.
 */
public void calculateLags() {
    synchronized (consumerGroupDataMap) {
        for (final ConsumerGroupData consumerGroupData : consumerGroupDataMap.values()) {
            if (endOffset < 0) {
                // End offset unknown: mark lag as unavailable rather than 0.
                consumerGroupData.setLag(-1);
            } else {
                consumerGroupData.setLag(Math.max(0, endOffset - consumerGroupData.getOffset()));
            }
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void shouldDetectLag() {
}

private void assertLag(final int expected) {
final ClusterData clusterData = lagCollector.collect();
final ClusterData clusterData = lagCollector.collectClusterData();
final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(new TopicPartition(TOPIC, 0));
assertNotNull(topicPartitionData);
final ConsumerGroupData consumerGroupData = topicPartitionData.findConsumerGroupData(CONSUMER_GROUP_ID);
Expand Down

0 comments on commit 263715c

Please sign in to comment.