Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable ssl security #28

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ STANDARD_LABELS | standard.labels| Comma-separated list of labels that must
STANDARD_ACL_LABELS | standard.acl.labels| Comma-separated list of labels that must be set on deployments that are taken into account| empty list
USERNAME_POOL_SECRET | username.pool.secret| Name of the secret containing pool of available usernames | kafka-cluster-kafka-auth-pool
CONSUMED_USERNAMES_SECRET | consumed.usernames.secret| Name of the secret containing list of already used usernames | kafka-cluster-kafka-consumed-auth-pool
SECURITY_PROTOCOL | security.protocol | Security protocol to use SASL_SSL or SASL_PLAINTEXT. | empty
SECURITY_PROTOCOL | security.protocol | Security protocol to use SSL, SASL_SSL or SASL_PLAINTEXT. | empty
OPERATOR_ID | operator.id| Unique id of the operator in a namespace | kafka-operator
KAFKA_TIMEOUT_MS | kafka.timeout.ms | Unique id of the operator in a namespace | 30000
METRICS_PORT | metrics.port | HTTP port to expose metrics | 9889
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@
</goals>
</execution>
</executions>
<configuration>
<excludes>
<exclude>**/*Crd*.*</exclude>
<exclude>**/AclManager**</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
49 changes: 41 additions & 8 deletions src/main/java/nb/kafka/operator/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public class AppConfig {
private short maxReplicationFactor;
private int maxPartitions;
private long maxRetentionMs;
private String sslTrustStoreLocation;
private String sslTrustStorePassword;
private String sslKeyStoreLocation;
private String sslKeyStorePassword;

private static AppConfig defaultConfig;

Expand All @@ -36,7 +40,6 @@ public static final AppConfig defaultConfig() {
if (defaultConfig == null) {
AppConfig conf = new AppConfig();
conf.bootstrapServers = "kafka:9092";
conf.securityProtocol = "";
conf.defaultReplicationFactor = (short)1;
conf.enableTopicDelete = false;
conf.enableTopicImport = false;
Expand All @@ -62,7 +65,6 @@ public AppConfig() {

public AppConfig(AppConfig config) {
this.setBootstrapServers(config.getBootstrapServers());
this.setSecurityProtocol(config.getSecurityProtocol());
this.setDefaultReplicationFactor(config.getDefaultReplicationFactor());
this.setEnableTopicDelete(config.isEnabledTopicDelete());
this.setEnableTopicImport(config.isEnabledTopicImport());
Expand All @@ -78,6 +80,11 @@ public AppConfig(AppConfig config) {
this.setMaxReplicationFactor(config.getMaxReplicationFactor());
this.setMaxPartitions(config.getMaxPartitions());
this.setMaxRetentionMs(config.getMaxRetentionMs());
this.setSecurityProtocol(config.getSecurityProtocol());
this.setSslTrustStoreLocation(config.getSslTrustStoreLocation());
this.setSslTrustStorePassword(config.getSslTrustStorePassword());
this.setSslKeyStoreLocation(config.getSslKeyStoreLocation());
this.setSslKeyStorePassword(config.getSslKeyStorePassword());
}

public String getBootstrapServers() {
Expand All @@ -86,12 +93,6 @@ public String getBootstrapServers() {
public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public short getDefaultReplicationFactor() {
return defaultReplicationFactor;
}
Expand Down Expand Up @@ -182,6 +183,36 @@ public long getMaxRetentionMs() {
public void setMaxRetentionMs(long maxRetentionMs) {
this.maxRetentionMs = maxRetentionMs;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public String getSslTrustStoreLocation() {
return sslTrustStoreLocation;
}
public void setSslTrustStoreLocation(String sslTrustStoreLocation) {
this.sslTrustStoreLocation = sslTrustStoreLocation;
}
public String getSslTrustStorePassword() {
return sslTrustStorePassword;
}
public void setSslTrustStorePassword(String sslTrustStorePassword) {
this.sslTrustStorePassword = sslTrustStorePassword;
}
public String getSslKeyStoreLocation() {
return sslKeyStoreLocation;
}
public void setSslKeyStoreLocation(String sslKeyStoreLocation) {
this.sslKeyStoreLocation = sslKeyStoreLocation;
}
public String getSslKeyStorePassword() {
return sslKeyStorePassword;
}
public void setSslKeyStorePassword(String sslKeyStorePassword) {
this.sslKeyStorePassword = sslKeyStorePassword;
}

@Override
public String toString() {
Expand All @@ -203,6 +234,8 @@ public String toString() {
", maxReplicationFactor=" + maxReplicationFactor +
", maxPartitions=" + maxPartitions +
", maxRetentionMs=" + maxRetentionMs +
", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' +
", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' +
'}';
}
}
11 changes: 8 additions & 3 deletions src/main/java/nb/kafka/operator/KafkaAdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,10 +42,14 @@ public KafkaAdminImpl(AppConfig config) {
Properties conf = new Properties();
conf.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
conf.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(config.getKafkaTimeoutMs()));
if (!isBlank(config.getSecurityProtocol())) {
if (SecurityProtocol.SSL.name.equals(config.getSecurityProtocol())) {
log.info("Using security protocol {}.", config.getSecurityProtocol());
conf.setProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol());
conf.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");

conf.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, config.getSslTrustStoreLocation() );
conf.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, config.getSslTrustStorePassword());
conf.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, config.getSslKeyStoreLocation());
conf.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, config.getSslKeyStorePassword());
}
this.config = config;
this.client = AdminClient.create(conf);
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/nb/kafka/operator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public static AppConfig loadConfig() {
config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport()));
config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement()));
config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete()));
config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", ""));
config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol()));
config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation()));
config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword()));
config.setSslKeyStoreLocation(getSystemPropertyOrEnvVar("ssl.keystore.location", defaultConfig.getSslKeyStoreLocation()));
config.setSslKeyStorePassword(getSystemPropertyOrEnvVar("ssl.keystore.password", defaultConfig.getSslKeyStorePassword()));
if (config.isEnabledAclManagement() && isBlank(config.getSecurityProtocol())) {
config.setSecurityProtocol("SASL_PLAINTEXT");
log.warn("ACL was enabled, but not security.protocol, forcing security protocol to {}",
Expand Down
17 changes: 0 additions & 17 deletions src/main/java/nb/kafka/operator/TopicCreationException.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,6 @@
public class TopicCreationException extends Exception {
private static final long serialVersionUID = -2135941761570261774L;

public TopicCreationException() {
super();
}

public TopicCreationException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}

public TopicCreationException(String message, Throwable cause) {
super(message, cause);
}

public TopicCreationException(String message) {
super(message);
}

public TopicCreationException(Throwable cause) {
super(cause);
}
Expand Down
20 changes: 0 additions & 20 deletions src/main/java/nb/kafka/operator/util/MeterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ public <T> Gauge register(Gauge.Builder<T> builder) {
return register(builder.register(registry));
}

public Counter register(Counter.Builder builder) {
return register(builder.register(registry));
}

public Timer register(Timer.Builder builder) {
return register(builder.register(registry));
}

public DistributionSummary register(DistributionSummary.Builder builder) {
return register(builder.register(registry));
}

private <T extends Meter> T register(T meter) {
meters.put(meter.getId(), meter);
return meter;
Expand All @@ -66,18 +54,10 @@ public void close(Meter meter) {
close(meter.getId());
}

public Meter getMeter(Meter.Id id) {
return meters.get(id);
}

public MeterRegistry getRegistry() {
return registry;
}

public Set<Meter.Id> getMeterIds() {
return meters.keySet();
}

@Override
public void close() {
meters.values().forEach(meter -> {
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/nb/kafka/operator/util/PropertyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public static String convertSystemPropertyNameToEnvVar(String systemPropertyName
return systemPropertyName.toUpperCase().replaceAll("[.-]", "_");
}

public static String getEnvVar(String envVarName, String defaultValue) {
String answer = System.getenv(envVarName);
return isNotNullOrEmpty(answer) ? answer : defaultValue;
}

public static String getSystemPropertyOrEnvVar(String systemPropertyName, String defaultValue) {
return getSystemPropertyOrEnvVar(systemPropertyName, convertSystemPropertyNameToEnvVar(systemPropertyName),
defaultValue);
Expand Down