From 4f6b6ed4802ae9549feff3016d41ae516d1c21b7 Mon Sep 17 00:00:00 2001 From: Pranshu Shukla Date: Thu, 11 Jul 2024 19:56:03 +0530 Subject: [PATCH] Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call Signed-off-by: Pranshu Shukla --- .../node/info/TransportNodesInfoAction.java | 2 +- .../node/stats/TransportNodesStatsAction.java | 2 +- .../stats/TransportClusterStatsAction.java | 2 +- .../support/nodes/BaseNodesRequest.java | 11 ++ .../support/nodes/TransportNodesAction.java | 35 ++++- .../admin/cluster/RestClusterStatsAction.java | 1 + .../admin/cluster/RestNodesInfoAction.java | 2 +- .../admin/cluster/RestNodesStatsAction.java | 1 + .../rest/action/cat/RestNodesAction.java | 2 + .../TransportClusterStatsActionTests.java | 124 ++++++++++++++++++ .../nodes/TransportNodesActionTests.java | 13 +- .../nodes/TransportNodesInfoActionTests.java | 120 +++++++++++++++++ .../nodes/TransportNodesStatsActionTests.java | 120 +++++++++++++++++ 13 files changed, 421 insertions(+), 14 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java create mode 100644 server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index 2c4f8522a5a5c..dda54cce334ec 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) { */ public static class NodeInfoRequest extends TransportRequest { - NodesInfoRequest request; + protected NodesInfoRequest request; public NodeInfoRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 2e93e5e7841cb..2c808adc97c7a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { */ public static class NodeStatsRequest extends TransportRequest { - NodesStatsRequest request; + protected NodesStatsRequest request; public NodeStatsRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index e4f483f796f44..c7d03596a2a36 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq */ public static class ClusterStatsNodeRequest extends TransportRequest { - ClusterStatsRequest request; + protected ClusterStatsRequest request; public ClusterStatsNodeRequest(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java index 4d54ce51c923c..628721e18e08f 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/BaseNodesRequest.java @@ -65,6 +65,11 @@ public abstract class BaseNodesRequest * will be ignored and this will be used. * */ private DiscoveryNode[] concreteNodes; + + /** + * since in multiple code paths we do not use the discovery nodes coming from the request, we do not require it to sent around across all nodes + */ + private boolean populateDiscoveryNodesInTransportRequest = true; private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30); private TimeValue timeout; @@ -119,6 +124,12 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) { this.concreteNodes = concreteNodes; } + public void populateDiscoveryNodesInTransportRequest(boolean value) { + populateDiscoveryNodesInTransportRequest = value; + } + public boolean populateDiscoveryNodesInTransportRequest() { + return populateDiscoveryNodesInTransportRequest; + } @Override public ActionRequestValidationException validate() { return null; diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index 9a1a28dd70636..689487557ba21 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Setting; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; @@ -57,10 +58,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Function; /** * Base action class for transport nodes @@ -209,6 +212,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) { request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new)); } + /** + * populate the concrete nodes from the request node ids + **/ + protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) { + assert request.concreteNodes() == null : "request concreteNodes shouldn't be set"; + String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds()); + return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new); + } + /** * Get a backwards compatible transport action name */ @@ -226,6 +238,7 @@ class AsyncAction { private final NodesRequest request; private final ActionListener listener; private final AtomicReferenceArray responses; + private final DiscoveryNode[] concreteNodes; private final AtomicInteger counter = new AtomicInteger(); private final Task task; @@ -234,14 +247,27 @@ class AsyncAction { this.request = request; this.listener = listener; if (request.concreteNodes() == null) { - resolveRequest(request, clusterService.state()); - assert request.concreteNodes() != null; + if (request.populateDiscoveryNodesInTransportRequest()) { + resolveRequest(request, clusterService.state()); + assert request.concreteNodes() != null; + this.concreteNodes = null; + } else { + this.concreteNodes = resolveConcreteNodes(request, clusterService.state()); + assert request.concreteNodes() == null; + } + } else { + this.concreteNodes = null; + } + if (request.concreteNodes() == null) { + assert concreteNodes != null; + this.responses = new AtomicReferenceArray<>(concreteNodes.length); + } else { + this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); } - this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); } void start() { - final DiscoveryNode[] nodes = request.concreteNodes(); + final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes; if (nodes.length == 0) { // nothing to notify threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses))); @@ -260,7 +286,6 @@ void start() { if (task != null) { nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId()); } - transportService.sendRequest( node, getTransportNodeAction(node), diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java index 0766e838210fa..0ea651ba8b7d6 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -66,6 +66,7 @@ public String getName() { public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null)); clusterStatsRequest.timeout(request.param("timeout")); + clusterStatsRequest.populateDiscoveryNodesInTransportRequest(false); return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java index 3b83bf9d6f68c..e5e94f1cefc3b 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesInfoAction.java @@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final NodesInfoRequest nodesInfoRequest = prepareRequest(request); nodesInfoRequest.timeout(request.param("timeout")); settingsFilter.addFilterSettingParams(request); - + nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false); return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java index 267bfde576dec..a277b08e7a34c 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // If no levels are passed in this results in an empty array. String[] levels = Strings.splitStringByCommaToArray(request.param("level")); nodesStatsRequest.indices().setLevels(levels); + nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false); return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); } diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index e11012a23fce7..81c2e3b533d62 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli public void processResponse(final ClusterStateResponse clusterStateResponse) { NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); nodesInfoRequest.timeout(request.param("timeout")); + nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false); nodesInfoRequest.clear() .addMetrics( NodesInfoRequest.Metric.JVM.metricName(), @@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) { public void processResponse(final NodesInfoResponse nodesInfoResponse) { NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); nodesStatsRequest.timeout(request.param("timeout")); + nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false); nodesStatsRequest.clear() .indices(true) .addMetrics( diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java new file mode 100644 index 0000000000000..22df7e1c1dc9d --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportClusterStatsActionTests.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.TransportNodesInfoAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest; +import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.indices.IndicesService; +import org.opensearch.node.NodeService; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.*; + +public class TransportClusterStatsActionTests extends TransportNodesActionTests { + + public void testOptimizedBehavior() { + ClusterStatsRequest request = new ClusterStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNull(sentRequest.getDiscoveryNodes()); + }); + }); + } + + public void testDefaultBehavior() { + ClusterStatsRequest request = new ClusterStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(true); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize() ); + }); + }); + } + + private Map> performNodesInfoAction(ClusterStatsRequest request) { + TransportNodesAction action = getTestTransportClusterStatsAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator= (TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request; + clusterStatsNodeRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in); + sentRequestList.add(mockClusterStatsNodeRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + public TestTransportClusterStatsAction getTestTransportClusterStatsAction() { + return new TestTransportClusterStatsAction( + THREAD_POOL, + clusterService, + transportService, + nodeService, + indicesService, + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportClusterStatsAction extends TransportClusterStatsAction { + public TestTransportClusterStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + IndicesService indicesService, + ActionFilters actionFilters) { + super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters); + } + } + + private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest{ + + public MockClusterStatsNodeRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java index 445934b0ccdfd..7e968aa8fb199 100644 --- a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesActionTests.java @@ -46,6 +46,8 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.indices.IndicesService; +import org.opensearch.node.NodeService; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; @@ -76,11 +78,12 @@ public class TransportNodesActionTests extends OpenSearchTestCase { - private static ThreadPool THREAD_POOL; - - private ClusterService clusterService; - private CapturingTransport transport; - private TransportService transportService; + protected static ThreadPool THREAD_POOL; + protected ClusterService clusterService; + protected CapturingTransport transport; + protected TransportService transportService; + protected NodeService nodeService; + protected IndicesService indicesService; public void testRequestIsSentToEachNode() throws Exception { TransportNodesAction action = getTestTransportNodesAction(); diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java new file mode 100644 index 0000000000000..12b491f5de39b --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesInfoActionTests.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.TransportNodesInfoAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.node.NodeService; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.*; + +public class TransportNodesInfoActionTests extends TransportNodesActionTests { + + public void testOptimizedBehavior() { + NodesInfoRequest request = new NodesInfoRequest(); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNull(sentRequest.getDiscoveryNodes()); + }); + }); + } + + public void testDefaultBehavior() { + NodesInfoRequest request = new NodesInfoRequest(); + request.populateDiscoveryNodesInTransportRequest(true); + Map> combinedSentRequest = performNodesInfoAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize() ); + }); + }); + } + + private Map> performNodesInfoAction(NodesInfoRequest request) { + TransportNodesAction action = getTestTransportNodesInfoAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportNodesInfoAction.NodeInfoRequest nodesInfoRequestFromCoordinator= (TransportNodesInfoAction.NodeInfoRequest) preSentRequest.request; + nodesInfoRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockNodesInfoRequest nodesStatsRequest = new MockNodesInfoRequest(in); + sentRequestList.add(nodesStatsRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + public TestTransportNodesInfoAction getTestTransportNodesInfoAction() { + return new TestTransportNodesInfoAction( + THREAD_POOL, + clusterService, + transportService, + nodeService, + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportNodesInfoAction extends TransportNodesInfoAction { + public TestTransportNodesInfoAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + ActionFilters actionFilters) { + super(threadPool, clusterService, transportService, nodeService, actionFilters); + } + } + + private static class MockNodesInfoRequest extends TransportNodesInfoAction.NodeInfoRequest{ + + public MockNodesInfoRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java new file mode 100644 index 0000000000000..7666310f7c10e --- /dev/null +++ b/server/src/test/java/org/opensearch/action/support/nodes/TransportNodesStatsActionTests.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support.nodes; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.TransportNodesStatsAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.node.NodeService; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.*; + +public class TransportNodesStatsActionTests extends TransportNodesActionTests { + + public void testOptimizedBehavior() { + NodesStatsRequest request = new NodesStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(false); + Map> combinedSentRequest = performNodesStatsAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNull(sentRequest.getDiscoveryNodes()); + }); + }); + } + + public void testDefaultBehavior() { + NodesStatsRequest request = new NodesStatsRequest(); + request.populateDiscoveryNodesInTransportRequest(true); + Map> combinedSentRequest = performNodesStatsAction(request); + + assertNotNull(combinedSentRequest); + combinedSentRequest.forEach((node, capturedRequestList) -> { + assertNotNull(capturedRequestList); + capturedRequestList.forEach(sentRequest -> { + assertNotNull(sentRequest.getDiscoveryNodes()); + assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize() ); + }); + }); + } + + private Map> performNodesStatsAction(NodesStatsRequest request) { + TransportNodesAction action = getTestTransportNodesStatsAction(); + PlainActionFuture listener = new PlainActionFuture<>(); + action.new AsyncAction(null, request, listener).start(); + Map> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear(); + Map> combinedSentRequest = new HashMap<>(); + + capturedRequests.forEach((node, capturedRequestList) -> { + List sentRequestList = new ArrayList<>(); + + capturedRequestList.forEach(preSentRequest -> { + BytesStreamOutput out = new BytesStreamOutput(); + try { + TransportNodesStatsAction.NodeStatsRequest nodesStatsRequestFromCoordinator= (TransportNodesStatsAction.NodeStatsRequest) preSentRequest.request; + nodesStatsRequestFromCoordinator.writeTo(out); + StreamInput in = out.bytes().streamInput(); + MockNodeStatsRequest nodesStatsRequest = new MockNodeStatsRequest(in); + sentRequestList.add(nodesStatsRequest); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + combinedSentRequest.put(node, sentRequestList); + }); + + return combinedSentRequest; + } + + + + public TestTransportNodesStatsAction getTestTransportNodesStatsAction() { + return new TestTransportNodesStatsAction( + THREAD_POOL, + clusterService, + transportService, + nodeService, + new ActionFilters(Collections.emptySet()) + ); + } + + private static class TestTransportNodesStatsAction extends TransportNodesStatsAction { + public TestTransportNodesStatsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + ActionFilters actionFilters) { + super(threadPool, clusterService, transportService, nodeService, actionFilters); + } + } + + private static class MockNodeStatsRequest extends TransportNodesStatsAction.NodeStatsRequest{ + + public MockNodeStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public DiscoveryNode[] getDiscoveryNodes() { + return this.request.concreteNodes(); + } + } +}