Skip to content

Commit

Permalink
fix: prune subscriptions trie when topics are unsubscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
saranyailla committed Nov 22, 2024
1 parent 2554a42 commit e17d003
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.aws.greengrass.builtin.services.pubsub;

import com.aws.greengrass.util.Utils;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -69,11 +71,61 @@ public boolean remove(String topic, K cb) {
* @return if changed after removal
*/
public boolean remove(String topic, Set<K> cbs) {
SubscriptionTrie<K> sub = lookup(topic);
if (sub == null) {
if (Utils.isEmpty(topic) || Utils.isEmpty(cbs)) {
return false;
}
return sub.subscriptionCallbacks.removeAll(cbs);
SubscriptionTrie<K> current = this;
SubscriptionTrie<K> topicNodeToKeep = current;
String[] topicLevels = topic.split(TOPIC_LEVEL_SEPARATOR);
String nextTopicNode = topicLevels[0];
for (String topicLevel : topicLevels) {
if (!canRemove(current)) {
topicNodeToKeep = current;
nextTopicNode = topicLevel;
}
current = current.children.get(topicLevel);
if (current == null) {
return false; // Happens when the topic doesn't exist in the trie. So, return false for remove
}
}

boolean removedCallbacks = current.subscriptionCallbacks.removeAll(cbs);
// If topic is still a prefix of another registered topic in the trie, do not remove and return
if (!canRemove(current)) {
return removedCallbacks;
}

// If the current topic is neither a prefix of another topic nor has callbacks, prune the trie such
// that all the topic levels of this topic that don't have children or callbacks are removed.
topicNodeToKeep.children.remove(nextTopicNode);
// since topicNodeToKeep becomes a leaf, prune it and possibly all the topic levels of the topic if
// they have no children or callbacks registered
if (canRemove(topicNodeToKeep)) {
pruneRecursively(topicLevels);
}
return true;
}

private boolean canRemove(SubscriptionTrie<K> node) {
// returns false if the topic level is not prefix of another topic or has callbacks registered
return node.children.isEmpty() && node.subscriptionCallbacks.isEmpty();

}

void pruneRecursively(String... topicLevels) {
SubscriptionTrie<K> current = this;
SubscriptionTrie<K> prev = current;
for (int i = 0; i < topicLevels.length; i++) {
if (i > 0 && canRemove(current)) { // At level 0, current is root of the trie
prev.children.remove(topicLevels[i - 1]);
pruneRecursively(topicLevels);
}
prev = current;
current = current.children.get(topicLevels[i]);
if (current == null) {
return; // nothing to prune
}
}
}

/**
Expand Down Expand Up @@ -183,3 +235,4 @@ public static boolean isWildcard(String topic) {
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,51 @@ public void GIVEN_subscription_wildcard_WHEN_remove_topic_THEN_no_matches() {
assertEquals(0, trie.size());
}

@Test
public void GIVEN_subscriptions_with_wildcards_WHEN_remove_topics_THEN_clean_up_trie() {
assertEquals(0, trie.size());
SubscriptionCallback cb1 = generateSubscriptionCallback();
SubscriptionCallback cb2 = generateSubscriptionCallback();
SubscriptionCallback cb3 = generateSubscriptionCallback();
String topic = "foo/+/bar/#";
trie.add("bar", cb1);
trie.add(topic, cb1);
trie.add(topic, cb2);
// Topic is not registered with the callback, mark it as removed when requested to remove
assertThat("remove topic", trie.remove(topic, cb3), is(false));
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2));

trie.add("foo/#", cb3);
trie.add("foo/+", cb2);

assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3));
assertEquals(5, trie.size());

assertThat("remove topic", trie.remove("foo/+", cb2), is(true));
assertEquals(4, trie.size());
assertThat(trie.get("foo/+"), containsInAnyOrder(cb3)); // foo/+ still matches with foo/#
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2, cb3)); // foo/+/bar/# still exists

assertThat("remove topic", trie.remove("foo/#", cb3), is(true));
assertFalse(trie.containsKey("foo/#"));
assertThat(trie.get("foo/#"), is(empty()));
assertThat(trie.get("foo/+"), is(empty())); // foo/+ doesn't match with any existing topic
assertThat(trie.get(topic), containsInAnyOrder(cb1, cb2)); // foo/+/bar/# still exists
assertEquals(3, trie.size());

assertThat("remove topic", trie.remove(topic, cb1), is(true));
assertThat(trie.get(topic), contains(cb2));
assertEquals(2, trie.size());
assertTrue(trie.containsKey("foo/+"));
assertTrue(trie.containsKey("foo/+/bar/#"));

assertThat("remove topic", trie.remove(topic, cb2), is(true));
assertThat(trie.get(topic), is(empty()));
assertEquals(1, trie.size());
assertFalse(trie.containsKey("foo/+"));
assertFalse(trie.containsKey("foo/+/bar/#"));
}

@Test
void GIVEN_topics_WHEN_isWildcard_THEN_returns_whether_it_uses_wildcard() {
assertTrue(SubscriptionTrie.isWildcard("+"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,8 @@ void GIVEN_endpoints_provided_WHEN_create_thing_THEN_deviceConfig_contains_provi
.parseArgs("-i", getClass().getResource("blank_config.yaml").toString(), "-r", tempRootDir.toString());
DeviceProvisioningHelper.ThingInfo thingInfo = deviceProvisioningHelper.createThing(iotClient, "TestThingPolicy", "TestThing", "mockEndpoint", "");

deviceProvisioningHelper.updateKernelConfigWithIotConfiguration(kernel, thingInfo, "us-east-1", "TestRoleAliasName", "TestCertPath");
deviceProvisioningHelper.updateKernelConfigWithIotConfiguration(kernel, thingInfo, "us-east-1",
"TestRoleAliasName", tempRootDir.resolve("TestCertPath").toString());
assertEquals("mockEndpoint", kernel.getConfig().lookup(SERVICES_NAMESPACE_TOPIC,
DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY, DEVICE_PARAM_IOT_DATA_ENDPOINT).getOnce());
}
Expand Down

0 comments on commit e17d003

Please sign in to comment.