Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAI-4731 : supporting feature flag in solr #180

Open
wants to merge 7 commits into
base: fs/branch_9_3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions solr/core/src/java/org/apache/solr/cloud/WatchedClusterProperties.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.apache.solr.cloud;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import org.apache.solr.common.cloud.ClusterPropertiesListener;

public class WatchedClusterProperties implements ClusterPropertiesListener {

final String nodeName;

public static final String WATCHED_PROPERTIES = "watched-properties";
public static final String WATCHED_NODE_PROPERTIES = "watched-node-properties";

private Map<String, List<BiConsumer<String, String>>> propertiesListeners =
Collections.synchronizedMap(new HashMap<>());

private Map<String, List<BiConsumer<String, String>>> nodePropertiesListeners =
Collections.synchronizedMap(new HashMap<>());

private volatile Map<String, String> knownData = new HashMap<>();
private volatile Map<String, String> knownNodeData = new HashMap<>();

public WatchedClusterProperties(String nodeName) {
this.nodeName = nodeName;
}

@Override
@SuppressWarnings("unchecked")
public boolean onChange(Map<String, Object> properties) {
Map<String, String> newData =
(Map<String, String>) properties.getOrDefault(WATCHED_PROPERTIES, Collections.EMPTY_MAP);
Map<String, String> modified =
compareAndInvokeListeners(newData, knownData, propertiesListeners);
if (modified != null) knownData = modified;

newData =
((Map<String, Map<String, String>>)
properties.getOrDefault(WATCHED_NODE_PROPERTIES, Collections.EMPTY_MAP))
.getOrDefault(nodeName, Collections.EMPTY_MAP);
modified = compareAndInvokeListeners(newData, knownNodeData, nodePropertiesListeners);
if (modified != null) knownNodeData = modified;
return false;
}

private static Map<String, String> compareAndInvokeListeners(
Map<String, String> newData,
Map<String, String> knownData,
Map<String, List<BiConsumer<String, String>>> propertiesListeners) {
boolean isModified = false;
// look for changed data or missing keys
for (String k : knownData.keySet()) {
String oldVal = knownData.get(k);
String newVal = newData.get(k);
if (Objects.equals(oldVal, newVal)) continue;
isModified = true;
invokeListener(k, newVal, propertiesListeners);
}
for (String k : newData.keySet()) {
if (knownData.containsKey(k)) continue;
isModified = true;
invokeListener(k, newData.get(k), propertiesListeners);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if prop is set for node1, then we need to invoke listener on that node only

Copy link
Collaborator Author

@noblepaul noblepaul Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that is what it does. You register the watcher from within a node and listeners are fired for the properties of that node only

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happens we update the property in ZK, will that cause listener-update-event on each node or all node?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean by each node and all node?

everyone who registered a listener should get a callback

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example I want these property value for this node only

{
  component: "basiclimit", host:solr-1,property-x:value-y
} 

Then this should be invoked on host solr-1 only.

Copy link
Collaborator Author

@noblepaul noblepaul Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the property is defined for solr-1 , you get a calback on solr-1 only.
It would look something like this. Inside a node, you can only watch properties of that node

{
  "watched-node-properties" :{
    "solr-1" : {
        "basiclimit.property-x":"value-y"
    }
  }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, let's give demo on this in Monday's sync. thanks!

}

if (isModified) {
return Map.copyOf(newData);
} else {
return null;
}
}

private static void invokeListener(
String key,
String newVal,
Map<String, List<BiConsumer<String, String>>> propertiesListeners) {
List<BiConsumer<String, String>> listeners = propertiesListeners.get(key);
if (listeners != null) {
for (BiConsumer<String, String> l : listeners) {
l.accept(key, newVal);
}
}
listeners = propertiesListeners.get(null);
if (listeners != null) {
for (BiConsumer<String, String> listener : listeners) {
listener.accept(key, newVal);
}
}
}

public void watchProperty(String name, BiConsumer<String, String> listener) {
propertiesListeners.computeIfAbsent(name, s -> new CopyOnWriteArrayList<>()).add(listener);
}

public void unwatchProperty(String name, BiConsumer<String, String> listener) {
List<BiConsumer<String, String>> listeners = propertiesListeners.get(name);
if (listeners == null) return;
listeners.remove(listener);
}

public void watchNodeProperty(String name, BiConsumer<String, String> listener) {
nodePropertiesListeners.computeIfAbsent(name, s -> new CopyOnWriteArrayList<>()).add(listener);
}

public void unwatchNodeProperty(String name, BiConsumer<String, String> listener) {
List<BiConsumer<String, String>> listeners = nodePropertiesListeners.get(name);
if (listeners == null) return;
listeners.remove(listener);
}
}
8 changes: 8 additions & 0 deletions solr/core/src/java/org/apache/solr/cloud/ZkController.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ public class ZkController implements Closeable {
private final DistributedMap overseerFailureMap;
private final DistributedMap asyncIdsMap;

private final WatchedClusterProperties watchedClusterProperties;

public static final String COLLECTION_PARAM_PREFIX = "collection.";
public static final String CONFIGNAME_PROP = "configName";

Expand Down Expand Up @@ -421,6 +423,7 @@ public ZkController(

// this must happen after zkStateReader has initialized the cluster props
this.baseURL = Utils.getBaseUrlForNodeName(this.nodeName, urlSchemeFromClusterProp);
watchedClusterProperties = new WatchedClusterProperties(nodeName);
} catch (KeeperException e) {
// Convert checked exception to one acceptable by the caller (see also init() further down)
log.error("", e);
Expand All @@ -434,6 +437,7 @@ public ZkController(
} else {
this.overseerJobQueue = overseer.getStateUpdateQueue();
}
zkStateReader.registerClusterPropertiesListener(watchedClusterProperties);
this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
this.sysPropsCacher =
Expand Down Expand Up @@ -3013,6 +3017,10 @@ public void publishNodeAsDown(String nodeName) {
}
}

public WatchedClusterProperties getWatchedClusterProperties() {
return watchedClusterProperties;
}

/**
* Ensures that a searcher is registered for the given core and if not, waits until one is
* registered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.solr.cloud;

import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -52,4 +56,88 @@ public void testSetInvalidPluginClusterProperty() throws Exception {
CollectionAdminRequest.setClusterProperty(propertyName, "valueA")
.process(cluster.getSolrClient());
}

@Test
@SuppressWarnings("unchecked")
public void testWatchedNodeProperties() {
WatchedClusterProperties wcp = new WatchedClusterProperties("node1");
NamedList<String> listener1 = new NamedList<>();
NamedList<String> all = new NamedList<>();
NamedList<String> listener3 = new NamedList<>();
wcp.watchNodeProperty("p1", (key, value) -> listener1.add(key, value));
wcp.watchNodeProperty(null, (key, value) -> all.add(key, value));
BiConsumer<String, String> p3Listener = (key, value) -> listener3.add(key, value);
wcp.watchNodeProperty("p3", p3Listener);
wcp.onChange(
(Map<String, Object>)
Utils.fromJSONString(
"{\n"
+ " \"watched-node-properties\" :{\n"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What all way we can define this?

  1. Can we differentiate between two components? (say, component-x, component-y)
  2. Can we define prop for component-x (node-level, otherwise-all, just-for-all) ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not per component, it's just a property per node. May be we can prefix a component name in the property key

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need three cases here

  1. One can define properties at component level. While registering watch take component-name, if same exists then through the exception.
  2. Component will register default value, which we can update through ZK (or maybe later nice solr api)
    2.1. We can update component1-prop in ZK with node name, saying apply this value for that node. Then we should call listener for that component on that node only.

Side node, recently we introduced this behavior in prod, where we updated max-basic-queries limit on one solr-node. We watched that node for ~4/5 weeks and now we ready to update this limit on all the nodes. This solution require whole cluster restart, that's why we are looking for this feature in solr. Hope this will help.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example , if there is a property component-a.property-x that component may watch that property explicitly at load time. if no other component has registered a watch on that property, then no callbacks are given to other components. Am I missing something?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f there is a property component-a.property-x that component may watch that property explicitly at load time.

As long as component is registered, it should get callback. do you think it should register property as well?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no concept of a "component" is Solr. So, the system would not know whom to give a callback if it does not explicitly register for a callback

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just name if we need to use feature flag. watch("test1", listener)

+ " \"node1\" : {\n"
+ " \"p1\" : \"v1\",\n"
+ " \"p2\" : \"v2\"\n"
+ " }\n"
+ " }\n"
+ "}"));

assertEquals(1, listener1.size());
assertEquals("v1", listener1.get("p1"));
assertEquals(2, all.size());
assertEquals("v1", all.get("p1"));
assertEquals("v2", all.get("p2"));
assertEquals(0, listener3.size());
}

@Test
@SuppressWarnings("unchecked")
public void testWatchedClusterProperties() {
WatchedClusterProperties wcp = new WatchedClusterProperties("node1");
NamedList<String> listener1 = new NamedList<>();
NamedList<String> all = new NamedList<>();
NamedList<String> listener3 = new NamedList<>();
wcp.watchProperty("p1", (key, value) -> listener1.add(key, value));
wcp.watchProperty(null, (key, value) -> all.add(key, value));
BiConsumer<String, String> p3Listener = (key, value) -> listener3.add(key, value);
wcp.watchProperty("p3", p3Listener);

wcp.onChange(
(Map<String, Object>)
Utils.fromJSONString(
"{\n"
+ " \"watched-properties\": {\n"
+ " \"p1\": \"v1\",\n"
+ " \"p2\": \"v2\"\n"
+ " }\n"
+ "}"));
assertEquals(1, listener1.size());
assertEquals("v1", listener1.get("p1"));
assertEquals(2, all.size());
assertEquals("v1", all.get("p1"));
assertEquals("v2", all.get("p2"));
assertEquals(0, listener3.size());
listener1.clear();
all.clear();
listener3.clear();
wcp.onChange(
(Map<String, Object>)
Utils.fromJSONString(
"{\n"
+ " \"watched-properties\": {\n"
+ " \"p1\": \"v1\",\n"
+ " \"p2\": \"v2\"\n"
+ " }\n"
+ "}"));
assertEquals(0, listener1.size());
assertEquals(0, all.size());
assertEquals(0, listener3.size());
wcp.onChange(
(Map<String, Object>)
Utils.fromJSONString(
"{\n" + " \"watched-properties\": {\n" + " \"p3\": \"v3\"\n" + " }\n" + "}"));
assertEquals(1, listener1.size());
assertEquals(null, listener1.get("p1"));
assertEquals(3, all.size());
assertEquals(1, listener3.size());
assertEquals("v3", listener3.get("p3"));
}
}
Loading