Skip to content

Commit

Permalink
[Backport 2.x] Remove direct ClusterState access in LocalClusterState #…
Browse files Browse the repository at this point in the history
…2717

(cherry picked from commit 3f1e3bd)

Signed-off-by: Frank Dattalo <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 23541b4 commit 5f2a137
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@

package org.opensearch.sql.legacy.esdomain;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.concurrent.TimeUnit;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.IndexNotFoundException;
Expand All @@ -38,30 +35,20 @@
* <p>2) Why injection by AbstractModule doesn't work here? Because this state needs to be used
* across the plugin, ex. in rewriter, pretty formatter etc.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class LocalClusterState {

private static final Logger LOG = LogManager.getLogger();

private static final Function<String, Predicate<String>> ALL_FIELDS =
(anyIndex -> (anyField -> true));

/** Singleton instance */
private static LocalClusterState INSTANCE;

/** Current cluster state on local node */
private ClusterService clusterService;

private OpenSearchSettings pluginSettings;

/** Index name expression resolver to get concrete index name */
private IndexNameExpressionResolver resolver;
private Client client;

/**
* Thread-safe mapping cache to save the computation of sourceAsMap() which is not lightweight as
* thought Array cannot be used as key because hashCode() always return reference address, so
* either use wrapper or List.
*/
private final Cache<List<String>, IndexMappings> cache;
private OpenSearchSettings pluginSettings;

/** Latest setting value for each registered key. Thread-safe is required. */
private final Map<String, Object> latestSettings = new ConcurrentHashMap<>();
Expand All @@ -78,25 +65,33 @@ public static synchronized void state(LocalClusterState instance) {
INSTANCE = instance;
}

public void setClusterService(ClusterService clusterService) {
/**
* Sets the ClusterService used to receive ClusterSetting update notifications.
*
* @param clusterService The non-null cluster service instance.
*/
public void setClusterService(@NonNull ClusterService clusterService) {
this.clusterService = clusterService;
}

clusterService.addListener(
event -> {
if (event.metadataChanged()) {
// State in cluster service is already changed to event.state() before listener fired
if (LOG.isDebugEnabled()) {
LOG.debug(
"Metadata in cluster state changed: {}",
new IndexMappings(clusterService.state().metadata()));
}
cache.invalidateAll();
}
});
/**
* Sets the Client used to interact with OpenSearch core.
*
* @param client The non-null client instance
*/
public void setClient(@NonNull Client client) {
this.client = client;
}

public void setPluginSettings(OpenSearchSettings settings) {
/**
* Sets the plugin's settings.
*
* @param settings The non-null plugin settings instance
*/
public void setPluginSettings(@NonNull OpenSearchSettings settings) {

this.pluginSettings = settings;

for (Setting<?> setting : settings.getSettings()) {
clusterService
.getClusterSettings()
Expand All @@ -111,14 +106,6 @@ public void setPluginSettings(OpenSearchSettings settings) {
}
}

public void setResolver(IndexNameExpressionResolver resolver) {
this.resolver = resolver;
}

private LocalClusterState() {
cache = CacheBuilder.newBuilder().maximumSize(100).build();
}

/**
* Get plugin setting value by key. Return default value if not configured explicitly.
*
Expand All @@ -131,39 +118,31 @@ public <T> T getSettingValue(Settings.Key key) {
return (T) latestSettings.getOrDefault(key.getKeyValue(), pluginSettings.getSettingValue(key));
}

/** Get field mappings by index expressions. All types and fields are included in response. */
public IndexMappings getFieldMappings(String[] indices) {
return getFieldMappings(indices, ALL_FIELDS);
}

/**
* Get field mappings by index expressions, type and field filter. Because
* IndexMetaData/MappingMetaData is hard to convert to FieldMappingMetaData, custom mapping domain
* objects are being used here. In future, it should be moved to domain model layer for all
* OpenSearch specific knowledge.
*
* <p>Note that cluster state may be change inside OpenSearch so it's possible to read different
* state in 2 accesses to ClusterService.state() here.
* Get field mappings by index expressions. Because IndexMetaData/MappingMetaData is hard to
* convert to FieldMappingMetaData, custom mapping domain objects are being used here. In future,
* it should be moved to domain model layer for all OpenSearch specific knowledge.
*
* @param indices index name expression
* @param fieldFilter field filter predicate
* @return index mapping(s)
*/
private IndexMappings getFieldMappings(
String[] indices, Function<String, Predicate<String>> fieldFilter) {
Objects.requireNonNull(clusterService, "Cluster service is null");
Objects.requireNonNull(resolver, "Index name expression resolver is null");
public IndexMappings getFieldMappings(String[] indices) {
Objects.requireNonNull(client, "Client is null");

try {
ClusterState state = clusterService.state();
String[] concreteIndices = resolveIndexExpression(state, indices);

IndexMappings mappings;
if (fieldFilter == ALL_FIELDS) {
mappings = findMappingsInCache(state, concreteIndices);
} else {
mappings = findMappings(state, concreteIndices, fieldFilter);
}
Map<String, MappingMetadata> mappingMetadata =
client
.admin()
.indices()
.prepareGetMappings(indices)
.setLocal(true)
.setIndicesOptions(IndicesOptions.strictExpandOpen())
.execute()
.actionGet(0, TimeUnit.NANOSECONDS)
.mappings();

IndexMappings mappings = new IndexMappings(mappingMetadata);

LOG.debug("Found mappings: {}", mappings);
return mappings;
Expand All @@ -174,36 +153,4 @@ private IndexMappings getFieldMappings(
"Failed to read mapping in cluster state for indices=" + Arrays.toString(indices), e);
}
}

private String[] resolveIndexExpression(ClusterState state, String[] indices) {
String[] concreteIndices =
resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Resolved index expression {} to concrete index names {}",
Arrays.toString(indices),
Arrays.toString(concreteIndices));
}
return concreteIndices;
}

private IndexMappings findMappings(
ClusterState state, String[] indices, Function<String, Predicate<String>> fieldFilter)
throws IOException {
LOG.debug("Cache didn't help. Load and parse mapping in cluster state");
return new IndexMappings(state.metadata().findMappings(indices, fieldFilter));
}

private IndexMappings findMappingsInCache(ClusterState state, String[] indices)
throws ExecutionException {
LOG.debug("Looking for mapping in cache: {}", cache.asMap());
return cache.get(sortToList(indices), () -> findMappings(state, indices, ALL_FIELDS));
}

private <T> List<T> sortToList(T[] array) {
// Mostly array has single element
Arrays.sort(array);
return Arrays.asList(array);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,15 @@
package org.opensearch.sql.legacy.unittest;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockClusterService;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState;

import java.io.IOException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
Expand Down Expand Up @@ -149,41 +138,6 @@ public void getMappingForInvalidField() {
Assert.assertNull(fieldMappings.mapping("manager.name.first.uppercase"));
}

@Test
public void getMappingFromCache() throws IOException {
// Mock here again for verification below and mock addListener()
ClusterService mockService = mockClusterService(MAPPING);
ClusterStateListener[] listener = new ClusterStateListener[1]; // Trick to access inside lambda
doAnswer(
invocation -> {
listener[0] = (ClusterStateListener) invocation.getArguments()[0];
return null;
})
.when(mockService)
.addListener(any());
LocalClusterState.state().setClusterService(mockService);

// 1.Actual findMappings be invoked only once
for (int i = 0; i < 10; i++) {
LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME});
}
verify(mockService.state().metadata(), times(1))
.findMappings(eq(new String[] {INDEX_NAME}), any());

// 2.Fire cluster state change event
Assert.assertNotNull(listener[0]);
ClusterChangedEvent mockEvent = mock(ClusterChangedEvent.class);
when(mockEvent.metadataChanged()).thenReturn(true);
listener[0].clusterChanged(mockEvent);

// 3.Cache should be invalidated and call findMapping another time only
for (int i = 0; i < 5; i++) {
LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME});
}
verify(mockService.state().metadata(), times(2))
.findMappings(eq(new String[] {INDEX_NAME}), any());
}

