Skip to content

Commit

Permalink
Merge pull request #168 from SamBarker/primitive-predicates
Browse files Browse the repository at this point in the history
Switch from Predicate<Integer> to IntPredicate
  • Loading branch information
SamBarker authored Aug 28, 2023
2 parents 1418b32 + 503a0de commit 3bad89f
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.IntPredicate;

/**
* A KafkaCluster, from which is it possible to create/connect clients.
Expand Down Expand Up @@ -67,17 +67,17 @@ public interface KafkaCluster extends AutoCloseable {
* @param nodeIdPredicate predicate that returns true if the node identified by the given nodeId should be restarted
* @param terminationStyle the style of termination used to shut down the broker(s).
*/
void stopNodes(Predicate<Integer> nodeIdPredicate, TerminationStyle terminationStyle);
void stopNodes(IntPredicate nodeIdPredicate, TerminationStyle terminationStyle);

/**
* Starts node(s) identified by the supplied predicate. Use this method to restart node(s)
* previously stopped by {@link #stopNodes(Predicate, TerminationStyle)}.
* previously stopped by {@link #stopNodes(IntPredicate, TerminationStyle)}.
* <br/>
* Starting a node that is already started has no effect.
*
* @param nodeIdPredicate predicate that returns true if the node identified by the given nodeId should be restarted
*/
void startNodes(Predicate<Integer> nodeIdPredicate);
void startNodes(IntPredicate nodeIdPredicate);

/**
* stops the cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -332,7 +333,7 @@ public synchronized void removeBroker(int nodeId) throws IllegalArgumentExceptio
}

@Override
public synchronized void stopNodes(Predicate<Integer> nodeIdPredicate, TerminationStyle terminationStyle) {
public synchronized void stopNodes(IntPredicate nodeIdPredicate, TerminationStyle terminationStyle) {
var kafkaServersToStop = servers.entrySet().stream()
.filter(e -> nodeIdPredicate.test(e.getKey()))
.filter(e -> !stoppedServers.contains(e.getKey()))
Expand All @@ -344,7 +345,7 @@ public synchronized void stopNodes(Predicate<Integer> nodeIdPredicate, Terminati
}

@Override
public synchronized void startNodes(Predicate<Integer> nodeIdPredicate) {
public synchronized void startNodes(IntPredicate nodeIdPredicate) {
var kafkaServersToStart = servers.entrySet().stream()
.filter(e -> nodeIdPredicate.test(e.getKey()))
.filter(e -> stoppedServers.contains(e.getKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -413,7 +413,7 @@ private void gracefulStop(KafkaContainer kafkaContainer) {
}

@Override
public synchronized void stopNodes(Predicate<Integer> nodeIdPredicate, TerminationStyle terminationStyle) {
public synchronized void stopNodes(IntPredicate nodeIdPredicate, TerminationStyle terminationStyle) {
var kafkaContainersToStop = nodes.entrySet().stream()
.filter(e -> nodeIdPredicate.test(e.getKey()))
.filter(e -> !stoppedBrokers.contains(e.getKey()))
Expand Down Expand Up @@ -470,7 +470,7 @@ public synchronized void stopNodes(Predicate<Integer> nodeIdPredicate, Terminati
}

@Override
public synchronized void startNodes(Predicate<Integer> nodeIdPredicate) {
public synchronized void startNodes(IntPredicate nodeIdPredicate) {
var kafkaContainersToStart = nodes.entrySet().stream()
.filter(e -> nodeIdPredicate.test(e.getKey()))
.filter(e -> stoppedBrokers.contains(e.getKey()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.IntPredicate;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.Admin;
Expand Down Expand Up @@ -133,16 +133,16 @@ void kafkaClusterAddBroker(boolean kraft) throws Exception {

public static Stream<Arguments> stopAndStartBrokers() {
return Stream.of(
Arguments.of(1, true, TerminationStyle.ABRUPT, (Predicate<Integer>) node -> true),
Arguments.of(2, true, TerminationStyle.ABRUPT, (Predicate<Integer>) node -> node == 1),
Arguments.of(2, true, TerminationStyle.ABRUPT, (Predicate<Integer>) node -> true),
Arguments.of(1, true, TerminationStyle.GRACEFUL, (Predicate<Integer>) node -> true),
Arguments.of(1, false, TerminationStyle.ABRUPT, (Predicate<Integer>) node -> true));
Arguments.of(1, true, TerminationStyle.ABRUPT, (IntPredicate) node -> true),
Arguments.of(2, true, TerminationStyle.ABRUPT, (IntPredicate) node -> node == 1),
Arguments.of(2, true, TerminationStyle.ABRUPT, (IntPredicate) node -> true),
Arguments.of(1, true, TerminationStyle.GRACEFUL, (IntPredicate) node -> true),
Arguments.of(1, false, TerminationStyle.ABRUPT, (IntPredicate) node -> true));
}

@ParameterizedTest
@MethodSource
void stopAndStartBrokers(int brokersNum, boolean kraft, TerminationStyle terminationStyle, Predicate<Integer> brokerPredicate) throws Exception {
void stopAndStartBrokers(int brokersNum, boolean kraft, TerminationStyle terminationStyle, IntPredicate brokerPredicate) throws Exception {
try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder()
.testInfo(testInfo)
.brokersNum(brokersNum)
Expand Down

0 comments on commit 3bad89f

Please sign in to comment.