Skip to content

Commit

Permalink
Upload incremental cluster state on master re-election
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Aug 9, 2024
1 parent 170ea27 commit e6633da
Show file tree
Hide file tree
Showing 11 changed files with 559 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Locale;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, "true").build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
String routingTableRepoName = "remote-routing-repo";
String routingTableRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
routingTableRepoName
);
String routingTableRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
routingTableRepoName
);
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.build();
}

public void testMasterReElectionUsesIncrementalUpload() throws IOException {
prepareCluster(3, 2, INDEX_NAME, 1, 1);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);
// update settings on a random node
assertAcked(
internalCluster().client()
.admin()
.cluster()
.updateSettings(
new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "10mb").build()
)
)
.actionGet()
);
PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest();
// force elected master to step down
internalCluster().stopCurrentClusterManagerNode();
ensureStableCluster(4);

persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
GatewayMetaState.RemotePersistedState persistedStateAfterElection = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest();

// coordination metadata is updated, it will be unequal
assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata());
// all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected
assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID());
assertEquals(manifest.getIndices(), manifestAfterElection.getIndices());
assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata());
assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata());
assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap());
assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion());
assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

public void prepareCluster(
int numClusterManagerNodes,
int numDataOnlyNodes,
String indices,
int replicaCount,
int shardCount,
Settings settings
) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

protected void prepareCluster(
Expand All @@ -370,11 +364,16 @@ protected void prepareCluster(
int shardCount,
Settings settings
) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.GatewayMetaState;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -459,6 +460,16 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
// if Remote publication is enabled, we only need to update the accepted state in followers node as elected cluster manager would
// have already updated the last accepted state
if (isRemotePublicationEnabled
&& publishRequest.getManifest() != null
&& localNode.isClusterManagerNode()
&& clusterState.getNodes().isLocalNodeElectedClusterManager() == false) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
((GatewayMetaState.RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.setLastAcceptedManifest(publishRequest.getManifest());
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -571,6 +582,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (isRemotePublicationEnabled && localNode.isClusterManagerNode()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -661,14 +675,7 @@ public interface PersistedState extends Closeable {
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -693,6 +700,18 @@ default void markLastAcceptedStateAsCommitted() {
}
}

default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
return metadataBuilder;
}

default void close() throws IOException {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.set(incomingState);
return response;
} else {
Expand Down Expand Up @@ -220,7 +220,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
Expand Down Expand Up @@ -270,7 +270,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.set(clusterState);
return response;
} else {
Expand All @@ -289,13 +289,13 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
Expand All @@ -305,7 +305,7 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(publishRequest);
}
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
return handlePublishRequest.apply(new PublishRequest(incomingState, manifest));
}

private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

Expand All @@ -44,15 +45,26 @@
public class PublishRequest {

private final ClusterState acceptedState;
private final ClusterMetadataManifest manifest;

public PublishRequest(ClusterState acceptedState) {
this.acceptedState = acceptedState;
this.manifest = null;
}

public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) {
this.acceptedState = acceptedState;
this.manifest = manifest;
}

public ClusterState getAcceptedState() {
return acceptedState;
}

public ClusterMetadataManifest getManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -70,6 +82,13 @@ public int hashCode() {

@Override
public String toString() {
return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}';
return "PublishRequest{term="
+ acceptedState.term()
+ ", version="
+ acceptedState.version()
+ ", state="
+ acceptedState
+ (manifest != null ? ", manifest=" + manifest : "")
+ '}';
}
}
Loading

0 comments on commit e6633da

Please sign in to comment.