From fcda683c7431a9097c03163c035428e81229f2b4 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Tue, 12 Sep 2023 15:00:30 +1200 Subject: [PATCH 1/9] Fix #176: Changes needed to get the tests working, at least Signed-off-by: Tom Bentley --- .../testing/kafka/api/KafkaCluster.java | 4 +- .../testing/kafka/common/ConstraintUtils.java | 4 +- .../testing/kafka/common/KRaftCluster.java | 39 +++- .../kafka/common/KafkaClusterConfig.java | 205 +++++++++++------- .../kafka/common/KafkaClusterFactory.java | 14 +- .../testing/kafka/common/MetadataMode.java | 23 ++ .../testing/kafka/common/NodeRole.java | 49 +++++ .../testing/kafka/invm/InVMKafkaCluster.java | 59 +++-- .../TestcontainersKafkaCluster.java | 39 +++- .../testing/kafka/KafkaClusterTest.java | 173 +++++++++------ .../kafka/common/KafkaClusterConfigTest.java | 140 +++++++++++- .../kafka/common/KafkaClusterFactoryTest.java | 24 +- .../TestcontainersKafkaClusterTest.java | 11 +- 13 files changed, 569 insertions(+), 215 deletions(-) create mode 100644 impl/src/main/java/io/kroxylicious/testing/kafka/common/MetadataMode.java create mode 100644 impl/src/main/java/io/kroxylicious/testing/kafka/common/NodeRole.java diff --git a/api/src/main/java/io/kroxylicious/testing/kafka/api/KafkaCluster.java b/api/src/main/java/io/kroxylicious/testing/kafka/api/KafkaCluster.java index c2448870..706c03b5 100644 --- a/api/src/main/java/io/kroxylicious/testing/kafka/api/KafkaCluster.java +++ b/api/src/main/java/io/kroxylicious/testing/kafka/api/KafkaCluster.java @@ -86,8 +86,8 @@ public interface KafkaCluster extends AutoCloseable { void close() throws Exception; /** - * Gets the number of brokers in the cluster, including any that are stopped. - * @return the size of the cluster. + * Gets the number of brokers in the cluster, including any that are stopped, excluding any pure controller nodes. + * @return the number of brokers in the cluster. 
*/ int getNumOfBrokers(); diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/ConstraintUtils.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/ConstraintUtils.java index ad1d7b6c..4b97e57f 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/ConstraintUtils.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/ConstraintUtils.java @@ -84,7 +84,9 @@ public static ClusterId clusterId(String clusterId) { * @return the kraft cluster */ public static KRaftCluster kraftCluster(int numControllers) { - return mkAnnotation(KRaftCluster.class, Map.of("numControllers", numControllers)); + return mkAnnotation(KRaftCluster.class, Map.of( + "numControllers", numControllers, + "combinedMode", true)); } /** diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KRaftCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KRaftCluster.java index af9cff62..8a72b00c 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KRaftCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KRaftCluster.java @@ -17,25 +17,42 @@ /** * Annotation constraining a {@link KafkaClusterProvisioningStrategy} to use * a {@link KafkaCluster} that is KRaft-based. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Breakdown of the interaction between numBrokers, numControllers and combinedMode
numBrokersnumControllerscombinedModeroles
11true"broker,controller"
11false"broker", 1×"controller"
31true"broker,controller", 2×"broker"
31false3×"broker", 1×"controller"
13true"broker,controller", 2×"controller"
13false"broker", 3×"controller"
33true3×"broker,controller"
33false3×"broker", 3×"controller"
*/ @Target({ ElementType.PARAMETER, ElementType.FIELD }) @Retention(RetentionPolicy.RUNTIME) @KafkaClusterConstraint public @interface KRaftCluster { /** - * The number of kraft controllers - * The extension will ensure there are enough nodes started with the controller role. - * The extension will combine this with the numBrokers to generate a cluster topology. - * - * - * - * - * - * - * - *
Breakdown of the interaction between numControllers and numBrokers
numBrokersnumControllersroles
11"broker,controller"
31"broker,controller", "broker", "broker"
13"broker,controller", "controller", "controller"
33"broker,controller", "broker,controller", "broker,controller"
+ * The number of kraft controllers. + * The extension will ensure there are this many nodes started with the controller role. + * combining this with the numBrokers and combinedMode to generate a cluster topology. + * + * See the class JavaDoc for example topologies. * @return The number of KRaft controllers */ public int numControllers() default 1; + /** + * Whether to use combined mode, where controllers can share a JVM with brokers. + * See the class JavaDoc for example topologies. + * @return true to use combined mode. + */ + public boolean combinedMode() default true; + } diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java index 1e4f1122..b86b10ac 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java @@ -12,6 +12,7 @@ import java.nio.file.Paths; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,10 +70,7 @@ public class KafkaClusterConfig { */ private final KafkaClusterExecutionMode execMode; - /** - * if true, the cluster will be brought up in Kraft-mode - */ - private final Boolean kraftMode; + private final MetadataMode metadataMode; /** * Kafka version to be used for deploying kafka in container mode, e.g. "3.3.1". 
@@ -87,7 +85,7 @@ public class KafkaClusterConfig { private final String saslMechanism; private final String securityProtocol; @Builder.Default - private Integer brokersNum = 1; + private final Integer brokersNum = 1; @Builder.Default private Integer kraftControllers = 1; @@ -136,6 +134,7 @@ public static KafkaClusterConfig fromConstraints(List annotations, T var builder = builder(); builder.testInfo(testInfo); builder.brokersNum(1); + builder.metadataMode(MetadataMode.KRAFT_COMBINED); boolean sasl = false; boolean tls = false; for (Annotation annotation : annotations) { @@ -143,11 +142,11 @@ public static KafkaClusterConfig fromConstraints(List annotations, T builder.brokersNum(brokerCluster.numBrokers()); } if (annotation instanceof KRaftCluster kRaftCluster) { - builder.kraftMode(true); + builder.metadataMode(kRaftCluster.combinedMode() ? MetadataMode.KRAFT_COMBINED : MetadataMode.KRAFT_SEPARATE); builder.kraftControllers(kRaftCluster.numControllers()); } if (annotation instanceof ZooKeeperCluster) { - builder.kraftMode(false); + builder.metadataMode(MetadataMode.ZOOKEEPER); } if (annotation instanceof Tls) { tls = true; @@ -197,13 +196,12 @@ else if (annotation instanceof BrokerConfig brokerConfig) { * @param endPointConfigSupplier the end point config supplier * @return the broker configs */ - public Stream getBrokerConfigs(Supplier endPointConfigSupplier) { + public Stream getNodeConfigs(Supplier endPointConfigSupplier) { List properties = new ArrayList<>(); KafkaEndpoints kafkaEndpoints = endPointConfigSupplier.get(); - final int nodeCount = Math.max(brokersNum, kraftControllers); - for (int brokerNum = 0; brokerNum < nodeCount; brokerNum++) { - final ConfigHolder brokerConfigHolder = generateConfigForSpecificNode(kafkaEndpoints, brokerNum); - properties.add(brokerConfigHolder); + for (int nodeId = 0; nodeId < numNodes(); nodeId++) { + final ConfigHolder nodeConfigHolder = generateConfigForSpecificNode(kafkaEndpoints, nodeId); + 
properties.add(nodeConfigHolder); } return properties.stream(); @@ -218,7 +216,8 @@ public Stream getBrokerConfigs(Supplier endPointCo */ @NotNull public ConfigHolder generateConfigForSpecificNode(KafkaEndpoints kafkaEndpoints, int nodeId) { - final var role = determineRole(nodeId); + // checkNodeId(nodeId); + final var roles = nodeId >= numNodes() ? EnumSet.of(NodeRole.BROKER) : processRoles(nodeId); Properties nodeConfiguration = new Properties(); nodeConfiguration.putAll(brokerConfigs); @@ -230,18 +229,45 @@ public ConfigHolder generateConfigForSpecificNode(KafkaEndpoints kafkaEndpoints, var earlyStart = new TreeSet(); final ConfigHolder configHolder; - if (role.contains(BROKER_ROLE)) { - configHolder = configureBroker(kafkaEndpoints, nodeId, protocolMap, listeners, advertisedListeners, earlyStart, nodeConfiguration); + if (NodeRole.hasBrokerRole(roles)) { + var interBrokerEndpoint = kafkaEndpoints.getEndpointPair(Listener.INTERNAL, nodeId); + var clientEndpoint = kafkaEndpoints.getEndpointPair(Listener.EXTERNAL, nodeId); + var anonEndpoint = kafkaEndpoints.getEndpointPair(Listener.ANON, nodeId); + + // - EXTERNAL: used for communications to/from consumers/producers optionally with authentication + // - ANON: used for communications to/from consumers/producers without authentication primarily for the extension to validate the cluster + // - INTERNAL: used for inter-broker communications (always no auth) + // - CONTROLLER: used for inter-broker controller communications (kraft - always no auth) + + var externalListenerTransport = securityProtocol == null ? 
SecurityProtocol.PLAINTEXT.name() : securityProtocol; + configureExternalListener(protocolMap, externalListenerTransport, listeners, clientEndpoint, advertisedListeners); + configureInternalListener(protocolMap, listeners, interBrokerEndpoint, advertisedListeners, earlyStart, nodeConfiguration); + configureAnonListener(protocolMap, listeners, anonEndpoint, advertisedListeners); + configureTls(clientEndpoint, nodeConfiguration); + putConfig(nodeConfiguration, "advertised.listeners", + advertisedListeners.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(","))); + configHolder = new ConfigHolder(kafkaKraftClusterId, nodeId, roles, nodeConfiguration, clientEndpoint.getConnect().getPort(), + anonEndpoint.getConnect().getPort(), + clientEndpoint.connectAddress()); } else { - configHolder = configureController(kafkaEndpoints, nodeId, nodeConfiguration); + configHolder = new ConfigHolder(kafkaKraftClusterId, + nodeId, + roles, + nodeConfiguration, + null, + null, + null); } - if (isKraftMode()) { - configureKraftNode(kafkaEndpoints, nodeId, nodeConfiguration, protocolMap, listeners, earlyStart, role); + if (this.metadataMode == MetadataMode.KRAFT_SEPARATE) { + configureKraftNode(kafkaEndpoints, nodeId, nodeConfiguration, protocolMap, listeners, earlyStart, roles); } - else { - configureLegacyNode(kafkaEndpoints, nodeConfiguration); + else if (this.metadataMode == null || this.metadataMode == MetadataMode.KRAFT_COMBINED) { + configureKraftNode(kafkaEndpoints, nodeId, nodeConfiguration, protocolMap, listeners, earlyStart, roles); + } + else if (this.metadataMode == MetadataMode.ZOOKEEPER) { + configureLegacyBroker(nodeId, kafkaEndpoints, nodeConfiguration); } putConfig(nodeConfiguration, "listener.security.protocol.map", @@ -267,54 +293,74 @@ public ConfigHolder generateConfigForSpecificNode(KafkaEndpoints kafkaEndpoints, return configHolder; } - @NotNull - private ConfigHolder configureController(KafkaEndpoints kafkaEndpoints, int 
nodeId, Properties nodeConfiguration) { - return new ConfigHolder(nodeConfiguration, null, null, null, - nodeId, kafkaKraftClusterId); + private void checkNodeId(int nodeId) { + if (nodeId < 0 || nodeId >= numNodes()) { + throw new IllegalArgumentException("Bad node id " + nodeId + "; expected between 0 and " + numNodes() + " inclusive"); + } } - @NotNull - private ConfigHolder configureBroker(KafkaEndpoints kafkaEndpoints, int nodeId, TreeMap protocolMap, TreeMap listeners, - TreeMap advertisedListeners, TreeSet earlyStart, Properties nodeConfiguration) { - final ConfigHolder configHolder; - var interBrokerEndpoint = kafkaEndpoints.getEndpointPair(Listener.INTERNAL, nodeId); - var clientEndpoint = kafkaEndpoints.getEndpointPair(Listener.EXTERNAL, nodeId); - var anonEndpoint = kafkaEndpoints.getEndpointPair(Listener.ANON, nodeId); - - // - EXTERNAL: used for communications to/from consumers/producers optionally with authentication - // - ANON: used for communications to/from consumers/producers without authentication primarily for the extension to validate the cluster - // - INTERNAL: used for inter-broker communications (always no auth) - // - CONTROLLER: used for inter-broker controller communications (kraft - always no auth) - - var externalListenerTransport = securityProtocol == null ? 
SecurityProtocol.PLAINTEXT.name() : securityProtocol; - configureExternalListener(protocolMap, externalListenerTransport, listeners, clientEndpoint, advertisedListeners); - configureInternalListener(protocolMap, listeners, interBrokerEndpoint, advertisedListeners, earlyStart, nodeConfiguration); - configureAnonListener(protocolMap, listeners, anonEndpoint, advertisedListeners); - configureTls(clientEndpoint, nodeConfiguration); - putConfig(nodeConfiguration, "advertised.listeners", - advertisedListeners.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(","))); - configHolder = new ConfigHolder(nodeConfiguration, clientEndpoint.getConnect().getPort(), anonEndpoint.getConnect().getPort(), - clientEndpoint.connectAddress(), nodeId, kafkaKraftClusterId); - return configHolder; + /** + * @return The total number of Kafka nodes (excludes any ZooKeeper nodes). + */ + public int numNodes() { + if (metadataMode == null) { + return brokersNum; + } + return metadataMode.numNodes(kraftControllers, brokersNum); } @NotNull - private String determineRole(int nodeId) { - var roles = new ArrayList(); + private EnumSet processRoles(int nodeId) { + // checkNodeId(nodeId); - if (nodeId < brokersNum || isAdditionalNode(nodeId)) { - roles.add(BROKER_ROLE); + if (metadataMode == null) { + return EnumSet.of(NodeRole.BROKER); } - if (nodeId < kraftControllers) { - roles.add(CONTROLLER_ROLE); + switch (metadataMode) { + case KRAFT_COMBINED -> { + if (nodeId > brokersNum - 1) { + return EnumSet.of(NodeRole.CONTROLLER); + } + else if (nodeId > kraftControllers - 1) { + return EnumSet.of(NodeRole.BROKER); + } + else { + return EnumSet.of(NodeRole.CONTROLLER, NodeRole.BROKER); + } + } + case KRAFT_SEPARATE -> { + if (nodeId < kraftControllers) { + return EnumSet.of(NodeRole.CONTROLLER); + } + else { + return EnumSet.of(NodeRole.BROKER); + } + } + case ZOOKEEPER -> { + return EnumSet.of(NodeRole.BROKER); + } + default -> throw new RuntimeException(); } - 
return String.join(",", roles); } - // additional nodes can only be added after the initial topology is generated. - // Hence, it is safe to assume that a node is additional if it has a higherId than the initial topology would allow for. - private boolean isAdditionalNode(int nodeId) { - return nodeId >= Math.max(brokersNum, kraftControllers); + public boolean isPureController(int nodeId) { + return NodeRole.isPureController(processRoles(nodeId)); + } + + public boolean isCombinedNode(int nodeId) { + return NodeRole.isCombinedNode(processRoles(nodeId)); + } + + public boolean isPureBroker(int nodeId) { + return NodeRole.isPureBroker(processRoles(nodeId)); + } + + public boolean hasBrokerRole(int nodeId) { + return NodeRole.hasBrokerRole(processRoles(nodeId)); + } + + public boolean hasControllerRole(int nodeId) { + return NodeRole.hasControllerRole(processRoles(nodeId)); } private void configureTls(KafkaEndpoints.EndpointPair clientEndpoint, Properties server) { @@ -394,17 +440,20 @@ private static void configureExternalListener(TreeMap protocolMa advertisedListeners.put(EXTERNAL_LISTENER_NAME, clientEndpoint.advertisedAddress()); } - private static void configureLegacyNode(KafkaEndpoints kafkaEndpoints, Properties server) { + private static void configureLegacyBroker(int brokerId, KafkaEndpoints kafkaEndpoints, Properties server) { putConfig(server, "zookeeper.connect", kafkaEndpoints.getEndpointPair(Listener.CONTROLLER, 0).connectAddress()); putConfig(server, "zookeeper.sasl.enabled", "false"); putConfig(server, "zookeeper.connection.timeout.ms", Long.toString(60000)); putConfig(server, KafkaConfig.ZkSessionTimeoutMsProp(), Long.toString(6000)); } - private void configureKraftNode(KafkaEndpoints kafkaEndpoints, int nodeId, Properties nodeConfiguration, TreeMap protocolMap, + private void configureKraftNode(KafkaEndpoints kafkaEndpoints, + int nodeId, + Properties nodeConfiguration, + TreeMap protocolMap, TreeMap listeners, TreeSet earlyStart, - String role) { + 
EnumSet roles) { putConfig(nodeConfiguration, "node.id", Integer.toString(nodeId)); // Required by Kafka 3.3 onwards. var quorumVoters = IntStream.range(0, kraftControllers) @@ -414,8 +463,8 @@ private void configureKraftNode(KafkaEndpoints kafkaEndpoints, int nodeId, Prope putConfig(nodeConfiguration, "controller.listener.names", CONTROLLER_LISTENER_NAME); protocolMap.put(CONTROLLER_LISTENER_NAME, SecurityProtocol.PLAINTEXT.name()); - putConfig(nodeConfiguration, "process.roles", role); - if (role.contains(CONTROLLER_ROLE)) { + putConfig(nodeConfiguration, "process.roles", NodeRole.forConfig(roles)); + if (NodeRole.hasControllerRole(roles)) { var controllerEndpoint = kafkaEndpoints.getEndpointPair(Listener.CONTROLLER, nodeId); final String bindAddress = controllerEndpoint.getBind().toString(); listeners.put(CONTROLLER_LISTENER_NAME, bindAddress); @@ -559,7 +608,7 @@ public Map getConnectConfigForCluster(String bootstrapServers, S * @return true if kraft mode is used, false otherwise */ public boolean isKraftMode() { - return this.getKraftMode() == null || this.getKraftMode(); + return this.metadataMode == null || this.metadataMode != MetadataMode.ZOOKEEPER; } /** @@ -580,37 +629,39 @@ public static class ConfigHolder { private final Integer externalPort; private final Integer anonPort; private final String endpoint; - private final int brokerNum; + private final int nodeId; private final String kafkaKraftClusterId; - private final String roles; + private final Set roles; /** * Instantiates a new Config holder. 
* + * @param kafkaKraftClusterId the kafka kraft cluster id + * @param nodeId the node id * @param properties the properties * @param externalPort the external port * @param anonPort the anon port * @param endpoint the endpoint - * @param brokerNum the broker num - * @param kafkaKraftClusterId the kafka kraft cluster id */ @Builder - public ConfigHolder(Properties properties, Integer externalPort, Integer anonPort, String endpoint, int brokerNum, String kafkaKraftClusterId) { + public ConfigHolder(String kafkaKraftClusterId, + int nodeId, + Set roles, + Properties properties, + Integer externalPort, + Integer anonPort, + String endpoint) { this.properties = properties; this.externalPort = externalPort; this.anonPort = anonPort; this.endpoint = endpoint; - this.brokerNum = brokerNum; + this.nodeId = nodeId; this.kafkaKraftClusterId = kafkaKraftClusterId; - this.roles = properties.getProperty("process.roles", BROKER_ROLE); - } - - public boolean isBroker() { - return this.roles.contains(BROKER_ROLE); + this.roles = roles; } - public boolean isController() { - return this.roles.contains(CONTROLLER_ROLE); + public Set roles() { + return roles; } } diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactory.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactory.java index 7a56180a..d75f1001 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactory.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactory.java @@ -62,18 +62,20 @@ public static KafkaCluster create(KafkaClusterConfig clusterConfig) { } var clusterMode = getExecutionMode(clusterConfig); - var kraftMode = convertClusterKraftMode(System.getenv().get(TEST_CLUSTER_KRAFT_MODE), true); + var kraftMode = convertClusterKraftMode(System.getenv().get(TEST_CLUSTER_KRAFT_MODE), MetadataMode.KRAFT_COMBINED); var builder = clusterConfig.toBuilder(); if (clusterConfig.getExecMode() == null) { 
builder.execMode(clusterMode); } - if (clusterConfig.getKraftMode() == null) { - builder.kraftMode(kraftMode); + if (clusterConfig.getMetadataMode() == null) { + builder.metadataMode(kraftMode); } - if (KafkaClusterExecutionMode.CONTAINER == clusterMode && kraftMode && clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) { + if (KafkaClusterExecutionMode.CONTAINER == clusterMode + && kraftMode != MetadataMode.ZOOKEEPER + && clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) { throw new IllegalStateException( "Due to https://github.com/ozangunalp/kafka-native/issues/88 we can't support controller only nodes in " + KafkaClusterExecutionMode.CONTAINER + " mode so we need to fail fast. This cluster has " @@ -104,10 +106,10 @@ private static KafkaClusterExecutionMode getExecutionMode(KafkaClusterConfig clu clusterConfig.getExecMode() == null ? KafkaClusterExecutionMode.IN_VM : clusterConfig.getExecMode()); } - private static boolean convertClusterKraftMode(String mode, boolean defaultMode) { + private static MetadataMode convertClusterKraftMode(String mode, MetadataMode defaultMode) { if (mode == null) { return defaultMode; } - return Boolean.parseBoolean(mode); + return Boolean.parseBoolean(mode) ? MetadataMode.KRAFT_COMBINED : MetadataMode.ZOOKEEPER; } } diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/MetadataMode.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/MetadataMode.java new file mode 100644 index 00000000..0f7c7061 --- /dev/null +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/MetadataMode.java @@ -0,0 +1,23 @@ +/* + * Copyright Kroxylicious Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.kroxylicious.testing.kafka.common; + +public enum MetadataMode { + ZOOKEEPER, + KRAFT_COMBINED, + KRAFT_SEPARATE; + + /** + * @return The total number of Kafka nodes (excludes any ZooKeeper nodes). + */ + public int numNodes(int numControllers, int numBrokers) { + return switch (this) { + case KRAFT_SEPARATE -> numControllers + numBrokers; + case KRAFT_COMBINED -> Math.max(numControllers, numBrokers); + case ZOOKEEPER -> numBrokers; + }; + } +} diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/NodeRole.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/NodeRole.java new file mode 100644 index 00000000..aa73115b --- /dev/null +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/NodeRole.java @@ -0,0 +1,49 @@ +/* + * Copyright Kroxylicious Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.kroxylicious.testing.kafka.common; + +import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; +import java.util.stream.Collectors; + +public enum NodeRole { + BROKER("broker"), + CONTROLLER("controller"); + + private final String configRole; + + private NodeRole(String configRole) { + this.configRole = configRole; + } + + /** + * @return The role, as it can be configured in a Kafka {@code server.properties} file. 
+ */ + public static String forConfig(Collection roles) { + return roles.stream().map(x -> x.configRole).distinct().collect(Collectors.joining(",")); + } + + public static boolean isPureController(Set roles) { + return EnumSet.of(NodeRole.CONTROLLER).equals(roles); + } + + public static boolean isCombinedNode(Set roles) { + return EnumSet.of(NodeRole.CONTROLLER, NodeRole.BROKER).equals(roles); + } + + public static boolean isPureBroker(Set roles) { + return EnumSet.of(NodeRole.BROKER).equals(roles); + } + + public static boolean hasBrokerRole(Set roles) { + return roles.contains(NodeRole.BROKER); + } + + public static boolean hasControllerRole(Set roles) { + return roles.contains(NodeRole.CONTROLLER); + } +} diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 8ad2e807..27a99189 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -161,7 +161,7 @@ public Optional construct(Class clazz, Object... 
parameters) { private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder c) { Properties properties = new Properties(); properties.putAll(c.getProperties()); - var logsDir = getBrokerLogDir(c.getBrokerNum()); + var logsDir = getBrokerLogDir(c.getNodeId()); properties.setProperty(KafkaConfig.LogDirProp(), logsDir.toAbsolutePath().toString()); LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", properties); return new KafkaConfig(properties); @@ -174,18 +174,33 @@ private Path getBrokerLogDir(int brokerNum) { @Override public synchronized void start() { - // kraft mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port - // zk mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port + // zookeeper mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port + // kraft combined mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port + // kraft separate mode: per-controller: 1 controller port + // kraft separate mode: per-broker: 1 external port + 1 inter-broker port + 1 anon port try (PortAllocator.PortAllocationSession portAllocationSession = portsAllocator.allocationSession()) { - portAllocationSession.allocate(Set.of(Listener.EXTERNAL, Listener.ANON, Listener.INTERNAL), 0, clusterConfig.getBrokersNum()); - portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0, clusterConfig.isKraftMode() ? 
clusterConfig.getKraftControllers() : 1); + if (!clusterConfig.isKraftMode()) { + portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0); + } + for (int nodeId = 0; nodeId < clusterConfig.numNodes(); nodeId++) { + Set listeners = new HashSet<>(); + if (clusterConfig.hasControllerRole(nodeId)) { + listeners.add(Listener.CONTROLLER); + } + if (clusterConfig.hasBrokerRole(nodeId)) { + listeners.add(Listener.EXTERNAL); + listeners.add(Listener.ANON); + listeners.add(Listener.INTERNAL); + } + portAllocationSession.allocate(listeners, nodeId); + } } - buildAndStartZookeeper(); - clusterConfig.getBrokerConfigs(() -> this).parallel().forEach(configHolder -> { + maybeBuildAndStartZookeeper(); + clusterConfig.getNodeConfigs(() -> this).parallel().forEach(configHolder -> { final Server server = this.buildKafkaServer(configHolder); tryToStartServerWithRetry(configHolder, server); - servers.put(configHolder.getBrokerNum(), server); + servers.put(configHolder.getNodeId(), server); }); Utils.awaitExpectedBrokerCountInClusterViaTopic( clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120, @@ -198,7 +213,7 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol .until(() -> { // Hopefully we can remove this once a fix for https://issues.apache.org/jira/browse/KAFKA-14908 actually lands. 
try { - LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", configHolder.getBrokerNum(), + LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", configHolder.getNodeId(), configHolder.getProperties().get("process.roles")); server.startup(); return true; @@ -206,10 +221,10 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol catch (Throwable t) { LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: " + t.getMessage()); LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ", - this.getEndpointPair(Listener.ANON, configHolder.getBrokerNum()).getBind(), - this.getEndpointPair(Listener.EXTERNAL, configHolder.getBrokerNum()).getBind(), - this.getEndpointPair(Listener.CONTROLLER, configHolder.getBrokerNum()).getBind(), - this.getEndpointPair(Listener.EXTERNAL, configHolder.getBrokerNum()).getBind()); + this.getEndpointPair(Listener.ANON, configHolder.getNodeId()).getBind(), + this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind(), + this.getEndpointPair(Listener.CONTROLLER, configHolder.getNodeId()).getBind(), + this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind()); server.shutdown(); server.awaitShutdown(); @@ -218,7 +233,7 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol }); } - private void buildAndStartZookeeper() { + private void maybeBuildAndStartZookeeper() { if (!clusterConfig.isKraftMode()) { try { final int zookeeperPort = portsAllocator.getPort(Listener.CONTROLLER, 0); @@ -274,7 +289,7 @@ public Map getKafkaClientConfiguration(String user, String passw @Override public synchronized int addBroker() { // find next free kafka node.id - var first = IntStream.rangeClosed(0, getNumOfBrokers()).filter(cand -> !servers.containsKey(cand)).findFirst(); + var first = IntStream.rangeClosed(0, numNodes()).filter(cand -> 
!servers.containsKey(cand)).findFirst(); if (first.isEmpty()) { throw new IllegalStateException("Could not determine new nodeId, existing set " + servers.keySet()); } @@ -297,16 +312,16 @@ public synchronized int addBroker() { clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120, TimeUnit.SECONDS, getNumOfBrokers()); - return configHolder.getBrokerNum(); + return configHolder.getNodeId(); } @Override public synchronized void removeBroker(int nodeId) throws IllegalArgumentException, UnsupportedOperationException { if (!servers.containsKey(nodeId)) { - throw new IllegalArgumentException("Broker node " + nodeId + " is not a member of the cluster."); + throw new IllegalArgumentException("Node " + nodeId + " is not a member of the cluster."); } - if (clusterConfig.isKraftMode() && isController(nodeId)) { - throw new UnsupportedOperationException("Cannot remove controller node " + nodeId + " from a kraft cluster."); + if (!clusterConfig.isPureBroker(nodeId)) { + throw new UnsupportedOperationException("Node " + nodeId + " is not a pure broker."); } if (servers.size() < 2) { throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(servers.size())); @@ -434,6 +449,12 @@ private static void ensureDirectoryIsEmpty(Path path) { @Override public synchronized int getNumOfBrokers() { + return (int) servers.keySet().stream().filter(nodeId -> nodeId >= clusterConfig.numNodes() // added nodes are always brokers + || clusterConfig.hasBrokerRole(nodeId)) + .count(); // initial nodes have broker role depending on the metadata mode + } + + public synchronized int numNodes() { return servers.size(); } diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java index 7d6fd47e..ccbc1b34 100644 --- 
a/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java @@ -67,6 +67,8 @@ import io.kroxylicious.testing.kafka.api.KafkaCluster; import io.kroxylicious.testing.kafka.api.TerminationStyle; import io.kroxylicious.testing.kafka.common.KafkaClusterConfig; +import io.kroxylicious.testing.kafka.common.MetadataMode; +import io.kroxylicious.testing.kafka.common.NodeRole; import io.kroxylicious.testing.kafka.common.PortAllocator; import io.kroxylicious.testing.kafka.common.Utils; @@ -185,15 +187,32 @@ public TestcontainersKafkaCluster(DockerImageName kafkaImage, DockerImageName zo } try (PortAllocator.PortAllocationSession portAllocationSession = portsAllocator.allocationSession()) { - portAllocationSession.allocate(Set.of(Listener.EXTERNAL, Listener.ANON), 0, clusterConfig.getBrokersNum()); - portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0, clusterConfig.getKraftControllers()); + // portAllocationSession.allocate(Set.of(Listener.EXTERNAL, Listener.ANON), 0, clusterConfig.getBrokersNum()); + // portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0, clusterConfig.getKraftControllers()); + + // if (!clusterConfig.isKraftMode()) { + // portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0); + // } + for (int nodeId = 0; nodeId < clusterConfig.numNodes(); nodeId++) { + Set listeners = new HashSet<>(); + if (clusterConfig.hasControllerRole(nodeId)) { + listeners.add(Listener.CONTROLLER); + } + if (clusterConfig.hasBrokerRole(nodeId)) { + listeners.add(Listener.EXTERNAL); + listeners.add(Listener.ANON); + listeners.add(Listener.INTERNAL); + } + portAllocationSession.allocate(listeners, nodeId); + } } - clusterConfig.getBrokerConfigs(() -> this).forEach(holder -> nodes.put(holder.getBrokerNum(), buildKafkaContainer(holder))); + clusterConfig.getNodeConfigs(() -> this).forEach(holder -> 
nodes.put(holder.getNodeId(), buildKafkaContainer(holder))); } private static void validateClusterConfig(KafkaClusterConfig clusterConfig) { - if (Boolean.TRUE.equals(clusterConfig.getKraftMode()) && clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) { + if (clusterConfig.getMetadataMode() != MetadataMode.ZOOKEEPER + && clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) { throw new IllegalStateException( "Due to https://github.com/ozangunalp/kafka-native/issues/88 we can't support controller only nodes so we need to fail fast. This cluster has " + clusterConfig.getBrokersNum() + " brokers and " + clusterConfig.getKraftControllers() + " controllers"); @@ -202,7 +221,7 @@ private static void validateClusterConfig(KafkaClusterConfig clusterConfig) { @NotNull private KafkaContainer buildKafkaContainer(KafkaClusterConfig.ConfigHolder holder) { - String netAlias = "broker-" + holder.getBrokerNum(); + String netAlias = "broker-" + holder.getNodeId(); KafkaContainer kafkaContainer = new KafkaContainer(kafkaImage) .withName(name) .withNetwork(network) @@ -214,7 +233,7 @@ private KafkaContainer buildKafkaContainer(KafkaClusterConfig.ConfigHolder holde kafkaContainer // KAFKA_LOG_DIR overrides a key in the quarkus kafka image application.properties. The quarkus app uses // that to set log.dir. Any value we set for log.dir in server.properties is lost. 
- .withEnv("KAFKA_LOG_DIR", getBrokerLogDirectory(holder.getBrokerNum())) + .withEnv("KAFKA_LOG_DIR", getBrokerLogDirectory(holder.getNodeId())) .withEnv("SERVER_PROPERTIES_FILE", "/cnf/server.properties") .withEnv("SERVER_CLUSTER_ID", holder.getKafkaKraftClusterId()) .withCopyToContainer(Transferable.of(propertiesToBytes(holder.getProperties()), 0644), "/cnf/server.properties") @@ -222,7 +241,7 @@ private KafkaContainer buildKafkaContainer(KafkaClusterConfig.ConfigHolder holde .withMinimumRunningDuration(MINIMUM_RUNNING_DURATION) .withStartupTimeout(STARTUP_TIMEOUT); - if (holder.isBroker()) { + if (NodeRole.hasBrokerRole(holder.roles())) { kafkaContainer.addFixedExposedPort(holder.getExternalPort(), CLIENT_PORT); kafkaContainer.addFixedExposedPort(holder.getAnonPort(), ANON_PORT); } @@ -346,13 +365,13 @@ public synchronized int addBroker() { kafkaContainer.stop(); throw new RuntimeException(e); } - nodes.put(configHolder.getBrokerNum(), kafkaContainer); + nodes.put(configHolder.getNodeId(), kafkaContainer); Utils.awaitExpectedBrokerCountInClusterViaTopic( clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120, TimeUnit.SECONDS, getNumOfBrokers()); - return configHolder.getBrokerNum(); + return configHolder.getNodeId(); } @Override @@ -453,7 +472,7 @@ public synchronized void stopNodes(IntPredicate nodeIdPredicate, TerminationStyl if (zookeeper != null) { // In the zookeeper case, we may as well wait for the zookeeper session timeout. This serves to prevent a // subsequent call to startBroker spinning excessively. 
- var config = clusterConfig.getBrokerConfigs(() -> this).findFirst(); + var config = clusterConfig.getNodeConfigs(() -> this).findFirst(); var zkSessionTimeout = config.map(KafkaClusterConfig.ConfigHolder::getProperties).map(p -> p.getProperty(KafkaConfig.ZkSessionTimeoutMsProp(), "0")) .map(Long::parseLong); zkSessionTimeout.filter(timeout -> timeout > 0).ifPresent( diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java index 30caeb53..d867a482 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java @@ -35,8 +35,8 @@ import org.junit.jupiter.api.condition.EnabledIf; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import io.kroxylicious.testing.kafka.api.KafkaCluster; import io.kroxylicious.testing.kafka.api.TerminationStyle; @@ -47,6 +47,7 @@ import io.kroxylicious.testing.kafka.common.KafkaClusterExecutionMode; import io.kroxylicious.testing.kafka.common.KafkaClusterFactory; import io.kroxylicious.testing.kafka.common.KeytoolCertificateGenerator; +import io.kroxylicious.testing.kafka.common.MetadataMode; import io.kroxylicious.testing.kafka.common.Utils; import static io.kroxylicious.testing.kafka.common.KafkaClusterFactory.TEST_CLUSTER_EXECUTION_MODE; @@ -70,7 +71,7 @@ class KafkaClusterTest { void kafkaClusterKraftMode() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(true) + .metadataMode(MetadataMode.KRAFT_COMBINED) .build())) { cluster.start(); verifyRecordRoundTrip(1, cluster); @@ -82,7 +83,7 @@ void kafkaClusterKraftMode() throws Exception { void 
kafkaClusterKraftModeWithMultipleControllers() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(true) + .metadataMode(MetadataMode.KRAFT_COMBINED) .kraftControllers(3) .build())) { cluster.start(); @@ -101,7 +102,7 @@ public static boolean canTestAdditionalControllers() { void kafkaClusterZookeeperMode() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .build())) { cluster.start(); verifyRecordRoundTrip(1, cluster); @@ -109,13 +110,13 @@ void kafkaClusterZookeeperMode() throws Exception { } @ParameterizedTest - @ValueSource(booleans = { true, false }) - void kafkaClusterAddBroker(boolean kraft) throws Exception { + @EnumSource(value = MetadataMode.class) + void kafkaClusterAddBroker(MetadataMode metadataMode) throws Exception { int brokersNum = 1; try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(kraft) + .metadataMode(metadataMode) .build())) { cluster.start(); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); @@ -124,7 +125,7 @@ void kafkaClusterAddBroker(boolean kraft) throws Exception { verifyRecordRoundTrip(brokersNum, cluster); int nodeId = cluster.addBroker(); - assertThat(nodeId).isEqualTo(1); + assertThat(nodeId).isEqualTo(metadataMode == MetadataMode.KRAFT_SEPARATE ? 
2 : 1); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum + 1); assertThat(cluster.getBootstrapServers().split(",")).hasSize(brokersNum + 1); verifyRecordRoundTrip(brokersNum + 1, cluster); @@ -133,37 +134,45 @@ void kafkaClusterAddBroker(boolean kraft) throws Exception { public static Stream stopAndStartBrokers() { return Stream.of( - Arguments.of(1, true, TerminationStyle.ABRUPT, (IntPredicate) node -> true), - Arguments.of(2, true, TerminationStyle.ABRUPT, (IntPredicate) node -> node == 1), - Arguments.of(2, true, TerminationStyle.ABRUPT, (IntPredicate) node -> true), - Arguments.of(1, true, TerminationStyle.GRACEFUL, (IntPredicate) node -> true), - Arguments.of(1, false, TerminationStyle.ABRUPT, (IntPredicate) node -> true)); + Arguments.of(1, MetadataMode.KRAFT_COMBINED, TerminationStyle.ABRUPT, (IntPredicate) node -> true), + Arguments.of(2, MetadataMode.KRAFT_COMBINED, TerminationStyle.ABRUPT, (IntPredicate) node -> node == 1), + Arguments.of(2, MetadataMode.KRAFT_COMBINED, TerminationStyle.ABRUPT, (IntPredicate) node -> true), + Arguments.of(1, MetadataMode.KRAFT_COMBINED, TerminationStyle.GRACEFUL, (IntPredicate) node -> true), + Arguments.of(1, MetadataMode.KRAFT_SEPARATE, TerminationStyle.ABRUPT, (IntPredicate) node -> node >= 1), + Arguments.of(2, MetadataMode.KRAFT_SEPARATE, TerminationStyle.ABRUPT, (IntPredicate) node -> node == 1), + Arguments.of(2, MetadataMode.KRAFT_SEPARATE, TerminationStyle.ABRUPT, (IntPredicate) node -> node >= 1), + Arguments.of(1, MetadataMode.KRAFT_SEPARATE, TerminationStyle.GRACEFUL, (IntPredicate) node -> node >= 1), + Arguments.of(1, MetadataMode.ZOOKEEPER, TerminationStyle.ABRUPT, (IntPredicate) node -> true)); } @ParameterizedTest @MethodSource - void stopAndStartBrokers(int brokersNum, boolean kraft, TerminationStyle terminationStyle, IntPredicate brokerPredicate) throws Exception { + void stopAndStartBrokers(int brokersNum, + MetadataMode metadataMode, + TerminationStyle terminationStyle, + IntPredicate 
stopBrokerPredicate) + throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(kraft) + .metadataMode(metadataMode) .build())) { cluster.start(); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); verifyRecordRoundTrip(brokersNum, cluster); var nodes = describeClusterNodes(cluster); - var brokersExpectedDown = nodes.stream().filter(n -> brokerPredicate.test(n.id())).toList(); + var brokersExpectedDown = nodes.stream().filter(n -> stopBrokerPredicate.test(n.id())).toList(); assertThat(cluster.getStoppedBrokers()).hasSize(0); - cluster.stopNodes(brokerPredicate, terminationStyle); + cluster.stopNodes(stopBrokerPredicate, terminationStyle); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); assertThat(cluster.getStoppedBrokers()).hasSameSizeAs(brokersExpectedDown); brokersExpectedDown.forEach(this::assertBrokerRefusesConnection); - cluster.startNodes(brokerPredicate); + cluster.startNodes(stopBrokerPredicate); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); assertThat(cluster.getStoppedBrokers()).hasSize(0); @@ -174,11 +183,11 @@ void stopAndStartBrokers(int brokersNum, boolean kraft, TerminationStyle termina } @ParameterizedTest - @ValueSource(booleans = { true, false }) - void stopAndStartIdempotency(boolean kraft) throws Exception { + @EnumSource(value = MetadataMode.class) + void stopAndStartIdempotency(MetadataMode metadataMode) throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(kraft) + .metadataMode(metadataMode) .build())) { cluster.start(); verifyRecordRoundTrip(cluster.getNumOfBrokers(), cluster); @@ -189,11 +198,21 @@ void stopAndStartIdempotency(boolean kraft) throws Exception { assertThat(cluster.getStoppedBrokers()).hasSize(0); cluster.stopNodes((u) -> true, null); - assertThat(cluster.getStoppedBrokers()).hasSize(1); + if (metadataMode == 
MetadataMode.KRAFT_SEPARATE) { + assertThat(cluster.getStoppedBrokers()).hasSize(2); + } + else { + assertThat(cluster.getStoppedBrokers()).hasSize(1); + } // stopping idempotent cluster.stopNodes((u) -> true, null); - assertThat(cluster.getStoppedBrokers()).hasSize(1); + if (metadataMode == MetadataMode.KRAFT_SEPARATE) { + assertThat(cluster.getStoppedBrokers()).hasSize(2); + } + else { + assertThat(cluster.getStoppedBrokers()).hasSize(1); + } cluster.startNodes((u) -> true); verifyRecordRoundTrip(cluster.getNumOfBrokers(), cluster); @@ -206,11 +225,11 @@ void stopAndStartIdempotency(boolean kraft) throws Exception { } @ParameterizedTest - @ValueSource(booleans = { true, false }) - void stopAndStartIncrementally(boolean kraft) throws Exception { + @EnumSource(value = MetadataMode.class) + void stopAndStartIncrementally(MetadataMode metadataMode) throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(kraft) + .metadataMode(metadataMode) .brokersNum(2) .build())) { cluster.start(); @@ -221,11 +240,21 @@ void stopAndStartIncrementally(boolean kraft) throws Exception { assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(1); cluster.stopNodes((u) -> true, null); - assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(0, 1); + if (metadataMode == MetadataMode.KRAFT_SEPARATE) { + assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(0, 1, 2); + } + else { + assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(0, 1); + } // restart one node (in the kraft case, this needs to be the controller). 
cluster.startNodes((n) -> n == 0); - assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(1); + if (metadataMode == MetadataMode.KRAFT_SEPARATE) { + assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(1, 2); + } + else { + assertThat(cluster.getStoppedBrokers()).containsExactlyInAnyOrder(1); + } cluster.startNodes((u) -> true); assertThat(cluster.getStoppedBrokers()).hasSize(0); @@ -234,11 +263,11 @@ void stopAndStartIncrementally(boolean kraft) throws Exception { } @ParameterizedTest - @ValueSource(booleans = { true, false }) - void topicPersistsThroughStopAndStart(boolean kraft) throws Exception { + @EnumSource(value = MetadataMode.class) + void topicPersistsThroughStopAndStart(MetadataMode metadataMode) throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(kraft) + .metadataMode(metadataMode) .brokersNum(1) .build())) { cluster.start(); @@ -265,12 +294,12 @@ void topicPersistsThroughStopAndStart(boolean kraft) throws Exception { } @Test - void kafkaTwoNodeClusterKraftMode() throws Exception { + void kafkaTwoNodeClusterKraftCombinedMode() throws Exception { int brokersNum = 2; try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(true) + .metadataMode(MetadataMode.KRAFT_COMBINED) .build())) { cluster.start(); verifyRecordRoundTrip(brokersNum, cluster); @@ -283,7 +312,7 @@ void kafkaTwoNodeClusterZookeeperMode() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .build())) { cluster.start(); verifyRecordRoundTrip(brokersNum, cluster); @@ -291,13 +320,13 @@ void kafkaTwoNodeClusterZookeeperMode() throws Exception { } @ParameterizedTest - @ValueSource(booleans = { true, false }) - void kafkaClusterRemoveBroker(boolean kraft) throws Exception { + 
@EnumSource(value = MetadataMode.class) + void kafkaClusterRemoveBroker(MetadataMode metadataMode) throws Exception { int brokersNum = 3; try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(kraft) + .metadataMode(metadataMode) .build())) { cluster.start(); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); @@ -313,13 +342,13 @@ void kafkaClusterRemoveBroker(boolean kraft) throws Exception { } @ParameterizedTest - @ValueSource(booleans = { true, false }) - void kafkaClusterRemoveWithStoppedBrokerDisallowed(boolean kraft) throws Exception { + @EnumSource(value = MetadataMode.class) + void kafkaClusterRemoveWithStoppedBrokerDisallowed(MetadataMode metadataMode) throws Exception { int brokersNum = 2; try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(kraft) + .metadataMode(metadataMode) .build())) { cluster.start(); @@ -331,13 +360,16 @@ void kafkaClusterRemoveWithStoppedBrokerDisallowed(boolean kraft) throws Excepti } } - @Test - void kafkaClusterKraftDisallowsControllerRemoval() throws Exception { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { + "KRAFT_COMBINED", "KRAFT_SEPARATE" + }) + void kafkaClusterKraftCombinedDisallowsControllerRemoval(MetadataMode metadataMode) throws Exception { int brokersNum = 1; try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) - .kraftMode(true) + .metadataMode(metadataMode) .build())) { cluster.start(); @@ -347,10 +379,13 @@ void kafkaClusterKraftDisallowsControllerRemoval() throws Exception { } } - @Test - void kafkaClusterKraftModeWithAuth() throws Exception { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { + "KRAFT_COMBINED", "KRAFT_SEPARATE" + }) + void kafkaClusterKraftModeWithAuth(MetadataMode metadataMode) throws Exception { try (var cluster = 
KafkaClusterFactory.create(KafkaClusterConfig.builder() - .kraftMode(true) + .metadataMode(metadataMode) .testInfo(testInfo) .securityProtocol("SASL_PLAINTEXT") .saslMechanism("PLAIN") @@ -365,7 +400,7 @@ void kafkaClusterKraftModeWithAuth() throws Exception { void kafkaClusterZookeeperModeWithAuth() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .securityProtocol("SASL_PLAINTEXT") .saslMechanism("PLAIN") .user("guest", "pass") @@ -375,14 +410,17 @@ void kafkaClusterZookeeperModeWithAuth() throws Exception { } } - @Test - void kafkaClusterKraftModeSASL_SSL_ClientUsesSSLClientAuth() throws Exception { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { + "KRAFT_COMBINED", "KRAFT_SEPARATE" + }) + void kafkaClusterKraftModeSASL_SSL_ClientUsesSSLClientAuth(MetadataMode metadataMode) throws Exception { createClientCertificate(); try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) .clientKeytoolCertificateGenerator(clientKeytoolCertificateGenerator) - .kraftMode(true) + .metadataMode(metadataMode) .securityProtocol("SASL_SSL") .saslMechanism("PLAIN") .user("guest", "pass") @@ -392,14 +430,17 @@ void kafkaClusterKraftModeSASL_SSL_ClientUsesSSLClientAuth() throws Exception { } } - @Test - void kafkaClusterKraftModeSSL_ClientUsesSSLClientAuth() throws Exception { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { + "KRAFT_COMBINED", "KRAFT_SEPARATE" + }) + void kafkaClusterKraftModeSSL_ClientUsesSSLClientAuth(MetadataMode metadataMode) throws Exception { createClientCertificate(); try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) 
.clientKeytoolCertificateGenerator(clientKeytoolCertificateGenerator) - .kraftMode(true) + .metadataMode(metadataMode) .securityProtocol("SSL") .build())) { cluster.start(); @@ -414,7 +455,7 @@ void kafkaClusterZookeeperModeSASL_SSL_ClientUsesSSLClientAuth() throws Exceptio .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) .clientKeytoolCertificateGenerator(clientKeytoolCertificateGenerator) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .securityProtocol("SASL_SSL") .saslMechanism("PLAIN") .user("guest", "pass") @@ -431,7 +472,7 @@ void kafkaClusterZookeeperModeSSL_ClientUsesSSLClientAuth() throws Exception { .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) .clientKeytoolCertificateGenerator(clientKeytoolCertificateGenerator) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .securityProtocol("SSL") .build())) { cluster.start(); @@ -439,12 +480,15 @@ void kafkaClusterZookeeperModeSSL_ClientUsesSSLClientAuth() throws Exception { } } - @Test - void kafkaClusterKraftModeSSL_ClientNoAuth() throws Exception { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { + "KRAFT_COMBINED", "KRAFT_SEPARATE" + }) + void kafkaClusterKraftModeSSL_ClientNoAuth(MetadataMode metadataMode) throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) - .kraftMode(true) + .metadataMode(metadataMode) .securityProtocol("SSL") .build())) { cluster.start(); @@ -457,7 +501,7 @@ void kafkaClusterZookeeperModeSSL_ClientNoAuth() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .securityProtocol("SSL") .build())) { cluster.start(); @@ -465,12 +509,15 @@ void 
kafkaClusterZookeeperModeSSL_ClientNoAuth() throws Exception { } } - @Test - void kafkaClusterKraftModeSASL_SSL_ClientNoAuth() throws Exception { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { + "KRAFT_COMBINED", "KRAFT_SEPARATE" + }) + void kafkaClusterKraftModeSASL_SSL_ClientNoAuth(MetadataMode metadataMode) throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) - .kraftMode(true) + .metadataMode(metadataMode) .securityProtocol("SASL_SSL") .saslMechanism("PLAIN") .user("guest", "guest") @@ -485,7 +532,7 @@ void kafkaClusterZookeeperModeSASL_SSL_ClientNoAuth() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokerKeytoolCertificateGenerator(brokerKeytoolCertificateGenerator) - .kraftMode(false) + .metadataMode(MetadataMode.ZOOKEEPER) .securityProtocol("SASL_SSL") .saslMechanism("PLAIN") .user("guest", "guest") diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfigTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfigTest.java index 9a281246..bfa74db5 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfigTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfigTest.java @@ -41,7 +41,7 @@ void generateConfigForSpecificBroker() { final var config = kafkaClusterConfig.generateConfigForSpecificNode(endpointConfig, 0); // Then - assertThat(config.getBrokerNum()).isZero(); + assertThat(config.getNodeId()).isZero(); assertThat(config.getAnonPort()).isEqualTo(ANON_BASE_PORT); assertThat(config.getExternalPort()).isEqualTo(CLIENT_BASE_PORT); assertThat(config.getEndpoint()).isEqualTo("localhost:" + CLIENT_BASE_PORT); @@ -52,7 +52,11 @@ void generateConfigForSpecificBroker() { void 
shouldConfigureMultipleControllersInCombinedMode() { // Given var numBrokers = 3; - final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder.kraftMode(true).kraftControllers(3).brokersNum(numBrokers).build(); + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_COMBINED) + .kraftControllers(3) + .brokersNum(numBrokers) + .build(); // When for (int nodeId = 0; nodeId < numBrokers; nodeId++) { @@ -61,11 +65,15 @@ void shouldConfigureMultipleControllersInCombinedMode() { } @Test - void shouldConfigureMultipleControllersInControllerOnlyMode() { + void shouldConfigureMultipleControllersInCombinedModeWhenExcessControllers() { // Given var numBrokers = 1; var numControllers = 3; - final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder.kraftMode(true).kraftControllers(numControllers).brokersNum(numBrokers).build(); + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_COMBINED) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); // When assertNodeIdHasRole(kafkaClusterConfig, 0, "broker,controller"); @@ -76,11 +84,15 @@ void shouldConfigureMultipleControllersInControllerOnlyMode() { } @Test - void shouldConfigureSingleControllersInCombinedMode() { + void shouldConfigureSingleControllerInCombinedMode() { // Given var numBrokers = 3; var numControllers = 1; - final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder.kraftMode(true).kraftControllers(numControllers).brokersNum(numBrokers).build(); + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_COMBINED) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); // When assertNodeIdHasRole(kafkaClusterConfig, 0, "broker,controller"); @@ -91,33 +103,137 @@ void shouldConfigureSingleControllersInCombinedMode() { } @Test - void shouldGenerateConfigForBrokerNodes() { + void 
shouldGenerateConfigForBrokerNodesInCombinedMode() { // Given var numBrokers = 3; var numControllers = 1; - final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder.kraftMode(true).kraftControllers(numControllers).brokersNum(numBrokers).build(); + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_COMBINED) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); // When - final Stream brokerConfigs = kafkaClusterConfig.getBrokerConfigs(() -> endpointConfig); + final Stream brokerConfigs = kafkaClusterConfig.getNodeConfigs(() -> endpointConfig); // Then assertThat(brokerConfigs).hasSize(3); } @Test - void shouldGenerateConfigForControllerNodes() { + void shouldGenerateConfigForControllerNodesInCombinedMode() { // Given var numBrokers = 1; var numControllers = 3; - final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder.kraftMode(true).kraftControllers(numControllers).brokersNum(numBrokers).build(); + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_COMBINED) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); // When - final Stream brokerConfigs = kafkaClusterConfig.getBrokerConfigs(() -> endpointConfig); + final Stream brokerConfigs = kafkaClusterConfig.getNodeConfigs(() -> endpointConfig); // Then assertThat(brokerConfigs).hasSize(3); } + //// + + @Test + void shouldConfigureMultipleControllersInSeparateMode() { + // Given + var numBrokers = 3; + var numControllers = 3; + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_SEPARATE) + .kraftControllers(3) + .brokersNum(numBrokers) + .build(); + + // When + for (int nodeId = 0; nodeId < numControllers; nodeId++) { + assertNodeIdHasRole(kafkaClusterConfig, nodeId, "controller"); + } + for (int nodeId = numControllers; nodeId < numControllers + numBrokers; nodeId++) { + 
assertNodeIdHasRole(kafkaClusterConfig, nodeId, "broker"); + } + } + + @Test + void shouldConfigureMultipleControllersInSeparateModeWhenExcessControllers() { + // Given + var numBrokers = 1; + var numControllers = 3; + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_SEPARATE) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); + + // When + assertNodeIdHasRole(kafkaClusterConfig, 0, "controller"); + assertNodeIdHasRole(kafkaClusterConfig, 1, "controller"); + assertNodeIdHasRole(kafkaClusterConfig, 2, "controller"); + + assertNodeIdHasRole(kafkaClusterConfig, 3, "broker"); + } + + @Test + void shouldConfigureSingleControllerInSeparateMode() { + // Given + var numBrokers = 3; + var numControllers = 1; + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_SEPARATE) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); + + // When + assertNodeIdHasRole(kafkaClusterConfig, 0, "controller"); + + for (int nodeId = 1; nodeId < numBrokers; nodeId++) { + assertNodeIdHasRole(kafkaClusterConfig, nodeId, "broker"); + } + } + + @Test + void shouldGenerateConfigForBrokerNodesInSeparateMode() { + // Given + var numBrokers = 3; + var numControllers = 1; + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_SEPARATE) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); + + // When + final Stream brokerConfigs = kafkaClusterConfig.getNodeConfigs(() -> endpointConfig); + + // Then + assertThat(brokerConfigs).hasSize(4); + } + + @Test + void shouldGenerateConfigForControllerNodesInSeparateMode() { + // Given + var numBrokers = 1; + var numControllers = 3; + final KafkaClusterConfig kafkaClusterConfig = kafkaClusterConfigBuilder + .metadataMode(MetadataMode.KRAFT_SEPARATE) + .kraftControllers(numControllers) + .brokersNum(numBrokers) + .build(); + + // When + 
final Stream brokerConfigs = kafkaClusterConfig.getNodeConfigs(() -> endpointConfig); + + // Then + assertThat(brokerConfigs).hasSize(4); + } + private void assertNodeIdHasRole(KafkaClusterConfig kafkaClusterConfig, int nodeId, String expectedRole) { final var config = kafkaClusterConfig.generateConfigForSpecificNode(endpointConfig, nodeId); assertThat(config.getProperties()).extracting(brokerConfig -> brokerConfig.get("process.roles")).as("nodeId: %s to have process.roles", nodeId).isEqualTo( diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactoryTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactoryTest.java index 367dda03..c9b2d519 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactoryTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/common/KafkaClusterFactoryTest.java @@ -5,7 +5,8 @@ */ package io.kroxylicious.testing.kafka.common; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -14,13 +15,14 @@ class KafkaClusterFactoryTest { @SuppressWarnings("resource") - @Test - void shouldThrowInContainerModeWithControllerOnlyNodes() { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { "KRAFT_COMBINED", "KRAFT_SEPARATE" }) + void shouldThrowInContainerModeWithControllerOnlyNodes(MetadataMode metadataMode) { // Due to https://github.com/ozangunalp/kafka-native/issues/88 we can't support controller only nodes, so we need to fail fast // Given final KafkaClusterConfig kafkaClusterConfig = KafkaClusterConfig.builder() .execMode(KafkaClusterExecutionMode.CONTAINER) - .kraftMode(true) + .metadataMode(metadataMode) .brokersNum(1) .kraftControllers(2) .build(); @@ -29,12 +31,13 @@ void 
shouldThrowInContainerModeWithControllerOnlyNodes() { assertThrows(IllegalStateException.class, () -> KafkaClusterFactory.create(kafkaClusterConfig)); } - @Test - void shouldCreateInstanceInVMModeWithControllerOnlyNodes() { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { "KRAFT_COMBINED", "KRAFT_SEPARATE" }) + void shouldCreateInstanceInVMModeWithControllerOnlyNodes(MetadataMode metadataMode) { // Given final KafkaClusterConfig kafkaClusterConfig = KafkaClusterConfig.builder() .execMode(KafkaClusterExecutionMode.IN_VM) - .kraftMode(true) + .metadataMode(metadataMode) .brokersNum(1) .kraftControllers(2) .build(); @@ -48,11 +51,12 @@ void shouldCreateInstanceInVMModeWithControllerOnlyNodes() { } } - @Test - void shouldCreateInstanceInContainerModeWithoutControllerOnlyNodes() { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { "KRAFT_COMBINED", "KRAFT_SEPARATE" }) + void shouldCreateInstanceInContainerModeWithoutControllerOnlyNodes(MetadataMode metadataMode) { // Given final KafkaClusterConfig kafkaClusterConfig = KafkaClusterConfig.builder() - .execMode(KafkaClusterExecutionMode.IN_VM).kraftMode(true) + .execMode(KafkaClusterExecutionMode.IN_VM).metadataMode(metadataMode) .brokersNum(3).kraftControllers(2).build(); // When diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaClusterTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaClusterTest.java index 0d035a2a..0b35f4ef 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaClusterTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaClusterTest.java @@ -6,23 +6,26 @@ package io.kroxylicious.testing.kafka.testcontainers; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import 
io.kroxylicious.testing.kafka.common.KafkaClusterConfig; import io.kroxylicious.testing.kafka.common.KafkaClusterExecutionMode; +import io.kroxylicious.testing.kafka.common.MetadataMode; import static org.junit.jupiter.api.Assertions.assertThrows; class TestcontainersKafkaClusterTest { @SuppressWarnings("resource") - @Test - void shouldThrowInContainerModeWithControllerOnlyNodes() { + @ParameterizedTest + @EnumSource(value = MetadataMode.class, names = { "KRAFT_COMBINED", "KRAFT_SEPARATE" }) + void shouldThrowInContainerModeWithControllerOnlyNodes(MetadataMode metadataMode) { // Due to https://github.com/ozangunalp/kafka-native/issues/88 we can't support controller only nodes, so we need to fail fast // Given final KafkaClusterConfig kafkaClusterConfig = KafkaClusterConfig.builder() .execMode(KafkaClusterExecutionMode.CONTAINER) - .kraftMode(true) + .metadataMode(metadataMode) .brokersNum(1) .kraftControllers(2) .build(); From 676fe961eeefcb3e4959dda625eba7cbf4b810a3 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Tue, 12 Sep 2023 16:48:55 +1200 Subject: [PATCH 2/9] Fix flaky broker removal Signed-off-by: Tom Bentley --- .../io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 27a99189..193b146d 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -330,7 +330,7 @@ public synchronized void removeBroker(int nodeId) throws IllegalArgumentExceptio throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes."); } - var target = servers.keySet().stream().filter(n -> n != nodeId).findFirst(); + var target = servers.keySet().stream().filter(n -> n != nodeId && 
clusterConfig.hasBrokerRole(n)).findFirst(); if (target.isEmpty()) { throw new IllegalStateException("Could not identify a node to be the re-assignment target"); } From 7b4b38d9c6e308861a61aa7b571f5b61087d9734 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Tue, 12 Sep 2023 17:12:00 +1200 Subject: [PATCH 3/9] Fix some code smells Signed-off-by: Tom Bentley --- .../testing/kafka/common/KafkaClusterConfig.java | 13 ++----------- .../testcontainers/TestcontainersKafkaCluster.java | 6 ------ 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java index b86b10ac..7156510f 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java @@ -216,7 +216,6 @@ public Stream getNodeConfigs(Supplier endPointConf */ @NotNull public ConfigHolder generateConfigForSpecificNode(KafkaEndpoints kafkaEndpoints, int nodeId) { - // checkNodeId(nodeId); final var roles = nodeId >= numNodes() ?
EnumSet.of(NodeRole.BROKER) : processRoles(nodeId); Properties nodeConfiguration = new Properties(); nodeConfiguration.putAll(brokerConfigs); @@ -267,7 +266,7 @@ else if (this.metadataMode == null || this.metadataMode == MetadataMode.KRAFT_CO configureKraftNode(kafkaEndpoints, nodeId, nodeConfiguration, protocolMap, listeners, earlyStart, roles); } else if (this.metadataMode == MetadataMode.ZOOKEEPER) { - configureLegacyBroker(nodeId, kafkaEndpoints, nodeConfiguration); + configureLegacyBroker(kafkaEndpoints, nodeConfiguration); } putConfig(nodeConfiguration, "listener.security.protocol.map", @@ -293,12 +292,6 @@ else if (this.metadataMode == MetadataMode.ZOOKEEPER) { return configHolder; } - private void checkNodeId(int nodeId) { - if (nodeId < 0 || nodeId >= numNodes()) { - throw new IllegalArgumentException("Bad node id " + nodeId + "; expected between 0 and " + numNodes() + " inclusive"); - } - } - /** * @return The total number of Kafka nodes (excludes any ZooKeeper nodes). */ @@ -311,8 +304,6 @@ public int numNodes() { @NotNull private EnumSet processRoles(int nodeId) { - // checkNodeId(nodeId); - if (metadataMode == null) { return EnumSet.of(NodeRole.BROKER); } @@ -440,7 +431,7 @@ private static void configureExternalListener(TreeMap protocolMa advertisedListeners.put(EXTERNAL_LISTENER_NAME, clientEndpoint.advertisedAddress()); } - private static void configureLegacyBroker(int brokerId, KafkaEndpoints kafkaEndpoints, Properties server) { + private static void configureLegacyBroker(KafkaEndpoints kafkaEndpoints, Properties server) { putConfig(server, "zookeeper.connect", kafkaEndpoints.getEndpointPair(Listener.CONTROLLER, 0).connectAddress()); putConfig(server, "zookeeper.sasl.enabled", "false"); putConfig(server, "zookeeper.connection.timeout.ms", Long.toString(60000)); diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java 
b/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java index ccbc1b34..68a316b2 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java @@ -187,12 +187,6 @@ public TestcontainersKafkaCluster(DockerImageName kafkaImage, DockerImageName zo } try (PortAllocator.PortAllocationSession portAllocationSession = portsAllocator.allocationSession()) { - // portAllocationSession.allocate(Set.of(Listener.EXTERNAL, Listener.ANON), 0, clusterConfig.getBrokersNum()); - // portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0, clusterConfig.getKraftControllers()); - - // if (!clusterConfig.isKraftMode()) { - // portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0); - // } for (int nodeId = 0; nodeId < clusterConfig.numNodes(); nodeId++) { Set listeners = new HashSet<>(); if (clusterConfig.hasControllerRole(nodeId)) { From ea6b5a3594adf817dca827c88a6284644c6e5140 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 13 Sep 2023 13:43:59 +1200 Subject: [PATCH 4/9] Disable separate controller Kraft tests with containers Because it's not supported yet. 
Signed-off-by: Tom Bentley --- .../testing/kafka/KafkaClusterTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java index d867a482..d44a118c 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java @@ -18,6 +18,7 @@ import java.util.function.IntPredicate; import java.util.stream.Stream; +import io.kroxylicious.testing.kafka.invm.InVMKafkaCluster; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -55,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * Test case that simply exercises the ability to control the kafka cluster from the test. 
@@ -118,6 +120,8 @@ void kafkaClusterAddBroker(MetadataMode metadataMode) throws Exception { .brokersNum(brokersNum) .metadataMode(metadataMode) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); assertThat(cluster.getBootstrapServers().split(",")).hasSize(brokersNum); @@ -152,11 +156,14 @@ void stopAndStartBrokers(int brokersNum, TerminationStyle terminationStyle, IntPredicate stopBrokerPredicate) throws Exception { + try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .brokersNum(brokersNum) .metadataMode(metadataMode) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); verifyRecordRoundTrip(brokersNum, cluster); @@ -189,6 +196,8 @@ void stopAndStartIdempotency(MetadataMode metadataMode) throws Exception { .testInfo(testInfo) .metadataMode(metadataMode) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(cluster.getNumOfBrokers(), cluster); @@ -232,6 +241,8 @@ void stopAndStartIncrementally(MetadataMode metadataMode) throws Exception { .metadataMode(metadataMode) .brokersNum(2) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(cluster.getNumOfBrokers(), cluster); 
assertThat(cluster.getStoppedBrokers()).hasSize(0); @@ -270,6 +281,8 @@ void topicPersistsThroughStopAndStart(MetadataMode metadataMode) throws Exceptio .metadataMode(metadataMode) .brokersNum(1) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); var topic = "roundTrip" + Uuid.randomUuid(); @@ -328,6 +341,8 @@ void kafkaClusterRemoveBroker(MetadataMode metadataMode) throws Exception { .brokersNum(brokersNum) .metadataMode(metadataMode) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); assertThat(cluster.getBootstrapServers().split(",")).hasSize(brokersNum); @@ -350,6 +365,8 @@ void kafkaClusterRemoveWithStoppedBrokerDisallowed(MetadataMode metadataMode) th .brokersNum(brokersNum) .metadataMode(metadataMode) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); cluster.stopNodes(n -> n == 1, null); @@ -371,6 +388,8 @@ void kafkaClusterKraftCombinedDisallowsControllerRemoval(MetadataMode metadataMo .brokersNum(brokersNum) .metadataMode(metadataMode) .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); // Node zero is the controller @@ -391,6 +410,8 @@ void kafkaClusterKraftModeWithAuth(MetadataMode metadataMode) throws Exception { .saslMechanism("PLAIN") .user("guest", "pass") .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || 
cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(1, cluster); } @@ -425,6 +446,8 @@ void kafkaClusterKraftModeSASL_SSL_ClientUsesSSLClientAuth(MetadataMode metadata .saslMechanism("PLAIN") .user("guest", "pass") .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(1, cluster); } @@ -443,6 +466,8 @@ void kafkaClusterKraftModeSSL_ClientUsesSSLClientAuth(MetadataMode metadataMode) .metadataMode(metadataMode) .securityProtocol("SSL") .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(1, cluster); } @@ -491,6 +516,8 @@ void kafkaClusterKraftModeSSL_ClientNoAuth(MetadataMode metadataMode) throws Exc .metadataMode(metadataMode) .securityProtocol("SSL") .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(1, cluster); } @@ -522,6 +549,8 @@ void kafkaClusterKraftModeSASL_SSL_ClientNoAuth(MetadataMode metadataMode) throw .saslMechanism("PLAIN") .user("guest", "guest") .build())) { + assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, + "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); verifyRecordRoundTrip(1, cluster); } From ae727753a97816077a02389960f7b76f4bdbdb98 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 13 Sep 2023 13:54:03 +1200 Subject: [PATCH 5/9] Improve diagnostics Signed-off-by: Tom 
Bentley --- .../io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 193b146d..739b14fe 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -219,7 +219,7 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol return true; } catch (Throwable t) { - LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: " + t.getMessage()); + LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: {}", t.toString()); LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ", this.getEndpointPair(Listener.ANON, configHolder.getNodeId()).getBind(), this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind(), From 2c22db8b5bf0552faec87bc6052c8a194888aceb Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 13 Sep 2023 14:06:15 +1200 Subject: [PATCH 6/9] Try again Signed-off-by: Tom Bentley --- .../io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 739b14fe..67913daf 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -219,7 +219,7 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol return true; } catch (Throwable t) { - LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: {}", t.toString()); + 
LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: {0}", t.toString()); LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ", this.getEndpointPair(Listener.ANON, configHolder.getNodeId()).getBind(), this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind(), From a0a648f013195b56598a0c33aa44e596e3de2700 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 13 Sep 2023 16:33:46 +1200 Subject: [PATCH 7/9] Tidy up Signed-off-by: Tom Bentley --- .../testing/kafka/invm/InVMKafkaCluster.java | 28 ++++++------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 67913daf..a7360438 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -97,7 +97,7 @@ public InVMKafkaCluster(KafkaClusterConfig clusterConfig) { private static void exitHandler(int statusCode, String message) { final IllegalStateException illegalStateException = new IllegalStateException(message); - LOGGER.log(System.Logger.Level.WARNING, "Kafka tried to exit with statusCode: {0} and message: {1}. Including stacktrace to determine whats at fault", + LOGGER.log(System.Logger.Level.WARNING, "Kafka tried to exit with statusCode: {0} and message: {1}. Including stacktrace to determine what's at fault", statusCode, message, illegalStateException); } @@ -161,15 +161,15 @@ public Optional construct(Class clazz, Object... 
parameters) { private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder c) { Properties properties = new Properties(); properties.putAll(c.getProperties()); - var logsDir = getBrokerLogDir(c.getNodeId()); + var logsDir = getNodeLogDir(c.getNodeId()); properties.setProperty(KafkaConfig.LogDirProp(), logsDir.toAbsolutePath().toString()); LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", properties); return new KafkaConfig(properties); } @NotNull - private Path getBrokerLogDir(int brokerNum) { - return this.tempDirectory.resolve(String.format("broker-%d", brokerNum)); + private Path getNodeLogDir(int nodeId) { + return this.tempDirectory.resolve(String.format("node-%d", nodeId)); } @Override @@ -270,7 +270,7 @@ public synchronized String getBootstrapServers() { private synchronized String buildBrokerList(Function endpointFunc) { return servers.keySet().stream() - .filter(this::isBroker) + .filter(nodeId -> clusterConfig.hasBrokerRole(nodeId)) .map(endpointFunc) .map(KafkaClusterConfig.KafkaEndpoints.EndpointPair::connectAddress) .collect(Collectors.joining(",")); @@ -344,7 +344,7 @@ public synchronized void removeBroker(int nodeId) throws IllegalArgumentExceptio var s = servers.remove(nodeId); s.shutdown(); s.awaitShutdown(); - ensureDirectoryIsEmpty(getBrokerLogDir(nodeId)); + ensureDirectoryIsEmpty(getNodeLogDir(nodeId)); } @Override @@ -406,20 +406,8 @@ public synchronized void close() throws Exception { */ private void roleOrderedShutdown(Map servers) { // Shutdown servers without a controller port first. - shutdownServers(servers, e -> !isController(e.getKey())); - shutdownServers(servers, e -> isController(e.getKey())); - } - - private boolean isController(Integer key) { - // TODO this is nasty. 
We shouldn't need to go via the portAllocator to figure out what a node is - // But it is at least testing something meaningful about the configuration - return portsAllocator.hasRegisteredPort(Listener.CONTROLLER, key); - } - - private boolean isBroker(Integer key) { - // TODO this is nasty. We shouldn't need to go via the portAllocator to figure out what a node is - // But it is at least testing something meaningful about the configuration - return portsAllocator.hasRegisteredPort(Listener.ANON, key); + shutdownServers(servers, e -> clusterConfig.isPureBroker(e.getKey())); + shutdownServers(servers, e -> clusterConfig.hasControllerRole(e.getKey())); } @SuppressWarnings("java:S3864") // Stream.peek is being used with caution. From c2b48cb1fcea0edee2a178a4163e628623e03775 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 13 Sep 2023 17:07:45 +1200 Subject: [PATCH 8/9] Tidyup without breaking tests Signed-off-by: Tom Bentley --- .../testing/kafka/invm/InVMKafkaCluster.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index a7360438..9da883c6 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -270,7 +270,7 @@ public synchronized String getBootstrapServers() { private synchronized String buildBrokerList(Function endpointFunc) { return servers.keySet().stream() - .filter(nodeId -> clusterConfig.hasBrokerRole(nodeId)) + .filter(this::isBroker) .map(endpointFunc) .map(KafkaClusterConfig.KafkaEndpoints.EndpointPair::connectAddress) .collect(Collectors.joining(",")); @@ -406,8 +406,18 @@ public synchronized void close() throws Exception { */ private void roleOrderedShutdown(Map servers) { // Shutdown servers without a controller port first. 
- shutdownServers(servers, e -> clusterConfig.isPureBroker(e.getKey())); - shutdownServers(servers, e -> clusterConfig.hasControllerRole(e.getKey())); + shutdownServers(servers, e -> !isController(e.getKey())); + shutdownServers(servers, e -> isController(e.getKey())); + } + + private boolean isController(Integer key) { + return key < clusterConfig.numNodes() // dynamically added nodes are always pure brokers + && clusterConfig.hasControllerRole(key); + } + + private boolean isBroker(Integer key) { + return key >= clusterConfig.numNodes() // dynamically added nodes are always pure brokers + || clusterConfig.hasBrokerRole(key); } @SuppressWarnings("java:S3864") // Stream.peek is being used with caution. From 9b958c57a2d61569b14c97477e09991329539c2f Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 13 Sep 2023 17:28:51 +1200 Subject: [PATCH 9/9] Debug Signed-off-by: Tom Bentley --- .../testing/kafka/common/KafkaClusterConfig.java | 4 ++++ .../io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java | 5 +++-- .../java/io/kroxylicious/testing/kafka/KafkaClusterTest.java | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java index 7156510f..a2988831 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java @@ -654,6 +654,10 @@ public ConfigHolder(String kafkaKraftClusterId, public Set roles() { return roles; } + + public int nodeId() { + return nodeId; + } } /** diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 9da883c6..fca9e3ae 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ 
b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -219,8 +219,9 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHol return true; } catch (Throwable t) { - LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: {0}", t.toString()); - LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ", + LOGGER.log(System.Logger.Level.WARNING, "failed to start server with node.id={0} due to: {1}", configHolder.nodeId(), t.toString()); + LOGGER.log(System.Logger.Level.WARNING, "nodeId: {0}, anon: {1}, client: {2}, controller: {3}, interBroker: {4}, ", + configHolder.nodeId(), this.getEndpointPair(Listener.ANON, configHolder.getNodeId()).getBind(), this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind(), this.getEndpointPair(Listener.CONTROLLER, configHolder.getNodeId()).getBind(), diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java index d44a118c..d288db02 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java @@ -341,9 +341,11 @@ void kafkaClusterRemoveBroker(MetadataMode metadataMode) throws Exception { .brokersNum(brokersNum) .metadataMode(metadataMode) .build())) { + System.out.println("### Cluster built"); assumeTrue(metadataMode != MetadataMode.KRAFT_SEPARATE || cluster instanceof InVMKafkaCluster, "Not supported with native image: https://github.com/ozangunalp/kafka-native/issues/88"); cluster.start(); + System.out.println("### Cluster started"); assertThat(cluster.getNumOfBrokers()).isEqualTo(brokersNum); assertThat(cluster.getBootstrapServers().split(",")).hasSize(brokersNum); verifyRecordRoundTrip(brokersNum, cluster);