Skip to content

Commit

Permalink
add partitions and rep factor keys as env vars
Browse files Browse the repository at this point in the history
  • Loading branch information
roufa85 committed Jul 16, 2019
1 parent b7a4656 commit 55b1472
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 10 deletions.
22 changes: 22 additions & 0 deletions src/main/java/nb/kafka/operator/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class AppConfig {
private String sslTrustStorePassword;
private String sslKeyStoreLocation;
private String sslKeyStorePassword;
private String replicationFactorKey;
private String partitionsKey;

private static AppConfig defaultConfig;

Expand All @@ -54,6 +56,8 @@ public static final AppConfig defaultConfig() {
conf.kafkaTimeoutMs = 30000;
conf.maxReplicationFactor = (short)3;
conf.maxPartitions = 2000;
conf.replicationFactorKey = "replication-factor";
conf.partitionsKey = "partitions";
conf.maxRetentionMs = Duration.of(7, ChronoUnit.DAYS).toMillis();
defaultConfig = conf;
}
Expand Down Expand Up @@ -85,6 +89,8 @@ public AppConfig(AppConfig config) {
this.setSslTrustStorePassword(config.getSslTrustStorePassword());
this.setSslKeyStoreLocation(config.getSslKeyStoreLocation());
this.setSslKeyStorePassword(config.getSslKeyStorePassword());
this.setReplicationFactorKey(config.getReplicationFactorKey());
this.setPartitionsKey(config.getPartitionsKey());
}

public String getBootstrapServers() {
Expand Down Expand Up @@ -213,6 +219,18 @@ public String getSslKeyStorePassword() {
public void setSslKeyStorePassword(String sslKeyStorePassword) {
this.sslKeyStorePassword = sslKeyStorePassword;
}
public String getReplicationFactorKey() {
return replicationFactorKey;
}
public void setReplicationFactorKey(String replicationFactorKey) {
this.replicationFactorKey = replicationFactorKey;
}
public String getPartitionsKey() {
return partitionsKey;
}
public void setPartitionsKey(String partitionsKey) {
this.partitionsKey = partitionsKey;
}

@Override
public String toString() {
Expand All @@ -235,7 +253,11 @@ public String toString() {
", maxPartitions=" + maxPartitions +
", maxRetentionMs=" + maxRetentionMs +
", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' +
", sslTrustStorePassword='" + sslTrustStorePassword + '\'' +
", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' +
", sslKeyStorePassword='" + sslKeyStorePassword + '\'' +
", replicationFactorKey='" + replicationFactorKey + '\'' +
", partitionsKey='" + partitionsKey + '\'' +
'}';
}
}
2 changes: 2 additions & 0 deletions src/main/java/nb/kafka/operator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static AppConfig loadConfig() {
config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport()));
config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement()));
config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete()));
config.setReplicationFactorKey(getSystemPropertyOrEnvVar("replication.factor.key", defaultConfig.getReplicationFactorKey()));
config.setPartitionsKey(getSystemPropertyOrEnvVar("partitions.key", defaultConfig.getPartitionsKey()));
config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol()));
config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation()));
config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ public class ConfigMapImporter extends AbstractTopicImporter {
private static final Logger log = LoggerFactory.getLogger(ConfigMapImporter.class);

private static final String PROPERTIES_KEY = "properties";
private static final String REPLICATION_FACTOR_KEY = "replication-factor";
private static final String PARTITIONS_KEY = "partitions";
private static final String TOPIC_NAME_KEY = "name";
private static final String ACL_KEY = "acl";

private final KubernetesClient client;
private final AppConfig config;

public ConfigMapImporter(KubernetesClient client, ConfigMapWatcher watcher, TopicManager topicManager,
AppConfig config) {
super(watcher, topicManager, config);
this.client = client;
this.config = config;
}

/**
Expand All @@ -54,8 +54,8 @@ protected void createTopicResource(Topic topic) {
protected ConfigMap buildConfigMapResource(Topic topic) {
Map<String, String> data = new HashMap<>();
data.put(TOPIC_NAME_KEY, String.valueOf(topic.getName()));
data.put(PARTITIONS_KEY, String.valueOf(topic.getPartitions()));
data.put(REPLICATION_FACTOR_KEY, String.valueOf(topic.getReplicationFactor()));
data.put(config.getPartitionsKey(), String.valueOf(topic.getPartitions()));
data.put(config.getReplicationFactorKey(), String.valueOf(topic.getReplicationFactor()));
data.put(PROPERTIES_KEY, PropertyUtil.propertiesAsString(topic.getProperties()));
data.put(ACL_KEY, String.valueOf(topic.isAcl()));

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
public class ConfigMapWatcher extends KubernetesWatcher<ConfigMap> {

private static final String PROPERTIES_KEY = "properties";
private static final String REPLICATION_FACTOR_KEY = "replication-factor";
private static final String PARTITIONS_KEY = "partitions";
private static final String TOPIC_NAME_KEY = "name";
private static final String ACL_KEY = "acl";
private static final String KAFKA_TOPIC_LABEL_VALUE = "kafka-topic";
Expand All @@ -36,17 +34,19 @@ public class ConfigMapWatcher extends KubernetesWatcher<ConfigMap> {
private static final Logger log = LoggerFactory.getLogger(ConfigMapWatcher.class);

private Watch watch;
private final AppConfig config;

public ConfigMapWatcher(KubernetesClient client, AppConfig config) {
super(client, ConfigMap.class, config);
this.config = config;
}

@Override
protected Topic buildTopicModel(ConfigMap cm) {
try {
return new Topic(getProperty(cm.getData(), TOPIC_NAME_KEY, cm.getMetadata().getName()),
getProperty(cm.getData(), PARTITIONS_KEY, -1),
getProperty(cm.getData(), REPLICATION_FACTOR_KEY, (short)-1),
getProperty(cm.getData(), config.getPartitionsKey(), -1),
getProperty(cm.getData(), config.getReplicationFactorKey(), (short)-1),
propertiesFromString(getProperty(cm.getData(), PROPERTIES_KEY, "")),
getProperty(cm.getData(), ACL_KEY, false));
} catch (IOException | NumberFormatException e) { // NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void testBuildTopicModelWithBadFormatProperties() {
@Test
void testWatchConfigMapCreateTopic() throws InterruptedException {
String topicName = "test-topic";
int partitions = 20;
short replicationFactor = 3;
int partitions = 10;
short replicationFactor = 2;
long retentionTime = 3600000L;

ConfigMap cm = makeConfigMap(topicName, partitions, replicationFactor, retentionTime);
Expand Down

0 comments on commit 55b1472

Please sign in to comment.