From be9f94258315405aee203a0737decd4e19343942 Mon Sep 17 00:00:00 2001
From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com>
Date: Tue, 1 Oct 2024 19:07:29 +0530
Subject: [PATCH] [SnapshotV2] Add timestamp of last successful fetch of pinned timestamps in node stats (#15611)

---------

Signed-off-by: Lakshya Taragi
---
 CHANGELOG.md                                  |  1 +
 .../RemoteStorePinnedTimestampsIT.java        | 41 ++++++++++
 .../admin/cluster/node/stats/NodeStats.java   | 26 +++++-
 .../cluster/node/stats/NodesStatsRequest.java |  3 +-
 .../node/stats/TransportNodesStatsAction.java |  3 +-
 .../stats/TransportClusterStatsAction.java    |  1 +
 .../java/org/opensearch/node/NodeService.java |  7 +-
 .../remotestore/RemoteStoreNodeStats.java     | 79 +++++++++++++++++++
 .../cluster/node/stats/NodeStatsTests.java    | 22 +++++-
 .../cluster/stats/ClusterStatsNodesTests.java |  1 +
 .../opensearch/cluster/DiskUsageTests.java    |  6 ++
 .../MockInternalClusterInfoService.java       |  3 +-
 .../opensearch/test/InternalTestCluster.java  |  1 +
 13 files changed, 187 insertions(+), 7 deletions(-)
 create mode 100644 server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 038cc407d582b..1a0859a7d5af8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
 - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
 - Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
+- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
 
 ### Dependencies
 - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java
index 2fcda8c2d2f27..024e0e952eea5 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsIT.java
@@ -9,6 +9,8 @@
 package org.opensearch.remotestore;
 
 import org.opensearch.action.LatchedActionListener;
+import org.opensearch.action.admin.cluster.node.stats.NodeStats;
+import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
@@ -20,6 +22,8 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
+import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.REMOTE_STORE;
+
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
     static final String INDEX_NAME = "remote-store-test-idx-1";
@@ -180,4 +184,41 @@ public void onFailure(Exception e) {
         assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
         remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
     }
+
+    public void testLastSuccessfulFetchOfPinnedTimestampsPresentInNodeStats() throws Exception {
+        logger.info("Starting up cluster manager");
+        logger.info("cluster.remote_store.pinned_timestamps.enabled set to true");
+        logger.info("cluster.remote_store.pinned_timestamps.scheduler_interval set to minimum value of 1minute");
+        Settings pinnedTimestampEnabledSettings = Settings.builder()
+            .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
+            .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL.getKey(), "1m")
+            .build();
+        internalCluster().startClusterManagerOnlyNode(pinnedTimestampEnabledSettings);
+        String remoteNodeName = internalCluster().startDataOnlyNodes(1, pinnedTimestampEnabledSettings).get(0);
+        ensureStableCluster(2);
+        RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
+            RemoteStorePinnedTimestampService.class,
+            remoteNodeName
+        );
+
+        remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
+
+        assertBusy(() -> {
+            long lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
+            assertTrue(lastSuccessfulFetchOfPinnedTimestamps > 0L);
+            NodesStatsResponse nodesStatsResponse = internalCluster().client()
+                .admin()
+                .cluster()
+                .prepareNodesStats()
+                .addMetric(REMOTE_STORE.metricName())
+                .execute()
+                .actionGet();
+            for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
+                long lastRecordedFetch = nodeStats.getRemoteStoreNodeStats().getLastSuccessfulFetchOfPinnedTimestamps();
+                assertTrue(lastRecordedFetch >= lastSuccessfulFetchOfPinnedTimestamps);
+            }
+        });
+
+        remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
+    }
 }
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
index 0917a0baff1ab..c91260778f037 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java
@@ -59,6 +59,7 @@
 import org.opensearch.monitor.process.ProcessStats;
 import org.opensearch.node.AdaptiveSelectionStats;
 import org.opensearch.node.NodesResourceUsageStats;
+import org.opensearch.node.remotestore.RemoteStoreNodeStats;
 import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
 import org.opensearch.repositories.RepositoriesStats;
 import org.opensearch.script.ScriptCacheStats;
@@ -162,6 +163,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
     @Nullable
     private NodeCacheStats nodeCacheStats;
 
