From 6386f7a302ba49283925745b224d6bc813c6d686 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Sat, 21 Oct 2023 12:31:38 +0530 Subject: [PATCH] Add local or remote stats using array to extend upcoming stats automatically Signed-off-by: Aman Khare --- .../coordination/CoordinationState.java | 2 +- .../cluster/coordination/Coordinator.java | 15 ++-- .../coordination/InMemoryPersistedState.java | 2 +- .../coordination/PersistedStateStats.java | 6 +- .../cluster/service/ClusterStateStats.java | 74 ++++++++++--------- .../opensearch/gateway/GatewayMetaState.java | 5 +- .../remote/RemoteClusterStateService.java | 16 ++-- ...Stats.java => RemotePersistenceStats.java} | 4 +- .../cluster/node/stats/NodeStatsTests.java | 27 ++++++- .../cluster/service/MasterServiceTests.java | 3 + .../AbstractCoordinatorTestCase.java | 2 +- 11 files changed, 98 insertions(+), 58 deletions(-) rename server/src/main/java/org/opensearch/gateway/remote/{RemoteStateStats.java => RemotePersistenceStats.java} (90%) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index c20fd72b88345..987a3e3ffa7d3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -642,7 +642,7 @@ public interface PersistedState extends Closeable { * Returns the stats for the persistence layer for {@link CoordinationState}. * @return PersistedStateStats */ - PersistedStateStats getPersistedStateStats(); + PersistedStateStats getStats(); /** * Marks the last accepted cluster state as committed. diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index a3ebf793ca287..5c8459d3d6f3d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -866,13 +866,16 @@ protected void doStart() { @Override public DiscoveryStats stats() { - CoordinationState.PersistedState remotePersistedState = persistedStateRegistry.getPersistedState( - PersistedStateRegistry.PersistedStateType.REMOTE - ); ClusterStateStats clusterStateStats = clusterManagerService.getStateStats(); - if (remotePersistedState != null) { - clusterStateStats.setRemoteStateStats(remotePersistedState.getPersistedStateStats()); - } + ArrayList stats = new ArrayList<>(); + Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> { + if (persistedStateRegistry.getPersistedState(stateType) != null) { + if (persistedStateRegistry.getPersistedState(stateType).getStats() != null) { + stats.add(persistedStateRegistry.getPersistedState(stateType).getStats()); + } + } + }); + clusterStateStats.setPersistenceStats(stats); return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java index e9390044bf714..b77ede5471534 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java @@ -66,7 +66,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getPersistedStateStats() { + public PersistedStateStats getStats() { return null; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java index 3c3fcd4b5a9e4..d845b6fbf53f0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java @@ -80,11 +80,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public void stateUploadFailed() { + public void stateFailed() { failedCount.incrementAndGet(); } - public void stateUploaded() { + public void stateSucceeded() { successCount.incrementAndGet(); } @@ -93,7 +93,7 @@ public void stateUploaded() { * * @param timeTakenInUpload time taken in uploading the cluster state to remote */ - public void stateUploadTook(long timeTakenInUpload) { + public void stateTook(long timeTakenInUpload) { totalTimeInMillis.addAndGet(timeTakenInUpload); } diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java index ec8b600202907..96683ce720d0b 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java @@ -16,6 +16,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** @@ -25,60 +27,66 @@ */ public class ClusterStateStats implements Writeable, ToXContentObject { - private AtomicLong stateUpdateSuccess = new AtomicLong(0); - private AtomicLong stateUpdateTotalTimeInMillis = new AtomicLong(0); - private AtomicLong stateUpdateFailed = new AtomicLong(0); - private PersistedStateStats remoteStateStats = null; + private AtomicLong updateSuccess = new AtomicLong(0); + private AtomicLong updateTotalTimeInMillis = new AtomicLong(0); + private AtomicLong updateFailed = new AtomicLong(0); + private List persistenceStats = new ArrayList<>(); public ClusterStateStats() {} - public long getStateUpdateSuccess() { - return stateUpdateSuccess.get(); + public long getUpdateSuccess() { + return updateSuccess.get(); } - public long getStateUpdateTotalTimeInMillis() { - return stateUpdateTotalTimeInMillis.get(); + public long getUpdateTotalTimeInMillis() { + return updateTotalTimeInMillis.get(); } - public long getStateUpdateFailed() { - return stateUpdateFailed.get(); + public long getUpdateFailed() { + return updateFailed.get(); + } + + public List getPersistenceStats() { + return persistenceStats; } public void stateUpdated() { - stateUpdateSuccess.incrementAndGet(); + updateSuccess.incrementAndGet(); } public void stateUpdateFailed() { - stateUpdateFailed.incrementAndGet(); + updateFailed.incrementAndGet(); } public void stateUpdateTook(long stateUpdateTime) { - stateUpdateTotalTimeInMillis.addAndGet(stateUpdateTime); + updateTotalTimeInMillis.addAndGet(stateUpdateTime); } - public void setRemoteStateStats(PersistedStateStats remoteStateStats) { - this.remoteStateStats = remoteStateStats; + public ClusterStateStats setPersistenceStats(List persistenceStats) { + this.persistenceStats = persistenceStats; + return this; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(stateUpdateSuccess.get()); - out.writeVLong(stateUpdateTotalTimeInMillis.get()); - out.writeVLong(stateUpdateFailed.get()); - if (remoteStateStats != null) { - out.writeBoolean(true); - remoteStateStats.writeTo(out); - } else { - out.writeBoolean(false); + out.writeVLong(updateSuccess.get()); + out.writeVLong(updateTotalTimeInMillis.get()); + out.writeVLong(updateFailed.get()); + out.writeVInt(persistenceStats.size()); + for (PersistedStateStats stats : persistenceStats) { + stats.writeTo(out); } } public ClusterStateStats(StreamInput in) throws IOException { - this.stateUpdateSuccess = new AtomicLong(in.readVLong()); - this.stateUpdateTotalTimeInMillis = new AtomicLong(in.readVLong()); - this.stateUpdateFailed = new AtomicLong(in.readVLong()); - if (in.readBoolean()) { - this.remoteStateStats = new PersistedStateStats(in); + this.updateSuccess = new AtomicLong(in.readVLong()); + this.updateTotalTimeInMillis = new AtomicLong(in.readVLong()); + this.updateFailed = new AtomicLong(in.readVLong()); + int persistedStatsSize = in.readVInt(); + this.persistenceStats = new ArrayList<>(); + for (int statsNumber = 0; statsNumber < persistedStatsSize; statsNumber++) { + PersistedStateStats stats = new PersistedStateStats(in); + this.persistenceStats.add(stats); } } @@ -86,12 +94,12 @@ public ClusterStateStats(StreamInput in) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.CLUSTER_STATE_STATS); builder.startObject(Fields.OVERALL); - builder.field(Fields.UPDATE_COUNT, getStateUpdateSuccess()); - builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTotalTimeInMillis()); - builder.field(Fields.FAILED_COUNT, getStateUpdateFailed()); + builder.field(Fields.UPDATE_COUNT, getUpdateSuccess()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getUpdateTotalTimeInMillis()); + builder.field(Fields.FAILED_COUNT, getUpdateFailed()); builder.endObject(); - if (remoteStateStats != null) { - remoteStateStats.toXContent(builder, params); + for (PersistedStateStats stats : persistenceStats) { + stats.toXContent(builder, params); } builder.endObject(); return builder; diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 13d2f43d6b7f7..314cd9b047b34 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -613,7 +613,8 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getPersistedStateStats() { + public PersistedStateStats getStats() { + // Note: These stats are not published yet, will come in future return null; } @@ -731,7 +732,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == } @Override - public PersistedStateStats getPersistedStateStats() { + public PersistedStateStats getStats() { return remoteClusterStateService.getRemoteClusterStateStats(); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 4702c7dbfe818..f9c0184348795 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -142,7 +142,7 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue slowWriteLoggingThreshold; private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); - private final RemoteStateStats remoteStateStats; + private final RemotePersistenceStats remoteStateStats; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V1; public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; @@ -172,7 +172,7 @@ public RemoteClusterStateService( this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); - this.remoteStateStats = new RemoteStateStats(); + this.remoteStateStats = new RemotePersistenceStats(); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -213,8 +213,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); - remoteStateStats.stateUploaded(); - remoteStateStats.stateUploadTook(durationMillis); + remoteStateStats.stateSucceeded(); + remoteStateStats.stateTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", @@ -316,8 +316,8 @@ public ClusterMetadataManifest writeIncrementalMetadata( deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); - remoteStateStats.stateUploaded(); - remoteStateStats.stateUploadTook(durationMillis); + remoteStateStats.stateSucceeded(); + remoteStateStats.stateTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " @@ -1011,7 +1011,7 @@ public static String encodeString(String content) { } public void writeMetadataFailed() { - remoteStateStats.stateUploadFailed(); + remoteStateStats.stateFailed(); } /** @@ -1217,7 +1217,7 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa }); } - public RemoteStateStats getRemoteClusterStateStats() { + public RemotePersistenceStats getRemoteClusterStateStats() { return remoteStateStats; } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteStateStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java similarity index 90% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteStateStats.java rename to server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java index 819508acfcb90..d992223eba742 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteStateStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -17,12 +17,12 @@ * * @opensearch.internal */ -public class RemoteStateStats extends PersistedStateStats { +public class RemotePersistenceStats extends PersistedStateStats { static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; static final String REMOTE = "remote"; private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0); - public RemoteStateStats() { + public RemotePersistenceStats() { super(REMOTE); addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 6e1131cf1bafc..b6f6470c182df 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -36,6 +36,7 @@ import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; +import org.opensearch.cluster.coordination.PersistedStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; @@ -48,6 +49,7 @@ import org.opensearch.core.indices.breaker.AllCircuitBreakerStats; import org.opensearch.core.indices.breaker.CircuitBreakerStats; import org.opensearch.discovery.DiscoveryStats; +import org.opensearch.gateway.remote.RemotePersistenceStats; import org.opensearch.http.HttpStats; import org.opensearch.index.ReplicationStats; import org.opensearch.index.remote.RemoteSegmentStats; @@ -72,6 +74,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -349,6 +352,25 @@ public void testSerialization() throws IOException { assertEquals(queueStats.getTotal(), deserializedDiscoveryStats.getQueueStats().getTotal()); assertEquals(queueStats.getPending(), deserializedDiscoveryStats.getQueueStats().getPending()); } + ClusterStateStats stateStats = discoveryStats.getClusterStateStats(); + if (stateStats == null) { + assertNull(deserializedDiscoveryStats.getClusterStateStats()); + } else { + assertEquals(stateStats.getUpdateFailed(), deserializedDiscoveryStats.getClusterStateStats().getUpdateFailed()); + assertEquals(stateStats.getUpdateSuccess(), deserializedDiscoveryStats.getClusterStateStats().getUpdateSuccess()); + assertEquals( + stateStats.getUpdateTotalTimeInMillis(), + deserializedDiscoveryStats.getClusterStateStats().getUpdateTotalTimeInMillis() + ); + assertEquals(1, deserializedDiscoveryStats.getClusterStateStats().getPersistenceStats().size()); + PersistedStateStats deserializedRemoteStateStats = deserializedDiscoveryStats.getClusterStateStats() + .getPersistenceStats() + .get(0); + PersistedStateStats remoteStateStats = stateStats.getPersistenceStats().get(0); + assertEquals(remoteStateStats.getFailedCount(), deserializedRemoteStateStats.getFailedCount()); + assertEquals(remoteStateStats.getSuccessCount(), deserializedRemoteStateStats.getSuccessCount()); + assertEquals(remoteStateStats.getTotalTimeInMillis(), deserializedRemoteStateStats.getTotalTimeInMillis()); + } } IngestStats ingestStats = nodeStats.getIngestStats(); IngestStats deserializedIngestStats = deserializedNodeStats.getIngestStats(); @@ -714,13 +736,16 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { ScriptStats scriptStats = frequently() ? new ScriptStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null; + ClusterStateStats stateStats = new ClusterStateStats(); + RemotePersistenceStats remoteStateStats = new RemotePersistenceStats(); + stateStats.setPersistenceStats(Arrays.asList(remoteStateStats)); DiscoveryStats discoveryStats = frequently() ? new DiscoveryStats( randomBoolean() ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) : null, randomBoolean() ? new PublishClusterStateStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) : null, - randomBoolean() ? new ClusterStateStats() : null + randomBoolean() ? stateStats : null ) : null; IngestStats ingestStats = null; diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index 9cdbe04e0a0e4..8aff33b8fea7d 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -691,6 +691,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS submittedTasksPerThread.get(entry.getKey()).get() ); } + // verify stats values after state is published + assertEquals(1, clusterManagerService.getStateStats().getUpdateSuccess()); + assertEquals(0, clusterManagerService.getStateStats().getUpdateFailed()); } } diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index e034c60ce5f4f..28d7706fb1493 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1017,7 +1017,7 @@ public void setLastAcceptedState(ClusterState clusterState) { } @Override - public PersistedStateStats getPersistedStateStats() { + public PersistedStateStats getStats() { return null; }