Skip to content

Commit

Permalink
enable ssl security
Browse files Browse the repository at this point in the history
  • Loading branch information
roufa85 committed Jul 22, 2019
1 parent 67df8d9 commit be4c5b1
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 55 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ STANDARD_LABELS | standard.labels| Comma-separated list of labels that must
STANDARD_ACL_LABELS | standard.acl.labels| Comma-separated list of labels that must be set on deployments that are taken into account| empty list
USERNAME_POOL_SECRET | username.pool.secret| Name of the secret containing pool of available usernames | kafka-cluster-kafka-auth-pool
CONSUMED_USERNAMES_SECRET | consumed.usernames.secret| Name of the secret containing list of already used usernames | kafka-cluster-kafka-consumed-auth-pool
SECURITY_PROTOCOL | security.protocol | Security protocol to use SASL_SSL or SASL_PLAINTEXT. | empty
SECURITY_PROTOCOL | security.protocol | Security protocol to use SSL, SASL_SSL or SASL_PLAINTEXT. | empty
OPERATOR_ID | operator.id| Unique id of the operator in a namespace | kafka-operator
KAFKA_TIMEOUT_MS | kafka.timeout.ms | Unique id of the operator in a namespace | 30000
METRICS_PORT | metrics.port | HTTP port to expose metrics | 9889
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@
</goals>
</execution>
</executions>
<configuration>
<excludes>
<exclude>**/*Crd*.*</exclude>
<exclude>**/AclManager**</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
49 changes: 41 additions & 8 deletions src/main/java/nb/kafka/operator/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class AppConfig {
private short maxReplicationFactor;
private int maxPartitions;
private long maxRetentionMs;
private String sslTrustStoreLocation;
private String sslTrustStorePassword;
private String sslKeyStoreLocation;
private String sslKeyStorePassword;

private static AppConfig defaultConfig;

Expand All @@ -36,7 +40,6 @@ public static final AppConfig defaultConfig() {
if (defaultConfig == null) {
AppConfig conf = new AppConfig();
conf.bootstrapServers = "kafka:9092";
conf.securityProtocol = "";
conf.defaultReplicationFactor = (short)1;
conf.enableTopicDelete = false;
conf.enableTopicImport = false;
Expand All @@ -62,7 +65,6 @@ public AppConfig() {

public AppConfig(AppConfig config) {
this.setBootstrapServers(config.getBootstrapServers());
this.setSecurityProtocol(config.getSecurityProtocol());
this.setDefaultReplicationFactor(config.getDefaultReplicationFactor());
this.setEnableTopicDelete(config.isEnabledTopicDelete());
this.setEnableTopicImport(config.isEnabledTopicImport());
Expand All @@ -78,6 +80,11 @@ public AppConfig(AppConfig config) {
this.setMaxReplicationFactor(config.getMaxReplicationFactor());
this.setMaxPartitions(config.getMaxPartitions());
this.setMaxRetentionMs(config.getMaxRetentionMs());
this.setSecurityProtocol(config.getSecurityProtocol());
this.setSslTrustStoreLocation(config.getSslTrustStoreLocation());
this.setSslTrustStorePassword(config.getSslTrustStorePassword());
this.setSslKeyStoreLocation(config.getSslKeyStoreLocation());
this.setSslKeyStorePassword(config.getSslKeyStorePassword());
}

public String getBootstrapServers() {
Expand All @@ -86,12 +93,6 @@ public String getBootstrapServers() {
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public short getDefaultReplicationFactor() {
return defaultReplicationFactor;
}
Expand Down Expand Up @@ -182,6 +183,36 @@ public long getMaxRetentionMs() {
public void setMaxRetentionMs(long maxRetentionMs) {
this.maxRetentionMs = maxRetentionMs;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public String getSslTrustStoreLocation() {
return sslTrustStoreLocation;
}
public void setSslTrustStoreLocation(String sslTrustStoreLocation) {
this.sslTrustStoreLocation = sslTrustStoreLocation;
}
public String getSslTrustStorePassword() {
return sslTrustStorePassword;
}
public void setSslTrustStorePassword(String sslTrustStorePassword) {
this.sslTrustStorePassword = sslTrustStorePassword;
}
public String getSslKeyStoreLocation() {
return sslKeyStoreLocation;
}
public void setSslKeyStoreLocation(String sslKeyStoreLocation) {
this.sslKeyStoreLocation = sslKeyStoreLocation;
}
public String getSslKeyStorePassword() {
return sslKeyStorePassword;
}
public void setSslKeyStorePassword(String sslKeyStorePassword) {
this.sslKeyStorePassword = sslKeyStorePassword;
}

@Override
public String toString() {
Expand All @@ -203,6 +234,8 @@ public String toString() {
", maxReplicationFactor=" + maxReplicationFactor +
", maxPartitions=" + maxPartitions +
", maxRetentionMs=" + maxRetentionMs +
", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' +
", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' +
'}';
}
}
11 changes: 8 additions & 3 deletions src/main/java/nb/kafka/operator/KafkaAdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,10 +42,14 @@ public KafkaAdminImpl(AppConfig config) {
Properties conf = new Properties();
conf.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
conf.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(config.getKafkaTimeoutMs()));
if (!isBlank(config.getSecurityProtocol())) {
if (SecurityProtocol.SSL.name.equals(config.getSecurityProtocol())) {
log.info("Using security protocol {}.", config.getSecurityProtocol());
conf.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol());
conf.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");

conf.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, config.getSslTrustStoreLocation() );
conf.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getSslTrustStorePassword());
conf.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, config.getSslKeyStoreLocation());
conf.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getSslKeyStorePassword());
}
this.config = config;
this.client = AdminClient.create(conf);
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/nb/kafka/operator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public static AppConfig loadConfig() {
config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport()));
config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement()));
config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete()));
config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", ""));
config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol()));
config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation()));
config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword()));
config.setSslKeyStoreLocation(getSystemPropertyOrEnvVar("ssl.keystore.location", defaultConfig.getSslKeyStoreLocation()));
config.setSslKeyStorePassword(getSystemPropertyOrEnvVar("ssl.keystore.password", defaultConfig.getSslKeyStorePassword()));
if (config.isEnabledAclManagement() && isBlank(config.getSecurityProtocol())) {
config.setSecurityProtocol("SASL_PLAINTEXT");
log.warn("ACL was enabled, but not security.protocol, forcing security protocol to {}",
Expand Down
17 changes: 0 additions & 17 deletions src/main/java/nb/kafka/operator/TopicCreationException.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,6 @@
public class TopicCreationException extends Exception {
private static final long serialVersionUID = -2135941761570261774L;

public TopicCreationException() {
super();
}

public TopicCreationException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

public TopicCreationException(String message, Throwable cause) {
super(message, cause);
}

public TopicCreationException(String message) {
super(message);
}

public TopicCreationException(Throwable cause) {
super(cause);
}
Expand Down
20 changes: 0 additions & 20 deletions src/main/java/nb/kafka/operator/util/MeterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ public <T> Gauge register(Gauge.Builder<T> builder) {
return register(builder.register(registry));
}

public Counter register(Counter.Builder builder) {
return register(builder.register(registry));
}

public Timer register(Timer.Builder builder) {
return register(builder.register(registry));
}

public DistributionSummary register(DistributionSummary.Builder builder) {
return register(builder.register(registry));
}

private <T extends Meter> T register(T meter) {
meters.put(meter.getId(), meter);
return meter;
Expand All @@ -66,18 +54,10 @@ public void close(Meter meter) {
close(meter.getId());
}

public Meter getMeter(Meter.Id id) {
return meters.get(id);
}

public MeterRegistry getRegistry() {
return registry;
}

public Set<Meter.Id> getMeterIds() {
return meters.keySet();
}

@Override
public void close() {
meters.values().forEach(meter -> {
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/nb/kafka/operator/util/PropertyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public static String convertSystemPropertyNameToEnvVar(String systemPropertyName
return systemPropertyName.toUpperCase().replaceAll("[.-]", "_");
}

public static String getEnvVar(String envVarName, String defaultValue) {
String answer = System.getenv(envVarName);
return isNotNullOrEmpty(answer) ? answer : defaultValue;
}

public static String getSystemPropertyOrEnvVar(String systemPropertyName, String defaultValue) {
return getSystemPropertyOrEnvVar(systemPropertyName, convertSystemPropertyNameToEnvVar(systemPropertyName),
defaultValue);
Expand Down

0 comments on commit be4c5b1

Please sign in to comment.