Skip to content

Commit

Permalink
Boolean switch for XM's enabling and disabling topic management servi…
Browse files Browse the repository at this point in the history
…ce (#326)

Boolean switch for XM's enabling and disabling topic management service


Signed-off-by: Andrew Choi <[email protected]>
  • Loading branch information
Andrew Choi authored Nov 11, 2020
1 parent e65f6e7 commit 59fbcf6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 31 deletions.
17 changes: 9 additions & 8 deletions config/xinfra-monitor.properties
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"bootstrap.servers": "localhost:9092,localhost:9093",
"request.timeout.ms": 9000,
"produce.record.delay.ms": 100,
"topic-management.topicManagementEnabled": true,
"topic-management.topicCreationEnabled": true,
"topic-management.replicationFactor" : 1,
"topic-management.partitionsToBrokersRatio" : 2.0,
Expand All @@ -69,14 +70,14 @@
}
},

"offset-commit-service": {
"class.name": "com.linkedin.xinfra.monitor.services.OffsetCommitService",
"zookeeper.connect": "localhost:2181",
"bootstrap.servers": "localhost:9092,localhost:9093",
"consumer.props": {
"group.id": "target-consumer-group"
}
},
"offset-commit-service": {
"class.name": "com.linkedin.xinfra.monitor.services.OffsetCommitService",
"zookeeper.connect": "localhost:2181",
"bootstrap.servers": "localhost:9092,localhost:9093",
"consumer.props": {
"group.id": "target-consumer-group"
}
},

"jolokia-service": {
"class.name": "com.linkedin.xinfra.monitor.services.JolokiaService"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.xinfra.monitor.common.Utils.prettyPrint;

/*
* The SingleClusterMonitor app is intended to monitor the performance and availability of a given Kafka cluster. It creates
* one producer and one consumer with the given configuration, produces messages with increasing integer in the
Expand All @@ -55,53 +57,76 @@ public class SingleClusterMonitor implements App {
private final TopicManagementService _topicManagementService;
private final String _name;
private final List<Service> _allServices;
private final boolean _isTopicManagementServiceEnabled;

public SingleClusterMonitor(Map<String, Object> props, String name) throws Exception {
ConsumerFactory consumerFactory = new ConsumerFactoryImpl(props);
_name = name;
_topicManagementService = new TopicManagementService(props, name);
CompletableFuture<Void> topicPartitionResult = _topicManagementService.topicPartitionResult();
LOG.info("SingleClusterMonitor properties: {}", prettyPrint(props));
TopicManagementServiceConfig config = new TopicManagementServiceConfig(props);
_isTopicManagementServiceEnabled =
config.getBoolean(TopicManagementServiceConfig.TOPIC_MANAGEMENT_ENABLED_CONFIG);
_allServices = new ArrayList<>(SERVICES_INITIAL_CAPACITY);
CompletableFuture<Void> topicPartitionResult;
if (_isTopicManagementServiceEnabled) {
_topicManagementService = new TopicManagementService(props, name);
topicPartitionResult = _topicManagementService.topicPartitionResult();

// block on the MultiClusterTopicManagementService to complete.
topicPartitionResult.get();
// block on the MultiClusterTopicManagementService to complete.
topicPartitionResult.get();

_allServices.add(_topicManagementService);
} else {
_topicManagementService = null;
topicPartitionResult = new CompletableFuture<>();
topicPartitionResult.complete(null);

}
ProduceService produceService = new ProduceService(props, name);
ConsumeService consumeService = new ConsumeService(name, topicPartitionResult, consumerFactory);

_allServices = new ArrayList<>(SERVICES_INITIAL_CAPACITY);
_allServices.add(_topicManagementService);
_allServices.add(produceService);
_allServices.add(consumeService);
}

@Override
public void start() throws Exception {
_topicManagementService.start();
CompletableFuture<Void> topicPartitionResult = _topicManagementService.topicPartitionResult();
try {
if (_isTopicManagementServiceEnabled) {
_topicManagementService.start();
CompletableFuture<Void> topicPartitionResult = _topicManagementService.topicPartitionResult();

try {
/* Delay 2 second to reduce the chance that produce and consumer thread has race condition
with TopicManagementService and MultiClusterTopicManagementService */
long threadSleepMs = TimeUnit.SECONDS.toMillis(2);
Thread.sleep(threadSleepMs);
} catch (InterruptedException e) {
throw new Exception("Interrupted while sleeping the thread", e);
}
CompletableFuture<Void> topicPartitionFuture = topicPartitionResult.thenRun(() -> {
long threadSleepMs = TimeUnit.SECONDS.toMillis(2);
Thread.sleep(threadSleepMs);
} catch (InterruptedException e) {
throw new Exception("Interrupted while sleeping the thread", e);
}
CompletableFuture<Void> topicPartitionFuture = topicPartitionResult.thenRun(() -> {
for (Service service : _allServices) {
if (!service.isRunning()) {
LOG.debug("Now starting {}", service.getServiceName());
service.start();
}
}
});

try {
topicPartitionFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new Exception("Exception occurred while getting the TopicPartitionFuture", e);
}

} else {
for (Service service : _allServices) {
if (!service.isRunning()) {
LOG.debug("Now starting {}", service.getServiceName());
service.start();
}
}
});

try {
topicPartitionFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new Exception("Exception occurred while getting the TopicPartitionFuture", e);
}

LOG.info(_name + "/SingleClusterMonitor started.");
LOG.info(_name + "/SingleClusterMonitor started!");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,16 @@ public class TopicManagementServiceConfig extends AbstractConfig {
public static final String TOPIC_PROPS_CONFIG = "topic-management.topic.props";
public static final String TOPIC_PROPS_DOC = "A configuration map for the topic";

public static final String TOPIC_MANAGEMENT_ENABLED_CONFIG = "topic-management.topicManagementEnabled";
public static final String TOPIC_MANAGEMENT_ENABLED_DOC = "Boolean switch for enabling Topic Management Service";

static {
CONFIG = new ConfigDef()
.define(TOPIC_MANAGEMENT_ENABLED_CONFIG,
ConfigDef.Type.BOOLEAN,
true,
ConfigDef.Importance.HIGH,
TOPIC_MANAGEMENT_ENABLED_DOC)
.define(ZOOKEEPER_CONNECT_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
Expand Down

0 comments on commit 59fbcf6

Please sign in to comment.