diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index aa1fe695ecc12..1523f44ead5f9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -40,6 +40,7 @@ import org.opensearch.test.disruption.NetworkDisruption; import org.opensearch.test.transport.MockTransportService; import org.junit.Assert; +import org.opensearch.transport.TransportConnectionFailureStats; import java.io.IOException; import java.util.ArrayList; @@ -1462,4 +1463,86 @@ public void testWeightedRoutingFailOpenStats() throws Exception { WeightedRoutingStats.getInstance().resetFailOpenCount(); } + public void testTransportConnectionFailureStats() throws Exception { + + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + int nodeCountPerAZ = 2; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + + Map nodeIDMap = new HashMap<>(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + +// List shardInNodeA = internalCluster().clusterService() +// .state() +// .getRoutingNodes() +// .node(nodeIDMap.get(nodeMap.get("a").get(0))) +// .shardsWithState(ShardRoutingState.STARTED); +// +// List shardInNodeC = internalCluster().clusterService() +// .state() +// .getRoutingNodes() +// .node(nodeIDMap.get(nodeMap.get("c").get(0))) +// .shardsWithState(ShardRoutingState.STARTED); +// +// // fail open will be called for shards in zone-a data node with replica in zone-c data node +// Set result = new HashSet<>(); +// int failOpenShardCount = 0; +// for (ShardRouting shardRouting : shardInNodeA) { +// result.add(shardRouting.shardId()); +// } +// for (ShardRouting shardRouting : shardInNodeC) { +// if (result.contains(shardRouting.shardId())) { +// failOpenShardCount++; +// } +// } + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + logger.info("--> making search requests"); + + Future response = internalCluster().smartClient() + .prepareSearch("test") + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + try { + SearchResponse searchResponse = response.get(); + + } catch (Exception t) { + fail("search should not fail"); + } + + TransportConnectionFailureStats.getInstance().getConnectionFailure(); + + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 8293a5bb27612..27d5bab9c2356 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -66,6 +66,7 @@ import org.opensearch.search.pipeline.SearchPipelineStats; import org.opensearch.tasks.TaskCancellationStats; import org.opensearch.threadpool.ThreadPoolStats; +import org.opensearch.transport.TransportConnectionFailureStats; import org.opensearch.transport.TransportStats; import java.io.IOException; @@ -158,6 +159,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private AdmissionControlStats admissionControlStats; + @Nullable + private TransportConnectionFailureStats transportConnectionFailureStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -235,6 +239,11 @@ public NodeStats(StreamInput in) throws IOException { } else { admissionControlStats = null; } + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + transportConnectionFailureStats = in.readOptionalWriteable(TransportConnectionFailureStats::new); + } else { + transportConnectionFailureStats = null; + } } public NodeStats( @@ -265,7 +274,8 @@ public NodeStats( @Nullable SearchPipelineStats searchPipelineStats, @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats, @Nullable RepositoriesStats repositoriesStats, - @Nullable AdmissionControlStats admissionControlStats + @Nullable AdmissionControlStats admissionControlStats, + @Nullable TransportConnectionFailureStats transportConnectionFailureStats ) { super(node); this.timestamp = timestamp; @@ -295,6 +305,7 @@ public NodeStats( this.segmentReplicationRejectionStats = segmentReplicationRejectionStats; this.repositoriesStats = repositoriesStats; this.admissionControlStats = admissionControlStats; + this.transportConnectionFailureStats = transportConnectionFailureStats; } public long getTimestamp() { @@ -452,6 +463,9 @@ public AdmissionControlStats getAdmissionControlStats() { return admissionControlStats; } + @Nullable + public TransportConnectionFailureStats getTransportConnectionFailureStats() { return transportConnectionFailureStats;} + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -508,6 +522,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeOptionalWriteable(admissionControlStats); } + + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { + out.writeOptionalWriteable(transportConnectionFailureStats); + } } @Override @@ -611,6 +629,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getAdmissionControlStats() != null) { getAdmissionControlStats().toXContent(builder, params); } + if (getTransportConnectionFailureStats() != null){ + getTransportConnectionFailureStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 1af56f10b95ee..c66f1236695eb 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -219,7 +219,9 @@ public enum Metric { RESOURCE_USAGE_STATS("resource_usage_stats"), SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"), REPOSITORIES("repositories"), - ADMISSION_CONTROL("admission_control"); + ADMISSION_CONTROL("admission_control"), + TRANSPORT_CONNECTION_FAILURE_STATS("transport_connection_failure"); + private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 1df73d3b4394d..03891302a4934 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -128,7 +128,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics), NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics), NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), - NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics) + NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics), + NodesStatsRequest.Metric.TRANSPORT_CONNECTION_FAILURE_STATS.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 9c5dcc9e9de3f..e4f483f796f44 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -172,6 +172,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 15cc8f3d20bb3..ef8d08d330e26 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -32,6 +32,7 @@ package org.opensearch.node; +import com.sun.jdi.connect.Transport; import org.opensearch.Build; import org.opensearch.Version; import org.opensearch.action.admin.cluster.node.info.NodeInfo; @@ -62,6 +63,7 @@ import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.tasks.TaskCancellationMonitoringService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportConnectionFailureStats; import org.opensearch.transport.TransportService; import java.io.Closeable; @@ -236,7 +238,8 @@ public NodeStats stats( boolean resourceUsageStats, boolean segmentReplicationTrackerStats, boolean repositoriesStats, - boolean admissionControl + boolean admissionControl, + boolean transportConnectionFailureStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -268,7 +271,8 @@ public NodeStats stats( searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, - admissionControl ? this.admissionControlService.stats() : null + admissionControl ? this.admissionControlService.stats() : null, + transportConnectionFailureStats ? TransportConnectionFailureStats.getInstance(): null ); } diff --git a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java index e634323d58269..7494a272249df 100644 --- a/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/opensearch/transport/ClusterConnectionManager.java @@ -112,7 +112,7 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi /** * Connects to a node with the given connection profile. If the node is already connected this method has no effect. - * Once a successful is established, it can be validated before being exposed. + * Once a successgul is established, it can be validated before being exposed. * The ActionListener will be called on the calling thread or the generic thread pool. */ @Override @@ -203,6 +203,8 @@ public void connectToNode( public Transport.Connection getConnection(DiscoveryNode node) { Transport.Connection connection = connectedNodes.get(node); if (connection == null) { + // capture stats here + TransportConnectionFailureStats.getInstance().updateConnectionFailureCount(node.getHostAddress()); throw new NodeNotConnectedException(node, "Node not connected"); } return connection; diff --git a/server/src/main/java/org/opensearch/transport/TransportConnectionFailureStats.java b/server/src/main/java/org/opensearch/transport/TransportConnectionFailureStats.java new file mode 100644 index 0000000000000..06845b2fd4d6d --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/TransportConnectionFailureStats.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class TransportConnectionFailureStats implements ToXContentFragment, Writeable { + // Node as key with failure count as value + private ConcurrentHashMap connectionFailureMap; + + private static final TransportConnectionFailureStats INSTANCE = new TransportConnectionFailureStats(); + + public TransportConnectionFailureStats() { + connectionFailureMap = new ConcurrentHashMap<>(); + } + + public TransportConnectionFailureStats(StreamInput in) throws IOException { + connectionFailureMap = (ConcurrentHashMap)in.readMap(StreamInput::readString, StreamInput::readInt); + } + + public static TransportConnectionFailureStats getInstance() { + return INSTANCE; + } + public void updateConnectionFailureCount(String node) + { + connectionFailureMap.compute(node, (k, v) -> v==null? 1 : v+1); + } + + public int getConnectionFailureCount(String node) + { + if (connectionFailureMap.containsKey(node)) + { + Integer value = connectionFailureMap.get(node); + return value; + } + return 0; + } + + public Map getConnectionFailure() + { + return new HashMap(connectionFailureMap); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + Map map = connectionFailureMap; + out.writeMap(map,StreamOutput::writeString, StreamOutput::writeInt); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("transport_connection_failure"); + for (var entry: connectionFailureMap.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(connectionFailureMap); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index b8ab5c935fa34..6e4c2dfe05242 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -954,7 +954,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi null, segmentReplicationRejectionStats, null, - admissionControlStats + admissionControlStats, + null ); } @@ -1010,4 +1011,5 @@ private static RemoteTranslogTransferTracker.Stats getRandomRemoteTranslogTransf private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } + }