diff --git a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java index 8c5aeffc..83e8c2e3 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java +++ b/src/main/java/com/linkedin/xinfra/monitor/common/Utils.java @@ -16,6 +16,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -276,4 +277,12 @@ public static List getMBeanAttributeValues(String mbeanExpr } return values; } + + public static void delay(Duration duration) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + LOG.warn("While trying to sleep for {} millis. Got:", duration.toMillis(), e); + } + } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/AbstractService.java b/src/main/java/com/linkedin/xinfra/monitor/services/AbstractService.java new file mode 100644 index 00000000..45d88317 --- /dev/null +++ b/src/main/java/com/linkedin/xinfra/monitor/services/AbstractService.java @@ -0,0 +1,80 @@ +/** + * Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this + * file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.linkedin.xinfra.monitor.services; + +import com.linkedin.xinfra.monitor.common.Utils; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class AbstractService implements Service { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractService.class); + // Below fields are used for the topic description retry logic since sometimes it takes a while for the admin clint + // to discover a topic due to the fact that Kafka's metadata is eventually consistent. The retry logic is particularly + // helpful to avoid exceptions when a new topic gets created since it takes even longer for the admin client to discover + // the newly created topic + private final int _describeTopicRetries; + private final Duration _describeTopicRetryInterval; + + AbstractService(int describeTopicRetries, Duration describeTopicRetryInterval) { + if (describeTopicRetries < 1) { + throw new IllegalArgumentException("Expect retry greater 0. Got: " + describeTopicRetries); + } + _describeTopicRetries = describeTopicRetries; + _describeTopicRetryInterval = describeTopicRetryInterval; + } + + TopicDescription getTopicDescription(AdminClient adminClient, String topic) { + int attemptCount = 0; + TopicDescription topicDescription = null; + Exception exception = null; + + while (attemptCount < _describeTopicRetries) { + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic)); + Map> topicResultValues = describeTopicsResult.values(); + KafkaFuture topicDescriptionKafkaFuture = topicResultValues.get(topic); + topicDescription = null; + exception = null; + try { + topicDescription = topicDescriptionKafkaFuture.get(); + } catch (InterruptedException | ExecutionException e) { + exception = e; + } + if (exception != null) { + LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {} at attempt {}", topic, + attemptCount, exception); + } else if (topicDescription == null) { + LOG.warn("Got null description for topic {} at attempt {}", topic, attemptCount); + } else { + return topicDescription; + } + attemptCount++; + if (attemptCount < _describeTopicRetries) { + Utils.delay(_describeTopicRetryInterval); + } + } + + if (exception != null) { + throw new IllegalStateException(exception); + } else { + throw new IllegalStateException(String.format("Got null description for topic %s after %d retry(s)", topic, _describeTopicRetries)); + } + } +} diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index d251256a..63caff1c 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -17,8 +17,8 @@ import com.linkedin.xinfra.monitor.services.metrics.CommitAvailabilityMetrics; import com.linkedin.xinfra.monitor.services.metrics.CommitLatencyMetrics; import com.linkedin.xinfra.monitor.services.metrics.ConsumeMetrics; +import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,24 +28,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.utils.SystemTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConsumeService implements Service { +public class ConsumeService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(ConsumeService.class); private static final String TAGS_NAME = "name"; private static final long COMMIT_TIME_INTERVAL = 4; @@ -83,6 +80,8 @@ public ConsumeService(String name, CompletableFuture topicPartitionResult, ConsumerFactory consumerFactory) throws ExecutionException, InterruptedException { + // TODO: Make values of below fields come from configs + super(10, Duration.ofMinutes(1)); _baseConsumer = consumerFactory.baseConsumer(); _latencySlaMs = consumerFactory.latencySlaMs(); _name = name; @@ -231,19 +230,10 @@ public synchronized void start() { _consumeThread.start(); LOG.info("{}/ConsumeService started.", _name); - Sensor topicPartitionCount = metrics.sensor("topic-partitions"); - DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singleton(_topic)); - Map> topicResultValues = describeTopicsResult.values(); - KafkaFuture topicDescriptionKafkaFuture = topicResultValues.get(_topic); - TopicDescription topicDescription = null; - try { - topicDescription = topicDescriptionKafkaFuture.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {}", _topic, e); - } + TopicDescription topicDescription = getTopicDescription(_adminClient, _topic); @SuppressWarnings("ConstantConditions") double partitionCount = topicDescription.partitions().size(); - topicPartitionCount.add( + metrics.sensor("topic-partitions").add( new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.", tags), new CumulativeSum(partitionCount)); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java index c16da3d0..28e49242 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java @@ -16,6 +16,7 @@ import com.linkedin.xinfra.monitor.producer.NewProducer; import com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig; import com.linkedin.xinfra.monitor.services.metrics.ProduceMetrics; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -36,9 +37,7 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -48,7 +47,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("rawtypes") -public class ProduceService implements Service { +public class ProduceService extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(ProduceService.class); private static final String[] NON_OVERRIDABLE_PROPERTIES = new String[]{ ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG, @@ -78,6 +77,8 @@ public class ProduceService implements Service { private static final String KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer"; public ProduceService(Map props, String name) throws Exception { + // TODO: Make values of below fields come from configs + super(10, Duration.ofMinutes(1)); _name = name; ProduceServiceConfig config = new ProduceServiceConfig(props); _brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG); @@ -156,16 +157,11 @@ private void initializeProducer(Map props) throws Exception { @Override public synchronized void start() { if (_running.compareAndSet(false, true)) { - try { - KafkaFuture> topicDescriptionsFuture = _adminClient.describeTopics(Collections.singleton(_topic)).all(); - Map topicDescriptions = topicDescriptionsFuture.get(); - int partitionNum = topicDescriptions.get(_topic).partitions().size(); - initializeStateForPartitions(partitionNum); - _handleNewPartitionsExecutor.scheduleWithFixedDelay(new NewPartitionHandler(), 1, 30, TimeUnit.SECONDS); - LOG.info("{}/ProduceService started", _name); - } catch (InterruptedException | UnknownTopicOrPartitionException | ExecutionException e) { - LOG.error("Exception occurred while starting produce service for topic: {}", _topic, e); - } + TopicDescription topicDescription = getTopicDescription(_adminClient, _topic); + int partitionNum = topicDescription.partitions().size(); + initializeStateForPartitions(partitionNum); + _handleNewPartitionsExecutor.scheduleWithFixedDelay(new NewPartitionHandler(), 1, 30, TimeUnit.SECONDS); + LOG.info("{}/ProduceService started", _name); } }