diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java index 61c8101d..ded736db 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/ClusterTopicManipulationService.java @@ -163,7 +163,7 @@ private void createDeleteClusterTopic() { try { int brokerCount = _adminClient.describeCluster().nodes().get().size(); - Set blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect); + Set blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient); Set brokers = new HashSet<>(); for (Node broker : _adminClient.describeCluster().nodes().get()) { BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null); diff --git a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java index 25a12759..c7f6eb94 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java +++ b/src/main/java/com/linkedin/xinfra/monitor/services/MultiClusterTopicManagementService.java @@ -313,7 +313,7 @@ void maybeAddPartitions(int minPartitionNum) throws ExecutionException, Interrup if (partitionNum < minPartitionNum) { LOGGER.info("{} will increase partition of the topic {} in the cluster from {}" + " to {}.", this.getClass().toString(), _topic, partitionNum, minPartitionNum); - Set blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect); + Set blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient); Set brokers = new HashSet<>(); for (Node broker : _adminClient.describeCluster().nodes().get()) { BrokerMetadata brokerMetadata = new BrokerMetadata(broker.id(), null); @@ -409,7 +409,7 @@ int numPartitions() throws InterruptedException, ExecutionException { private Set getAvailableBrokers() throws ExecutionException, InterruptedException { Set brokers = new HashSet<>(_adminClient.describeCluster().nodes().get()); - Set blackListedBrokers = _topicFactory.getBlackListedBrokers(_zkConnect); + Set blackListedBrokers = _topicFactory.getBlackListedBrokers(_adminClient); brokers.removeIf(broker -> blackListedBrokers.contains(broker.id())); return brokers; } diff --git a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java index a8f57b96..26f29d86 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java +++ b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/DefaultTopicFactory.java @@ -32,7 +32,7 @@ public int createTopicIfNotExist(String topic, short replicationFactor, double p } @Override - public Set getBlackListedBrokers(String zkUrl) { + public Set getBlackListedBrokers(AdminClient adminClient) { return Collections.emptySet(); } } diff --git a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java index f638239b..7473d62d 100644 --- a/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java +++ b/src/main/java/com/linkedin/xinfra/monitor/topicfactory/TopicFactory.java @@ -39,9 +39,9 @@ int createTopicIfNotExist(String topic, short replicationFactor, double partitio throws ExecutionException, InterruptedException; /** - * @param zkUrl zookeeper connection url + * @param adminClient AdminClient object * @return A set of brokers that don't take new partitions or reassigned partitions for topics. */ - Set getBlackListedBrokers(String zkUrl); + Set getBlackListedBrokers(AdminClient adminClient); }