diff --git a/README.md b/README.md index 3adbaa9..8826fd9 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,13 @@ To disable exporting cluster settings use: prometheus.cluster.settings: false ``` +#### Snapshot metrics + +To enable exporting snapshot metrics use: +``` +prometheus.snapshots: true +``` + #### Nodes filter Metrics include statistics about individual OpenSearch nodes. diff --git a/build.gradle b/build.gradle index d1ba583..3839c5a 100644 --- a/build.gradle +++ b/build.gradle @@ -1,3 +1,4 @@ +import org.opensearch.gradle.PropertyNormalization import org.opensearch.gradle.test.RestIntegTestTask import java.util.regex.Matcher @@ -124,8 +125,13 @@ tasks.named("check").configure { dependsOn(integTest) } // Temporary disable task :testingConventions testingConventions.enabled = false +// Directory for snapshot repository +File repositoryDir = new File(project.buildDir, "shared-repository") + testClusters.all { numberOfNodes = 2 + // Configuring repo path for 'fs' type snapshot repository + setting 'path.repo', "${repositoryDir.absolutePath}", PropertyNormalization.IGNORE_VALUE // It seems cluster name can not be customized here. It gives an error: // Testclusters does not allow the following settings to be changed:[cluster.name] for node{::yamlRestTest-0} diff --git a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java index b67045f..c757c34 100644 --- a/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java +++ b/src/main/java/org/compuscene/metrics/prometheus/PrometheusSettings.java @@ -55,7 +55,7 @@ public enum INDEX_FILTER_OPTIONS { static String PROMETHEUS_SELECTED_OPTION_KEY = "prometheus.indices_filter.selected_option"; /** - * This setting is used configure weather to expose cluster settings metrics or not. The default value is true. + * This setting is used to configure whether to expose cluster settings metrics or not. 
The default value is true. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_CLUSTER_SETTINGS_KEY}. */ public static final Setting PROMETHEUS_CLUSTER_SETTINGS = @@ -63,7 +63,7 @@ public enum INDEX_FILTER_OPTIONS { Setting.Property.Dynamic, Setting.Property.NodeScope); /** - * This setting is used configure weather to expose low level index metrics or not. The default value is true. + * This setting is used to configure whether to expose low level index metrics or not. The default value is true. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_INDICES_KEY}. */ public static final Setting PROMETHEUS_INDICES = @@ -71,7 +71,7 @@ public enum INDEX_FILTER_OPTIONS { Setting.Property.Dynamic, Setting.Property.NodeScope); /** - * This setting is used configure weather to expose snapshot metrics or not. The default value is false. + * This setting is used to configure whether to expose snapshot metrics or not. The default value is false. * Can be configured in opensearch.yml file or update dynamically under key {@link #PROMETHEUS_SNAPSHOTS_KEY}. */ public static final Setting PROMETHEUS_SNAPSHOTS = diff --git a/src/main/java/org/opensearch/action/SnapshotsResponse.java b/src/main/java/org/opensearch/action/SnapshotsResponse.java index 01b1e68..375226e 100644 --- a/src/main/java/org/opensearch/action/SnapshotsResponse.java +++ b/src/main/java/org/opensearch/action/SnapshotsResponse.java @@ -23,6 +23,7 @@ import org.opensearch.snapshots.SnapshotInfo; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -39,16 +40,14 @@ public class SnapshotsResponse extends ActionResponse { */ public SnapshotsResponse(StreamInput in) throws IOException { super(in); - snapshotInfos = in.readList(SnapshotInfo::new); + snapshotInfos = Collections.synchronizedList(in.readList(SnapshotInfo::new)); } /** * A constructor. 
- * - * @param snapshotInfos A list of {@link SnapshotInfo} objects to initialize the instance with. */ - public SnapshotsResponse(List snapshotInfos) { - this.snapshotInfos = Collections.unmodifiableList(snapshotInfos); + public SnapshotsResponse() { + this.snapshotInfos = Collections.synchronizedList(new ArrayList<>()); } /** @@ -64,11 +63,17 @@ public void writeTo(StreamOutput out) throws IOException { /** * Getter for {@code snapshotInfos} list. - * The returned list is unmodifiable to ensure immutability. * * @return the list of {@link SnapshotInfo} objects */ public List getSnapshotInfos() { return snapshotInfos; } -} \ No newline at end of file + + /** + * Adds {@code snapshotInfosToAdd} to the {@code snapshotInfos} list. + */ + public void addSnapshotInfos(List snapshotInfosToAdd) { + snapshotInfos.addAll(snapshotInfosToAdd); + } +} diff --git a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java index 66213e4..b12b8c9 100644 --- a/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java +++ b/src/main/java/org/opensearch/action/TransportNodePrometheusMetricsAction.java @@ -40,19 +40,17 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.common.Nullable; -import org.opensearch.common.action.ActionFuture; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; -import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Queue; +import 
java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; /** @@ -111,7 +109,10 @@ private class AsyncAction { private NodesStatsResponse nodesStatsResponse = null; private IndicesStatsResponse indicesStatsResponse = null; private ClusterStateResponse clusterStateResponse = null; - private SnapshotsResponse snapshotsResponse = null; + private final Queue snapshotRepositories = new ConcurrentLinkedQueue<>(); + private final int maxSnapshotRepositoriesToBeFetched = 5; + private volatile int snapshotRepositoriesFetchedCount = 0; + private final SnapshotsResponse snapshotsResponse = new SnapshotsResponse(); // read the state of prometheus dynamic settings only once at the beginning of the async request private final boolean isPrometheusIndices = prometheusSettings.getPrometheusIndices(); @@ -176,22 +177,14 @@ private void gatherRequests() { new ActionListener() { @Override public void onResponse(GetRepositoriesResponse response) { - List> snapshotsResponseFutures = response.repositories().stream() - .map(metadata -> new GetSnapshotsRequest(metadata.name())) - .map(snapshotsRequest -> client.admin().cluster().getSnapshots(snapshotsRequest)) - .collect(Collectors.toList()); - List snapshotInfos = new ArrayList<>(); - for (ActionFuture snapshotsResponseFuture : snapshotsResponseFutures) { - try { - GetSnapshotsResponse getSnapshotsResponse = snapshotsResponseFuture.get(); - snapshotInfos.addAll(getSnapshotsResponse.getSnapshots()); - } catch (InterruptedException | ExecutionException e) { - listener.onFailure(new OpenSearchException("Get snapshots request failed", e)); - return; - } + if (response.repositories().isEmpty()) { + gatherRequests(); + return; } - snapshotsResponse = new SnapshotsResponse(snapshotInfos); - gatherRequests(); + snapshotRepositories.addAll(response.repositories().stream() + .map(RepositoryMetadata::name).collect(Collectors.toList())); + String snapshotRepository = snapshotRepositories.poll(); + 
client.admin().cluster().getSnapshots(new GetSnapshotsRequest(snapshotRepository), snapshotsResponseActionListener); } @Override @@ -200,6 +193,27 @@ public void onFailure(Exception e) { } }; + private final ActionListener snapshotsResponseActionListener = + new ActionListener() { + @Override + public void onResponse(GetSnapshotsResponse response) { + snapshotsResponse.addSnapshotInfos(response.getSnapshots()); + snapshotRepositoriesFetchedCount++; + if (snapshotRepositories.isEmpty() || snapshotRepositoriesFetchedCount >= maxSnapshotRepositoriesToBeFetched) { + gatherRequests(); + return; + } + // Fetch the snapshots for the next repository in the queue + String snapshotRepository = snapshotRepositories.poll(); + client.admin().cluster().getSnapshots(new GetSnapshotsRequest(snapshotRepository), snapshotsResponseActionListener); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(new OpenSearchException("Get snapshots request failed", e)); + } + }; + private final ActionListener clusterStateResponseActionListener = new ActionListener() { @Override diff --git a/src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml b/src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml new file mode 100644 index 0000000..b52a0fe --- /dev/null +++ b/src/yamlRestTest/resources/rest-api-spec/test/70_10_snapshots_metrics.yml @@ -0,0 +1,50 @@ +--- +"Verify snapshots metrics enabled": + + # Create 'fs' snapshot repository + - do: + snapshot.create_repository: + repository: test_repo_get_1 + body: + type: fs + settings: + location: "test_repo_get_1_loc" + + - do: + snapshot.get_repository: { } + + - is_true: test_repo_get_1 + + # Enable snapshots metrics + - do: + cluster.put_settings: + body: + persistent: + prometheus.snapshots: "true" + flat_settings: true + + - match: { persistent: { prometheus.snapshots: "true" } } + + # Create snapshot + - do: + snapshot.create: + repository: test_repo_get_1 + snapshot: test_snapshot_1 
+ wait_for_completion: true + + - match: { snapshot.snapshot: test_snapshot_1 } + - match: { snapshot.state : SUCCESS } + + # Fetch and verify metrics + - do: + prometheus.metrics: {} + + - match: + $body: | + /.* + \# \s* HELP \s+ opensearch_min_snapshot_age \s+.*\n + \# \s* TYPE \s+ opensearch_min_snapshot_age \s+ gauge\n + opensearch_min_snapshot_age\{ + cluster="yamlRestTest",sm_policy="adhoc", + \}\s+\d+\.\d+\n + .*/