Skip to content

Commit

Permalink
optimize _cat/nodes api
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Jul 22, 2024
1 parent b980b12 commit fa803c8
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http;

import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.BeforeClass;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;

import static org.apache.hc.core5.http.HttpStatus.SC_OK;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 5, numClientNodes = 0)
public class HttpCatIT extends HttpSmokeTestCase {

@BeforeClass
public static void doNotSetAvailableProcessors() {
System.setProperty("opensearch.set.netty.runtime.available.processors", "false");
}

public void testdoCatRequest() throws IOException, ParseException {
try (RestClient restClient = getRestClient()) {
int nodesCount = restClient.getNodes().size();
for (int i = 0; i < 20; i++) {
Request nodesRequest = new Request("GET", "/_cat/nodes?timeout=" + randomInt(300) + "ms");
try {
Response response = restClient.performRequest(nodesRequest);
assertEquals(SC_OK, response.getStatusLine().getStatusCode());
String result = EntityUtils.toString(response.getEntity());
String[] NodeInfos = result.split("\n");
assertEquals(nodesCount, NodeInfos.length);
} catch (ResponseException e) {
// it means that it costs too long to get ClusterState from the master.
assertThat(e.getMessage(), containsString("costs too long to get ClusterState from the master"));
}
}
}
}

}

208 changes: 174 additions & 34 deletions server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.rest.action.cat;

import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
Expand All @@ -47,6 +49,8 @@
import org.opensearch.common.Table;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.common.unit.ByteSizeValue;
Expand All @@ -68,15 +72,22 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessInfo;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestActionListener;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
Expand All @@ -88,6 +99,7 @@
* @opensearch.api
*/
public class RestNodesAction extends AbstractCatAction {
public static final long TIMEOUT_THRESHOLD_MILLIS = 5;
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesAction.class);
static final String LOCAL_DEPRECATED_MESSAGE = "Deprecated parameter [local] used. This parameter does not cause this API to act "
+ "locally, and should not be used. It will be unsupported in version 8.0.";
Expand Down Expand Up @@ -120,47 +132,175 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
);
parseDeprecatedMasterTimeoutParameter(clusterStateRequest, request, deprecationLogger, getName());
final boolean fullId = request.paramAsBoolean("full_id", false);
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
ThreadPool threadPool = client.admin().cluster().threadPool();
long beginTime = threadPool.relativeTimeInMillis();
final long timeout = request.hasParam("timeout")
? TimeValue.parseTimeValue(request.param("timeout"), "timeout").millis()
: Long.MAX_VALUE;
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
NodesInfoRequest.Metric.OS.metricName(),
NodesInfoRequest.Metric.PROCESS.metricName(),
NodesInfoRequest.Metric.HTTP.metricName()
long leftTime = timeout - threadPool.relativeTimeInMillis() + beginTime;
if (leftTime < TIMEOUT_THRESHOLD_MILLIS) {
onFailure(
new OpenSearchTimeoutException(
"costs too long to get ClusterState from the master:"
+ clusterStateResponse.getState().nodes().getMasterNode().getName()
)
);
client.admin().cluster().nodesInfo(nodesInfoRequest, new RestActionListener<NodesInfoResponse>(channel) {
@Override
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
NodesStatsRequest.Metric.JVM.metricName(),
NodesStatsRequest.Metric.OS.metricName(),
NodesStatsRequest.Metric.FS.metricName(),
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
return;
}
String[] nodeIds = clusterStateResponse.getState().nodes().resolveNodes(null);
CountDownLatch nodesCount = new CountDownLatch(nodeIds.length);
ConcurrentMap<String, NodeInfo> successNodeInfos = new ConcurrentHashMap<>(nodeIds.length);
ConcurrentMap<String, FailedNodeException> failNodeInfos = new ConcurrentHashMap<>(nodeIds.length);
ConcurrentMap<String, NodeStats> successNodeStats = new ConcurrentHashMap<>(nodeIds.length);
ConcurrentMap<String, FailedNodeException> failNodeStats = new ConcurrentHashMap<>(nodeIds.length);
for (String nodeId : nodeIds) {
NodesInfoRequest nodesInfoRequest = createNodesInfoRequest(timeout, leftTime, nodeId);
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<>() {
@Override
public void onResponse(NodesInfoResponse nodesInfoResponse) {
assert nodesInfoResponse.getNodes().size() + nodesInfoResponse.failures().size() == 1;
NodesStatsRequest nodesStatsRequest = checkAndCreateNodesStatsRequest(
nodesInfoResponse.failures(),
timeout,
beginTime,
nodeId,
this::onFailure,
threadPool::relativeTimeInMillis,
clusterStateResponse.getState().nodes().get(nodeId).getName()
);
client.admin().cluster().nodesStats(nodesStatsRequest, new RestResponseListener<NodesStatsResponse>(channel) {
@Override
public RestResponse buildResponse(NodesStatsResponse nodesStatsResponse) throws Exception {
return RestTable.buildResponse(
buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse),
channel
);
if (nodesStatsRequest == null) {
return;
}
});
}
});
successNodeInfos.put(nodeId, nodesInfoResponse.getNodes().get(0));
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(NodesStatsResponse nodesStatsResponse) {
assert nodesStatsResponse.getNodes().size() + nodesStatsResponse.failures().size() == 1;
if (nodesStatsResponse.getNodes().size() == 1) {
successNodeStats.put(nodeId, nodesStatsResponse.getNodes().get(0));
} else {
failNodeStats.put(nodeId, nodesStatsResponse.failures().get(0));
}
}

@Override
public void onFailure(Exception e) {
assert e instanceof FailedNodeException;
failNodeStats.put(nodeId, (FailedNodeException) e);
}
}, nodesCount::countDown));
}

