From d254d1693004b17850ac5c523eca055b2b9a7753 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Fri, 7 Jun 2024 13:03:01 +0530 Subject: [PATCH] Add more cluster service change for review Signed-off-by: Arpit Bandejiya --- .../remote/RemoteRoutingTableService.java | 16 ---- .../remote/RemoteClusterStateService.java | 82 +++++++++++++++---- 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 109a9f23d4558..6543a52279b2a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -94,22 +94,6 @@ public RemoteRoutingTableService(Supplier repositoriesServi this.threadPool = threadPool; } - public List getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List indicesRoutingToUpload, Set indicesRoutingToDelete) { - final Map allUploadedIndicesRouting = previousManifest.getIndicesRouting() - .stream() - .collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity())); - - indicesRoutingToUpload.forEach( - uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting) - ); - - indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove); - - logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting); - - return new ArrayList<>(allUploadedIndicesRouting.values()); - } - private String getIndexRoutingFileName() { return String.join( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index a7ae28acc8413..e23360de1f5fe 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.CoordinationMetadata; @@ -822,6 +823,73 @@ private ClusterMetadataManifest uploadManifest( } } + public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral) + throws IOException { + return readClusterStateInParallel( + ClusterState.builder(new ClusterName(clusterName)).build(), + manifest, + clusterName, + manifest.getClusterUUID(), + localNodeId, + manifest.getIndices(), + manifest.getCustomMetadataMap(), + manifest.getCoordinationMetadata() != null, + manifest.getSettingsMetadata() != null, + manifest.getTransientSettingsMetadata() != null, + manifest.getTemplatesMetadata() != null, + includeEphemeral && manifest.getDiscoveryNodesMetadata() != null, + includeEphemeral && manifest.getClusterBlocksMetadata() != null, + includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(), + includeEphemeral && manifest.getHashesOfConsistentSettings() != null, + includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap() + ); + } + + public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId) + throws IOException { + assert manifest.getDiffManifest() != null; + + + List updatedIndexRouting = remoteRoutingTableService.get().getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(), + manifest.getIndicesRouting()); + + + ClusterState updatedClusterState = readClusterStateInParallel( + previousState, + manifest, + clusterName, + manifest.getClusterUUID(), + localNodeId, + updatedIndices, + updatedCustomMetadata, + diff.isCoordinationMetadataUpdated(), + diff.isSettingsMetadataUpdated(), + diff.isTransientSettingsMetadataUpdated(), + diff.isTemplatesMetadataUpdated(), + diff.isDiscoveryNodesUpdated(), + diff.isClusterBlocksUpdated(), + updatedIndexRouting, + diff.isHashesOfConsistentSettingsUpdated(), + updatedClusterStateCustom + ); + ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState); + + HashMap indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting()); + + for (String indexName : diff.getIndicesRoutingDeleted()) { + indexRoutingTables.remove(indexName); + } + + RoutingTable routingTable = new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables); + + return clusterStateBuilder. + stateUUID(manifest.getStateUUID()). + version(manifest.getStateVersion()). + metadata(metadataBuilder). + routingTable(routingTable). + build(); + } + private ClusterState readClusterStateInParallel( ClusterState previousState, ClusterMetadataManifest manifest, @@ -850,20 +918,6 @@ private ClusterState readClusterStateInParallel( List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); - LatchedActionListener listener = new LatchedActionListener<>( - ActionListener.wrap( - response -> { - logger.debug("Successfully read cluster state component from remote"); - readResults.add(response); - }, - ex -> { - logger.error("Failed to read cluster state from remote", ex); - exceptionList.add(ex); - } - ), - latch - ); - for (UploadedIndexMetadata indexMetadata : indicesToRead) { asyncMetadataReadActions.add( remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(