Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Remove direct ClusterState access in LocalClusterState #2717

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@

package org.opensearch.sql.legacy.esdomain;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.concurrent.TimeUnit;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.IndexNotFoundException;
Expand All @@ -38,30 +35,20 @@
* <p>2) Why injection by AbstractModule doesn't work here? Because this state needs to be used
* across the plugin, ex. in rewriter, pretty formatter etc.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class LocalClusterState {

private static final Logger LOG = LogManager.getLogger();

private static final Function<String, Predicate<String>> ALL_FIELDS =
(anyIndex -> (anyField -> true));

/** Singleton instance */
private static LocalClusterState INSTANCE;

/** Current cluster state on local node */
private ClusterService clusterService;

private OpenSearchSettings pluginSettings;

/** Index name expression resolver to get concrete index name */
private IndexNameExpressionResolver resolver;
private Client client;

/**
* Thread-safe mapping cache to save the computation of sourceAsMap() which is not lightweight as
* thought Array cannot be used as key because hashCode() always return reference address, so
* either use wrapper or List.
*/
private final Cache<List<String>, IndexMappings> cache;
private OpenSearchSettings pluginSettings;

/** Latest setting value for each registered key. Thread-safe is required. */
private final Map<String, Object> latestSettings = new ConcurrentHashMap<>();
Expand All @@ -78,25 +65,33 @@ public static synchronized void state(LocalClusterState instance) {
INSTANCE = instance;
}

public void setClusterService(ClusterService clusterService) {
/**
* Sets the ClusterService used to receive ClusterSetting update notifications.
*
* @param clusterService The non-null cluster service instance.
*/
public void setClusterService(@NonNull ClusterService clusterService) {
this.clusterService = clusterService;
}

clusterService.addListener(
event -> {
if (event.metadataChanged()) {
// State in cluster service is already changed to event.state() before listener fired
if (LOG.isDebugEnabled()) {
LOG.debug(
"Metadata in cluster state changed: {}",
new IndexMappings(clusterService.state().metadata()));
}
cache.invalidateAll();
}
});
/**
* Sets the Client used to interact with OpenSearch core.
*
* @param client The non-null client instance
*/
public void setClient(@NonNull Client client) {
this.client = client;
}

public void setPluginSettings(OpenSearchSettings settings) {
/**
* Sets the plugin's settings.
*
* @param settings The non-null plugin settings instance
*/
public void setPluginSettings(@NonNull OpenSearchSettings settings) {

this.pluginSettings = settings;

for (Setting<?> setting : settings.getSettings()) {
clusterService
.getClusterSettings()
Expand All @@ -111,14 +106,6 @@ public void setPluginSettings(OpenSearchSettings settings) {
}
}

public void setResolver(IndexNameExpressionResolver resolver) {
this.resolver = resolver;
}

private LocalClusterState() {
cache = CacheBuilder.newBuilder().maximumSize(100).build();
}

/**
* Get plugin setting value by key. Return default value if not configured explicitly.
*
Expand All @@ -131,39 +118,31 @@ public <T> T getSettingValue(Settings.Key key) {
return (T) latestSettings.getOrDefault(key.getKeyValue(), pluginSettings.getSettingValue(key));
}

/** Get field mappings by index expressions. All types and fields are included in response. */
public IndexMappings getFieldMappings(String[] indices) {
return getFieldMappings(indices, ALL_FIELDS);
}

/**
* Get field mappings by index expressions, type and field filter. Because
* IndexMetaData/MappingMetaData is hard to convert to FieldMappingMetaData, custom mapping domain
* objects are being used here. In future, it should be moved to domain model layer for all
* OpenSearch specific knowledge.
*
* <p>Note that cluster state may be change inside OpenSearch so it's possible to read different
* state in 2 accesses to ClusterService.state() here.
* Get field mappings by index expressions. Because IndexMetaData/MappingMetaData is hard to
* convert to FieldMappingMetaData, custom mapping domain objects are being used here. In future,
* it should be moved to domain model layer for all OpenSearch specific knowledge.
*
* @param indices index name expression
* @param fieldFilter field filter predicate
* @return index mapping(s)
*/
private IndexMappings getFieldMappings(
String[] indices, Function<String, Predicate<String>> fieldFilter) {
Objects.requireNonNull(clusterService, "Cluster service is null");
Objects.requireNonNull(resolver, "Index name expression resolver is null");
public IndexMappings getFieldMappings(String[] indices) {
Objects.requireNonNull(client, "Client is null");

try {
ClusterState state = clusterService.state();
String[] concreteIndices = resolveIndexExpression(state, indices);

IndexMappings mappings;
if (fieldFilter == ALL_FIELDS) {
mappings = findMappingsInCache(state, concreteIndices);
} else {
mappings = findMappings(state, concreteIndices, fieldFilter);
}
Map<String, MappingMetadata> mappingMetadata =
client
.admin()
.indices()
.prepareGetMappings(indices)
.setLocal(true)
.setIndicesOptions(IndicesOptions.strictExpandOpen())
.execute()
.actionGet(0, TimeUnit.NANOSECONDS)
.mappings();

IndexMappings mappings = new IndexMappings(mappingMetadata);

LOG.debug("Found mappings: {}", mappings);
return mappings;
Expand All @@ -174,36 +153,4 @@ private IndexMappings getFieldMappings(
"Failed to read mapping in cluster state for indices=" + Arrays.toString(indices), e);
}
}

private String[] resolveIndexExpression(ClusterState state, String[] indices) {
String[] concreteIndices =
resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Resolved index expression {} to concrete index names {}",
Arrays.toString(indices),
Arrays.toString(concreteIndices));
}
return concreteIndices;
}

private IndexMappings findMappings(
ClusterState state, String[] indices, Function<String, Predicate<String>> fieldFilter)
throws IOException {
LOG.debug("Cache didn't help. Load and parse mapping in cluster state");
return new IndexMappings(state.metadata().findMappings(indices, fieldFilter));
}

private IndexMappings findMappingsInCache(ClusterState state, String[] indices)
throws ExecutionException {
LOG.debug("Looking for mapping in cache: {}", cache.asMap());
return cache.get(sortToList(indices), () -> findMappings(state, indices, ALL_FIELDS));
}

private <T> List<T> sortToList(T[] array) {
// Mostly array has single element
Arrays.sort(array);
return Arrays.asList(array);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,15 @@
package org.opensearch.sql.legacy.unittest;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockClusterService;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState;

import java.io.IOException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
Expand Down Expand Up @@ -149,41 +138,6 @@ public void getMappingForInvalidField() {
Assert.assertNull(fieldMappings.mapping("manager.name.first.uppercase"));
}

@Test
public void getMappingFromCache() throws IOException {
// Mock here again for verification below and mock addListener()
ClusterService mockService = mockClusterService(MAPPING);
ClusterStateListener[] listener = new ClusterStateListener[1]; // Trick to access inside lambda
doAnswer(
invocation -> {
listener[0] = (ClusterStateListener) invocation.getArguments()[0];
return null;
})
.when(mockService)
.addListener(any());
LocalClusterState.state().setClusterService(mockService);

// 1.Actual findMappings be invoked only once
for (int i = 0; i < 10; i++) {
LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME});
}
verify(mockService.state().metadata(), times(1))
.findMappings(eq(new String[] {INDEX_NAME}), any());

// 2.Fire cluster state change event
Assert.assertNotNull(listener[0]);
ClusterChangedEvent mockEvent = mock(ClusterChangedEvent.class);
when(mockEvent.metadataChanged()).thenReturn(true);
listener[0].clusterChanged(mockEvent);

// 3.Cache should be invalidated and call findMapping another time only
for (int i = 0; i < 5; i++) {
LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME});
}
verify(mockService.state().metadata(), times(2))
.findMappings(eq(new String[] {INDEX_NAME}), any());
}

