Skip to content

Commit

Permalink
[Remote State] Upload incremental cluster state on master re-election (
Browse files Browse the repository at this point in the history
…#15145) (#15853)

* Upload incremental cluster state on master re-election

Signed-off-by: Shivansh Arora <[email protected]>
(cherry picked from commit cbdcbb7)
  • Loading branch information
shiv0408 authored Sep 9, 2024
1 parent 725ed36 commit c429861
Show file tree
Hide file tree
Showing 12 changed files with 677 additions and 126 deletions.
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.17.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean isRemotePublicationEnabled = true;
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
isRemotePublicationEnabled = true;
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}
Expand Down Expand Up @@ -100,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

public void prepareCluster(
int numClusterManagerNodes,
int numDataOnlyNodes,
String indices,
int replicaCount,
int shardCount,
Settings settings
) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

protected void prepareCluster(
Expand All @@ -368,11 +362,16 @@ protected void prepareCluster(
int shardCount,
Settings settings
) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -104,6 +105,7 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
Expand Down Expand Up @@ -459,6 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
if (shouldUpdateRemotePersistedState(publishRequest)) {
updateRemotePersistedStateOnPublishRequest(publishRequest);
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -571,6 +576,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (shouldCommitRemotePersistedState()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -616,6 +624,33 @@ public void close() throws IOException {
IOUtils.close(persistedStateRegistry);
}

private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false;
}

private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
if (publishRequest instanceof RemoteStatePublishRequest) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState());
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)
.setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest());
} else {
// We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
}
}

private boolean shouldCommitRemotePersistedState() {
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.getNodes()
.isLocalNodeElectedClusterManager() == false
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down Expand Up @@ -653,6 +688,22 @@ public interface PersistedState extends Closeable {
*/
PersistedStateStats getStats();

/**
* Returns the last accepted {@link ClusterMetadataManifest}.
*
* @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
* has been accepted yet.
*/
default ClusterMetadataManifest getLastAcceptedManifest() {
// return null by default, this method needs to be overridden wherever required
return null;
}

/**
* Sets the last accepted {@link ClusterMetadataManifest}.
*/
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand All @@ -661,14 +712,7 @@ public interface PersistedState extends Closeable {
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -693,6 +737,18 @@ default void markLastAcceptedStateAsCommitted() {
}
}

default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
return metadataBuilder;
}

default void close() throws IOException {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.set(incomingState);
return response;
} else {
Expand Down Expand Up @@ -230,7 +230,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
Expand Down Expand Up @@ -281,7 +281,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.set(clusterState);
return response;
} else {
Expand All @@ -300,7 +300,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
Expand All @@ -314,7 +314,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
Expand All @@ -324,6 +324,9 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(publishRequest);
}
}
if (manifest != null) {
return handlePublishRequest.apply(new RemoteStatePublishRequest(incomingState, manifest));
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

Expand Down Expand Up @@ -539,7 +542,7 @@ public String executor() {
}

public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
logger.info("sending cluster state over transport to node: {}", destination.getName());
logger.debug("sending cluster state over transport to node: {}", destination.getName());
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, listener);
Expand Down Expand Up @@ -639,7 +642,7 @@ public class RemotePublicationContext extends PublicationContext {
@Override
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
try {
logger.info("sending remote cluster state to node: {}", destination.getName());
logger.debug("sending remote cluster state to node: {}", destination.getName());
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

/**
* PublishRequest created by downloading the accepted {@link ClusterState} from Remote Store, using the published {@link ClusterMetadataManifest}
*
* @opensearch.internal
*/
public class RemoteStatePublishRequest extends PublishRequest {
private final ClusterMetadataManifest manifest;

public RemoteStatePublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) {
super(acceptedState);
this.manifest = acceptedManifest;
}

public ClusterMetadataManifest getAcceptedManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RemoteStatePublishRequest that = (RemoteStatePublishRequest) o;
return Objects.equals(manifest, that.manifest);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), manifest);
}

@Override
public String toString() {
return "RemoteStatePublishRequest{" + super.toString() + "manifest=" + manifest + "} ";
}
}
Loading

0 comments on commit c429861

Please sign in to comment.