Skip to content

Commit

Permalink
Allow annotations to be used to specify scram users
Browse files Browse the repository at this point in the history
  • Loading branch information
k-wall committed Jun 16, 2024
1 parent 6dc20cf commit 980ce32
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 55 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ class MyTest {
}
```

## Configuring SASL

To config the Broker to use SASL, use the annotations `@SaslMechanism` to specify the SASL
mechanism. Use `@SaslUser` configure a user and password. The `@SaslUser` annotation may be repeated
to define multiple users.

```java
class MyTest {
@SaslMechanism("PLAIN") @SaslUser(user = "alice", password = "foo") KafkaCluster cluster;

// ...
}
```

## Configuring Kafka Clients

To configure kafka clients, apply the `@ClientConfig` annotation. It can be applied to the following types:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -20,6 +21,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -47,7 +49,7 @@

import io.kroxylicious.testing.kafka.common.KafkaClusterConfig.KafkaEndpoints.Listener;

import static java.util.Locale.*;
import static java.util.Locale.ROOT;

/**
* The Kafka cluster config class.
Expand All @@ -59,6 +61,8 @@ public class KafkaClusterConfig {

private static final System.Logger LOGGER = System.getLogger(KafkaClusterConfig.class.getName());
private static final String ONE_CONFIG = Integer.toString(1);
private static final AtomicBoolean DEPRECATED_SASL_PLAIN_AUTH_USE_REPORTED = new AtomicBoolean();

public static final String BROKER_ROLE = "broker";
public static final String CONTROLLER_ROLE = "controller";

Expand Down Expand Up @@ -141,6 +145,9 @@ public class KafkaClusterConfig {
BrokerConfig.List.class,
KRaftCluster.class,
Tls.class,
SaslUser.class,
SaslUser.List.class,
SaslMechanism.class,
SaslPlainAuth.class,
SaslPlainAuth.List.class,
ZooKeeperCluster.class,
Expand All @@ -167,7 +174,9 @@ public static KafkaClusterConfig fromConstraints(List<Annotation> annotations, T
var builder = builder();
builder.testInfo(testInfo);
builder.brokersNum(1);
boolean sasl = false;
boolean useSasl = false;
Optional<Map<String, String>> saslUsers = Optional.empty();
Optional<Map<String, String>> deprecatedSaslUsers = Optional.empty();
boolean tls = false;
for (Annotation annotation : annotations) {
if (annotation instanceof BrokerCluster brokerCluster) {
Expand All @@ -189,20 +198,14 @@ public static KafkaClusterConfig fromConstraints(List<Annotation> annotations, T
throw new RuntimeException(e);
}
}
if (annotation instanceof SaslPlainAuth.List saslPlainAuthList) {
builder.saslMechanism(PLAIN_SASL_MECHANISM_NAME);
sasl = true;
Map<String, String> users = new HashMap<>();
for (var user : saslPlainAuthList.value()) {
users.put(user.user(), user.password());
}
builder.users(users);
}
else if (annotation instanceof SaslPlainAuth saslPlainAuth) {
builder.saslMechanism(PLAIN_SASL_MECHANISM_NAME);
sasl = true;
builder.users(Map.of(saslPlainAuth.user(), saslPlainAuth.password()));
if (annotation instanceof SaslMechanism mechanism) {
builder.saslMechanism(mechanism.value());
useSasl = true;
}

saslUsers = processSaslUserAnnotations(annotation);
deprecatedSaslUsers = processDeprecatedSaslUserAnnotations(annotation);

if (annotation instanceof ClusterId clusterId) {
builder.kafkaKraftClusterId(clusterId.value());
}
Expand All @@ -218,10 +221,54 @@ else if (annotation instanceof BrokerConfig brokerConfig) {
builder.brokerConfig(brokerConfig.name(), brokerConfig.value());
}
}
builder.securityProtocol((sasl ? "SASL_" : "") + (tls ? "SSL" : "PLAINTEXT"));

if (saslUsers.isPresent()) {
if (deprecatedSaslUsers.isPresent()) {
throw new IllegalArgumentException("Cannot use deprecated SaslPlainAuth with SaslUser.");
}
saslUsers.ifPresent(builder::users);

}
else if (deprecatedSaslUsers.isPresent()) {
deprecatedSaslUsers.ifPresent(builder::users);

if (DEPRECATED_SASL_PLAIN_AUTH_USE_REPORTED.compareAndExchange(false, true)) {
LOGGER.log(System.Logger.Level.WARNING, "Use of deprecated SaslPlainAuth annotation, use SaslUser instead.");
}
}

if ((saslUsers.isPresent() || deprecatedSaslUsers.isPresent()) && !useSasl) {
builder.saslMechanism("PLAIN");
useSasl = true;
}

builder.securityProtocol((useSasl ? "SASL_" : "") + (tls ? "SSL" : "PLAINTEXT"));
return builder.build();
}

private static Optional<Map<String, String>> processSaslUserAnnotations(Annotation annotation) {
if (annotation instanceof SaslUser.List saslUserList) {
return Optional.of(Arrays.stream(saslUserList.value())
.collect(Collectors.toMap(SaslUser::user, SaslUser::password)));
}
else if (annotation instanceof SaslUser saslUser) {
return Optional.of(Map.of(saslUser.user(), saslUser.password()));
}
return Optional.empty();
}

@SuppressWarnings("deprecation")
private static Optional<Map<String, String>> processDeprecatedSaslUserAnnotations(Annotation annotation) {
if (annotation instanceof SaslPlainAuth.List saslPlainAuthList) {
return Optional.of(Arrays.stream(saslPlainAuthList.value())
.collect(Collectors.toMap(SaslPlainAuth::user, SaslPlainAuth::password)));
}
else if (annotation instanceof SaslPlainAuth saslPlainAuth) {
return Optional.of(Map.of(saslPlainAuth.user(), saslPlainAuth.password()));
}
return Optional.empty();
}

/**
* Gets broker configs.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.testing.kafka.common;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import io.kroxylicious.testing.kafka.api.KafkaClusterConstraint;
import io.kroxylicious.testing.kafka.api.KafkaClusterProvisioningStrategy;

/**
* Annotation constraining a {@link KafkaClusterProvisioningStrategy} to
* provide a cluster with an external listener configured to expect the given
* SASL mechanism.
*/
@Target({ ElementType.PARAMETER, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@KafkaClusterConstraint
public @interface SaslMechanism {
String value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(SaslPlainAuth.List.class)
@KafkaClusterConstraint
@Deprecated
public @interface SaslPlainAuth {

String user();
Expand All @@ -35,6 +36,7 @@
@Target({ ElementType.FIELD, ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@KafkaClusterConstraint
@Deprecated
@interface List {
SaslPlainAuth[] value();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Kroxylicious Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.kroxylicious.testing.kafka.common;

import java.lang.annotation.*;

import io.kroxylicious.testing.kafka.api.KafkaClusterConstraint;
import io.kroxylicious.testing.kafka.api.KafkaClusterProvisioningStrategy;

/**
* Annotation constraining a {@link KafkaClusterProvisioningStrategy} to
* provide a cluster with SASL enabled listener with the given user.
* <br/>
* If a @{@link SaslMechanism} constraint is not also provided, SASL PLAIN
* is assumed.
*/
@Target({ ElementType.PARAMETER, ElementType.FIELD })
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(SaslUser.List.class)
@KafkaClusterConstraint
public @interface SaslUser {

String user();

String password();

/**
* The interface User password.
*/
@Target({ ElementType.FIELD, ElementType.PARAMETER })
@Retention(RetentionPolicy.RUNTIME)
@KafkaClusterConstraint
@interface List {
SaslUser[] value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
Expand All @@ -35,13 +31,7 @@
import kafka.server.KafkaConfig;

import io.kroxylicious.testing.kafka.api.KafkaCluster;
import io.kroxylicious.testing.kafka.common.BrokerCluster;
import io.kroxylicious.testing.kafka.common.BrokerConfig;
import io.kroxylicious.testing.kafka.common.ClientConfig;
import io.kroxylicious.testing.kafka.common.KRaftCluster;
import io.kroxylicious.testing.kafka.common.SaslPlainAuth;
import io.kroxylicious.testing.kafka.common.Tls;
import io.kroxylicious.testing.kafka.common.ZooKeeperCluster;
import io.kroxylicious.testing.kafka.common.*;
import io.kroxylicious.testing.kafka.invm.InVMKafkaCluster;

import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_COMPACT;
Expand All @@ -56,7 +46,6 @@
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(KafkaClusterExtension.class)
Expand Down Expand Up @@ -226,40 +215,61 @@ public void kraftBasedClusterParameter(@BrokerCluster @KRaftCluster KafkaCluster
}

@Test
public void saslPlainAuthenticatingClusterParameter2Users(
@BrokerCluster @SaslPlainAuth(user = "alice", password = "foo") @SaslPlainAuth(user = "bob", password = "bar") KafkaCluster cluster)
throws ExecutionException, InterruptedException {
var dc = describeCluster(cluster.getKafkaClientConfiguration("alice", "foo"));
assertEquals(1, dc.nodes().get().size());
assertEquals(cluster.getClusterId(), dc.clusterId().get());
public void saslPlainAuth(@BrokerCluster @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) {
doAuthExpectSucceeds(cluster, "alice", "foo");
}

dc = describeCluster(cluster.getKafkaClientConfiguration("bob", "bar"));
assertEquals(cluster.getClusterId(), dc.clusterId().get());
@Test
public void saslPlainAuthExplicitMechanism(@BrokerCluster @SaslMechanism("PLAIN") @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) {
doAuthExpectSucceeds(cluster, "alice", "foo");
}

var ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("bob", "baz")),
"Expect bad password to throw");
assertInstanceOf(SaslAuthenticationException.class, ee.getCause());
@Test
public void saslScramAuth(@BrokerCluster @SaslMechanism("SCRAM-SHA-256") @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) {
doAuthExpectSucceeds(cluster, "alice", "foo");
}

ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("eve", "quux")),
"Expect unknown user to throw");
assertInstanceOf(SaslAuthenticationException.class, ee.getCause());
@Test
public void saslAuthWithManyUsers(@BrokerCluster @SaslUser(user = "alice", password = "foo") @SaslUser(user = "bob", password = "bar") KafkaCluster cluster) {
doAuthExpectSucceeds(cluster, "alice", "foo");
doAuthExpectSucceeds(cluster, "bob", "bar");
}

@Test
public void saslPlainAuthenticatingClusterParameter1User(
@BrokerCluster @SaslPlainAuth(user = "alice", password = "foo") KafkaCluster cluster)
throws ExecutionException, InterruptedException {
var dc = describeCluster(cluster.getKafkaClientConfiguration("alice", "foo"));
assertEquals(1, dc.nodes().get().size());
assertEquals(cluster.getClusterId(), dc.clusterId().get());
@Deprecated
public void saslPlainAuthDeprecatedAnnotation(@BrokerCluster @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) {
doAuthExpectSucceeds(cluster, "alice", "foo");
}

var ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("alice", "baz")),
"Expect bad password to throw");
assertInstanceOf(SaslAuthenticationException.class, ee.getCause());
@Test
@Deprecated
public void saslPlainAuthDeprecatedAnnotationManyUsers(@BrokerCluster @SaslPlainAuth(user = "alice", password = "foo") @SaslPlainAuth(user = "bob", password = "bar") KafkaCluster cluster) {
doAuthExpectSucceeds(cluster, "alice", "foo");
doAuthExpectSucceeds(cluster, "bob", "bar");
}

private void doAuthExpectSucceeds(KafkaCluster cluster, String username, String password) {
var config = cluster.getKafkaClientConfiguration(username, password);
try (var admin = Admin.create(config)) {
var dcr = admin.describeCluster();
assertThat(dcr.clusterId())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(cluster.getClusterId());
}
}

ee = assertThrows(ExecutionException.class, () -> describeCluster(cluster.getKafkaClientConfiguration("bob", "bar")),
"Expect unknown user to throw");
assertInstanceOf(SaslAuthenticationException.class, ee.getCause());
@Test
public void saslPlainAuthFails(@BrokerCluster @SaslUser(user = "alice", password = "foo") KafkaCluster cluster) {
var config = cluster.getKafkaClientConfiguration("alicex", "bad");
try (var admin = Admin.create(config)) {
var dcr = admin.describeCluster();
assertThat(dcr.clusterId())
.failsWithin(Duration.ofSeconds(10))
.withThrowableThat()
.havingRootCause()
.isInstanceOf(SaslAuthenticationException.class)
.withMessage("Authentication failed: Invalid username or password");
}
}

@Test
Expand Down

0 comments on commit 980ce32

Please sign in to comment.