diff --git a/CHANGELOG.md b/CHANGELOG.md index ab0c80e37e14c..e77b183601674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) - Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790))) - Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749)) +- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java index fb0bfb7ba27ce..927a79d4884ef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/IndicesRequestIT.java @@ -85,7 +85,7 @@ import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.SearchType; import org.opensearch.action.support.clustermanager.term.GetTermVersionAction; -import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; +import org.opensearch.action.support.clustermanager.term.GetTermVersionRequest; import org.opensearch.action.support.replication.TransportReplicationActionTests; import org.opensearch.action.termvectors.MultiTermVectorsAction; import org.opensearch.action.termvectors.MultiTermVectorsRequest; @@ -95,8 +95,6 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; @@ -128,7 +126,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -551,14 +548,14 @@ public void testDeleteIndex() { } public void testGetMappings() { - interceptTransportActions(GetMappingsAction.NAME); - stubClusterTermResponse(internalCluster().getClusterManagerName()); - + interceptTransportActions(GetTermVersionAction.NAME, GetMappingsAction.NAME); GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases()); internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet(); clearInterceptedActions(); - assertSameIndices(getMappingsRequest, GetMappingsAction.NAME); + + assertActionInvocation(GetTermVersionAction.NAME, GetTermVersionRequest.class); + assertNoActionInvocation(GetMappingsAction.NAME); } public void testPutMapping() { @@ -574,7 +571,6 @@ public void testPutMapping() { public void testGetSettings() { interceptTransportActions(GetSettingsAction.NAME); - stubClusterTermResponse(internalCluster().getClusterManagerName()); GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases()); internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet(); @@ -670,6 +666,21 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op } } + private static void assertActionInvocation(String action, Class requestClass) { + List requests = consumeTransportRequests(action); + assertFalse(requests.isEmpty()); + for (TransportRequest internalRequest : requests) { + assertTrue(internalRequest.getClass() == requestClass); + } + } + + private static void assertNoActionInvocation(String... actions) { + for (String action : actions) { + List requests = consumeTransportRequests(action); + assertTrue(requests.isEmpty()); + } + } + private static void assertIndicesSubset(List indices, String... actions) { // indices returned by each bulk shard request need to be a subset of the original indices for (String action : actions) { @@ -789,8 +800,6 @@ public List getTransportInterceptors( } private final Set actions = new HashSet<>(); - private final Map stubHandlers = new ConcurrentHashMap<>(); - private final Map> requests = new HashMap<>(); @Override @@ -813,11 +822,6 @@ synchronized void interceptTransportActions(String... actions) { synchronized void clearInterceptedActions() { actions.clear(); - stubHandlers.clear(); - } - - synchronized void stub(String action, TransportRequestHandler handler) { - stubHandlers.put(action, handler); } private class InterceptingRequestHandler implements TransportRequestHandler { @@ -844,25 +848,9 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro } } } - if (!stubHandlers.containsKey(action)) { - requestHandler.messageReceived(request, channel, task); - } else { - stubHandlers.get(action).messageReceived(request, channel, task); - } + requestHandler.messageReceived(request, channel, task); } } } - - private void stubClusterTermResponse(String master) { - PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, master); - pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance.stub( - GetTermVersionAction.NAME, - (request, channel, task) -> channel.sendResponse( - new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)) - ) - ); - - } - } diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index 21d89771e5fa6..e3a4216e772fb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -15,8 +15,8 @@ import org.opensearch.action.support.clustermanager.term.GetTermVersionAction; import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.coordination.ClusterStateTermVersion; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -92,15 +92,32 @@ public void init() { // Enable admission control client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet(); + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + clusterManagerNode + ); + + // Force always fetch from ClusterManager + ClusterService clusterService = internalCluster().clusterService(); + GetTermVersionResponse oosTerm = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term() - 1, + clusterService.state().version() - 1 + ) + ); + primaryService.addRequestHandlingBehavior( + GetTermVersionAction.NAME, + (handler, request, channel, task) -> channel.sendResponse(oosTerm) + ); } public void testAdmissionControlEnforced() throws Exception { - stubClusterTermResponse(internalCluster().getClusterManagerName()); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); - stubClusterTermResponse(internalCluster().getClusterManagerName()); // Read API on ClusterManager GetAliasesRequest aliasesRequest = new GetAliasesRequest(); @@ -171,7 +188,6 @@ public void admissionControlDisabledOnBreach(Settings admission) throws Interrup } public void testAdmissionControlResponseStatus() throws Exception { - stubClusterTermResponse(internalCluster().getClusterManagerName()); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager @@ -211,12 +227,4 @@ Map getAdmissionControlStats(AdmissionControlS } return acStats; } - - private void stubClusterTermResponse(String master) { - MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); - primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> { - channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))); - }); - } - } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java index 22feb4d99297a..c8a3be78a790e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/get/TransportGetDecommissionStateAction.java @@ -48,7 +48,8 @@ public TransportGetDecommissionStateAction( threadPool, actionFilters, GetDecommissionStateRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java index c7d784dbc96e7..c99b52dfe34f4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java @@ -79,7 +79,8 @@ public TransportGetRepositoriesAction( threadPool, actionFilters, GetRepositoriesRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java index a2a65b6400c97..83e104236f640 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/TransportClusterSearchShardsAction.java @@ -85,7 +85,8 @@ public TransportClusterSearchShardsAction( threadPool, actionFilters, ClusterSearchShardsRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.indicesService = indicesService; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java index 50368d85e0011..6c110c0ea2a73 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -55,7 +55,8 @@ public TransportGetWeightedRoutingAction( threadPool, actionFilters, ClusterGetWeightedRoutingRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.weightedRoutingService = weightedRoutingService; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index cdb00b584c5dd..13ea7eaa43bf8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -92,6 +92,7 @@ public TransportClusterStateAction( ClusterStateRequest::new, indexNameExpressionResolver ); + this.localExecuteSupported = true; } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java index db1f1edde2812..c34ec49406802 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportGetStoredScriptAction.java @@ -73,7 +73,8 @@ public TransportGetStoredScriptAction( threadPool, actionFilters, GetStoredScriptRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.scriptService = scriptService; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java b/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java index 3aca9c1976f16..4f4e3bd481ee7 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/alias/get/TransportGetAliasesAction.java @@ -86,7 +86,8 @@ public TransportGetAliasesAction( threadPool, actionFilters, GetAliasesRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.systemIndices = systemIndices; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java index 428a0eb35513d..a298eae1aa865 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java @@ -71,7 +71,8 @@ public TransportIndicesExistsAction( threadPool, actionFilters, IndicesExistsRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index 3fbf9ac1bb570..a8b97d0f344ae 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -105,7 +105,8 @@ public TransportIndicesShardStoresAction( threadPool, actionFilters, IndicesShardStoresRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); this.listShardStoresInfo = listShardStoresInfo; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java index e2594cd792cd3..c3217d109044d 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComponentTemplateAction.java @@ -76,7 +76,8 @@ public TransportGetComponentTemplateAction( threadPool, actionFilters, GetComponentTemplateAction.Request::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java index b1ef32db7274f..84fbb59481c10 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetComposableIndexTemplateAction.java @@ -76,7 +76,8 @@ public TransportGetComposableIndexTemplateAction( threadPool, actionFilters, GetComposableIndexTemplateAction.Request::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java index 10b4975f7b9d0..522234dda509f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/template/get/TransportGetIndexTemplatesAction.java @@ -76,7 +76,8 @@ public TransportGetIndexTemplatesAction( threadPool, actionFilters, GetIndexTemplatesRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java index 80333c7346f92..7bc0380bccbc0 100644 --- a/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/GetPipelineTransportAction.java @@ -70,7 +70,8 @@ public GetPipelineTransportAction( threadPool, actionFilters, GetPipelineRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java index a7fcb8f1cfbae..215b7ae1a610c 100644 --- a/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetSearchPipelineTransportAction.java @@ -48,7 +48,8 @@ public GetSearchPipelineTransportAction( threadPool, actionFilters, GetSearchPipelineRequest::new, - indexNameExpressionResolver + indexNameExpressionResolver, + true ); } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java index 98b18c79a3c8d..88cb2ed6a9bf0 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java @@ -51,6 +51,8 @@ public abstract class TransportClusterManagerNodeReadAction< Request extends ClusterManagerNodeReadRequest, Response extends ActionResponse> extends TransportClusterManagerNodeAction { + protected boolean localExecuteSupported = false; + protected TransportClusterManagerNodeReadAction( String actionName, TransportService transportService, @@ -58,7 +60,8 @@ protected TransportClusterManagerNodeReadAction( ThreadPool threadPool, ActionFilters actionFilters, Writeable.Reader request, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + boolean localExecuteSupported ) { this( actionName, @@ -71,6 +74,19 @@ protected TransportClusterManagerNodeReadAction( request, indexNameExpressionResolver ); + this.localExecuteSupported = localExecuteSupported; + } + + protected TransportClusterManagerNodeReadAction( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + Writeable.Reader request, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + this(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver, false); } protected TransportClusterManagerNodeReadAction( @@ -126,7 +142,7 @@ protected final boolean localExecute(Request request) { } protected boolean localExecuteSupportedByAction() { - return true; + return localExecuteSupported; } } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java index 65f00a4731ab5..8a0082ad05f66 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java @@ -62,6 +62,7 @@ public TransportClusterInfoAction( IndexNameExpressionResolver indexNameExpressionResolver ) { super(actionName, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); + this.localExecuteSupported = true; } @Override diff --git a/server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java new file mode 100644 index 0000000000000..87f218760038e --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/indices/mapping/get/GetMappingsActionTests.java @@ -0,0 +1,227 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.admin.indices.mapping.get; + +import org.opensearch.Version; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlock; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.settings.SettingsModule; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.indices.IndicesService; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptySet; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class GetMappingsActionTests extends OpenSearchTestCase { + private TransportService transportService; + private ClusterService clusterService; + private ThreadPool threadPool; + private SettingsFilter settingsFilter; + private final String indexName = "test_index"; + CapturingTransport capturingTransport = new CapturingTransport(); + private DiscoveryNode localNode; + private DiscoveryNode remoteNode; + private DiscoveryNode[] allNodes; + private TransportGetMappingsAction transportAction = null; + + @Before + public void setUp() throws Exception { + super.setUp(); + + settingsFilter = new SettingsModule(Settings.EMPTY, emptyList(), emptyList(), emptySet()).getSettingsFilter(); + threadPool = new TestThreadPool("GetIndexActionTests"); + clusterService = createClusterService(threadPool); + + transportService = capturingTransport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + emptySet(), + NoopTracer.INSTANCE + ); + transportService.start(); + transportService.acceptIncomingRequests(); + + localNode = new DiscoveryNode( + "local_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + remoteNode = new DiscoveryNode( + "remote_node", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + allNodes = new DiscoveryNode[] { localNode, remoteNode }; + setState(clusterService, ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)); + transportAction = new TransportGetMappingsAction( + GetMappingsActionTests.this.transportService, + GetMappingsActionTests.this.clusterService, + GetMappingsActionTests.this.threadPool, + new ActionFilters(emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + mock(IndicesService.class) + ); + + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + transportService.close(); + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + } + + public void testGetTransportWithoutMatchingTerm() { + transportAction.execute(null, new GetMappingsRequest(), ActionListener.wrap(Assert::assertNotNull, exception -> { + throw new AssertionError(exception); + })); + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturingTransport.capturedRequests()[0]; + // mismatch term and version + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term() - 1, + clusterService.state().version() - 1 + ) + ); + capturingTransport.handleResponse(capturedRequest.requestId, termResp); + + assertThat(capturingTransport.capturedRequests().length, equalTo(2)); + CapturingTransport.CapturedRequest capturedRequest1 = capturingTransport.capturedRequests()[1]; + + capturingTransport.handleResponse(capturedRequest1.requestId, new GetMappingsResponse(new HashMap<>())); + } + + public void testGetTransportWithMatchingTerm() { + transportAction.execute(null, new GetMappingsRequest(), ActionListener.wrap(Assert::assertNotNull, exception -> { + throw new AssertionError(exception); + })); + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturingTransport.capturedRequests()[0]; + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term(), + clusterService.state().version() + ) + ); + capturingTransport.handleResponse(capturedRequest.requestId, termResp); + + // no more transport calls + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + } + + public void testGetTransportClusterBlockWithMatchingTerm() { + ClusterBlock readClusterBlock = new ClusterBlock( + 1, + "uuid", + "", + false, + true, + true, + RestStatus.OK, + EnumSet.of(ClusterBlockLevel.METADATA_READ) + ); + ClusterBlocks.Builder builder = ClusterBlocks.builder(); + builder.addGlobalBlock(readClusterBlock); + ClusterState metadataReadBlockedState = ClusterState.builder(ClusterStateCreationUtils.state(localNode, remoteNode, allNodes)) + .blocks(builder) + .build(); + setState(clusterService, metadataReadBlockedState); + + transportAction.execute( + null, + new GetMappingsRequest(), + ActionListener.wrap(response -> { throw new AssertionError(response); }, exception -> { + Assert.assertTrue(exception instanceof ClusterBlockException); + }) + ); + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = capturingTransport.capturedRequests()[0]; + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion( + clusterService.state().getClusterName(), + clusterService.state().metadata().clusterUUID(), + clusterService.state().term(), + clusterService.state().version() + ) + ); + capturingTransport.handleResponse(capturedRequest.requestId, termResp); + + // no more transport calls + assertThat(capturingTransport.capturedRequests().length, equalTo(1)); + } +}