diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index a339852e6ed8d..c20fd72b88345 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -638,6 +638,12 @@ public interface PersistedState extends Closeable { */ void setLastAcceptedState(ClusterState clusterState); + /** + * Returns the stats for the persistence layer for {@link CoordinationState}. + * @return PersistedStateStats + */ + PersistedStateStats getPersistedStateStats(); + /** * Marks the last accepted cluster state as committed. * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 28c76a36a2852..a3ebf793ca287 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -84,7 +84,6 @@ import org.opensearch.discovery.PeerFinder; import org.opensearch.discovery.SeedHostsProvider; import org.opensearch.discovery.SeedHostsResolver; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -186,7 +185,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; private final RemoteStoreNodeService remoteStoreNodeService; - private final RemoteClusterStateService remoteClusterStateService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -209,8 +207,7 @@ public Coordinator( ElectionStrategy electionStrategy, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService, - RemoteClusterStateService remoteClusterStateService + RemoteStoreNodeService remoteStoreNodeService ) { this.settings = settings; this.transportService = transportService; @@ -299,7 +296,6 @@ public Coordinator( this.persistedStateRegistry = persistedStateRegistry; this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; - this.remoteClusterStateService = remoteClusterStateService; } private ClusterFormationState getClusterFormationState() { @@ -870,9 +866,13 @@ protected void doStart() { @Override public DiscoveryStats stats() { - ClusterStateStats clusterStateStats = remoteClusterStateService != null - ? clusterManagerService.getStateStats(this.remoteClusterStateService.getRemoteClusterStateStats()) - : clusterManagerService.getStateStats(); + CoordinationState.PersistedState remotePersistedState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + ClusterStateStats clusterStateStats = clusterManagerService.getStateStats(); + if (remotePersistedState != null) { + clusterStateStats.setRemoteStateStats(remotePersistedState.getPersistedStateStats()); + } return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java index 67ef82ee7b2e9..e9390044bf714 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/InMemoryPersistedState.java @@ -65,6 +65,11 @@ public void setLastAcceptedState(ClusterState clusterState) { this.acceptedState = clusterState; } + @Override + public PersistedStateStats getPersistedStateStats() { + return null; + } + @Override public long getCurrentTerm() { return currentTerm; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java new file mode 100644 index 0000000000000..3c3fcd4b5a9e4 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/PersistedStateStats.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Persisted cluster state related stats. + * + * @opensearch.internal + */ +public class PersistedStateStats implements Writeable, ToXContentObject { + private String statsName; + private AtomicLong totalTimeInMillis = new AtomicLong(0); + private AtomicLong failedCount = new AtomicLong(0); + private AtomicLong successCount = new AtomicLong(0); + private Map extendedFields = new HashMap<>(); // keeping minimal extensibility + + public PersistedStateStats(String statsName) { + this.statsName = statsName; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(successCount.get()); + out.writeVLong(failedCount.get()); + out.writeVLong(totalTimeInMillis.get()); + if (extendedFields.size() > 0) { + out.writeBoolean(true); + out.writeVInt(extendedFields.size()); + for (Map.Entry extendedField : extendedFields.entrySet()) { + out.writeString(extendedField.getKey()); + out.writeVLong(extendedField.getValue().get()); + } + } else { + out.writeBoolean(false); + } + } + + public PersistedStateStats(StreamInput in) throws IOException { + this.successCount = new AtomicLong(in.readVLong()); + this.failedCount = new AtomicLong(in.readVLong()); + this.totalTimeInMillis = new AtomicLong(in.readVLong()); + if (in.readBoolean()) { + int extendedFieldsSize = in.readVInt(); + this.extendedFields = new HashMap<>(); + for (int fieldNumber = 0; fieldNumber < extendedFieldsSize; fieldNumber++) { + extendedFields.put(in.readString(), new AtomicLong(in.readVLong())); + } + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(statsName); + builder.field(Fields.UPDATE_COUNT, getSuccessCount()); + builder.field(Fields.FAILED_COUNT, getFailedCount()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalTimeInMillis()); + if (extendedFields.size() > 0) { + for (Map.Entry extendedField : extendedFields.entrySet()) { + builder.field(extendedField.getKey(), extendedField.getValue().get()); + } + } + builder.endObject(); + return builder; + } + + public void stateUploadFailed() { + failedCount.incrementAndGet(); + } + + public void stateUploaded() { + successCount.incrementAndGet(); + } + + /** + * Expects user to send time taken in milliseconds. + * + * @param timeTakenInUpload time taken in uploading the cluster state to remote + */ + public void stateUploadTook(long timeTakenInUpload) { + totalTimeInMillis.addAndGet(timeTakenInUpload); + } + + public long getTotalTimeInMillis() { + return totalTimeInMillis.get(); + } + + public long getFailedCount() { + return failedCount.get(); + } + + public long getSuccessCount() { + return successCount.get(); + } + + protected void addToExtendedFields(String extendedField, AtomicLong extendedFieldValue) { + this.extendedFields.put(extendedField, extendedFieldValue); + } + + /** + * Fields for parsing and toXContent + * + * @opensearch.internal + */ + static final class Fields { + static final String UPDATE_COUNT = "update_count"; + static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; + static final String FAILED_COUNT = "failed_count"; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java index ac88b4981ef9d..ec8b600202907 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterStateStats.java @@ -8,12 +8,12 @@ package org.opensearch.cluster.service; +import org.opensearch.cluster.coordination.PersistedStateStats; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.gateway.remote.RemoteClusterStateStats; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; @@ -25,19 +25,19 @@ */ public class ClusterStateStats implements Writeable, ToXContentObject { - private AtomicLong stateUpdated = new AtomicLong(0); - private AtomicLong stateUpdateTimeTotalInMS = new AtomicLong(0); + private AtomicLong stateUpdateSuccess = new AtomicLong(0); + private AtomicLong stateUpdateTotalTimeInMillis = new AtomicLong(0); private AtomicLong stateUpdateFailed = new AtomicLong(0); - private RemoteClusterStateStats remoteStateStats = null; + private PersistedStateStats remoteStateStats = null; public ClusterStateStats() {} - public long getStateUpdated() { - return stateUpdated.get(); + public long getStateUpdateSuccess() { + return stateUpdateSuccess.get(); } - public long getStateUpdateTimeTotalInMS() { - return stateUpdateTimeTotalInMS.get(); + public long getStateUpdateTotalTimeInMillis() { + return stateUpdateTotalTimeInMillis.get(); } public long getStateUpdateFailed() { @@ -45,7 +45,7 @@ public long getStateUpdateFailed() { } public void stateUpdated() { - stateUpdated.incrementAndGet(); + stateUpdateSuccess.incrementAndGet(); } public void stateUpdateFailed() { @@ -53,19 +53,20 @@ public void stateUpdateFailed() { } public void stateUpdateTook(long stateUpdateTime) { - stateUpdateTimeTotalInMS.addAndGet(stateUpdateTime); + stateUpdateTotalTimeInMillis.addAndGet(stateUpdateTime); } - public void setRemoteStateStats(RemoteClusterStateStats remoteStateStats) { + public void setRemoteStateStats(PersistedStateStats remoteStateStats) { this.remoteStateStats = remoteStateStats; } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(stateUpdated.get()); - out.writeVLong(stateUpdateTimeTotalInMS.get()); + out.writeVLong(stateUpdateSuccess.get()); + out.writeVLong(stateUpdateTotalTimeInMillis.get()); out.writeVLong(stateUpdateFailed.get()); if (remoteStateStats != null) { + out.writeBoolean(true); remoteStateStats.writeTo(out); } else { out.writeBoolean(false); @@ -73,20 +74,20 @@ public void writeTo(StreamOutput out) throws IOException { } public ClusterStateStats(StreamInput in) throws IOException { - this.stateUpdated = new AtomicLong(in.readVLong()); - this.stateUpdateTimeTotalInMS = new AtomicLong(in.readVLong()); + this.stateUpdateSuccess = new AtomicLong(in.readVLong()); + this.stateUpdateTotalTimeInMillis = new AtomicLong(in.readVLong()); this.stateUpdateFailed = new AtomicLong(in.readVLong()); if (in.readBoolean()) { - this.remoteStateStats = new RemoteClusterStateStats(in); + this.remoteStateStats = new PersistedStateStats(in); } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.CLUSTER_STATE); - builder.startObject(Fields.OVERALL_STATS); - builder.field(Fields.UPDATE_COUNT, getStateUpdated()); - builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTimeTotalInMS()); + builder.startObject(Fields.CLUSTER_STATE_STATS); + builder.startObject(Fields.OVERALL); + builder.field(Fields.UPDATE_COUNT, getStateUpdateSuccess()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, getStateUpdateTotalTimeInMillis()); builder.field(Fields.FAILED_COUNT, getStateUpdateFailed()); builder.endObject(); if (remoteStateStats != null) { @@ -102,8 +103,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * @opensearch.internal */ static final class Fields { - static final String CLUSTER_STATE = "cluster_state"; - static final String OVERALL_STATS = "overall_stats"; + static final String CLUSTER_STATE_STATS = "cluster_state_stats"; + static final String OVERALL = "overall"; static final String UPDATE_COUNT = "update_count"; static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; static final String FAILED_COUNT = "failed_count"; diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 51b3e4261105d..4f343f3fd0569 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -68,7 +68,6 @@ import org.opensearch.core.common.text.Text; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.discovery.Discovery; -import org.opensearch.gateway.remote.RemoteClusterStateStats; import org.opensearch.node.Node; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -1002,11 +1001,6 @@ public void submitStateUpdateTasks( } } - public ClusterStateStats getStateStats(RemoteClusterStateStats remoteStateStats) { - stateStats.setRemoteStateStats(remoteStateStats); - return stateStats; - } - public ClusterStateStats getStateStats() { return stateStats; } diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 2edb9a61a85a8..288371aa240a0 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -52,7 +52,6 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.gateway.GatewayMetaState; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.plugins.DiscoveryPlugin; @@ -134,8 +133,7 @@ public DiscoveryModule( RerouteService rerouteService, NodeHealthService nodeHealthService, PersistedStateRegistry persistedStateRegistry, - RemoteStoreNodeService remoteStoreNodeService, - RemoteClusterStateService remoteClusterStateService + RemoteStoreNodeService remoteStoreNodeService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -213,8 +211,7 @@ public DiscoveryModule( electionStrategy, nodeHealthService, persistedStateRegistry, - remoteStoreNodeService, - remoteClusterStateService + remoteStoreNodeService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 0cf8706fda3e2..bba9abbd67ed6 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.coordination.InMemoryPersistedState; import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; +import org.opensearch.cluster.coordination.PersistedStateStats; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Manifest; @@ -605,6 +606,11 @@ public void setLastAcceptedState(ClusterState clusterState) { lastAcceptedState = clusterState; } + @Override + public PersistedStateStats getPersistedStateStats() { + return null; + } + private PersistedClusterStateService.Writer getWriterSafe() { final PersistedClusterStateService.Writer writer = persistenceWriter.get(); if (writer == null) { @@ -712,13 +718,17 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == // After the above PR is pushed, we can remove this silent failure and throw the exception instead. logger.error("Remote repository is not yet registered"); lastAcceptedState = clusterState; - remoteClusterStateService.writeMetadataFailed(); } catch (Exception e) { remoteClusterStateService.writeMetadataFailed(); handleExceptionOnWrite(e); } } + @Override + public PersistedStateStats getPersistedStateStats() { + return remoteClusterStateService.getRemoteClusterStateStats(); + } + private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) { assert manifest != null : "ClusterMetadataManifest is null"; assert clusterState != null : "ClusterState is null"; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 3a3194b90567e..251b55a31e16c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -119,8 +119,7 @@ public class RemoteClusterStateService implements Closeable { private volatile TimeValue slowWriteLoggingThreshold; private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); - private final RemoteClusterStateStats remoteStateStats; - + private final RemoteStateStats remoteStateStats; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; public static final int MANIFEST_CURRENT_CODEC_VERSION = 1; @@ -140,7 +139,7 @@ public RemoteClusterStateService( this.threadpool = threadPool; this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); - this.remoteStateStats = new RemoteClusterStateStats(); + this.remoteStateStats = new RemoteStateStats(); } private BlobStoreTransferService getBlobStoreTransferService() { @@ -990,7 +989,7 @@ public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataMa }); } - public RemoteClusterStateStats getRemoteClusterStateStats() { + public RemoteStateStats getRemoteClusterStateStats() { return remoteStateStats; } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateStats.java deleted file mode 100644 index a6f1dc7b96d1c..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateStats.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Remote cluster state related stats. - * - * @opensearch.internal - */ -public class RemoteClusterStateStats implements Writeable, ToXContentObject { - private AtomicLong totalUploadTimeInMS = new AtomicLong(0); - private AtomicLong uploadFailedCount = new AtomicLong(0); - private AtomicLong uploadSuccessCount = new AtomicLong(0); - private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0); - - public RemoteClusterStateStats() {} - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(true); - out.writeVLong(uploadSuccessCount.get()); - out.writeVLong(uploadFailedCount.get()); - out.writeVLong(totalUploadTimeInMS.get()); - out.writeVLong(cleanupAttemptFailedCount.get()); - } - - public RemoteClusterStateStats(StreamInput in) throws IOException { - this.uploadSuccessCount = new AtomicLong(in.readVLong()); - this.uploadFailedCount = new AtomicLong(in.readVLong()); - this.totalUploadTimeInMS = new AtomicLong(in.readVLong()); - this.cleanupAttemptFailedCount = new AtomicLong(in.readVLong()); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.REMOTE_UPLOAD); - builder.field(Fields.UPDATE_COUNT, getUploadSuccessCount()); - builder.field(Fields.FAILED_COUNT, getUploadFailedCount()); - builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalUploadTimeInMS()); - builder.field(Fields.CLEANUP_ATTEMPT_FAILED_COUNT, getCleanupAttemptFailedCount()); - builder.endObject(); - return builder; - } - - public void stateUploadFailed() { - uploadFailedCount.incrementAndGet(); - } - - public void stateUploaded() { - uploadSuccessCount.incrementAndGet(); - } - - /** - * Expects user to send time taken in milliseconds. - * - * @param timeTakenInUpload time taken in uploading the cluster state to remote - */ - public void stateUploadTook(long timeTakenInUpload) { - totalUploadTimeInMS.addAndGet(timeTakenInUpload); - } - - public void cleanUpAttemptFailed() { - cleanupAttemptFailedCount.incrementAndGet(); - } - - public long getTotalUploadTimeInMS() { - return totalUploadTimeInMS.get(); - } - - public long getUploadFailedCount() { - return uploadFailedCount.get(); - } - - public long getUploadSuccessCount() { - return uploadSuccessCount.get(); - } - - public long getCleanupAttemptFailedCount() { - return cleanupAttemptFailedCount.get(); - } - - /** - * Fields for parsing and toXContent - * - * @opensearch.internal - */ - static final class Fields { - static final String REMOTE_UPLOAD = "remote_upload"; - static final String UPDATE_COUNT = "update_count"; - static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis"; - static final String FAILED_COUNT = "failed_count"; - static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; - } -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteStateStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteStateStats.java new file mode 100644 index 0000000000000..819508acfcb90 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteStateStats.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.coordination.PersistedStateStats; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Remote state related extended stats. + * + * @opensearch.internal + */ +public class RemoteStateStats extends PersistedStateStats { + static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; + static final String REMOTE = "remote"; + private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0); + + public RemoteStateStats() { + super(REMOTE); + addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount); + } + + public void cleanUpAttemptFailed() { + cleanupAttemptFailedCount.incrementAndGet(); + } + + public long getCleanupAttemptFailedCount() { + return cleanupAttemptFailedCount.get(); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 065df4c3e0870..69b80462bbf0b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1061,8 +1061,7 @@ protected Node( rerouteService, fsHealthService, persistedStateRegistry, - remoteStoreNodeService, - remoteClusterStateService + remoteStoreNodeService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1116,8 +1115,7 @@ protected Node( searchPipelineService, fileCache, taskCancellationMonitoringService, - resourceUsageCollectorService, - remoteClusterStateService + resourceUsageCollectorService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 8531d1f5d77c3..9bb07080fa717 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -46,7 +46,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.discovery.Discovery; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.http.HttpServerTransport; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.store.remote.filecache.FileCache; @@ -94,7 +93,6 @@ public class NodeService implements Closeable { private final Discovery discovery; private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; - private final RemoteClusterStateService remoteClusterStateService; NodeService( Settings settings, @@ -118,8 +116,7 @@ public class NodeService implements Closeable { SearchPipelineService searchPipelineService, FileCache fileCache, TaskCancellationMonitoringService taskCancellationMonitoringService, - ResourceUsageCollectorService resourceUsageCollectorService, - RemoteClusterStateService remoteClusterStateService + ResourceUsageCollectorService resourceUsageCollectorService ) { this.settings = settings; this.threadPool = threadPool; @@ -143,7 +140,6 @@ public class NodeService implements Closeable { this.fileCache = fileCache; this.taskCancellationMonitoringService = taskCancellationMonitoringService; this.resourceUsageCollectorService = resourceUsageCollectorService; - this.remoteClusterStateService = remoteClusterStateService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index a209a5e032a23..d94f3fb304fe2 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -57,7 +57,6 @@ import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.transport.TransportResponse; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.monitor.NodeHealthService; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.Node; @@ -271,8 +270,7 @@ protected void onSendRequest( ElectionStrategy.DEFAULT_INSTANCE, nodeHealthService, persistedStateRegistry, - Mockito.mock(RemoteStoreNodeService.class), - Mockito.mock(RemoteClusterStateService.class) + Mockito.mock(RemoteStoreNodeService.class) ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java index 0b951c1927c71..b33ebf8333b36 100644 --- a/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/opensearch/discovery/DiscoveryModuleTests.java @@ -128,8 +128,7 @@ private DiscoveryModule newModule(Settings settings, List plugi mock(RerouteService.class), null, new PersistedStateRegistry(), - remoteStoreNodeService, - null + remoteStoreNodeService ); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index ae08b02ebe3e9..1edfb7c35094e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -279,7 +279,7 @@ public void testWriteFullMetadataInParallelFailure() throws IOException { RemoteClusterStateService.IndexMetadataTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); - assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getUploadSuccessCount()); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getSuccessCount()); } public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { @@ -287,7 +287,7 @@ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOExc remoteClusterStateService.start(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); Assert.assertThat(manifest, nullValue()); - assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getUploadSuccessCount()); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getSuccessCount()); } public void testFailWriteIncrementalMetadataWhenTermChanged() { @@ -697,9 +697,9 @@ public void testRemoteStateStats() throws IOException { .build(); assertTrue(remoteClusterStateService.getRemoteClusterStateStats() != null); - assertEquals(1, remoteClusterStateService.getRemoteClusterStateStats().getUploadSuccessCount()); + assertEquals(1, remoteClusterStateService.getRemoteClusterStateStats().getSuccessCount()); assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getCleanupAttemptFailedCount()); - assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getUploadFailedCount()); + assertEquals(0, remoteClusterStateService.getRemoteClusterStateStats().getFailedCount()); } public void testFileNames() { diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 233a9281e775b..97c5d23831965 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2523,8 +2523,7 @@ public void start(ClusterState initialState) { ElectionStrategy.DEFAULT_INSTANCE, () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, - remoteStoreNodeService, - null + remoteStoreNodeService ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 03b92f59eb155..29f4765c4d29e 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1018,6 +1018,11 @@ public void setLastAcceptedState(ClusterState clusterState) { delegate.setLastAcceptedState(clusterState); } + @Override + public PersistedStateStats getPersistedStateStats() { + return null; + } + @Override public void close() { assertTrue(openPersistedStates.remove(this)); @@ -1153,19 +1158,6 @@ protected Optional getDisruptableMockTransport(Transpo (dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)) ); final AllocationService allocationService = OpenSearchAllocationTestCase.createAllocationService(Settings.EMPTY); - RemoteClusterStateService remoteClusterStateService; - if (isRemoteStoreClusterStateEnabled(settings)) { - remoteClusterStateService = new RemoteClusterStateService( - localNode.getId(), - new SetOnce<>(repositoriesService)::get, - settings, - clusterService.getClusterSettings(), - threadPool::preciseRelativeTimeInNanos, - threadPool - ); - } else { - remoteClusterStateService = null; - } coordinator = new Coordinator( "test_node", settings, @@ -1183,8 +1175,7 @@ protected Optional getDisruptableMockTransport(Transpo getElectionStrategy(), nodeHealthService, persistedStateRegistry, - remoteStoreNodeService, - remoteClusterStateService + remoteStoreNodeService ); clusterManagerService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(