Skip to content

Commit

Permalink
Tidyup without breaking tests
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Bentley <[email protected]>
  • Loading branch information
tombentley committed Sep 13, 2023
1 parent a0a648f commit c2b48cb
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public synchronized String getBootstrapServers() {

private synchronized String buildBrokerList(Function<Integer, KafkaClusterConfig.KafkaEndpoints.EndpointPair> endpointFunc) {
return servers.keySet().stream()
.filter(nodeId -> clusterConfig.hasBrokerRole(nodeId))
.filter(this::isBroker)
.map(endpointFunc)
.map(KafkaClusterConfig.KafkaEndpoints.EndpointPair::connectAddress)
.collect(Collectors.joining(","));
Expand Down Expand Up @@ -406,8 +406,18 @@ public synchronized void close() throws Exception {
*/
private void roleOrderedShutdown(Map<Integer, Server> servers) {
// Shutdown servers without a controller port first.
shutdownServers(servers, e -> clusterConfig.isPureBroker(e.getKey()));
shutdownServers(servers, e -> clusterConfig.hasControllerRole(e.getKey()));
shutdownServers(servers, e -> !isController(e.getKey()));
shutdownServers(servers, e -> isController(e.getKey()));
}

private boolean isController(Integer key) {
return key < clusterConfig.numNodes() // dynamically added nodes are always pure brokers
&& clusterConfig.hasControllerRole(key);
}

private boolean isBroker(Integer key) {
return key >= clusterConfig.numNodes() // dynamically added nodes are always pure brokers
|| clusterConfig.hasBrokerRole(key);
}

@SuppressWarnings("java:S3864") // Stream.peek is being used with caution.
Expand Down

0 comments on commit c2b48cb

Please sign in to comment.