Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote State] Upload incremental cluster state on master re-election (#15145) #15853

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.17.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
- Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean isRemotePublicationEnabled = true;
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
isRemotePublicationEnabled = true;
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}
Expand Down Expand Up @@ -100,6 +100,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

public void prepareCluster(
int numClusterManagerNodes,
int numDataOnlyNodes,
String indices,
int replicaCount,
int shardCount,
Settings settings
) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

protected void prepareCluster(
Expand All @@ -368,11 +362,16 @@ protected void prepareCluster(
int shardCount,
Settings settings
) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

/**
 * Starts the nodes of the test cluster: first the cluster-manager-only nodes, then the data-only nodes.
 * No indices are created by this overload.
 *
 * @param numClusterManagerNodes number of cluster-manager-only nodes to start
 * @param numDataOnlyNodes number of data-only nodes to start
 * @param settings settings passed to both node-start calls
 */
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -104,6 +105,7 @@
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
Expand Down Expand Up @@ -459,6 +461,9 @@
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
if (shouldUpdateRemotePersistedState(publishRequest)) {
updateRemotePersistedStateOnPublishRequest(publishRequest);
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -571,6 +576,9 @@
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (shouldCommitRemotePersistedState()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -616,6 +624,33 @@
IOUtils.close(persistedStateRegistry);
}

/**
 * Decides whether the REMOTE persisted state should be updated for the given publish request.
 *
 * @param publishRequest the publish request being handled
 * @return {@code true} only when a REMOTE persisted state is registered and the nodes of the
 *         accepted state do not report the local node as the elected cluster manager
 */
private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
    if (persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) == null) {
        // No remote persisted state registered on this node, nothing to keep in sync.
        return false;
    }
    final boolean localNodeIsClusterManager = publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager();
    return localNodeIsClusterManager == false;
}

/**
 * Mirrors the outcome of a publish request into the REMOTE persisted state.
 * <p>
 * For a {@link RemoteStatePublishRequest} the accepted state and the accepted manifest are stored.
 * For any other request both are set to {@code null}.
 *
 * @param publishRequest the publish request whose accepted state is being applied
 */
private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
    if ((publishRequest instanceof RemoteStatePublishRequest) == false) {
        // The publish request was not sent via the remote store even though this node has a remote persisted state
        persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
        persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
        return;
    }
    final RemoteStatePublishRequest remoteRequest = (RemoteStatePublishRequest) publishRequest;
    persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(remoteRequest.getAcceptedState());
    persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(remoteRequest.getAcceptedManifest());
}

/**
 * Decides whether the REMOTE persisted state should also be marked as committed.
 *
 * @return {@code true} only when a REMOTE persisted state is registered, the LOCAL last accepted
 *         state does not report the local node as the elected cluster manager, and the REMOTE
 *         persisted state holds both a last accepted state and a last accepted manifest
 */
private boolean shouldCommitRemotePersistedState() {
    if (persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) == null) {
        return false;
    }
    if (persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
        .getLastAcceptedState()
        .getNodes()
        .isLocalNodeElectedClusterManager()) {
        return false;
    }
    // Both pieces must be present on the remote side before it can be committed.
    if (persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() == null) {
        return false;
    }
    return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down Expand Up @@ -653,6 +688,22 @@
*/
PersistedStateStats getStats();

/**
* Returns the last accepted {@link ClusterMetadataManifest}.
*
* @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
* has been accepted yet.
*/
default ClusterMetadataManifest getLastAcceptedManifest() {
// return null by default, this method needs to be overridden wherever required
return null;

Check warning on line 699 in server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java#L699

Added line #L699 was not covered by tests
}

/**
 * Sets the last accepted {@link ClusterMetadataManifest}.
 * <p>
 * The default implementation does nothing; implementations that track a manifest need to override it.
 *
 * @param manifest the manifest to record as last accepted
 */
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}

Check warning on line 705 in server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java#L705

Added line #L705 was not covered by tests

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand All @@ -661,14 +712,7 @@
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not have been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -693,6 +737,18 @@
}
}

/**
 * Builds metadata in which the last committed voting configuration is set to the last accepted one.
 *
 * @param lastAcceptedState the cluster state whose voting configurations are compared
 * @return a {@link Metadata.Builder} based on the state's metadata with the updated coordination
 *         metadata, or {@code null} when the accepted and committed configurations are already equal
 */
default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
    if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration())) {
        // Nothing to commit: callers receive null and decide how to proceed.
        return null;
    }
    final CoordinationMetadata committedCoordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
        .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
        .build();
    final Metadata.Builder builder = Metadata.builder(lastAcceptedState.metadata());
    builder.coordinationMetadata(committedCoordinationMetadata);
    return builder;
}

default void close() throws IOException {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.set(incomingState);
return response;
} else {
Expand Down Expand Up @@ -230,7 +230,7 @@
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
Expand Down Expand Up @@ -281,7 +281,7 @@
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.set(clusterState);
return response;
} else {
Expand All @@ -300,7 +300,7 @@
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);

Check warning on line 303 in server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java#L303

Added line #L303 was not covered by tests
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
Expand All @@ -314,7 +314,7 @@
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
Expand All @@ -324,6 +324,9 @@
return handlePublishRequest.apply(publishRequest);
}
}
if (manifest != null) {
return handlePublishRequest.apply(new RemoteStatePublishRequest(incomingState, manifest));
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

Expand Down Expand Up @@ -539,7 +542,7 @@
}

public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
logger.info("sending cluster state over transport to node: {}", destination.getName());
logger.debug("sending cluster state over transport to node: {}", destination.getName());
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, listener);
Expand Down Expand Up @@ -639,7 +642,7 @@
@Override
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
try {
logger.info("sending remote cluster state to node: {}", destination.getName());
logger.debug("sending remote cluster state to node: {}", destination.getName());
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

/**
* PublishRequest created by downloading the accepted {@link ClusterState} from Remote Store, using the published {@link ClusterMetadataManifest}
*
* @opensearch.internal
*/
public class RemoteStatePublishRequest extends PublishRequest {
private final ClusterMetadataManifest manifest;

/**
 * Creates a publish request that carries the accepted state together with its manifest.
 *
 * @param acceptedState the accepted cluster state, passed on to the {@link PublishRequest} constructor
 * @param acceptedManifest the manifest stored with the request; it is not validated here
 */
public RemoteStatePublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) {
super(acceptedState);
this.manifest = acceptedManifest;
}

/**
 * Returns the manifest this request was constructed with.
 *
 * @return the {@link ClusterMetadataManifest} given to the constructor
 */
public ClusterMetadataManifest getAcceptedManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RemoteStatePublishRequest that = (RemoteStatePublishRequest) o;
return Objects.equals(manifest, that.manifest);

Check warning on line 39 in server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java#L38-L39

Added lines #L38 - L39 were not covered by tests
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), manifest);

Check warning on line 44 in server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java#L44

Added line #L44 was not covered by tests
}

@Override
public String toString() {
return "RemoteStatePublishRequest{" + super.toString() + "manifest=" + manifest + "} ";

Check warning on line 49 in server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java#L49

Added line #L49 was not covered by tests
}
}
Loading
Loading