From 5d513ac347374eb66e7fb73da5ebcb2cf4a45cf5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 24 Oct 2023 08:59:58 +0000 Subject: [PATCH 1/2] Add cluster state stats (#10670) * Add cluster state update stats along with remote upload stats around success/ failure, latency metric Signed-off-by: Aman Khare (cherry picked from commit 54e74a84437238c6154b0a15d209a9a1ecbaa4bb) Signed-off-by: github-actions[bot] --- CHANGELOG.md | 1 + .../discovery/ClusterManagerDisruptionIT.java | 3 + .../remote/RemoteClusterStateServiceIT.java | 43 ++++++ .../coordination/CoordinationState.java | 6 + .../cluster/coordination/Coordinator.java | 12 +- .../coordination/InMemoryPersistedState.java | 5 + .../coordination/PersistedStateStats.java | 126 ++++++++++++++++++ .../cluster/service/ClusterStateStats.java | 120 +++++++++++++++++ .../cluster/service/MasterService.java | 23 +++- .../opensearch/discovery/DiscoveryStats.java | 21 ++- .../opensearch/gateway/GatewayMetaState.java | 13 ++ .../remote/RemoteClusterStateService.java | 20 ++- .../remote/RemotePersistenceStats.java | 37 +++++ .../cluster/node/stats/NodeStatsTests.java | 29 +++- .../cluster/service/MasterServiceTests.java | 3 + .../GatewayMetaStatePersistedStateTests.java | 22 +++ .../RemoteClusterStateServiceTests.java | 34 +++++ .../AbstractCoordinatorTestCase.java | 5 + 18 files changed, 511 insertions(+), 12 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java create mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index acf3b60ec4211..4c435bd759e8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search query categorizor ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866)) +- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java index f0d52405efac6..9aee6f7f7a192 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterManagerDisruptionIT.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.NoClusterManagerBlockService; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -195,6 +196,8 @@ public void testIsolateClusterManagerAndVerifyClusterStateConsensus() throws Exc } } + ClusterStateStats clusterStateStats = internalCluster().clusterService().getClusterManagerService().getClusterStateStats(); + assertTrue(clusterStateStats.getUpdateFailed() > 0); }); } diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index 7304304e522f8..59eef3c06844b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -8,9 +8,12 @@ package org.opensearch.gateway.remote; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; +import org.opensearch.discovery.DiscoveryStats; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -19,6 +22,7 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; +import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; @@ -94,6 +98,45 @@ public void testFullClusterRestoreStaleDelete() throws Exception { assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards()); } + public void testRemoteStateStats() { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + String clusterManagerNode = internalCluster().getClusterManagerName(); + String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0); + + // Fetch _nodes/stats + NodesStatsResponse nodesStatsResponse = client().admin() + .cluster() + .prepareNodesStats(clusterManagerNode) + .addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName()) + .get(); + + // assert cluster state stats + DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); + + assertNotNull(discoveryStats.getClusterStateStats()); + assertTrue(discoveryStats.getClusterStateStats().getUpdateSuccess() > 1); + assertEquals(0, discoveryStats.getClusterStateStats().getUpdateFailed()); + assertTrue(discoveryStats.getClusterStateStats().getUpdateTotalTimeInMillis() > 0); + // assert remote state stats + assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 1); + assertEquals(0, discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount()); + assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0); + + NodesStatsResponse nodesStatsResponseDataNode = client().admin() + .cluster() + .prepareNodesStats(dataNode) + .addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName()) + .get(); + // assert cluster state stats for data node + DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponseDataNode.getNodes().get(0).getDiscoveryStats(); + assertNotNull(dataNodeDiscoveryStats.getClusterStateStats()); + assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess()); + } + private void setReplicaCount(int replicaCount) { client().admin() .indices() diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index a339852e6ed8d..987a3e3ffa7d3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -638,6 +638,12 @@ public interface PersistedState extends Closeable { */ void setLastAcceptedState(ClusterState clusterState); + /** + * Returns the stats for the persistence layer for {@link CoordinationState}. + * @return PersistedStateStats + */ + PersistedStateStats getStats(); + /** * Marks the last accepted cluster state as committed. * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index f587a73137c0a..af63236a73ade 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -57,6 +57,7 @@ import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -866,7 +867,16 @@ protected void doStart() { @Override public DiscoveryStats stats() { - return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats()); + ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats(); + ArrayList stats = new ArrayList<>(); + Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> { + if (persistedStateRegistry.getPersistedState(stateType) != null + && persistedStateRegistry.getPersistedState(stateType).getStats() != null) { + stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); + } + }); + clusterStateStats.setPersistenceStats(stats); + return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java index 67ef82ee7b2e9..b77ede5471534 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java @@ -65,6 +65,11 @@ public void setLastAcceptedState(ClusterState clusterState) { this.acceptedState = clusterState; } + @Override + public PersistedStateStats getStats() { + return null; + } + @Override public long getCurrentTerm() { return currentTerm; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java new file mode 100644 index 0000000000000..1dc20e564ade2 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Persisted cluster state related stats. + * + * @opensearch.internal + */ +public class PersistedStateStats implements Writeable, ToXContentObject { + private String statsName; + private AtomicLong totalTimeInMillis = new AtomicLong(0); + private AtomicLong failedCount = new AtomicLong(0); + private AtomicLong successCount = new AtomicLong(0); + private Map extendedFields = new HashMap<>(); // keeping minimal extensibility + + public PersistedStateStats(String statsName) { + this.statsName = statsName; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(successCount.get()); + out.writeVLong(failedCount.get()); + out.writeVLong(totalTimeInMillis.get()); + if (extendedFields.size() > 0) { + out.writeBoolean(true); + out.writeVInt(extendedFields.size()); + for (Map.Entry extendedField : extendedFields.entrySet()) { + out.writeString(extendedField.getKey()); + out.writeVLong(extendedField.getValue().get()); + } + } else { + out.writeBoolean(false); + } + } + + public PersistedStateStats(StreamInput in) throws IOException { + this.successCount = new AtomicLong(in.readVLong()); + this.failedCount = new AtomicLong(in.readVLong()); + this.totalTimeInMillis = new AtomicLong(in.readVLong()); + if (in.readBoolean()) { + int extendedFieldsSize = in.readVInt(); + this.extendedFields = new HashMap<>(); + for (int fieldNumber = 0; fieldNumber < extendedFieldsSize; fieldNumber++) { + extendedFields.put(in.readString(), new AtomicLong(in.readVLong())); + } + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(statsName); + builder.field(Fields.SUCCESS_COUNT, getSuccessCount()); + builder.field(Fields.FAILED_COUNT, getFailedCount()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalTimeInMillis()); + if (extendedFields.size() > 0) { + for (Map.Entry extendedField : extendedFields.entrySet()) { + builder.field(extendedField.getKey(), extendedField.getValue().get()); + } + } + builder.endObject(); + return builder; + } + + public void stateFailed() { + failedCount.incrementAndGet(); + } + + public void stateSucceeded() { + successCount.incrementAndGet(); + } + + /** + * Expects user to send time taken in milliseconds. + * + * @param timeTakenInUpload time taken in uploading the cluster state to remote + */ + public void stateTook(long timeTakenInUpload) { + totalTimeInMillis.addAndGet(timeTakenInUpload); + } + + public long getTotalTimeInMillis() { + return totalTimeInMillis.get(); + } + + public long getFailedCount() { + return failedCount.get(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + protected void addToExtendedFields(String extendedField, AtomicLong extendedFieldValue) { + this.extendedFields.put(extendedField, extendedFieldValue); + } + + /** + * Fields for parsing and toXContent + * + * @opensearch.internal + */ + static final class Fields { + static final String SUCCESS_COUNT = "success_count"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String FAILED_COUNT = "failed_count"; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java new file mode 100644 index 0000000000000..96683ce720d0b --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.cluster.coordination.PersistedStateStats; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Cluster state related stats. + * + * @opensearch.internal + */ +public class ClusterStateStats implements Writeable, ToXContentObject { + + private AtomicLong updateSuccess = new AtomicLong(0); + private AtomicLong updateTotalTimeInMillis = new AtomicLong(0); + private AtomicLong updateFailed = new AtomicLong(0); + private List persistenceStats = new ArrayList<>(); + + public ClusterStateStats() {} + + public long getUpdateSuccess() { + return updateSuccess.get(); + } + + public long getUpdateTotalTimeInMillis() { + return updateTotalTimeInMillis.get(); + } + + public long getUpdateFailed() { + return updateFailed.get(); + } + + public List getPersistenceStats() { + return persistenceStats; + } + + public void stateUpdated() { + updateSuccess.incrementAndGet(); + } + + public void stateUpdateFailed() { + updateFailed.incrementAndGet(); + } + + public void stateUpdateTook(long stateUpdateTime) { + updateTotalTimeInMillis.addAndGet(stateUpdateTime); + } + + public ClusterStateStats setPersistenceStats(List persistenceStats) { + this.persistenceStats = persistenceStats; + return this; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(updateSuccess.get()); + out.writeVLong(updateTotalTimeInMillis.get()); + out.writeVLong(updateFailed.get()); + out.writeVInt(persistenceStats.size()); + for (PersistedStateStats stats : persistenceStats) { + stats.writeTo(out); + } + } + + public ClusterStateStats(StreamInput in) throws IOException { + this.updateSuccess = new AtomicLong(in.readVLong()); + this.updateTotalTimeInMillis = new AtomicLong(in.readVLong()); + this.updateFailed = new AtomicLong(in.readVLong()); + int persistedStatsSize = in.readVInt(); + this.persistenceStats = new ArrayList<>(); + for (int statsNumber = 0; statsNumber < persistedStatsSize; statsNumber++) { + PersistedStateStats stats = new PersistedStateStats(in); + this.persistenceStats.add(stats); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.CLUSTER_STATE_STATS); + builder.startObject(Fields.OVERALL); + builder.field(Fields.UPDATE_COUNT, getUpdateSuccess()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getUpdateTotalTimeInMillis()); + builder.field(Fields.FAILED_COUNT, getUpdateFailed()); + builder.endObject(); + for (PersistedStateStats stats : persistenceStats) { + stats.toXContent(builder, params); + } + builder.endObject(); + return builder; + } + + /** + * Fields for parsing and toXContent + * + * @opensearch.internal + */ + static final class Fields { + static final String CLUSTER_STATE_STATS = "cluster_state_stats"; + static final String OVERALL = "overall"; + static final String UPDATE_COUNT = "update_count"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String FAILED_COUNT = "failed_count"; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 563b69dfd0e2a..07c3f93ae6486 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -112,7 +112,9 @@ public class MasterService extends AbstractLifecycleComponent { static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask"; - /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} */ + /** + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} + */ @Deprecated static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask"; @@ -130,6 +132,7 @@ public class MasterService extends AbstractLifecycleComponent { private volatile Batcher taskBatcher; protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; private final ClusterManagerThrottlingStats throttlingStats; + private final ClusterStateStats stateStats; public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); @@ -147,6 +150,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP this::getMinNodeVersion, throttlingStats ); + this.stateStats = new ClusterStateStats(); this.threadPool = threadPool; } @@ -339,7 +343,7 @@ private TimeValue getTimeSince(long startTimeNanos) { return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos)); } - protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) { + protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNanos) { final PlainActionFuture fut = new PlainActionFuture() { @Override protected boolean blockingAllowed() { @@ -352,8 +356,12 @@ protected boolean blockingAllowed() { try { FutureUtils.get(fut); onPublicationSuccess(clusterChangedEvent, taskOutputs); + final long durationMillis = getTimeSince(startTimeNanos).millis(); + stateStats.stateUpdateTook(durationMillis); + stateStats.stateUpdated(); } catch (Exception e) { - onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e); + stateStats.stateUpdateFailed(); + onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e); } } @@ -464,7 +472,6 @@ public Builder incrementVersion(ClusterState clusterState) { * @param source the source of the cluster state update task * @param updateTask the full context for the cluster state update * task - * */ public & ClusterStateTaskListener> void submitStateUpdateTask( String source, @@ -490,7 +497,6 @@ public & Cluster * @param listener callback after the cluster state update task * completes * @param the type of the cluster state update task state - * */ public void submitStateUpdateTask( String source, @@ -947,7 +953,7 @@ void onNoLongerClusterManager() { /** * Functionality for register task key to cluster manager node. * - * @param taskKey - task key of task + * @param taskKey - task key of task * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not * @return throttling task key which needs to be passed while submitting task to cluster manager */ @@ -966,7 +972,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(Stri * that share the same executor will be executed * batches on this executor * @param the type of the cluster state update task state - * */ public void submitStateUpdateTasks( final String source, @@ -996,4 +1001,8 @@ public void submitStateUpdateTasks( } } + public ClusterStateStats getClusterStateStats() { + return stateStats; + } + } diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java b/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java index 665ecf77d7aa7..ea93ccd09ed39 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java @@ -32,8 +32,10 @@ package org.opensearch.discovery; +import org.opensearch.Version; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -51,21 +53,31 @@ public class DiscoveryStats implements Writeable, ToXContentFragment { private final PendingClusterStateStats queueStats; private final PublishClusterStateStats publishStats; + private final ClusterStateStats clusterStateStats; - public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) { + public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats, ClusterStateStats clusterStateStats) { this.queueStats = queueStats; this.publishStats = publishStats; + this.clusterStateStats = clusterStateStats; } public DiscoveryStats(StreamInput in) throws IOException { queueStats = in.readOptionalWriteable(PendingClusterStateStats::new); publishStats = in.readOptionalWriteable(PublishClusterStateStats::new); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + clusterStateStats = in.readOptionalWriteable(ClusterStateStats::new); + } else { + clusterStateStats = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(queueStats); out.writeOptionalWriteable(publishStats); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(clusterStateStats); + } } @Override @@ -77,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (publishStats != null) { publishStats.toXContent(builder, params); } + if (clusterStateStats != null) { + clusterStateStats.toXContent(builder, params); + } builder.endObject(); return builder; } @@ -92,4 +107,8 @@ public PendingClusterStateStats getQueueStats() { public PublishClusterStateStats getPublishStats() { return publishStats; } + + public ClusterStateStats getClusterStateStats() { + return clusterStateStats; + } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index b696d93324124..f1f4f3c3ea6c7 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.coordination.InMemoryPersistedState; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; +import org.opensearch.cluster.coordination.PersistedStateStats; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Manifest; @@ -616,6 +617,12 @@ public void setLastAcceptedState(ClusterState clusterState) { lastAcceptedState = clusterState; } + @Override + public PersistedStateStats getStats() { + // Note: These stats are not published yet, will come in future + return null; + } + private PersistedClusterStateService.Writer getWriterSafe() { final PersistedClusterStateService.Writer writer = persistenceWriter.get(); if (writer == null) { @@ -718,10 +725,16 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == lastAcceptedManifest = manifest; lastAcceptedState = clusterState; } catch (Exception e) { + remoteClusterStateService.writeMetadataFailed(); handleExceptionOnWrite(e); } } + @Override + public PersistedStateStats getStats() { + return remoteClusterStateService.getStats(); + } + private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) { assert manifest != null : "ClusterMetadataManifest is null"; assert clusterState != null : "ClusterState is null"; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ae4a3fab9852d..0246fe8947eb7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -159,7 +159,7 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue globalMetadataUploadTimeout; private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); - + private final RemotePersistenceStats remoteStateStats; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V1; public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; @@ -193,6 +193,7 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); + this.remoteStateStats = new RemotePersistenceStats(); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -233,6 +234,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); + remoteStateStats.stateSucceeded(); + remoteStateStats.stateTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", @@ -334,6 +337,8 @@ public ClusterMetadataManifest writeIncrementalMetadata( deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); + remoteStateStats.stateSucceeded(); + remoteStateStats.stateTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " @@ -1059,6 +1064,10 @@ public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } + public void writeMetadataFailed() { + getStats().stateFailed(); + } + /** * Exception for IndexMetadata transfer failures to remote */ @@ -1093,7 +1102,7 @@ public GlobalMetadataTransferException(String errorDesc, Throwable cause) { * @param clusterName name of the cluster * @param clusterUUIDs clusteUUIDs for which the remote state needs to be purged */ - private void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { + void deleteStaleUUIDsClusterMetadata(String clusterName, List clusterUUIDs) { clusterUUIDs.forEach(clusterUUID -> { getBlobStoreTransferService().deleteAsync( ThreadPool.Names.REMOTE_PURGE, @@ -1113,6 +1122,7 @@ public void onFailure(Exception e) { ), e ); + remoteStateStats.cleanUpAttemptFailed(); } } ); @@ -1228,8 +1238,10 @@ private void deleteClusterMetadata( logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { logger.error("Error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); } catch (Exception e) { logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); } } @@ -1260,4 +1272,8 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); }); } + + public RemotePersistenceStats getStats() { + return remoteStateStats; + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java new file mode 100644 index 0000000000000..f2330846fa23e --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.coordination.PersistedStateStats; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Remote state related extended stats. + * + * @opensearch.internal + */ +public class RemotePersistenceStats extends PersistedStateStats { + static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; + static final String REMOTE_UPLOAD = "remote_upload"; + private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0); + + public RemotePersistenceStats() { + super(REMOTE_UPLOAD); + addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount); + } + + public void cleanUpAttemptFailed() { + cleanupAttemptFailedCount.incrementAndGet(); + } + + public long getCleanupAttemptFailedCount() { + return cleanupAttemptFailedCount.get(); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 2212bd971de7c..3f2b53143c88a 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -36,10 +36,12 @@ import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; +import org.opensearch.cluster.coordination.PersistedStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.OperationStats; import org.opensearch.core.common.io.stream.StreamInput; @@ -47,6 +49,7 @@ import org.opensearch.core.indices.breaker.AllCircuitBreakerStats; import org.opensearch.core.indices.breaker.CircuitBreakerStats; import org.opensearch.discovery.DiscoveryStats; +import org.opensearch.gateway.remote.RemotePersistenceStats; import org.opensearch.http.HttpStats; import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationRejectionStats; @@ -72,6 +75,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -349,6 +353,25 @@ public void testSerialization() throws IOException { assertEquals(queueStats.getTotal(), deserializedDiscoveryStats.getQueueStats().getTotal()); assertEquals(queueStats.getPending(), deserializedDiscoveryStats.getQueueStats().getPending()); } + ClusterStateStats stateStats = discoveryStats.getClusterStateStats(); + if (stateStats == null) { + assertNull(deserializedDiscoveryStats.getClusterStateStats()); + } else { + assertEquals(stateStats.getUpdateFailed(), deserializedDiscoveryStats.getClusterStateStats().getUpdateFailed()); + assertEquals(stateStats.getUpdateSuccess(), deserializedDiscoveryStats.getClusterStateStats().getUpdateSuccess()); + assertEquals( + stateStats.getUpdateTotalTimeInMillis(), + deserializedDiscoveryStats.getClusterStateStats().getUpdateTotalTimeInMillis() + ); + assertEquals(1, deserializedDiscoveryStats.getClusterStateStats().getPersistenceStats().size()); + PersistedStateStats deserializedRemoteStateStats = deserializedDiscoveryStats.getClusterStateStats() + .getPersistenceStats() + .get(0); + PersistedStateStats remoteStateStats = stateStats.getPersistenceStats().get(0); + assertEquals(remoteStateStats.getFailedCount(), deserializedRemoteStateStats.getFailedCount()); + assertEquals(remoteStateStats.getSuccessCount(), deserializedRemoteStateStats.getSuccessCount()); + assertEquals(remoteStateStats.getTotalTimeInMillis(), deserializedRemoteStateStats.getTotalTimeInMillis()); + } } IngestStats ingestStats = nodeStats.getIngestStats(); IngestStats deserializedIngestStats = deserializedNodeStats.getIngestStats(); @@ -725,12 +748,16 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { ScriptStats scriptStats = frequently() ? new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null; + ClusterStateStats stateStats = new ClusterStateStats(); + RemotePersistenceStats remoteStateStats = new RemotePersistenceStats(); + stateStats.setPersistenceStats(Arrays.asList(remoteStateStats)); DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats( randomBoolean() ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) : null, randomBoolean() ? new PublishClusterStateStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) - : null + : null, + randomBoolean() ? stateStats : null ) : null; IngestStats ingestStats = null; diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 9cdbe04e0a0e4..4c0ca826f5dcc 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -691,6 +691,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS submittedTasksPerThread.get(entry.getKey()).get() ); } + // verify stats values after state is published + assertEquals(1, clusterManagerService.getClusterStateStats().getUpdateSuccess()); + assertEquals(0, clusterManagerService.getClusterStateStats().getUpdateFailed()); } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 1d5c2a0f01b5c..fd113ed4313d7 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -68,6 +68,7 @@ import org.opensearch.gateway.PersistedClusterStateService.Writer; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemotePersistenceStats; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; import org.opensearch.node.Node; @@ -104,6 +105,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -779,6 +781,26 @@ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOExcept assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); } + public void testRemotePersistedStateFailureStats() throws IOException { + RemotePersistenceStats remoteStateStats = new RemotePersistenceStats(); + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final String previousClusterUUID = "prev-cluster-uuid"; + Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); + when(remoteClusterStateService.getStats()).thenReturn(remoteStateStats); + doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed(); + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + ); + + assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); + assertEquals(1, remoteClusterStateService.getStats().getFailedCount()); + assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); + } + public void testGatewayForRemoteState() throws IOException { MockGatewayMetaState gateway = null; try { diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4be5fc03c2a6d..9950b5a08b89e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -324,6 +324,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx RemoteClusterStateService.IndexMetadataTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); + assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); } public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { @@ -331,6 +332,7 @@ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOExc remoteClusterStateService.start(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); Assert.assertThat(manifest, nullValue()); + assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); } public void testFailWriteIncrementalMetadataWhenTermChanged() { @@ -991,6 +993,38 @@ public void testDeleteStaleClusterUUIDs() throws IOException { } } + public void testRemoteStateStats() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + + assertTrue(remoteClusterStateService.getStats() != null); + assertEquals(1, remoteClusterStateService.getStats().getSuccessCount()); + assertEquals(0, remoteClusterStateService.getStats().getCleanupAttemptFailedCount()); + assertEquals(0, remoteClusterStateService.getStats().getFailedCount()); + } + + public void testRemoteStateCleanupFailureStats() throws IOException { + BlobContainer blobContainer = mock(BlobContainer.class); + doThrow(IOException.class).when(blobContainer).delete(); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + remoteClusterStateService.start(); + remoteClusterStateService.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1")); + try { + assertBusy(() -> { + // wait for stats to get updated + assertTrue(remoteClusterStateService.getStats() != null); + assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); + assertEquals(1, remoteClusterStateService.getStats().getCleanupAttemptFailedCount()); + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void testFileNames() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index d24cc24d28579..28d7706fb1493 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1016,6 +1016,11 @@ public void setLastAcceptedState(ClusterState clusterState) { delegate.setLastAcceptedState(clusterState); } + @Override + public PersistedStateStats getStats() { + return null; + } + @Override public void close() { assertTrue(openPersistedStates.remove(this)); From 3c5c0760bcc47fc586f5551ecd9d23d86a73e6c1 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 25 Oct 2023 11:07:33 +0530 Subject: [PATCH 2/2] Change backward compatibility version to 2.12 for 2.x merge Signed-off-by: Aman Khare --- .../main/java/org/opensearch/discovery/DiscoveryStats.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java b/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java index ea93ccd09ed39..fb341ac2ac569 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java @@ -64,7 +64,7 @@ public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateSt public DiscoveryStats(StreamInput in) throws IOException { queueStats = in.readOptionalWriteable(PendingClusterStateStats::new); publishStats = in.readOptionalWriteable(PublishClusterStateStats::new); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { clusterStateStats = in.readOptionalWriteable(ClusterStateStats::new); } else { clusterStateStats = null; @@ -75,7 +75,7 @@ public DiscoveryStats(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(queueStats); out.writeOptionalWriteable(publishStats); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(clusterStateStats); } }