diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java b/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java index cc91fb8b39..786c9310df 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/esdomain/LocalClusterState.java @@ -5,22 +5,19 @@ package org.opensearch.sql.legacy.esdomain; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import java.io.IOException; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; -import java.util.function.Predicate; +import java.util.concurrent.TimeUnit; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.NonNull; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.index.IndexNotFoundException; @@ -38,30 +35,20 @@ *

2) Why injection by AbstractModule doesn't work here? Because this state needs to be used * across the plugin, ex. in rewriter, pretty formatter etc. */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class LocalClusterState { private static final Logger LOG = LogManager.getLogger(); - private static final Function> ALL_FIELDS = - (anyIndex -> (anyField -> true)); - /** Singleton instance */ private static LocalClusterState INSTANCE; /** Current cluster state on local node */ private ClusterService clusterService; - private OpenSearchSettings pluginSettings; - - /** Index name expression resolver to get concrete index name */ - private IndexNameExpressionResolver resolver; + private Client client; - /** - * Thread-safe mapping cache to save the computation of sourceAsMap() which is not lightweight as - * thought Array cannot be used as key because hashCode() always return reference address, so - * either use wrapper or List. - */ - private final Cache, IndexMappings> cache; + private OpenSearchSettings pluginSettings; /** Latest setting value for each registered key. Thread-safe is required. */ private final Map latestSettings = new ConcurrentHashMap<>(); @@ -78,25 +65,33 @@ public static synchronized void state(LocalClusterState instance) { INSTANCE = instance; } - public void setClusterService(ClusterService clusterService) { + /** + * Sets the ClusterService used to receive ClusterSetting update notifications. + * + * @param clusterService The non-null cluster service instance. + */ + public void setClusterService(@NonNull ClusterService clusterService) { this.clusterService = clusterService; + } - clusterService.addListener( - event -> { - if (event.metadataChanged()) { - // State in cluster service is already changed to event.state() before listener fired - if (LOG.isDebugEnabled()) { - LOG.debug( - "Metadata in cluster state changed: {}", - new IndexMappings(clusterService.state().metadata())); - } - cache.invalidateAll(); - } - }); + /** + * Sets the Client used to interact with OpenSearch core. + * + * @param client The non-null client instance + */ + public void setClient(@NonNull Client client) { + this.client = client; } - public void setPluginSettings(OpenSearchSettings settings) { + /** + * Sets the plugin's settings. + * + * @param settings The non-null plugin settings instance + */ + public void setPluginSettings(@NonNull OpenSearchSettings settings) { + this.pluginSettings = settings; + for (Setting setting : settings.getSettings()) { clusterService .getClusterSettings() @@ -111,14 +106,6 @@ public void setPluginSettings(OpenSearchSettings settings) { } } - public void setResolver(IndexNameExpressionResolver resolver) { - this.resolver = resolver; - } - - private LocalClusterState() { - cache = CacheBuilder.newBuilder().maximumSize(100).build(); - } - /** * Get plugin setting value by key. Return default value if not configured explicitly. * @@ -131,39 +118,31 @@ public T getSettingValue(Settings.Key key) { return (T) latestSettings.getOrDefault(key.getKeyValue(), pluginSettings.getSettingValue(key)); } - /** Get field mappings by index expressions. All types and fields are included in response. */ - public IndexMappings getFieldMappings(String[] indices) { - return getFieldMappings(indices, ALL_FIELDS); - } - /** - * Get field mappings by index expressions, type and field filter. Because - * IndexMetaData/MappingMetaData is hard to convert to FieldMappingMetaData, custom mapping domain - * objects are being used here. In future, it should be moved to domain model layer for all - * OpenSearch specific knowledge. - * - *

Note that cluster state may be change inside OpenSearch so it's possible to read different - * state in 2 accesses to ClusterService.state() here. + * Get field mappings by index expressions. Because IndexMetaData/MappingMetaData is hard to + * convert to FieldMappingMetaData, custom mapping domain objects are being used here. In future, + * it should be moved to domain model layer for all OpenSearch specific knowledge. * * @param indices index name expression - * @param fieldFilter field filter predicate * @return index mapping(s) */ - private IndexMappings getFieldMappings( - String[] indices, Function> fieldFilter) { - Objects.requireNonNull(clusterService, "Cluster service is null"); - Objects.requireNonNull(resolver, "Index name expression resolver is null"); + public IndexMappings getFieldMappings(String[] indices) { + Objects.requireNonNull(client, "Client is null"); try { - ClusterState state = clusterService.state(); - String[] concreteIndices = resolveIndexExpression(state, indices); - IndexMappings mappings; - if (fieldFilter == ALL_FIELDS) { - mappings = findMappingsInCache(state, concreteIndices); - } else { - mappings = findMappings(state, concreteIndices, fieldFilter); - } + Map mappingMetadata = + client + .admin() + .indices() + .prepareGetMappings(indices) + .setLocal(true) + .setIndicesOptions(IndicesOptions.strictExpandOpen()) + .execute() + .actionGet(0, TimeUnit.NANOSECONDS) + .mappings(); + + IndexMappings mappings = new IndexMappings(mappingMetadata); LOG.debug("Found mappings: {}", mappings); return mappings; @@ -174,36 +153,4 @@ private IndexMappings getFieldMappings( "Failed to read mapping in cluster state for indices=" + Arrays.toString(indices), e); } } - - private String[] resolveIndexExpression(ClusterState state, String[] indices) { - String[] concreteIndices = - resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Resolved index expression {} to concrete index names {}", - Arrays.toString(indices), - Arrays.toString(concreteIndices)); - } - return concreteIndices; - } - - private IndexMappings findMappings( - ClusterState state, String[] indices, Function> fieldFilter) - throws IOException { - LOG.debug("Cache didn't help. Load and parse mapping in cluster state"); - return new IndexMappings(state.metadata().findMappings(indices, fieldFilter)); - } - - private IndexMappings findMappingsInCache(ClusterState state, String[] indices) - throws ExecutionException { - LOG.debug("Looking for mapping in cache: {}", cache.asMap()); - return cache.get(sortToList(indices), () -> findMappings(state, indices, ALL_FIELDS)); - } - - private List sortToList(T[] array) { - // Mostly array has single element - Arrays.sort(array); - return Arrays.asList(array); - } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java index 49c95fa23e..8e5c31d036 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/LocalClusterStateTest.java @@ -6,26 +6,15 @@ package org.opensearch.sql.legacy.unittest; import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.sql.legacy.util.CheckScriptContents.mockClusterService; import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState; -import java.io.IOException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterStateListener; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.esdomain.LocalClusterState; @@ -149,41 +138,6 @@ public void getMappingForInvalidField() { Assert.assertNull(fieldMappings.mapping("manager.name.first.uppercase")); } - @Test - public void getMappingFromCache() throws IOException { - // Mock here again for verification below and mock addListener() - ClusterService mockService = mockClusterService(MAPPING); - ClusterStateListener[] listener = new ClusterStateListener[1]; // Trick to access inside lambda - doAnswer( - invocation -> { - listener[0] = (ClusterStateListener) invocation.getArguments()[0]; - return null; - }) - .when(mockService) - .addListener(any()); - LocalClusterState.state().setClusterService(mockService); - - // 1.Actual findMappings be invoked only once - for (int i = 0; i < 10; i++) { - LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME}); - } - verify(mockService.state().metadata(), times(1)) - .findMappings(eq(new String[] {INDEX_NAME}), any()); - - // 2.Fire cluster state change event - Assert.assertNotNull(listener[0]); - ClusterChangedEvent mockEvent = mock(ClusterChangedEvent.class); - when(mockEvent.metadataChanged()).thenReturn(true); - listener[0].clusterChanged(mockEvent); - - // 3.Cache should be invalidated and call findMapping another time only - for (int i = 0; i < 5; i++) { - LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME}); - } - verify(mockService.state().metadata(), times(2)) - .findMappings(eq(new String[] {INDEX_NAME}), any()); - } - @Test public void getDefaultValueForQuerySlowLog() { when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT); diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java b/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java index 5f0e07aa35..76347c5048 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/util/CheckScriptContents.java @@ -8,7 +8,7 @@ import static java.util.Collections.emptyList; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -24,17 +24,15 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.mockito.stubbing.Answer; +import lombok.SneakyThrows; +import org.mockito.Mockito; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.xcontent.XContentType; @@ -213,45 +211,28 @@ public static XContentParser createParser(String mappings) throws IOException { NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings); } + @SneakyThrows public static void mockLocalClusterState(String mappings) { - LocalClusterState.state().setClusterService(mockClusterService(mappings)); - LocalClusterState.state().setResolver(mockIndexNameExpressionResolver()); - LocalClusterState.state().setPluginSettings(mockPluginSettings()); - } - - public static ClusterService mockClusterService(String mappings) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); - - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); - try { - when(mockMetaData.findMappings(any(), any())) - .thenReturn( - Map.of( - TestsConstants.TEST_INDEX_BANK, - IndexMetadata.fromXContent(createParser(mappings)).mapping())); - } catch (IOException e) { - throw new IllegalStateException(e); - } - return mockService; - } - public static IndexNameExpressionResolver mockIndexNameExpressionResolver() { - IndexNameExpressionResolver mockResolver = mock(IndexNameExpressionResolver.class); - when(mockResolver.concreteIndexNames(any(), any(), anyBoolean(), anyString())) - .thenAnswer( - (Answer) - invocation -> { - // Return index expression directly without resolving - Object indexExprs = invocation.getArguments()[3]; - if (indexExprs instanceof String) { - return new String[] {(String) indexExprs}; - } - return (String[]) indexExprs; - }); - return mockResolver; + Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS); + + when(client + .admin() + .indices() + .prepareGetMappings(any(String[].class)) + .setLocal(anyBoolean()) + .setIndicesOptions(any()) + .execute() + .actionGet(anyLong(), any()) + .mappings()) + .thenReturn( + Map.of( + TestsConstants.TEST_INDEX_BANK, + IndexMetadata.fromXContent(createParser(mappings)).mapping())); + + LocalClusterState.state().setClusterService(mock(ClusterService.class)); + LocalClusterState.state().setPluginSettings(mockPluginSettings()); + LocalClusterState.state().setClient(client); } public static OpenSearchSettings mockPluginSettings() { diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java b/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java index 42620c11a6..b483ef0852 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/util/MultipleIndexClusterUtils.java @@ -6,20 +6,24 @@ package org.opensearch.sql.legacy.util; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.sql.legacy.util.CheckScriptContents.createParser; -import static org.opensearch.sql.legacy.util.CheckScriptContents.mockIndexNameExpressionResolver; import static org.opensearch.sql.legacy.util.CheckScriptContents.mockPluginSettings; import java.io.IOException; import java.util.Map; import java.util.stream.Collectors; -import org.opensearch.cluster.ClusterState; +import lombok.SneakyThrows; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.sql.legacy.esdomain.LocalClusterState; @@ -150,29 +154,36 @@ public static void mockMultipleIndexEnv() { INDEX_ACCOUNT_2_MAPPING)))); } + @SneakyThrows public static void mockLocalClusterState(Map> indexMapping) { - LocalClusterState.state().setClusterService(mockClusterService(indexMapping)); - LocalClusterState.state().setResolver(mockIndexNameExpressionResolver()); - LocalClusterState.state().setPluginSettings(mockPluginSettings()); - } - public static ClusterService mockClusterService( - Map> indexMapping) { - ClusterService mockService = mock(ClusterService.class); - ClusterState mockState = mock(ClusterState.class); - Metadata mockMetaData = mock(Metadata.class); + Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS); - when(mockService.state()).thenReturn(mockState); - when(mockState.metadata()).thenReturn(mockMetaData); - try { - for (var entry : indexMapping.entrySet()) { - when(mockMetaData.findMappings(eq(new String[] {entry.getKey()}), any())) - .thenReturn(entry.getValue()); - } - } catch (IOException e) { - throw new IllegalStateException(e); - } - return mockService; + ThreadLocal callerIndexExpression = new ThreadLocal<>(); + ArgumentMatcher preserveIndexMappingsFromCaller = + arg -> { + callerIndexExpression.set((String) arg); + return true; + }; + Answer> getIndexMappingsForCaller = + invoke -> { + return indexMapping.get(callerIndexExpression.get()); + }; + + when(client + .admin() + .indices() + .prepareGetMappings((String[]) argThat(preserveIndexMappingsFromCaller)) + .setLocal(anyBoolean()) + .setIndicesOptions(any()) + .execute() + .actionGet(anyLong(), any()) + .mappings()) + .thenAnswer(getIndexMappingsForCaller); + + LocalClusterState.state().setClusterService(mock(ClusterService.class)); + LocalClusterState.state().setPluginSettings(mockPluginSettings()); + LocalClusterState.state().setClient(client); } private static Map buildIndexMapping(Map indexMapping) { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 16fd46c253..cfce8e9cfe 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -129,7 +129,6 @@ public List getRestHandlers( Objects.requireNonNull(clusterService, "Cluster service is required"); Objects.requireNonNull(pluginSettings, "Cluster settings is required"); - LocalClusterState.state().setResolver(indexNameExpressionResolver); Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( @@ -202,6 +201,7 @@ public Collection createComponents( dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); + LocalClusterState.state().setClient(client); ModulesBuilder modules = new ModulesBuilder(); modules.add(new OpenSearchPluginModule()); modules.add(