Skip to content

Commit

Permalink
Add setting to disable diff application in remote publication
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Aug 13, 2024
1 parent 170ea27 commit 513badb
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_APPLY_FULL_STATE;

/**
* Transport handler for publication
*
Expand Down Expand Up @@ -243,7 +245,12 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
if (remoteClusterStateService.getRemotePublicationApplyFullState()) {
logger.debug(
() -> "Using full state for publication as " + REMOTE_PUBLICATION_APPLY_FULL_STATE.getKey() + " setting is enabled"
);
applyFullState = true;
} else if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING,
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING,
RemoteClusterStateService.REMOTE_PUBLICATION_APPLY_FULL_STATE,

// Admission Control Settings
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ public class RemoteClusterStateService implements Closeable {
Setting.Property.NodeScope
);

/**
* If enabled, the remote publication flow will always apply full state and not use the diff.
* This is to be used as an andon cord in case we want to rebuild the full cluster state on nodes from remote.
* If disabled, we will use the diff manifest shared by cluster manager to build the cluster state on nodes.
*/
public static final Setting<Boolean> REMOTE_PUBLICATION_APPLY_FULL_STATE = Setting.boolSetting(
"cluster.remote_publication.apply_full_state",
false,
Property.Dynamic,
Property.NodeScope
);

private TimeValue remoteStateReadTimeout;
private final String nodeId;
private final Supplier<RepositoriesService> repositoriesService;
Expand All @@ -148,6 +160,7 @@ public class RemoteClusterStateService implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private RemoteRoutingTableService remoteRoutingTableService;
private volatile TimeValue slowWriteLoggingThreshold;
private boolean remotePublicationApplyFullState;

private final RemotePersistenceStats remoteStateStats;
private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager;
Expand Down Expand Up @@ -194,6 +207,8 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout);
this.remotePublicationApplyFullState = clusterSettings.get(REMOTE_PUBLICATION_APPLY_FULL_STATE);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_APPLY_FULL_STATE, this::setRemotePublicationApplyFullState);
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
Expand Down Expand Up @@ -241,17 +256,11 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
null
);

ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(
clusterState,
ClusterState.EMPTY_STATE,
null,
null
);
final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
uploadedMetadataResults,
previousClusterUUID,
clusterStateDiffManifest,
null,
false
);

Expand Down Expand Up @@ -429,14 +438,18 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
routingTableIncrementalDiff.getDeletes()
);

ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(
clusterState,
previousClusterState,
routingTableIncrementalDiff,
uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null
? uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilename()
: null
);
ClusterStateDiffManifest clusterStateDiffManifest = null;
if (remotePublicationApplyFullState == false) {
logger.debug("skipping cluster state diff calculation as apply full state is enabled");
clusterStateDiffManifest = new ClusterStateDiffManifest(
clusterState,
previousClusterState,
routingTableIncrementalDiff,
uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null
? uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilename()
: null
);
}

final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest(
clusterState,
Expand Down Expand Up @@ -1643,4 +1656,13 @@ public void writeMetadataFailed() {
public RemotePersistenceStats getStats() {
return remoteStateStats;
}

public void setRemotePublicationApplyFullState(boolean applyFullState) {
logger.info("Updating {} to: {}", REMOTE_PUBLICATION_APPLY_FULL_STATE.getKey(), applyFullState);
this.remotePublicationApplyFullState = applyFullState;
}

public boolean getRemotePublicationApplyFullState() {
return this.remotePublicationApplyFullState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class PublicationTransportHandlerTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -288,6 +290,37 @@ public void testHandleIncomingRemotePublishRequestWhenNoLastSeenState() throws I
Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any());
}

public void testHandleIncomingRemotePublicationRequest_WhenApplyFullStateSettingEnabled() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);

PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty());
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest = p -> expectedPublishResponse;
final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService);
RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
secondNode,
TERM,
VERSION,
CLUSTER_NAME,
CLUSTER_UUID,
MANIFEST_FILE
);
ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(TERM).stateVersion(VERSION).build();
when(remoteClusterStateService.getRemotePublicationApplyFullState()).thenReturn(true);
when(remoteClusterStateService.getClusterMetadataManifestByFileName(CLUSTER_UUID, MANIFEST_FILE)).thenReturn(manifest);
when(remoteClusterStateService.getClusterStateForManifest(CLUSTER_NAME, manifest, LOCAL_NODE_ID, true)).thenReturn(
buildClusterState(TERM, VERSION)
);
ClusterState clusterState = buildClusterState(TERM, VERSION);
PublishRequest publishRequest = new PublishRequest(clusterState);
handler.setCurrentPublishRequestToSelf(publishRequest);
PublishWithJoinResponse publishWithJoinResponse = handler.handleIncomingRemotePublishRequest(remotePublishRequest);
assertThat(publishWithJoinResponse, is(expectedPublishResponse));
verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any());
verify(remoteClusterStateService, times(1)).getClusterStateForManifest(CLUSTER_NAME, manifest, LOCAL_NODE_ID, true);
verify(remoteClusterStateService, times(1)).getRemotePublicationApplyFullState();
verifyNoMoreInteractions(remoteClusterStateService);
}

private PublicationTransportHandler getPublicationTransportHandler(
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
RemoteClusterStateService remoteClusterStateService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_APPLY_FULL_STATE;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata1;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata2;
import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata3;
Expand Down Expand Up @@ -170,6 +171,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -789,6 +791,45 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
assertThat(manifest.getCoordinationMetadata(), notNullValue());
assertThat(manifest.getCustomMetadataMap().size(), is(2));
assertThat(manifest.getIndicesRouting().size(), is(1));
assertNotNull(manifest.getDiffManifest());
}

public void testWriteIncrementalMetadataNotSendDiffIfApplyFullStateEnabled() throws IOException {
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, true).build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
Settings applyFullStateSetting = Settings.builder().put(REMOTE_PUBLICATION_APPLY_FULL_STATE.getKey(), true).build();
clusterSettings.applySettings(applyFullStateSetting);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
settings,
clusterService,
() -> 0L,
threadPool,
List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)),
writableRegistry()
);
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
.build();

final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build();

remoteClusterStateService.start();
ClusterStateDiffManifest mockCSDM = mock(ClusterStateDiffManifest.class);
final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService);
final RemoteClusterStateManifestInfo manifestInfo = rcssSpy.writeIncrementalMetadata(
previousClusterState,
clusterState,
previousManifest
);
final ClusterMetadataManifest manifest = manifestInfo.getClusterMetadataManifest();

assertNull(manifest.getDiffManifest());
verifyNoInteractions(mockCSDM);
}

public void testTimeoutWhileWritingMetadata() throws IOException {
Expand Down Expand Up @@ -2860,6 +2901,14 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTableDiffNull() throws
assertThat(manifest.getDiffManifest().getIndicesRoutingDiffPath(), nullValue());
}

public void testRemotePublicationApplyFullState() {
assertFalse(remoteClusterStateService.getRemotePublicationApplyFullState());
Settings newSetting = Settings.builder().put(REMOTE_PUBLICATION_APPLY_FULL_STATE.getKey(), true).build();
clusterSettings.applySettings(newSetting);

assertTrue(remoteClusterStateService.getRemotePublicationApplyFullState());
}

private void initializeRoutingTable() {
Settings newSettings = Settings.builder()
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository")
Expand Down

0 comments on commit 513badb

Please sign in to comment.