Skip to content

Commit

Permalink
Revert "add partitions and rep factor keys as env vars"
Browse files Browse the repository at this point in the history
This reverts commit 55b1472.
  • Loading branch information
roufa85 committed Jul 22, 2019
1 parent 55b1472 commit 86940dc
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 34 deletions.
22 changes: 0 additions & 22 deletions src/main/java/nb/kafka/operator/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class AppConfig {
private String sslTrustStorePassword;
private String sslKeyStoreLocation;
private String sslKeyStorePassword;
private String replicationFactorKey;
private String partitionsKey;

private static AppConfig defaultConfig;

Expand All @@ -56,8 +54,6 @@ public static final AppConfig defaultConfig() {
conf.kafkaTimeoutMs = 30000;
conf.maxReplicationFactor = (short)3;
conf.maxPartitions = 2000;
conf.replicationFactorKey = "replication-factor";
conf.partitionsKey = "partitions";
conf.maxRetentionMs = Duration.of(7, ChronoUnit.DAYS).toMillis();
defaultConfig = conf;
}
Expand Down Expand Up @@ -89,8 +85,6 @@ public AppConfig(AppConfig config) {
this.setSslTrustStorePassword(config.getSslTrustStorePassword());
this.setSslKeyStoreLocation(config.getSslKeyStoreLocation());
this.setSslKeyStorePassword(config.getSslKeyStorePassword());
this.setReplicationFactorKey(config.getReplicationFactorKey());
this.setPartitionsKey(config.getPartitionsKey());
}

public String getBootstrapServers() {
Expand Down Expand Up @@ -219,18 +213,6 @@ public String getSslKeyStorePassword() {
public void setSslKeyStorePassword(String sslKeyStorePassword) {
this.sslKeyStorePassword = sslKeyStorePassword;
}
public String getReplicationFactorKey() {
return replicationFactorKey;
}
public void setReplicationFactorKey(String replicationFactorKey) {
this.replicationFactorKey = replicationFactorKey;
}
public String getPartitionsKey() {
return partitionsKey;
}
public void setPartitionsKey(String partitionsKey) {
this.partitionsKey = partitionsKey;
}

@Override
public String toString() {
Expand All @@ -253,11 +235,7 @@ public String toString() {
", maxPartitions=" + maxPartitions +
", maxRetentionMs=" + maxRetentionMs +
", sslTrustStoreLocation='" + sslTrustStoreLocation + '\'' +
", sslTrustStorePassword='" + sslTrustStorePassword + '\'' +
", sslKeyStoreLocation='" + sslKeyStoreLocation + '\'' +
", sslKeyStorePassword='" + sslKeyStorePassword + '\'' +
", replicationFactorKey='" + replicationFactorKey + '\'' +
", partitionsKey='" + partitionsKey + '\'' +
'}';
}
}
2 changes: 0 additions & 2 deletions src/main/java/nb/kafka/operator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public static AppConfig loadConfig() {
config.setEnableTopicImport(getSystemPropertyOrEnvVar("enable.topic.import", defaultConfig.isEnabledTopicImport()));
config.setEnableAclManagement(getSystemPropertyOrEnvVar("enable.acl", defaultConfig.isEnabledAclManagement()));
config.setEnableTopicDelete(getSystemPropertyOrEnvVar("enable.topic.delete", defaultConfig.isEnabledTopicDelete()));
config.setReplicationFactorKey(getSystemPropertyOrEnvVar("replication.factor.key", defaultConfig.getReplicationFactorKey()));
config.setPartitionsKey(getSystemPropertyOrEnvVar("partitions.key", defaultConfig.getPartitionsKey()));
config.setSecurityProtocol(getSystemPropertyOrEnvVar("security.protocol", defaultConfig.getSecurityProtocol()));
config.setSslTrustStoreLocation(getSystemPropertyOrEnvVar("ssl.truststore.location", defaultConfig.getSslTrustStoreLocation()));
config.setSslTrustStorePassword(getSystemPropertyOrEnvVar("ssl.truststore.password", defaultConfig.getSslTrustStorePassword()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ public class ConfigMapImporter extends AbstractTopicImporter {
private static final Logger log = LoggerFactory.getLogger(ConfigMapImporter.class);

private static final String PROPERTIES_KEY = "properties";
private static final String REPLICATION_FACTOR_KEY = "replication-factor";
private static final String PARTITIONS_KEY = "partitions";
private static final String TOPIC_NAME_KEY = "name";
private static final String ACL_KEY = "acl";

private final KubernetesClient client;
private final AppConfig config;

public ConfigMapImporter(KubernetesClient client, ConfigMapWatcher watcher, TopicManager topicManager,
AppConfig config) {
super(watcher, topicManager, config);
this.client = client;
this.config = config;
}

/**
Expand All @@ -54,8 +54,8 @@ protected void createTopicResource(Topic topic) {
protected ConfigMap buildConfigMapResource(Topic topic) {
Map<String, String> data = new HashMap<>();
data.put(TOPIC_NAME_KEY, String.valueOf(topic.getName()));
data.put(config.getPartitionsKey(), String.valueOf(topic.getPartitions()));
data.put(config.getReplicationFactorKey(), String.valueOf(topic.getReplicationFactor()));
data.put(PARTITIONS_KEY, String.valueOf(topic.getPartitions()));
data.put(REPLICATION_FACTOR_KEY, String.valueOf(topic.getReplicationFactor()));
data.put(PROPERTIES_KEY, PropertyUtil.propertiesAsString(topic.getProperties()));
data.put(ACL_KEY, String.valueOf(topic.isAcl()));

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
public class ConfigMapWatcher extends KubernetesWatcher<ConfigMap> {

private static final String PROPERTIES_KEY = "properties";
private static final String REPLICATION_FACTOR_KEY = "replication-factor";
private static final String PARTITIONS_KEY = "partitions";
private static final String TOPIC_NAME_KEY = "name";
private static final String ACL_KEY = "acl";
private static final String KAFKA_TOPIC_LABEL_VALUE = "kafka-topic";
Expand All @@ -34,19 +36,17 @@ public class ConfigMapWatcher extends KubernetesWatcher<ConfigMap> {
private static final Logger log = LoggerFactory.getLogger(ConfigMapWatcher.class);

private Watch watch;
private final AppConfig config;

public ConfigMapWatcher(KubernetesClient client, AppConfig config) {
super(client, ConfigMap.class, config);
this.config = config;
}

@Override
protected Topic buildTopicModel(ConfigMap cm) {
try {
return new Topic(getProperty(cm.getData(), TOPIC_NAME_KEY, cm.getMetadata().getName()),
getProperty(cm.getData(), config.getPartitionsKey(), -1),
getProperty(cm.getData(), config.getReplicationFactorKey(), (short)-1),
getProperty(cm.getData(), PARTITIONS_KEY, -1),
getProperty(cm.getData(), REPLICATION_FACTOR_KEY, (short)-1),
propertiesFromString(getProperty(cm.getData(), PROPERTIES_KEY, "")),
getProperty(cm.getData(), ACL_KEY, false));
} catch (IOException | NumberFormatException e) { // NOSONAR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ void testBuildTopicModelWithBadFormatProperties() {
@Test
void testWatchConfigMapCreateTopic() throws InterruptedException {
String topicName = "test-topic";
int partitions = 10;
short replicationFactor = 2;
int partitions = 20;
short replicationFactor = 3;
long retentionTime = 3600000L;

ConfigMap cm = makeConfigMap(topicName, partitions, replicationFactor, retentionTime);
Expand Down

0 comments on commit 86940dc

Please sign in to comment.