@Test
public void getDefaultValueForQuerySlowLog() {
when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static java.util.Collections.emptyList;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand All @@ -24,17 +24,15 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.mockito.stubbing.Answer;
import lombok.SneakyThrows;
import org.mockito.Mockito;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -213,45 +211,28 @@ public static XContentParser createParser(String mappings) throws IOException {
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings);
}

@SneakyThrows
public static void mockLocalClusterState(String mappings) {
LocalClusterState.state().setClusterService(mockClusterService(mappings));
LocalClusterState.state().setResolver(mockIndexNameExpressionResolver());
LocalClusterState.state().setPluginSettings(mockPluginSettings());
}

public static ClusterService mockClusterService(String mappings) {
ClusterService mockService = mock(ClusterService.class);
ClusterState mockState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);

when(mockService.state()).thenReturn(mockState);
when(mockState.metadata()).thenReturn(mockMetaData);
try {
when(mockMetaData.findMappings(any(), any()))
.thenReturn(
Map.of(
TestsConstants.TEST_INDEX_BANK,
IndexMetadata.fromXContent(createParser(mappings)).mapping()));
} catch (IOException e) {
throw new IllegalStateException(e);
}
return mockService;
}

public static IndexNameExpressionResolver mockIndexNameExpressionResolver() {
IndexNameExpressionResolver mockResolver = mock(IndexNameExpressionResolver.class);
when(mockResolver.concreteIndexNames(any(), any(), anyBoolean(), anyString()))
.thenAnswer(
(Answer<String[]>)
invocation -> {
// Return index expression directly without resolving
Object indexExprs = invocation.getArguments()[3];
if (indexExprs instanceof String) {
return new String[] {(String) indexExprs};
}
return (String[]) indexExprs;
});
return mockResolver;
Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS);

when(client
.admin()
.indices()
.prepareGetMappings(any(String[].class))
.setLocal(anyBoolean())
.setIndicesOptions(any())
.execute()
.actionGet(anyLong(), any())
.mappings())
.thenReturn(
Map.of(
TestsConstants.TEST_INDEX_BANK,
IndexMetadata.fromXContent(createParser(mappings)).mapping()));

LocalClusterState.state().setClusterService(mock(ClusterService.class));
LocalClusterState.state().setPluginSettings(mockPluginSettings());
LocalClusterState.state().setClient(client);
}

public static OpenSearchSettings mockPluginSettings() {
Expand Down
Loading

0 comments on commit 5f2a137

Please sign in to comment.