diff --git a/CHANGELOG.md b/CHANGELOG.md index 6055af47403bc..57354a972d89a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) - [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) - Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261)) +- Add cluster state stats ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index eb30460ca1b7f..28c76a36a2852 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -56,6 +56,7 @@ import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.opensearch.cluster.service.ClusterManagerService; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -83,6 +84,7 @@ import org.opensearch.discovery.PeerFinder; import org.opensearch.discovery.SeedHostsProvider; import org.opensearch.discovery.SeedHostsResolver; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -184,6 +186,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; private final RemoteStoreNodeService remoteStoreNodeService; + private final RemoteClusterStateService remoteClusterStateService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -206,7 +209,8 @@ public Coordinator( ElectionStrategy electionStrategy, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + RemoteClusterStateService remoteClusterStateService ) { this.settings = settings; this.transportService = transportService; @@ -295,6 +299,7 @@ public Coordinator( this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; + this.remoteClusterStateService = remoteClusterStateService; } private ClusterFormationState getClusterFormationState() { @@ -865,7 +870,10 @@ protected void doStart() { @Override public DiscoveryStats stats() { - return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats()); + ClusterStateStats clusterStateStats = remoteClusterStateService != null + ? clusterManagerService.getStateStats(this.remoteClusterStateService.getRemoteClusterStateStats()) + : clusterManagerService.getStateStats(); + return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java new file mode 100644 index 0000000000000..ac88b4981ef9d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.service; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.gateway.remote.RemoteClusterStateStats; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Cluster state related stats. + * + * @opensearch.internal + */ +public class ClusterStateStats implements Writeable, ToXContentObject { + + private AtomicLong stateUpdated = new AtomicLong(0); + private AtomicLong stateUpdateTimeTotalInMS = new AtomicLong(0); + private AtomicLong stateUpdateFailed = new AtomicLong(0); + private RemoteClusterStateStats remoteStateStats = null; + + public ClusterStateStats() {} + + public long getStateUpdated() { + return stateUpdated.get(); + } + + public long getStateUpdateTimeTotalInMS() { + return stateUpdateTimeTotalInMS.get(); + } + + public long getStateUpdateFailed() { + return stateUpdateFailed.get(); + } + + public void stateUpdated() { + stateUpdated.incrementAndGet(); + } + + public void stateUpdateFailed() { + stateUpdateFailed.incrementAndGet(); + } + + public void stateUpdateTook(long stateUpdateTime) { + stateUpdateTimeTotalInMS.addAndGet(stateUpdateTime); + } + + public void setRemoteStateStats(RemoteClusterStateStats remoteStateStats) { + this.remoteStateStats = remoteStateStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(stateUpdated.get()); + out.writeVLong(stateUpdateTimeTotalInMS.get()); + out.writeVLong(stateUpdateFailed.get()); + if (remoteStateStats != null) { + remoteStateStats.writeTo(out); + } else { + out.writeBoolean(false); + } + } + + public ClusterStateStats(StreamInput in) throws IOException { + this.stateUpdated = new AtomicLong(in.readVLong()); + this.stateUpdateTimeTotalInMS = new AtomicLong(in.readVLong()); + this.stateUpdateFailed = new AtomicLong(in.readVLong()); + if (in.readBoolean()) { + this.remoteStateStats = new RemoteClusterStateStats(in); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.CLUSTER_STATE); + builder.startObject(Fields.OVERALL_STATS); + builder.field(Fields.UPDATE_COUNT, getStateUpdated()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTimeTotalInMS()); + builder.field(Fields.FAILED_COUNT, getStateUpdateFailed()); + builder.endObject(); + if (remoteStateStats != null) { + remoteStateStats.toXContent(builder, params); + } + builder.endObject(); + return builder; + } + + /** + * Fields for parsing and toXContent + * + * @opensearch.internal + */ + static final class Fields { + static final String CLUSTER_STATE = "cluster_state"; + static final String OVERALL_STATS = "overall_stats"; + static final String UPDATE_COUNT = "update_count"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String FAILED_COUNT = "failed_count"; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 563b69dfd0e2a..51b3e4261105d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -68,6 +68,7 @@ import org.opensearch.core.common.text.Text; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.discovery.Discovery; +import org.opensearch.gateway.remote.RemoteClusterStateStats; import org.opensearch.node.Node; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -112,7 +113,9 @@ public class MasterService extends AbstractLifecycleComponent { static final String CLUSTER_MANAGER_UPDATE_THREAD_NAME = "clusterManagerService#updateTask"; - /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} */ + /** + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #CLUSTER_MANAGER_UPDATE_THREAD_NAME} + */ @Deprecated static final String MASTER_UPDATE_THREAD_NAME = "masterService#updateTask"; @@ -130,6 +133,7 @@ public class MasterService extends AbstractLifecycleComponent { private volatile Batcher taskBatcher; protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; private final ClusterManagerThrottlingStats throttlingStats; + private final ClusterStateStats stateStats; public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); @@ -147,6 +151,7 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP this::getMinNodeVersion, throttlingStats ); + this.stateStats = new ClusterStateStats(); this.threadPool = threadPool; } @@ -339,7 +344,7 @@ private TimeValue getTimeSince(long startTimeNanos) { return TimeValue.timeValueMillis(TimeValue.nsecToMSec(threadPool.preciseRelativeTimeInNanos() - startTimeNanos)); } - protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) { + protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNanos) { final PlainActionFuture fut = new PlainActionFuture() { @Override protected boolean blockingAllowed() { @@ -352,8 +357,12 @@ protected boolean blockingAllowed() { try { FutureUtils.get(fut); onPublicationSuccess(clusterChangedEvent, taskOutputs); + final long durationMillis = getTimeSince(startTimeNanos).millis(); + stateStats.stateUpdateTook(durationMillis); + stateStats.stateUpdated(); } catch (Exception e) { - onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e); + stateStats.stateUpdateFailed(); + onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeNanos, e); } } @@ -464,7 +473,6 @@ public Builder incrementVersion(ClusterState clusterState) { * @param source the source of the cluster state update task * @param updateTask the full context for the cluster state update * task - * */ public & ClusterStateTaskListener> void submitStateUpdateTask( String source, @@ -490,7 +498,6 @@ public & Cluster * @param listener callback after the cluster state update task * completes * @param the type of the cluster state update task state - * */ public void submitStateUpdateTask( String source, @@ -947,7 +954,7 @@ void onNoLongerClusterManager() { /** * Functionality for register task key to cluster manager node. * - * @param taskKey - task key of task + * @param taskKey - task key of task * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not * @return throttling task key which needs to be passed while submitting task to cluster manager */ @@ -966,7 +973,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(Stri * that share the same executor will be executed * batches on this executor * @param the type of the cluster state update task state - * */ public void submitStateUpdateTasks( final String source, @@ -996,4 +1002,13 @@ public void submitStateUpdateTasks( } } + public ClusterStateStats getStateStats(RemoteClusterStateStats remoteStateStats) { + stateStats.setRemoteStateStats(remoteStateStats); + return stateStats; + } + + public ClusterStateStats getStateStats() { + return stateStats; + } + } diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 288371aa240a0..2edb9a61a85a8 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -52,6 +52,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; @@ -133,7 +134,8 @@ public DiscoveryModule( RerouteService rerouteService, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService + RemoteStoreNodeService remoteStoreNodeService, + RemoteClusterStateService remoteClusterStateService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -211,7 +213,8 @@ public DiscoveryModule( electionStrategy, nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + remoteClusterStateService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java b/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java index 665ecf77d7aa7..ea93ccd09ed39 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryStats.java @@ -32,8 +32,10 @@ package org.opensearch.discovery; +import org.opensearch.Version; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -51,21 +53,31 @@ public class DiscoveryStats implements Writeable, ToXContentFragment { private final PendingClusterStateStats queueStats; private final PublishClusterStateStats publishStats; + private final ClusterStateStats clusterStateStats; - public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats) { + public DiscoveryStats(PendingClusterStateStats queueStats, PublishClusterStateStats publishStats, ClusterStateStats clusterStateStats) { this.queueStats = queueStats; this.publishStats = publishStats; + this.clusterStateStats = clusterStateStats; } public DiscoveryStats(StreamInput in) throws IOException { queueStats = in.readOptionalWriteable(PendingClusterStateStats::new); publishStats = in.readOptionalWriteable(PublishClusterStateStats::new); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + clusterStateStats = in.readOptionalWriteable(ClusterStateStats::new); + } else { + clusterStateStats = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(queueStats); out.writeOptionalWriteable(publishStats); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(clusterStateStats); + } } @Override @@ -77,6 +89,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (publishStats != null) { publishStats.toXContent(builder, params); } + if (clusterStateStats != null) { + clusterStateStats.toXContent(builder, params); + } builder.endObject(); return builder; } @@ -92,4 +107,8 @@ public PendingClusterStateStats getQueueStats() { public PublishClusterStateStats getPublishStats() { return publishStats; } + + public ClusterStateStats getClusterStateStats() { + return clusterStateStats; + } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index d3e7a0c482ee2..0cf8706fda3e2 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -712,7 +712,9 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == // After the above PR is pushed, we can remove this silent failure and throw the exception instead. logger.error("Remote repository is not yet registered"); lastAcceptedState = clusterState; + remoteClusterStateService.writeMetadataFailed(); } catch (Exception e) { + remoteClusterStateService.writeMetadataFailed(); handleExceptionOnWrite(e); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 4a8a0618ffa60..3cb5399f5d83b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -119,6 +119,7 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue slowWriteLoggingThreshold; private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); + private final RemoteClusterStateStats remoteStateStats; public RemoteClusterStateService( String nodeId, @@ -136,6 +137,7 @@ public RemoteClusterStateService( this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); + this.remoteStateStats = new RemoteClusterStateStats(); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -166,6 +168,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri ); final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); + remoteStateStats.stateUploaded(); + remoteStateStats.stateUploadTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", @@ -250,6 +254,8 @@ public ClusterMetadataManifest writeIncrementalMetadata( deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); + remoteStateStats.stateUploaded(); + remoteStateStats.stateUploadTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " @@ -791,6 +797,10 @@ public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } + public void writeMetadataFailed() { + remoteStateStats.stateUploadFailed(); + } + /** * Exception for IndexMetadata transfer failures to remote */ @@ -831,6 +841,7 @@ public void onFailure(Exception e) { ), e ); + remoteStateStats.cleanUpAttemptFailed(); } } ); @@ -934,8 +945,10 @@ private void deleteClusterMetadata( logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { logger.error("Error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); } catch (Exception e) { logger.error("Unexpected error while deleting stale Remote Cluster Metadata files", e); + remoteStateStats.cleanUpAttemptFailed(); } } @@ -966,4 +979,8 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa deleteStaleUUIDsClusterMetadata(clusterName, new ArrayList<>(allClustersUUIDsInRemote)); }); } + + public RemoteClusterStateStats getRemoteClusterStateStats() { + return remoteStateStats; + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateStats.java new file mode 100644 index 0000000000000..a6f1dc7b96d1c --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateStats.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Remote cluster state related stats. + * + * @opensearch.internal + */ +public class RemoteClusterStateStats implements Writeable, ToXContentObject { + private AtomicLong totalUploadTimeInMS = new AtomicLong(0); + private AtomicLong uploadFailedCount = new AtomicLong(0); + private AtomicLong uploadSuccessCount = new AtomicLong(0); + private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0); + + public RemoteClusterStateStats() {} + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(true); + out.writeVLong(uploadSuccessCount.get()); + out.writeVLong(uploadFailedCount.get()); + out.writeVLong(totalUploadTimeInMS.get()); + out.writeVLong(cleanupAttemptFailedCount.get()); + } + + public RemoteClusterStateStats(StreamInput in) throws IOException { + this.uploadSuccessCount = new AtomicLong(in.readVLong()); + this.uploadFailedCount = new AtomicLong(in.readVLong()); + this.totalUploadTimeInMS = new AtomicLong(in.readVLong()); + this.cleanupAttemptFailedCount = new AtomicLong(in.readVLong()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.REMOTE_UPLOAD); + builder.field(Fields.UPDATE_COUNT, getUploadSuccessCount()); + builder.field(Fields.FAILED_COUNT, getUploadFailedCount()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalUploadTimeInMS()); + builder.field(Fields.CLEANUP_ATTEMPT_FAILED_COUNT, getCleanupAttemptFailedCount()); + builder.endObject(); + return builder; + } + + public void stateUploadFailed() { + uploadFailedCount.incrementAndGet(); + } + + public void stateUploaded() { + uploadSuccessCount.incrementAndGet(); + } + + /** + * Expects user to send time taken in milliseconds. + * + * @param timeTakenInUpload time taken in uploading the cluster state to remote + */ + public void stateUploadTook(long timeTakenInUpload) { + totalUploadTimeInMS.addAndGet(timeTakenInUpload); + } + + public void cleanUpAttemptFailed() { + cleanupAttemptFailedCount.incrementAndGet(); + } + + public long getTotalUploadTimeInMS() { + return totalUploadTimeInMS.get(); + } + + public long getUploadFailedCount() { + return uploadFailedCount.get(); + } + + public long getUploadSuccessCount() { + return uploadSuccessCount.get(); + } + + public long getCleanupAttemptFailedCount() { + return cleanupAttemptFailedCount.get(); + } + + /** + * Fields for parsing and toXContent + * + * @opensearch.internal + */ + static final class Fields { + static final String REMOTE_UPLOAD = "remote_upload"; + static final String UPDATE_COUNT = "update_count"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String FAILED_COUNT = "failed_count"; + static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 5b3b064a47c66..8683874ec9aff 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1048,7 +1048,8 @@ protected Node( rerouteService, fsHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + remoteClusterStateService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1091,7 +1092,8 @@ protected Node( searchBackpressureService, searchPipelineService, fileCache, - taskCancellationMonitoringService + taskCancellationMonitoringService, + remoteClusterStateService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 2688b894cb9a7..f370885c97541 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -46,6 +46,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.discovery.Discovery; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.store.remote.filecache.FileCache; @@ -92,6 +93,7 @@ public class NodeService implements Closeable { private final Discovery discovery; private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; + private final RemoteClusterStateService remoteClusterStateService; NodeService( Settings settings, @@ -114,7 +116,8 @@ public class NodeService implements Closeable { SearchBackpressureService searchBackpressureService, SearchPipelineService searchPipelineService, FileCache fileCache, - TaskCancellationMonitoringService taskCancellationMonitoringService + TaskCancellationMonitoringService taskCancellationMonitoringService, + RemoteClusterStateService remoteClusterStateService ) { this.settings = settings; this.threadPool = threadPool; @@ -137,6 +140,7 @@ public class NodeService implements Closeable { this.clusterService = clusterService; this.fileCache = fileCache; this.taskCancellationMonitoringService = taskCancellationMonitoringService; + this.remoteClusterStateService = remoteClusterStateService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index e3f16463a5328..6e116ad65a34c 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; +import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.OperationStats; import org.opensearch.core.common.io.stream.StreamInput; @@ -686,7 +687,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { randomBoolean() ? new PendingClusterStateStats(randomInt(), randomInt(), randomInt()) : null, randomBoolean() ? new PublishClusterStateStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()) - : null + : null, + randomBoolean() ? new ClusterStateStats() : null ) : null; IngestStats ingestStats = null; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index d94f3fb304fe2..a209a5e032a23 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -57,6 +57,7 @@ import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.Node; @@ -270,7 +271,8 @@ protected void onSendRequest( ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService, persistedStateRegistry, - Mockito.mock(RemoteStoreNodeService.class) + Mockito.mock(RemoteStoreNodeService.class), + Mockito.mock(RemoteClusterStateService.class) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index b33ebf8333b36..0b951c1927c71 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -128,7 +128,8 @@ private DiscoveryModule newModule(Settings settings, List plugi mock(RerouteService.class), null, new PersistedStateRegistry(), - remoteStoreNodeService + remoteStoreNodeService, + null ); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 6ecbc23f75bee..b809fa3e670d1 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -275,6 +275,7 @@ public void testWriteFullMetadataInParallelFailure() throws IOException { RemoteClusterStateService.IndexMetadataTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getUploadSuccessCount()); } public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { @@ -282,6 +283,7 @@ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOExc remoteClusterStateService.start(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); Assert.assertThat(manifest, nullValue()); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getUploadSuccessCount()); } public void testFailWriteIncrementalMetadataWhenTermChanged() { @@ -673,6 +675,29 @@ public void testDeleteStaleClusterUUIDs() throws IOException { } } + public void testRemoteStateStats() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .build(); + + assertTrue(remoteClusterStateService.getRemoteClusterStateStats() != null); + assertEquals(1, remoteClusterStateService.getRemoteClusterStateStats().getUploadSuccessCount()); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getCleanupAttemptFailedCount()); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getUploadFailedCount()); + } + private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 97c5d23831965..233a9281e775b 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2523,7 +2523,8 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + null ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index d24cc24d28579..03b92f59eb155 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -84,6 +84,7 @@ import org.opensearch.gateway.GatewayService; import org.opensearch.gateway.MockGatewayMetaState; import org.opensearch.gateway.PersistedClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -151,6 +152,7 @@ import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; import static org.opensearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.opensearch.transport.TransportSettings.CONNECT_TIMEOUT; import static org.hamcrest.Matchers.empty; @@ -1151,6 +1153,19 @@ protected Optional getDisruptableMockTransport(Transpo (dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)) ); final AllocationService allocationService = OpenSearchAllocationTestCase.createAllocationService(Settings.EMPTY); + RemoteClusterStateService remoteClusterStateService; + if (isRemoteStoreClusterStateEnabled(settings)) { + remoteClusterStateService = new RemoteClusterStateService( + localNode.getId(), + new SetOnce<>(repositoriesService)::get, + settings, + clusterService.getClusterSettings(), + threadPool::preciseRelativeTimeInNanos, + threadPool + ); + } else { + remoteClusterStateService = null; + } coordinator = new Coordinator( "test_node", settings, @@ -1168,7 +1183,8 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService, persistedStateRegistry, - remoteStoreNodeService + remoteStoreNodeService, + remoteClusterStateService ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(