diff --git a/README.md b/README.md
index 0c0a4ee..24907b5 100644
--- a/README.md
+++ b/README.md
@@ -53,7 +53,7 @@ STANDARD_LABELS | standard.labels| Comma-separated list of labels that must
STANDARD_ACL_LABELS | standard.acl.labels| Comma-separated list of labels that must be set on deployments that are taken into account| empty list
USERNAME_POOL_SECRET | username.pool.secret| Name of the secret containing pool of available usernames | kafka-cluster-kafka-auth-pool
CONSUMED_USERNAMES_SECRET | consumed.usernames.secret| Name of the secret containing list of already used usernames | kafka-cluster-kafka-consumed-auth-pool
-SECURITY_PROTOCOL | security.protocol | Security protocol to use SASL_SSL or SASL_PLAINTEXT. | empty
+SECURITY_PROTOCOL | security.protocol | Security protocol to use SSL, SASL_SSL or SASL_PLAINTEXT. | empty
OPERATOR_ID | operator.id| Unique id of the operator in a namespace | kafka-operator
KAFKA_TIMEOUT_MS | kafka.timeout.ms | Unique id of the operator in a namespace | 30000
METRICS_PORT | metrics.port | HTTP port to expose metrics | 9889
diff --git a/pom.xml b/pom.xml
index 6fa2333..f23bbb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,12 @@
+
+
+ **/*Crd*.*
+ **/AclManager**
+
+
org.apache.maven.plugins
diff --git a/src/main/java/nb/kafka/operator/AppConfig.java b/src/main/java/nb/kafka/operator/AppConfig.java
index ffa9059..10d7da3 100644
--- a/src/main/java/nb/kafka/operator/AppConfig.java
+++ b/src/main/java/nb/kafka/operator/AppConfig.java
@@ -26,6 +26,10 @@ public class AppConfig {
private short maxReplicationFactor;
private int maxPartitions;
private long maxRetentionMs;
+ private String sslTrustStoreLocation;
+ private String sslTrustStorePassword;
+ private String sslKeyStoreLocation;
+ private String sslKeyStorePassword;
private static AppConfig defaultConfig;
@@ -36,7 +40,6 @@ public static final AppConfig defaultConfig() {
if (defaultConfig == null) {
AppConfig conf = new AppConfig();
conf.bootstrapServers = "kafka:9092";
- conf.securityProtocol = "";
conf.defaultReplicationFactor = (short)1;
conf.enableTopicDelete = false;
conf.enableTopicImport = false;
@@ -62,7 +65,6 @@ public AppConfig() {
public AppConfig(AppConfig config) {
this.setBootstrapServers(config.getBootstrapServers());
- this.setSecurityProtocol(config.getSecurityProtocol());
this.setDefaultReplicationFactor(config.getDefaultReplicationFactor());
this.setEnableTopicDelete(config.isEnabledTopicDelete());
this.setEnableTopicImport(config.isEnabledTopicImport());
@@ -78,6 +80,11 @@ public AppConfig(AppConfig config) {
this.setMaxReplicationFactor(config.getMaxReplicationFactor());
this.setMaxPartitions(config.getMaxPartitions());
this.setMaxRetentionMs(config.getMaxRetentionMs());
+ this.setSecurityProtocol(config.getSecurityProtocol());
+ this.setSslTrustStoreLocation(config.getSslTrustStoreLocation());
+ this.setSslTrustStorePassword(config.getSslTrustStorePassword());
+ this.setSslKeyStoreLocation(config.getSslKeyStoreLocation());
+ this.setSslKeyStorePassword(config.getSslKeyStorePassword());
}
public String getBootstrapServers() {
@@ -86,12 +93,6 @@ public String getBootstrapServers() {
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
- public String getSecurityProtocol() {
- return securityProtocol;
- }
- public void setSecurityProtocol(String securityProtocol) {
- this.securityProtocol = securityProtocol;
- }
public short getDefaultReplicationFactor() {
return defaultReplicationFactor;
}
@@ -182,6 +183,36 @@ public long getMaxRetentionMs() {
public void setMaxRetentionMs(long maxRetentionMs) {
this.maxRetentionMs = maxRetentionMs;
}
+ public String getSecurityProtocol() {
+ return securityProtocol;
+ }
+ public void setSecurityProtocol(String securityProtocol) {
+ this.securityProtocol = securityProtocol;
+ }
+ public String getSslTrustStoreLocation() {
+ return sslTrustStoreLocation;
+ }
+ public void setSslTrustStoreLocation(String sslTrustStoreLocation) {
+ this.sslTrustStoreLocation = sslTrustStoreLocation;
+ }
+ public String getSslTrustStorePassword() {
+ return sslTrustStorePassword;
+ }
+ public void setSslTrustStorePassword(String sslTrustStorePassword) {
+ this.sslTrustStorePassword = sslTrustStorePassword;
+ }
+ public String getSslKeyStoreLocation() {
+ return sslKeyStoreLocation;
+ }
+ public void setSslKeyStoreLocation(String sslKeyStoreLocation) {
+ this.sslKeyStoreLocation = sslKeyStoreLocation;
+ }
+ public String getSslKeyStorePassword() {
+ return sslKeyStorePassword;
+ }
+ public void setSslKeyStorePassword(String sslKeyStorePassword) {
+ this.sslKeyStorePassword = sslKeyStorePassword;
+ }
@Override
public String toString() {
@@ -203,6 +234,8 @@ public String toString() {
", maxReplicationFactor=" + maxReplicationFactor +
", maxPartitions=" + maxPartitions +
", maxRetentionMs=" + maxRetentionMs +
+ ", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' +
+ ", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' +
'}';
}
}
diff --git a/src/main/java/nb/kafka/operator/KafkaAdminImpl.java b/src/main/java/nb/kafka/operator/KafkaAdminImpl.java
index 7258fc9..998a1e3 100644
--- a/src/main/java/nb/kafka/operator/KafkaAdminImpl.java
+++ b/src/main/java/nb/kafka/operator/KafkaAdminImpl.java
@@ -27,7 +27,8 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +42,14 @@ public KafkaAdminImpl(AppConfig config) {
Properties conf = new Properties();
conf.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
conf.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(config.getKafkaTimeoutMs()));
- if (!isBlank(config.getSecurityProtocol())) {
+ if (SecurityProtocol.SSL.name.equals(config.getSecurityProtocol())) {
log.info("Using security protocol {}.", config.getSecurityProtocol());
conf.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol());
- conf.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
+
+ conf.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, config.getSslTrustStoreLocation() );
+ conf.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getSslTrustStorePassword());
+ conf.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, config.getSslKeyStoreLocation());
+ conf.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getSslKeyStorePassword());
}
this.config = config;
this.client = AdminClient.create(conf);
diff --git a/src/main/java/nb/kafka/operator/Main.java b/src/main/java/nb/kafka/operator/Main.java
index 95373b8..24eebbd 100644
--- a/src/main/java/nb/kafka/operator/Main.java
+++ b/src/main/java/nb/kafka/operator/Main.java
@@ -52,7 +52,11 @@ public static AppConfig loadConfig() {
config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport()));
config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement()));
config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete()));
- config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", ""));
+ config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol()));
+ config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation()));
+ config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword()));
+ config.setSslKeyStoreLocation(getSystemPropertyOrEnvVar("ssl.keystore.location", defaultConfig.getSslKeyStoreLocation()));
+ config.setSslKeyStorePassword(getSystemPropertyOrEnvVar("ssl.keystore.password", defaultConfig.getSslKeyStorePassword()));
if (config.isEnabledAclManagement() && isBlank(config.getSecurityProtocol())) {
config.setSecurityProtocol("SASL_PLAINTEXT");
log.warn("ACL was enabled, but not security.protocol, forcing security protocol to {}",
diff --git a/src/main/java/nb/kafka/operator/TopicCreationException.java b/src/main/java/nb/kafka/operator/TopicCreationException.java
index 64697ea..10c922c 100644
--- a/src/main/java/nb/kafka/operator/TopicCreationException.java
+++ b/src/main/java/nb/kafka/operator/TopicCreationException.java
@@ -3,23 +3,6 @@
public class TopicCreationException extends Exception {
private static final long serialVersionUID = -2135941761570261774L;
- public TopicCreationException() {
- super();
- }
-
- public TopicCreationException(String message, Throwable cause, boolean enableSuppression,
- boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
- public TopicCreationException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public TopicCreationException(String message) {
- super(message);
- }
-
public TopicCreationException(Throwable cause) {
super(cause);
}
diff --git a/src/main/java/nb/kafka/operator/util/MeterManager.java b/src/main/java/nb/kafka/operator/util/MeterManager.java
index 87d9575..ede1a5a 100644
--- a/src/main/java/nb/kafka/operator/util/MeterManager.java
+++ b/src/main/java/nb/kafka/operator/util/MeterManager.java
@@ -36,18 +36,6 @@ public Gauge register(Gauge.Builder builder) {
return register(builder.register(registry));
}
- public Counter register(Counter.Builder builder) {
- return register(builder.register(registry));
- }
-
- public Timer register(Timer.Builder builder) {
- return register(builder.register(registry));
- }
-
- public DistributionSummary register(DistributionSummary.Builder builder) {
- return register(builder.register(registry));
- }
-
private T register(T meter) {
meters.put(meter.getId(), meter);
return meter;
@@ -66,18 +54,10 @@ public void close(Meter meter) {
close(meter.getId());
}
- public Meter getMeter(Meter.Id id) {
- return meters.get(id);
- }
-
public MeterRegistry getRegistry() {
return registry;
}
- public Set getMeterIds() {
- return meters.keySet();
- }
-
@Override
public void close() {
meters.values().forEach(meter -> {
diff --git a/src/main/java/nb/kafka/operator/util/PropertyUtil.java b/src/main/java/nb/kafka/operator/util/PropertyUtil.java
index 6ebca4a..a13b162 100644
--- a/src/main/java/nb/kafka/operator/util/PropertyUtil.java
+++ b/src/main/java/nb/kafka/operator/util/PropertyUtil.java
@@ -47,11 +47,6 @@ public static String convertSystemPropertyNameToEnvVar(String systemPropertyName
return systemPropertyName.toUpperCase().replaceAll("[.-]", "_");
}
- public static String getEnvVar(String envVarName, String defaultValue) {
- String answer = System.getenv(envVarName);
- return isNotNullOrEmpty(answer) ? answer : defaultValue;
- }
-
public static String getSystemPropertyOrEnvVar(String systemPropertyName, String defaultValue) {
return getSystemPropertyOrEnvVar(systemPropertyName, convertSystemPropertyNameToEnvVar(systemPropertyName),
defaultValue);