Skip to content

Commit

Permalink
create publication repos during join task execution (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#16383)

* create publication repos during join task

Signed-off-by: Rajiv Kumar Vaidyanathan <[email protected]>
  • Loading branch information
rajiv-kv committed Dec 2, 2024
1 parent d631bbc commit a3c6f00
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.remotemigration.MigrationBaseTestCase;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() {
.put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoTypeAttributeKey, FsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath);
return builder;
}

public Settings.Builder remoteWithRoutingTableNodeSetting() {
// Remote Cluster with Routing table

return Settings.builder()
.put(
buildRemoteStoreNodeAttributes(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
ReloadableFsRepository.TYPE,
REPOSITORY_NAME,
segmentRepoPath,
false
ReloadableFsRepository.TYPE
)
)
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
Expand All @@ -59,6 +60,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -190,11 +192,30 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
Optional<DiscoveryNode> remotePublicationDN = currentNodes.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStatePublicationEnabled)
.findFirst();
RepositoriesMetadata existingRepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE);
Map<String, RepositoryMetadata> repositories = new LinkedHashMap<>();
if (existingRepositoriesMetadata != null) {
existingRepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remoteDN.isPresent()) {
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remoteDN.get(),
existingRepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}
if (remotePublicationDN.isPresent()) {
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
remotePublicationDN.get(),
existingRepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}

assert nodesBuilder.isLocalNodeElectedClusterManager();

Expand Down Expand Up @@ -224,15 +245,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);

if (remoteDN.isEmpty() && node.isRemoteStoreNode()) {
if ((remoteDN.isEmpty() && node.isRemoteStoreNode())
|| (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) {
// This is hit only on cases where we encounter first remote node
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
existingRepositoriesMetadata
);
repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r));
}

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand All @@ -246,7 +268,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
}
results.success(joinTask);
}

RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values()));
if (nodesChanged) {
rerouteService.reroute(
"post-join reroute",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) {
* node repository metadata an exception will be thrown and the node will not be allowed to join the cluster.
*/
public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) {
if (joiningNode.isRemoteStoreNode()) {
if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) {
List<RepositoryMetadata> updatedRepositoryMetadataList = new ArrayList<>();
List<RepositoryMetadata> newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata()
.repositories();
Expand Down
Loading

0 comments on commit a3c6f00

Please sign in to comment.