diff --git a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java index d516b076..21e7d689 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/XinfraMonitor.java @@ -12,6 +12,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.xinfra.monitor.apps.App; +import com.linkedin.xinfra.monitor.services.HAMonitoringService; +import com.linkedin.xinfra.monitor.services.HAMonitoringServiceFactory; import com.linkedin.xinfra.monitor.services.Service; import com.linkedin.xinfra.monitor.services.ServiceFactory; import java.io.BufferedReader; @@ -47,9 +49,11 @@ public class XinfraMonitor { private final ConcurrentMap _apps; private final ConcurrentMap _services; private final ConcurrentMap _offlineRunnables; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; /** When true start has been called on this instance of Xinfra Monitor. */ private final AtomicBoolean _isRunning = new AtomicBoolean(false); + /** When true user has provided config to run monitor in highly available mode */ + private Boolean _isRunningHA = false; /** * XinfraMonitor constructor creates apps and services for each of the individual clusters (properties) that's passed in. @@ -64,34 +68,57 @@ public XinfraMonitor(Map allClusterProps) throws Exception { _apps = new ConcurrentHashMap<>(); _services = new ConcurrentHashMap<>(); + _offlineRunnables = new ConcurrentHashMap<>(); + List reporters = new ArrayList<>(); + reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX)); + Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime()); + metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"), + (config, now) -> _offlineRunnables.size()); + for (Map.Entry clusterProperty : allClusterProps.entrySet()) { - String clusterName = clusterProperty.getKey(); + String name = clusterProperty.getKey(); Map props = clusterProperty.getValue(); if (!props.containsKey(XinfraMonitorConstants.CLASS_NAME_CONFIG)) - throw new IllegalArgumentException(clusterName + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG); + throw new IllegalArgumentException(name + " is not configured with " + XinfraMonitorConstants.CLASS_NAME_CONFIG); String className = (String) props.get(XinfraMonitorConstants.CLASS_NAME_CONFIG); - Class aClass = Class.forName(className); + + /** + * If HA config is specified, create Runnables for the service to start and stop the monitor + */ + if (HAMonitoringService.class.isAssignableFrom(aClass)) { + _isRunningHA = true; + Runnable startMonitor = () -> { + try { + LOG.info("HAXinfraMonitor starting..."); + this.start(); + LOG.info("HAXinfraMonitor started."); + } catch (Exception e) { + throw new IllegalStateException("Error starting HAXinfraMonitor", e); + } + }; + Runnable stopMonitor = () -> { + this.stop(); + LOG.info("HAXinfraMonitor stopped."); + }; + + props.put(HAMonitoringServiceFactory.STARTMONITOR, startMonitor); + props.put(HAMonitoringServiceFactory.STOPMONITOR, stopMonitor); + } + if (App.class.isAssignableFrom(aClass)) { - App clusterApp = (App) Class.forName(className).getConstructor(Map.class, String.class).newInstance(props, clusterName); - _apps.put(clusterName, clusterApp); + App clusterApp = (App) Class.forName(className).getConstructor(Map.class, 
String.class).newInstance(props, name); + _apps.put(name, clusterApp); } else if (Service.class.isAssignableFrom(aClass)) { ServiceFactory serviceFactory = (ServiceFactory) Class.forName(className + XinfraMonitorConstants.FACTORY) - .getConstructor(Map.class, String.class) - .newInstance(props, clusterName); + .getConstructor(Map.class, String.class) + .newInstance(props, name); Service service = serviceFactory.createService(); - _services.put(clusterName, service); + _services.put(name, service); } else { throw new IllegalArgumentException(className + " should implement either " + App.class.getSimpleName() + " or " + Service.class.getSimpleName()); } } - _executor = Executors.newSingleThreadScheduledExecutor(); - _offlineRunnables = new ConcurrentHashMap<>(); - List reporters = new ArrayList<>(); - reporters.add(new JmxReporter(XinfraMonitorConstants.JMX_PREFIX)); - Metrics metrics = new Metrics(new MetricConfig(), reporters, new SystemTime()); - metrics.addMetric(metrics.metricName("offline-runnable-count", XinfraMonitorConstants.METRIC_GROUP_NAME, "The number of Service/App that are not fully running"), - (config, now) -> _offlineRunnables.size()); } private boolean constructorContainsClass(Constructor[] constructors, Class classObject) { @@ -117,6 +144,7 @@ public synchronized void start() throws Exception { long initialDelaySecond = 5; long periodSecond = 5; + _executor = Executors.newSingleThreadScheduledExecutor(); _executor.scheduleAtFixedRate(() -> { try { checkHealth(); @@ -184,10 +212,16 @@ public static void main(String[] args) throws Exception { @SuppressWarnings("unchecked") Map props = new ObjectMapper().readValue(buffer.toString(), Map.class); XinfraMonitor xinfraMonitor = new XinfraMonitor(props); - xinfraMonitor.start(); - LOG.info("Xinfra Monitor has started."); - xinfraMonitor.awaitShutdown(); + /** + * If HA version is not running, start the monitor + * HA version will start the monitor if this instance is selected by the coordinator + */ + if (!xinfraMonitor._isRunningHA) { + xinfraMonitor.start(); + LOG.info("Xinfra Monitor has started."); + xinfraMonitor.awaitShutdown(); + } } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java b/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java index e958d43c..a13411b9 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java +++ b/src/main/java/com/linkedin/xinfra/monitor/consumer/NewConsumer.java @@ -26,7 +26,8 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import java.util.Collection; /** * Wraps around the new consumer from Apache Kafka and implements the #KMBaseConsumer interface @@ -49,7 +50,7 @@ public NewConsumer(String topic, Properties consumerProperties, AdminClient admi consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, configureGroupId(targetConsumerGroupId, adminClient)); } _consumer = new KafkaConsumer<>(consumerProperties); - _consumer.subscribe(Collections.singletonList(topic)); + _consumer.subscribe(Collections.singletonList(topic), new KMRebalance(_consumer)); } static String configureGroupId(String targetConsumerGroupId, AdminClient adminClient) @@ -102,4 +103,16 @@ public long lastCommitted() { public void updateLastCommit() { lastCommitted = System.currentTimeMillis(); } + + private class KMRebalance implements ConsumerRebalanceListener { + private final KafkaConsumer _consumer; + public 
KMRebalance(KafkaConsumer consumer) { + _consumer = consumer; + } + public void onPartitionsRevoked(Collection partitions) { + } + public void onPartitionsAssigned(Collection partitions) { + _consumer.seekToEnd(partitions); + } + } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java index 56c3ddc2..13edc8d7 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java @@ -57,7 +57,7 @@ public class ClusterTopicManipulationService implements Service { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopicManipulationService.class); private final String _configDefinedServiceName; private final Duration _reportIntervalSecond; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; private final AdminClient _adminClient; private boolean _isOngoingTopicCreationDone; private boolean _isOngoingTopicDeletionDone; @@ -77,7 +77,6 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map _isOngoingTopicCreationDone = true; _isOngoingTopicDeletionDone = true; _adminClient = adminClient; - _executor = Executors.newSingleThreadScheduledExecutor(); _reportIntervalSecond = Duration.ofSeconds(1); _running = new AtomicBoolean(false); _configDefinedServiceName = name; @@ -114,6 +113,7 @@ public void start() { this.getClass().getCanonicalName()); Runnable clusterTopicManipulationServiceRunnable = new ClusterTopicManipulationServiceRunnable(); + _executor = Executors.newSingleThreadScheduledExecutor(); _executor.scheduleAtFixedRate(clusterTopicManipulationServiceRunnable, _reportIntervalSecond.getSeconds(), _reportIntervalSecond.getSeconds(), TimeUnit.SECONDS); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java index 63caff1c..08ccc010 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumeService.java @@ -49,7 +49,7 @@ public class ConsumeService extends AbstractService { private static final long CONSUME_THREAD_SLEEP_MS = 100; private static Metrics metrics; private final AtomicBoolean _running; - private final KMBaseConsumer _baseConsumer; + private KMBaseConsumer _baseConsumer; private final int _latencySlaMs; private ConsumeMetrics _sensors; private Thread _consumeThread; @@ -60,6 +60,9 @@ public class ConsumeService extends AbstractService { private final String _name; private static final String METRIC_GROUP_NAME = "consume-service"; private static Map tags; + private CompletableFuture _topicPartitionFuture; + private ConsumerFactory _consumerFactory; + private CompletableFuture _topicPartitionResult; /** * Mainly contains services for three metrics: @@ -80,41 +83,14 @@ public ConsumeService(String name, CompletableFuture topicPartitionResult, ConsumerFactory consumerFactory) throws ExecutionException, InterruptedException { - // TODO: Make values of below fields come from configs + // TODO: Make values of below fields come from configs super(10, Duration.ofMinutes(1)); - _baseConsumer = consumerFactory.baseConsumer(); _latencySlaMs = consumerFactory.latencySlaMs(); _name = name; _adminClient = consumerFactory.adminClient(); _running = new 
AtomicBoolean(false); - - // Returns a new CompletionStage (topicPartitionFuture) which - // executes the given action - code inside run() - when this stage (topicPartitionResult) completes normally,. - CompletableFuture topicPartitionFuture = topicPartitionResult.thenRun(() -> { - MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS); - List reporters = new ArrayList<>(); - reporters.add(new JmxReporter(JMX_PREFIX)); - metrics = new Metrics(metricConfig, reporters, new SystemTime()); - tags = new HashMap<>(); - tags.put(TAGS_NAME, name); - _topic = consumerFactory.topic(); - _sensors = new ConsumeMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(), - consumerFactory.latencyPercentileGranularityMs()); - _commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, consumerFactory.latencyPercentileMaxMs(), - consumerFactory.latencyPercentileGranularityMs()); - _commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags); - _consumeThread = new Thread(() -> { - try { - consume(); - } catch (Exception e) { - LOG.error(name + "/ConsumeService failed", e); - } - }, name + " consume-service"); - _consumeThread.setDaemon(true); - }); - - // In a blocking fashion, waits for this topicPartitionFuture to complete, and then returns its result. - topicPartitionFuture.get(); + _topicPartitionResult = topicPartitionResult; + _consumerFactory = consumerFactory; } private void consume() throws Exception { @@ -227,6 +203,34 @@ void startConsumeThreadForTesting() { @Override public synchronized void start() { if (_running.compareAndSet(false, true)) { + try { + _baseConsumer = _consumerFactory.baseConsumer(); + + _topicPartitionFuture = _topicPartitionResult.thenRun(() -> { + MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS); + List reporters = new ArrayList<>(); + reporters.add(new JmxReporter(JMX_PREFIX)); + metrics = new Metrics(metricConfig, reporters, new SystemTime()); + tags = new HashMap<>(); + tags.put(TAGS_NAME, _name); + _topic = _consumerFactory.topic(); + _sensors = new ConsumeMetrics(metrics, tags, _consumerFactory.latencyPercentileMaxMs(), _consumerFactory.latencyPercentileGranularityMs()); + _commitLatencyMetrics = new CommitLatencyMetrics(metrics, tags, _consumerFactory.latencyPercentileMaxMs(), _consumerFactory.latencyPercentileGranularityMs()); + _commitAvailabilityMetrics = new CommitAvailabilityMetrics(metrics, tags); + _consumeThread = new Thread(() -> { + try { + consume(); + } catch (Exception e) { + LOG.error(_name + "/ConsumeService failed", e); + } + }, _name + " consume-service"); + _consumeThread.setDaemon(true); + }); + + _topicPartitionFuture.get(); + } catch (Exception e) { + LOG.error("Error trying to start ConsumeService", e); + } _consumeThread.start(); LOG.info("{}/ConsumeService started.", _name); @@ -243,6 +247,7 @@ public synchronized void start() { public synchronized void stop() { if (_running.compareAndSet(true, false)) { try { + _consumeThread.join(); _baseConsumer.close(); } catch (Exception e) { LOG.warn(_name + "/ConsumeService while trying to close consumer.", e); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumerFactoryImpl.java b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumerFactoryImpl.java index 07943db8..df2e886c 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ConsumerFactoryImpl.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ConsumerFactoryImpl.java @@ -27,8 +27,8 @@ 
public class ConsumerFactoryImpl implements ConsumerFactory { - private final KMBaseConsumer _baseConsumer; - private final String _topic; + private KMBaseConsumer _baseConsumer; + private String _topic; private static final String FALSE = "false"; private final int _latencyPercentileMaxMs; private final int _latencyPercentileGranularityMs; @@ -37,6 +37,8 @@ public class ConsumerFactoryImpl implements ConsumerFactory { private final int _latencySlaMs; private static AdminClient adminClient; private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactoryImpl.class); + private String _consumerClassName; + private final Properties consumerProps = new Properties(); @SuppressWarnings("rawtypes") public ConsumerFactoryImpl(Map props) throws Exception { @@ -48,7 +50,7 @@ public ConsumerFactoryImpl(Map props) throws Exception { _topic = config.getString(ConsumeServiceConfig.TOPIC_CONFIG); String zkConnect = config.getString(ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG); String brokerList = config.getString(ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG); - String consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG); + _consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG); _latencySlaMs = config.getInt(ConsumeServiceConfig.LATENCY_SLA_MS_CONFIG); _latencyPercentileMaxMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG); _latencyPercentileGranularityMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG); @@ -57,7 +59,6 @@ public ConsumerFactoryImpl(Map props) throws Exception { throw new ConfigException("Override must not contain " + property + " config."); } } - Properties consumerProps = new Properties(); /* Assign default config. This has the lowest priority. */ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, FALSE); @@ -66,8 +67,8 @@ public ConsumerFactoryImpl(Map props) throws Exception { consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kmf-consumer-group-" + new Random().nextInt()); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - if (consumerClassName.equals(NewConsumer.class.getCanonicalName()) || consumerClassName.equals(NewConsumer.class.getSimpleName())) { - consumerClassName = NewConsumer.class.getCanonicalName(); + if (_consumerClassName.equals(NewConsumer.class.getCanonicalName()) || _consumerClassName.equals(NewConsumer.class.getSimpleName())) { + _consumerClassName = NewConsumer.class.getCanonicalName(); } /* Assign config specified for ConsumeService. 
*/ @@ -80,16 +81,6 @@ public ConsumerFactoryImpl(Map props) throws Exception { if (props.containsKey(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG)) { props.forEach(consumerProps::putIfAbsent); } - - java.lang.reflect.Constructor constructor = adminClientConstructorIfExists(consumerClassName); - if (constructor != null) { - _baseConsumer = (KMBaseConsumer) constructor - .newInstance(_topic, consumerProps, adminClient()); - } else { - _baseConsumer = (KMBaseConsumer) Class.forName(consumerClassName) - .getConstructor(String.class, Properties.class) - .newInstance(_topic, consumerProps); - } } private static java.lang.reflect.Constructor adminClientConstructorIfExists(String consumerClassName) @@ -117,6 +108,19 @@ public int latencySlaMs() { @Override public KMBaseConsumer baseConsumer() { + try { + java.lang.reflect.Constructor constructor = adminClientConstructorIfExists(_consumerClassName); + if (constructor != null) { + _baseConsumer = (KMBaseConsumer) constructor + .newInstance(_topic, consumerProps, adminClient()); + } else { + _baseConsumer = (KMBaseConsumer) Class.forName(_consumerClassName) + .getConstructor(String.class, Properties.class) + .newInstance(_topic, consumerProps); + } + } catch (Exception e) { + LOG.error("Cannot create consumer", e); + } return _baseConsumer; } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java index 64a62bf1..2849765f 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/DefaultMetricsReporterService.java @@ -28,18 +28,19 @@ public class DefaultMetricsReporterService implements Service { private final String _name; private final List _metricNames; private final int _reportIntervalSec; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; public DefaultMetricsReporterService(Map props, String name) { _name = name; DefaultMetricsReporterServiceConfig config = new DefaultMetricsReporterServiceConfig(props); _metricNames = config.getList(DefaultMetricsReporterServiceConfig.REPORT_METRICS_CONFIG); _reportIntervalSec = config.getInt(DefaultMetricsReporterServiceConfig.REPORT_INTERVAL_SEC_CONFIG); - _executor = Executors.newSingleThreadScheduledExecutor(); } @Override public synchronized void start() { + _executor = Executors.newSingleThreadScheduledExecutor(); + _executor.scheduleAtFixedRate(() -> { try { reportMetrics(); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java index 043f34b5..4918dda4 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java @@ -33,7 +33,7 @@ public class GraphiteMetricsReporterService implements Service { private final String _name; private final List _metricNames; private final int _reportIntervalSec; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; private final GraphiteClient _graphiteClient; private final String _metricNamePrefix; @@ -43,7 +43,6 @@ public GraphiteMetricsReporterService(Map props, String name) GraphiteMetricsReporterServiceConfig config = new GraphiteMetricsReporterServiceConfig(props); _metricNames = 
config.getList(GraphiteMetricsReporterServiceConfig.REPORT_METRICS_CONFIG); _reportIntervalSec = config.getInt(GraphiteMetricsReporterServiceConfig.REPORT_INTERVAL_SEC_CONFIG); - _executor = Executors.newSingleThreadScheduledExecutor(); _metricNamePrefix = config.getString(GraphiteMetricsReporterServiceConfig.REPORT_GRAPHITE_PREFIX); _graphiteClient = GraphiteClientFactory.defaultGraphiteClient( config.getString(GraphiteMetricsReporterServiceConfig.REPORT_GRAPHITE_HOST), @@ -52,6 +51,8 @@ public GraphiteMetricsReporterService(Map props, String name) @Override public synchronized void start() { + _executor = Executors.newSingleThreadScheduledExecutor(); + _executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringCoordinator.java b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringCoordinator.java new file mode 100755 index 00000000..08fdbccf --- /dev/null +++ b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringCoordinator.java @@ -0,0 +1,180 @@ +package com.linkedin.xinfra.monitor.services; + +import org.apache.kafka.clients.consumer.internals.AbstractCoordinator; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Define methods to serialize and deserialize HAMonitoringIdentity instances. This is used by the coordinator to + * perform assignments and group joins. + */ +class HAMonitoringProtocol { + public static ByteBuffer serializeMetadata(HAMonitoringIdentity id) { + return ByteBuffer.wrap(id.toBytes()); + } + + public static HAMonitoringIdentity deserializeMetadata(ByteBuffer data) { + return HAMonitoringIdentity.fromBytes(data.array()); + } +} + +/** + * This coordinator, based on Kafka's AbstractCoordinator, manages the instances of Xinfra Monitor running in the + * group specified in the config. One of these instances is elected to report metrics. That instance, referred to as + * the leader, will run Xinfra Monitor to report metrics. If the leader fails or leaves the group, a different instance + * will start running Xinfra Monitor. 
+ */ +public class HAMonitoringCoordinator extends AbstractCoordinator { + private final Runnable _startMonitor; + private final Runnable _stopMonitor; + private HAMonitoringIdentity _identity; + private final Logger LOG; + + public HAMonitoringCoordinator(GroupRebalanceConfig groupRebalanceConfig, + LogContext logContext, + ConsumerNetworkClient client, + Metrics metrics, + String metricGrpPrefix, + Time time, + Runnable start, + Runnable stop, + HAMonitoringIdentity id) { + super(groupRebalanceConfig, + logContext, + client, + metrics, + metricGrpPrefix, + time); + + this._startMonitor = start; + this._stopMonitor = stop; + this._identity = id; + this.LOG = logContext.logger(HAMonitoringCoordinator.class); + } + + @Override + public String protocolType() { + return "xinfraleaderelector"; + } + + @Override + public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() { + ByteBuffer metadata = HAMonitoringProtocol.serializeMetadata(this._identity); + + return new JoinGroupRequestData.JoinGroupRequestProtocolCollection( + Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("HAMonitoringCoordinator") + .setMetadata(metadata.array())).iterator()); + } + + @Override + protected void onJoinPrepare(int generation, String memberId) { + /** + * When a new member joins the group, do nothing - no clean up required. Xinfra Monitor should keep running + * until the leader has changed. + */ + } + + /** + * One group member will perform assignment for the group. This method determines which member should report + * metrics and returns that assignment to all members. + * + * Unless a leader already exists, the member with the lexicographically smallest group ID is chosen as the leader. + * The group ID is used instead of the user defined ID since the latter is not guaranteed to be unique. + */ + @Override + protected Map performAssignment( + String leaderId, + String protocol, + List allMemberMetadata + ) { + // Map the group defined member ID to HAMonitoringIdentity object + Map assignments = new HashMap<>(); + int numLeaders = 0; + String leaderGroupId = null; // Store the leader's group defined member ID + + for (JoinGroupResponseData.JoinGroupResponseMember entry : allMemberMetadata) { + HAMonitoringIdentity id = HAMonitoringProtocol.deserializeMetadata(ByteBuffer.wrap(entry.metadata())); + if (id.isLeader()) numLeaders++; + + // Update lexicographically smallest group defined id + if (leaderGroupId == null || entry.memberId().compareTo(leaderGroupId) < 0) { + leaderGroupId = entry.memberId(); + } + + assignments.put(entry.memberId(), id); + } + + if (numLeaders != 1) { + // Make member with lexicographically smallest group id the leader to run Xinfra Monitor + for (Map.Entry entry : assignments.entrySet()) { + entry.getValue().setLeader(entry.getKey().equals(leaderGroupId)); + } + } // Otherwise, leave the current leader + + // Map group defined id to serialized identity object + Map serializedAssignments = new HashMap<>(); + for (Map.Entry entry : assignments.entrySet()) { + serializedAssignments.put(entry.getKey(), HAMonitoringProtocol.serializeMetadata(entry.getValue())); + } + + return serializedAssignments; + } + + @Override + protected void onJoinComplete( + int generation, + String memberId, + String protocol, + ByteBuffer memberAssignment + ) { + /** + * Only the assigned leader should run Xinfra Monitor. All other members should stop running the monitor. + * The start and stop methods are defined in Xinfra Monitor's constructor. 
+ */ + this._identity = HAMonitoringProtocol.deserializeMetadata(memberAssignment); + + if (this._identity.isLeader()) { + LOG.info("HAMonitoringCoordinator received assignment: is leader"); + try { + _startMonitor.run(); + } catch (Exception e) { + LOG.error("Error starting HAXinfraMonitor", e); + // Leave group so another member can be elected leader to run the monitor + maybeLeaveGroup("Failed to start HAXinfraMonitor"); + } + } else { + LOG.info("HAMonitoringCoordinator received assignment: is not leader"); + _stopMonitor.run(); + } + } + + /** + * The service will poll the coordinator at a fixed rate. If a reassignment is required, the coordinator will handle + * that. + */ + public void poll() { + if (coordinatorUnknown()) { + ensureCoordinatorReady(time.timer(1000)); + } + + if (rejoinNeededOrPending()) { + ensureActiveGroup(); + } + + pollHeartbeat(time.milliseconds()); + client.poll(time.timer(1000)); + } +} diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringIdentity.java b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringIdentity.java new file mode 100755 index 00000000..3cd6c719 --- /dev/null +++ b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringIdentity.java @@ -0,0 +1,53 @@ +package com.linkedin.xinfra.monitor.services; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Keep track of assignments made in HAMonitoringCoordinator + */ +public class HAMonitoringIdentity { + private String _id; + private Boolean _isLeader; + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public HAMonitoringIdentity( + @JsonProperty("_id") String id + ) { + _id = id; + _isLeader = false; + } + + public byte[] toBytes() { + try { + return MAPPER.writeValueAsBytes(this); + } catch (Exception e) { + throw new IllegalArgumentException("Error serializing identity information", e); + } + } + + public static HAMonitoringIdentity fromBytes(byte[] jsonData) { + try { + return MAPPER.readValue(jsonData, HAMonitoringIdentity.class); + } catch (Exception e) { + throw new IllegalArgumentException("Error deserializing identity information", e); + } + } + + public String getId() { + return _id; + } + + public void setId(String id) { + _id = id; + } + + public boolean isLeader() { + return _isLeader; + } + + public void setLeader(boolean isLeader) { + _isLeader = isLeader; + } +} diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringService.java b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringService.java new file mode 100755 index 00000000..de33a0ae --- /dev/null +++ b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringService.java @@ -0,0 +1,197 @@ +package com.linkedin.xinfra.monitor.services; + +import com.linkedin.xinfra.monitor.services.configs.HAMonitoringConfig; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import 
org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Service that creates HAMonitoringCoordinator and regularly polls it. + */ +public class HAMonitoringService implements Service { + private static final Logger LOG = LoggerFactory.getLogger(HAMonitoringService.class); + private ScheduledExecutorService _executor; + private AtomicBoolean _isRunning; + private final String _name; + private final HAMonitoringCoordinator _coordinator; + + public HAMonitoringService(Map props, String name, Runnable startMonitor, Runnable stopMonitor) { + _name = name; + HAMonitoringConfig config = new HAMonitoringConfig(props); + + long retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); + int requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG); + int heartbeatIntervalMs = config.getInt(HAMonitoringConfig.HEARTBEAT_INTERVAL_MS_CONFIG); + String metricGrpPrefix = config.getString(HAMonitoringConfig.METRIC_GROUP_PREFIX_CONFIG); + String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); + + Time time = Time.SYSTEM; + + // Each instance of the monitor will need an ID to join the coordinator group + String groupInstanceId = config.getString(HAMonitoringConfig.GROUP_INSTANCE_ID_CONFIG); + if (groupInstanceId == null) { + groupInstanceId = UUID.randomUUID().toString(); + } + JoinGroupRequest.validateGroupInstanceId(groupInstanceId); + String groupId = config.getString(HAMonitoringConfig.GROUP_ID_CONFIG); + + HAMonitoringIdentity id = new HAMonitoringIdentity(groupInstanceId); + + // Create parameters required by the coordinator + MetricConfig metricConfig = new MetricConfig(); + Metrics metrics = new Metrics(metricConfig, new ArrayList<>(), time); + + LogContext logContext = new LogContext("[HA Leader Election group=" + groupId + " instance=" + groupInstanceId + "] "); + + Metadata metadata = new Metadata( + retryBackoffMs, + config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), + logContext, + new ClusterResourceListeners() + ); + + final List addresses = ClientUtils + .parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)); + metadata.bootstrap(addresses, time.milliseconds()); + + ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time); + + NetworkClient netClient = new NetworkClient( + new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), // maxIdleMs, + metrics, + time, + metricGrpPrefix, + channelBuilder, + logContext), + metadata, + clientId, + 100, + config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG), + config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG), + config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG), + config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG), + requestTimeoutMs, 
ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)), + time, + true, + new ApiVersions(), + logContext + ); + + ConsumerNetworkClient client = new ConsumerNetworkClient( + logContext, + netClient, + metadata, + time, + retryBackoffMs, + requestTimeoutMs, + heartbeatIntervalMs + ); + + GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( + config.getInt(HAMonitoringConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG), + heartbeatIntervalMs, + groupId, + Optional.of(groupInstanceId), + retryBackoffMs, + false + ); + + // Create a HAMonitoringCoordinator instance + _coordinator = new HAMonitoringCoordinator( + groupRebalanceConfig, + logContext, + client, + metrics, + metricGrpPrefix, + time, + startMonitor, + stopMonitor, + id + ); + + _isRunning = new AtomicBoolean(true); + + /** + * Start a thread to poll the coordinator at a fixed rate. + */ + long initialDelaySecond = 5; + long periodSecond = 30; + + _coordinator.ensureActiveGroup(); + + _executor = Executors.newSingleThreadScheduledExecutor(); + _executor.scheduleAtFixedRate(() -> _coordinator.poll(), initialDelaySecond, periodSecond, TimeUnit.SECONDS); + + LOG.info("{}/HAMonitoringService started.", _name); + } + + @Override + public synchronized void start() { + if (!_isRunning.compareAndSet(false, true)) { + return; + } + + // start a thread to ensure coordinator ready + // while not stopping, poll the coordinator +// long initialDelaySecond = 5; +// long periodSecond = 30; +// +// _coordinator.ensureActiveGroup(); +// +// _executor = Executors.newSingleThreadScheduledExecutor(); +// _executor.scheduleAtFixedRate(() -> _coordinator.poll(), initialDelaySecond, periodSecond, TimeUnit.SECONDS); + } + + @Override + public synchronized void stop() { + /** + * This service should continue running to see if this group member becomes in charge of reporting metrics. + * Continue polling the coordinator. + */ + } + + @Override + public boolean isRunning() { + return _isRunning.get(); + } + + @Override + public void awaitShutdown(long timeout, TimeUnit unit) { + try { + _executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("Thread interrupted when waiting for {}/HAMonitoringService to shutdown.", _name); + } + LOG.info("{}/HAMonitoringService shutdown completed.", _name); + } +} diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringServiceFactory.java b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringServiceFactory.java new file mode 100755 index 00000000..92ab2228 --- /dev/null +++ b/src/main/java/com/linkedin/xinfra/monitor/services/HAMonitoringServiceFactory.java @@ -0,0 +1,27 @@ +package com.linkedin.xinfra.monitor.services; + +import java.util.Map; + + +/** + * Factory class that constructs the HAMonitoringService. 
+ */ +@SuppressWarnings("rawtypes") +public class HAMonitoringServiceFactory implements ServiceFactory { + public static final String STARTMONITOR = "start.monitor"; + public static final String STOPMONITOR = "stop.monitor"; + + private final Map _properties; + private final String _serviceName; + + public HAMonitoringServiceFactory(Map properties, String serviceName) { + _properties = properties; + _serviceName = serviceName; + } + + @SuppressWarnings("unchecked") + @Override + public Service createService() throws Exception { + return new HAMonitoringService(_properties, _serviceName, (Runnable) _properties.get(STARTMONITOR), (Runnable) _properties.get(STOPMONITOR)); + } +} diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/JolokiaService.java b/src/main/java/com/linkedin/xinfra/monitor/services/JolokiaService.java index ae1806e7..cb50d0bc 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/JolokiaService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/JolokiaService.java @@ -25,17 +25,21 @@ public class JolokiaService implements Service { private static final Logger LOG = LoggerFactory.getLogger(JolokiaService.class); private final String _name; - private final JolokiaServer _jolokiaServer; + private JolokiaServer _jolokiaServer; private final AtomicBoolean _isRunning; public JolokiaService(Map props, String name) throws Exception { _name = name; - _jolokiaServer = new JolokiaServer(new JvmAgentConfig("host=*,port=8778"), false); _isRunning = new AtomicBoolean(false); } public synchronized void start() { if (_isRunning.compareAndSet(false, true)) { + try { + _jolokiaServer = new JolokiaServer(new JvmAgentConfig("host=*,port=8778"), false); + } catch (Exception e) { + LOG.error("Error starting JolokiaService", e); + } _jolokiaServer.start(); LOG.info("{}/JolokiaService started at port 8778", _name); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/KafkaMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/KafkaMetricsReporterService.java index 4027dc08..00565496 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/KafkaMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/KafkaMetricsReporterService.java @@ -36,7 +36,7 @@ public class KafkaMetricsReporterService implements Service { private final String _name; private final List _metricsNames; private final int _reportIntervalSec; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; private KafkaProducer _producer; private final String _brokerList; private final String _topic; @@ -47,9 +47,7 @@ public KafkaMetricsReporterService(Map props, String name, Admin KafkaMetricsReporterServiceConfig config = new KafkaMetricsReporterServiceConfig(props); _metricsNames = config.getList(KafkaMetricsReporterServiceConfig.REPORT_METRICS_CONFIG); _reportIntervalSec = config.getInt(KafkaMetricsReporterServiceConfig.REPORT_INTERVAL_SEC_CONFIG); - _executor = Executors.newSingleThreadScheduledExecutor(); _brokerList = config.getString(KafkaMetricsReporterServiceConfig.BOOTSTRAP_SERVERS_CONFIG); - initializeProducer(); _topic = config.getString(KafkaMetricsReporterServiceConfig.TOPIC_CONFIG); Integer rf = config.getInt(KafkaMetricsReporterServiceConfig.TOPIC_REPLICATION_FACTOR); Utils.createTopicIfNotExists( @@ -64,6 +62,10 @@ public KafkaMetricsReporterService(Map props, String name, Admin @Override public synchronized void start() { + initializeProducer(); + + _executor = 
Executors.newSingleThreadScheduledExecutor(); + _executor.scheduleAtFixedRate(() -> { try { reportMetrics(); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java index dca1eb65..16cf0779 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java @@ -87,7 +87,7 @@ public class MultiClusterTopicManagementService implements Service { private final Map _topicManagementByCluster; private final int _rebalanceIntervalMs; private final long _preferredLeaderElectionIntervalMs; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; @SuppressWarnings("unchecked") public MultiClusterTopicManagementService(Map props, String serviceName) throws Exception { @@ -101,8 +101,6 @@ public MultiClusterTopicManagementService(Map props, String serv _rebalanceIntervalMs = config.getInt(MultiClusterTopicManagementServiceConfig.REBALANCE_INTERVAL_MS_CONFIG); _preferredLeaderElectionIntervalMs = config.getLong(MultiClusterTopicManagementServiceConfig.PREFERRED_LEADER_ELECTION_CHECK_INTERVAL_MS_CONFIG); - _executor = Executors.newSingleThreadScheduledExecutor( - r -> new Thread(r, _serviceName + "-multi-cluster-topic-management-service")); _topicPartitionResult.complete(null); } @@ -129,6 +127,11 @@ private Map initializeTopicManagementHelper(Map new Thread(r, _serviceName + "-multi-cluster-topic-management-service")); + + // Runnable tmRunnable = new TopicManagementRunnable(); + // _executor.scheduleWithFixedDelay(tmRunnable, 0, _scheduleIntervalMs, TimeUnit.MILLISECONDS); final long topicManagementProcedureInitialDelay = 0; _executor.scheduleWithFixedDelay( new TopicManagementRunnable(), diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java index 18c31803..74a5ee56 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/OffsetCommitService.java @@ -65,7 +65,7 @@ public class OffsetCommitService implements Service { private static final Logger LOGGER = LoggerFactory.getLogger(OffsetCommitService.class); private static final String SERVICE_SUFFIX = "-consumer-offset-commit-service"; private final AtomicBoolean _isRunning; - private final ScheduledExecutorService _scheduledExecutorService; + private ScheduledExecutorService _scheduledExecutorService; private final String _serviceName; private final AdminClient _adminClient; private final String _consumerGroup; @@ -75,6 +75,8 @@ public class OffsetCommitService implements Service { private final Time _time; private final OffsetCommitServiceMetrics _offsetCommitServiceMetrics; + private final ThreadFactory threadFactory; + /** * * @param config The consumer configuration keys @@ -134,13 +136,12 @@ public class OffsetCommitService implements Service { _consumerNetworkClient = new ConsumerNetworkClient(logContext, kafkaClient, metadata, _time, retryBackoffMs, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), heartbeatIntervalMs); - ThreadFactory threadFactory = new ThreadFactory() { + threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, serviceName + SERVICE_SUFFIX); } }; - 
_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); LOGGER.info("OffsetCommitService's ConsumerConfig - {}", Utils.prettyPrint(config.values())); } @@ -154,8 +155,8 @@ public Thread newThread(Runnable runnable) { @Override public void start() { if (_isRunning.compareAndSet(false, true)) { - Runnable runnable = new OffsetCommitServiceRunnable(); + _scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(threadFactory); _scheduledExecutorService.scheduleWithFixedDelay(runnable, 1, 2, TimeUnit.SECONDS); LOGGER.info("Scheduled the offset commit service executor."); } diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java index 28e49242..bd4b89df 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ProduceService.java @@ -58,7 +58,7 @@ public class ProduceService extends AbstractService { private KMBaseProducer _producer; private final KMPartitioner _partitioner; private ScheduledExecutorService _produceExecutor; - private final ScheduledExecutorService _handleNewPartitionsExecutor; + private ScheduledExecutorService _handleNewPartitionsExecutor; private final int _produceDelayMs; private final boolean _sync; /** This can be updated while running when new partitions are added to the monitor topic. */ @@ -106,6 +106,8 @@ public ProduceService(Map props, String name) throws Exception { } } + props.forEach(_producerPropsOverride::putIfAbsent); + _adminClient = AdminClient.create(props); if (producerClass.equals(NewProducer.class.getCanonicalName()) || producerClass.equals(NewProducer.class.getSimpleName())) { @@ -114,11 +116,6 @@ public ProduceService(Map props, String name) throws Exception { _producerClassName = producerClass; } - initializeProducer(props); - - _produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory()); - _handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory()); - MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS); List reporters = new ArrayList<>(); reporters.add(new JmxReporter(JMX_PREFIX)); @@ -157,6 +154,15 @@ private void initializeProducer(Map props) throws Exception { @Override public synchronized void start() { if (_running.compareAndSet(false, true)) { + _produceExecutor = Executors.newScheduledThreadPool(_threadsNum, new ProduceServiceThreadFactory()); + _handleNewPartitionsExecutor = Executors.newSingleThreadScheduledExecutor(new HandleNewPartitionsThreadFactory()); + + try { + initializeProducer(_producerPropsOverride); + } catch (Exception e) { + LOG.error("Failed to restart producer.", e); + } + TopicDescription topicDescription = getTopicDescription(_adminClient, _topic); int partitionNum = topicDescription.partitions().size(); initializeStateForPartitions(partitionNum); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java index 9678ed10..74d8ee8e 100644 --- 
a/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java @@ -29,7 +29,7 @@ public class SignalFxMetricsReporterService implements Service { private final String _name; private final List _metricNames; private final int _reportIntervalSec; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; private final MetricRegistry _metricRegistry; private final SignalFxReporter _signalfxReporter; @@ -50,7 +50,6 @@ public SignalFxMetricsReporterService(Map props, String name) th throw new IllegalArgumentException("SignalFx token is not configured"); } - _executor = Executors.newSingleThreadScheduledExecutor(); _metricRegistry = new MetricRegistry(); _metricMap = new HashMap(); _dimensionsMap = new HashMap(); @@ -71,6 +70,8 @@ public SignalFxMetricsReporterService(Map props, String name) th @Override public synchronized void start() { + _executor = Executors.newSingleThreadScheduledExecutor(); + _signalfxReporter.start(_reportIntervalSec, TimeUnit.SECONDS); _executor.scheduleAtFixedRate(() -> { try { diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java b/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java index 079beb78..a6139d82 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java @@ -31,7 +31,7 @@ public class StatsdMetricsReporterService implements Service { private final String _name; private final List _metricNames; private final int _reportIntervalSec; - private final ScheduledExecutorService _executor; + private ScheduledExecutorService _executor; private final StatsDClient _statsdClient; public StatsdMetricsReporterService(Map props, String name) { @@ -40,7 +40,6 @@ public StatsdMetricsReporterService(Map props, String name) { _name = name; _metricNames = config.getList(StatsdMetricsReporterServiceConfig.REPORT_METRICS_CONFIG); _reportIntervalSec = config.getInt(StatsdMetricsReporterServiceConfig.REPORT_INTERVAL_SEC_CONFIG); - _executor = Executors.newSingleThreadScheduledExecutor(); _statsdClient = new NonBlockingStatsDClient(config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_PREFIX), config.getString(StatsdMetricsReporterServiceConfig.REPORT_STATSD_HOST), config.getInt(StatsdMetricsReporterServiceConfig.REPORT_STATSD_PORT)); @@ -48,6 +47,8 @@ public StatsdMetricsReporterService(Map props, String name) { @Override public synchronized void start() { + _executor = Executors.newSingleThreadScheduledExecutor(); + _executor.scheduleAtFixedRate(() -> { try { reportMetrics(); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/configs/HAMonitoringConfig.java b/src/main/java/com/linkedin/xinfra/monitor/services/configs/HAMonitoringConfig.java new file mode 100755 index 00000000..94c08825 --- /dev/null +++ b/src/main/java/com/linkedin/xinfra/monitor/services/configs/HAMonitoringConfig.java @@ -0,0 +1,157 @@ +package com.linkedin.xinfra.monitor.services.configs; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.Map; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.clients.ClientDnsLookup; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.ConfigDef; + +public class HAMonitoringConfig 
extends AbstractConfig { + private static final ConfigDef CONFIG; + + public static final String GROUP_ID_CONFIG = "group.id"; + public static final String GROUP_ID_DOC = "A unique string that identifies the group this member belongs to."; + + public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id"; + public static final String GROUP_INSTANCE_ID_DOC = "A unique identifier for the application instance."; + + public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms"; + public static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the coordinator. " + + "Heartbeats are used to ensure that the member's session stays active and to facilitate rebalancing when " + + " new members join or leave the group. " + + "The value must be set lower than session.timeout.ms, but typically should be set no higher " + + "than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances."; + + public static final String METRIC_GROUP_PREFIX_CONFIG = "metric.group.prefix"; + public static final String METRIC_GROUP_PREFIX_DOC = "Prefix for the group of metrics reported."; + + public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms"; + private static final String MAX_POLL_INTERVAL_MS_DOC = "The maximum delay between invocations of poll(). " + + "This places an upper bound on the amount of time that the member can be idle before checking for " + + "leadership. If poll() is not called before expiration of this timeout, then the member is considered " + + "failed and the group will rebalance in order to reassign leadership to another member. "; + + public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms"; + private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect member failures. " + + "The member sends periodic heartbeats to indicate its liveness to the broker. " + + "If no heartbeats are received by the broker before the expiration of this session timeout, " + + "then the broker will remove this member from the group and initiate a rebalance. 
Note that the value " + + "must be in the allowable range as configured in the broker configuration by " + + " group.min.session.timeout.ms and group.max.session.timeout.ms."; + + static { + CONFIG = new ConfigDef().define(GROUP_ID_CONFIG, + ConfigDef.Type.STRING, + "xinfra-monitor-leader", + ConfigDef.Importance.HIGH, + GROUP_ID_DOC) + .define(GROUP_INSTANCE_ID_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.MEDIUM, + GROUP_INSTANCE_ID_DOC) + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + ConfigDef.Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, + ConfigDef.Type.LONG, + 100L, + ConfigDef.Range.atLeast(0L), + ConfigDef.Importance.LOW, + CommonClientConfigs.RETRY_BACKOFF_MS_DOC) + .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, + ConfigDef.Type.LONG, + TimeUnit.MINUTES.toMillis(5), + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.LOW, + CommonClientConfigs.METADATA_MAX_AGE_DOC) + .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, + ConfigDef.Type.LONG, + 50L, + ConfigDef.Range.atLeast(0L), + ConfigDef.Importance.LOW, + CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) + .define(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, + ConfigDef.Type.LONG, + TimeUnit.SECONDS.toMillis(1), + ConfigDef.Range.atLeast(0L), + ConfigDef.Importance.LOW, + CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC) + .define(CommonClientConfigs.SEND_BUFFER_CONFIG, + ConfigDef.Type.INT, + 128 * 1024, + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.SEND_BUFFER_DOC) + .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG, + ConfigDef.Type.INT, + 32 * 1024, + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.RECEIVE_BUFFER_DOC) + .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, + ConfigDef.Type.INT, + Math.toIntExact(TimeUnit.SECONDS.toMillis(40)), + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC) + .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, + ConfigDef.Type.STRING, + ClientDnsLookup.USE_ALL_DNS_IPS.toString(), + ConfigDef.ValidString.in(ClientDnsLookup.USE_ALL_DNS_IPS.toString(), + ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) + .define(HEARTBEAT_INTERVAL_MS_CONFIG, + ConfigDef.Type.INT, + Math.toIntExact(TimeUnit.SECONDS.toMillis(3)), + ConfigDef.Importance.HIGH, + HEARTBEAT_INTERVAL_MS_DOC) + .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, + ConfigDef.Type.LONG, + 9 * 60 * 1000, + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) + .define(METRIC_GROUP_PREFIX_CONFIG, + ConfigDef.Type.STRING, + "xinfra-leader-election", + ConfigDef.Importance.LOW, + METRIC_GROUP_PREFIX_DOC) + .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + ConfigDef.Type.LIST, + Collections.emptyList(), + new ConfigDef.NonNullValidator(), + ConfigDef.Importance.HIGH, + CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) + .define(MAX_POLL_INTERVAL_MS_CONFIG, + ConfigDef.Type.INT, + 60000, + ConfigDef.Range.atLeast(1), + ConfigDef.Importance.MEDIUM, + MAX_POLL_INTERVAL_MS_DOC) + .define(SESSION_TIMEOUT_MS_CONFIG, + ConfigDef.Type.INT, + 10000, + ConfigDef.Importance.HIGH, + SESSION_TIMEOUT_MS_DOC) + .define(CommonClientConfigs.CLIENT_ID_CONFIG, + ConfigDef.Type.STRING, + "", + 
ConfigDef.Importance.LOW, + CommonClientConfigs.CLIENT_ID_DOC) + .define(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG, + ConfigDef.Type.INT, + Math.toIntExact(TimeUnit.MINUTES.toMillis(1)), + ConfigDef.Importance.HIGH, + CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC) + .withClientSaslSupport(); + } + + public HAMonitoringConfig(Map props) { + super(CONFIG, props); + } +}
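
For illustration, the leader-election rule applied by HAMonitoringCoordinator.performAssignment() in the patch above can be exercised in isolation: unless exactly one member already claims leadership, the member with the lexicographically smallest coordinator-assigned member ID becomes the leader. The sketch below is not part of the patch; the class and method names are made up for the example, but the decision rule mirrors the code in the diff.

import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only: mirrors the election rule in HAMonitoringCoordinator#performAssignment. */
final class LeaderElectionRuleSketch {

  /** memberIdToLeaderFlag maps coordinator-assigned member ID to whether that member currently claims leadership. */
  static String chooseLeader(Map<String, Boolean> memberIdToLeaderFlag) {
    String incumbent = null;
    String smallestId = null;
    int numLeaders = 0;
    for (Map.Entry<String, Boolean> entry : memberIdToLeaderFlag.entrySet()) {
      if (entry.getValue()) {
        numLeaders++;
        incumbent = entry.getKey();
      }
      if (smallestId == null || entry.getKey().compareTo(smallestId) < 0) {
        smallestId = entry.getKey();
      }
    }
    // Keep the incumbent only when exactly one member claims leadership;
    // otherwise fall back to the lexicographically smallest member ID.
    return numLeaders == 1 ? incumbent : smallestId;
  }

  public static void main(String[] args) {
    Map<String, Boolean> members = new LinkedHashMap<>();
    members.put("member-b", false);
    members.put("member-a", false);
    // No incumbent, so the smallest member ID wins: prints "member-a".
    System.out.println(chooseLeader(members));
  }
}

In the coordinator itself, the chosen flag is then propagated by re-serializing each member's HAMonitoringIdentity with setLeader(...) before the assignments are returned, and onJoinComplete starts or stops the monitor on each instance accordingly.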