Skip to content

Commit

Permalink
Add retry logic to the produce and consume services (#342)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lincong Li authored Feb 12, 2021
1 parent 7f99c09 commit 2fa6df8
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 29 deletions.
9 changes: 9 additions & 0 deletions src/main/java/com/linkedin/xinfra/monitor/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -276,4 +277,12 @@ public static List<MbeanAttributeValue> getMBeanAttributeValues(String mbeanExpr
}
return values;
}

public static void delay(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
LOG.warn("While trying to sleep for {} millis. Got:", duration.toMillis(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
* file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.linkedin.xinfra.monitor.services;

import com.linkedin.xinfra.monitor.common.Utils;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public abstract class AbstractService implements Service {

private static final Logger LOG = LoggerFactory.getLogger(AbstractService.class);
// Below fields are used for the topic description retry logic since sometimes it takes a while for the admin clint
// to discover a topic due to the fact that Kafka's metadata is eventually consistent. The retry logic is particularly
// helpful to avoid exceptions when a new topic gets created since it takes even longer for the admin client to discover
// the newly created topic
private final int _describeTopicRetries;
private final Duration _describeTopicRetryInterval;

AbstractService(int describeTopicRetries, Duration describeTopicRetryInterval) {
if (describeTopicRetries < 1) {
throw new IllegalArgumentException("Expect retry greater 0. Got: " + describeTopicRetries);
}
_describeTopicRetries = describeTopicRetries;
_describeTopicRetryInterval = describeTopicRetryInterval;
}

TopicDescription getTopicDescription(AdminClient adminClient, String topic) {
int attemptCount = 0;
TopicDescription topicDescription = null;
Exception exception = null;

while (attemptCount < _describeTopicRetries) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
Map<String, KafkaFuture<TopicDescription>> topicResultValues = describeTopicsResult.values();
KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = topicResultValues.get(topic);
topicDescription = null;
exception = null;
try {
topicDescription = topicDescriptionKafkaFuture.get();
} catch (InterruptedException | ExecutionException e) {
exception = e;
}
if (exception != null) {
LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {} at attempt {}", topic,
attemptCount, exception);
} else if (topicDescription == null) {
LOG.warn("Got null description for topic {} at attempt {}", topic, attemptCount);
} else {
return topicDescription;
}
attemptCount++;
if (attemptCount < _describeTopicRetries) {
Utils.delay(_describeTopicRetryInterval);
}
}

if (exception != null) {
throw new IllegalStateException(exception);
} else {
throw new IllegalStateException(String.format("Got null description for topic %s after %d retry(s)", topic, _describeTopicRetries));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.linkedin.xinfra.monitor.services.metrics.CommitAvailabilityMetrics;
import com.linkedin.xinfra.monitor.services.metrics.CommitLatencyMetrics;
import com.linkedin.xinfra.monitor.services.metrics.ConsumeMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -28,24 +28,21 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumeService implements Service {
public class ConsumeService extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(ConsumeService.class);
private static final String TAGS_NAME = "name";
private static final long COMMIT_TIME_INTERVAL = 4;
Expand Down Expand Up @@ -83,6 +80,8 @@ public ConsumeService(String name,
CompletableFuture<Void> topicPartitionResult,
ConsumerFactory consumerFactory)
throws ExecutionException, InterruptedException {
// TODO: Make values of below fields come from configs
super(10, Duration.ofMinutes(1));
_baseConsumer = consumerFactory.baseConsumer();
_latencySlaMs = consumerFactory.latencySlaMs();
_name = name;
Expand Down Expand Up @@ -231,19 +230,10 @@ public synchronized void start() {
_consumeThread.start();
LOG.info("{}/ConsumeService started.", _name);

Sensor topicPartitionCount = metrics.sensor("topic-partitions");
DescribeTopicsResult describeTopicsResult = _adminClient.describeTopics(Collections.singleton(_topic));
Map<String, KafkaFuture<TopicDescription>> topicResultValues = describeTopicsResult.values();
KafkaFuture<TopicDescription> topicDescriptionKafkaFuture = topicResultValues.get(_topic);
TopicDescription topicDescription = null;
try {
topicDescription = topicDescriptionKafkaFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Exception occurred while getting the topicDescriptionKafkaFuture for topic: {}", _topic, e);
}
TopicDescription topicDescription = getTopicDescription(_adminClient, _topic);
@SuppressWarnings("ConstantConditions")
double partitionCount = topicDescription.partitions().size();
topicPartitionCount.add(
metrics.sensor("topic-partitions").add(
new MetricName("topic-partitions-count", METRIC_GROUP_NAME, "The total number of partitions for the topic.",
tags), new CumulativeSum(partitionCount));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.linkedin.xinfra.monitor.producer.NewProducer;
import com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig;
import com.linkedin.xinfra.monitor.services.metrics.ProduceMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -36,9 +37,7 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
Expand All @@ -48,7 +47,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings("rawtypes")
public class ProduceService implements Service {
public class ProduceService extends AbstractService {
private static final Logger LOG = LoggerFactory.getLogger(ProduceService.class);
private static final String[] NON_OVERRIDABLE_PROPERTIES = new String[]{
ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG,
Expand Down Expand Up @@ -78,6 +77,8 @@ public class ProduceService implements Service {
private static final String KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";

public ProduceService(Map<String, Object> props, String name) throws Exception {
// TODO: Make values of below fields come from configs
super(10, Duration.ofMinutes(1));
_name = name;
ProduceServiceConfig config = new ProduceServiceConfig(props);
_brokerList = config.getString(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
Expand Down Expand Up @@ -156,16 +157,11 @@ private void initializeProducer(Map<String, Object> props) throws Exception {
@Override
public synchronized void start() {
if (_running.compareAndSet(false, true)) {
try {
KafkaFuture<Map<String, TopicDescription>> topicDescriptionsFuture = _adminClient.describeTopics(Collections.singleton(_topic)).all();
Map<String, TopicDescription> topicDescriptions = topicDescriptionsFuture.get();
int partitionNum = topicDescriptions.get(_topic).partitions().size();
initializeStateForPartitions(partitionNum);
_handleNewPartitionsExecutor.scheduleWithFixedDelay(new NewPartitionHandler(), 1, 30, TimeUnit.SECONDS);
LOG.info("{}/ProduceService started", _name);
} catch (InterruptedException | UnknownTopicOrPartitionException | ExecutionException e) {
LOG.error("Exception occurred while starting produce service for topic: {}", _topic, e);
}
TopicDescription topicDescription = getTopicDescription(_adminClient, _topic);
int partitionNum = topicDescription.partitions().size();
initializeStateForPartitions(partitionNum);
_handleNewPartitionsExecutor.scheduleWithFixedDelay(new NewPartitionHandler(), 1, 30, TimeUnit.SECONDS);
LOG.info("{}/ProduceService started", _name);
}
}

Expand Down

0 comments on commit 2fa6df8

Please sign in to comment.