Skip to content

Commit

Permalink
[Remote Cluster State] Move remote cluster state related settings to …
Browse files Browse the repository at this point in the history
…node attrs

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Sep 6, 2023
1 parent 6fe9388 commit c8892e6
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand Down Expand Up @@ -222,14 +223,27 @@ public static Settings buildRemoteStoreNodeAttributes(
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
translogRepoName
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);

Settings.Builder settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(segmentRepoTypeAttributeKey, FsRepository.TYPE)
.put(segmentRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
.put(translogRepoTypeAttributeKey, FsRepository.TYPE)
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath);
.put(translogRepoSettingsAttributeKeyPrefix + "location", translogRepoPath)
.put("node.attr." + REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);

if (withRateLimiterAttributes) {
settings.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;

/**
* The core class of the cluster state coordination algorithm, directly implementing the
Expand Down Expand Up @@ -101,7 +101,7 @@ public CoordinationState(
.getLastAcceptedState()
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings);
this.isRemoteStateEnabled = isRemoteStoreAttributePresent(settings);
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -669,8 +668,8 @@ public void apply(Settings value, Settings current, Settings previous) {
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING,
// RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
// RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;

/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
Expand Down Expand Up @@ -165,7 +165,7 @@ public void start(

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreAttributePresent(settings) == true) {
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -45,6 +47,7 @@
import java.util.stream.Collectors;

import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;

/**
* A Service which provides APIs to upload and download cluster metadata from remote store.
Expand Down Expand Up @@ -106,7 +109,7 @@ public RemoteClusterStateService(
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier
) {
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
assert isRemoteStoreAttributePresent(settings) == true : "Remote cluster state is not enabled";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
Expand Down Expand Up @@ -283,8 +286,10 @@ void ensureRepositorySet() {
if (blobStoreRepository != null) {
return;
}
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
// assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ protected Node(
threadPool::relativeTimeInMillis
);
final RemoteClusterStateService remoteClusterStateService;
if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreAttributePresent(settings) == true) {
remoteClusterStateService = new RemoteClusterStateService(
nodeEnvironment.nodeId(),
repositoriesServiceReference::get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class RemoteStoreNodeAttribute {
public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store";
public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository";
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository";
public static final String REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.state.repository";
public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
private final RepositoriesMetadata repositoriesMetadata;
Expand Down Expand Up @@ -98,6 +99,7 @@ private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {

repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY));

for (String repositoryName : repositoryNames) {
repositoryMetadataList.add(buildRepositoryMetadata(node, repositoryName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.EqualsHashCodeTestUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -62,6 +64,9 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -937,9 +942,24 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1);
persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, new RemotePersistedState(remoteClusterStateService));
final Settings settings = Settings.builder()
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)

String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"randomRepoName"
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"randomRepoName"
);

Settings settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "randomRepoName")
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
.build();

final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings);
coordinationState.handlePrePublish(clusterState);
Mockito.verifyNoInteractions(remoteClusterStateService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -79,12 +80,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.mockito.Mockito;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;
import static org.opensearch.test.NodeRoles.nonClusterManagerNode;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -445,7 +451,7 @@ public void testDataOnlyNodePersistence() throws Exception {
() -> 0L
);
Supplier<RemoteClusterStateService> remoteClusterStateServiceSupplier = () -> {
if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreAttributePresent(settings) == true) {
return new RemoteClusterStateService(
nodeEnvironment.nodeId(),
() -> new RepositoriesService(
Expand Down Expand Up @@ -757,11 +763,25 @@ public void testGatewayForRemoteState() throws IOException {
try {
gateway = new MockGatewayMetaState(localNode, bigArrays);
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
final Settings settingWithRemoteStateEnabled = Settings.builder()
.put(settings)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)

String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"randomRepoName"
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"randomRepoName"
);

Settings settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "randomRepoName")
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
.build();
gateway.start(settingWithRemoteStateEnabled, nodeEnvironment, xContentRegistry(), persistedStateRegistry);
gateway.start(settings, nodeEnvironment, xContentRegistry(), persistedStateRegistry);

final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class));
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.junit.Assert;
Expand All @@ -44,11 +45,15 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

import org.mockito.ArgumentMatchers;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -70,10 +75,24 @@ public void setup() {
repositoriesServiceSupplier = mock(Supplier.class);
repositoriesService = mock(RepositoriesService.class);
when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
final Settings settings = Settings.builder()
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository")

String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
"remote_store_repository"
);
String stateRepoSettingsAttributeKeyPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
"remote_store_repository"
);

Settings settings = Settings.builder()
.put("node.attr." + REMOTE_STORE_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository")
.put(stateRepoTypeAttributeKey, FsRepository.TYPE)
.put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath")
.build();

blobStoreRepository = mock(BlobStoreRepository.class);
blobStore = mock(BlobStore.class);
when(blobStoreRepository.blobStore()).thenReturn(blobStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreAttributePresent;

/**
* {@link GatewayMetaState} constructor accepts a lot of arguments.
Expand Down Expand Up @@ -108,7 +109,7 @@ public void start(
throw new AssertionError(e);
}
Supplier<RemoteClusterStateService> remoteClusterStateServiceSupplier = () -> {
if (RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
if (isRemoteStoreAttributePresent(settings) == true) {
return new RemoteClusterStateService(
nodeEnvironment.nodeId(),
() -> new RepositoriesService(
Expand Down

0 comments on commit c8892e6

Please sign in to comment.