Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making Xinfra Monitor highly available #355

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
72 changes: 53 additions & 19 deletions src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.xinfra.monitor.apps.App;
import com.linkedin.xinfra.monitor.services.HAMonitoringService;
import com.linkedin.xinfra.monitor.services.HAMonitoringServiceFactory;
import com.linkedin.xinfra.monitor.services.Service;
import com.linkedin.xinfra.monitor.services.ServiceFactory;
import java.io.BufferedReader;
Expand Down Expand Up @@ -47,9 +49,11 @@ public class XinfraMonitor {
private final ConcurrentMap<String, App> _apps;
private final ConcurrentMap<String, Service> _services;
private final ConcurrentMap<String, Object> _offlineRunnables;
private final ScheduledExecutorService _executor;
private ScheduledExecutorService _executor;
/** When true start has been called on this instance of Xinfra Monitor. */
private final AtomicBoolean _isRunning = new AtomicBoolean(false);
/** When true user has provided config to run monitor in highly available mode */
private Boolean _isRunningHA = false;

/**
* XinfraMonitor constructor creates apps and services for each of the individual clusters (properties) that's passed in.
Expand All @@ -64,34 +68,57 @@ public XinfraMonitor(Map<String, Map> allClusterProps) throws Exception {
_apps = new ConcurrentHashMap<>();
_services = new ConcurrentHashMap<>();

_offlineRunnables = new ConcurrentHashMap<>();
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX));
Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
(config, now) -> _offlineRunnables.size());

for (Map.Entry<String, Map> clusterProperty : allClusterProps.entrySet()) {
String clusterName = clusterProperty.getKey();
String name = clusterProperty.getKey();
Map props = clusterProperty.getValue();
if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG))
throw new IllegalArgumentException(clusterName + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
throw new IllegalArgumentException(name + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG);
String className = (String) props.get(XinfraMonitorConstants.CLASS_NAME_CONFIG);

Class<?> aClass = Class.forName(className);

/**
* If HA config is specified, create Runnables for the service to start and stop the monitor
*/
if (HAMonitoringService.class.isAssignableFrom(aClass)) {
_isRunningHA = true;
Runnable startMonitor = () -> {
try {
LOG.info("HAXinfraMonitor starting...");
this.start();
LOG.info("HAXinfraMonitor started.");
} catch (Exception e) {
throw new IllegalStateException("Error starting HAXinfraMonitor", e);
}
};
Runnable stopMonitor = () -> {
this.stop();
LOG.info("HAXinfraMonitor stopped.");
};

props.put(HAMonitoringServiceFactory.STARTMONITOR, startMonitor);
props.put(HAMonitoringServiceFactory.STOPMONITOR, stopMonitor);
}

if (App.class.isAssignableFrom(aClass)) {
App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, clusterName);
_apps.put(clusterName, clusterApp);
App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, name);
_apps.put(name, clusterApp);
} else if (Service.class.isAssignableFrom(aClass)) {
ServiceFactory serviceFactory = (ServiceFactory) Class.forName(className + XinfraMonitorConstants.FACTORY)
.getConstructor(Map.class, String.class)
.newInstance(props, clusterName);
.getConstructor(Map.class, String.class)
.newInstance(props, name);
Service service = serviceFactory.createService();
_services.put(clusterName, service);
_services.put(name, service);
} else {
throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName());
}
}
_executor = Executors.newSingleThreadScheduledExecutor();
_offlineRunnables = new ConcurrentHashMap<>();
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX));
Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime());
metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"),
(config, now) -> _offlineRunnables.size());
}