@Test
public void getDefaultValueForQuerySlowLog() {
when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static java.util.Collections.emptyList;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand All @@ -24,17 +24,15 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.mockito.stubbing.Answer;
import lombok.SneakyThrows;
import org.mockito.Mockito;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -213,45 +211,28 @@ public static XContentParser createParser(String mappings) throws IOException {
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings);
}

@SneakyThrows
public static void mockLocalClusterState(String mappings) {
LocalClusterState.state().setClusterService(mockClusterService(mappings));
LocalClusterState.state().setResolver(mockIndexNameExpressionResolver());
LocalClusterState.state().setPluginSettings(mockPluginSettings());
}

public static ClusterService mockClusterService(String mappings) {
ClusterService mockService = mock(ClusterService.class);
ClusterState mockState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);

when(mockService.state()).thenReturn(mockState);
when(mockState.metadata()).thenReturn(mockMetaData);
try {
when(mockMetaData.findMappings(any(), any()))
.thenReturn(
Map.of(
TestsConstants.TEST_INDEX_BANK,
IndexMetadata.fromXContent(createParser(mappings)).mapping()));
} catch (IOException e) {
throw new IllegalStateException(e);
}
return mockService;
}

public static IndexNameExpressionResolver mockIndexNameExpressionResolver() {
IndexNameExpressionResolver mockResolver = mock(IndexNameExpressionResolver.class);
when(mockResolver.concreteIndexNames(any(), any(), anyBoolean(), anyString()))
.thenAnswer(
(Answer<String[]>)
invocation -> {
// Return index expression directly without resolving
Object indexExprs = invocation.getArguments()[3];
if (indexExprs instanceof String) {
return new String[] {(String) indexExprs};
}
return (String[]) indexExprs;
});
return mockResolver;
Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS);

when(client
.admin()
.indices()
.prepareGetMappings(any(String[].class))
.setLocal(anyBoolean())
.setIndicesOptions(any())
.execute()
.actionGet(anyLong(), any())
.mappings())
.thenReturn(
Map.of(
TestsConstants.TEST_INDEX_BANK,
IndexMetadata.fromXContent(createParser(mappings)).mapping()));

LocalClusterState.state().setClusterService(mock(ClusterService.class));
LocalClusterState.state().setPluginSettings(mockPluginSettings());
LocalClusterState.state().setClient(client);
}

public static OpenSearchSettings mockPluginSettings() {
Expand Down
Loading
Loading