From bc189aa666733194725033acb56c57c20074a45e Mon Sep 17 00:00:00 2001 From: Frank Dattalo Date: Fri, 24 May 2024 17:44:08 -0700 Subject: [PATCH] Remove Direct ClusterState access in LocalClusterState for IndexFieldMappings Signed-off-by: Frank Dattalo --- .../legacy/esdomain/LocalClusterState.java | 141 ++++++------------ .../unittest/LocalClusterStateTest.java | 54 +------ .../sql/legacy/util/CheckScriptContents.java | 60 ++++---- .../util/MultipleIndexClusterUtils.java | 59 +++++--- .../org/opensearch/sql/plugin/SQLPlugin.java | 2 +- 5 files changed, 111 insertions(+), 205 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java b/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java index 1e6595bd32..6395b4712d 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java @@ -5,22 +5,12 @@ package org.opensearch.sql.legacy.esdomain; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; -import java.util.function.Predicate; +import lombok.NonNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.index.IndexNotFoundException; @@ -28,6 +18,12 @@ import org.opensearch.sql.legacy.esdomain.mapping.IndexMappings; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + /** * Local cluster state information which may be stale but help avoid blocking operation in NIO * thread. @@ -43,26 +39,15 @@ public class LocalClusterState { private static final Logger LOG = LogManager.getLogger(); - private static final Function> ALL_FIELDS = - (anyIndex -> (anyField -> true)); - /** Singleton instance */ private static LocalClusterState INSTANCE; /** Current cluster state on local node */ private ClusterService clusterService; - private OpenSearchSettings pluginSettings; + private Client client; - /** Index name expression resolver to get concrete index name */ - private IndexNameExpressionResolver resolver; - - /** - * Thread-safe mapping cache to save the computation of sourceAsMap() which is not lightweight as - * thought Array cannot be used as key because hashCode() always return reference address, so - * either use wrapper or List. - */ - private final Cache, IndexMappings> cache; + private OpenSearchSettings pluginSettings; /** Latest setting value for each registered key. Thread-safe is required. */ private final Map latestSettings = new ConcurrentHashMap<>(); @@ -79,25 +64,30 @@ public static synchronized void state(LocalClusterState instance) { INSTANCE = instance; } - public void setClusterService(ClusterService clusterService) { + /** + * Sets the ClusterService used to receive ClusterSetting update notifications. + * @param clusterService The non-null cluster service instance. + */ + public void setClusterService(@NonNull ClusterService clusterService) { this.clusterService = clusterService; + } - clusterService.addListener( - event -> { - if (event.metadataChanged()) { - // State in cluster service is already changed to event.state() before listener fired - if (LOG.isDebugEnabled()) { - LOG.debug( - "Metadata in cluster state changed: {}", - new IndexMappings(clusterService.state().metadata())); - } - cache.invalidateAll(); - } - }); + /** + * Sets the Client used to interact with OpenSearch core. + * @param client The non-null client instance + */ + public void setClient(@NonNull Client client) { + this.client = client; } - public void setPluginSettings(OpenSearchSettings settings) { + /** + * Sets the plugin's settings. + * @param settings The non-null plugin settings instance + */ + public void setPluginSettings(@NonNull OpenSearchSettings settings) { + this.pluginSettings = settings; + for (Setting setting : settings.getSettings()) { clusterService .getClusterSettings() @@ -112,12 +102,7 @@ public void setPluginSettings(OpenSearchSettings settings) { } } - public void setResolver(IndexNameExpressionResolver resolver) { - this.resolver = resolver; - } - private LocalClusterState() { - cache = CacheBuilder.newBuilder().maximumSize(100).build(); } /** @@ -132,39 +117,30 @@ public T getSettingValue(Settings.Key key) { return (T) latestSettings.getOrDefault(key.getKeyValue(), pluginSettings.getSettingValue(key)); } - /** Get field mappings by index expressions. All types and fields are included in response. */ - public IndexMappings getFieldMappings(String[] indices) { - return getFieldMappings(indices, ALL_FIELDS); - } - /** - * Get field mappings by index expressions, type and field filter. Because + * Get field mappings by index expressions. Because * IndexMetaData/MappingMetaData is hard to convert to FieldMappingMetaData, custom mapping domain * objects are being used here. In future, it should be moved to domain model layer for all * OpenSearch specific knowledge. * - *

Note that cluster state may be change inside OpenSearch so it's possible to read different - * state in 2 accesses to ClusterService.state() here. - * * @param indices index name expression - * @param fieldFilter field filter predicate * @return index mapping(s) */ - private IndexMappings getFieldMappings( - String[] indices, Function> fieldFilter) { - Objects.requireNonNull(clusterService, "Cluster service is null"); - Objects.requireNonNull(resolver, "Index name expression resolver is null"); + public IndexMappings getFieldMappings(String[] indices) { + Objects.requireNonNull(client, "Client is null"); try { - ClusterState state = clusterService.state(); - String[] concreteIndices = resolveIndexExpression(state, indices); - IndexMappings mappings; - if (fieldFilter == ALL_FIELDS) { - mappings = findMappingsInCache(state, concreteIndices); - } else { - mappings = findMappings(state, concreteIndices, fieldFilter); - } + Map mappingMetadata = client.admin() + .indices() + .prepareGetMappings(indices) + .setLocal(true) + .setIndicesOptions(IndicesOptions.strictExpandOpen()) + .execute() + .actionGet(0, TimeUnit.NANOSECONDS) + .mappings(); + + IndexMappings mappings = new IndexMappings(mappingMetadata); LOG.debug("Found mappings: {}", mappings); return mappings; @@ -176,35 +152,4 @@ private IndexMappings getFieldMappings( } } - private String[] resolveIndexExpression(ClusterState state, String[] indices) { - String[] concreteIndices = - resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Resolved index expression {} to concrete index names {}", - Arrays.toString(indices), - Arrays.toString(concreteIndices)); - } - return concreteIndices; - } - - private IndexMappings findMappings( - ClusterState state, String[] indices, Function> fieldFilter) - throws IOException { - LOG.debug("Cache didn't help. Load and parse mapping in cluster state"); - return new IndexMappings(state.metadata().findMappings(indices, fieldFilter)); - } - - private IndexMappings findMappingsInCache(ClusterState state, String[] indices) - throws ExecutionException { - LOG.debug("Looking for mapping in cache: {}", cache.asMap()); - return cache.get(sortToList(indices), () -> findMappings(state, indices, ALL_FIELDS)); - } - - private List sortToList(T[] array) { - // Mostly array has single element - Arrays.sort(array); - return Arrays.asList(array); - } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java index 6ea45ed6c8..e0ab935944 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java @@ -5,27 +5,12 @@ package org.opensearch.sql.legacy.unittest; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.opensearch.sql.legacy.util.CheckScriptContents.mockClusterService; -import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState; - -import java.io.IOException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterStateListener; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.esdomain.LocalClusterState; @@ -34,6 +19,10 @@ import org.opensearch.sql.legacy.util.TestsConstants; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState; + /** Local cluster state testing without covering OpenSearch logic, ex. resolve index pattern. */ public class LocalClusterStateTest { @@ -150,41 +139,6 @@ public void getMappingForInvalidField() { Assert.assertNull(fieldMappings.mapping("manager.name.first.uppercase")); } - @Test - public void getMappingFromCache() throws IOException { - // Mock here again for verification below and mock addListener() - ClusterService mockService = mockClusterService(MAPPING); - ClusterStateListener[] listener = new ClusterStateListener[1]; // Trick to access inside lambda - doAnswer( - invocation -> { - listener[0] = (ClusterStateListener) invocation.getArguments()[0]; - return null; - }) - .when(mockService) - .addListener(any()); - LocalClusterState.state().setClusterService(mockService); - - // 1.Actual findMappings be invoked only once - for (int i = 0; i < 10; i++) { - LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME}); - } - verify(mockService.state().metadata(), times(1)) - .findMappings(eq(new String[] {INDEX_NAME}), any()); - - // 2.Fire cluster state change event - Assert.assertNotNull(listener[0]); - ClusterChangedEvent mockEvent = mock(ClusterChangedEvent.class); - when(mockEvent.metadataChanged()).thenReturn(true); - listener[0].clusterChanged(mockEvent); - - // 3.Cache should be invalidated and call findMapping another time only - for (int i = 0; i < 5; i++) { - LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME}); - } - verify(mockService.state().metadata(), times(2)) - .findMappings(eq(new String[] {INDEX_NAME}), any()); - } - @Test public void getDefaultValueForQuerySlowLog() { when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT); diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java b/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java index 7578720624..d7f6e075c9 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java @@ -8,6 +8,7 @@ import static java.util.Collections.emptyList; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; @@ -24,6 +25,9 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; + +import lombok.SneakyThrows; +import org.mockito.Mockito; import org.mockito.stubbing.Answer; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; @@ -214,45 +218,31 @@ public static XContentParser createParser(String mappings) throws IOException { NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings); } + @SneakyThrows public static void mockLocalClusterState(String mappings) { - LocalClusterState.state().setClusterService(mockClusterService(mappings)); - LocalClusterState.state().setResolver(mockIndexNameExpressionResolver()); - LocalClusterState.state().setPluginSettings(mockPluginSettings()); - } - public static ClusterService mockClusterService(String mappings) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); + Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS); - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); - try { - when(mockMetaData.findMappings(any(), any())) - .thenReturn( - Map.of( - TestsConstants.TEST_INDEX_BANK, - IndexMetadata.fromXContent(createParser(mappings)).mapping())); - } catch (IOException e) { - throw new IllegalStateException(e); - } - return mockService; - } + when( + client + .admin() + .indices() + .prepareGetMappings(any(String[].class)) + .setLocal(anyBoolean()) + .setIndicesOptions(any()) + .execute() + .actionGet(anyLong(), any()) + .mappings() + ).thenReturn( + Map.of( + TestsConstants.TEST_INDEX_BANK, + IndexMetadata.fromXContent(createParser(mappings)).mapping() + ) + ); - public static IndexNameExpressionResolver mockIndexNameExpressionResolver() { - IndexNameExpressionResolver mockResolver = mock(IndexNameExpressionResolver.class); - when(mockResolver.concreteIndexNames(any(), any(), anyBoolean(), anyString())) - .thenAnswer( - (Answer) - invocation -> { - // Return index expression directly without resolving - Object indexExprs = invocation.getArguments()[3]; - if (indexExprs instanceof String) { - return new String[] {(String) indexExprs}; - } - return (String[]) indexExprs; - }); - return mockResolver; + LocalClusterState.state().setClusterService(mock(ClusterService.class)); + LocalClusterState.state().setPluginSettings(mockPluginSettings()); + LocalClusterState.state().setClient(client); } public static OpenSearchSettings mockPluginSettings() { diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java b/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java index 13a4ae3a6b..221620754b 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java @@ -6,16 +6,25 @@ package org.opensearch.sql.legacy.util; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.sql.legacy.util.CheckScriptContents.createParser; -import static org.opensearch.sql.legacy.util.CheckScriptContents.mockIndexNameExpressionResolver; import static org.opensearch.sql.legacy.util.CheckScriptContents.mockPluginSettings; import java.io.IOException; import java.util.Map; import java.util.stream.Collectors; + +import lombok.SneakyThrows; +import org.mockito.ArgumentMatcher; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; @@ -152,29 +161,37 @@ public static void mockMultipleIndexEnv() { INDEX_ACCOUNT_2_MAPPING)))); } + @SneakyThrows public static void mockLocalClusterState(Map> indexMapping) { - LocalClusterState.state().setClusterService(mockClusterService(indexMapping)); - LocalClusterState.state().setResolver(mockIndexNameExpressionResolver()); - LocalClusterState.state().setPluginSettings(mockPluginSettings()); - } - public static ClusterService mockClusterService( - Map> indexMapping) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); + Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS); - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); - try { - for (var entry : indexMapping.entrySet()) { - when(mockMetaData.findMappings(eq(new String[] {entry.getKey()}), any())) - .thenReturn(entry.getValue()); - } - } catch (IOException e) { - throw new IllegalStateException(e); - } - return mockService; + ThreadLocal callerIndexExpression = new ThreadLocal<>(); + ArgumentMatcher preserveIndexMappingsFromCaller = arg -> { + callerIndexExpression.set((String) arg); + return true; + }; + Answer> getIndexMappingsForCaller = invoke -> { + return indexMapping.get(callerIndexExpression.get()); + }; + + when( + client + .admin() + .indices() + .prepareGetMappings((String[]) argThat(preserveIndexMappingsFromCaller)) + .setLocal(anyBoolean()) + .setIndicesOptions(any()) + .execute() + .actionGet(anyLong(), any()) + .mappings() + ).thenAnswer( + getIndexMappingsForCaller + ); + + LocalClusterState.state().setClusterService(mock(ClusterService.class)); + LocalClusterState.state().setPluginSettings(mockPluginSettings()); + LocalClusterState.state().setClient(client); } private static Map buildIndexMapping(Map indexMapping) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 16fd46c253..cfce8e9cfe 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -129,7 +129,6 @@ public List getRestHandlers( Objects.requireNonNull(clusterService, "Cluster service is required"); Objects.requireNonNull(pluginSettings, "Cluster settings is required"); - LocalClusterState.state().setResolver(indexNameExpressionResolver); Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( @@ -202,6 +201,7 @@ public Collection createComponents( dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); + LocalClusterState.state().setClient(client); ModulesBuilder modules = new ModulesBuilder(); modules.add(new OpenSearchPluginModule()); modules.add(