private boolean constructorContainsClass(Constructor<?>[] constructors, Class<?> classObject) {
Expand All @@ -117,6 +144,7 @@ public synchronized void start() throws Exception {
long initialDelaySecond = 5;
long periodSecond = 5;

_executor = Executors.newSingleThreadScheduledExecutor();
_executor.scheduleAtFixedRate(() -> {
try {
checkHealth();
Expand Down Expand Up @@ -184,10 +212,16 @@ public static void main(String[] args) throws Exception {
@SuppressWarnings("unchecked")
Map<String, Map> props = new ObjectMapper().readValue(buffer.toString(), Map.class);
XinfraMonitor xinfraMonitor = new XinfraMonitor(props);
xinfraMonitor.start();
LOG.info("Xinfra Monitor has started.");

xinfraMonitor.awaitShutdown();
/**
* If HA version is not running, start the monitor
* HA version will start the monitor if this instance is selected by the coordinator
*/
if (!xinfraMonitor._isRunningHA) {
xinfraMonitor.start();
LOG.info("Xinfra Monitor has started.");
xinfraMonitor.awaitShutdown();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import java.util.Collection;

/**
* Wraps around the new consumer from Apache Kafka and implements the #KMBaseConsumer interface
Expand All @@ -49,7 +50,7 @@ public NewConsumer(String topic, Properties consumerProperties, AdminClient admi
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, configureGroupId(targetConsumerGroupId, adminClient));
}
_consumer = new KafkaConsumer<>(consumerProperties);
_consumer.subscribe(Collections.singletonList(topic));
_consumer.subscribe(Collections.singletonList(topic), new KMRebalance(_consumer));
}

static String configureGroupId(String targetConsumerGroupId, AdminClient adminClient)
Expand Down Expand Up @@ -102,4 +103,16 @@ public long lastCommitted() {
/**
 * Records the current wall-clock time as the moment of the most recent
 * successful offset commit. Read back via {@code lastCommitted()} to detect
 * stalled commits.
 */
public void updateLastCommit() {
lastCommitted = System.currentTimeMillis();
}

private class KMRebalance implements ConsumerRebalanceListener {
private final KafkaConsumer<String, String> _consumer;
public KMRebalance(KafkaConsumer consumer) {
_consumer = consumer;
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
_consumer.seekToEnd(partitions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class ClusterTopicManipulationService implements Service {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopicManipulationService.class);
private final String _configDefinedServiceName;
private final Duration _reportIntervalSecond;
private final ScheduledExecutorService _executor;
private ScheduledExecutorService _executor;
private final AdminClient _adminClient;
private boolean _isOngoingTopicCreationDone;
private boolean _isOngoingTopicDeletionDone;
Expand All @@ -77,7 +77,6 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map
_isOngoingTopicCreationDone = true;
_isOngoingTopicDeletionDone = true;
_adminClient = adminClient;
_executor = Executors.newSingleThreadScheduledExecutor();
_reportIntervalSecond = Duration.ofSeconds(1);
_running = new AtomicBoolean(false);
_configDefinedServiceName = name;
Expand Down Expand Up @@ -114,6 +113,7 @@ public void start() {
this.getClass().getCanonicalName());
Runnable clusterTopicManipulationServiceRunnable = new ClusterTopicManipulationServiceRunnable();

_executor = Executors.newSingleThreadScheduledExecutor();
_executor.scheduleAtFixedRate(clusterTopicManipulationServiceRunnable, _reportIntervalSecond.getSeconds(),
_reportIntervalSecond.getSeconds(), TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ConsumeService extends AbstractService {
private static final long CONSUME_THREAD_SLEEP_MS = 100;
private static Metrics metrics;
private final AtomicBoolean _running;
private final KMBaseConsumer _baseConsumer;
private KMBaseConsumer _baseConsumer;
private final int _latencySlaMs;
private ConsumeMetrics _sensors;
private Thread _consumeThread;
Expand All @@ -60,6 +60,9 @@ public class ConsumeService extends AbstractService {
private final String _name;
private static final String METRIC_GROUP_NAME = "consume-service";
private static Map<String, String> tags;
private CompletableFuture<Void> _topicPartitionFuture;
private ConsumerFactory _consumerFactory;
private CompletableFuture<Void> _topicPartitionResult;

/**
* Mainly contains services for three metrics:
Expand All @@ -80,41 +83,14 @@ public ConsumeService(String name,
CompletableFuture<Void> topicPartitionResult,
ConsumerFactory consumerFactory)
throws ExecutionException, InterruptedException {
// TODO: Make values of below fields come from configs
// TODO: Make values of below fields come from configs
super(10, Duration.ofMinutes(1));
_baseConsumer = consumerFactory.baseConsumer();
_latencySlaMs = consumerFactory.latencySlaMs();
_name = name;
_adminClient = consumerFactory.adminClient();
_running = new AtomicBoolean(false);

// Returns a new CompletionStage (topicPartitionFuture) which
// executes the given action - code inside run() - when this stage (topicPartitionResult) completes normally,.
CompletableFuture<Void> topicPartitionFuture = topicPartitionResult.thenRun(() -> {
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, new SystemTime());
tags = new HashMap<>();
tags.put(TAGS_NAME, name);
_topic = consumerFactory.topic();
_sensors = new ConsumeMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(),
consumerFactory.latencyPercentileGranularityMs());
_commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(),
consumerFactory.latencyPercentileGranularityMs());
_commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags);
_consumeThread = new Thread(() -> {
try {
consume();
} catch (Exception e) {
LOG.error(name + "/ConsumeService failed", e);
}
}, name + " consume-service");
_consumeThread.setDaemon(true);
});

// In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result.
topicPartitionFuture.get();
_topicPartitionResult = topicPartitionResult;
_consumerFactory = consumerFactory;
}

private void consume() throws Exception {
Expand Down Expand Up @@ -227,6 +203,34 @@ void startConsumeThreadForTesting() {
@Override
public synchronized void start() {
if (_running.compareAndSet(false, true)) {
try {
_baseConsumer = _consumerFactory.baseConsumer();

_topicPartitionFuture = _topicPartitionResult.thenRun(() -> {
MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
List<MetricsReporter> reporters = new ArrayList<>();
reporters.add(new JmxReporter(JMX_PREFIX));
metrics = new Metrics(metricConfig, reporters, new SystemTime());
tags = new HashMap<>();
tags.put(TAGS_NAME, _name);
_topic = _consumerFactory.topic();
_sensors = new ConsumeMetrics(metrics, tags, _consumerFactory.latencyPercentileMaxMs(), _consumerFactory.latencyPercentileGranularityMs());
_commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, _consumerFactory.latencyPercentileMaxMs(), _consumerFactory.latencyPercentileGranularityMs());
_commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags);
_consumeThread = new Thread(() -> {
try {
consume();
} catch (Exception e) {
LOG.error(_name + "/ConsumeService failed", e);
}
}, _name + " consume-service");
_consumeThread.setDaemon(true);
});

_topicPartitionFuture.get();
} catch (Exception e) {
LOG.error("Error trying to start ConsumeService", e);
}
_consumeThread.start();
LOG.info("{}/ConsumeService started.", _name);

Expand All @@ -243,6 +247,7 @@ public synchronized void start() {
public synchronized void stop() {
if (_running.compareAndSet(true, false)) {
try {
_consumeThread.join();
_baseConsumer.close();
} catch (Exception e) {
LOG.warn(_name + "/ConsumeService while trying to close consumer.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@


public class ConsumerFactoryImpl implements ConsumerFactory {
private final KMBaseConsumer _baseConsumer;
private final String _topic;
private KMBaseConsumer _baseConsumer;
private String _topic;
private static final String FALSE = "false";
private final int _latencyPercentileMaxMs;
private final int _latencyPercentileGranularityMs;
Expand All @@ -37,6 +37,8 @@ public class ConsumerFactoryImpl implements ConsumerFactory {
private final int _latencySlaMs;
private static AdminClient adminClient;
private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactoryImpl.class);
private String _consumerClassName;
private final Properties consumerProps = new Properties();

@SuppressWarnings("rawtypes")
public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {
Expand All @@ -48,7 +50,7 @@ public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {
_topic = config.getString(ConsumeServiceConfig.TOPIC_CONFIG);
String zkConnect = config.getString(ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
String brokerList = config.getString(ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
String consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG);
_consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG);
_latencySlaMs = config.getInt(ConsumeServiceConfig.LATENCY_SLA_MS_CONFIG);
_latencyPercentileMaxMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG);
_latencyPercentileGranularityMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG);
Expand All @@ -57,7 +59,6 @@ public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {
throw new ConfigException("Override must not contain " + property + " config.");
}
}
Properties consumerProps = new Properties();

/* Assign default config. This has the lowest priority. */
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, FALSE);
Expand All @@ -66,8 +67,8 @@ public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kmf-consumer-group-" + new Random().nextInt());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
if (consumerClassName.equals(NewConsumer.class.getCanonicalName()) || consumerClassName.equals(NewConsumer.class.getSimpleName())) {
consumerClassName = NewConsumer.class.getCanonicalName();
if (_consumerClassName.equals(NewConsumer.class.getCanonicalName()) || _consumerClassName.equals(NewConsumer.class.getSimpleName())) {
_consumerClassName = NewConsumer.class.getCanonicalName();
}

/* Assign config specified for ConsumeService. */
Expand All @@ -80,16 +81,6 @@ public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {
if (props.containsKey(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG)) {
props.forEach(consumerProps::putIfAbsent);
}

java.lang.reflect.Constructor<?> constructor = adminClientConstructorIfExists(consumerClassName);
if (constructor != null) {
_baseConsumer = (KMBaseConsumer) constructor
.newInstance(_topic, consumerProps, adminClient());
} else {
_baseConsumer = (KMBaseConsumer) Class.forName(consumerClassName)
.getConstructor(String.class, Properties.class)
.newInstance(_topic, consumerProps);
}
}

private static java.lang.reflect.Constructor<?> adminClientConstructorIfExists(String consumerClassName)
Expand Down Expand Up @@ -117,6 +108,19 @@ public int latencySlaMs() {

/**
 * Creates a fresh consumer instance of the configured class via reflection
 * and caches it in {@code _baseConsumer}.
 *
 * Prefers a (String topic, Properties, AdminClient) constructor when the
 * configured consumer class declares one (e.g. NewConsumer, which needs the
 * AdminClient for consumer-group setup); otherwise falls back to the
 * (String topic, Properties) constructor.
 *
 * @return the newly created consumer
 * @throws IllegalStateException if the consumer class cannot be loaded or
 *         instantiated; previously the exception was logged and swallowed,
 *         letting a null consumer escape and fail later with an NPE at the
 *         call site.
 */
@Override
public KMBaseConsumer baseConsumer() {
  try {
    java.lang.reflect.Constructor<?> constructor = adminClientConstructorIfExists(_consumerClassName);
    if (constructor != null) {
      _baseConsumer = (KMBaseConsumer) constructor
          .newInstance(_topic, consumerProps, adminClient());
    } else {
      _baseConsumer = (KMBaseConsumer) Class.forName(_consumerClassName)
          .getConstructor(String.class, Properties.class)
          .newInstance(_topic, consumerProps);
    }
  } catch (Exception e) {
    LOG.error("Cannot create consumer", e);
    // Fail fast with the cause attached instead of returning a null consumer.
    throw new IllegalStateException("Cannot create consumer of class " + _consumerClassName, e);
  }
  return _baseConsumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ public class DefaultMetricsReporterService implements Service {
private final String _name;
private final List<String> _metricNames;
private final int _reportIntervalSec;
private final ScheduledExecutorService _executor;
private ScheduledExecutorService _executor;

/**
 * Reads the list of metric names to report and the report interval from the
 * service config.
 *
 * @param props raw config map for this service instance
 * @param name  service instance name (used for logging/identification)
 */
public DefaultMetricsReporterService(Map<String, Object> props, String name) {
_name = name;
DefaultMetricsReporterServiceConfig config = new DefaultMetricsReporterServiceConfig(props);
_metricNames = config.getList(DefaultMetricsReporterServiceConfig.REPORT_METRICS_CONFIG);
_reportIntervalSec = config.getInt(DefaultMetricsReporterServiceConfig.REPORT_INTERVAL_SEC_CONFIG);
// NOTE(review): start() now also assigns a fresh single-thread executor to
// _executor; the one created here looks like it is replaced without being
// shut down (thread leak). Confirm this line should be removed now that the
// executor is created lazily in start().
_executor = Executors.newSingleThreadScheduledExecutor();
}

@Override
public synchronized void start() {
_executor = Executors.newSingleThreadScheduledExecutor();

_executor.scheduleAtFixedRate(() -> {
try {
reportMetrics();
Expand Down
Loading