Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise TransportNodesAction to not send DiscoveryNodes for NodeStat… #14749

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
*/
public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
protected NodesInfoRequest request;

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
*/
public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
protected NodesStatsRequest request;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
*/
public static class ClusterStatsNodeRequest extends TransportRequest {

ClusterStatsRequest request;
protected ClusterStatsRequest request;

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean populateDiscoveryNodesInTransportRequest = true;
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void populateDiscoveryNodesInTransportRequest(boolean value) {
populateDiscoveryNodesInTransportRequest = value;
}

public boolean populateDiscoveryNodesInTransportRequest() {
return populateDiscoveryNodesInTransportRequest;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
}

/**
* Return the concrete nodes from the request node ids which will be later used for routing requests to nodes.
**/
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new);
}

/**
* Get a backwards compatible transport action name
*/
Expand All @@ -226,6 +235,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

Expand All @@ -234,14 +244,27 @@ class AsyncAction {
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
if (request.populateDiscoveryNodesInTransportRequest()) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
this.concreteNodes = null;
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
} else {
this.concreteNodes = resolveConcreteNodes(request, clusterService.state());
assert request.concreteNodes() == null;
}
} else {
this.concreteNodes = null;
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
}
if (request.concreteNodes() == null) {
assert concreteNodes != null;
this.responses = new AtomicReferenceArray<>(concreteNodes.length);
} else {
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes;
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand All @@ -260,7 +283,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.populateDiscoveryNodesInTransportRequest(false);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);

nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testDefaultBehavior() {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
ClusterStatsRequest request = new ClusterStatsRequest();
request.populateDiscoveryNodesInTransportRequest(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testOptimizedBehavior() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.populateDiscoveryNodesInTransportRequest(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

private Map<String, List<MockClusterStatsNodeRequest>> performNodesInfoAction(ClusterStatsRequest request) {
TransportNodesAction action = getTestTransportClusterStatsAction();
PlainActionFuture<NodesStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<MockClusterStatsNodeRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator =
(TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request;
clusterStatsNodeRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in);
sentRequestList.add(mockClusterStatsNodeRequest);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

combinedSentRequest.put(node, sentRequestList);
});

return combinedSentRequest;
}

public TestTransportClusterStatsAction getTestTransportClusterStatsAction() {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
return new TestTransportClusterStatsAction(
THREAD_POOL,
clusterService,
transportService,
nodeService,
indicesService,
new ActionFilters(Collections.emptySet())
);
}

private static class TestTransportClusterStatsAction extends TransportClusterStatsAction {
public TestTransportClusterStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
IndicesService indicesService,
ActionFilters actionFilters
) {
super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters);
}
}

private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest {

public MockClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
Expand Down Expand Up @@ -76,11 +78,12 @@

public class TransportNodesActionTests extends OpenSearchTestCase {

private static ThreadPool THREAD_POOL;

private ClusterService clusterService;
private CapturingTransport transport;
private TransportService transportService;
protected static ThreadPool THREAD_POOL;
protected ClusterService clusterService;
protected CapturingTransport transport;
protected TransportService transportService;
protected NodeService nodeService;
protected IndicesService indicesService;

public void testRequestIsSentToEachNode() throws Exception {
TransportNodesAction action = getTestTransportNodesAction();
Expand Down
Loading
Loading