From 59fbcf66d5e8f2158349e07c39f63d6dda460d36 Mon Sep 17 00:00:00 2001 From: Andrew Choi Date: Tue, 10 Nov 2020 17:33:40 -0800 Subject: [PATCH] Boolean switch for XM's enabling and disabling topic management service (#326) Boolean switch for XM's enabling and disabling topic management service Signed-off-by: Andrew Choi --- config/xinfra-monitor.properties | 17 ++--- .../monitor/apps/SingleClusterMonitor.java | 71 +++++++++++++------ .../configs/TopicManagementServiceConfig.java | 8 +++ 3 files changed, 65 insertions(+), 31 deletions(-) diff --git a/config/xinfra-monitor.properties b/config/xinfra-monitor.properties index 358a3af0..d4d7cd38 100644 --- a/config/xinfra-monitor.properties +++ b/config/xinfra-monitor.properties @@ -50,6 +50,7 @@ "bootstrap.servers": "localhost:9092,localhost:9093", "request.timeout.ms": 9000, "produce.record.delay.ms": 100, + "topic-management.topicManagementEnabled": true, "topic-management.topicCreationEnabled": true, "topic-management.replicationFactor" : 1, "topic-management.partitionsToBrokersRatio" : 2.0, @@ -69,14 +70,14 @@ } }, - "offset-commit-service": { - "class.name": "com.linkedin.xinfra.monitor.services.OffsetCommitService", - "zookeeper.connect": "localhost:2181", - "bootstrap.servers": "localhost:9092,localhost:9093", - "consumer.props": { - "group.id": "target-consumer-group" - } - }, + "offset-commit-service": { + "class.name": "com.linkedin.xinfra.monitor.services.OffsetCommitService", + "zookeeper.connect": "localhost:2181", + "bootstrap.servers": "localhost:9092,localhost:9093", + "consumer.props": { + "group.id": "target-consumer-group" + } + }, "jolokia-service": { "class.name": "com.linkedin.xinfra.monitor.services.JolokiaService" diff --git a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java index 1d94b1f8..a4da4b3b 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java +++ b/src/main/java/com/linkedin/xinfra/monitor/apps/SingleClusterMonitor.java @@ -39,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.xinfra.monitor.common.Utils.prettyPrint; + /* * The SingleClusterMonitor app is intended to monitor the performance and availability of a given Kafka cluster. It creates * one producer and one consumer with the given configuration, produces messages with increasing integer in the @@ -55,53 +57,76 @@ public class SingleClusterMonitor implements App { private final TopicManagementService _topicManagementService; private final String _name; private final List _allServices; + private final boolean _isTopicManagementServiceEnabled; public SingleClusterMonitor(Map props, String name) throws Exception { ConsumerFactory consumerFactory = new ConsumerFactoryImpl(props); _name = name; - _topicManagementService = new TopicManagementService(props, name); - CompletableFuture topicPartitionResult = _topicManagementService.topicPartitionResult(); + LOG.info("SingleClusterMonitor properties: {}", prettyPrint(props)); + TopicManagementServiceConfig config = new TopicManagementServiceConfig(props); + _isTopicManagementServiceEnabled = + config.getBoolean(TopicManagementServiceConfig.TOPIC_MANAGEMENT_ENABLED_CONFIG); + _allServices = new ArrayList<>(SERVICES_INITIAL_CAPACITY); + CompletableFuture topicPartitionResult; + if (_isTopicManagementServiceEnabled) { + _topicManagementService = new TopicManagementService(props, name); + topicPartitionResult = _topicManagementService.topicPartitionResult(); - // block on the MultiClusterTopicManagementService to complete. - topicPartitionResult.get(); + // block on the MultiClusterTopicManagementService to complete. + topicPartitionResult.get(); + _allServices.add(_topicManagementService); + } else { + _topicManagementService = null; + topicPartitionResult = new CompletableFuture<>(); + topicPartitionResult.complete(null); + + } ProduceService produceService = new ProduceService(props, name); ConsumeService consumeService = new ConsumeService(name, topicPartitionResult, consumerFactory); - - _allServices = new ArrayList<>(SERVICES_INITIAL_CAPACITY); - _allServices.add(_topicManagementService); _allServices.add(produceService); _allServices.add(consumeService); } @Override public void start() throws Exception { - _topicManagementService.start(); - CompletableFuture topicPartitionResult = _topicManagementService.topicPartitionResult(); - try { + if (_isTopicManagementServiceEnabled) { + _topicManagementService.start(); + CompletableFuture topicPartitionResult = _topicManagementService.topicPartitionResult(); + + try { /* Delay 2 second to reduce the chance that produce and consumer thread has race condition with TopicManagementService and MultiClusterTopicManagementService */ - long threadSleepMs = TimeUnit.SECONDS.toMillis(2); - Thread.sleep(threadSleepMs); - } catch (InterruptedException e) { - throw new Exception("Interrupted while sleeping the thread", e); - } - CompletableFuture topicPartitionFuture = topicPartitionResult.thenRun(() -> { + long threadSleepMs = TimeUnit.SECONDS.toMillis(2); + Thread.sleep(threadSleepMs); + } catch (InterruptedException e) { + throw new Exception("Interrupted while sleeping the thread", e); + } + CompletableFuture topicPartitionFuture = topicPartitionResult.thenRun(() -> { + for (Service service : _allServices) { + if (!service.isRunning()) { + LOG.debug("Now starting {}", service.getServiceName()); + service.start(); + } + } + }); + + try { + topicPartitionFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new Exception("Exception occurred while getting the TopicPartitionFuture", e); + } + + } else { for (Service service : _allServices) { if (!service.isRunning()) { LOG.debug("Now starting {}", service.getServiceName()); service.start(); } } - }); - - try { - topicPartitionFuture.get(); - } catch (InterruptedException | ExecutionException e) { - throw new Exception("Exception occurred while getting the TopicPartitionFuture", e); } - LOG.info(_name + "/SingleClusterMonitor started."); + LOG.info(_name + "/SingleClusterMonitor started!"); } @Override diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/configs/TopicManagementServiceConfig.java b/src/main/java/com/linkedin/xinfra/monitor/services/configs/TopicManagementServiceConfig.java index 9658400f..855cbf1d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/configs/TopicManagementServiceConfig.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/configs/TopicManagementServiceConfig.java @@ -60,8 +60,16 @@ public class TopicManagementServiceConfig extends AbstractConfig { public static final String TOPIC_PROPS_CONFIG = "topic-management.topic.props"; public static final String TOPIC_PROPS_DOC = "A configuration map for the topic"; + public static final String TOPIC_MANAGEMENT_ENABLED_CONFIG = "topic-management.topicManagementEnabled"; + public static final String TOPIC_MANAGEMENT_ENABLED_DOC = "Boolean switch for enabling Topic Management Service"; + static { CONFIG = new ConfigDef() + .define(TOPIC_MANAGEMENT_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + true, + ConfigDef.Importance.HIGH, + TOPIC_MANAGEMENT_ENABLED_DOC) .define(ZOOKEEPER_CONNECT_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,