diff --git a/build.gradle b/build.gradle index 7c214f3..d1ba583 100644 --- a/build.gradle +++ b/build.gradle @@ -101,7 +101,7 @@ dependencies { restResources { restApi { - includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index' + includeCore '_common', 'cat', 'cluster', 'nodes', 'indices', 'index', 'snapshot' } } diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java index a5adbc7..def70e6 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusMetricsCollector.java @@ -18,6 +18,7 @@ package org.compuscene.metrics.prometheus; import org.opensearch.action.ClusterStatsData; +import org.opensearch.action.SnapshotsResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.indices.stats.CommonStats; @@ -37,6 +38,8 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessStats; import org.opensearch.script.ScriptStats; +import org.opensearch.snapshots.SnapshotInfo; +import org.opensearch.snapshots.SnapshotState; import org.opensearch.threadpool.ThreadPoolStats; import org.opensearch.transport.TransportStats; @@ -54,19 +57,23 @@ public class PrometheusMetricsCollector { private boolean isPrometheusClusterSettings; private boolean isPrometheusIndices; + private boolean isPrometheusSnapshots; private PrometheusMetricsCatalog catalog; /** * A constructor. * @param catalog {@link PrometheusMetricsCatalog} * @param isPrometheusIndices boolean flag for index level metric + * @param isPrometheusSnapshots boolean flag for snapshots related metrics * @param isPrometheusClusterSettings boolean flag cluster settings metrics */ public PrometheusMetricsCollector(PrometheusMetricsCatalog catalog, boolean isPrometheusIndices, + boolean isPrometheusSnapshots, boolean isPrometheusClusterSettings) { this.isPrometheusClusterSettings = isPrometheusClusterSettings; this.isPrometheusIndices = isPrometheusIndices; + this.isPrometheusSnapshots = isPrometheusSnapshots; this.catalog = catalog; } @@ -80,6 +87,7 @@ public void registerMetrics() { registerNodeMetrics(); registerIndicesMetrics(); registerPerIndexMetrics(); + registerSnapshotMetrics(); registerTransportMetrics(); registerHTTPMetrics(); registerThreadPoolMetrics(); @@ -465,6 +473,30 @@ private void updatePerIndexMetrics(@Nullable ClusterHealthResponse chr, @Nullabl } } + @SuppressWarnings("checkstyle:LineLength") + private void registerSnapshotMetrics() { + catalog.registerClusterGauge("min_snapshot_age", "Time elapsed in milliseconds since the most recent successful snapshot's start time", "sm_policy"); + } + + private void updateSnapshotsMetrics(@Nullable SnapshotsResponse snapshotsResponse) { + if (snapshotsResponse == null) { + return; + } + Map smPolicyMinSnapshotAge = new HashMap<>(); + for (SnapshotInfo snapshotInfo : snapshotsResponse.getSnapshotInfos()) { + // emit min_snapshot_age metric only for successful snapshots + if (snapshotInfo.state() != SnapshotState.SUCCESS) { + continue; + } + String smPolicy = snapshotInfo.userMetadata() == null ? "adhoc" : snapshotInfo.userMetadata().getOrDefault("sm_policy", "adhoc").toString(); + long snapshotAge = System.currentTimeMillis() - snapshotInfo.startTime(); + smPolicyMinSnapshotAge.compute(smPolicy, (key, oldValue) -> oldValue == null ? snapshotAge : Math.min(oldValue, snapshotAge)); + } + for(Map.Entry entry : smPolicyMinSnapshotAge.entrySet()) { + catalog.setClusterGauge("min_snapshot_age", entry.getValue(), entry.getKey()); + } + } + @SuppressWarnings("checkstyle:LineLength") private void updatePerIndexContextMetrics(String indexName, String context, CommonStats idx) { catalog.setClusterGauge("index_doc_number", idx.getDocs().getCount(), indexName, context); @@ -920,12 +952,14 @@ private void updateESSettings(@Nullable ClusterStatsData stats) { * @param nodeStats NodeStats filtered using nodes filter * @param indicesStats IndicesStatsResponse * @param clusterStatsData ClusterStatsData + * @param snapshotsResponse SnapshotsResponse */ public void updateMetrics(String originNodeName, String originNodeId, @Nullable ClusterHealthResponse clusterHealthResponse, NodeStats[] nodeStats, @Nullable IndicesStatsResponse indicesStats, - @Nullable ClusterStatsData clusterStatsData) { + @Nullable ClusterStatsData clusterStatsData, + @Nullable SnapshotsResponse snapshotsResponse) { Summary.Timer timer = catalog.startSummaryTimer( new Tuple<>(originNodeName, originNodeId), "metrics_generate_time_seconds"); @@ -956,7 +990,9 @@ public void updateMetrics(String originNodeName, String originNodeId, if (isPrometheusClusterSettings) { updateESSettings(clusterStatsData); } - + if (isPrometheusSnapshots) { + updateSnapshotsMetrics(snapshotsResponse); + } timer.observeDuration(); } diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java index 984464a..b67045f 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java @@ -49,6 +49,7 @@ public enum INDEX_FILTER_OPTIONS { static String PROMETHEUS_CLUSTER_SETTINGS_KEY = "prometheus.cluster.settings"; static String PROMETHEUS_INDICES_KEY = "prometheus.indices"; + static String PROMETHEUS_SNAPSHOTS_KEY = "prometheus.snapshots"; static String PROMETHEUS_NODES_FILTER_KEY = "prometheus.nodes.filter"; static String PROMETHEUS_SELECTED_INDICES_KEY = "prometheus.indices_filter.selected_indices"; static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option"; @@ -69,6 +70,14 @@ public enum INDEX_FILTER_OPTIONS { Setting.boolSetting(PROMETHEUS_INDICES_KEY, true, Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * This setting is used configure weather to expose snapshot metrics or not. The default value is false. + * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}. + */ + public static final Setting PROMETHEUS_SNAPSHOTS = + Setting.boolSetting(PROMETHEUS_SNAPSHOTS_KEY, false, + Setting.Property.Dynamic, Setting.Property.NodeScope); + /** * This setting is used configure which cluster nodes to gather metrics from. The default value is _local. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_NODES_FILTER_KEY}. @@ -97,6 +106,7 @@ public enum INDEX_FILTER_OPTIONS { private volatile boolean clusterSettings; private volatile boolean indices; + private volatile boolean snapshots; private volatile String nodesFilter; private volatile String selectedIndices; private volatile INDEX_FILTER_OPTIONS selectedOption; @@ -109,11 +119,13 @@ public enum INDEX_FILTER_OPTIONS { public PrometheusSettings(Settings settings, ClusterSettings clusterSettings) { setPrometheusClusterSettings(PROMETHEUS_CLUSTER_SETTINGS.get(settings)); setPrometheusIndices(PROMETHEUS_INDICES.get(settings)); + setPrometheusSnapshots(PROMETHEUS_SNAPSHOTS.get(settings)); setPrometheusNodesFilter(PROMETHEUS_NODES_FILTER.get(settings)); setPrometheusSelectedIndices(PROMETHEUS_SELECTED_INDICES.get(settings)); setPrometheusSelectedOption(PROMETHEUS_SELECTED_OPTION.get(settings)); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_CLUSTER_SETTINGS, this::setPrometheusClusterSettings); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_INDICES, this::setPrometheusIndices); + clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SNAPSHOTS, this::setPrometheusSnapshots); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_NODES_FILTER, this::setPrometheusNodesFilter); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_INDICES, this::setPrometheusSelectedIndices); clusterSettings.addSettingsUpdateConsumer(PROMETHEUS_SELECTED_OPTION, this::setPrometheusSelectedOption); @@ -127,6 +139,10 @@ private void setPrometheusIndices(boolean flag) { this.indices = flag; } + private void setPrometheusSnapshots(boolean flag) { + this.snapshots = flag; + } + private void setPrometheusNodesFilter(String filter) { this.nodesFilter = filter; } private void setPrometheusSelectedIndices(String selectedIndices) { @@ -153,6 +169,14 @@ public boolean getPrometheusIndices() { return this.indices; } + /** + * Get value of settings key {@link #PROMETHEUS_SNAPSHOTS_KEY}. + * @return boolean value of the key + */ + public boolean getPrometheusSnapshots() { + return this.snapshots; + } + /** * Get value of settings key {@link #PROMETHEUS_NODES_FILTER_KEY}. * @return boolean value of the key diff --git a/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java b/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java index d84de13..0468042 100644 --- a/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java +++ b/src/main/java/org/opensearch/action/NodePrometheusMetricsResponse.java @@ -43,6 +43,7 @@ public class NodePrometheusMetricsResponse extends ActionResponse { private final NodeStats[] nodeStats; @Nullable private final IndicesStatsResponse indicesStats; private ClusterStatsData clusterStatsData = null; + @Nullable private final SnapshotsResponse snapshotsResponse; /** * A constructor that materialize the instance from inputStream. @@ -56,6 +57,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException { nodeStats = in.readArray(NodeStats::new, NodeStats[]::new); indicesStats = PackageAccessHelper.createIndicesStatsResponse(in); clusterStatsData = new ClusterStatsData(in); + snapshotsResponse = new SnapshotsResponse(in); } /** @@ -65,6 +67,7 @@ public NodePrometheusMetricsResponse(StreamInput in) throws IOException { * @param nodesStats NodesStats * @param indicesStats IndicesStats * @param clusterStateResponse ClusterStateResponse + * @param snapshotsResponse SnapshotsResponse * @param settings Settings * @param clusterSettings ClusterSettings */ @@ -73,6 +76,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth, NodeStats[] nodesStats, @Nullable IndicesStatsResponse indicesStats, @Nullable ClusterStateResponse clusterStateResponse, + @Nullable SnapshotsResponse snapshotsResponse, Settings settings, ClusterSettings clusterSettings) { this.clusterHealth = clusterHealth; @@ -82,6 +86,7 @@ public NodePrometheusMetricsResponse(ClusterHealthResponse clusterHealth, if (clusterStateResponse != null) { this.clusterStatsData = new ClusterStatsData(clusterStateResponse, settings, clusterSettings); } + this.snapshotsResponse = snapshotsResponse; } /** @@ -106,6 +111,15 @@ public NodeStats[] getNodeStats() { return this.nodeStats; } + /** + * Get internal {@link SnapshotsResponse} object. + * @return SnapshotsResponse object + */ + @Nullable + public SnapshotsResponse getSnapshotsResponse() { + return this.snapshotsResponse; + } + /** * Get internal {@link IndicesStatsResponse} object. * @return IndicesStatsResponse object @@ -131,5 +145,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeArray(nodeStats); out.writeOptionalWriteable(indicesStats); clusterStatsData.writeTo(out); + snapshotsResponse.writeTo(out); } } diff --git a/src/main/java/org/opensearch/action/SnapshotsResponse.java b/src/main/java/org/opensearch/action/SnapshotsResponse.java new file mode 100644 index 0000000..01b1e68 --- /dev/null +++ b/src/main/java/org/opensearch/action/SnapshotsResponse.java @@ -0,0 +1,74 @@ +/* + * Copyright [2018] [Vincent VAN HOLLEBEKE] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.opensearch.action; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.snapshots.SnapshotInfo; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Represents a container class for holding response data related to snapshots. + */ +public class SnapshotsResponse extends ActionResponse { + private final List snapshotInfos; + + /** + * A constructor. + * @param in A streamInput to materialize the instance from + * @throws IOException if there is an exception reading from inputStream + */ + public SnapshotsResponse(StreamInput in) throws IOException { + super(in); + snapshotInfos = in.readList(SnapshotInfo::new); + } + + /** + * A constructor. + * + * @param snapshotInfos A list of {@link SnapshotInfo} objects to initialize the instance with. + */ + public SnapshotsResponse(List snapshotInfos) { + this.snapshotInfos = Collections.unmodifiableList(snapshotInfos); + } + + /** + * Writes the instance into {@link StreamOutput}. + * + * @param out the output stream to which the instance is to be written + * @throws IOException if there is an exception writing to the output stream + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(snapshotInfos); + } + + /** + * Getter for {@code snapshotInfos} list. + * The returned list is unmodifiable to ensure immutability. + * + * @return the list of {@link SnapshotInfo} objects + */ + public List getSnapshotInfos() { + return snapshotInfos; + } +} \ No newline at end of file diff --git a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java index 3f07b65..66213e4 100644 --- a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java +++ b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java @@ -28,6 +28,10 @@ import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; +import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; @@ -37,13 +41,20 @@ import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.common.Nullable; +import org.opensearch.common.action.ActionFuture; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; +import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + /** * Transport action class for Prometheus Exporter plugin. * @@ -94,15 +105,17 @@ private class AsyncAction { private final NodesStatsRequest nodesStatsRequest; private final IndicesStatsRequest indicesStatsRequest; private final ClusterStateRequest clusterStateRequest; - + private final GetRepositoriesRequest repositoriesRequest; private ClusterHealthResponse clusterHealthResponse = null; private NodesInfoResponse localNodesInfoResponse = null; private NodesStatsResponse nodesStatsResponse = null; private IndicesStatsResponse indicesStatsResponse = null; private ClusterStateResponse clusterStateResponse = null; + private SnapshotsResponse snapshotsResponse = null; // read the state of prometheus dynamic settings only once at the beginning of the async request private final boolean isPrometheusIndices = prometheusSettings.getPrometheusIndices(); + private final boolean isPrometheusSnapshots = prometheusSettings.getPrometheusSnapshots(); private final boolean isPrometheusClusterSettings = prometheusSettings.getPrometheusClusterSettings(); private final String prometheusNodesFilter = prometheusSettings.getNodesFilter(); @@ -142,6 +155,12 @@ private AsyncAction(ActionListener listener) { this.indicesStatsRequest = null; } + if (isPrometheusSnapshots) { + this.repositoriesRequest = new GetRepositoriesRequest(); + } else { + this.repositoriesRequest = null; + } + // Cluster settings are get via ClusterStateRequest (see elasticsearch RestClusterGetSettingsAction for details) // We prefer to send it to master node (hence local=false; it should be set by default but we want to be sure). this.clusterStateRequest = isPrometheusClusterSettings ? Requests.clusterStateRequest() @@ -150,15 +169,47 @@ private AsyncAction(ActionListener listener) { private void gatherRequests() { listener.onResponse(buildResponse(clusterHealthResponse, localNodesInfoResponse, nodesStatsResponse, indicesStatsResponse, - clusterStateResponse)); + clusterStateResponse, snapshotsResponse)); } + private final ActionListener repositoriesResponseActionListener = + new ActionListener() { + @Override + public void onResponse(GetRepositoriesResponse response) { + List> snapshotsResponseFutures = response.repositories().stream() + .map(metadata -> new GetSnapshotsRequest(metadata.name())) + .map(snapshotsRequest -> client.admin().cluster().getSnapshots(snapshotsRequest)) + .collect(Collectors.toList()); + List snapshotInfos = new ArrayList<>(); + for (ActionFuture snapshotsResponseFuture : snapshotsResponseFutures) { + try { + GetSnapshotsResponse getSnapshotsResponse = snapshotsResponseFuture.get(); + snapshotInfos.addAll(getSnapshotsResponse.getSnapshots()); + } catch (InterruptedException | ExecutionException e) { + listener.onFailure(new OpenSearchException("Get snapshots request failed", e)); + return; + } + } + snapshotsResponse = new SnapshotsResponse(snapshotInfos); + gatherRequests(); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(new OpenSearchException("Get repositories request failed", e)); + } + }; + private final ActionListener clusterStateResponseActionListener = new ActionListener() { @Override public void onResponse(ClusterStateResponse response) { clusterStateResponse = response; - gatherRequests(); + if (isPrometheusSnapshots) { + client.admin().cluster().getRepositories(repositoriesRequest, repositoriesResponseActionListener); + } else { + gatherRequests(); + } } @Override @@ -175,7 +226,7 @@ public void onResponse(IndicesStatsResponse response) { if (isPrometheusClusterSettings) { client.admin().cluster().state(clusterStateRequest, clusterStateResponseActionListener); } else { - gatherRequests(); + clusterStateResponseActionListener.onResponse(null); } } @@ -239,12 +290,13 @@ protected NodePrometheusMetricsResponse buildResponse(ClusterHealthResponse clus NodesInfoResponse localNodesInfoResponse, NodesStatsResponse nodesStats, @Nullable IndicesStatsResponse indicesStats, - @Nullable ClusterStateResponse clusterStateResponse) { + @Nullable ClusterStateResponse clusterStateResponse, + @Nullable SnapshotsResponse snapshotsResponse) { NodePrometheusMetricsResponse response = new NodePrometheusMetricsResponse( clusterHealth, localNodesInfoResponse, nodesStats.getNodes().toArray(new NodeStats[0]), - indicesStats, clusterStateResponse, + indicesStats, clusterStateResponse, snapshotsResponse, settings, clusterSettings); if (logger.isTraceEnabled()) { logger.trace("Return response: [{}]", response); diff --git a/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java b/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java index a05c8e0..edc2e90 100644 --- a/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java +++ b/src/main/java/org/opensearch/plugin/prometheus/PrometheusExporterPlugin.java @@ -75,6 +75,7 @@ public List> getSettings() { List> settings = Arrays.asList( PrometheusSettings.PROMETHEUS_CLUSTER_SETTINGS, PrometheusSettings.PROMETHEUS_INDICES, + PrometheusSettings.PROMETHEUS_SNAPSHOTS, PrometheusSettings.PROMETHEUS_NODES_FILTER, PrometheusSettings.PROMETHEUS_SELECTED_INDICES, PrometheusSettings.PROMETHEUS_SELECTED_OPTION, diff --git a/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java b/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java index ed5c9fd..2e34dbf 100644 --- a/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java +++ b/src/main/java/org/opensearch/rest/prometheus/RestPrometheusMetricsAction.java @@ -91,8 +91,8 @@ public String getName() { return "prometheus_metrics_action"; } - // This method does not throw any IOException because there are no request parameters to be parsed - // and processed. This may change in the future. + // This method does not throw any IOException because there are no request parameters to be parsed + // and processed. This may change in the future. @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { if (logger.isTraceEnabled()) { @@ -125,12 +125,13 @@ public RestResponse buildResponse(NodePrometheusMetricsResponse response) throws collector = new PrometheusMetricsCollector( catalog, prometheusSettings.getPrometheusIndices(), + prometheusSettings.getPrometheusSnapshots(), prometheusSettings.getPrometheusClusterSettings() ); collector.registerMetrics(); collector.updateMetrics( nodeName, nodeId, response.getClusterHealth(), response.getNodeStats(), - response.getIndicesStats(), response.getClusterStatsData()); + response.getIndicesStats(), response.getClusterStatsData(), response.getSnapshotsResponse()); textContent = collector.getTextContent(); } catch (Exception ex) { // We use try-catch block to catch exception from Prometheus catalog and collector processing