From 1f6ad45b8a694f73e18cbc207832830bb7d479db Mon Sep 17 00:00:00 2001 From: Rajaeelfarsi Date: Thu, 4 Jan 2024 17:09:38 +0100 Subject: [PATCH 1/2] add shards number by node Signed-off-by: Rajaeelfarsi --- .../PrometheusMetricsCollector.java | 40 ++++++++++++++++++- .../RestPrometheusMetricsAction.java | 23 +++++++++-- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java index a5adbc7..175a515 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java @@ -17,9 +17,16 @@ package org.compuscene.metrics.prometheus; +import java.util.logging.LogManager; +import java.util.logging.Logger; import org.opensearch.action.ClusterStatsData; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexStats; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; @@ -91,6 +98,7 @@ public void registerMetrics() { registerOsMetrics(); registerFsMetrics(); registerESSettings(); + registerNumberOfShardsPerNode(); } private void registerClusterMetrics() { @@ -925,7 +933,8 @@ public void updateMetrics(String originNodeName, String originNodeId, @Nullable ClusterHealthResponse clusterHealthResponse, NodeStats[] nodeStats, @Nullable IndicesStatsResponse indicesStats, - @Nullable ClusterStatsData 
clusterStatsData) {
+                              @Nullable ClusterStatsData clusterStatsData,
+                              @Nullable ClusterStateResponse clusterStateResponse) {
         Summary.Timer timer = catalog.startSummaryTimer(
                 new Tuple<>(originNodeName, originNodeId),
                 "metrics_generate_time_seconds");
@@ -936,6 +945,7 @@ public void updateMetrics(String originNodeName, String originNodeId,
             String nodeName = s.getNode().getName();
             String nodeID = s.getNode().getId();
             Tuple nodeInfo = new Tuple<>(nodeName, nodeID);
+            DiscoveryNode node = s.getNode();
 
             updateNodeMetrics(nodeInfo, s);
             updateIndicesMetrics(nodeInfo, s.getIndices());
@@ -949,6 +959,7 @@ public void updateMetrics(String originNodeName, String originNodeId,
             updateJVMMetrics(nodeInfo, s.getJvm());
             updateOsMetrics(nodeInfo, s.getOs());
             updateFsMetrics(nodeInfo, s.getFs());
+            updateNumberOfShardsPerNode(nodeInfo, clusterStateResponse, node);
         }
         if (isPrometheusIndices) {
             updatePerIndexMetrics(clusterHealthResponse, indicesStats);
@@ -960,6 +971,33 @@ public void updateMetrics(String originNodeName, String originNodeId,
         timer.observeDuration();
     }
 
+    /** Registers the per-node gauge that reports how many shards each node holds. */
+    private void registerNumberOfShardsPerNode() {
+        catalog.registerNodeGauge("nodes_shards_number", "node shards");
+    }
+
+    /**
+     * Sets the {@code nodes_shards_number} gauge to the number of shards assigned to a node.
+     *
+     * @param nodeInfo node name and node id, as built by {@code updateMetrics}
+     * @param clusterStateResponse cluster state to read the routing table from; if it (or its
+     *                             state) is {@code null} the gauge is left untouched
+     * @param node node whose assigned shards are counted
+     */
+    public void updateNumberOfShardsPerNode(Tuple<String, String> nodeInfo,
+            @Nullable ClusterStateResponse clusterStateResponse, DiscoveryNode node) {
+        if (clusterStateResponse == null || clusterStateResponse.getState() == null) {
+            return; // no cluster state available, nothing to count
+        }
+        int shardCount = 0;
+        for (ShardRouting shard : clusterStateResponse.getState().routingTable().allShards()) {
+            if (shard.assignedToNode() && node.getId().equals(shard.currentNodeId())) {
+                shardCount++;
+            }
+        }
+        catalog.setNodeGauge(nodeInfo, "nodes_shards_number", shardCount);
+    }
+
     /**
      * Get the metric catalog.
* @return The catalog
diff --git a/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java b/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java
index ed5c9fd..072e5c6 100644
--- a/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java
+++ b/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java
@@ -27,6 +27,10 @@
 import org.compuscene.metrics.prometheus.PrometheusSettings;
 import org.opensearch.action.NodePrometheusMetricsRequest;
 import org.opensearch.action.NodePrometheusMetricsResponse;
+import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
+import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.OpenSearchException;
 import org.opensearch.client.node.NodeClient;
 import org.opensearch.common.network.NetworkAddress;
 import org.opensearch.common.settings.ClusterSettings;
@@ -65,7 +69,7 @@ public class RestPrometheusMetricsAction extends BaseRestHandler {
     private final String metricPrefix;
     private final PrometheusSettings prometheusSettings;
     private final Logger logger = LogManager.getLogger(getClass());
-
+    private volatile ClusterStateResponse clusterStateResponse = null; // set by the state listener
     /**
      * A constructor.
      * @param settings Settings
@@ -91,6 +95,18 @@ public String getName() {
         return "prometheus_metrics_action";
     }
 
+    private final ActionListener<ClusterStateResponse> clusterStateResponseActionListener =
+            new ActionListener<ClusterStateResponse>() {
+                @Override
+                public void onResponse(ClusterStateResponse response) {
+                    clusterStateResponse = response; // cached for the metrics collector
+                }
+                @Override
+                public void onFailure(Exception e) {
+                    logger.warn("Failed to fetch cluster state for shard metrics", e);
+                }
+            };
+
     // This method does not throw any IOException because there are no request parameters to be parsed
     // and processed. This may change in the future.
@Override @@ -113,7 +129,8 @@ public RestResponse buildResponse(NodePrometheusMetricsResponse response) throws assert response.getLocalNodesInfoResponse().getNodes().size() == 1; String nodeName = response.getLocalNodesInfoResponse().getNodes().get(0).getNode().getName(); String nodeId = response.getLocalNodesInfoResponse().getNodes().get(0).getNode().getId(); - + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + client.admin().cluster().state(clusterStateRequest, clusterStateResponseActionListener); if (logger.isTraceEnabled()) { logger.trace("Preparing metrics output on node: [{}], [{}]", nodeName, nodeId); } @@ -130,7 +147,7 @@ public RestResponse buildResponse(NodePrometheusMetricsResponse response) throws collector.registerMetrics(); collector.updateMetrics( nodeName, nodeId, response.getClusterHealth(), response.getNodeStats(), - response.getIndicesStats(), response.getClusterStatsData()); + response.getIndicesStats(), response.getClusterStatsData(),clusterStateResponse); textContent = collector.getTextContent(); } catch (Exception ex) { // We use try-catch block to catch exception from Prometheus catalog and collector processing From f1525d6546bf111c65eeba5af778657853d3340f Mon Sep 17 00:00:00 2001 From: Rajaeelfarsi Date: Fri, 5 Jan 2024 10:58:34 +0100 Subject: [PATCH 2/2] add unit test Signed-off-by: Rajaeelfarsi --- .../test/70_10_nodes_shards_number.yml | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 src/yamlRestTest/resources/rest-api-spec/test/70_10_nodes_shards_number.yml diff --git a/src/yamlRestTest/resources/rest-api-spec/test/70_10_nodes_shards_number.yml b/src/yamlRestTest/resources/rest-api-spec/test/70_10_nodes_shards_number.yml new file mode 100644 index 0000000..35d1954 --- /dev/null +++ b/src/yamlRestTest/resources/rest-api-spec/test/70_10_nodes_shards_number.yml @@ -0,0 +1,31 @@ +"Cluster nodes shards (nodes_shards_number)": + + # We expect no indices in the cluster + - do: + 
indices.refresh: { allow_no_indices: true }
+
+  - do:
+      cluster.stats: {}
+
+  - match: { indices.count: 0 }
+
+  # Create the index explicitly: the shard count is an index setting, not a document field
+  - do:
+      indices.create:
+        index: twitter
+        body: { settings: { number_of_shards: 5 } }
+
+  - do:
+      indices.refresh: { allow_no_indices: true }
+
+  # Verify in Prometheus metrics that we get metrics only from a single node (the _local one):
+  - do:
+      prometheus.metrics: {}
+
+  - match:
+      $body: |
+        /.*
+        opensearch_nodes_shards_number\{
+            cluster="yamlRestTest",node="[a-zA-Z0-9\-\.\_]+",nodeid="[a-zA-Z0-9\-\.\_]+"
+        \,} \s \d+\.\d+ (\n\#|![\n])
+        .*/