From 2ac18e946b12ed150fa0dd1adca51ceb20478edc Mon Sep 17 00:00:00 2001 From: Smit Patel Date: Fri, 9 Aug 2024 13:47:06 +0530 Subject: [PATCH 1/3] Added support for snapshot related metrics Signed-off-by: Smit Patel --- build.gradle | 2 +- .../PrometheusMetricsCollector.java | 40 +++++++++- .../prometheus/PrometheusSettings.java | 24 ++++++ .../action/NodePrometheusMetricsResponse.java | 15 ++++ .../opensearch/action/SnapshotsResponse.java | 74 +++++++++++++++++++ .../TransportNodePrometheusMetricsAction.java | 64 ++++++++++++++-- .../prometheus/PrometheusExporterPlugin.java | 1 + .../RestPrometheusMetricsAction.java | 7 +- 8 files changed, 215 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/opensearch/action/SnapshotsResponse.java diff --git a/build.gradle b/build.gradle index 7c214f3..d1ba583 100644 --- a/build.gradle +++ b/build.gradle @@ -101,7 +101,7 @@ dependencies { restResources { restApi { - includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index' + includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index', 'snapshot' } } diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java index a5adbc7..def70e6 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java @@ -18,6 +18,7 @@ package org.compuscene.metrics.prometheus; import org.opensearch.action.ClusterStatsData; +import org.opensearch.action.SnapshotsResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.indices.stats.CommonStats; @@ -37,6 +38,8 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessStats; import org.opensearch.script.ScriptStats; +import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotState; import org.opensearch.threadpool.ThreadPoolStats; import org.opensearch.transport.TransportStats; @@ -54,19 +57,23 @@ public class PrometheusMetricsCollector { private boolean isPrometheusClusterSettings; private boolean isPrometheusIndices; + private boolean isPrometheusSnapshots; private PrometheusMetricsCatalog catalog; /** * A constructor. * @param catalog {@link PrometheusMetricsCatalog} * @param isPrometheusIndices boolean flag for index level metric + * @param isPrometheusSnapshots boolean flag for snapshots related metrics * @param isPrometheusClusterSettings boolean flag cluster settings metrics */ public PrometheusMetricsCollector(PrometheusMetricsCatalog catalog, boolean isPrometheusIndices, + boolean isPrometheusSnapshots, boolean isPrometheusClusterSettings) { this.isPrometheusClusterSettings = isPrometheusClusterSettings; this.isPrometheusIndices = isPrometheusIndices; + this.isPrometheusSnapshots = isPrometheusSnapshots; this.catalog = catalog; } @@ -80,6 +87,7 @@ public void registerMetrics() { registerNodeMetrics(); registerIndicesMetrics(); registerPerIndexMetrics(); + registerSnapshotMetrics(); registerTransportMetrics(); registerHTTPMetrics(); registerThreadPoolMetrics(); @@ -465,6 +473,30 @@ private void updatePerIndexMetrics(@Nullable ClusterHealthResponse chr, @Nullabl } } + @SuppressWarnings("checkstyle:LineLength") + private void registerSnapshotMetrics() { + catalog.registerClusterGauge("min_snapshot_age", "Time elapsed in milliseconds since the most recent successful snapshot's start time", "sm_policy"); + } + + private void updateSnapshotsMetrics(@Nullable SnapshotsResponse snapshotsResponse) { + if (snapshotsResponse == null) { + return; + } + Map smPolicyMinSnapshotAge = new HashMap<>(); + for (SnapshotInfo snapshotInfo : snapshotsResponse.getSnapshotInfos()) { + // emit min_snapshot_age metric only for successful snapshots + if (snapshotInfo.state() != SnapshotState.SUCCESS) { + continue; + } + String smPolicy = snapshotInfo.userMetadata() == null ? "adhoc" : snapshotInfo.userMetadata().getOrDefault("sm_policy", "adhoc").toString(); + long snapshotAge = System.currentTimeMillis() - snapshotInfo.startTime(); + smPolicyMinSnapshotAge.compute(smPolicy, (key, oldValue) -> oldValue == null ? snapshotAge : Math.min(oldValue, snapshotAge)); + } + for(Map.Entry entry : smPolicyMinSnapshotAge.entrySet()) { + catalog.setClusterGauge("min_snapshot_age", entry.getValue(), entry.getKey()); + } + } + @SuppressWarnings("checkstyle:LineLength") private void updatePerIndexContextMetrics(String indexName, String context, CommonStats idx) { catalog.setClusterGauge("index_doc_number", idx.getDocs().getCount(), indexName, context); @@ -920,12 +952,14 @@ private void updateESSettings(@Nullable ClusterStatsData stats) { * @param nodeStats NodeStats filtered using nodes filter * @param indicesStats IndicesStatsResponse * @param clusterStatsData ClusterStatsData + * @param snapshotsResponse SnapshotsResponse */ public void updateMetrics(String originNodeName, String originNodeId, @Nullable ClusterHealthResponse clusterHealthResponse, NodeStats[] nodeStats, @Nullable IndicesStatsResponse indicesStats, - @Nullable ClusterStatsData clusterStatsData) { + @Nullable ClusterStatsData clusterStatsData, + @Nullable SnapshotsResponse snapshotsResponse) { Summary.Timer timer = catalog.startSummaryTimer( new Tuple<>(originNodeName, originNodeId), "metrics_generate_time_seconds"); @@ -956,7 +990,9 @@ public void updateMetrics(String originNodeName, String originNodeId, if (isPrometheusClusterSettings) { updateESSettings(clusterStatsData); } - + if (isPrometheusSnapshots) { + updateSnapshotsMetrics(snapshotsResponse); + } timer.observeDuration(); } diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java index 984464a..b67045f 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java @@ -49,6 +49,7 @@ public enum INDEX_FILTER_OPTIONS { static String PROMETHEUS_CLUSTER_SETTINGS_KEY = "prometheus.cluster.settings"; static String PROMETHEUS_INDICES_KEY = "prometheus.indices"; + static String PROMETHEUS_SNAPSHOTS_KEY = "prometheus.snapshots"; static String PROMETHEUS_NODES_FILTER_KEY = "prometheus.nodes.filter"; static String PROMETHEUS_SELECTED_INDICES_KEY = "prometheus.indices_filter.selected_indices"; static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option"; @@ -69,6 +70,14 @@ public enum INDEX_FILTER_OPTIONS { Setting.boolSetting(PROMETHEUS_INDICES_KEY, true, Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * This setting is used configure weather to expose snapshot metrics or not. The default value is false. + * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}. + */ + public static final Setting PROMETHEUS_SNAPSHOTS = + Setting.boolSetting(PROMETHEUS_SNAPSHOTS_KEY, false, + Setting.Property.Dynamic, Setting.Property.NodeScope); + /** * This setting is used configure which cluster nodes to gather metrics from. The default value is _local. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_NODES_FILTER_KEY}. @@ -97,6 +106,7 @@ public enum INDEX_FILTER_OPTIONS { private volatile boolean clusterSettings; private volatile boolean indices; + private volatile boolean snapshots; private volatile String nodesFilter; private volatile String selectedIndices; private volatile INDEX_FILTER_OPTIONS selectedOption; @@ -109,11 +119,13 @@ public enum INDEX_FILTER_OPTIONS { public PrometheusSettings(Settings settings, ClusterSettings clusterSettings) { setPrometheusClusterSettings(PROMETHEUS_CLUSTER_SETTINGS.get(settings)); setPrometheusIndices(PROMETHEUS_INDICES.get(settings)); + setPrometheusSnapshots(PROMETHEUS_SNAPSHOTS.get(settings)); setPrometheusNodesFilter(PROMETHEUS_NODES_FILTER.get(settings)); setPrometheusSelectedIndices(PROMETHEUS_SELECTED_INDICES.get(settings)); setPrometheusSelectedOption(PROMETHEUS_SELECTED_OPTION.get(settings)); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_CLUSTER_SETTINGS, this::setPrometheusClusterSettings); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_INDICES, this::setPrometheusIndices); + clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SNAPSHOTS, this::setPrometheusSnapshots); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_NODES_FILTER, this::setPrometheusNodesFilter); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_INDICES, this::setPrometheusSelectedIndices); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_OPTION, this::setPrometheusSelectedOption); @@ -127,6 +139,10 @@ private void setPrometheusIndices(boolean flag) { this.indices = flag; } + private void setPrometheusSnapshots(boolean flag) { + this.snapshots = flag; + } + private void setPrometheusNodesFilter(String filter) { this.nodesFilter = filter; } private void setPrometheusSelectedIndices(String selectedIndices) { @@ -153,6 +169,14 @@ public boolean getPrometheusIndices() { return this.indices; } + /** + * Get value of settings key {@link #PROMETHEUS_SNAPSHOTS_KEY}. + * @return boolean value of the key + */ + public boolean getPrometheusSnapshots() { + return this.snapshots; + } + /** * Get value of settings key {@link #PROMETHEUS_NODES_FILTER_KEY}. * @return boolean value of the key diff --git a/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java b/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java index d84de13..0468042 100644 --- a/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java +++ b/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java @@ -43,6 +43,7 @@ public class NodePrometheusMetricsResponse extends ActionResponse { private final NodeStats[] nodeStats; @Nullable private final IndicesStatsResponse indicesStats; private ClusterStatsData clusterStatsData = null; + @Nullable private final SnapshotsResponse snapshotsResponse; /** * A constructor that materialize the instance from inputStream. @@ -56,6 +57,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException { nodeStats = in.readArray(NodeStats::new, NodeStats[]::new); indicesStats = PackageAccessHelper.createIndicesStatsResponse(in); clusterStatsData = new ClusterStatsData(in); + snapshotsResponse = new SnapshotsResponse(in); } /** @@ -65,6 +67,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException { * @param nodesStats NodesStats * @param indicesStats IndicesStats * @param clusterStateResponse ClusterStateResponse + * @param snapshotsResponse SnapshotsResponse * @param settings Settings * @param clusterSettings ClusterSettings */ @@ -73,6 +76,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth, NodeStats[] nodesStats, @Nullable IndicesStatsResponse indicesStats, @Nullable ClusterStateResponse clusterStateResponse, + @Nullable SnapshotsResponse snapshotsResponse, Settings settings, ClusterSettings clusterSettings) { this.clusterHealth = clusterHealth; @@ -82,6 +86,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth, if (clusterStateResponse != null) { this.clusterStatsData = new ClusterStatsData(clusterStateResponse, settings, clusterSettings); } + this.snapshotsResponse = snapshotsResponse; } /** @@ -106,6 +111,15 @@ public NodeStats[] getNodeStats() { return this.nodeStats; } + /** + * Get internal {@link SnapshotsResponse} object. + * @return SnapshotsResponse object + */ + @Nullable + public SnapshotsResponse getSnapshotsResponse() { + return this.snapshotsResponse; + } + /** * Get internal {@link IndicesStatsResponse} object. * @return IndicesStatsResponse object @@ -131,5 +145,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeArray(nodeStats); out.writeOptionalWriteable(indicesStats); clusterStatsData.writeTo(out); + snapshotsResponse.writeTo(out); } } diff --git a/src/main/java/org/opensearch/action/SnapshotsResponse.java b/src/main/java/org/opensearch/action/SnapshotsResponse.java new file mode 100644 index 0000000..01b1e68 --- /dev/null +++ b/src/main/java/org/opensearch/action/SnapshotsResponse.java @@ -0,0 +1,74 @@ +/* + * Copyright [2018] [Vincent VAN HOLLEBEKE] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.opensearch.action; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.snapshots.SnapshotInfo; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Represents a container class for holding response data related to snapshots. + */ +public class SnapshotsResponse extends ActionResponse { + private final List snapshotInfos; + + /** + * A constructor. + * @param in A streamInput to materialize the instance from + * @throws IOException if there is an exception reading from inputStream + */ + public SnapshotsResponse(StreamInput in) throws IOException { + super(in); + snapshotInfos = in.readList(SnapshotInfo::new); + } + + /** + * A constructor. + * + * @param snapshotInfos A list of {@link SnapshotInfo} objects to initialize the instance with. + */ + public SnapshotsResponse(List snapshotInfos) { + this.snapshotInfos = Collections.unmodifiableList(snapshotInfos); + } + + /** + * Writes the instance into {@link StreamOutput}. + * + * @param out the output stream to which the instance is to be written + * @throws IOException if there is an exception writing to the output stream + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(snapshotInfos); + } + + /** + * Getter for {@code snapshotInfos} list. + * The returned list is unmodifiable to ensure immutability. + * + * @return the list of {@link SnapshotInfo} objects + */ + public List getSnapshotInfos() { + return snapshotInfos; + } +} \ No newline at end of file diff --git a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java index 3f07b65..66213e4 100644 --- a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java +++ b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java @@ -28,6 +28,10 @@ import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; @@ -37,13 +41,20 @@ import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.common.Nullable; +import org.opensearch.common.action.ActionFuture; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + /** * Transport action class for Prometheus Exporter plugin. * @@ -94,15 +105,17 @@ private class AsyncAction { private final NodesStatsRequest nodesStatsRequest; private final IndicesStatsRequest indicesStatsRequest; private final ClusterStateRequest clusterStateRequest; - + private final GetRepositoriesRequest repositoriesRequest; private ClusterHealthResponse clusterHealthResponse = null; private NodesInfoResponse localNodesInfoResponse = null; private NodesStatsResponse nodesStatsResponse = null; private IndicesStatsResponse indicesStatsResponse = null; private ClusterStateResponse clusterStateResponse = null; + private SnapshotsResponse snapshotsResponse = null; // read the state of prometheus dynamic settings only once at the beginning of the async request private final boolean isPrometheusIndices = prometheusSettings.getPrometheusIndices(); + private final boolean isPrometheusSnapshots = prometheusSettings.getPrometheusSnapshots(); private final boolean isPrometheusClusterSettings = prometheusSettings.getPrometheusClusterSettings(); private final String prometheusNodesFilter = prometheusSettings.getNodesFilter(); @@ -142,6 +155,12 @@ private AsyncAction(ActionListener listener) { this.indicesStatsRequest = null; } + if (isPrometheusSnapshots) { + this.repositoriesRequest = new GetRepositoriesRequest(); + } else { + this.repositoriesRequest = null; + } + // Cluster settings are get via ClusterStateRequest (see elasticsearch RestClusterGetSettingsAction for details) // We prefer to send it to master node (hence local=false; it should be set by default but we want to be sure). this.clusterStateRequest = isPrometheusClusterSettings ? Requests.clusterStateRequest() @@ -150,15 +169,47 @@ private AsyncAction(ActionListener listener) { private void gatherRequests() { listener.onResponse(buildResponse(clusterHealthResponse, localNodesInfoResponse, nodesStatsResponse, indicesStatsResponse, - clusterStateResponse)); + clusterStateResponse, snapshotsResponse)); } + private final ActionListener repositoriesResponseActionListener = + new ActionListener() { + @Override + public void onResponse(GetRepositoriesResponse response) { + List> snapshotsResponseFutures = response.repositories().stream() + .map(metadata -> new GetSnapshotsRequest(metadata.name())) + .map(snapshotsRequest -> client.admin().cluster().getSnapshots(snapshotsRequest)) + .collect(Collectors.toList()); + List snapshotInfos = new ArrayList<>(); + for (ActionFuture snapshotsResponseFuture : snapshotsResponseFutures) { + try { + GetSnapshotsResponse getSnapshotsResponse = snapshotsResponseFuture.get(); + snapshotInfos.addAll(getSnapshotsResponse.getSnapshots()); + } catch (InterruptedException | ExecutionException e) { + listener.onFailure(new OpenSearchException("Get snapshots request failed", e)); + return; + } + } + snapshotsResponse = new SnapshotsResponse(snapshotInfos); + gatherRequests(); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(new OpenSearchException("Get repositories request failed", e)); + } + }; + private final ActionListener clusterStateResponseActionListener = new ActionListener() { @Override public void onResponse(ClusterStateResponse response) { clusterStateResponse = response; - gatherRequests(); + if (isPrometheusSnapshots) { + client.admin().cluster().getRepositories(repositoriesRequest, repositoriesResponseActionListener); + } else { + gatherRequests(); + } } @Override @@ -175,7 +226,7 @@ public void onResponse(IndicesStatsResponse response) { if (isPrometheusClusterSettings) { client.admin().cluster().state(clusterStateRequest, clusterStateResponseActionListener); } else { - gatherRequests(); + clusterStateResponseActionListener.onResponse(null); } } @@ -239,12 +290,13 @@ protected NodePrometheusMetricsResponse buildResponse(ClusterHealthResponse clus NodesInfoResponse localNodesInfoResponse, NodesStatsResponse nodesStats, @Nullable IndicesStatsResponse indicesStats, - @Nullable ClusterStateResponse clusterStateResponse) { + @Nullable ClusterStateResponse clusterStateResponse, + @Nullable SnapshotsResponse snapshotsResponse) { NodePrometheusMetricsResponse response = new NodePrometheusMetricsResponse( clusterHealth, localNodesInfoResponse, nodesStats.getNodes().toArray(new NodeStats[0]), - indicesStats, clusterStateResponse, + indicesStats, clusterStateResponse, snapshotsResponse, settings, clusterSettings); if (logger.isTraceEnabled()) { logger.trace("Return response: [{}]", response); diff --git a/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java b/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java index a05c8e0..edc2e90 100644 --- a/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java +++ b/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java @@ -75,6 +75,7 @@ public List> getSettings() { List> settings = Arrays.asList( PrometheusSettings.PROMETHEUS_CLUSTER_SETTINGS, PrometheusSettings.PROMETHEUS_INDICES, + PrometheusSettings.PROMETHEUS_SNAPSHOTS, PrometheusSettings.PROMETHEUS_NODES_FILTER, PrometheusSettings.PROMETHEUS_SELECTED_INDICES, PrometheusSettings.PROMETHEUS_SELECTED_OPTION, diff --git a/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java b/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java index ed5c9fd..2e34dbf 100644 --- a/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java +++ b/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java @@ -91,8 +91,8 @@ public String getName() { return "prometheus_metrics_action"; } - // This method does not throw any IOException because there are no request parameters to be parsed - // and processed. This may change in the future. + // This method does not throw any IOException because there are no request parameters to be parsed + // and processed. This may change in the future. @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { if (logger.isTraceEnabled()) { @@ -125,12 +125,13 @@ public RestResponse buildResponse(NodePrometheusMetricsResponse response) throws collector = new PrometheusMetricsCollector( catalog, prometheusSettings.getPrometheusIndices(), + prometheusSettings.getPrometheusSnapshots(), prometheusSettings.getPrometheusClusterSettings() ); collector.registerMetrics(); collector.updateMetrics( nodeName, nodeId, response.getClusterHealth(), response.getNodeStats(), - response.getIndicesStats(), response.getClusterStatsData()); + response.getIndicesStats(), response.getClusterStatsData(), response.getSnapshotsResponse()); textContent = collector.getTextContent(); } catch (Exception ex) { // We use try-catch block to catch exception from Prometheus catalog and collector processing From a3a40587605e47063d7457b805072d53f855d6f6 Mon Sep 17 00:00:00 2001 From: Smit Patel Date: Sun, 11 Aug 2024 00:10:31 +0530 Subject: [PATCH 2/3] Added integration test for snapshots metrics Signed-off-by: Smit Patel --- README.md | 7 +++ build.gradle | 6 +++ .../prometheus/PrometheusSettings.java | 6 +-- .../opensearch/action/SnapshotsResponse.java | 19 ++++--- .../TransportNodePrometheusMetricsAction.java | 53 +++++++++++-------- .../test/70_10_snapshots_metrics.yml | 50 +++++++++++++++++ 6 files changed, 110 insertions(+), 31 deletions(-) create mode 100644 src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml diff --git a/README.md b/README.md index 3adbaa9..8826fd9 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,13 @@ To disable exporting cluster settings use: prometheus.cluster.settings: false ``` +#### Snapshot metrics + +To enable exporting snapshot metrics use: +``` +prometheus.snapshots: true +``` + #### Nodes filter Metrics include statistics about individual OpenSearch nodes. diff --git a/build.gradle b/build.gradle index d1ba583..3839c5a 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,4 @@ +import org.opensearch.gradle.PropertyNormalization import org.opensearch.gradle.test.RestIntegTestTask import java.util.regex.Matcher @@ -124,8 +125,13 @@ tasks.named("check").configure { dependsOn(integTest) } // Temporary disable task :testingConventions testingConventions.enabled = false +// Directory for snapshot repository +File repositoryDir = new File(project.buildDir, "shared-repository") + testClusters.all { numberOfNodes = 2 + // Configuring repo path for 'fs' type snapshot repository + setting 'path.repo', "${repositoryDir.absolutePath}", PropertyNormalization.IGNORE_VALUE // It seems cluster name can not be customized here. It gives an error: // Testclusters does not allow the following settings to be changed:[cluster.name] for node{::yamlRestTest-0} diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java index b67045f..c757c34 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java @@ -55,7 +55,7 @@ public enum INDEX_FILTER_OPTIONS { static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option"; /** - * This setting is used configure weather to expose cluster settings metrics or not. The default value is true. + * This setting is used configure whether to expose cluster settings metrics or not. The default value is true. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_CLUSTER_SETTINGS_KEY}. */ public static final Setting PROMETHEUS_CLUSTER_SETTINGS = @@ -63,7 +63,7 @@ public enum INDEX_FILTER_OPTIONS { Setting.Property.Dynamic, Setting.Property.NodeScope); /** - * This setting is used configure weather to expose low level index metrics or not. The default value is true. + * This setting is used configure whether to expose low level index metrics or not. The default value is true. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_INDICES_KEY}. */ public static final Setting PROMETHEUS_INDICES = @@ -71,7 +71,7 @@ public enum INDEX_FILTER_OPTIONS { Setting.Property.Dynamic, Setting.Property.NodeScope); /** - * This setting is used configure weather to expose snapshot metrics or not. The default value is false. + * This setting is used configure whether to expose snapshot metrics or not. The default value is false. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}. */ public static final Setting PROMETHEUS_SNAPSHOTS = diff --git a/src/main/java/org/opensearch/action/SnapshotsResponse.java b/src/main/java/org/opensearch/action/SnapshotsResponse.java index 01b1e68..375226e 100644 --- a/src/main/java/org/opensearch/action/SnapshotsResponse.java +++ b/src/main/java/org/opensearch/action/SnapshotsResponse.java @@ -23,6 +23,7 @@ import org.opensearch.snapshots.SnapshotInfo; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -39,16 +40,14 @@ public class SnapshotsResponse extends ActionResponse { */ public SnapshotsResponse(StreamInput in) throws IOException { super(in); - snapshotInfos = in.readList(SnapshotInfo::new); + snapshotInfos = Collections.synchronizedList(in.readList(SnapshotInfo::new)); } /** * A constructor. - * - * @param snapshotInfos A list of {@link SnapshotInfo} objects to initialize the instance with. */ - public SnapshotsResponse(List snapshotInfos) { - this.snapshotInfos = Collections.unmodifiableList(snapshotInfos); + public SnapshotsResponse() { + this.snapshotInfos = Collections.synchronizedList(new ArrayList<>()); } /** @@ -64,11 +63,17 @@ public void writeTo(StreamOutput out) throws IOException { /** * Getter for {@code snapshotInfos} list. - * The returned list is unmodifiable to ensure immutability. * * @return the list of {@link SnapshotInfo} objects */ public List getSnapshotInfos() { return snapshotInfos; } -} \ No newline at end of file + + /** + * Adds {@code snapshotInfosToAdd} to the {@code snapshotInfos} list. + */ + public void addSnapshotInfos(List snapshotInfosToAdd) { + snapshotInfos.addAll(snapshotInfosToAdd); + } +} diff --git a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java index 66213e4..9a93c50 100644 --- a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java +++ b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java @@ -40,19 +40,17 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.Nullable; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; -import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; /** @@ -111,7 +109,8 @@ private class AsyncAction { private NodesStatsResponse nodesStatsResponse = null; private IndicesStatsResponse indicesStatsResponse = null; private ClusterStateResponse clusterStateResponse = null; - private SnapshotsResponse snapshotsResponse = null; + private final Queue snapshotRepositories = new ConcurrentLinkedQueue<>(); + private final SnapshotsResponse snapshotsResponse = new SnapshotsResponse(); // read the state of prometheus dynamic settings only once at the beginning of the async request private final boolean isPrometheusIndices = prometheusSettings.getPrometheusIndices(); @@ -176,22 +175,14 @@ private void gatherRequests() { new ActionListener() { @Override public void onResponse(GetRepositoriesResponse response) { - List> snapshotsResponseFutures = response.repositories().stream() - .map(metadata -> new GetSnapshotsRequest(metadata.name())) - .map(snapshotsRequest -> client.admin().cluster().getSnapshots(snapshotsRequest)) - .collect(Collectors.toList()); - List snapshotInfos = new ArrayList<>(); - for (ActionFuture snapshotsResponseFuture : snapshotsResponseFutures) { - try { - GetSnapshotsResponse getSnapshotsResponse = snapshotsResponseFuture.get(); - snapshotInfos.addAll(getSnapshotsResponse.getSnapshots()); - } catch (InterruptedException | ExecutionException e) { - listener.onFailure(new OpenSearchException("Get snapshots request failed", e)); - return; - } + if (response.repositories().isEmpty()) { + gatherRequests(); + return; } - snapshotsResponse = new SnapshotsResponse(snapshotInfos); - gatherRequests(); + snapshotRepositories.addAll(response.repositories().stream() + .map(RepositoryMetadata::name).collect(Collectors.toList())); + String snapshotRepository = snapshotRepositories.poll(); + client.admin().cluster().getSnapshots(new GetSnapshotsRequest(snapshotRepository), snapshotsResponseActionListener); } @Override @@ -200,6 +191,26 @@ public void onFailure(Exception e) { } }; + private final ActionListener snapshotsResponseActionListener = + new ActionListener() { + @Override + public void onResponse(GetSnapshotsResponse response) { + snapshotsResponse.addSnapshotInfos(response.getSnapshots()); + if (snapshotRepositories.isEmpty()) { + gatherRequests(); + return; + } + // Fetch the snapshots for the next repository in the queue + String snapshotRepository = snapshotRepositories.poll(); + client.admin().cluster().getSnapshots(new GetSnapshotsRequest(snapshotRepository), snapshotsResponseActionListener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(new OpenSearchException("Get snapshots request failed", e)); + } + }; + private final ActionListener clusterStateResponseActionListener = new ActionListener() { @Override diff --git a/src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml b/src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml new file mode 100644 index 0000000..b52a0fe --- /dev/null +++ b/src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml @@ -0,0 +1,50 @@ +--- +"Verify snapshots metrics enabled": + + # Create 'fs' snapshot repository + - do: + snapshot.create_repository: + repository: test_repo_get_1 + body: + type: fs + settings: + location: "test_repo_get_1_loc" + + - do: + snapshot.get_repository: { } + + - is_true: test_repo_get_1 + + # Enable snapshots metrics + - do: + cluster.put_settings: + body: + persistent: + prometheus.snapshots: "true" + flat_settings: true + + - match: { persistent: { prometheus.snapshots: "true" } } + + # Create snapshot + - do: + snapshot.create: + repository: test_repo_get_1 + snapshot: test_snapshot_1 + wait_for_completion: true + + - match: { snapshot.snapshot: test_snapshot_1 } + - match: { snapshot.state : SUCCESS } + + # Fetch and verify metrics + - do: + prometheus.metrics: {} + + - match: + $body: | + /.* + \# \s* HELP \s+ opensearch_min_snapshot_age \s+.*\n + \# \s* TYPE \s+ opensearch_min_snapshot_age \s+ gauge\n + opensearch_min_snapshot_age\{ + cluster="yamlRestTest",sm_policy="adhoc", + \}\s+\d+\.\d+\n + .*/ From 03bd45fa1f187106f68706a01f9f1155d525fe8c Mon Sep 17 00:00:00 2001 From: Smit Patel Date: Wed, 9 Oct 2024 15:50:32 +0530 Subject: [PATCH 3/3] addressed comments Signed-off-by: Smit Patel --- README.md | 2 +- build.gradle | 2 +- .../action/NodePrometheusMetricsResponse.java | 12 ++++++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0cb1b62..6351818 100644 --- a/README.md +++ b/README.md @@ -151,7 +151,7 @@ prometheus.cluster.settings: false #### Snapshot metrics -To enable exporting snapshot metrics use: +By default, snapshot metrics are disabled. To enable exporting snapshot metrics use: ``` prometheus.snapshots: true ``` diff --git a/build.gradle b/build.gradle index 3839c5a..265a4d9 100644 --- a/build.gradle +++ b/build.gradle @@ -126,7 +126,7 @@ tasks.named("check").configure { dependsOn(integTest) } testingConventions.enabled = false // Directory for snapshot repository -File repositoryDir = new File(project.buildDir, "shared-repository") +File repositoryDir = new File(project.layout.buildDirectory.get().asFile, "shared-repository") testClusters.all { numberOfNodes = 2 diff --git a/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java b/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java index 0468042..cee1a81 100644 --- a/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java +++ b/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java @@ -17,6 +17,7 @@ package org.opensearch.action; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; @@ -29,6 +30,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.search.pipeline.SearchPipelineStats; import java.io.IOException; @@ -57,7 +59,11 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException { nodeStats = in.readArray(NodeStats::new, NodeStats[]::new); indicesStats = PackageAccessHelper.createIndicesStatsResponse(in); clusterStatsData = new ClusterStatsData(in); - snapshotsResponse = new SnapshotsResponse(in); + if (in.getVersion().onOrAfter(Version.V_2_17_1)) { + snapshotsResponse = new SnapshotsResponse(in); + } else { + snapshotsResponse = null; + } } /** @@ -145,6 +151,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeArray(nodeStats); out.writeOptionalWriteable(indicesStats); clusterStatsData.writeTo(out); - snapshotsResponse.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_2_17_1)) { + snapshotsResponse.writeTo(out); + } } }