diff --git a/README.md b/README.md index 67c925b1..9ec05d93 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,20 @@ class MyTest { } ``` +## Configuring SASL + +To config the Broker to use SASL, use the annotations `@SaslMechanism` to specify the SASL +mechanism. Use `@SaslUser` configure a user and password. The `@SaslUser` annotation may be repeated +to define multiple users. + +```java +class MyTest { +@SaslMechanism("PLAIN") @SaslUser(user = "alice", password = "foo") KafkaCluster cluster; + + // ... +} +``` + ## Configuring Kafka Clients To configure kafka clients, apply the `@ClientConfig` annotation. It can be applied to the following types: diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java index 63772e0b..ea39c829 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java @@ -12,6 +12,7 @@ import java.nio.file.Paths; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -20,6 +21,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -47,7 +49,7 @@ import io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints.Listener; -import static java.util.Locale.*; +import static java.util.Locale.ROOT; /** * The Kafka cluster config class. @@ -59,6 +61,8 @@ public class KafkaClusterConfig { private static final System.Logger LOGGER = System.getLogger(KafkaClusterConfig.class.getName()); private static final String ONE_CONFIG = Integer.toString(1); + private static final AtomicBoolean DEPRECATED_SASL_PLAIN_AUTH_USE_REPORTED = new AtomicBoolean(); + public static final String BROKER_ROLE = "broker"; public static final String CONTROLLER_ROLE = "controller"; @@ -141,6 +145,9 @@ public class KafkaClusterConfig { BrokerConfig.List.class, KRaftCluster.class, Tls.class, + SaslUser.class, + SaslUser.List.class, + SaslMechanism.class, SaslPlainAuth.class, SaslPlainAuth.List.class, ZooKeeperCluster.class, @@ -167,7 +174,9 @@ public static KafkaClusterConfig fromConstraints(List annotations, T var builder = builder(); builder.testInfo(testInfo); builder.brokersNum(1); - boolean sasl = false; + boolean useSasl = false; + Optional> saslUsers = Optional.empty(); + Optional> deprecatedSaslUsers = Optional.empty(); boolean tls = false; for (Annotation annotation : annotations) { if (annotation instanceof BrokerCluster brokerCluster) { @@ -189,20 +198,14 @@ public static KafkaClusterConfig fromConstraints(List annotations, T throw new RuntimeException(e); } } - if (annotation instanceof SaslPlainAuth.List saslPlainAuthList) { - builder.saslMechanism(PLAIN_SASL_MECHANISM_NAME); - sasl = true; - Map users = new HashMap<>(); - for (var user : saslPlainAuthList.value()) { - users.put(user.user(), user.password()); - } - builder.users(users); - } - else if (annotation instanceof SaslPlainAuth saslPlainAuth) { - builder.saslMechanism(PLAIN_SASL_MECHANISM_NAME); - sasl = true; - builder.users(Map.of(saslPlainAuth.user(), saslPlainAuth.password())); + if (annotation instanceof SaslMechanism mechanism) { + builder.saslMechanism(mechanism.value()); + useSasl = true; } + + saslUsers = processSaslUserAnnotations(annotation); + deprecatedSaslUsers = processDeprecatedSaslUserAnnotations(annotation); + if (annotation instanceof ClusterId clusterId) { builder.kafkaKraftClusterId(clusterId.value()); } @@ -218,10 +221,54 @@ else if (annotation instanceof BrokerConfig brokerConfig) { builder.brokerConfig(brokerConfig.name(), brokerConfig.value()); } } - builder.securityProtocol((sasl ? "SASL_" : "") + (tls ? "SSL" : "PLAINTEXT")); + + if (saslUsers.isPresent()) { + if (deprecatedSaslUsers.isPresent()) { + throw new IllegalArgumentException("Cannot use deprecated SaslPlainAuth with SaslUser."); + } + saslUsers.ifPresent(builder::users); + + } + else if (deprecatedSaslUsers.isPresent()) { + deprecatedSaslUsers.ifPresent(builder::users); + + if (DEPRECATED_SASL_PLAIN_AUTH_USE_REPORTED.compareAndExchange(false, true)) { + LOGGER.log(System.Logger.Level.WARNING, "Use of deprecated SaslPlainAuth annotation, use SaslUser instead."); + } + } + + if ((saslUsers.isPresent() || deprecatedSaslUsers.isPresent()) && !useSasl) { + builder.saslMechanism("PLAIN"); + useSasl = true; + } + + builder.securityProtocol((useSasl ? "SASL_" : "") + (tls ? "SSL" : "PLAINTEXT")); return builder.build(); } + private static Optional> processSaslUserAnnotations(Annotation annotation) { + if (annotation instanceof SaslUser.List saslUserList) { + return Optional.of(Arrays.stream(saslUserList.value()) + .collect(Collectors.toMap(SaslUser::user, SaslUser::password))); + } + else if (annotation instanceof SaslUser saslUser) { + return Optional.of(Map.of(saslUser.user(), saslUser.password())); + } + return Optional.empty(); + } + + @SuppressWarnings("deprecation") + private static Optional> processDeprecatedSaslUserAnnotations(Annotation annotation) { + if (annotation instanceof SaslPlainAuth.List saslPlainAuthList) { + return Optional.of(Arrays.stream(saslPlainAuthList.value()) + .collect(Collectors.toMap(SaslPlainAuth::user, SaslPlainAuth::password))); + } + else if (annotation instanceof SaslPlainAuth saslPlainAuth) { + return Optional.of(Map.of(saslPlainAuth.user(), saslPlainAuth.password())); + } + return Optional.empty(); + } + /** * Gets broker configs. * diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslMechanism.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslMechanism.java new file mode 100644 index 00000000..25f770df --- /dev/null +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslMechanism.java @@ -0,0 +1,26 @@ +/* + * Copyright Kroxylicious Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.kroxylicious.testing.kafka.common; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import io.kroxylicious.testing.kafka.api.KafkaClusterConstraint; +import io.kroxylicious.testing.kafka.api.KafkaClusterProvisioningStrategy; + +/** + * Annotation constraining a {@link KafkaClusterProvisioningStrategy} to + * provide a cluster with an external listener configured to expect the given + * SASL mechanism. + */ +@Target({ ElementType.PARAMETER, ElementType.FIELD }) +@Retention(RetentionPolicy.RUNTIME) +@KafkaClusterConstraint +public @interface SaslMechanism { + String value(); +} diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslPlainAuth.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslPlainAuth.java index c41fd667..3f8222b0 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslPlainAuth.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslPlainAuth.java @@ -23,6 +23,7 @@ @Retention(RetentionPolicy.RUNTIME) @Repeatable(SaslPlainAuth.List.class) @KafkaClusterConstraint +@Deprecated public @interface SaslPlainAuth { String user(); @@ -35,6 +36,7 @@ @Target({ ElementType.FIELD, ElementType.PARAMETER }) @Retention(RetentionPolicy.RUNTIME) @KafkaClusterConstraint + @Deprecated @interface List { SaslPlainAuth[] value(); } diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslUser.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslUser.java new file mode 100644 index 00000000..608eab41 --- /dev/null +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/SaslUser.java @@ -0,0 +1,39 @@ +/* + * Copyright Kroxylicious Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.kroxylicious.testing.kafka.common; + +import java.lang.annotation.*; + +import io.kroxylicious.testing.kafka.api.KafkaClusterConstraint; +import io.kroxylicious.testing.kafka.api.KafkaClusterProvisioningStrategy; + +/** + * Annotation constraining a {@link KafkaClusterProvisioningStrategy} to + * provide a cluster with SASL enabled listener with the given user. + *
+ * If a @{@link SaslMechanism} constraint is not also provided, SASL PLAIN + * is assumed. + */ +@Target({ ElementType.PARAMETER, ElementType.FIELD }) +@Retention(RetentionPolicy.RUNTIME) +@Repeatable(SaslUser.List.class) +@KafkaClusterConstraint +public @interface SaslUser { + + String user(); + + String password(); + + /** + * The interface User password. + */ + @Target({ ElementType.FIELD, ElementType.PARAMETER }) + @Retention(RetentionPolicy.RUNTIME) + @KafkaClusterConstraint + @interface List { + SaslUser[] value(); + } +} diff --git a/junit5-extension/src/test/java/io/kroxylicious/testing/kafka/junit5ext/ParameterExtensionTest.java b/junit5-extension/src/test/java/io/kroxylicious/testing/kafka/junit5ext/ParameterExtensionTest.java index d6d1bc2c..f0710dd3 100644 --- a/junit5-extension/src/test/java/io/kroxylicious/testing/kafka/junit5ext/ParameterExtensionTest.java +++ b/junit5-extension/src/test/java/io/kroxylicious/testing/kafka/junit5ext/ParameterExtensionTest.java @@ -12,11 +12,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.ConsumerGroupListing; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; @@ -35,13 +31,7 @@ import kafka.server.KafkaConfig; import io.kroxylicious.testing.kafka.api.KafkaCluster; -import io.kroxylicious.testing.kafka.common.BrokerCluster; -import io.kroxylicious.testing.kafka.common.BrokerConfig; -import io.kroxylicious.testing.kafka.common.ClientConfig; -import io.kroxylicious.testing.kafka.common.KRaftCluster; -import io.kroxylicious.testing.kafka.common.SaslPlainAuth; -import io.kroxylicious.testing.kafka.common.Tls; -import io.kroxylicious.testing.kafka.common.ZooKeeperCluster; +import io.kroxylicious.testing.kafka.common.*; import io.kroxylicious.testing.kafka.invm.InVMKafkaCluster; import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT; @@ -56,7 +46,6 @@ import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(KafkaClusterExtension.class) @@ -226,40 +215,61 @@ public void kraftBasedClusterParameter(@BrokerCluster @KRaftCluster KafkaCluster } @Test - public void saslPlainAuthenticatingClusterParameter2Users( - @BrokerCluster @SaslPlainAuth(user = "alice", password = "foo") @SaslPlainAuth(user = "bob", password = "bar") KafkaCluster cluster) - throws ExecutionException, InterruptedException { - var dc = describeCluster(cluster.getKafkaClientConfiguration("alice", "foo")); - assertEquals(1, dc.nodes().get().size()); - assertEquals(cluster.getClusterId(), dc.clusterId().get()); + public void saslPlainAuth(@BrokerCluster @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) { + doAuthExpectSucceeds(cluster, "alice", "foo"); + } - dc = describeCluster(cluster.getKafkaClientConfiguration("bob", "bar")); - assertEquals(cluster.getClusterId(), dc.clusterId().get()); + @Test + public void saslPlainAuthExplicitMechanism(@BrokerCluster @SaslMechanism("PLAIN") @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) { + doAuthExpectSucceeds(cluster, "alice", "foo"); + } - var ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("bob", "baz")), - "Expect bad password to throw"); - assertInstanceOf(SaslAuthenticationException.class, ee.getCause()); + @Test + public void saslScramAuth(@BrokerCluster @SaslMechanism("SCRAM-SHA-256") @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) { + doAuthExpectSucceeds(cluster, "alice", "foo"); + } - ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("eve", "quux")), - "Expect unknown user to throw"); - assertInstanceOf(SaslAuthenticationException.class, ee.getCause()); + @Test + public void saslAuthWithManyUsers(@BrokerCluster @SaslUser(user = "alice", password = "foo") @SaslUser(user = "bob", password = "bar") KafkaCluster cluster) { + doAuthExpectSucceeds(cluster, "alice", "foo"); + doAuthExpectSucceeds(cluster, "bob", "bar"); } @Test - public void saslPlainAuthenticatingClusterParameter1User( - @BrokerCluster @SaslPlainAuth(user = "alice", password = "foo") KafkaCluster cluster) - throws ExecutionException, InterruptedException { - var dc = describeCluster(cluster.getKafkaClientConfiguration("alice", "foo")); - assertEquals(1, dc.nodes().get().size()); - assertEquals(cluster.getClusterId(), dc.clusterId().get()); + @Deprecated + public void saslPlainAuthDeprecatedAnnotation(@BrokerCluster @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) { + doAuthExpectSucceeds(cluster, "alice", "foo"); + } - var ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("alice", "baz")), - "Expect bad password to throw"); - assertInstanceOf(SaslAuthenticationException.class, ee.getCause()); + @Test + @Deprecated + public void saslPlainAuthDeprecatedAnnotationManyUsers(@BrokerCluster @SaslPlainAuth(user = "alice", password = "foo") @SaslPlainAuth(user = "bob", password = "bar") KafkaCluster cluster) { + doAuthExpectSucceeds(cluster, "alice", "foo"); + doAuthExpectSucceeds(cluster, "bob", "bar"); + } + + private void doAuthExpectSucceeds(KafkaCluster cluster, String username, String password) { + var config = cluster.getKafkaClientConfiguration(username, password); + try (var admin = Admin.create(config)) { + var dcr = admin.describeCluster(); + assertThat(dcr.clusterId()) + .succeedsWithin(Duration.ofSeconds(10)) + .isEqualTo(cluster.getClusterId()); + } + } - ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("bob", "bar")), - "Expect unknown user to throw"); - assertInstanceOf(SaslAuthenticationException.class, ee.getCause()); + @Test + public void saslPlainAuthFails(@BrokerCluster @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) { + var config = cluster.getKafkaClientConfiguration("alicex", "bad"); + try (var admin = Admin.create(config)) { + var dcr = admin.describeCluster(); + assertThat(dcr.clusterId()) + .failsWithin(Duration.ofSeconds(10)) + .withThrowableThat() + .havingRootCause() + .isInstanceOf(SaslAuthenticationException.class) + .withMessage("Authentication failed: Invalid username or password"); + } } @Test