+    @Nullable
+    private RemoteStoreNodeStats remoteStoreNodeStats;
+
     public NodeStats(StreamInput in) throws IOException {
         super(in);
         timestamp = in.readVLong();
@@ -243,6 +247,12 @@ public NodeStats(StreamInput in) throws IOException {
         } else {
             nodeCacheStats = null;
         }
+        // TODO: change version to V_2_18_0
+        if (in.getVersion().onOrAfter(Version.CURRENT)) {
+            remoteStoreNodeStats = in.readOptionalWriteable(RemoteStoreNodeStats::new);
+        } else {
+            remoteStoreNodeStats = null;
+        }
     }
 
     public NodeStats(
@@ -274,7 +284,8 @@ public NodeStats(
         @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
         @Nullable RepositoriesStats repositoriesStats,
         @Nullable AdmissionControlStats admissionControlStats,
-        @Nullable NodeCacheStats nodeCacheStats
+        @Nullable NodeCacheStats nodeCacheStats,
+        @Nullable RemoteStoreNodeStats remoteStoreNodeStats
     ) {
         super(node);
         this.timestamp = timestamp;
@@ -305,6 +316,7 @@ public NodeStats(
         this.repositoriesStats = repositoriesStats;
         this.admissionControlStats = admissionControlStats;
         this.nodeCacheStats = nodeCacheStats;
+        this.remoteStoreNodeStats = remoteStoreNodeStats;
     }
 
     public long getTimestamp() {
@@ -467,6 +479,11 @@ public NodeCacheStats getNodeCacheStats() {
         return nodeCacheStats;
     }
 
+    @Nullable
+    public RemoteStoreNodeStats getRemoteStoreNodeStats() {
+        return remoteStoreNodeStats;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
@@ -525,6 +542,10 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
             out.writeOptionalWriteable(nodeCacheStats);
         }
+        // TODO: change version to V_2_18_0
+        if (out.getVersion().onOrAfter(Version.CURRENT)) {
+            out.writeOptionalWriteable(remoteStoreNodeStats);
+        }
     }
 
     @Override
@@ -631,6 +652,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
         if (getNodeCacheStats() != null) {
             getNodeCacheStats().toXContent(builder, params);
         }
+        if (getRemoteStoreNodeStats() != null) {
+            getRemoteStoreNodeStats().toXContent(builder, params);
+        }
         return builder;
     }
 }
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
index f1f9f93afdad2..a5b00ed82d3cb 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java
@@ -220,7 +220,8 @@ public enum Metric {
         SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
         REPOSITORIES("repositories"),
         ADMISSION_CONTROL("admission_control"),
-        CACHE_STATS("caches");
+        CACHE_STATS("caches"),
+        REMOTE_STORE("remote_store");
 
         private String metricName;
 
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
index 2c808adc97c7a..a98d245af872b 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
@@ -129,7 +129,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
             NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
             NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
             NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics),
-            NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics)
+            NodesStatsRequest.Metric.CACHE_STATS.containedIn(metrics),
+            NodesStatsRequest.Metric.REMOTE_STORE.containedIn(metrics)
         );
     }
 
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java
index a49ca2035783c..c4b3524cf6da5 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java
@@ -174,6 +174,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
             false,
             false,
             false,
+            false,
             false
         );
         List shardsStats = new ArrayList<>();
diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java
index 1eb38ea63ad5a..9671fda14375d 100644
--- a/server/src/main/java/org/opensearch/node/NodeService.java
+++ b/server/src/main/java/org/opensearch/node/NodeService.java
@@ -54,6 +54,7 @@
 import org.opensearch.indices.IndicesService;
 import org.opensearch.ingest.IngestService;
 import org.opensearch.monitor.MonitorService;
+import org.opensearch.node.remotestore.RemoteStoreNodeStats;
 import org.opensearch.plugins.PluginsService;
 import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
 import org.opensearch.repositories.RepositoriesService;
@@ -241,7 +242,8 @@ public NodeStats stats(
         boolean segmentReplicationTrackerStats,
         boolean repositoriesStats,
         boolean admissionControl,
-        boolean cacheService
+        boolean cacheService,
+        boolean remoteStoreNodeStats
     ) {
         // for indices stats we want to include previous allocated shards stats as well (it will
         // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
@@ -274,7 +276,8 @@ public NodeStats stats(
             segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
             repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
             admissionControl ? this.admissionControlService.stats() : null,
-            cacheService ? this.cacheService.stats(indices) : null
+            cacheService ? this.cacheService.stats(indices) : null,
+            remoteStoreNodeStats ? new RemoteStoreNodeStats() : null
         );
     }
 
diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java
new file mode 100644
index 0000000000000..8da8a17e21839
--- /dev/null
+++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeStats.java
@@ -0,0 +1,79 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.node.remotestore;
+
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Node level remote store stats
+ * @opensearch.internal
+ */
+public class RemoteStoreNodeStats implements Writeable, ToXContentFragment {
+
+    public static final String STATS_NAME = "remote_store";
+    public static final String LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS = "last_successful_fetch_of_pinned_timestamps";
+
+    /**
+     * Time stamp for the last successful fetch of pinned timestamps by the {@linkplain RemoteStorePinnedTimestampService}
+     */
+    private final long lastSuccessfulFetchOfPinnedTimestamps;
+
+    public RemoteStoreNodeStats() {
+        this.lastSuccessfulFetchOfPinnedTimestamps = RemoteStorePinnedTimestampService.getPinnedTimestamps().v1();
+    }
+
+    public long getLastSuccessfulFetchOfPinnedTimestamps() {
+        return this.lastSuccessfulFetchOfPinnedTimestamps;
+    }
+
+    public RemoteStoreNodeStats(StreamInput in) throws IOException {
+        this.lastSuccessfulFetchOfPinnedTimestamps = in.readLong();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeLong(this.lastSuccessfulFetchOfPinnedTimestamps);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(STATS_NAME);
+        builder.field(LAST_SUCCESSFUL_FETCH_OF_PINNED_TIMESTAMPS, this.lastSuccessfulFetchOfPinnedTimestamps);
+        return builder.endObject();
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteStoreNodeStats{ lastSuccessfulFetchOfPinnedTimestamps=" + lastSuccessfulFetchOfPinnedTimestamps + "}";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null) {
+            return false;
+        }
+        if (o.getClass() != RemoteStoreNodeStats.class) {
+            return false;
+        }
+        RemoteStoreNodeStats other = (RemoteStoreNodeStats) o;
+        return this.lastSuccessfulFetchOfPinnedTimestamps == other.lastSuccessfulFetchOfPinnedTimestamps;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(lastSuccessfulFetchOfPinnedTimestamps);
+    }
+}
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
index 11902728eed07..34065daff2b8a 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
@@ -95,6 +95,7 @@
 import org.opensearch.node.NodeResourceUsageStats;
 import org.opensearch.node.NodesResourceUsageStats;
 import org.opensearch.node.ResponseCollectorService;
+import org.opensearch.node.remotestore.RemoteStoreNodeStats;
 import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
 import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
 import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
@@ -614,6 +615,14 @@ public void testSerialization() throws IOException {
                 } else {
                     assertEquals(nodeCacheStats, deserializedNodeCacheStats);
                 }
+
+                RemoteStoreNodeStats remoteStoreNodeStats = nodeStats.getRemoteStoreNodeStats();
+                RemoteStoreNodeStats deserializedRemoteStoreNodeStats = deserializedNodeStats.getRemoteStoreNodeStats();
+                if (remoteStoreNodeStats == null) {
+                    assertNull(deserializedRemoteStoreNodeStats);
+                } else {
+                    assertEquals(remoteStoreNodeStats, deserializedRemoteStoreNodeStats);
+                }
             }
         }
     }
@@ -996,6 +1005,16 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
             nodeCacheStats = new NodeCacheStats(cacheStatsMap, flags);
         }
 
+        RemoteStoreNodeStats remoteStoreNodeStats = null;
+        if (frequently()) {
+            remoteStoreNodeStats = new RemoteStoreNodeStats() {
+                @Override
+                public long getLastSuccessfulFetchOfPinnedTimestamps() {
+                    return 123456L;
+                }
+            };
+        }
+
         // TODO: Only remote_store based aspects of NodeIndicesStats are being tested here.
         // It is possible to test other metrics in NodeIndicesStats as well since it extends Writeable now
         return new NodeStats(
@@ -1027,7 +1046,8 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
             segmentReplicationRejectionStats,
             null,
             admissionControlStats,
-            nodeCacheStats
+            nodeCacheStats,
+            remoteStoreNodeStats
         );
     }
 
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java
index 1c4a77905d73f..823661ba14abf 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java
@@ -349,6 +349,7 @@ private ClusterStatsNodeResponse createClusterStatsNodeResponse(
             null,
             null,
             null,
+            null,
             null
         );
         if (defaultBehavior) {
diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java
index 5539dd26dd52d..cd050fb346563 100644
--- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java
+++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java
@@ -195,6 +195,7 @@ public void testFillDiskUsage() {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -226,6 +227,7 @@ public void testFillDiskUsage() {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -257,6 +259,7 @@ public void testFillDiskUsage() {
                 null,
                 null,
                 null,
+                null,
                 null
             )
         );
@@ -319,6 +322,7 @@ public void testFillDiskUsageSomeInvalidValues() {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -350,6 +354,7 @@ public void testFillDiskUsageSomeInvalidValues() {
                 null,
                 null,
                 null,
+                null,
                 null
             ),
             new NodeStats(
@@ -381,6 +386,7 @@ public void testFillDiskUsageSomeInvalidValues() {
                 null,
                 null,
                 null,
+                null,
                 null
             )
         );
diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
index 35ca5d80aeb4e..ded457601c0ae 100644
--- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
+++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
@@ -125,7 +125,8 @@ List adjustNodesStats(List nodesStats) {
                 nodeStats.getSegmentReplicationRejectionStats(),
                 nodeStats.getRepositoriesStats(),
                 nodeStats.getAdmissionControlStats(),
-                nodeStats.getNodeCacheStats()
+                nodeStats.getNodeCacheStats(),
+                nodeStats.getRemoteStoreNodeStats()
             );
         }).collect(Collectors.toList());
     }
diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
index 7adff82e72245..fa5fb736f518f 100644
--- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
@@ -2752,6 +2752,7 @@ public void ensureEstimatedStats() {
                     false,
                     false,
                     false,
+                    false,
                     false
                 );
                 assertThat(