
Commit

Refactor to use Admin client instead of second set of consumers for empty check (#3637)

* Refactor to use Admin client instead of second set of consumers for empty check

Signed-off-by: Chase Engelbrecht <[email protected]>

* Remove debug log

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
engechas authored Nov 12, 2023
1 parent 504485c commit 4867886
Showing 8 changed files with 426 additions and 291 deletions.
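
The core of the refactor is the new KafkaAdminAccessor shown below: instead of keeping a second set of KafkaConsumer instances alive just to poll offsets, the buffer now asks a single Kafka AdminClient for each consumer group's committed offsets and for the corresponding partitions' latest (end) offsets, and reports a topic as empty only when every partition with data has been consumed up to its end offset. The following is a minimal, self-contained sketch of that offset comparison; the class and method names (TopicDrainCheckSketch, isTopicDrained) are illustrative and are not part of this commit.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

class TopicDrainCheckSketch {
    // Returns true when every partition the group has a committed offset for
    // has been consumed up to that partition's current end offset.
    static boolean isTopicDrained(final AdminClient adminClient, final String consumerGroupId)
            throws InterruptedException, ExecutionException {
        // Committed offsets for the consumer group, keyed by partition.
        final Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                .listConsumerGroupOffsets(consumerGroupId)
                .partitionsToOffsetAndMetadata()
                .get();

        // Latest (end) offsets for the same partitions.
        final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = adminClient
                .listOffsets(committed.keySet().stream()
                        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                .all()
                .get();

        // Empty only if every partition with data has a committed offset at (or past) the end offset.
        return committed.entrySet().stream().allMatch(entry -> {
            final ListOffsetsResult.ListOffsetsResultInfo end = endOffsets.get(entry.getKey());
            if (end == null) {
                return false; // no end offset known for this partition; treat as not empty
            }
            final OffsetAndMetadata committedOffset = entry.getValue();
            return end.offset() == 0L
                    || (committedOffset != null && committedOffset.offset() >= end.offset());
        });
    }
}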
KafkaAdminAccessor.java (new file)
@@ -0,0 +1,103 @@
package org.opensearch.dataprepper.plugins.kafka.admin;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaClusterAuthConfig;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class KafkaAdminAccessor {
    static final Logger LOG = LoggerFactory.getLogger(KafkaAdminAccessor.class);

    private final AdminClient kafkaAdminClient;
    private final TopicEmptinessMetadata topicEmptinessMetadata;
    private final List<String> consumerGroupIds;

    public KafkaAdminAccessor(final KafkaClusterAuthConfig kafkaClusterAuthConfig, final List<String> consumerGroupIds) {
        Properties authProperties = new Properties();
        KafkaSecurityConfigurer.setAuthProperties(authProperties, kafkaClusterAuthConfig, LOG);
        this.kafkaAdminClient = KafkaAdminClient.create(authProperties);
        this.topicEmptinessMetadata = new TopicEmptinessMetadata();
        this.consumerGroupIds = consumerGroupIds;
    }

    @VisibleForTesting
    KafkaAdminAccessor(final AdminClient kafkaAdminClient, final TopicEmptinessMetadata topicEmptinessMetadata, final List<String> consumerGroupIds) {
        this.kafkaAdminClient = kafkaAdminClient;
        this.topicEmptinessMetadata = topicEmptinessMetadata;
        this.consumerGroupIds = consumerGroupIds;
    }

    public synchronized boolean areTopicsEmpty() {
        final long currentThreadId = Thread.currentThread().getId();
        if (Objects.isNull(topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId())) {
            topicEmptinessMetadata.setTopicEmptyCheckingOwnerThreadId(currentThreadId);
        }

        if (currentThreadId != topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId() ||
                topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis())) {
            return topicEmptinessMetadata.isTopicEmpty();
        }

        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
        for (String consumerGroupId: consumerGroupIds) {
            final ListConsumerGroupOffsetsResult listConsumerGroupOffsets = kafkaAdminClient.listConsumerGroupOffsets(consumerGroupId);
            try {
                committedOffsets.putAll(listConsumerGroupOffsets.partitionsToOffsetAndMetadata().get());
            } catch (final InterruptedException | ExecutionException e) {
                LOG.error("Caught exception getting committed offset data", e);
                return false;
            }
        }

        final Map<TopicPartition, OffsetSpec> listOffsetsRequest = committedOffsets.keySet().stream()
                .collect(Collectors.toMap(topicPartition -> topicPartition, topicPartition -> OffsetSpec.latest()));
        final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets;
        try {
            endOffsets = kafkaAdminClient.listOffsets(listOffsetsRequest).all().get();
        } catch (final InterruptedException | ExecutionException e) {
            LOG.error("Caught exception getting end offset data", e);
            return false;
        }

        for (TopicPartition topicPartition : committedOffsets.keySet()) {
            final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);

            if (!endOffsets.containsKey(topicPartition)) {
                LOG.warn("No end offset found for topic partition: {}", topicPartition);
                return false;
            }
            final long endOffset = endOffsets.get(topicPartition).offset();

            topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);

            // If there is data in the partition
            if (endOffset != 0L) {
                // If there is no committed offset for the partition or the committed offset is behind the end offset
                if (Objects.isNull(offsetAndMetadata) || offsetAndMetadata.offset() < endOffset) {
                    topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
                }
            }
        }

        topicEmptinessMetadata.setLastIsEmptyCheckTime(System.currentTimeMillis());
        return topicEmptinessMetadata.isTopicEmpty();
    }
}
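
The package-private constructor above is marked @VisibleForTesting so the class can be exercised against a mocked AdminClient instead of a live cluster. The snippet below is not taken from this commit; it is only a hypothetical sketch (using Mockito) of how that constructor could be driven, and the topic name, group id, and offsets are made up. It assumes a fresh TopicEmptinessMetadata starts outside its check interval.

// Hypothetical illustration only; not the test code from this commit.
// Placed in the same package so the package-private constructor is visible.
package org.opensearch.dataprepper.plugins.kafka.admin;

import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata;

class KafkaAdminAccessorUsageSketch {

    static boolean exampleEmptyCheck() {
        final TopicPartition partition = new TopicPartition("example-topic", 0);

        // Stub the Admin client: the group has committed offset 5 on the only partition...
        final AdminClient adminClient = mock(AdminClient.class);
        final ListConsumerGroupOffsetsResult groupOffsets = mock(ListConsumerGroupOffsetsResult.class);
        when(groupOffsets.partitionsToOffsetAndMetadata())
                .thenReturn(KafkaFuture.completedFuture(Map.of(partition, new OffsetAndMetadata(5L))));
        when(adminClient.listConsumerGroupOffsets("example-group")).thenReturn(groupOffsets);

        // ...and the partition's end offset is also 5, so the topic should read as empty.
        final ListOffsetsResult.ListOffsetsResultInfo endOffsetInfo = mock(ListOffsetsResult.ListOffsetsResultInfo.class);
        when(endOffsetInfo.offset()).thenReturn(5L);
        final ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
        when(listOffsetsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(partition, endOffsetInfo)));
        when(adminClient.listOffsets(anyMap())).thenReturn(listOffsetsResult);

        final KafkaAdminAccessor accessor = new KafkaAdminAccessor(
                adminClient, new TopicEmptinessMetadata(), List.of("example-group"));
        return accessor.areTopicsEmpty(); // expected: true, the committed offset has caught up to the end offset
    }
}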
KafkaBuffer.java
@@ -20,6 +20,7 @@
 import org.opensearch.dataprepper.model.plugin.PluginFactory;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
+import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
 import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
@@ -48,7 +49,7 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
     static final String WRITE = "Write";
     static final String READ = "Read";
     private final KafkaCustomProducer producer;
-    private final List<KafkaCustomConsumer> emptyCheckingConsumers;
+    private final KafkaAdminAccessor kafkaAdminAccessor;
     private final AbstractBuffer<Record<Event>> innerBuffer;
     private final ExecutorService executorService;
     private final Duration drainTimeout;
@@ -73,8 +74,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
         final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
         final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
                 innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
-        emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
-                innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, null);
+        this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
         this.executorService = Executors.newFixedThreadPool(consumers.size());
         consumers.forEach(this.executorService::submit);

@@ -130,10 +130,7 @@ public void doCheckpoint(CheckpointState checkpointState) {

     @Override
     public boolean isEmpty() {
-        final boolean areTopicsEmpty = emptyCheckingConsumers.stream()
-                .allMatch(KafkaCustomConsumer::isTopicEmpty);
-
-        return areTopicsEmpty && innerBuffer.isEmpty();
+        return kafkaAdminAccessor.areTopicsEmpty() && innerBuffer.isEmpty();
     }

     @Override
KafkaCustomConsumer.java
@@ -16,7 +16,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
@@ -56,7 +55,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;

 import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;

@@ -95,7 +93,6 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
     private long numRecordsCommitted = 0;
     private final LogRateLimiter errLogRateLimiter;
     private final ByteDecoder byteDecoder;
-    private final TopicEmptinessMetadata topicEmptinessMetadata;

     public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final AtomicBoolean shutdownInProgress,
@@ -106,7 +103,6 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final AcknowledgementSetManager acknowledgementSetManager,
                                final ByteDecoder byteDecoder,
                                final KafkaTopicConsumerMetrics topicMetrics,
-                               final TopicEmptinessMetadata topicEmptinessMetadata,
                                final PauseConsumePredicate pauseConsumePredicate) {
         this.topicName = topicConfig.getName();
         this.topicConfig = topicConfig;
@@ -132,7 +128,6 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
         this.lastCommitTime = System.currentTimeMillis();
         this.numberOfAcksPending = new AtomicInteger(0);
         this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis());
-        this.topicEmptinessMetadata = topicEmptinessMetadata;
     }

     KafkaTopicConsumerMetrics getTopicMetrics() {
@@ -537,41 +532,4 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
         final Long offset = offsetMap.get(topicPartition);
         return Objects.isNull(offset) ? "-" : offset.toString();
     }
-
-    public synchronized boolean isTopicEmpty() {
-        final long currentThreadId = Thread.currentThread().getId();
-        if (Objects.isNull(topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId())) {
-            topicEmptinessMetadata.setTopicEmptyCheckingOwnerThreadId(currentThreadId);
-        }
-
-        if (currentThreadId != topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId() ||
-                topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis())) {
-            return topicEmptinessMetadata.isTopicEmpty();
-        }
-
-        final List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
-        final List<TopicPartition> topicPartitions = partitions.stream()
-                .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
-                .collect(Collectors.toList());
-
-        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(topicPartitions));
-        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
-
-        for (TopicPartition topicPartition : topicPartitions) {
-            final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
-            final Long endOffset = endOffsets.get(topicPartition);
-            topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true);
-
-            // If there is data in the partition
-            if (endOffset != 0L) {
-                // If there is no committed offset for the partition or the committed offset is behind the end offset
-                if (Objects.isNull(offsetAndMetadata) || offsetAndMetadata.offset() < endOffset) {
-                    topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false);
-                }
-            }
-        }
-
-        topicEmptinessMetadata.setLastIsEmptyCheckTime(System.currentTimeMillis());
-        return topicEmptinessMetadata.isTopicEmpty();
-    }
 }
KafkaCustomConsumerFactory.java
@@ -91,7 +91,6 @@ public List<KafkaCustomConsumer> createConsumersForTopic(final KafkaConsumerConf

         try {
             final int numWorkers = topic.getWorkers();
-            final TopicEmptinessMetadata topicEmptinessMetadata = new TopicEmptinessMetadata();
             IntStream.range(0, numWorkers).forEach(index -> {
                 KafkaDataConfig dataConfig = new KafkaDataConfigAdapter(keyFactory, topic);
                 Deserializer<Object> keyDeserializer = (Deserializer<Object>) serializationFactory.getDeserializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));
@@ -105,7 +104,7 @@
                 final KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerProperties, keyDeserializer, valueDeserializer);

                 consumers.add(new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, kafkaConsumerConfig, topic,
-                        schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, topicEmptinessMetadata, pauseConsumePredicate));
+                        schemaType, acknowledgementSetManager, byteDecoder, topicMetrics, pauseConsumePredicate));

             });
         } catch (Exception e) {
@@ -183,7 +182,6 @@ private void setSchemaRegistryProperties(final KafkaConsumerConfig kafkaConsumer
     private void setPropertiesForPlaintextAndJsonWithoutSchemaRegistry(Properties properties, final TopicConfig topicConfig) {
         MessageFormat dataFormat = topicConfig.getSerdeFormat();
         schemaType = dataFormat.toString();
-        LOG.error("Setting schemaType to {}", schemaType);
     }

     private void setPropertiesForSchemaRegistryConnectivity(final KafkaConsumerConfig kafkaConsumerConfig, final Properties properties) {
KafkaSource.java
@@ -38,7 +38,6 @@
 import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
 import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
 import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
-import org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata;
 import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
 import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
 import org.opensearch.dataprepper.plugins.kafka.util.KafkaSecurityConfigurer;
@@ -118,7 +117,6 @@ public void start(Buffer<Record<Event>> buffer) {
             int numWorkers = topic.getWorkers();
             executorService = Executors.newFixedThreadPool(numWorkers);
             allTopicExecutorServices.add(executorService);
-            final TopicEmptinessMetadata topicEmptinessMetadata = new TopicEmptinessMetadata();

             IntStream.range(0, numWorkers).forEach(index -> {
                 while (true) {
@@ -142,7 +140,7 @@

                 }
                 consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType,
-                        acknowledgementSetManager, null, topicMetrics, topicEmptinessMetadata, PauseConsumePredicate.noPause());
+                        acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause());
                 allTopicConsumers.add(consumer);

                 executorService.submit(consumer);
Some of the 8 changed files are not shown above.
