Skip to content

Commit

Permalink
delete topic without building the model
Browse files Browse the repository at this point in the history
  • Loading branch information
roufa85 committed Jul 11, 2019
1 parent c5653b5 commit 67df8d9
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 14 deletions.
4 changes: 0 additions & 4 deletions src/main/java/nb/kafka/operator/PartitionedTopic.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ public PartitionedTopic(String name, int numPartitions, short replicationFactor,
this.partitionInfos = Collections.unmodifiableList(partitionInfos);
}

public List<TopicPartitionInfo> getPartitionInfos() {
return partitionInfos;
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/nb/kafka/operator/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,6 @@ public Topic(String name, int partitions, short replicationFactor, Map<String, S
this.acl = acl;
}

public Topic(Topic topic) {
this.name = topic.name;
this.partitions = topic.partitions;
this.replicationFactor = topic.replicationFactor;
this.properties = topic.properties;
this.acl = topic.acl;
}

public String getName() {
return name;
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/nb/kafka/operator/watch/ConfigMapWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ protected Topic buildTopicModel(ConfigMap cm) {
}
}

@Override
protected String getTopicName(ConfigMap cm) {
return getProperty(cm.getData(), TOPIC_NAME_KEY, cm.getMetadata().getName());
}

@Override
public void watch() {
log.debug("Watching {} for ConfigMap changes", kubeClient().getNamespace());
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/nb/kafka/operator/watch/KubernetesWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ public KubernetesWatcher(KubernetesClient client, Class<T> resourceClass, AppCon

protected abstract Topic buildTopicModel(T resource);

protected abstract String getTopicName(T resource);

@Override
public void eventReceived(Action action, T resource) {
if (resource == null) {
log.warn("Event {} received for null resource {}", action, resourceKind());
return;
}

String topicName = resource.getMetadata().getName();
String topicName = getTopicName(resource);
log.info("Got event {} for {} {}", action, resourceKind(), topicName);

if (!TopicUtil.isValidTopicName(topicName)) {
Expand All @@ -58,7 +60,7 @@ public void eventReceived(Action action, T resource) {
emitUpdate(buildTopicModel(resource));
break;
case DELETED:
emitDelete(buildTopicModel(resource).getName());
emitDelete(topicName);
break;
case ERROR:
log.error("Error event received for {}: {}", resourceKind(), resource);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/nb/kafka/operator/watch/TopicCrdWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ protected Topic buildTopicModel(KafkaTopic resource) {
}
}

@Override
protected String getTopicName(KafkaTopic resource) {
return resource.getSpec().getName();
}

@Override
public List<Topic> listTopics() {
KafkaTopicList list = crdClient().withLabels(labels()).list();
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/nb/kafka/operator/PartitionedTopicTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import static org.junit.Assert.*;

public class PartitionedTopicTest {

}
48 changes: 48 additions & 0 deletions src/test/java/nb/kafka/operator/watch/ConfigMapWatcherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,54 @@ void testListTopics() {
}
}

@Test
public void testGetTopicNameFromData() {
//Arrange
String configMapName = "topic-name";
String topicName = "_topic_name_";
Map<String, String> data = new HashMap<>();
data.put("name", topicName);
ObjectMeta metadata = new ObjectMeta();
metadata.setName(configMapName);

Map<String, String> labels = Collections.singletonMap("config", "kafka-topic");
metadata.setLabels(labels);
ConfigMap cm = new ConfigMap("v1", data, "ConfigMap", metadata);
cm.setData(data);
KubernetesClient client = server.getClient();

//Act
try (ConfigMapWatcher configMapWatcher = new ConfigMapWatcher(client, appConfig)) {
String result = configMapWatcher.getTopicName(cm);

//Assert
assertEquals(topicName, result);
}
}

@Test
public void testGetTopicNameFromMetadataName() {
//Arrange
String configMapName = "topic-name";
Map<String, String> data = new HashMap<>();
ObjectMeta metadata = new ObjectMeta();
metadata.setName(configMapName);

Map<String, String> labels = Collections.singletonMap("config", "kafka-topic");
metadata.setLabels(labels);
ConfigMap cm = new ConfigMap("v1", data, "ConfigMap", metadata);
cm.setData(data);
KubernetesClient client = server.getClient();

//Act
try (ConfigMapWatcher configMapWatcher = new ConfigMapWatcher(client, appConfig)) {
String result = configMapWatcher.getTopicName(cm);

//Assert
assertEquals(configMapName, result);
}
}

private ConfigMap makeConfigMap(String topicName, int partitions, short replicationFactor, long retentionTime) {
Map<String, String> data = new HashMap<>();
data.put("partitions", Integer.toString(partitions));
Expand Down

0 comments on commit 67df8d9

Please sign in to comment.