diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index 13ea7eaa43bf8..da91d273f3bce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -46,10 +46,12 @@ import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.NodeClosedException; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -80,7 +82,8 @@ public TransportClusterStateAction( ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + @Nullable RemoteClusterStateService remoteClusterStateService ) { super( ClusterStateAction.NAME, @@ -93,6 +96,7 @@ public TransportClusterStateAction( indexNameExpressionResolver ); this.localExecuteSupported = true; + this.remoteClusterStateService = remoteClusterStateService; } @Override diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 4e869f29878cd..ea78dd4a2873d 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; @@ -63,6 +64,9 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteManifestManager; import org.opensearch.node.NodeClosedException; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; @@ -94,6 +98,7 @@ public abstract class TransportClusterManagerNodeAction joinWithDestination(Optional lastJoin, DiscoveryNode leader, long term) { if (lastJoin.isPresent() && lastJoin.get().targetMatches(leader) && lastJoin.get().getTerm() == term) { return lastJoin; diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java index 5b3f7f1001779..2d95811b8c4da 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java @@ -49,6 +49,12 @@ public interface ClusterApplier { */ void setInitialState(ClusterState initialState); + /** + * Sets the publish state for the applier + * @param clusterState state published by cluster-manager + */ + void setPublishState(ClusterState clusterState); + /** * Method to invoke when a new cluster state is available to be applied * diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index b2548a8976c73..b12501599e26c 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -119,7 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); private final Map timeoutClusterStateListeners = new ConcurrentHashMap<>(); - + private final AtomicReference publishState = new AtomicReference<>(); // last published state private final AtomicReference state; // last applied state private final String nodeName; @@ -169,6 +169,11 @@ public void setInitialState(ClusterState initialState) { state.set(initialState); } + @Override + public void setPublishState(ClusterState clusterState) { + publishState.set(clusterState); + } + @Override protected synchronized void doStart() { Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting"); @@ -233,6 +238,10 @@ public ClusterState state() { return clusterState; } + public ClusterState publishState() { + return publishState.get(); + } + /** * Returns true if the appliedClusterState is not null */ diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index c3c48dd8b87ef..31145b5a8fc21 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -183,6 +183,10 @@ public ClusterState state() { return clusterApplierService.state(); } + public ClusterState publishState() { + return clusterApplierService.publishState(); + } + /** * Adds a high priority applier of updated cluster states. */ diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 910f601a81ca8..e6c57c9c38cbf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -17,6 +17,7 @@ import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.coordination.ClusterStateTermVersion; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.cluster.metadata.IndexMetadata; @@ -32,6 +33,7 @@ import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; @@ -114,6 +116,7 @@ * * @opensearch.internal */ +@InternalApi public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); @@ -979,6 +982,8 @@ BlobStore getBlobStore() { return blobStoreRepository.blobStore(); } + AtomicReference lastDownloadState = new AtomicReference<>(); + /** * Fetch latest ClusterState from remote, including global metadata, index metadata and cluster state version * @@ -996,8 +1001,16 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID) ); } + ClusterStateTermVersion clusterStateTermVersion = new ClusterStateTermVersion( + new ClusterName(clusterName), + clusterUUID, + clusterMetadataManifest.get().getClusterTerm(), + clusterMetadataManifest.get().getStateVersion() + ); - return getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral); + ClusterState state = getClusterStateForManifest(clusterName, clusterMetadataManifest.get(), nodeId, includeEphemeral); + lastDownloadState.set(state); + return state; } // package private for testing @@ -1311,8 +1324,29 @@ public ClusterState getClusterStateForManifest( String localNodeId, boolean includeEphemeral ) throws IOException { + + ClusterStateTermVersion clusterStateTermVersion = new ClusterStateTermVersion( + new ClusterName(clusterName), + manifest.getClusterUUID(), + manifest.getClusterTerm(), + manifest.getStateVersion() + ); + ClusterState lastState = lastDownloadState.get(); + if (lastState != null) { + ClusterStateTermVersion lastStateTermVersion = new ClusterStateTermVersion( + new ClusterName(clusterName), + lastState.stateUUID(), + lastState.term(), + lastState.version() + ); + if (clusterStateTermVersion.equals(lastStateTermVersion)) { + return lastState; + } + } + + ClusterState retState = null; if (manifest.onOrAfterCodecVersion(CODEC_V2)) { - return readClusterStateInParallel( + retState = readClusterStateInParallel( ClusterState.builder(new ClusterName(clusterName)).build(), manifest, manifest.getClusterUUID(), @@ -1354,9 +1388,23 @@ public ClusterState getClusterStateForManifest( ); Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest)); mb.indices(clusterState.metadata().indices()); - return ClusterState.builder(clusterState).metadata(mb).build(); + retState = ClusterState.builder(clusterState).metadata(mb).build(); } + setLastDownloadState(retState); + return retState; + } + private void setLastDownloadState(final ClusterState newState) { + lastDownloadState.getAndUpdate(oldState -> { + if (oldState == null) { + return newState; + } + if (newState.term() > oldState.term() && newState.version() > oldState.version()) { + return newState; + } else { + return oldState; + } + }); } public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId) @@ -1437,11 +1485,14 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C indexRoutingTables.remove(indexName); } - return clusterStateBuilder.stateUUID(manifest.getStateUUID()) + final ClusterState newState = clusterStateBuilder.stateUUID(manifest.getStateUUID()) .version(manifest.getStateVersion()) .metadata(metadataBuilder) .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)) .build(); + + setLastDownloadState(newState); + return newState; } /** diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 0ccadd7dd18da..cee2e9278daa5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -288,7 +288,7 @@ private List getManifestFileNames(String clusterName, String clust } } - static String getManifestFilePrefixForTermVersion(long term, long version) { + public static String getManifestFilePrefixForTermVersion(long term, long version) { return String.join( DELIMITER, RemoteClusterMetadataManifest.MANIFEST, diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java b/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java index 9b865ace3b082..e327cb19bdcab 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java @@ -42,6 +42,11 @@ public void setInitialState(ClusterState initialState) { } + @Override + public void setPublishState(ClusterState clusterState) { + + } + @Override public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { listener.onSuccess(source); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 769dfeb37ff8d..183df37dc8c90 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2412,7 +2412,8 @@ public void onFailure(final Exception e) { clusterService, threadPool, actionFilters, - indexNameExpressionResolver + indexNameExpressionResolver, + null ) ); actions.put(