From fa803c8cf5a88df4741e99bb3f482ee2fe2b6a0d Mon Sep 17 00:00:00 2001 From: kkewwei Date: Mon, 22 Jul 2024 15:47:28 +0800 Subject: [PATCH] optimize _cat/nodes api Signed-off-by: kkewwei --- .../java/org/opensearch/http/HttpCatIT.java | 53 +++++ .../rest/action/cat/RestNodesAction.java | 208 +++++++++++++++--- .../bootstrap/test-framework.policy | 2 + 3 files changed, 229 insertions(+), 34 deletions(-) create mode 100644 qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java new file mode 100644 index 0000000000000..a6638f37fa821 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/HttpCatIT.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.junit.BeforeClass; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.client.RestClient; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; + +import static org.apache.hc.core5.http.HttpStatus.SC_OK; +import static org.hamcrest.Matchers.containsString; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0) +public class HttpCatIT extends HttpSmokeTestCase { + + @BeforeClass + public static void doNotSetAvailableProcessors() { + System.setProperty("opensearch.set.netty.runtime.available.processors", "false"); + } + + public void testdoCatRequest() throws IOException, ParseException { + try (RestClient restClient = getRestClient()) { + int nodesCount = restClient.getNodes().size(); + for (int i = 0; i < 20; i++) { + Request nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + randomInt(300) + "ms"); + try { + Response response = restClient.performRequest(nodesRequest); + assertEquals(SC_OK, response.getStatusLine().getStatusCode()); + String result = EntityUtils.toString(response.getEntity()); + String[] NodeInfos = result.split("\n"); + assertEquals(nodesCount, NodeInfos.length); + } catch (ResponseException e) { + // it means that it costs too long to get ClusterState from the master. + assertThat(e.getMessage(), containsString("costs too long to get ClusterState from the master")); + } + } + } + } + +} + diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index bffb50cc63401..acc538b5dd601 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -32,6 +32,8 @@ package org.opensearch.rest.action.cat; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.FailedNodeException; import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; @@ -47,6 +49,8 @@ import org.opensearch.common.Table; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.common.unit.ByteSizeValue; @@ -68,15 +72,22 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessInfo; import org.opensearch.monitor.process.ProcessStats; +import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; -import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestActionListener; -import org.opensearch.rest.action.RestResponseListener; import org.opensearch.script.ScriptStats; import org.opensearch.search.suggest.completion.CompletionStats; +import org.opensearch.threadpool.ThreadPool; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Locale; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.singletonList; @@ -88,6 +99,7 @@ * @opensearch.api */ public class RestNodesAction extends AbstractCatAction { + public static final long TIMEOUT_THRESHOLD_MILLIS = 5; private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesAction.class); static final String LOCAL_DEPRECATED_MESSAGE = "Deprecated parameter [local] used. This parameter does not cause this API to act " + "locally, and should not be used. It will be unsupported in version 8.0."; @@ -120,47 +132,175 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli ); parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName()); final boolean fullId = request.paramAsBoolean("full_id", false); - return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { + ThreadPool threadPool = client.admin().cluster().threadPool(); + long beginTime = threadPool.relativeTimeInMillis(); + final long timeout = request.hasParam("timeout") + ? TimeValue.parseTimeValue(request.param("timeout"), "timeout").millis() + : Long.MAX_VALUE; + return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) { @Override public void processResponse(final ClusterStateResponse clusterStateResponse) { - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.timeout(request.param("timeout")); - nodesInfoRequest.clear() - .addMetrics( - NodesInfoRequest.Metric.JVM.metricName(), - NodesInfoRequest.Metric.OS.metricName(), - NodesInfoRequest.Metric.PROCESS.metricName(), - NodesInfoRequest.Metric.HTTP.metricName() + long leftTime = timeout - threadPool.relativeTimeInMillis() + beginTime; + if (leftTime < TIMEOUT_THRESHOLD_MILLIS) { + onFailure( + new OpenSearchTimeoutException( + "costs too long to get ClusterState from the master:" + + clusterStateResponse.getState().nodes().getMasterNode().getName() + ) ); - client.admin().cluster().nodesInfo(nodesInfoRequest, new RestActionListener(channel) { - @Override - public void processResponse(final NodesInfoResponse nodesInfoResponse) { - NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); - nodesStatsRequest.timeout(request.param("timeout")); - nodesStatsRequest.clear() - .indices(true) - .addMetrics( - NodesStatsRequest.Metric.JVM.metricName(), - NodesStatsRequest.Metric.OS.metricName(), - NodesStatsRequest.Metric.FS.metricName(), - NodesStatsRequest.Metric.PROCESS.metricName(), - NodesStatsRequest.Metric.SCRIPT.metricName() + return; + } + String[] nodeIds = clusterStateResponse.getState().nodes().resolveNodes(null); + CountDownLatch nodesCount = new CountDownLatch(nodeIds.length); + ConcurrentMap successNodeInfos = new ConcurrentHashMap<>(nodeIds.length); + ConcurrentMap failNodeInfos = new ConcurrentHashMap<>(nodeIds.length); + ConcurrentMap successNodeStats = new ConcurrentHashMap<>(nodeIds.length); + ConcurrentMap failNodeStats = new ConcurrentHashMap<>(nodeIds.length); + for (String nodeId : nodeIds) { + NodesInfoRequest nodesInfoRequest = createNodesInfoRequest(timeout, leftTime, nodeId); + client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<>() { + @Override + public void onResponse(NodesInfoResponse nodesInfoResponse) { + assert nodesInfoResponse.getNodes().size() + nodesInfoResponse.failures().size() == 1; + NodesStatsRequest nodesStatsRequest = checkAndCreateNodesStatsRequest( + nodesInfoResponse.failures(), + timeout, + beginTime, + nodeId, + this::onFailure, + threadPool::relativeTimeInMillis, + clusterStateResponse.getState().nodes().get(nodeId).getName() ); - client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener(channel) { - @Override - public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception { - return RestTable.buildResponse( - buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), - channel - ); + if (nodesStatsRequest == null) { + return; } - }); - } - }); + successNodeInfos.put(nodeId, nodesInfoResponse.getNodes().get(0)); + client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(NodesStatsResponse nodesStatsResponse) { + assert nodesStatsResponse.getNodes().size() + nodesStatsResponse.failures().size() == 1; + if (nodesStatsResponse.getNodes().size() == 1) { + successNodeStats.put(nodeId, nodesStatsResponse.getNodes().get(0)); + } else { + failNodeStats.put(nodeId, nodesStatsResponse.failures().get(0)); + } + } + + @Override + public void onFailure(Exception e) { + assert e instanceof FailedNodeException; + failNodeStats.put(nodeId, (FailedNodeException) e); + } + }, nodesCount::countDown)); + } + + @Override + public void onFailure(Exception e) { + assert e instanceof FailedNodeException; + failNodeInfos.put(nodeId, (FailedNodeException) e); + nodesCount.countDown(); + } + }); + } + + try { + nodesCount.await(); + sendResponse( + channel, + clusterStateResponse, + request, + fullId, + successNodeInfos.values(), + failNodeInfos.values(), + successNodeStats.values(), + failNodeStats.values() + ); + } catch (Exception e) { + e.addSuppressed(e); + logger.error("failed to send failure response", e); + } } }); } + private NodesInfoRequest createNodesInfoRequest(long timeout, long leftTime, String nodeId) { + NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); + if (timeout != Long.MAX_VALUE) { + nodesInfoRequest.timeout(TimeValue.timeValueMillis(leftTime)); + } + nodesInfoRequest.clear() + .nodesIds(nodeId) + .addMetrics( + NodesInfoRequest.Metric.JVM.metricName(), + NodesInfoRequest.Metric.OS.metricName(), + NodesInfoRequest.Metric.PROCESS.metricName(), + NodesInfoRequest.Metric.HTTP.metricName() + ); + return nodesInfoRequest; + } + + private NodesStatsRequest checkAndCreateNodesStatsRequest( + List failedNodeExceptions, + long timeout, + long beginTime, + String nodeId, + Consumer failedConsumer, + Supplier currentTimeSupplier, + String nodeName + ) { + if (failedNodeExceptions.isEmpty() == false) { + failedConsumer.accept(failedNodeExceptions.get(0)); + return null; + } + long leftTime = timeout - currentTimeSupplier.get() + beginTime; + if (leftTime < TIMEOUT_THRESHOLD_MILLIS) { + failedConsumer.accept( + new FailedNodeException(nodeId, "There is not enough time to obtain nodesInfo metric from " + nodeName, null) + ); + return null; + } + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + if (timeout != Long.MAX_VALUE) { + nodesStatsRequest.timeout(TimeValue.timeValueMillis(leftTime)); + } + nodesStatsRequest.clear() + .nodesIds(nodeId) + .indices(true) + .addMetrics( + NodesStatsRequest.Metric.JVM.metricName(), + NodesStatsRequest.Metric.OS.metricName(), + NodesStatsRequest.Metric.FS.metricName(), + NodesStatsRequest.Metric.PROCESS.metricName(), + NodesStatsRequest.Metric.SCRIPT.metricName() + ); + return nodesStatsRequest; + } + + private void sendResponse( + RestChannel channel, + ClusterStateResponse clusterStateResponse, + RestRequest request, + boolean fullId, + Collection successNodeInfos, + Collection failNodeInfos, + Collection successNodeStats, + Collection failNodeStats + ) throws Exception { + NodesInfoResponse nodesInfoResponse = new NodesInfoResponse( + clusterStateResponse.getClusterName(), + new ArrayList<>(successNodeInfos), + new ArrayList<>(failNodeInfos) + ); + NodesStatsResponse nodesStatsResponse = new NodesStatsResponse( + clusterStateResponse.getClusterName(), + new ArrayList<>(successNodeStats), + new ArrayList<>(failNodeStats) + ); + channel.sendResponse( + RestTable.buildResponse(buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), channel) + ); + } + @Override protected Table getTableWithHeader(final RestRequest request) { Table table = new Table(); diff --git a/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy b/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy index 0abfd7ef22ae7..d9ed09452ca01 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/test-framework.policy @@ -157,4 +157,6 @@ grant { permission java.lang.RuntimePermission "reflectionFactoryAccess"; permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect"; permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; + permission java.util.PropertyPermission "opensearch.set.netty.runtime.available.processors", "write"; + permission java.net.SocketPermission "*", "accept,connect"; };