-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove direct ClusterState access in LocalClusterState #2696
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,22 +5,19 @@ | |
|
||
package org.opensearch.sql.legacy.esdomain; | ||
|
||
import com.google.common.cache.Cache; | ||
import com.google.common.cache.CacheBuilder; | ||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
import java.util.concurrent.TimeUnit; | ||
import lombok.AccessLevel; | ||
import lombok.NoArgsConstructor; | ||
import lombok.NonNull; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.support.IndicesOptions; | ||
import org.opensearch.cluster.ClusterState; | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.metadata.MappingMetadata; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.index.IndexNotFoundException; | ||
|
@@ -39,30 +36,20 @@ | |
* across the plugin, ex. in rewriter, pretty formatter etc. | ||
* </ol> | ||
*/ | ||
@NoArgsConstructor(access = AccessLevel.PRIVATE) | ||
public class LocalClusterState { | ||
|
||
private static final Logger LOG = LogManager.getLogger(); | ||
|
||
private static final Function<String, Predicate<String>> ALL_FIELDS = | ||
(anyIndex -> (anyField -> true)); | ||
|
||
/** Singleton instance */ | ||
private static LocalClusterState INSTANCE; | ||
|
||
/** Current cluster state on local node */ | ||
private ClusterService clusterService; | ||
|
||
private OpenSearchSettings pluginSettings; | ||
|
||
/** Index name expression resolver to get concrete index name */ | ||
private IndexNameExpressionResolver resolver; | ||
private Client client; | ||
|
||
/** | ||
* Thread-safe mapping cache to save the computation of sourceAsMap() which is not lightweight as | ||
* thought Array cannot be used as key because hashCode() always return reference address, so | ||
* either use wrapper or List. | ||
*/ | ||
private final Cache<List<String>, IndexMappings> cache; | ||
private OpenSearchSettings pluginSettings; | ||
|
||
/** Latest setting value for each registered key. Thread-safe is required. */ | ||
private final Map<String, Object> latestSettings = new ConcurrentHashMap<>(); | ||
|
@@ -79,25 +66,33 @@ public static synchronized void state(LocalClusterState instance) { | |
INSTANCE = instance; | ||
} | ||
|
||
public void setClusterService(ClusterService clusterService) { | ||
/** | ||
* Sets the ClusterService used to receive ClusterSetting update notifications. | ||
* | ||
* @param clusterService The non-null cluster service instance. | ||
*/ | ||
public void setClusterService(@NonNull ClusterService clusterService) { | ||
this.clusterService = clusterService; | ||
} | ||
|
||
clusterService.addListener( | ||
event -> { | ||
if (event.metadataChanged()) { | ||
// State in cluster service is already changed to event.state() before listener fired | ||
if (LOG.isDebugEnabled()) { | ||
LOG.debug( | ||
"Metadata in cluster state changed: {}", | ||
new IndexMappings(clusterService.state().metadata())); | ||
} | ||
cache.invalidateAll(); | ||
} | ||
}); | ||
/** | ||
* Sets the Client used to interact with OpenSearch core. | ||
* | ||
* @param client The non-null client instance | ||
*/ | ||
public void setClient(@NonNull Client client) { | ||
this.client = client; | ||
} | ||
|
||
public void setPluginSettings(OpenSearchSettings settings) { | ||
/** | ||
* Sets the plugin's settings. | ||
* | ||
* @param settings The non-null plugin settings instance | ||
*/ | ||
public void setPluginSettings(@NonNull OpenSearchSettings settings) { | ||
|
||
this.pluginSettings = settings; | ||
|
||
for (Setting<?> setting : settings.getSettings()) { | ||
clusterService | ||
.getClusterSettings() | ||
|
@@ -112,14 +107,6 @@ public void setPluginSettings(OpenSearchSettings settings) { | |
} | ||
} | ||
|
||
public void setResolver(IndexNameExpressionResolver resolver) { | ||
this.resolver = resolver; | ||
} | ||
|
||
private LocalClusterState() { | ||
cache = CacheBuilder.newBuilder().maximumSize(100).build(); | ||
} | ||
|
||
/** | ||
* Get plugin setting value by key. Return default value if not configured explicitly. | ||
* | ||
|
@@ -132,39 +119,31 @@ public <T> T getSettingValue(Settings.Key key) { | |
return (T) latestSettings.getOrDefault(key.getKeyValue(), pluginSettings.getSettingValue(key)); | ||
} | ||
|
||
/** Get field mappings by index expressions. All types and fields are included in response. */ | ||
public IndexMappings getFieldMappings(String[] indices) { | ||
return getFieldMappings(indices, ALL_FIELDS); | ||
} | ||
|
||
/** | ||
* Get field mappings by index expressions, type and field filter. Because | ||
* IndexMetaData/MappingMetaData is hard to convert to FieldMappingMetaData, custom mapping domain | ||
* objects are being used here. In future, it should be moved to domain model layer for all | ||
* OpenSearch specific knowledge. | ||
* | ||
* <p>Note that cluster state may be change inside OpenSearch so it's possible to read different | ||
* state in 2 accesses to ClusterService.state() here. | ||
* Get field mappings by index expressions. Because IndexMetaData/MappingMetaData is hard to | ||
* convert to FieldMappingMetaData, custom mapping domain objects are being used here. In future, | ||
* it should be moved to domain model layer for all OpenSearch specific knowledge. | ||
* | ||
* @param indices index name expression | ||
* @param fieldFilter field filter predicate | ||
* @return index mapping(s) | ||
*/ | ||
private IndexMappings getFieldMappings( | ||
String[] indices, Function<String, Predicate<String>> fieldFilter) { | ||
Objects.requireNonNull(clusterService, "Cluster service is null"); | ||
Objects.requireNonNull(resolver, "Index name expression resolver is null"); | ||
public IndexMappings getFieldMappings(String[] indices) { | ||
Objects.requireNonNull(client, "Client is null"); | ||
|
||
try { | ||
ClusterState state = clusterService.state(); | ||
String[] concreteIndices = resolveIndexExpression(state, indices); | ||
|
||
IndexMappings mappings; | ||
if (fieldFilter == ALL_FIELDS) { | ||
mappings = findMappingsInCache(state, concreteIndices); | ||
} else { | ||
mappings = findMappings(state, concreteIndices, fieldFilter); | ||
} | ||
Map<String, MappingMetadata> mappingMetadata = | ||
client | ||
.admin() | ||
.indices() | ||
.prepareGetMappings(indices) | ||
.setLocal(true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I presume this always gets mappings from local node, what if the data is stale? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the data is stale, it will behave as it does today as even today's access is resolved locally. That being said, it will fail requests with a 4xx as the fields are missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can accept this as it is the same as V2. Any thoughts on the long-term implications if schema freshness is critical? |
||
.setIndicesOptions(IndicesOptions.strictExpandOpen()) | ||
.execute() | ||
.actionGet(0, TimeUnit.NANOSECONDS) | ||
.mappings(); | ||
|
||
IndexMappings mappings = new IndexMappings(mappingMetadata); | ||
|
||
LOG.debug("Found mappings: {}", mappings); | ||
return mappings; | ||
|
@@ -175,36 +154,4 @@ private IndexMappings getFieldMappings( | |
"Failed to read mapping in cluster state for indices=" + Arrays.toString(indices), e); | ||
} | ||
} | ||
|
||
private String[] resolveIndexExpression(ClusterState state, String[] indices) { | ||
String[] concreteIndices = | ||
resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices); | ||
|
||
if (LOG.isDebugEnabled()) { | ||
LOG.debug( | ||
"Resolved index expression {} to concrete index names {}", | ||
Arrays.toString(indices), | ||
Arrays.toString(concreteIndices)); | ||
} | ||
return concreteIndices; | ||
} | ||
|
||
private IndexMappings findMappings( | ||
ClusterState state, String[] indices, Function<String, Predicate<String>> fieldFilter) | ||
throws IOException { | ||
LOG.debug("Cache didn't help. Load and parse mapping in cluster state"); | ||
return new IndexMappings(state.metadata().findMappings(indices, fieldFilter)); | ||
} | ||
|
||
private IndexMappings findMappingsInCache(ClusterState state, String[] indices) | ||
throws ExecutionException { | ||
LOG.debug("Looking for mapping in cache: {}", cache.asMap()); | ||
return cache.get(sortToList(indices), () -> findMappings(state, indices, ALL_FIELDS)); | ||
} | ||
|
||
private <T> List<T> sortToList(T[] array) { | ||
// Mostly array has single element | ||
Arrays.sort(array); | ||
return Arrays.asList(array); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slightly outside the scope of this PR, but it's not immediately obvious why this class is registering a settings update consumer and maintaining it's local cache of settings in
latestSettings
. It appears to be duplicating what is already happening in OpenSearchSettings. I wonder ifgetSettingValue()
can be changed to just be a pass-through to OpenSearchSettings and all usage of the ClusterService can be removed from this class?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get the feeling that you're right, but I also haven't spent enough time around it to be sure