From b7a4656ec3582e321e1d47f21a623bde144a4d5c Mon Sep 17 00:00:00 2001 From: Raouf Date: Tue, 16 Jul 2019 11:12:20 +0200 Subject: [PATCH] enable ssl security --- README.md | 2 +- pom.xml | 6 +++ .../java/nb/kafka/operator/AppConfig.java | 49 ++++++++++++++++--- .../nb/kafka/operator/KafkaAdminImpl.java | 11 +++-- src/main/java/nb/kafka/operator/Main.java | 6 ++- .../operator/TopicCreationException.java | 17 ------- .../nb/kafka/operator/util/MeterManager.java | 20 -------- .../nb/kafka/operator/util/PropertyUtil.java | 5 -- 8 files changed, 61 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 0c0a4ee..24907b5 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ STANDARD_LABELS | standard.labels| Comma-separated list of labels that must STANDARD_ACL_LABELS | standard.acl.labels| Comma-separated list of labels that must be set on deployments that are taken into account| empty list USERNAME_POOL_SECRET | username.pool.secret| Name of the secret containing pool of available usernames | kafka-cluster-kafka-auth-pool CONSUMED_USERNAMES_SECRET | consumed.usernames.secret| Name of the secret containing list of already used usernames | kafka-cluster-kafka-consumed-auth-pool -SECURITY_PROTOCOL | security.protocol | Security protocol to use SASL_SSL or SASL_PLAINTEXT. | empty +SECURITY_PROTOCOL | security.protocol | Security protocol to use SSL, SASL_SSL or SASL_PLAINTEXT. | empty OPERATOR_ID | operator.id| Unique id of the operator in a namespace | kafka-operator KAFKA_TIMEOUT_MS | kafka.timeout.ms | Unique id of the operator in a namespace | 30000 METRICS_PORT | metrics.port | HTTP port to expose metrics | 9889 diff --git a/pom.xml b/pom.xml index 6fa2333..f23bbb4 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,12 @@ + + + **/*Crd*.* + **/AclManager** + + org.apache.maven.plugins diff --git a/src/main/java/nb/kafka/operator/AppConfig.java b/src/main/java/nb/kafka/operator/AppConfig.java index ffa9059..10d7da3 100644 --- a/src/main/java/nb/kafka/operator/AppConfig.java +++ b/src/main/java/nb/kafka/operator/AppConfig.java @@ -26,6 +26,10 @@ public class AppConfig { private short maxReplicationFactor; private int maxPartitions; private long maxRetentionMs; + private String sslTrustStoreLocation; + private String sslTrustStorePassword; + private String sslKeyStoreLocation; + private String sslKeyStorePassword; private static AppConfig defaultConfig; @@ -36,7 +40,6 @@ public static final AppConfig defaultConfig() { if (defaultConfig == null) { AppConfig conf = new AppConfig(); conf.bootstrapServers = "kafka:9092"; - conf.securityProtocol = ""; conf.defaultReplicationFactor = (short)1; conf.enableTopicDelete = false; conf.enableTopicImport = false; @@ -62,7 +65,6 @@ public AppConfig() { public AppConfig(AppConfig config) { this.setBootstrapServers(config.getBootstrapServers()); - this.setSecurityProtocol(config.getSecurityProtocol()); this.setDefaultReplicationFactor(config.getDefaultReplicationFactor()); this.setEnableTopicDelete(config.isEnabledTopicDelete()); this.setEnableTopicImport(config.isEnabledTopicImport()); @@ -78,6 +80,11 @@ public AppConfig(AppConfig config) { this.setMaxReplicationFactor(config.getMaxReplicationFactor()); this.setMaxPartitions(config.getMaxPartitions()); this.setMaxRetentionMs(config.getMaxRetentionMs()); + this.setSecurityProtocol(config.getSecurityProtocol()); + this.setSslTrustStoreLocation(config.getSslTrustStoreLocation()); + this.setSslTrustStorePassword(config.getSslTrustStorePassword()); + this.setSslKeyStoreLocation(config.getSslKeyStoreLocation()); + this.setSslKeyStorePassword(config.getSslKeyStorePassword()); } public String getBootstrapServers() { @@ -86,12 +93,6 @@ public String getBootstrapServers() { public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } - public String getSecurityProtocol() { - return securityProtocol; - } - public void setSecurityProtocol(String securityProtocol) { - this.securityProtocol = securityProtocol; - } public short getDefaultReplicationFactor() { return defaultReplicationFactor; } @@ -182,6 +183,36 @@ public long getMaxRetentionMs() { public void setMaxRetentionMs(long maxRetentionMs) { this.maxRetentionMs = maxRetentionMs; } + public String getSecurityProtocol() { + return securityProtocol; + } + public void setSecurityProtocol(String securityProtocol) { + this.securityProtocol = securityProtocol; + } + public String getSslTrustStoreLocation() { + return sslTrustStoreLocation; + } + public void setSslTrustStoreLocation(String sslTrustStoreLocation) { + this.sslTrustStoreLocation = sslTrustStoreLocation; + } + public String getSslTrustStorePassword() { + return sslTrustStorePassword; + } + public void setSslTrustStorePassword(String sslTrustStorePassword) { + this.sslTrustStorePassword = sslTrustStorePassword; + } + public String getSslKeyStoreLocation() { + return sslKeyStoreLocation; + } + public void setSslKeyStoreLocation(String sslKeyStoreLocation) { + this.sslKeyStoreLocation = sslKeyStoreLocation; + } + public String getSslKeyStorePassword() { + return sslKeyStorePassword; + } + public void setSslKeyStorePassword(String sslKeyStorePassword) { + this.sslKeyStorePassword = sslKeyStorePassword; + } @Override public String toString() { @@ -203,6 +234,8 @@ public String toString() { ", maxReplicationFactor=" + maxReplicationFactor + ", maxPartitions=" + maxPartitions + ", maxRetentionMs=" + maxRetentionMs + + ", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' + + ", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' + '}'; } } diff --git a/src/main/java/nb/kafka/operator/KafkaAdminImpl.java b/src/main/java/nb/kafka/operator/KafkaAdminImpl.java index 7258fc9..998a1e3 100644 --- a/src/main/java/nb/kafka/operator/KafkaAdminImpl.java +++ b/src/main/java/nb/kafka/operator/KafkaAdminImpl.java @@ -27,7 +27,8 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +42,14 @@ public KafkaAdminImpl(AppConfig config) { Properties conf = new Properties(); conf.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); conf.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(config.getKafkaTimeoutMs())); - if (!isBlank(config.getSecurityProtocol())) { + if (SecurityProtocol.SSL.name.equals(config.getSecurityProtocol())) { log.info("Using security protocol {}.", config.getSecurityProtocol()); conf.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol()); - conf.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); + + conf.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, config.getSslTrustStoreLocation() ); + conf.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getSslTrustStorePassword()); + conf.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, config.getSslKeyStoreLocation()); + conf.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getSslKeyStorePassword()); } this.config = config; this.client = AdminClient.create(conf); diff --git a/src/main/java/nb/kafka/operator/Main.java b/src/main/java/nb/kafka/operator/Main.java index 95373b8..24eebbd 100644 --- a/src/main/java/nb/kafka/operator/Main.java +++ b/src/main/java/nb/kafka/operator/Main.java @@ -52,7 +52,11 @@ public static AppConfig loadConfig() { config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport())); config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement())); config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete())); - config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", "")); + config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol())); + config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation())); + config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword())); + config.setSslKeyStoreLocation(getSystemPropertyOrEnvVar("ssl.keystore.location", defaultConfig.getSslKeyStoreLocation())); + config.setSslKeyStorePassword(getSystemPropertyOrEnvVar("ssl.keystore.password", defaultConfig.getSslKeyStorePassword())); if (config.isEnabledAclManagement() && isBlank(config.getSecurityProtocol())) { config.setSecurityProtocol("SASL_PLAINTEXT"); log.warn("ACL was enabled, but not security.protocol, forcing security protocol to {}", diff --git a/src/main/java/nb/kafka/operator/TopicCreationException.java b/src/main/java/nb/kafka/operator/TopicCreationException.java index 64697ea..10c922c 100644 --- a/src/main/java/nb/kafka/operator/TopicCreationException.java +++ b/src/main/java/nb/kafka/operator/TopicCreationException.java @@ -3,23 +3,6 @@ public class TopicCreationException extends Exception { private static final long serialVersionUID = -2135941761570261774L; - public TopicCreationException() { - super(); - } - - public TopicCreationException(String message, Throwable cause, boolean enableSuppression, - boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } - - public TopicCreationException(String message, Throwable cause) { - super(message, cause); - } - - public TopicCreationException(String message) { - super(message); - } - public TopicCreationException(Throwable cause) { super(cause); } diff --git a/src/main/java/nb/kafka/operator/util/MeterManager.java b/src/main/java/nb/kafka/operator/util/MeterManager.java index 87d9575..ede1a5a 100644 --- a/src/main/java/nb/kafka/operator/util/MeterManager.java +++ b/src/main/java/nb/kafka/operator/util/MeterManager.java @@ -36,18 +36,6 @@ public Gauge register(Gauge.Builder builder) { return register(builder.register(registry)); } - public Counter register(Counter.Builder builder) { - return register(builder.register(registry)); - } - - public Timer register(Timer.Builder builder) { - return register(builder.register(registry)); - } - - public DistributionSummary register(DistributionSummary.Builder builder) { - return register(builder.register(registry)); - } - private T register(T meter) { meters.put(meter.getId(), meter); return meter; @@ -66,18 +54,10 @@ public void close(Meter meter) { close(meter.getId()); } - public Meter getMeter(Meter.Id id) { - return meters.get(id); - } - public MeterRegistry getRegistry() { return registry; } - public Set getMeterIds() { - return meters.keySet(); - } - @Override public void close() { meters.values().forEach(meter -> { diff --git a/src/main/java/nb/kafka/operator/util/PropertyUtil.java b/src/main/java/nb/kafka/operator/util/PropertyUtil.java index 6ebca4a..a13b162 100644 --- a/src/main/java/nb/kafka/operator/util/PropertyUtil.java +++ b/src/main/java/nb/kafka/operator/util/PropertyUtil.java @@ -47,11 +47,6 @@ public static String convertSystemPropertyNameToEnvVar(String systemPropertyName return systemPropertyName.toUpperCase().replaceAll("[.-]", "_"); } - public static String getEnvVar(String envVarName, String defaultValue) { - String answer = System.getenv(envVarName); - return isNotNullOrEmpty(answer) ? answer : defaultValue; - } - public static String getSystemPropertyOrEnvVar(String systemPropertyName, String defaultValue) { return getSystemPropertyOrEnvVar(systemPropertyName, convertSystemPropertyNameToEnvVar(systemPropertyName), defaultValue);