Skip to content

Commit

Permalink
[Extensions] Improves performance by replacing cluster state calls to…
Browse files Browse the repository at this point in the history
… retrieve index mappings, cluster index health (#921)

* Replacing uses of cluster state to retrieve index mappings from IndexMetaData with GetMappingsRequest for the given index

Signed-off-by: Joshua Palis <[email protected]>

* Replacing clusterstate metadata/routing table calls to determine cluster index health code with ClusterHealthRequest

Signed-off-by: Joshua Palis <[email protected]>

* Fixing Releasable import changes

Signed-off-by: Joshua Palis <[email protected]>

* Spotless

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
  • Loading branch information
joshpalis authored Jun 7, 2023
1 parent 56b038f commit 45127f7
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/NodeStateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
import org.opensearch.ad.transport.BackPressureRouting;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.client.indices.rollover.RolloverRequest;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
Expand All @@ -77,6 +79,7 @@
import org.opensearch.cluster.LocalNodeMasterListener;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -480,8 +483,24 @@ public boolean isValidResultIndexMapping(String resultIndex) {
// failed to populate the field
return false;
}
IndexMetadata indexMetadata = sdkClusterService.state().metadata().index(resultIndex);
Map<String, Object> indexMapping = indexMetadata.mapping().sourceAsMap();

GetMappingsRequest getMappingRequest = new GetMappingsRequest().indices(resultIndex);
CompletableFuture<GetMappingsResponse> getMappingsFuture = new CompletableFuture<>();
sdkRestClient
.indices()
.getMapping(getMappingRequest, ActionListener.wrap(response -> { getMappingsFuture.complete(response); }, exception -> {
getMappingsFuture.completeExceptionally(exception);
}));
GetMappingsResponse getMappingResponse = getMappingsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
.join();

Map<String, MappingMetadata> resultIndexMappings = getMappingResponse.mappings();
if (resultIndexMappings.size() == 0) {
return false;
}
Map<String, Object> indexMapping = resultIndexMappings.get(resultIndex).sourceAsMap();

String propertyName = CommonName.PROPERTIES;
if (!indexMapping.containsKey(propertyName) || !(indexMapping.get(propertyName) instanceof LinkedHashMap)) {
return false;
Expand Down Expand Up @@ -1040,14 +1059,25 @@ private void shouldUpdateIndex(ADIndex index, ActionListener<Boolean> thenDo) {

@SuppressWarnings("unchecked")
private void shouldUpdateConcreteIndex(String concreteIndex, Integer newVersion, ActionListener<Boolean> thenDo) {
IndexMetadata indexMeataData = sdkClusterService.state().getMetadata().indices().get(concreteIndex);
if (indexMeataData == null) {
GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(concreteIndex);
CompletableFuture<GetMappingsResponse> getMappingsFuture = new CompletableFuture<>();
sdkRestClient
.indices()
.getMapping(getMappingsRequest, ActionListener.wrap(response -> { getMappingsFuture.complete(response); }, exception -> {
getMappingsFuture.completeExceptionally(exception);
}));
GetMappingsResponse getMappingResponse = getMappingsFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
.join();

Map<String, MappingMetadata> concreteIndexMappings = getMappingResponse.mappings();
if (concreteIndexMappings.size() == 0) {
thenDo.onResponse(Boolean.FALSE);
return;
}
Integer oldVersion = CommonValue.NO_SCHEMA_VERSION;

Map<String, Object> indexMapping = indexMeataData.mapping().getSourceAsMap();
Map<String, Object> indexMapping = concreteIndexMappings.get(concreteIndex).sourceAsMap();
Object meta = indexMapping.get(META);
if (meta != null && meta instanceof Map) {
Map<String, Object> metaMapping = (Map<String, Object>) meta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.NetworkExceptionHelper;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.rest.RestStatus;
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/org/opensearch/ad/util/IndexUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
Expand Down Expand Up @@ -138,10 +140,19 @@ public String getIndexHealthStatus(String indexOrAliasName) throws IllegalArgume
}
}

ClusterIndexHealth indexHealth = new ClusterIndexHealth(
clusterService.state().metadata().index(indexOrAliasName),
clusterService.state().getRoutingTable().index(indexOrAliasName)
);
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest(indexOrAliasName);
CompletableFuture<ClusterHealthResponse> clusterHealthFuture = new CompletableFuture<>();
sdkRestClient
.cluster()
.health(clusterHealthRequest, ActionListener.wrap(response -> { clusterHealthFuture.complete(response); }, exception -> {
clusterHealthFuture.completeExceptionally(exception);
}));

ClusterHealthResponse clusterHealthResponse = clusterHealthFuture
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

ClusterIndexHealth indexHealth = clusterHealthResponse.getIndices().get(indexOrAliasName);

return indexHealth.getStatus().name().toLowerCase(Locale.ROOT);
}
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/test/org/opensearch/ad/util/FakeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.BoundTransportAddress;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.lease.Releasable;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
Expand Down

0 comments on commit 45127f7

Please sign in to comment.