diff --git a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java new file mode 100644 index 00000000000..bc7604c5581 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java @@ -0,0 +1,110 @@ +package org.apache.solr.cloud; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.solr.common.cloud.ClusterPropertiesListener; + +public class WatchedClusterProperties implements ClusterPropertiesListener { + + final String nodeName; + + public static final String WATCHED_PROPERTIES = "watched-properties"; + public static final String WATCHED_NODE_PROPERTIES = "watched-node-properties"; + + private Map>> propertiesListeners = + Collections.synchronizedMap(new HashMap<>()); + + /* private Map>> nodePropertiesListeners = + Collections.synchronizedMap(new HashMap<>());*/ + + private volatile Map knownData = new HashMap<>(); + private volatile Map knownNodeData = new HashMap<>(); + + public WatchedClusterProperties(String nodeName) { + this.nodeName = nodeName; + } + + @Override + @SuppressWarnings("unchecked") + public boolean onChange(Map properties) { + + Map newData = + ((Map>) + properties.getOrDefault(WATCHED_NODE_PROPERTIES, Collections.EMPTY_MAP)) + .getOrDefault(nodeName, Collections.EMPTY_MAP); + Map modified = + compareAndInvokeListeners(newData, knownNodeData, null, propertiesListeners); + if (modified != null) knownNodeData = modified; + + newData = + (Map) properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP); + modified = compareAndInvokeListeners(newData, knownData, knownNodeData, propertiesListeners); + if (modified != null) knownData = modified; + + return false; + } + + private static Map compareAndInvokeListeners( + Map newData, + Map knownData, + Map overrideData, + Map>> propertiesListeners) { + boolean isModified = false; + // look for changed data or missing keys + for (String k : knownData.keySet()) { + String oldVal = knownData.get(k); + String newVal = newData.get(k); + if (Objects.equals(oldVal, newVal)) continue; + isModified = true; + if (overrideData != null && overrideData.containsKey(k)) { + // per node properties contain this key. do not invoke listener + continue; + } + invokeListener(k, newVal, propertiesListeners); + } + for (String k : newData.keySet()) { + if (knownData.containsKey(k)) continue; + isModified = true; + invokeListener(k, newData.get(k), propertiesListeners); + } + + if (isModified) { + return Map.copyOf(newData); + } else { + return null; + } + } + + private static void invokeListener( + String key, + String newVal, + Map>> propertiesListeners) { + List> listeners = propertiesListeners.get(key); + if (listeners != null) { + for (BiConsumer l : listeners) { + l.accept(key, newVal); + } + } + listeners = propertiesListeners.get(null); + if (listeners != null) { + for (BiConsumer listener : listeners) { + listener.accept(key, newVal); + } + } + } + + public void watchProperty(String name, BiConsumer listener) { + propertiesListeners.computeIfAbsent(name, s -> new CopyOnWriteArrayList<>()).add(listener); + } + + public void unwatchProperty(String name, BiConsumer listener) { + List> listeners = propertiesListeners.get(name); + if (listeners == null) return; + listeners.remove(listener); + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index bc328c0e195..3382c36f4c3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -160,6 +160,8 @@ public class ZkController implements Closeable { private final DistributedMap overseerFailureMap; private final DistributedMap asyncIdsMap; + private final WatchedClusterProperties watchedClusterProperties; + public static final String COLLECTION_PARAM_PREFIX = "collection."; public static final String CONFIGNAME_PROP = "configName"; @@ -421,6 +423,7 @@ public ZkController( // this must happen after zkStateReader has initialized the cluster props this.baseURL = Utils.getBaseUrlForNodeName(this.nodeName, urlSchemeFromClusterProp); + watchedClusterProperties = new WatchedClusterProperties(nodeName); } catch (KeeperException e) { // Convert checked exception to one acceptable by the caller (see also init() further down) log.error("", e); @@ -434,6 +437,7 @@ public ZkController( } else { this.overseerJobQueue = overseer.getStateUpdateQueue(); } + zkStateReader.registerClusterPropertiesListener(watchedClusterProperties); this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient); this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient); this.sysPropsCacher = @@ -3013,6 +3017,10 @@ public void publishNodeAsDown(String nodeName) { } } + public WatchedClusterProperties getWatchedClusterProperties() { + return watchedClusterProperties; + } + /** * Ensures that a searcher is registered for the given core and if not, waits until one is * registered diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java index 35cb08fd507..60d680979e6 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java @@ -17,9 +17,14 @@ package org.apache.solr.cloud; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiConsumer; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterProperties; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.Utils; import org.junit.BeforeClass; import org.junit.Test; @@ -52,4 +57,141 @@ public void testSetInvalidPluginClusterProperty() throws Exception { CollectionAdminRequest.setClusterProperty(propertyName, "valueA") .process(cluster.getSolrClient()); } + + @Test + @SuppressWarnings("unchecked") + public void testWatchedNodeProperties() { + WatchedClusterProperties wcp = new WatchedClusterProperties("node1"); + NamedList listener1 = new NamedList<>(); + NamedList all = new NamedList<>(); + NamedList listener3 = new NamedList<>(); + wcp.watchProperty("p1", (key, value) -> listener1.add(key, value)); + wcp.watchProperty(null, (key, value) -> all.add(key, value)); + BiConsumer p3Listener = (key, value) -> listener3.add(key, value); + wcp.watchProperty("p3", p3Listener); + wcp.onChange( + (Map) + Utils.fromJSONString( + "{\n" + + " \"watched-node-properties\" :{\n" + + " \"node1\" : {\n" + + " \"p1\" : \"v1\",\n" + + " \"p2\" : \"v2\"\n" + + " }\n" + + " }\n" + + "}")); + + assertEquals(1, listener1.size()); + assertEquals("v1", listener1.get("p1")); + assertEquals(2, all.size()); + assertEquals("v1", all.get("p1")); + assertEquals("v2", all.get("p2")); + assertEquals(0, listener3.size()); + } + + static class NodeWatch { + final WatchedClusterProperties wcp; + Map> events = new HashMap<>(); + + NodeWatch(String name) { + wcp = new WatchedClusterProperties(name); + } + + @SuppressWarnings("unchecked") + void newProps(String json) { + events.clear(); + wcp.onChange((Map) Utils.fromJSONString(json)); + } + + NodeWatch listen(String name) { + wcp.watchProperty( + name, (k, v) -> events.computeIfAbsent(name, s -> new NamedList<>()).add(k, v)); + return this; + } + + static final NamedList empty = new NamedList<>(); + + int count(String prop) { + return events.getOrDefault(prop, empty)._size(); + } + + String val(String key) { + return events.getOrDefault(key, empty).get(key); + } + + String val(String listenerName, String key) { + return events.getOrDefault(listenerName, empty).get(key); + } + } + + @Test + @SuppressWarnings("unchecked") + public void testWatchedClusterProperties() { + NodeWatch n1 = new NodeWatch("node1").listen("p1").listen(null).listen("p3"); + NodeWatch n2 = new NodeWatch("node2").listen("p1").listen(null).listen("p3"); + ; + + String json = + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " }\n" + + "}"; + n1.newProps(json); + n2.newProps(json); + + assertEquals(1, n1.count("p1")); + assertEquals("v1", n1.val("p1")); + assertEquals(2, n1.count(null)); + assertEquals("v1", n1.val(null, "p1")); + assertEquals("v2", n1.val(null, "p2")); + assertEquals(0, n1.count("p3")); + + assertEquals(1, n2.count("p1")); + assertEquals("v1", n2.val("p1")); + assertEquals(2, n2.count(null)); + assertEquals("v1", n2.val(null, "p1")); + assertEquals("v2", n2.val(null, "p2")); + assertEquals(0, n2.count("p3")); + + json = + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " \"p3\": \"v3\"\n" + + " }\n" + + "}"; + n1.newProps(json); + n2.newProps(json); + assertEquals(0, n1.count("p1")); + assertEquals(0, n1.count("p2")); + assertEquals(0, n2.count("p1")); + assertEquals(0, n2.count("p2")); + assertEquals("v3", n1.val("p3")); + assertEquals("v3", n2.val("p3")); + json = + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " \"p3\": \"v3\"\n" + + " },\n" + + " \"watched-node-properties\" :{\n" + + " \"node1\" : {\n" + + " \"p1\" : \"v_n1\",\n" + + " \"p2\" : \"v_n2\"\n" + + " },\n" + + " }\n" + + "}"; + n1.newProps(json); + n2.newProps(json); + assertEquals(0, n2.count("p1")); + assertEquals(0, n2.count(null)); + + assertEquals("v_n1", n1.val("p1")); + assertEquals("v_n1", n1.val(null, "p1")); + assertEquals("v_n2", n1.val(null, "p2")); + } }