diff --git a/src/main/java/nb/kafka/operator/PartitionedTopic.java b/src/main/java/nb/kafka/operator/PartitionedTopic.java index 1ef7c55..9eebda6 100644 --- a/src/main/java/nb/kafka/operator/PartitionedTopic.java +++ b/src/main/java/nb/kafka/operator/PartitionedTopic.java @@ -16,10 +16,6 @@ public PartitionedTopic(String name, int numPartitions, short replicationFactor, this.partitionInfos = Collections.unmodifiableList(partitionInfos); } - public List getPartitionInfos() { - return partitionInfos; - } - @Override public int hashCode() { final int prime = 31; diff --git a/src/main/java/nb/kafka/operator/Topic.java b/src/main/java/nb/kafka/operator/Topic.java index 2c0c064..bf3b59f 100644 --- a/src/main/java/nb/kafka/operator/Topic.java +++ b/src/main/java/nb/kafka/operator/Topic.java @@ -21,14 +21,6 @@ public Topic(String name, int partitions, short replicationFactor, Map resourceClass, AppCon protected abstract Topic buildTopicModel(T resource); + protected abstract String getTopicName(T resource); + @Override public void eventReceived(Action action, T resource) { if (resource == null) { @@ -43,7 +45,7 @@ public void eventReceived(Action action, T resource) { return; } - String topicName = resource.getMetadata().getName(); + String topicName = getTopicName(resource); log.info("Got event {} for {} {}", action, resourceKind(), topicName); if (!TopicUtil.isValidTopicName(topicName)) { @@ -58,7 +60,7 @@ public void eventReceived(Action action, T resource) { emitUpdate(buildTopicModel(resource)); break; case DELETED: - emitDelete(buildTopicModel(resource).getName()); + emitDelete(topicName); break; case ERROR: log.error("Error event received for {}: {}", resourceKind(), resource); diff --git a/src/main/java/nb/kafka/operator/watch/TopicCrdWatcher.java b/src/main/java/nb/kafka/operator/watch/TopicCrdWatcher.java index b419b75..1b2d341 100644 --- a/src/main/java/nb/kafka/operator/watch/TopicCrdWatcher.java +++ b/src/main/java/nb/kafka/operator/watch/TopicCrdWatcher.java @@ -59,6 +59,11 @@ protected Topic buildTopicModel(KafkaTopic resource) { } } + @Override + protected String getTopicName(KafkaTopic resource) { + return resource.getSpec().getName(); + } + @Override public List listTopics() { KafkaTopicList list = crdClient().withLabels(labels()).list(); diff --git a/src/test/java/nb/kafka/operator/PartitionedTopicTest.java b/src/test/java/nb/kafka/operator/PartitionedTopicTest.java new file mode 100644 index 0000000..9504c56 --- /dev/null +++ b/src/test/java/nb/kafka/operator/PartitionedTopicTest.java @@ -0,0 +1,5 @@ +import static org.junit.Assert.*; + +public class PartitionedTopicTest { + +} \ No newline at end of file diff --git a/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java b/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java index d4196a5..b57a2e3 100644 --- a/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java +++ b/src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java @@ -320,6 +320,54 @@ void testListTopics() { } } + @Test + public void testGetTopicNameFromData() { + //Arrange + String configMapName = "topic-name"; + String topicName = "_topic_name_"; + Map data = new HashMap<>(); + data.put("name", topicName); + ObjectMeta metadata = new ObjectMeta(); + metadata.setName(configMapName); + + Map labels = Collections.singletonMap("config", "kafka-topic"); + metadata.setLabels(labels); + ConfigMap cm = new ConfigMap("v1", data, "ConfigMap", metadata); + cm.setData(data); + KubernetesClient client = server.getClient(); + + //Act + try (ConfigMapWatcher configMapWatcher = new ConfigMapWatcher(client, appConfig)) { + String result = configMapWatcher.getTopicName(cm); + + //Assert + assertEquals(topicName, result); + } + } + + @Test + public void testGetTopicNameFromMetadataName() { + //Arrange + String configMapName = "topic-name"; + Map data = new HashMap<>(); + ObjectMeta metadata = new ObjectMeta(); + metadata.setName(configMapName); + + Map labels = Collections.singletonMap("config", "kafka-topic"); + metadata.setLabels(labels); + ConfigMap cm = new ConfigMap("v1", data, "ConfigMap", metadata); + cm.setData(data); + KubernetesClient client = server.getClient(); + + //Act + try (ConfigMapWatcher configMapWatcher = new ConfigMapWatcher(client, appConfig)) { + String result = configMapWatcher.getTopicName(cm); + + //Assert + assertEquals(configMapName, result); + } + } + private ConfigMap makeConfigMap(String topicName, int partitions, short replicationFactor, long retentionTime) { Map data = new HashMap<>(); data.put("partitions", Integer.toString(partitions));