Skip to content

Commit

Permalink
Add more cluster service change for review
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya committed Jun 7, 2024
1 parent 768585e commit d254d16
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,6 @@ public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesServi
this.threadPool = threadPool;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingToUpload, Set<String> indicesRoutingToDelete) {
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
.stream()
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));

indicesRoutingToUpload.forEach(
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
);

indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove);

logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting);

return new ArrayList<>(allUploadedIndicesRouting.values());
}


private String getIndexRoutingFileName() {
return String.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.CoordinationMetadata;
Expand Down Expand Up @@ -822,6 +823,73 @@ private ClusterMetadataManifest uploadManifest(
}
}

public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral)
throws IOException {
return readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
clusterName,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
manifest.getCustomMetadataMap(),
manifest.getCoordinationMetadata() != null,
manifest.getSettingsMetadata() != null,
manifest.getTransientSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap()
);
}

public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
throws IOException {
assert manifest.getDiffManifest() != null;


List<UploadedIndexMetadata> updatedIndexRouting = remoteRoutingTableService.get().getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(),
manifest.getIndicesRouting());


ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
clusterName,
manifest.getClusterUUID(),
localNodeId,
updatedIndices,
updatedCustomMetadata,
diff.isCoordinationMetadataUpdated(),
diff.isSettingsMetadataUpdated(),
diff.isTransientSettingsMetadataUpdated(),
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated(),
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);

HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting());

for (String indexName : diff.getIndicesRoutingDeleted()) {
indexRoutingTables.remove(indexName);
}

RoutingTable routingTable = new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables);

return clusterStateBuilder.
stateUUID(manifest.getStateUUID()).
version(manifest.getStateVersion()).
metadata(metadataBuilder).
routingTable(routingTable).
build();
}

private ClusterState readClusterStateInParallel(
ClusterState previousState,
ClusterMetadataManifest manifest,
Expand Down Expand Up @@ -850,20 +918,6 @@ private ClusterState readClusterStateInParallel(
List<IndexRoutingTable> readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>());
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));

LatchedActionListener<RemoteReadResult> listener = new LatchedActionListener<>(
ActionListener.wrap(
response -> {
logger.debug("Successfully read cluster state component from remote");
readResults.add(response);
},
ex -> {
logger.error("Failed to read cluster state from remote", ex);
exceptionList.add(ex);
}
),
latch
);

for (UploadedIndexMetadata indexMetadata : indicesToRead) {
asyncMetadataReadActions.add(
remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(
Expand Down

0 comments on commit d254d16

Please sign in to comment.