From 86940dc40505e9caf5d9dffdbea662ebd5f00a62 Mon Sep 17 00:00:00 2001 From: Raouf Date: Mon, 22 Jul 2019 15:01:04 +0200 Subject: [PATCH] Revert "add partitions and rep factor keys as env vars" This reverts commit 55b147271228f8e063b31b719528b70cdf03aa53. --- .../java/nb/kafka/operator/AppConfig.java | 22 ------------------- src/main/java/nb/kafka/operator/Main.java | 2 -- .../operator/importer/ConfigMapImporter.java | 8 +++---- .../operator/watch/ConfigMapWatcher.java | 8 +++---- .../operator/watch/ConfigMapWatcherTest.java | 4 ++-- 5 files changed, 10 insertions(+), 34 deletions(-) diff --git a/src/main/java/nb/kafka/operator/AppConfig.java b/src/main/java/nb/kafka/operator/AppConfig.java index eeb7b0a..10d7da3 100644 --- a/src/main/java/nb/kafka/operator/AppConfig.java +++ b/src/main/java/nb/kafka/operator/AppConfig.java @@ -30,8 +30,6 @@ public class AppConfig { private String sslTrustStorePassword; private String sslKeyStoreLocation; private String sslKeyStorePassword; - private String replicationFactorKey; - private String partitionsKey; private static AppConfig defaultConfig; @@ -56,8 +54,6 @@ public static final AppConfig defaultConfig() { conf.kafkaTimeoutMs = 30000; conf.maxReplicationFactor = (short)3; conf.maxPartitions = 2000; - conf.replicationFactorKey = "replication-factor"; - conf.partitionsKey = "partitions"; conf.maxRetentionMs = Duration.of(7, ChronoUnit.DAYS).toMillis(); defaultConfig = conf; } @@ -89,8 +85,6 @@ public AppConfig(AppConfig config) { this.setSslTrustStorePassword(config.getSslTrustStorePassword()); this.setSslKeyStoreLocation(config.getSslKeyStoreLocation()); this.setSslKeyStorePassword(config.getSslKeyStorePassword()); - this.setReplicationFactorKey(config.getReplicationFactorKey()); - this.setPartitionsKey(config.getPartitionsKey()); } public String getBootstrapServers() { @@ -219,18 +213,6 @@ public String getSslKeyStorePassword() { public void setSslKeyStorePassword(String sslKeyStorePassword) { this.sslKeyStorePassword = sslKeyStorePassword; } - public String getReplicationFactorKey() { - return replicationFactorKey; - } - public void setReplicationFactorKey(String replicationFactorKey) { - this.replicationFactorKey = replicationFactorKey; - } - public String getPartitionsKey() { - return partitionsKey; - } - public void setPartitionsKey(String partitionsKey) { - this.partitionsKey = partitionsKey; - } @Override public String toString() { @@ -253,11 +235,7 @@ public String toString() { ", maxPartitions=" + maxPartitions + ", maxRetentionMs=" + maxRetentionMs + ", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' + - ", sslTrustStorePassword='" + sslTrustStorePassword + '\'' + ", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' + - ", sslKeyStorePassword='" + sslKeyStorePassword + '\'' + - ", replicationFactorKey='" + replicationFactorKey + '\'' + - ", partitionsKey='" + partitionsKey + '\'' + '}'; } } diff --git a/src/main/java/nb/kafka/operator/Main.java b/src/main/java/nb/kafka/operator/Main.java index 248cbe8..24eebbd 100644 --- a/src/main/java/nb/kafka/operator/Main.java +++ b/src/main/java/nb/kafka/operator/Main.java @@ -52,8 +52,6 @@ public static AppConfig loadConfig() { config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport())); config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement())); config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete())); - config.setReplicationFactorKey(getSystemPropertyOrEnvVar("replication.factor.key", defaultConfig.getReplicationFactorKey())); - config.setPartitionsKey(getSystemPropertyOrEnvVar("partitions.key", defaultConfig.getPartitionsKey())); config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol())); config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation())); config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword())); diff --git a/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java b/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java index 98ac39b..d3f4ad7 100644 --- a/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java +++ b/src/main/java/nb/kafka/operator/importer/ConfigMapImporter.java @@ -27,17 +27,17 @@ public class ConfigMapImporter extends AbstractTopicImporter { private static final Logger log = LoggerFactory.getLogger(ConfigMapImporter.class); private static final String PROPERTIES_KEY = "properties"; + private static final String REPLICATION_FACTOR_KEY = "replication-factor"; + private static final String PARTITIONS_KEY = "partitions"; private static final String TOPIC_NAME_KEY = "name"; private static final String ACL_KEY = "acl"; private final KubernetesClient client; - private final AppConfig config; public ConfigMapImporter(KubernetesClient client, ConfigMapWatcher watcher, TopicManager topicManager, AppConfig config) { super(watcher, topicManager, config); this.client = client; - this.config = config; } /** @@ -54,8 +54,8 @@ protected void createTopicResource(Topic topic) { protected ConfigMap buildConfigMapResource(Topic topic) { Map data = new HashMap<>(); data.put(TOPIC_NAME_KEY, String.valueOf(topic.getName())); - data.put(config.getPartitionsKey(), String.valueOf(topic.getPartitions())); - data.put(config.getReplicationFactorKey(), String.valueOf(topic.getReplicationFactor())); + data.put(PARTITIONS_KEY, String.valueOf(topic.getPartitions())); + data.put(REPLICATION_FACTOR_KEY, String.valueOf(topic.getReplicationFactor())); data.put(PROPERTIES_KEY, PropertyUtil.propertiesAsString(topic.getProperties())); data.put(ACL_KEY, String.valueOf(topic.isAcl())); diff --git a/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java b/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java index da7b0ea..df8d92e 100644 --- a/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java +++ b/src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java @@ -26,6 +26,8 @@ public class ConfigMapWatcher extends KubernetesWatcher { private static final String PROPERTIES_KEY = "properties"; + private static final String REPLICATION_FACTOR_KEY = "replication-factor"; + private static final String PARTITIONS_KEY = "partitions"; private static final String TOPIC_NAME_KEY = "name"; private static final String ACL_KEY = "acl"; private static final String KAFKA_TOPIC_LABEL_VALUE = "kafka-topic"; @@ -34,19 +36,17 @@ public class ConfigMapWatcher extends KubernetesWatcher { private static final Logger log = LoggerFactory.getLogger(ConfigMapWatcher.class); private Watch watch; - private final AppConfig config; public ConfigMapWatcher(KubernetesClient client, AppConfig config) { super(client, ConfigMap.class, config); - this.config = config; } @Override protected Topic buildTopicModel(ConfigMap cm) { try { return new Topic(getProperty(cm.getData(), TOPIC_NAME_KEY, cm.getMetadata().getName()), - getProperty(cm.getData(), config.getPartitionsKey(), -1), - getProperty(cm.getData(), config.getReplicationFactorKey(), (short)-1), + getProperty(cm.getData(), PARTITIONS_KEY, -1), + getProperty(cm.getData(), REPLICATION_FACTOR_KEY, (short)-1), propertiesFromString(getProperty(cm.getData(), PROPERTIES_KEY, "")), getProperty(cm.getData(), ACL_KEY, false)); } catch (IOException | NumberFormatException e) { // NOSONAR diff --git a/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java b/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java index 4d752ca..b57a2e3 100644 --- a/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java +++ b/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java @@ -111,8 +111,8 @@ void testBuildTopicModelWithBadFormatProperties() { @Test void testWatchConfigMapCreateTopic() throws InterruptedException { String topicName = "test-topic"; - int partitions = 10; - short replicationFactor = 2; + int partitions = 20; + short replicationFactor = 3; long retentionTime = 3600000L; ConfigMap cm = makeConfigMap(topicName, partitions, replicationFactor, retentionTime);