Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Anshu Agarwal committed Dec 6, 2023
1 parent 69cc2a1 commit 783f3c6
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportConnectionFailureStats;
import org.junit.Assert;

import java.io.IOException;
Expand Down Expand Up @@ -244,7 +245,9 @@ private Map<String, List<String>> setupCluster(int nodeCountPerAZ, Settings comm
nodeMap.put("c", nodes_in_zone_c);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
int totalNodes = nodeCountPerAZ*3+1;
ClusterHealthResponse health =
client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(totalNodes)).execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();
Expand Down Expand Up @@ -1462,4 +1465,84 @@ public void testWeightedRoutingFailOpenStats() throws Exception {
WeightedRoutingStats.getInstance().resetFailOpenCount();
}

public void testTransportConnectionFailureStats() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

int nodeCountPerAZ = 1;
Map<String, List<String>> nodeMap = setupCluster(nodeCountPerAZ, commonSettings);

int numShards = 10;
int numReplicas = 1;
setUpIndexing(numShards, numReplicas);

logger.info("--> creating network partition disruption");
final String clusterManagerNode1 = internalCluster().getClusterManagerName();
Set<String> nodesInOneSide =
Stream.of(nodeMap.get("b").get(0), nodeMap.get("c").get(0)).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);

DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();

Map<String, String> nodeIDMap = new HashMap<>();
for (DiscoveryNode node : dataNodes) {
nodeIDMap.put(node.getName(), node.getId());
}

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

Set<String> hitNodes = new HashSet<>();
logger.info("--> making search requests");

Future<SearchResponse>[] responses = new Future[50];
logger.info("--> making search requests");
for (int i = 0; i < 50; i++) {
responses[i] = internalCluster().client(nodeMap.get("a").get(0))
.prepareSearch("test")
.setQuery(QueryBuilders.matchAllQuery())
.execute();
}

logger.info("--> network disruption is stopped");
networkDisruption.stopDisrupting();

int failedShardCount = 0;
for (int i = 0; i < 50; i++) {
try {
SearchResponse searchResponse = responses[i].get();
failedShardCount += searchResponse.getFailedShards();

} catch (Exception t) {
fail("search should not fail");
}
}



Map<String, Integer> failures = TransportConnectionFailureStats.getInstance().getConnectionFailure();

NodesStatsResponse nodeStats =
client().admin().cluster().prepareNodesStats().addMetric("transport_connection_failure").execute().actionGet();
Map<String, NodeStats> stats = nodeStats.getNodesMap();
System.out.println("completed");

NodeStats nodeStatsC = stats.get(nodeIDMap.get(nodeMap.get("c").get(0)));
NodeStats nodeStatsB = stats.get(nodeIDMap.get(nodeMap.get("b").get(0)));
NodeStats nodeStatsA = stats.get(nodeIDMap.get(nodeMap.get("a").get(0)));
System.out.println("completed");

// assertEquals(failOpenShardCount, nodeStatsC.getWeightedRoutingStats().getFailOpenCount());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.search.pipeline.SearchPipelineStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportConnectionFailureStats;
import org.opensearch.transport.TransportStats;

import java.io.IOException;
Expand Down Expand Up @@ -158,6 +159,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private AdmissionControlStats admissionControlStats;

@Nullable
private TransportConnectionFailureStats transportConnectionFailureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -235,6 +239,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
admissionControlStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
transportConnectionFailureStats = in.readOptionalWriteable(TransportConnectionFailureStats::new);
} else {
transportConnectionFailureStats = null;
}
}

public NodeStats(
Expand Down Expand Up @@ -265,7 +274,8 @@ public NodeStats(
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats
@Nullable AdmissionControlStats admissionControlStats,
@Nullable TransportConnectionFailureStats transportConnectionFailureStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -295,6 +305,7 @@ public NodeStats(
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
this.transportConnectionFailureStats = transportConnectionFailureStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -452,6 +463,11 @@ public AdmissionControlStats getAdmissionControlStats() {
return admissionControlStats;
}

@Nullable
public TransportConnectionFailureStats getTransportConnectionFailureStats() {
return transportConnectionFailureStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -508,6 +524,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(admissionControlStats);
}

if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(transportConnectionFailureStats);
}
}

@Override
Expand Down Expand Up @@ -611,6 +631,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getAdmissionControlStats() != null) {
getAdmissionControlStats().toXContent(builder, params);
}
if (getTransportConnectionFailureStats() != null) {
getTransportConnectionFailureStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ public enum Metric {
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control");
ADMISSION_CONTROL("admission_control"),
TRANSPORT_CONNECTION_FAILURE_STATS("transport_connection_failure");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics)
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
NodesStatsRequest.Metric.TRANSPORT_CONNECTION_FAILURE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportConnectionFailureStats;
import org.opensearch.transport.TransportService;

import java.io.Closeable;
Expand Down Expand Up @@ -236,7 +237,8 @@ public NodeStats stats(
boolean resourceUsageStats,
boolean segmentReplicationTrackerStats,
boolean repositoriesStats,
boolean admissionControl
boolean admissionControl,
boolean transportConnectionFailureStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -268,7 +270,8 @@ public NodeStats stats(
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats() : null
admissionControl ? this.admissionControlService.stats() : null,
transportConnectionFailureStats ? TransportConnectionFailureStats.getInstance() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi

/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
* Once a successgul is established, it can be validated before being exposed.
* The ActionListener will be called on the calling thread or the generic thread pool.
*/
@Override
Expand Down Expand Up @@ -203,6 +203,8 @@ public void connectToNode(
public Transport.Connection getConnection(DiscoveryNode node) {
Transport.Connection connection = connectedNodes.get(node);
if (connection == null) {
// capture stats here
TransportConnectionFailureStats.getInstance().updateConnectionFailureCount(node.getName());
throw new NodeNotConnectedException(node, "Node not connected");
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class TransportConnectionFailureStats implements ToXContentFragment, Writeable {
// Node as key with failure count as value
private Map<String, Integer> connectionFailureMap = ConcurrentCollections.newConcurrentMap();

private static final TransportConnectionFailureStats INSTANCE = new TransportConnectionFailureStats();

public TransportConnectionFailureStats() {

}

public TransportConnectionFailureStats(StreamInput in) throws IOException {
connectionFailureMap = in.readMap(StreamInput::readString, StreamInput::readInt);
}

public static TransportConnectionFailureStats getInstance() {
return INSTANCE;
}

public void updateConnectionFailureCount(String node) {
connectionFailureMap.compute(node, (k, v) -> v == null ? 1 : v + 1);
}

public int getConnectionFailureCount(String node) {
if (connectionFailureMap.containsKey(node)) {
Integer value = connectionFailureMap.get(node);
return value;
}
return 0;
}

public Map<String, Integer> getConnectionFailure() {
return new ConcurrentHashMap(connectionFailureMap);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
Map<String, Integer> map = connectionFailureMap;
out.writeMap(map, StreamOutput::writeString, StreamOutput::writeInt);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("transport_connection_failure");
for (var entry : connectionFailureMap.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(connectionFailureMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
null,
segmentReplicationRejectionStats,
null,
admissionControlStats
admissionControlStats,
null
);
}

Expand Down Expand Up @@ -1010,4 +1011,5 @@ private static RemoteTranslogTransferTracker.Stats getRandomRemoteTranslogTransf
private OperationStats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null);
}

}
Loading

0 comments on commit 783f3c6

Please sign in to comment.