diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java deleted file mode 100644 index ccb437edde2b0..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java +++ /dev/null @@ -1,2 +0,0 @@ -package org.opensearch.gateway.remote;public class RemoteRoutingTableServiceIT { -} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index c01dc47a7012f..30522b3dd9df0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -39,7 +39,7 @@ public DiffableUtils.MapDiff CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = - new DiffableUtils.NonDiffableValueSerializer() { - @Override - public void write(IndexRoutingTable value, StreamOutput out) throws IOException { - value.writeTo(out); - } - - @Override - public IndexRoutingTable read(StreamInput in, String key) throws IOException { - return IndexRoutingTable.readFrom(in); - } - }; public static final DiffableUtils.DiffableValueSerializer CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER = new DiffableUtils.DiffableValueSerializer() { diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java index 4592880f37ca1..fd1395ccc116b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java @@ -77,7 +77,7 @@ public class ClusterStateDiffManifest implements ToXContentFragment, Writeable { private final List clusterStateCustomUpdated; private final List clusterStateCustomDeleted; - public ClusterStateDiffManifest(ClusterState state, ClusterState previousState) { + public ClusterStateDiffManifest(ClusterState state, ClusterState previousState, DiffableUtils.MapDiff> routingTableIncrementalDiff) { fromStateUUID = previousState.stateUUID(); toStateUUID = state.stateUUID(); coordinationMetadataUpdated = !Metadata.isCoordinationMetadataEqual(state.metadata(), previousState.metadata()); @@ -104,24 +104,12 @@ public ClusterStateDiffManifest(ClusterState state, ClusterState previousState) customMetadataUpdated.addAll(customDiff.getUpserts().keySet()); customMetadataDeleted = customDiff.getDeletes(); - DiffableUtils.MapDiff> routingTableDiff = DiffableUtils.diff( - previousState.getRoutingTable().getIndicesRouting(), - state.getRoutingTable().getIndicesRouting(), - DiffableUtils.getStringKeySerializer(), - CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER - ); - - DiffableUtils.MapDiff> routingTableDiff1 = DiffableUtils.diff( - previousState.getRoutingTable().getIndicesRouting(), - state.getRoutingTable().getIndicesRouting(), - DiffableUtils.getStringKeySerializer(), - CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER - ); - indicesRoutingUpdated = new ArrayList<>(); - routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k)); - - indicesRoutingDeleted = routingTableDiff.getDeletes(); + indicesRoutingDeleted = new ArrayList<>(); + if (routingTableIncrementalDiff != null) { + routingTableIncrementalDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k)); + indicesRoutingDeleted.addAll(routingTableIncrementalDiff.getDeletes()); + } hashesOfConsistentSettingsUpdated = !state.metadata() .hashesOfConsistentSettings() .equals(previousState.metadata().hashesOfConsistentSettings()); @@ -590,7 +578,8 @@ public static class Builder { private List clusterStateCustomUpdated; private List clusterStateCustomDeleted; - public Builder() {} + public Builder() { + } public Builder fromStateUUID(String fromStateUUID) { this.fromStateUUID = fromStateUUID; @@ -667,7 +656,7 @@ public Builder indicesRoutingDeleted(List indicesRoutingDeleted) { return this; } - public Builder indicesRoutingDiffPath(String indicesRoutingDiffPath){ + public Builder indicesRoutingDiffPath(String indicesRoutingDiffPath) { this.indicesRoutingDiff = indicesRoutingDiffPath; return this; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b8fe98f43d1ed..7c55c6d5aafa9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -242,11 +242,15 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()), null ); + ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE, null); + if (uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null) { + clusterStateDiffManifest.setIndicesRoutingDiffPath(uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath()); + } final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( clusterState, uploadedMetadataResults, previousClusterUUID, - new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE), + clusterStateDiffManifest, false ); @@ -341,8 +345,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( .getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable()); Map> indexRoutingTableDiffs = routingTableIncrementalDiff.getDiffs(); - routingTableIncrementalDiff.getDiffs().forEach((k,v)-> indicesRoutingToUpload.add(clusterState.getRoutingTable().index(k))); - routingTableIncrementalDiff.getUpserts().forEach((k,v)-> indicesRoutingToUpload.add(v)); + routingTableIncrementalDiff.getDiffs().forEach((k, v) -> indicesRoutingToUpload.add(clusterState.getRoutingTable().index(k))); + routingTableIncrementalDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v)); UploadedMetadataResults uploadedMetadataResults; // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, @@ -424,11 +428,15 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( routingTableIncrementalDiff.getDeletes() ); + ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest(clusterState, previousClusterState, routingTableIncrementalDiff); + if (uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null) { + clusterStateDiffManifest.setIndicesRoutingDiffPath(uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilePath()); + } final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( clusterState, uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), - new ClusterStateDiffManifest(clusterState, previousClusterState), + clusterStateDiffManifest, false ); @@ -506,7 +514,7 @@ UploadedMetadataResults writeMetadataInParallel( + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0) + clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size() - + (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty()? 1 : 0); + + (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty() ? 1 : 0); CountDownLatch latch = new CountDownLatch(totalUploadTasks); BlobPath clusterBasePath = getClusterMetadataBasePath( blobStoreRepository, @@ -678,7 +686,7 @@ UploadedMetadataResults writeMetadataInParallel( }); indicesRoutingToUpload.forEach(indexRoutingTable -> { uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX, + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), remoteRoutingTableService.getIndexRoutingAsyncAction( clusterState, indexRoutingTable, @@ -748,7 +756,7 @@ UploadedMetadataResults writeMetadataInParallel( && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) - && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_DIFF_METADATA_PREFIX)){ + && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_DIFF_METADATA_PREFIX)) { response.uploadedIndicesRoutingDiffMetadata = (UploadedIndexMetadata) uploadedMetadata; } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- @@ -1024,7 +1032,7 @@ private ClusterState readClusterStateInParallel( ? 1 : 0) + (readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size() - + indicesRoutingToRead.size() + (readIndexRoutingTableDiff ? 1:0); + + indicesRoutingToRead.size() + (readIndexRoutingTableDiff ? 1 : 0); CountDownLatch latch = new CountDownLatch(totalReadTasks); List> asyncMetadataReadActions = new ArrayList<>(); List readResults = Collections.synchronizedList(new ArrayList<>()); @@ -1346,7 +1354,7 @@ public ClusterState getClusterStateForManifest( includeEphemeral ? manifest.getIndicesRouting() : emptyList(), includeEphemeral && manifest.getHashesOfConsistentSettings() != null, includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(), - manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath()!=null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(), + includeEphemeral && manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(), includeEphemeral ); } else { @@ -1428,7 +1436,7 @@ public ClusterState getClusterStateUsingDiff( updatedIndexRouting, diff.isHashesOfConsistentSettingsUpdated(), updatedClusterStateCustom, - manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath()!=null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(), + manifest.getDiffManifest() != null && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(), true ); ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState); diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexShardRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexShardRoutingTableDiff.java index e69de29bb2d1d..eeea4d7d3291f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexShardRoutingTableDiff.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexShardRoutingTableDiff.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.cluster.Diff; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Represents the differences between {@link IndexRoutingTable}s for uploading and downloading index routing table diff data to/from a remote store. + * + * @opensearch.internal + */ +public class RemoteIndexShardRoutingTableDiff implements Diff { + + private final List indexShardRoutingTables; + + /** + * Constructs a new RemoteIndexShardRoutingTableDiff with the given shard routing tables. + * + * @param indexShardRoutingTables the list of {@link IndexShardRoutingTable}s representing the differences. + */ + public RemoteIndexShardRoutingTableDiff(List indexShardRoutingTables) { + this.indexShardRoutingTables = indexShardRoutingTables; + } + + /** + * Applies the differences to the provided {@link IndexRoutingTable}. + * + * @param part the original {@link IndexRoutingTable}. + * @return the updated {@link IndexRoutingTable} with the applied differences. + */ + @Override + public IndexRoutingTable apply(IndexRoutingTable part) { + IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex()); + for (IndexShardRoutingTable shardRoutingTable : part) { + builder.addIndexShard(shardRoutingTable); // Add existing shards to builder + } + + // Apply the diff: update or add the new shard routing tables + for (IndexShardRoutingTable diffShard : indexShardRoutingTables) { + builder.addIndexShard(diffShard); + } + return builder.build(); + } + + /** + * Writes the shard routing tables to the provided {@link StreamOutput}. + * + * @param out the output stream to write to. + * @throws IOException if an I/O exception occurs while writing. + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(indexShardRoutingTables.size()); + for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) { + IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out); + } + } + + /** + * Reads the shard routing tables from the provided {@link StreamInput}. + * + * @param in the input stream to read from. + * @return a new {@link RemoteIndexShardRoutingTableDiff} with the read shard routing tables. + * @throws IOException if an I/O exception occurs while reading. + */ + public static RemoteIndexShardRoutingTableDiff readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + List indexShardRoutingTables = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); + indexShardRoutingTables.add(shardRoutingTable); + } + return new RemoteIndexShardRoutingTableDiff(indexShardRoutingTables); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 02471c9cdbbbe..fd312001b58bd 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -191,7 +191,8 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { .diffManifest( new ClusterStateDiffManifest( RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), - ClusterState.EMPTY_STATE + ClusterState.EMPTY_STATE, + null ) ) .build(); @@ -523,7 +524,8 @@ public void testClusterMetadataManifestXContentV2() throws IOException { .diffManifest( new ClusterStateDiffManifest( RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), - ClusterState.EMPTY_STATE + ClusterState.EMPTY_STATE, + null ) ) .build(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java index 897b2f5eeb25d..8fe017abbc646 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java @@ -191,7 +191,7 @@ private ClusterStateDiffManifest updateAndVerifyState( } ClusterState updatedClusterState = clusterStateBuilder.metadata(metadataBuilder.build()).build(); - ClusterStateDiffManifest manifest = new ClusterStateDiffManifest(updatedClusterState, initialState); + ClusterStateDiffManifest manifest = new ClusterStateDiffManifest(updatedClusterState, initialState, null); assertEquals(indicesToAdd.stream().map(im -> im.getIndex().getName()).collect(toList()), manifest.getIndicesUpdated()); assertEquals(indicesToRemove, manifest.getIndicesDeleted()); assertEquals(new ArrayList<>(customsToAdd.keySet()), manifest.getCustomMetadataUpdated());