Skip to content

Commit

Permalink
Remove Direct ClusterState access in LocalClusterState for IndexField…
Browse files Browse the repository at this point in the history
…Mappings

Signed-off-by: Frank Dattalo <[email protected]>
  • Loading branch information
fddattal committed May 25, 2024
1 parent 3a28d2a commit bc189aa
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,25 @@

package org.opensearch.sql.legacy.esdomain;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import lombok.NonNull;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.esdomain.mapping.IndexMappings;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Local cluster state information which may be stale but help avoid blocking operation in NIO
* thread.
Expand All @@ -43,26 +39,15 @@ public class LocalClusterState {

private static final Logger LOG = LogManager.getLogger();

private static final Function<String, Predicate<String>> ALL_FIELDS =
(anyIndex -> (anyField -> true));

/** Singleton instance */
private static LocalClusterState INSTANCE;

/** Current cluster state on local node */
private ClusterService clusterService;

private OpenSearchSettings pluginSettings;
private Client client;

/** Index name expression resolver to get concrete index name */
private IndexNameExpressionResolver resolver;

/**
* Thread-safe mapping cache to save the computation of sourceAsMap() which is not lightweight as
* thought Array cannot be used as key because hashCode() always return reference address, so
* either use wrapper or List.
*/
private final Cache<List<String>, IndexMappings> cache;
private OpenSearchSettings pluginSettings;

/** Latest setting value for each registered key. Thread-safe is required. */
private final Map<String, Object> latestSettings = new ConcurrentHashMap<>();
Expand All @@ -79,25 +64,30 @@ public static synchronized void state(LocalClusterState instance) {
INSTANCE = instance;
}

public void setClusterService(ClusterService clusterService) {
/**
* Sets the ClusterService used to receive ClusterSetting update notifications.
* @param clusterService The non-null cluster service instance.
*/
public void setClusterService(@NonNull ClusterService clusterService) {
this.clusterService = clusterService;
}

clusterService.addListener(
event -> {
if (event.metadataChanged()) {
// State in cluster service is already changed to event.state() before listener fired
if (LOG.isDebugEnabled()) {
LOG.debug(
"Metadata in cluster state changed: {}",
new IndexMappings(clusterService.state().metadata()));
}
cache.invalidateAll();
}
});
/**
* Sets the Client used to interact with OpenSearch core.
* @param client The non-null client instance
*/
public void setClient(@NonNull Client client) {
this.client = client;
}

public void setPluginSettings(OpenSearchSettings settings) {
/**
* Sets the plugin's settings.
* @param settings The non-null plugin settings instance
*/
public void setPluginSettings(@NonNull OpenSearchSettings settings) {

this.pluginSettings = settings;

for (Setting<?> setting : settings.getSettings()) {
clusterService
.getClusterSettings()
Expand All @@ -112,12 +102,7 @@ public void setPluginSettings(OpenSearchSettings settings) {
}
}

public void setResolver(IndexNameExpressionResolver resolver) {
this.resolver = resolver;
}

private LocalClusterState() {
cache = CacheBuilder.newBuilder().maximumSize(100).build();
}

/**
Expand All @@ -132,39 +117,30 @@ public <T> T getSettingValue(Settings.Key key) {
return (T) latestSettings.getOrDefault(key.getKeyValue(), pluginSettings.getSettingValue(key));
}

/** Get field mappings by index expressions. All types and fields are included in response. */
public IndexMappings getFieldMappings(String[] indices) {
return getFieldMappings(indices, ALL_FIELDS);
}

/**
* Get field mappings by index expressions, type and field filter. Because
* Get field mappings by index expressions. Because
* IndexMetaData/MappingMetaData is hard to convert to FieldMappingMetaData, custom mapping domain
* objects are being used here. In future, it should be moved to domain model layer for all
* OpenSearch specific knowledge.
*
* <p>Note that cluster state may be change inside OpenSearch so it's possible to read different
* state in 2 accesses to ClusterService.state() here.
*
* @param indices index name expression
* @param fieldFilter field filter predicate
* @return index mapping(s)
*/
private IndexMappings getFieldMappings(
String[] indices, Function<String, Predicate<String>> fieldFilter) {
Objects.requireNonNull(clusterService, "Cluster service is null");
Objects.requireNonNull(resolver, "Index name expression resolver is null");
public IndexMappings getFieldMappings(String[] indices) {
Objects.requireNonNull(client, "Client is null");

try {
ClusterState state = clusterService.state();
String[] concreteIndices = resolveIndexExpression(state, indices);

IndexMappings mappings;
if (fieldFilter == ALL_FIELDS) {
mappings = findMappingsInCache(state, concreteIndices);
} else {
mappings = findMappings(state, concreteIndices, fieldFilter);
}
Map<String, MappingMetadata> mappingMetadata = client.admin()
.indices()
.prepareGetMappings(indices)
.setLocal(true)
.setIndicesOptions(IndicesOptions.strictExpandOpen())
.execute()
.actionGet(0, TimeUnit.NANOSECONDS)
.mappings();

IndexMappings mappings = new IndexMappings(mappingMetadata);

LOG.debug("Found mappings: {}", mappings);
return mappings;
Expand All @@ -176,35 +152,4 @@ private IndexMappings getFieldMappings(
}
}

private String[] resolveIndexExpression(ClusterState state, String[] indices) {
String[] concreteIndices =
resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Resolved index expression {} to concrete index names {}",
Arrays.toString(indices),
Arrays.toString(concreteIndices));
}
return concreteIndices;
}

private IndexMappings findMappings(
ClusterState state, String[] indices, Function<String, Predicate<String>> fieldFilter)
throws IOException {
LOG.debug("Cache didn't help. Load and parse mapping in cluster state");
return new IndexMappings(state.metadata().findMappings(indices, fieldFilter));
}

private IndexMappings findMappingsInCache(ClusterState state, String[] indices)
throws ExecutionException {
LOG.debug("Looking for mapping in cache: {}", cache.asMap());
return cache.get(sortToList(indices), () -> findMappings(state, indices, ALL_FIELDS));
}

private <T> List<T> sortToList(T[] array) {
// Mostly array has single element
Arrays.sort(array);
return Arrays.asList(array);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,12 @@

package org.opensearch.sql.legacy.unittest;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockClusterService;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState;

import java.io.IOException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
Expand All @@ -34,6 +19,10 @@
import org.opensearch.sql.legacy.util.TestsConstants;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.legacy.util.CheckScriptContents.mockLocalClusterState;

/** Local cluster state testing without covering OpenSearch logic, ex. resolve index pattern. */
public class LocalClusterStateTest {

Expand Down Expand Up @@ -150,41 +139,6 @@ public void getMappingForInvalidField() {
Assert.assertNull(fieldMappings.mapping("manager.name.first.uppercase"));
}

@Test
public void getMappingFromCache() throws IOException {
// Mock here again for verification below and mock addListener()
ClusterService mockService = mockClusterService(MAPPING);
ClusterStateListener[] listener = new ClusterStateListener[1]; // Trick to access inside lambda
doAnswer(
invocation -> {
listener[0] = (ClusterStateListener) invocation.getArguments()[0];
return null;
})
.when(mockService)
.addListener(any());
LocalClusterState.state().setClusterService(mockService);

// 1.Actual findMappings be invoked only once
for (int i = 0; i < 10; i++) {
LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME});
}
verify(mockService.state().metadata(), times(1))
.findMappings(eq(new String[] {INDEX_NAME}), any());

// 2.Fire cluster state change event
Assert.assertNotNull(listener[0]);
ClusterChangedEvent mockEvent = mock(ClusterChangedEvent.class);
when(mockEvent.metadataChanged()).thenReturn(true);
listener[0].clusterChanged(mockEvent);

// 3.Cache should be invalidated and call findMapping another time only
for (int i = 0; i < 5; i++) {
LocalClusterState.state().getFieldMappings(new String[] {INDEX_NAME});
}
verify(mockService.state().metadata(), times(2))
.findMappings(eq(new String[] {INDEX_NAME}), any());
}

@Test
public void getDefaultValueForQuerySlowLog() {
when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static java.util.Collections.emptyList;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
Expand All @@ -24,6 +25,9 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import lombok.SneakyThrows;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
Expand Down Expand Up @@ -214,45 +218,31 @@ public static XContentParser createParser(String mappings) throws IOException {
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings);
}

@SneakyThrows
public static void mockLocalClusterState(String mappings) {
LocalClusterState.state().setClusterService(mockClusterService(mappings));
LocalClusterState.state().setResolver(mockIndexNameExpressionResolver());
LocalClusterState.state().setPluginSettings(mockPluginSettings());
}

public static ClusterService mockClusterService(String mappings) {
ClusterService mockService = mock(ClusterService.class);
ClusterState mockState = mock(ClusterState.class);
Metadata mockMetaData = mock(Metadata.class);
Client client = Mockito.mock(Client.class, Mockito.RETURNS_DEEP_STUBS);

when(mockService.state()).thenReturn(mockState);
when(mockState.metadata()).thenReturn(mockMetaData);
try {
when(mockMetaData.findMappings(any(), any()))
.thenReturn(
Map.of(
TestsConstants.TEST_INDEX_BANK,
IndexMetadata.fromXContent(createParser(mappings)).mapping()));
} catch (IOException e) {
throw new IllegalStateException(e);
}
return mockService;
}
when(
client
.admin()
.indices()
.prepareGetMappings(any(String[].class))
.setLocal(anyBoolean())
.setIndicesOptions(any())
.execute()
.actionGet(anyLong(), any())
.mappings()
).thenReturn(
Map.of(
TestsConstants.TEST_INDEX_BANK,
IndexMetadata.fromXContent(createParser(mappings)).mapping()
)
);

public static IndexNameExpressionResolver mockIndexNameExpressionResolver() {
IndexNameExpressionResolver mockResolver = mock(IndexNameExpressionResolver.class);
when(mockResolver.concreteIndexNames(any(), any(), anyBoolean(), anyString()))
.thenAnswer(
(Answer<String[]>)
invocation -> {
// Return index expression directly without resolving
Object indexExprs = invocation.getArguments()[3];
if (indexExprs instanceof String) {
return new String[] {(String) indexExprs};
}
return (String[]) indexExprs;
});
return mockResolver;
LocalClusterState.state().setClusterService(mock(ClusterService.class));
LocalClusterState.state().setPluginSettings(mockPluginSettings());
LocalClusterState.state().setClient(client);
}

public static OpenSearchSettings mockPluginSettings() {
Expand Down
Loading

0 comments on commit bc189aa

Please sign in to comment.