Skip to content

Commit

Permalink
[Remote State] Upload incremental cluster state on master re-election (
Browse files Browse the repository at this point in the history
…opensearch-project#15145)

* Upload incremental cluster state on master re-election

Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 authored and dk2k committed Oct 21, 2024
1 parent 56f0a32 commit 5268d53
Show file tree
Hide file tree
Showing 12 changed files with 732 additions and 126 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145))
- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
Expand All @@ -43,6 +44,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -74,15 +76,15 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {
private static final String REMOTE_STATE_PREFIX = "!";
private static final String REMOTE_ROUTING_PREFIX = "_";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean isRemotePublicationEnabled = true;
private boolean hasRemoteStateCharPrefix;
private boolean hasRemoteRoutingCharPrefix;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
isRemotePublicationEnabled = true;
hasRemoteStateCharPrefix = randomBoolean();
hasRemoteRoutingCharPrefix = randomBoolean();
}
Expand Down Expand Up @@ -112,6 +114,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
)
.put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled)
.put(
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
Expand Down Expand Up @@ -341,6 +344,59 @@ public void doAfterNodes(int n, Client client) {
});
}

public void testMasterReElectionUsesIncrementalUpload() throws IOException {
prepareCluster(3, 2, INDEX_NAME, 1, 1);
PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest();
// force elected master to step down
internalCluster().stopCurrentClusterManagerNode();
ensureStableCluster(4);

persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest();

// coordination metadata is updated, it will be unequal
assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata());
// all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected
assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID());
assertEquals(manifest.getIndices(), manifestAfterElection.getIndices());
assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata());
assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata());
assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap());
assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion());
assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
}

public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException {
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);
// add two new nodes to the cluster, to update the voting config
internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY);
ensureStableCluster(7);

internalCluster().getInstances(PersistedStateRegistry.class).forEach(persistedStateRegistry -> {
CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.LOCAL
);
CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
if (remoteState != null) {
assertEquals(
localState.getLastAcceptedState().getLastCommittedConfiguration(),
remoteState.getLastAcceptedState().getLastCommittedConfiguration()
);
assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size());
}
});
}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

public void prepareCluster(
int numClusterManagerNodes,
int numDataOnlyNodes,
String indices,
int replicaCount,
int shardCount,
Settings settings
) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

protected void prepareCluster(
Expand All @@ -368,11 +362,16 @@ protected void prepareCluster(
int shardCount,
Settings settings
) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -104,6 +105,7 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
// ToDo: revisit this check while making the setting dynamic
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationEnabled();
Expand Down Expand Up @@ -459,6 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
if (shouldUpdateRemotePersistedState(publishRequest)) {
updateRemotePersistedStateOnPublishRequest(publishRequest);
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -571,6 +576,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (shouldCommitRemotePersistedState()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -616,6 +624,33 @@ public void close() throws IOException {
IOUtils.close(persistedStateRegistry);
}

private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false;
}

private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
if (publishRequest instanceof RemoteStatePublishRequest) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState());
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)
.setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest());
} else {
// We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
}
}

private boolean shouldCommitRemotePersistedState() {
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.getNodes()
.isLocalNodeElectedClusterManager() == false
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down Expand Up @@ -653,6 +688,22 @@ public interface PersistedState extends Closeable {
*/
PersistedStateStats getStats();

/**
* Returns the last accepted {@link ClusterMetadataManifest}.
*
* @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
* has been accepted yet.
*/
default ClusterMetadataManifest getLastAcceptedManifest() {
// return null by default, this method needs to be overridden wherever required
return null;
}

/**
* Sets the last accepted {@link ClusterMetadataManifest}.
*/
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand All @@ -661,14 +712,7 @@ public interface PersistedState extends Closeable {
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -693,6 +737,18 @@ default void markLastAcceptedStateAsCommitted() {
}
}

default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
return metadataBuilder;
}

default void close() throws IOException {}
}

Expand Down
Loading

0 comments on commit 5268d53

Please sign in to comment.