@Override
public void onFailure(Exception e) {
assert e instanceof FailedNodeException;
failNodeInfos.put(nodeId, (FailedNodeException) e);
nodesCount.countDown();
}
});
}

try {
nodesCount.await();
sendResponse(
channel,
clusterStateResponse,
request,
fullId,
successNodeInfos.values(),
failNodeInfos.values(),
successNodeStats.values(),
failNodeStats.values()
);
} catch (Exception e) {
e.addSuppressed(e);
logger.error("failed to send failure response", e);
}
}
});
}

private NodesInfoRequest createNodesInfoRequest(long timeout, long leftTime, String nodeId) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
if (timeout != Long.MAX_VALUE) {
nodesInfoRequest.timeout(TimeValue.timeValueMillis(leftTime));
}
nodesInfoRequest.clear()
.nodesIds(nodeId)
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
NodesInfoRequest.Metric.OS.metricName(),
NodesInfoRequest.Metric.PROCESS.metricName(),
NodesInfoRequest.Metric.HTTP.metricName()
);
return nodesInfoRequest;
}

private NodesStatsRequest checkAndCreateNodesStatsRequest(
List<FailedNodeException> failedNodeExceptions,
long timeout,
long beginTime,
String nodeId,
Consumer<FailedNodeException> failedConsumer,
Supplier<Long> currentTimeSupplier,
String nodeName
) {
if (failedNodeExceptions.isEmpty() == false) {
failedConsumer.accept(failedNodeExceptions.get(0));
return null;
}
long leftTime = timeout - currentTimeSupplier.get() + beginTime;
if (leftTime < TIMEOUT_THRESHOLD_MILLIS) {
failedConsumer.accept(
new FailedNodeException(nodeId, "There is not enough time to obtain nodesInfo metric from " + nodeName, null)
);
return null;
}
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
if (timeout != Long.MAX_VALUE) {
nodesStatsRequest.timeout(TimeValue.timeValueMillis(leftTime));
}
nodesStatsRequest.clear()
.nodesIds(nodeId)
.indices(true)
.addMetrics(
NodesStatsRequest.Metric.JVM.metricName(),
NodesStatsRequest.Metric.OS.metricName(),
NodesStatsRequest.Metric.FS.metricName(),
NodesStatsRequest.Metric.PROCESS.metricName(),
NodesStatsRequest.Metric.SCRIPT.metricName()
);
return nodesStatsRequest;
}

private void sendResponse(
RestChannel channel,
ClusterStateResponse clusterStateResponse,
RestRequest request,
boolean fullId,
Collection<NodeInfo> successNodeInfos,
Collection<FailedNodeException> failNodeInfos,
Collection<NodeStats> successNodeStats,
Collection<FailedNodeException> failNodeStats
) throws Exception {
NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(
clusterStateResponse.getClusterName(),
new ArrayList<>(successNodeInfos),
new ArrayList<>(failNodeInfos)
);
NodesStatsResponse nodesStatsResponse = new NodesStatsResponse(
clusterStateResponse.getClusterName(),
new ArrayList<>(successNodeStats),
new ArrayList<>(failNodeStats)
);
channel.sendResponse(
RestTable.buildResponse(buildTable(fullId, request, clusterStateResponse, nodesInfoResponse, nodesStatsResponse), channel)
);
}

@Override
protected Table getTableWithHeader(final RestRequest request) {
Table table = new Table();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,6 @@ grant {
permission java.lang.RuntimePermission "reflectionFactoryAccess";
permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
permission java.util.PropertyPermission "opensearch.set.netty.runtime.available.processors", "write";
permission java.net.SocketPermission "*", "accept,connect";
};

0 comments on commit fa803c8

Please sign in to comment.