From 55b147271228f8e063b31b719528b70cdf03aa53 Mon Sep 17 00:00:00 2001 From: Raouf Date: Tue, 16 Jul 2019 16:24:54 +0200 Subject: [PATCH] add partitions and rep factor keys as env vars --- .../java/nb/kafka/operator/AppConfig.java | 22 +++++++++++++++++++ src/main/java/nb/kafka/operator/Main.java | 2 ++ .../operator/importer/ConfigMapImporter.java | 8 +++---- .../operator/watch/ConfigMapWatcher.java | 8 +++---- .../operator/watch/ConfigMapWatcherTest.java | 4 ++-- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/main/java/nb/kafka/operator/AppConfig.java b/src/main/java/nb/kafka/operator/AppConfig.java index 10d7da3..eeb7b0a 100644 --- a/src/main/java/nb/kafka/operator/AppConfig.java +++ b/src/main/java/nb/kafka/operator/AppConfig.java @@ -30,6 +30,8 @@ public class AppConfig { private String sslTrustStorePassword; private String sslKeyStoreLocation; private String sslKeyStorePassword; + private String replicationFactorKey; + private String partitionsKey; private static AppConfig defaultConfig; @@ -54,6 +56,8 @@ public static final AppConfig defaultConfig() { conf.kafkaTimeoutMs = 30000; conf.maxReplicationFactor = (short)3; conf.maxPartitions = 2000; + conf.replicationFactorKey = "replication-factor"; + conf.partitionsKey = "partitions"; conf.maxRetentionMs = Duration.of(7, ChronoUnit.DAYS).toMillis(); defaultConfig = conf; } @@ -85,6 +89,8 @@ public AppConfig(AppConfig config) { this.setSslTrustStorePassword(config.getSslTrustStorePassword()); this.setSslKeyStoreLocation(config.getSslKeyStoreLocation()); this.setSslKeyStorePassword(config.getSslKeyStorePassword()); + this.setReplicationFactorKey(config.getReplicationFactorKey()); + this.setPartitionsKey(config.getPartitionsKey()); } public String getBootstrapServers() { @@ -213,6 +219,18 @@ public String getSslKeyStorePassword() { public void setSslKeyStorePassword(String sslKeyStorePassword) { this.sslKeyStorePassword = sslKeyStorePassword; } + public String getReplicationFactorKey() { + return replicationFactorKey; + } + public void setReplicationFactorKey(String replicationFactorKey) { + this.replicationFactorKey = replicationFactorKey; + } + public String getPartitionsKey() { + return partitionsKey; + } + public void setPartitionsKey(String partitionsKey) { + this.partitionsKey = partitionsKey; + } @Override public String toString() { @@ -235,7 +253,11 @@ public String toString() { ", maxPartitions=" + maxPartitions + ", maxRetentionMs=" + maxRetentionMs + ", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' + + ", sslTrustStorePassword='" + sslTrustStorePassword + '\'' + ", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' + + ", sslKeyStorePassword='" + sslKeyStorePassword + '\'' + + ", replicationFactorKey='" + replicationFactorKey + '\'' + + ", partitionsKey='" + partitionsKey + '\'' + '}'; } } diff --git a/src/main/java/nb/kafka/operator/Main.java b/src/main/java/nb/kafka/operator/Main.java index 24eebbd..248cbe8 100644 --- a/src/main/java/nb/kafka/operator/Main.java +++ b/src/main/java/nb/kafka/operator/Main.java @@ -52,6 +52,8 @@ public static AppConfig loadConfig() { config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport())); config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement())); config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete())); + config.setReplicationFactorKey(getSystemPropertyOrEnvVar("replication.factor.key", defaultConfig.getReplicationFactorKey())); + config.setPartitionsKey(getSystemPropertyOrEnvVar("partitions.key", defaultConfig.getPartitionsKey())); config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol())); config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation())); config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword())); diff --git a/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java b/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java index d3f4ad7..98ac39b 100644 --- a/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java +++ b/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java @@ -27,17 +27,17 @@ public class ConfigMapImporter extends AbstractTopicImporter { private static final Logger log = LoggerFactory.getLogger(ConfigMapImporter.class); private static final String PROPERTIES_KEY = "properties"; - private static final String REPLICATION_FACTOR_KEY = "replication-factor"; - private static final String PARTITIONS_KEY = "partitions"; private static final String TOPIC_NAME_KEY = "name"; private static final String ACL_KEY = "acl"; private final KubernetesClient client; + private final AppConfig config; public ConfigMapImporter(KubernetesClient client, ConfigMapWatcher watcher, TopicManager topicManager, AppConfig config) { super(watcher, topicManager, config); this.client = client; + this.config = config; } /** @@ -54,8 +54,8 @@ protected void createTopicResource(Topic topic) { protected ConfigMap buildConfigMapResource(Topic topic) { Map data = new HashMap<>(); data.put(TOPIC_NAME_KEY, String.valueOf(topic.getName())); - data.put(PARTITIONS_KEY, String.valueOf(topic.getPartitions())); - data.put(REPLICATION_FACTOR_KEY, String.valueOf(topic.getReplicationFactor())); + data.put(config.getPartitionsKey(), String.valueOf(topic.getPartitions())); + data.put(config.getReplicationFactorKey(), String.valueOf(topic.getReplicationFactor())); data.put(PROPERTIES_KEY, PropertyUtil.propertiesAsString(topic.getProperties())); data.put(ACL_KEY, String.valueOf(topic.isAcl())); diff --git a/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java b/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java index df8d92e..da7b0ea 100644 --- a/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java +++ b/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java @@ -26,8 +26,6 @@ public class ConfigMapWatcher extends KubernetesWatcher { private static final String PROPERTIES_KEY = "properties"; - private static final String REPLICATION_FACTOR_KEY = "replication-factor"; - private static final String PARTITIONS_KEY = "partitions"; private static final String TOPIC_NAME_KEY = "name"; private static final String ACL_KEY = "acl"; private static final String KAFKA_TOPIC_LABEL_VALUE = "kafka-topic"; @@ -36,17 +34,19 @@ public class ConfigMapWatcher extends KubernetesWatcher { private static final Logger log = LoggerFactory.getLogger(ConfigMapWatcher.class); private Watch watch; + private final AppConfig config; public ConfigMapWatcher(KubernetesClient client, AppConfig config) { super(client, ConfigMap.class, config); + this.config = config; } @Override protected Topic buildTopicModel(ConfigMap cm) { try { return new Topic(getProperty(cm.getData(), TOPIC_NAME_KEY, cm.getMetadata().getName()), - getProperty(cm.getData(), PARTITIONS_KEY, -1), - getProperty(cm.getData(), REPLICATION_FACTOR_KEY, (short)-1), + getProperty(cm.getData(), config.getPartitionsKey(), -1), + getProperty(cm.getData(), config.getReplicationFactorKey(), (short)-1), propertiesFromString(getProperty(cm.getData(), PROPERTIES_KEY, "")), getProperty(cm.getData(), ACL_KEY, false)); } catch (IOException | NumberFormatException e) { // NOSONAR diff --git a/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java b/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java index b57a2e3..4d752ca 100644 --- a/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java +++ b/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java @@ -111,8 +111,8 @@ void testBuildTopicModelWithBadFormatProperties() { @Test void testWatchConfigMapCreateTopic() throws InterruptedException { String topicName = "test-topic"; - int partitions = 20; - short replicationFactor = 3; + int partitions = 10; + short replicationFactor = 2; long retentionTime = 3600000L; ConfigMap cm = makeConfigMap(topicName, partitions, replicationFactor, retentionTime);