Fix #176: Changes needed to get the tests working, at least #177

Draft · wants to merge 9 commits into main · Changes from 3 commits
@@ -86,8 +86,8 @@ public interface KafkaCluster extends AutoCloseable {
void close() throws Exception;

/**
* Gets the number of brokers in the cluster, including any that are stopped.
* @return the size of the cluster.
* Gets the number of brokers in the cluster, including any that are stopped, excluding any pure controller nodes.
* @return the number of brokers in the cluster.
*/
int getNumOfBrokers();

@@ -84,7 +84,9 @@ public static ClusterId clusterId(String clusterId) {
* @return the kraft cluster
*/
public static KRaftCluster kraftCluster(int numControllers) {
return mkAnnotation(KRaftCluster.class, Map.of("numControllers", numControllers));
return mkAnnotation(KRaftCluster.class, Map.of(
"numControllers", numControllers,
"combinedMode", true));
}

/**
Expand Down
@@ -17,25 +17,42 @@
/**
* Annotation constraining a {@link KafkaClusterProvisioningStrategy} to use
* a {@link KafkaCluster} that is KRaft-based.
*
* <table>
* <caption>Breakdown of the interaction between numControllers and numBrokers</caption>
* <tr><th>numBrokers</th><th>numControllers</th><th>combinedMode</th><th>roles</th></tr>
* <tr><td>1</td><td>1</td><td>true</td><td>1×<code>"broker,controller"</code></td></tr>
* <tr><td>1</td><td>1</td><td>false</td><td>1×<code>"broker"</code>, 1×<code>"controller"</code></td></tr>
*
* <tr><td>3</td><td>1</td><td>true</td><td>1×<code>"broker,controller"</code>, 2×<code>"broker"</code></td></tr>
 * <tr><td>3</td><td>1</td><td>false</td><td>3×<code>"broker"</code>, 1×<code>"controller"</code></td></tr>
*
* <tr><td>1</td><td>3</td><td>true</td><td>1×<code>"broker,controller"</code>, 2×<code>"controller"</code></td></tr>
* <tr><td>1</td><td>3</td><td>false</td><td>1×<code>"broker"</code>, 3×<code>"controller"</code></td></tr>
*
* <tr><td>3</td><td>3</td><td>true</td><td>3×<code>"broker,controller"</code></td></tr>
 * <tr><td>3</td><td>3</td><td>false</td><td>3×<code>"broker"</code>, 3×<code>"controller"</code></td></tr>
* </table>
*/
@Target({ ElementType.PARAMETER, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@KafkaClusterConstraint
public @interface KRaftCluster {
/**
* The number of kraft controllers
* The extension will ensure there are enough nodes started with the <code>controller</code> role.
* The extension will combine this with the <code>numBrokers</code> to generate a cluster topology.
* <table>
* <caption>Breakdown of the interaction between numControllers and numBrokers</caption>
* <tr><th>numBrokers</th><th>numControllers</th><th>roles</th></tr>
* <tr><td>1</td><td>1</td><td><code>"broker,controller"</code></td></tr>
* <tr><td>3</td><td>1</td><td><code>"broker,controller"</code>, <code>"broker"</code>, <code>"broker"</code></td></tr>
* <tr><td>1</td><td>3</td><td><code>"broker,controller"</code>, <code>"controller"</code>, <code>"controller"</code></td></tr>
* <tr><td>3</td><td>3</td><td><code>"broker,controller"</code>, <code>"broker,controller"</code>, <code>"broker,controller"</code></td></tr>
* </table>
 * The number of kraft controllers.
 * The extension will ensure there are this many nodes started with the <code>controller</code> role,
 * combining this with the <code>numBrokers</code> and <code>combinedMode</code> to generate a cluster topology.
*
* See the class JavaDoc for example topologies.
* @return The number of KRaft controllers
*/
public int numControllers() default 1;

/**
* Whether to use combined mode, where controllers can share a JVM with brokers.
* See the class JavaDoc for example topologies.
* @return true to use combined mode.
*/
public boolean combinedMode() default true;
Member

Does this really make sense? I admit it's what we talked about in the other PR and thus what you've done.

I wonder if we should just expose MetadataMode here instead? I guess metadataMode = Zookeeper doesn't make sense here.

I almost wonder if we should drop the KraftCluster & ZookeeperCluster annotations and just have KafkaCluster(metadataMode = [KRaftCombined | KRaftIndependent | Zookeeper])?

Contributor Author

I agree it's a bit murky. It is certainly true that we could express all topologies using something like this:

@KafkaCluster(numXxx=X, metadataMode=Y, numBrokers=Z)

But I'm not sure what to call numXxx. It can't be numControllers, because in ZK mode the controller is a broker and the ZK nodes are counted separately. Maybe metadataQuorumSize would work?

I'm inclined to do the simplest possible thing in this PR and consider the API question separately. If nothing else to make the PRs of a sensible size.

Member

Yeah I think separate PR for an api change makes sense.

I'll give this another pass with that in mind.


}
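To make the annotation's contract concrete, here is a minimal standalone sketch: a copy of the annotation with the defaults shown in this diff (numControllers = 1, combinedMode = true), applied to a field and read back via reflection. The demo class and field names are hypothetical, not part of the library.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Minimal standalone copy of the annotation, mirroring the defaults in
// this diff; not the library's own class.
@Target({ ElementType.PARAMETER, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@interface KRaftCluster {
    int numControllers() default 1;

    boolean combinedMode() default true;
}

public class KRaftClusterDemo {
    // Hypothetical usage: request 3 controllers in dedicated (non-combined) JVMs.
    @KRaftCluster(numControllers = 3, combinedMode = false)
    static Object cluster;

    public static void main(String[] args) throws Exception {
        Field f = KRaftClusterDemo.class.getDeclaredField("cluster");
        KRaftCluster a = f.getAnnotation(KRaftCluster.class);
        System.out.println(a.numControllers()); // 3
        System.out.println(a.combinedMode());   // false
    }
}
```

Per the class javadoc table, this combination would yield 3×"controller" nodes alongside the configured brokers.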


@@ -62,18 +62,20 @@ public static KafkaCluster create(KafkaClusterConfig clusterConfig) {
}

var clusterMode = getExecutionMode(clusterConfig);
var kraftMode = convertClusterKraftMode(System.getenv().get(TEST_CLUSTER_KRAFT_MODE), true);
var kraftMode = convertClusterKraftMode(System.getenv().get(TEST_CLUSTER_KRAFT_MODE), MetadataMode.KRAFT_COMBINED);
var builder = clusterConfig.toBuilder();

if (clusterConfig.getExecMode() == null) {
builder.execMode(clusterMode);
}

if (clusterConfig.getKraftMode() == null) {
builder.kraftMode(kraftMode);
if (clusterConfig.getMetadataMode() == null) {
builder.metadataMode(kraftMode);
}

if (KafkaClusterExecutionMode.CONTAINER == clusterMode && kraftMode && clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) {
if (KafkaClusterExecutionMode.CONTAINER == clusterMode
&& kraftMode != MetadataMode.ZOOKEEPER
&& clusterConfig.getBrokersNum() < clusterConfig.getKraftControllers()) {
throw new IllegalStateException(
"Due to https://github.com/ozangunalp/kafka-native/issues/88 we can't support controller only nodes in " + KafkaClusterExecutionMode.CONTAINER
+ " mode so we need to fail fast. This cluster has "
@@ -104,10 +106,10 @@ private static KafkaClusterExecutionMode getExecutionMode(KafkaClusterConfig clu
clusterConfig.getExecMode() == null ? KafkaClusterExecutionMode.IN_VM : clusterConfig.getExecMode());
}

private static boolean convertClusterKraftMode(String mode, boolean defaultMode) {
private static MetadataMode convertClusterKraftMode(String mode, MetadataMode defaultMode) {
if (mode == null) {
return defaultMode;
}
return Boolean.parseBoolean(mode);
return Boolean.parseBoolean(mode) ? MetadataMode.KRAFT_COMBINED : MetadataMode.ZOOKEEPER;
}
}
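The conversion above keeps the boolean-valued TEST_CLUSTER_KRAFT_MODE environment variable working while the config moves to an enum. A standalone sketch of that mapping (enum values copied from this PR, demo class name hypothetical):

```java
// Standalone sketch of the TEST_CLUSTER_KRAFT_MODE conversion: existing
// boolean settings keep working, with true mapping to combined-mode KRaft
// and false mapping to ZooKeeper.
enum MetadataMode {
    ZOOKEEPER,
    KRAFT_COMBINED,
    KRAFT_SEPARATE
}

public class KraftModeEnvDemo {
    static MetadataMode convertClusterKraftMode(String mode, MetadataMode defaultMode) {
        if (mode == null) {
            return defaultMode; // env var unset: fall back to the supplied default
        }
        return Boolean.parseBoolean(mode) ? MetadataMode.KRAFT_COMBINED : MetadataMode.ZOOKEEPER;
    }

    public static void main(String[] args) {
        System.out.println(convertClusterKraftMode(null, MetadataMode.KRAFT_COMBINED));    // KRAFT_COMBINED
        System.out.println(convertClusterKraftMode("true", MetadataMode.ZOOKEEPER));       // KRAFT_COMBINED
        System.out.println(convertClusterKraftMode("false", MetadataMode.KRAFT_COMBINED)); // ZOOKEEPER
    }
}
```

Note that the env var can never select KRAFT_SEPARATE; only the annotation/config path can.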
@@ -0,0 +1,23 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.testing.kafka.common;

public enum MetadataMode {
ZOOKEEPER,
KRAFT_COMBINED,
KRAFT_SEPARATE;
Member

Suggested change
KRAFT_SEPARATE;
KRAFT_DEDICATED;


/**
* @return The total number of Kafka nodes (excludes any ZooKeeper nodes).
*/
public int numNodes(int numControllers, int numBrokers) {
return switch (this) {
case KRAFT_SEPARATE -> numControllers + numBrokers;
case KRAFT_COMBINED -> Math.max(numControllers, numBrokers);
case ZOOKEEPER -> numBrokers;
};
}
}
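The node-count arithmetic can be exercised outside the library with a standalone copy of the enum (the demo class name is hypothetical):

```java
// Standalone copy of the MetadataMode node-count logic introduced in this PR.
enum MetadataMode {
    ZOOKEEPER,
    KRAFT_COMBINED,
    KRAFT_SEPARATE;

    int numNodes(int numControllers, int numBrokers) {
        return switch (this) {
            case KRAFT_SEPARATE -> numControllers + numBrokers;          // dedicated controller nodes
            case KRAFT_COMBINED -> Math.max(numControllers, numBrokers); // roles share nodes where possible
            case ZOOKEEPER -> numBrokers;                                // ZooKeeper nodes counted separately
        };
    }
}

public class MetadataModeDemo {
    public static void main(String[] args) {
        System.out.println(MetadataMode.KRAFT_SEPARATE.numNodes(3, 3)); // 6
        System.out.println(MetadataMode.KRAFT_COMBINED.numNodes(3, 1)); // 3
        System.out.println(MetadataMode.ZOOKEEPER.numNodes(3, 1));      // 1
    }
}
```

These values line up with the topology table in the KRaftCluster class javadoc.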
@@ -0,0 +1,49 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.testing.kafka.common;

import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

public enum NodeRole {
BROKER("broker"),
CONTROLLER("controller");

private final String configRole;

private NodeRole(String configRole) {
this.configRole = configRole;
}

/**
* @return The role, as it can be configured in a Kafka {@code server.properties} file.
*/
public static String forConfig(Collection<NodeRole> roles) {
return roles.stream().map(x -> x.configRole).distinct().collect(Collectors.joining(","));
}

public static boolean isPureController(Set<NodeRole> roles) {
return EnumSet.of(NodeRole.CONTROLLER).equals(roles);
}

public static boolean isCombinedNode(Set<NodeRole> roles) {
return EnumSet.of(NodeRole.CONTROLLER, NodeRole.BROKER).equals(roles);
}

public static boolean isPureBroker(Set<NodeRole> roles) {
return EnumSet.of(NodeRole.BROKER).equals(roles);
}

public static boolean hasBrokerRole(Set<NodeRole> roles) {
return roles.contains(NodeRole.BROKER);
}

public static boolean hasControllerRole(Set<NodeRole> roles) {
return roles.contains(NodeRole.CONTROLLER);
}
}
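The role-set helpers can likewise be tried in isolation; the sketch below copies a subset of the NodeRole helpers from this PR into a self-contained file (demo class name hypothetical):

```java
import java.util.Collection;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone copy of a subset of the NodeRole helpers from this PR.
enum NodeRole {
    BROKER("broker"),
    CONTROLLER("controller");

    private final String configRole;

    NodeRole(String configRole) {
        this.configRole = configRole;
    }

    // Renders a role set as it appears in a server.properties process.roles value.
    static String forConfig(Collection<NodeRole> roles) {
        return roles.stream().map(r -> r.configRole).distinct().collect(Collectors.joining(","));
    }

    static boolean isPureController(Set<NodeRole> roles) {
        return EnumSet.of(CONTROLLER).equals(roles);
    }

    static boolean isCombinedNode(Set<NodeRole> roles) {
        return EnumSet.of(CONTROLLER, BROKER).equals(roles);
    }
}

public class NodeRoleDemo {
    public static void main(String[] args) {
        // EnumSet iterates in declaration order, so the rendered string is deterministic.
        System.out.println(NodeRole.forConfig(EnumSet.of(NodeRole.BROKER, NodeRole.CONTROLLER)));      // broker,controller
        System.out.println(NodeRole.isCombinedNode(EnumSet.of(NodeRole.BROKER, NodeRole.CONTROLLER))); // true
        System.out.println(NodeRole.isPureController(EnumSet.of(NodeRole.CONTROLLER)));                // true
    }
}
```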
@@ -161,7 +161,7 @@ public Optional<Object> construct(Class<?> clazz, Object... parameters) {
private KafkaConfig buildBrokerConfig(KafkaClusterConfig.ConfigHolder c) {
Properties properties = new Properties();
properties.putAll(c.getProperties());
var logsDir = getBrokerLogDir(c.getBrokerNum());
var logsDir = getBrokerLogDir(c.getNodeId());
properties.setProperty(KafkaConfig.LogDirProp(), logsDir.toAbsolutePath().toString());
LOGGER.log(System.Logger.Level.DEBUG, "Generated config {0}", properties);
return new KafkaConfig(properties);
@@ -174,18 +174,33 @@ private Path getBrokerLogDir(int brokerNum) {

@Override
public synchronized void start() {
// kraft mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port
// zk mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port
// zookeeper mode: per-cluster: 1 zk port; per-broker: 1 external port + 1 inter-broker port + 1 anon port
// kraft combined mode: per-broker: 1 external port + 1 inter-broker port + 1 controller port + 1 anon port
// kraft separate mode: per-controller: 1 controller port
// kraft separate mode: per-broker: 1 external port + 1 inter-broker port + 1 anon port
try (PortAllocator.PortAllocationSession portAllocationSession = portsAllocator.allocationSession()) {
portAllocationSession.allocate(Set.of(Listener.EXTERNAL, Listener.ANON, Listener.INTERNAL), 0, clusterConfig.getBrokersNum());
portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0, clusterConfig.isKraftMode() ? clusterConfig.getKraftControllers() : 1);
if (!clusterConfig.isKraftMode()) {
portAllocationSession.allocate(Set.of(Listener.CONTROLLER), 0);
}
for (int nodeId = 0; nodeId < clusterConfig.numNodes(); nodeId++) {
Set<Listener> listeners = new HashSet<>();
if (clusterConfig.hasControllerRole(nodeId)) {
listeners.add(Listener.CONTROLLER);
}
if (clusterConfig.hasBrokerRole(nodeId)) {
listeners.add(Listener.EXTERNAL);
listeners.add(Listener.ANON);
listeners.add(Listener.INTERNAL);
}
portAllocationSession.allocate(listeners, nodeId);
}
}

buildAndStartZookeeper();
clusterConfig.getBrokerConfigs(() -> this).parallel().forEach(configHolder -> {
maybeBuildAndStartZookeeper();
clusterConfig.getNodeConfigs(() -> this).parallel().forEach(configHolder -> {
final Server server = this.buildKafkaServer(configHolder);
tryToStartServerWithRetry(configHolder, server);
servers.put(configHolder.getBrokerNum(), server);
servers.put(configHolder.getNodeId(), server);
});
Utils.awaitExpectedBrokerCountInClusterViaTopic(
clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120,
@@ -198,18 +213,18 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHol
.until(() -> {
// Hopefully we can remove this once a fix for https://issues.apache.org/jira/browse/KAFKA-14908 actually lands.
try {
LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", configHolder.getBrokerNum(),
LOGGER.log(System.Logger.Level.DEBUG, "Attempting to start node: {0} with roles: {1}", configHolder.getNodeId(),
configHolder.getProperties().get("process.roles"));
server.startup();
return true;
}
catch (Throwable t) {
LOGGER.log(System.Logger.Level.WARNING, "failed to start server due to: " + t.getMessage());
LOGGER.log(System.Logger.Level.WARNING, "anon: {0}, client: {1}, controller: {2}, interBroker: {3}, ",
this.getEndpointPair(Listener.ANON, configHolder.getBrokerNum()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getBrokerNum()).getBind(),
this.getEndpointPair(Listener.CONTROLLER, configHolder.getBrokerNum()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getBrokerNum()).getBind());
this.getEndpointPair(Listener.ANON, configHolder.getNodeId()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind(),
this.getEndpointPair(Listener.CONTROLLER, configHolder.getNodeId()).getBind(),
this.getEndpointPair(Listener.EXTERNAL, configHolder.getNodeId()).getBind());

server.shutdown();
server.awaitShutdown();
@@ -218,7 +233,7 @@ private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHol
});
}

private void buildAndStartZookeeper() {
private void maybeBuildAndStartZookeeper() {
if (!clusterConfig.isKraftMode()) {
try {
final int zookeeperPort = portsAllocator.getPort(Listener.CONTROLLER, 0);
@@ -274,7 +289,7 @@ public Map<String, Object> getKafkaClientConfiguration(String user, String passw
@Override
public synchronized int addBroker() {
// find next free kafka node.id
var first = IntStream.rangeClosed(0, getNumOfBrokers()).filter(cand -> !servers.containsKey(cand)).findFirst();
var first = IntStream.rangeClosed(0, numNodes()).filter(cand -> !servers.containsKey(cand)).findFirst();
if (first.isEmpty()) {
throw new IllegalStateException("Could not determine new nodeId, existing set " + servers.keySet());
}
@@ -297,16 +312,16 @@ public synchronized int addBroker() {
clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120,
TimeUnit.SECONDS,
getNumOfBrokers());
return configHolder.getBrokerNum();
return configHolder.getNodeId();
}

@Override
public synchronized void removeBroker(int nodeId) throws IllegalArgumentException, UnsupportedOperationException {
if (!servers.containsKey(nodeId)) {
throw new IllegalArgumentException("Broker node " + nodeId + " is not a member of the cluster.");
throw new IllegalArgumentException("Node " + nodeId + " is not a member of the cluster.");
}
if (clusterConfig.isKraftMode() && isController(nodeId)) {
throw new UnsupportedOperationException("Cannot remove controller node " + nodeId + " from a kraft cluster.");
if (!clusterConfig.isPureBroker(nodeId)) {
throw new UnsupportedOperationException("Node " + nodeId + " is not a pure broker.");
Member

I understand what this message means but is the pure broker terminology going to be meaningful to users?

}
if (servers.size() < 2) {
throw new IllegalArgumentException("Cannot remove a node from a cluster with only %d nodes".formatted(servers.size()));
@@ -315,7 +330,7 @@ public synchronized void removeBroker(int nodeId) throws IllegalArgumentExceptio
throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes.");
}

var target = servers.keySet().stream().filter(n -> n != nodeId).findFirst();
var target = servers.keySet().stream().filter(n -> n != nodeId && clusterConfig.hasBrokerRole(n)).findFirst();
Member

is && clusterConfig.hasBrokerRole(n) really needed in the filter clause? Shouldn't we have validated up front that clusterConfig.hasBrokerRole(nodeId) is true?

Contributor Author

This is about finding the target brokers for replicas currently on nodeId, i.e. n here is not the same as nodeId, and without the extra condition n might be a controller but not a broker, and thus could not replicate any partitions hosted on the broker being removed.

if (target.isEmpty()) {
throw new IllegalStateException("Could not identify a node to be the re-assignment target");
}
@@ -434,6 +449,12 @@ private static void ensureDirectoryIsEmpty(Path path) {

@Override
public synchronized int getNumOfBrokers() {
return (int) servers.keySet().stream().filter(nodeId -> nodeId >= clusterConfig.numNodes() // added nodes are always brokers
|| clusterConfig.hasBrokerRole(nodeId))
.count(); // initial nodes have broker role depending on the metadata mode
}

public synchronized int numNodes() {
return servers.size();
}
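The distinction between getNumOfBrokers() and numNodes() can be sketched standalone: initial nodes count as brokers only if the topology gave them the broker role, whereas nodes added later are always brokers. All names below are illustrative, not the library's API.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the broker-counting rule above.
public class BrokerCountDemo {
    static long countBrokers(Map<Integer, String> servers, int initialNodes, Set<Integer> brokerRoleNodes) {
        return servers.keySet().stream()
                .filter(nodeId -> nodeId >= initialNodes        // added nodes are always brokers
                        || brokerRoleNodes.contains(nodeId))     // initial nodes: role depends on metadata mode
                .count();
    }

    public static void main(String[] args) {
        // e.g. KRAFT_SEPARATE with 3 controllers + 1 broker, plus one broker added later
        Map<Integer, String> servers = Map.of(
                0, "controller", 1, "controller", 2, "controller", 3, "broker", 4, "added");
        System.out.println(countBrokers(servers, 4, Set.of(3))); // 2 brokers
        System.out.println(servers.size());                      // 5 nodes in total
    }
}
```

This is why the two methods diverge for clusters with pure controller nodes, matching the updated KafkaCluster javadoc.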
