From fd184af7e50fc121bdfd224e195ddeaad1da5487 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Wed, 21 Feb 2024 05:39:23 +1100 Subject: [PATCH 1/7] Watched properties impl --- .../solr/cloud/WatchedClusterProperties.java | 75 +++++++++++++++++++ .../org/apache/solr/cloud/ZkController.java | 7 ++ .../solr/cloud/TestClusterProperties.java | 55 ++++++++++++++ 3 files changed, 137 insertions(+) create mode 100644 solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java diff --git a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java new file mode 100644 index 00000000000..8befa55db6d --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java @@ -0,0 +1,75 @@ +package org.apache.solr.cloud; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.solr.common.cloud.ClusterPropertiesListener; + +public class WatchedClusterProperties implements ClusterPropertiesListener { + + public static final String WATCHED_PROPERTIES = "watched-properties"; + + private Map>> propertiesListeners = + Collections.synchronizedMap(new HashMap<>()); + + private volatile Map knownData = new HashMap<>(); + + @Override + @SuppressWarnings("unchecked") + public boolean onChange(Map properties) { + Map newData = (Map) properties.get(WATCHED_PROPERTIES); + if (newData == null) newData = Collections.EMPTY_MAP; + compareAndInvokeListeners(newData); + return false; + } + + private void compareAndInvokeListeners(Map newData) { + boolean isModified = false; + // look for changed data or missing keys + for (String k : knownData.keySet()) { + String oldVal = knownData.get(k); + String newVal = newData.get(k); + if (Objects.equals(oldVal, newVal)) continue; + isModified = true; + invokeListener(k, oldVal, newVal); + } + for (String k : newData.keySet()) { + if (knownData.containsKey(k)) continue; + isModified = true; + invokeListener(k, null, newData.get(k)); + } + + if (isModified) { + knownData = Map.copyOf(newData); + } + } + + private void invokeListener(String key, String oldVal, String newVal) { + List> listeners = propertiesListeners.get(key); + if (listeners != null) { + for (BiConsumer l : listeners) { + l.accept(key, newVal); + } + } + listeners = propertiesListeners.get(null); + if (listeners != null) { + for (BiConsumer listener : listeners) { + listener.accept(key, newVal); + } + } + } + + public void watchProperty(String name, BiConsumer listener) { + propertiesListeners.computeIfAbsent(name, s -> new CopyOnWriteArrayList<>()).add(listener); + } + + public void unwatchProperty(String name, BiConsumer listener) { + List> listeners = propertiesListeners.get(name); + if (listeners == null) return; + listeners.remove(listener); + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index bc328c0e195..c424b40284f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -160,6 +160,8 @@ public class ZkController implements Closeable { private final DistributedMap overseerFailureMap; private final DistributedMap asyncIdsMap; + private final WatchedClusterProperties watchedClusterProperties = new WatchedClusterProperties(); + public static final String COLLECTION_PARAM_PREFIX = "collection."; public static final String CONFIGNAME_PROP = "configName"; @@ -434,6 +436,7 @@ public ZkController( } else { this.overseerJobQueue = overseer.getStateUpdateQueue(); } + zkStateReader.registerClusterPropertiesListener(watchedClusterProperties); this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient); this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient); this.sysPropsCacher = @@ -3013,6 +3016,10 @@ public void publishNodeAsDown(String nodeName) { } } + public WatchedClusterProperties getWatchedClusterProperties() { + return watchedClusterProperties; + } + /** * Ensures that a searcher is registered for the given core and if not, waits until one is * registered diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java index 35cb08fd507..f1c84049707 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java @@ -17,9 +17,12 @@ package org.apache.solr.cloud; +import java.util.HashMap; +import java.util.Map; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterProperties; +import org.apache.solr.common.util.Utils; import org.junit.BeforeClass; import org.junit.Test; @@ -52,4 +55,56 @@ public void testSetInvalidPluginClusterProperty() throws Exception { CollectionAdminRequest.setClusterProperty(propertyName, "valueA") .process(cluster.getSolrClient()); } + + @Test + @SuppressWarnings("unchecked") + public void testWatchedClusterProperties() { + WatchedClusterProperties wcp = new WatchedClusterProperties(); + HashMap listener1 = new HashMap<>(); + HashMap listener2 = new HashMap<>(); + HashMap listener3 = new HashMap<>(); + wcp.watchProperty("p1", (key, value) -> listener1.put(key, value)); + wcp.watchProperty(null, (key, value) -> listener2.put(key, value)); + wcp.watchProperty("p3", (key, value) -> listener3.put(key, value)); + + wcp.onChange( + (Map) + Utils.fromJSONString( + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " }\n" + + "}")); + assertEquals(1, listener1.size()); + assertEquals("v1", listener1.get("p1")); + assertEquals(2, listener2.size()); + assertEquals("v1", listener2.get("p1")); + assertEquals("v2", listener2.get("p2")); + assertEquals(0, listener3.size()); + listener1.clear(); + listener2.clear(); + listener3.clear(); + wcp.onChange( + (Map) + Utils.fromJSONString( + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " }\n" + + "}")); + assertEquals(0, listener1.size()); + assertEquals(0, listener2.size()); + assertEquals(0, listener3.size()); + wcp.onChange( + (Map) + Utils.fromJSONString( + "{\n" + " \"watched-properties\": {\n" + " \"p3\": \"v3\"\n" + " }\n" + "}")); + assertEquals(1, listener1.size()); + assertEquals(null, listener1.get("p1")); + assertEquals(3, listener2.size()); + assertEquals(1, listener3.size()); + assertEquals("v3", listener3.get("p3")); + } } From 2b35ca77974e2c5bc1ce0513387471b0a667da29 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Wed, 21 Feb 2024 05:42:41 +1100 Subject: [PATCH 2/7] Watched properties impl --- .../org/apache/solr/cloud/WatchedClusterProperties.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java index 8befa55db6d..52604c1b73c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java +++ b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java @@ -35,12 +35,12 @@ private void compareAndInvokeListeners(Map newData) { String newVal = newData.get(k); if (Objects.equals(oldVal, newVal)) continue; isModified = true; - invokeListener(k, oldVal, newVal); + invokeListener(k, newVal); } for (String k : newData.keySet()) { if (knownData.containsKey(k)) continue; isModified = true; - invokeListener(k, null, newData.get(k)); + invokeListener(k, newData.get(k)); } if (isModified) { @@ -48,7 +48,7 @@ private void compareAndInvokeListeners(Map newData) { } } - private void invokeListener(String key, String oldVal, String newVal) { + private void invokeListener(String key, String newVal) { List> listeners = propertiesListeners.get(key); if (listeners != null) { for (BiConsumer l : listeners) { From 848659ca7e4831893d1755a8942c3ba1a63878f9 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Wed, 21 Feb 2024 05:44:32 +1100 Subject: [PATCH 3/7] refactor --- .../apache/solr/cloud/TestClusterProperties.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java index f1c84049707..85a106b74f3 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java @@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterProperties; +import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Utils; import org.junit.BeforeClass; import org.junit.Test; @@ -60,12 +61,12 @@ public void testSetInvalidPluginClusterProperty() throws Exception { @SuppressWarnings("unchecked") public void testWatchedClusterProperties() { WatchedClusterProperties wcp = new WatchedClusterProperties(); - HashMap listener1 = new HashMap<>(); - HashMap listener2 = new HashMap<>(); - HashMap listener3 = new HashMap<>(); - wcp.watchProperty("p1", (key, value) -> listener1.put(key, value)); - wcp.watchProperty(null, (key, value) -> listener2.put(key, value)); - wcp.watchProperty("p3", (key, value) -> listener3.put(key, value)); + NamedList listener1 = new NamedList<>(); + NamedList listener2 = new NamedList<>(); + NamedList listener3 = new NamedList<>(); + wcp.watchProperty("p1", (key, value) -> listener1.add(key, value)); + wcp.watchProperty(null, (key, value) -> listener2.add(key, value)); + wcp.watchProperty("p3", (key, value) -> listener3.add(key, value)); wcp.onChange( (Map) From 7caef16ad66fa3f0b62ded23159c0a094bea4f92 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Wed, 21 Feb 2024 06:04:10 +1100 Subject: [PATCH 4/7] per node properties --- .../solr/cloud/WatchedClusterProperties.java | 54 ++++++++++++++++--- .../org/apache/solr/cloud/ZkController.java | 3 +- .../solr/cloud/TestClusterProperties.java | 7 +-- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java index 52604c1b73c..b9e4081e4b5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java +++ b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java @@ -11,23 +11,46 @@ public class WatchedClusterProperties implements ClusterPropertiesListener { + final String nodeName; + public static final String WATCHED_PROPERTIES = "watched-properties"; + public static final String WATCHED_NODE_PROPERTIES = "watched-node-properties"; private Map>> propertiesListeners = Collections.synchronizedMap(new HashMap<>()); + private Map>> nodePropertiesListeners = + Collections.synchronizedMap(new HashMap<>()); + private volatile Map knownData = new HashMap<>(); + private volatile Map knownNodeData = new HashMap<>(); + + public WatchedClusterProperties(String nodeName) { + this.nodeName = nodeName; + } @Override @SuppressWarnings("unchecked") public boolean onChange(Map properties) { - Map newData = (Map) properties.get(WATCHED_PROPERTIES); - if (newData == null) newData = Collections.EMPTY_MAP; - compareAndInvokeListeners(newData); + Map newData = + (Map) properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP); + Map modified = + compareAndInvokeListeners(newData, knownData, propertiesListeners); + if (modified != null) knownData = modified; + + newData = + ((Map>) + properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP)) + .getOrDefault(nodeName, Collections.EMPTY_MAP); + modified = compareAndInvokeListeners(newData, knownNodeData, nodePropertiesListeners); + if (modified != null) knownNodeData = modified; return false; } - private void compareAndInvokeListeners(Map newData) { + private static Map compareAndInvokeListeners( + Map newData, + Map knownData, + Map>> propertiesListeners) { boolean isModified = false; // look for changed data or missing keys for (String k : knownData.keySet()) { @@ -35,20 +58,25 @@ private void compareAndInvokeListeners(Map newData) { String newVal = newData.get(k); if (Objects.equals(oldVal, newVal)) continue; isModified = true; - invokeListener(k, newVal); + invokeListener(k, newVal, propertiesListeners); } for (String k : newData.keySet()) { if (knownData.containsKey(k)) continue; isModified = true; - invokeListener(k, newData.get(k)); + invokeListener(k, newData.get(k), propertiesListeners); } if (isModified) { - knownData = Map.copyOf(newData); + return Map.copyOf(newData); + } else { + return null; } } - private void invokeListener(String key, String newVal) { + private static void invokeListener( + String key, + String newVal, + Map>> propertiesListeners) { List> listeners = propertiesListeners.get(key); if (listeners != null) { for (BiConsumer l : listeners) { @@ -72,4 +100,14 @@ public void unwatchProperty(String name, BiConsumer listener) { if (listeners == null) return; listeners.remove(listener); } + + public void watchNodeProperty(String name, BiConsumer listener) { + nodePropertiesListeners.computeIfAbsent(name, s -> new CopyOnWriteArrayList<>()).add(listener); + } + + public void unwatchNodeProperty(String name, BiConsumer listener) { + List> listeners = nodePropertiesListeners.get(name); + if (listeners == null) return; + listeners.remove(listener); + } } diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index c424b40284f..3382c36f4c3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -160,7 +160,7 @@ public class ZkController implements Closeable { private final DistributedMap overseerFailureMap; private final DistributedMap asyncIdsMap; - private final WatchedClusterProperties watchedClusterProperties = new WatchedClusterProperties(); + private final WatchedClusterProperties watchedClusterProperties; public static final String COLLECTION_PARAM_PREFIX = "collection."; public static final String CONFIGNAME_PROP = "configName"; @@ -423,6 +423,7 @@ public ZkController( // this must happen after zkStateReader has initialized the cluster props this.baseURL = Utils.getBaseUrlForNodeName(this.nodeName, urlSchemeFromClusterProp); + watchedClusterProperties = new WatchedClusterProperties(nodeName); } catch (KeeperException e) { // Convert checked exception to one acceptable by the caller (see also init() further down) log.error("", e); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java index 85a106b74f3..ccb1da6efba 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java @@ -17,8 +17,8 @@ package org.apache.solr.cloud; -import java.util.HashMap; import java.util.Map; +import java.util.function.BiConsumer; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterProperties; @@ -60,13 +60,14 @@ public void testSetInvalidPluginClusterProperty() throws Exception { @Test @SuppressWarnings("unchecked") public void testWatchedClusterProperties() { - WatchedClusterProperties wcp = new WatchedClusterProperties(); + WatchedClusterProperties wcp = new WatchedClusterProperties("node1"); NamedList listener1 = new NamedList<>(); NamedList listener2 = new NamedList<>(); NamedList listener3 = new NamedList<>(); wcp.watchProperty("p1", (key, value) -> listener1.add(key, value)); wcp.watchProperty(null, (key, value) -> listener2.add(key, value)); - wcp.watchProperty("p3", (key, value) -> listener3.add(key, value)); + BiConsumer p3Listener = (key, value) -> listener3.add(key, value); + wcp.watchProperty("p3", p3Listener); wcp.onChange( (Map) From 8a723eb09303e1c76bfe076e853b5fc39d2b2302 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Tue, 27 Feb 2024 22:40:29 +1100 Subject: [PATCH 5/7] per node properties --- .../solr/cloud/WatchedClusterProperties.java | 2 +- .../solr/cloud/TestClusterProperties.java | 47 +++++++++++++++---- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java index b9e4081e4b5..9547519f819 100644 --- a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java +++ b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java @@ -40,7 +40,7 @@ public boolean onChange(Map properties) { newData = ((Map>) - properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP)) + properties.getOrDefault(WATCHED_NODE_PROPERTIES, Collections.EMPTY_MAP)) .getOrDefault(nodeName, Collections.EMPTY_MAP); modified = compareAndInvokeListeners(newData, knownNodeData, nodePropertiesListeners); if (modified != null) knownNodeData = modified; diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java index ccb1da6efba..77cad67373d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java @@ -57,15 +57,46 @@ public void testSetInvalidPluginClusterProperty() throws Exception { .process(cluster.getSolrClient()); } + @Test + @SuppressWarnings("unchecked") + public void testWatchedNodeProperties() { + WatchedClusterProperties wcp = new WatchedClusterProperties("node1"); + NamedList listener1 = new NamedList<>(); + NamedList all = new NamedList<>(); + NamedList listener3 = new NamedList<>(); + wcp.watchNodeProperty("p1", (key, value) -> listener1.add(key, value)); + wcp.watchNodeProperty(null, (key, value) -> all.add(key, value)); + BiConsumer p3Listener = (key, value) -> listener3.add(key, value); + wcp.watchNodeProperty("p3", p3Listener); + wcp.onChange( + (Map) + Utils.fromJSONString( + "{\n" + + " \"watched-node-properties\" :{\n" + + " \"node1\" : {\n" + + " \"p1\" : \"v1\",\n" + + " \"p2\" : \"v2\"\n" + + " }\n" + + " }\n" + + "}")); + + assertEquals(1, listener1.size()); + assertEquals("v1", listener1.get("p1")); + assertEquals(2, all.size()); + assertEquals("v1", all.get("p1")); + assertEquals("v2", all.get("p2")); + assertEquals(0, listener3.size()); + } + @Test @SuppressWarnings("unchecked") public void testWatchedClusterProperties() { WatchedClusterProperties wcp = new WatchedClusterProperties("node1"); NamedList listener1 = new NamedList<>(); - NamedList listener2 = new NamedList<>(); + NamedList all = new NamedList<>(); NamedList listener3 = new NamedList<>(); wcp.watchProperty("p1", (key, value) -> listener1.add(key, value)); - wcp.watchProperty(null, (key, value) -> listener2.add(key, value)); + wcp.watchProperty(null, (key, value) -> all.add(key, value)); BiConsumer p3Listener = (key, value) -> listener3.add(key, value); wcp.watchProperty("p3", p3Listener); @@ -80,12 +111,12 @@ public void testWatchedClusterProperties() { + "}")); assertEquals(1, listener1.size()); assertEquals("v1", listener1.get("p1")); - assertEquals(2, listener2.size()); - assertEquals("v1", listener2.get("p1")); - assertEquals("v2", listener2.get("p2")); + assertEquals(2, all.size()); + assertEquals("v1", all.get("p1")); + assertEquals("v2", all.get("p2")); assertEquals(0, listener3.size()); listener1.clear(); - listener2.clear(); + all.clear(); listener3.clear(); wcp.onChange( (Map) @@ -97,7 +128,7 @@ public void testWatchedClusterProperties() { + " }\n" + "}")); assertEquals(0, listener1.size()); - assertEquals(0, listener2.size()); + assertEquals(0, all.size()); assertEquals(0, listener3.size()); wcp.onChange( (Map) @@ -105,7 +136,7 @@ public void testWatchedClusterProperties() { "{\n" + " \"watched-properties\": {\n" + " \"p3\": \"v3\"\n" + " }\n" + "}")); assertEquals(1, listener1.size()); assertEquals(null, listener1.get("p1")); - assertEquals(3, listener2.size()); + assertEquals(3, all.size()); assertEquals(1, listener3.size()); assertEquals("v3", listener3.get("p3")); } From c4bb983c2f7c61a31ed1f66c970db74c31cc2db1 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Tue, 12 Mar 2024 05:15:02 +1100 Subject: [PATCH 6/7] removed the explicit watchNodeProperty(). So, the same watch applies to both global and node property. if the same property is present in global and node values, global value is ignored --- .../solr/cloud/WatchedClusterProperties.java | 35 ++-- .../org/apache/solr/handler/ClusterAPI.java | 32 ++++ .../solr/cloud/TestClusterProperties.java | 154 ++++++++++++------ 3 files changed, 152 insertions(+), 69 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java index 9547519f819..bc7604c5581 100644 --- a/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java +++ b/solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java @@ -19,8 +19,8 @@ public class WatchedClusterProperties implements ClusterPropertiesListener { private Map>> propertiesListeners = Collections.synchronizedMap(new HashMap<>()); - private Map>> nodePropertiesListeners = - Collections.synchronizedMap(new HashMap<>()); + /* private Map>> nodePropertiesListeners = + Collections.synchronizedMap(new HashMap<>());*/ private volatile Map knownData = new HashMap<>(); private volatile Map knownNodeData = new HashMap<>(); @@ -32,24 +32,27 @@ public WatchedClusterProperties(String nodeName) { @Override @SuppressWarnings("unchecked") public boolean onChange(Map properties) { - Map newData = - (Map) properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP); - Map modified = - compareAndInvokeListeners(newData, knownData, propertiesListeners); - if (modified != null) knownData = modified; - newData = + Map newData = ((Map>) properties.getOrDefault(WATCHED_NODE_PROPERTIES, Collections.EMPTY_MAP)) .getOrDefault(nodeName, Collections.EMPTY_MAP); - modified = compareAndInvokeListeners(newData, knownNodeData, nodePropertiesListeners); + Map modified = + compareAndInvokeListeners(newData, knownNodeData, null, propertiesListeners); if (modified != null) knownNodeData = modified; + + newData = + (Map) properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP); + modified = compareAndInvokeListeners(newData, knownData, knownNodeData, propertiesListeners); + if (modified != null) knownData = modified; + return false; } private static Map compareAndInvokeListeners( Map newData, Map knownData, + Map overrideData, Map>> propertiesListeners) { boolean isModified = false; // look for changed data or missing keys @@ -58,6 +61,10 @@ private static Map compareAndInvokeListeners( String newVal = newData.get(k); if (Objects.equals(oldVal, newVal)) continue; isModified = true; + if (overrideData != null && overrideData.containsKey(k)) { + // per node properties contain this key. do not invoke listener + continue; + } invokeListener(k, newVal, propertiesListeners); } for (String k : newData.keySet()) { @@ -100,14 +107,4 @@ public void unwatchProperty(String name, BiConsumer listener) { if (listeners == null) return; listeners.remove(listener); } - - public void watchNodeProperty(String name, BiConsumer listener) { - nodePropertiesListeners.computeIfAbsent(name, s -> new CopyOnWriteArrayList<>()).add(listener); - } - - public void unwatchNodeProperty(String name, BiConsumer listener) { - List> listeners = nodePropertiesListeners.get(name); - if (listeners == null) return; - listeners.remove(listener); - } } diff --git a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java index e161d55e5b3..f54d9deff8e 100644 --- a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java +++ b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager; import org.apache.solr.client.solrj.request.beans.ClusterPropPayload; import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; +import org.apache.solr.cloud.WatchedClusterProperties; import org.apache.solr.common.SolrException; import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.cloud.ClusterProperties; @@ -69,12 +71,42 @@ public class ClusterAPI { private final ConfigSetsHandler configSetsHandler; public final Commands commands = new Commands(); + private final WCP watchedProps = new WCP(); + + // NOCOMMIT. for demo + public static class WCP implements ReflectMapWriter { + @JsonProperty public String global_prop1_val = null; + @JsonProperty public String global_prop1_time; + @JsonProperty public String node_prop1_val = null; + @JsonProperty public String node_prop1_time; + } public ClusterAPI(CollectionsHandler ch, ConfigSetsHandler configSetsHandler) { this.collectionsHandler = ch; this.configSetsHandler = configSetsHandler; + WatchedClusterProperties wcp = + ch.getCoreContainer().getZkController().getWatchedClusterProperties(); + wcp.watchProperty( + "global_prop1", + (k, v) -> { + watchedProps.global_prop1_val = v; + watchedProps.global_prop1_time = new Date().toString(); + }); + wcp.watchProperty( + "node_prop1", + (k, v) -> { + watchedProps.node_prop1_time = v; + watchedProps.node_prop1_val = new Date().toString(); + }); + } + + @EndPoint(method = GET, path = "/node/watched-props", permission = COLL_READ_PERM) + public void watchedProps(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { + rsp.add("watched-props", watchedProps); } + // end NOCOMMIT. for demo + @EndPoint(method = GET, path = "/cluster/node-roles", permission = COLL_READ_PERM) public void roles(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { rsp.add( diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java index 77cad67373d..60d680979e6 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud; +import java.util.HashMap; import java.util.Map; import java.util.function.BiConsumer; import org.apache.solr.client.solrj.request.CollectionAdminRequest; @@ -64,10 +65,10 @@ public void testWatchedNodeProperties() { NamedList listener1 = new NamedList<>(); NamedList all = new NamedList<>(); NamedList listener3 = new NamedList<>(); - wcp.watchNodeProperty("p1", (key, value) -> listener1.add(key, value)); - wcp.watchNodeProperty(null, (key, value) -> all.add(key, value)); + wcp.watchProperty("p1", (key, value) -> listener1.add(key, value)); + wcp.watchProperty(null, (key, value) -> all.add(key, value)); BiConsumer p3Listener = (key, value) -> listener3.add(key, value); - wcp.watchNodeProperty("p3", p3Listener); + wcp.watchProperty("p3", p3Listener); wcp.onChange( (Map) Utils.fromJSONString( @@ -88,56 +89,109 @@ public void testWatchedNodeProperties() { assertEquals(0, listener3.size()); } + static class NodeWatch { + final WatchedClusterProperties wcp; + Map> events = new HashMap<>(); + + NodeWatch(String name) { + wcp = new WatchedClusterProperties(name); + } + + @SuppressWarnings("unchecked") + void newProps(String json) { + events.clear(); + wcp.onChange((Map) Utils.fromJSONString(json)); + } + + NodeWatch listen(String name) { + wcp.watchProperty( + name, (k, v) -> events.computeIfAbsent(name, s -> new NamedList<>()).add(k, v)); + return this; + } + + static final NamedList empty = new NamedList<>(); + + int count(String prop) { + return events.getOrDefault(prop, empty)._size(); + } + + String val(String key) { + return events.getOrDefault(key, empty).get(key); + } + + String val(String listenerName, String key) { + return events.getOrDefault(listenerName, empty).get(key); + } + } + @Test @SuppressWarnings("unchecked") public void testWatchedClusterProperties() { - WatchedClusterProperties wcp = new WatchedClusterProperties("node1"); - NamedList listener1 = new NamedList<>(); - NamedList all = new NamedList<>(); - NamedList listener3 = new NamedList<>(); - wcp.watchProperty("p1", (key, value) -> listener1.add(key, value)); - wcp.watchProperty(null, (key, value) -> all.add(key, value)); - BiConsumer p3Listener = (key, value) -> listener3.add(key, value); - wcp.watchProperty("p3", p3Listener); + NodeWatch n1 = new NodeWatch("node1").listen("p1").listen(null).listen("p3"); + NodeWatch n2 = new NodeWatch("node2").listen("p1").listen(null).listen("p3"); + ; - wcp.onChange( - (Map) - Utils.fromJSONString( - "{\n" - + " \"watched-properties\": {\n" - + " \"p1\": \"v1\",\n" - + " \"p2\": \"v2\"\n" - + " }\n" - + "}")); - assertEquals(1, listener1.size()); - assertEquals("v1", listener1.get("p1")); - assertEquals(2, all.size()); - assertEquals("v1", all.get("p1")); - assertEquals("v2", all.get("p2")); - assertEquals(0, listener3.size()); - listener1.clear(); - all.clear(); - listener3.clear(); - wcp.onChange( - (Map) - Utils.fromJSONString( - "{\n" - + " \"watched-properties\": {\n" - + " \"p1\": \"v1\",\n" - + " \"p2\": \"v2\"\n" - + " }\n" - + "}")); - assertEquals(0, listener1.size()); - assertEquals(0, all.size()); - assertEquals(0, listener3.size()); - wcp.onChange( - (Map) - Utils.fromJSONString( - "{\n" + " \"watched-properties\": {\n" + " \"p3\": \"v3\"\n" + " }\n" + "}")); - assertEquals(1, listener1.size()); - assertEquals(null, listener1.get("p1")); - assertEquals(3, all.size()); - assertEquals(1, listener3.size()); - assertEquals("v3", listener3.get("p3")); + String json = + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " }\n" + + "}"; + n1.newProps(json); + n2.newProps(json); + + assertEquals(1, n1.count("p1")); + assertEquals("v1", n1.val("p1")); + assertEquals(2, n1.count(null)); + assertEquals("v1", n1.val(null, "p1")); + assertEquals("v2", n1.val(null, "p2")); + assertEquals(0, n1.count("p3")); + + assertEquals(1, n2.count("p1")); + assertEquals("v1", n2.val("p1")); + assertEquals(2, n2.count(null)); + assertEquals("v1", n2.val(null, "p1")); + assertEquals("v2", n2.val(null, "p2")); + assertEquals(0, n2.count("p3")); + + json = + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " \"p3\": \"v3\"\n" + + " }\n" + + "}"; + n1.newProps(json); + n2.newProps(json); + assertEquals(0, n1.count("p1")); + assertEquals(0, n1.count("p2")); + assertEquals(0, n2.count("p1")); + assertEquals(0, n2.count("p2")); + assertEquals("v3", n1.val("p3")); + assertEquals("v3", n2.val("p3")); + json = + "{\n" + + " \"watched-properties\": {\n" + + " \"p1\": \"v1\",\n" + + " \"p2\": \"v2\"\n" + + " \"p3\": \"v3\"\n" + + " },\n" + + " \"watched-node-properties\" :{\n" + + " \"node1\" : {\n" + + " \"p1\" : \"v_n1\",\n" + + " \"p2\" : \"v_n2\"\n" + + " },\n" + + " }\n" + + "}"; + n1.newProps(json); + n2.newProps(json); + assertEquals(0, n2.count("p1")); + assertEquals(0, n2.count(null)); + + assertEquals("v_n1", n1.val("p1")); + assertEquals("v_n1", n1.val(null, "p1")); + assertEquals("v_n2", n1.val(null, "p2")); } } From e9fa8b28b2da48252f822e78474c31dabe6e2ad8 Mon Sep 17 00:00:00 2001 From: noblepaul Date: Wed, 13 Mar 2024 12:28:59 +1100 Subject: [PATCH 7/7] rollback. accidentally committed demo code --- .../org/apache/solr/handler/ClusterAPI.java | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java index f54d9deff8e..e161d55e5b3 100644 --- a/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java +++ b/solr/core/src/java/org/apache/solr/handler/ClusterAPI.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,7 +45,6 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager; import org.apache.solr.client.solrj.request.beans.ClusterPropPayload; import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; -import org.apache.solr.cloud.WatchedClusterProperties; import org.apache.solr.common.SolrException; import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.cloud.ClusterProperties; @@ -71,42 +69,12 @@ public class ClusterAPI { private final ConfigSetsHandler configSetsHandler; public final Commands commands = new Commands(); - private final WCP watchedProps = new WCP(); - - // NOCOMMIT. for demo - public static class WCP implements ReflectMapWriter { - @JsonProperty public String global_prop1_val = null; - @JsonProperty public String global_prop1_time; - @JsonProperty public String node_prop1_val = null; - @JsonProperty public String node_prop1_time; - } public ClusterAPI(CollectionsHandler ch, ConfigSetsHandler configSetsHandler) { this.collectionsHandler = ch; this.configSetsHandler = configSetsHandler; - WatchedClusterProperties wcp = - ch.getCoreContainer().getZkController().getWatchedClusterProperties(); - wcp.watchProperty( - "global_prop1", - (k, v) -> { - watchedProps.global_prop1_val = v; - watchedProps.global_prop1_time = new Date().toString(); - }); - wcp.watchProperty( - "node_prop1", - (k, v) -> { - watchedProps.node_prop1_time = v; - watchedProps.node_prop1_val = new Date().toString(); - }); - } - - @EndPoint(method = GET, path = "/node/watched-props", permission = COLL_READ_PERM) - public void watchedProps(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { - rsp.add("watched-props", watchedProps); } - // end NOCOMMIT. for demo - @EndPoint(method = GET, path = "/cluster/node-roles", permission = COLL_READ_PERM) public void roles(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { rsp.add(