From 06698dd292ecd74e86c4ffbb26270ebeabd7ce31 Mon Sep 17 00:00:00 2001 From: shailendra0811 <167273922+shailendra0811@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:28:55 +0530 Subject: [PATCH] [Remote Routing Table] Implement write and read flow for shard diff file. (#14684) (#14922) * Implement write and read flow to upload/download shard diff file. Signed-off-by: Shailendra Singh --- CHANGELOG.md | 1 + .../remote/RemoteRoutingTableServiceIT.java | 97 +++++- .../routing/RoutingTableIncrementalDiff.java | 168 ++++++++++ .../InternalRemoteRoutingTableService.java | 73 +++- .../remote/NoopRemoteRoutingTableService.java | 33 +- .../remote/RemoteRoutingTableService.java | 48 ++- .../remote/ClusterMetadataManifest.java | 15 +- .../remote/ClusterStateDiffManifest.java | 60 +++- .../RemoteClusterStateCleanupManager.java | 26 ++ .../remote/RemoteClusterStateService.java | 94 +++++- .../remote/RemoteClusterStateUtils.java | 1 + .../remote/RemotePersistenceStats.java | 11 + .../model/RemoteClusterMetadataManifest.java | 7 +- .../routingtable/RemoteRoutingTableDiff.java | 150 +++++++++ .../RemoteRoutingTableServiceTests.java | 165 ++++++++- .../remote/ClusterMetadataManifestTests.java | 81 ++++- ...RemoteClusterStateCleanupManagerTests.java | 146 ++++++++ .../RemoteClusterStateServiceTests.java | 177 +++++++++- .../model/ClusterStateDiffManifestTests.java | 69 +++- .../RemoteIndexRoutingTableDiffTests.java | 317 ++++++++++++++++++ 20 files changed, 1663 insertions(+), 76 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 73efb06dbce3f..afd94d80f1fe5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750)) - Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659))) - Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668)) +- Add shard-diff path to diff manifest to reduce number of read calls remote store (([#14684](https://github.com/opensearch-project/OpenSearch/pull/14684))) - Add SortResponseProcessor to Search Pipelines (([#14785](https://github.com/opensearch-project/OpenSearch/issues/14785))) - Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790))) - Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800))) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java index ef10a93989a53..89d5b3f734ad7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteRoutingTableServiceIT.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -32,16 +33,19 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; +import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteRoutingTableServiceIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "test-index"; + private static final String INDEX_NAME_1 = "test-index-1"; BlobPath indexRoutingPath; AtomicInteger indexRoutingFiles = new AtomicInteger(); private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX; @@ -72,7 +76,13 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception { RemoteClusterStateService.class ); RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); - verifyUpdatesInManifestFile(remoteManifestManager); + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + List expectedIndexNames = new ArrayList<>(); + List deletedIndexNames = new ArrayList<>(); + verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true); List routingTableVersions = getRoutingTableFromAllNodes(); assertTrue(areRoutingTablesSame(routingTableVersions)); @@ -86,7 +96,11 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception { assertTrue(indexRoutingFilesAfterUpdate >= indexRoutingFiles.get() + 3); }); - verifyUpdatesInManifestFile(remoteManifestManager); + latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true); routingTableVersions = getRoutingTableFromAllNodes(); assertTrue(areRoutingTablesSame(routingTableVersions)); @@ -98,6 +112,42 @@ public void testRemoteRoutingTableIndexLifecycle() throws Exception { assertTrue(areRoutingTablesSame(routingTableVersions)); } + public void testRemoteRoutingTableEmptyRoutingTableDiff() throws Exception { + prepareClusterAndVerifyRepository(); + + RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance( + RemoteClusterStateService.class + ); + RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + List expectedIndexNames = new ArrayList<>(); + List deletedIndexNames = new ArrayList<>(); + verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true); + + List routingTableVersions = getRoutingTableFromAllNodes(); + assertTrue(areRoutingTablesSame(routingTableVersions)); + + // Update cluster settings + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), 0, TimeUnit.SECONDS)) + .get(); + assertTrue(response.isAcknowledged()); + + latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, false); + + routingTableVersions = getRoutingTableFromAllNodes(); + assertTrue(areRoutingTablesSame(routingTableVersions)); + } + public void testRemoteRoutingTableIndexNodeRestart() throws Exception { BlobStoreRepository repository = prepareClusterAndVerifyRepository(); @@ -124,10 +174,16 @@ public void testRemoteRoutingTableIndexNodeRestart() throws Exception { RemoteClusterStateService.class ); RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); - verifyUpdatesInManifestFile(remoteManifestManager); + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + List expectedIndexNames = new ArrayList<>(); + List deletedIndexNames = new ArrayList<>(); + verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true); } - public void testRemoteRoutingTableIndexMasterRestart1() throws Exception { + public void testRemoteRoutingTableIndexMasterRestart() throws Exception { BlobStoreRepository repository = prepareClusterAndVerifyRepository(); List routingTableVersions = getRoutingTableFromAllNodes(); @@ -153,7 +209,13 @@ public void testRemoteRoutingTableIndexMasterRestart1() throws Exception { RemoteClusterStateService.class ); RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager(); - verifyUpdatesInManifestFile(remoteManifestManager); + Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( + getClusterState().getClusterName().value(), + getClusterState().getMetadata().clusterUUID() + ); + List expectedIndexNames = new ArrayList<>(); + List deletedIndexNames = new ArrayList<>(); + verifyUpdatesInManifestFile(latestManifest, expectedIndexNames, 1, deletedIndexNames, true); } private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception { @@ -208,18 +270,23 @@ private BlobPath getIndexRoutingPath(BlobPath indexRoutingPath, String indexUUID ); } - private void verifyUpdatesInManifestFile(RemoteManifestManager remoteManifestManager) { - Optional latestManifest = remoteManifestManager.getLatestClusterMetadataManifest( - getClusterState().getClusterName().value(), - getClusterState().getMetadata().clusterUUID() - ); + private void verifyUpdatesInManifestFile( + Optional latestManifest, + List expectedIndexNames, + int expectedIndicesRoutingFilesInManifest, + List expectedDeletedIndex, + boolean isRoutingTableDiffFileExpected + ) { assertTrue(latestManifest.isPresent()); ClusterMetadataManifest manifest = latestManifest.get(); - assertTrue(manifest.getDiffManifest().getIndicesRoutingUpdated().contains(INDEX_NAME)); - assertTrue(manifest.getDiffManifest().getIndicesDeleted().isEmpty()); - assertFalse(manifest.getIndicesRouting().isEmpty()); - assertEquals(1, manifest.getIndicesRouting().size()); - assertTrue(manifest.getIndicesRouting().get(0).getUploadedFilename().contains(indexRoutingPath.buildAsString())); + + assertEquals(expectedIndexNames, manifest.getDiffManifest().getIndicesRoutingUpdated()); + assertEquals(expectedDeletedIndex, manifest.getDiffManifest().getIndicesDeleted()); + assertEquals(expectedIndicesRoutingFilesInManifest, manifest.getIndicesRouting().size()); + for (ClusterMetadataManifest.UploadedIndexMetadata uploadedFilename : manifest.getIndicesRouting()) { + assertTrue(uploadedFilename.getUploadedFilename().contains(indexRoutingPath.buildAsString())); + } + assertEquals(isRoutingTableDiffFileExpected, manifest.getDiffManifest().getIndicesRoutingDiffPath() != null); } private List getRoutingTableFromAllNodes() throws ExecutionException, InterruptedException { diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java new file mode 100644 index 0000000000000..3d75b22a8ed7f --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTableIncrementalDiff.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.cluster.Diff; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Represents a difference between {@link RoutingTable} objects that can be serialized and deserialized. + */ +public class RoutingTableIncrementalDiff implements Diff { + + private final Map> diffs; + + /** + * Constructs a new RoutingTableIncrementalDiff with the given differences. + * + * @param diffs a map containing the differences of {@link IndexRoutingTable}. + */ + public RoutingTableIncrementalDiff(Map> diffs) { + this.diffs = diffs; + } + + /** + * Gets the map of differences of {@link IndexRoutingTable}. + * + * @return a map containing the differences. + */ + public Map> getDiffs() { + return diffs; + } + + /** + * Reads a {@link RoutingTableIncrementalDiff} from the given {@link StreamInput}. + * + * @param in the input stream to read from. + * @return the deserialized RoutingTableIncrementalDiff. + * @throws IOException if an I/O exception occurs while reading from the stream. + */ + public static RoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + Map> diffs = new HashMap<>(); + + for (int i = 0; i < size; i++) { + String key = in.readString(); + Diff diff = IndexRoutingTableIncrementalDiff.readFrom(in); + diffs.put(key, diff); + } + return new RoutingTableIncrementalDiff(diffs); + } + + /** + * Applies the differences to the provided {@link RoutingTable}. + * + * @param part the original RoutingTable to which the differences will be applied. + * @return the updated RoutingTable with the applied differences. + */ + @Override + public RoutingTable apply(RoutingTable part) { + RoutingTable.Builder builder = new RoutingTable.Builder(); + for (IndexRoutingTable indexRoutingTable : part) { + builder.add(indexRoutingTable); // Add existing index routing tables to builder + } + + // Apply the diffs + for (Map.Entry> entry : diffs.entrySet()) { + builder.add(entry.getValue().apply(part.index(entry.getKey()))); + } + + return builder.build(); + } + + /** + * Writes the differences to the given {@link StreamOutput}. + * + * @param out the output stream to write to. + * @throws IOException if an I/O exception occurs while writing to the stream. + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(diffs.size()); + for (Map.Entry> entry : diffs.entrySet()) { + out.writeString(entry.getKey()); + entry.getValue().writeTo(out); + } + } + + /** + * Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized. + */ + public static class IndexRoutingTableIncrementalDiff implements Diff { + + private final List indexShardRoutingTables; + + /** + * Constructs a new IndexShardRoutingTableDiff with the given shard routing tables. + * + * @param indexShardRoutingTables a list of IndexShardRoutingTable representing the differences. + */ + public IndexRoutingTableIncrementalDiff(List indexShardRoutingTables) { + this.indexShardRoutingTables = indexShardRoutingTables; + } + + /** + * Applies the differences to the provided {@link IndexRoutingTable}. + * + * @param part the original IndexRoutingTable to which the differences will be applied. + * @return the updated IndexRoutingTable with the applied differences. + */ + @Override + public IndexRoutingTable apply(IndexRoutingTable part) { + IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex()); + for (IndexShardRoutingTable shardRoutingTable : part) { + builder.addIndexShard(shardRoutingTable); // Add existing shards to builder + } + + // Apply the diff: update or add the new shard routing tables + for (IndexShardRoutingTable diffShard : indexShardRoutingTables) { + builder.addIndexShard(diffShard); + } + return builder.build(); + } + + /** + * Writes the differences to the given {@link StreamOutput}. + * + * @param out the output stream to write to. + * @throws IOException if an I/O exception occurs while writing to the stream. + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(indexShardRoutingTables.size()); + for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) { + IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out); + } + } + + /** + * Reads a {@link IndexRoutingTableIncrementalDiff} from the given {@link StreamInput}. + * + * @param in the input stream to read from. + * @return the deserialized IndexShardRoutingTableDiff. + * @throws IOException if an I/O exception occurs while reading from the stream. + */ + public static IndexRoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException { + int size = in.readVInt(); + List indexShardRoutingTables = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in); + indexShardRoutingTables.add(shardRoutingTable); + } + return new IndexRoutingTableIncrementalDiff(indexShardRoutingTables); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 7c918c266148a..1f2a9d64e81a8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -12,9 +12,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.remote.RemoteWritableEntityStore; @@ -25,8 +27,10 @@ import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteStateTransferException; +import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -58,6 +62,7 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private final Supplier repositoriesService; private Compressor compressor; private RemoteWritableEntityStore remoteIndexRoutingTableStore; + private RemoteWritableEntityStore remoteRoutingTableDiffStore; private final ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; @@ -84,9 +89,10 @@ public List getIndicesRouting(RoutingTable routingTable) { /** * Returns diff between the two routing tables, which includes upserts and deletes. + * * @param before previous routing table - * @param after current routing table - * @return diff of the previous and current routing table + * @param after current routing table + * @return incremental diff of the previous and current routing table */ public DiffableUtils.MapDiff> getIndicesRoutingMapDiff( RoutingTable before, @@ -96,7 +102,7 @@ public DiffableUtils.MapDiff> indexRoutingTableDiff, + LatchedActionListener latchedActionListener + ) { + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(indexRoutingTableDiff); + RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + term, + version + ); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse(remoteRoutingTableDiff.getUploadedMetadata()), + ex -> latchedActionListener.onFailure( + new RemoteStateTransferException("Exception in writing index routing diff to remote store", ex) + ) + ); + + remoteRoutingTableDiffStore.writeAsync(remoteRoutingTableDiff, completionListener); + } + /** * Combines IndicesRoutingMetadata from previous manifest and current uploaded indices, removes deleted indices. * @param previousManifest previous manifest, used to get all existing indices routing paths @@ -170,6 +203,22 @@ public void getAsyncIndexRoutingReadAction( remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } + @Override + public void getAsyncIndexRoutingTableDiffReadAction( + String clusterUUID, + String uploadedFilename, + LatchedActionListener latchedActionListener + ) { + ActionListener actionListener = ActionListener.wrap( + latchedActionListener::onResponse, + latchedActionListener::onFailure + ); + + RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor); + + remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener); + } + @Override public List getUpdatedIndexRoutingTableMetadata( List updatedIndicesRouting, @@ -211,6 +260,14 @@ protected void doStart() { ThreadPool.Names.REMOTE_STATE_READ, clusterSettings ); + + this.remoteRoutingTableDiffStore = new RemoteClusterStateBlobStore<>( + new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + blobStoreRepository, + clusterName, + threadPool, + ThreadPool.Names.REMOTE_STATE_READ + ); } @Override @@ -226,4 +283,14 @@ public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOExcep throw e; } } + + public void deleteStaleIndexRoutingDiffPaths(List stalePaths) throws IOException { + try { + logger.debug(() -> "Deleting stale index routing diff files from remote - " + stalePaths); + blobStoreRepository.blobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Failed to delete some stale index routing diff paths from {}", stalePaths), e); + throw e; + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index e6e68e01e761f..1ebf3206212a1 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -9,9 +9,11 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -34,7 +36,12 @@ public DiffableUtils.MapDiff> indexRoutingTableDiff, + LatchedActionListener latchedActionListener + ) { + // noop + } + @Override public List getAllUploadedIndicesRouting( ClusterMetadataManifest previousManifest, @@ -67,6 +85,15 @@ public void getAsyncIndexRoutingReadAction( // noop } + @Override + public void getAsyncIndexRoutingTableDiffReadAction( + String clusterUUID, + String uploadedFilename, + LatchedActionListener latchedActionListener + ) { + // noop + } + @Override public List getUpdatedIndexRoutingTableMetadata( List updatedIndicesRouting, @@ -95,4 +122,8 @@ protected void doClose() throws IOException { public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException { // noop } + + public void deleteStaleIndexRoutingDiffPaths(List stalePaths) throws IOException { + // noop + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 0b0b4bb7dbc84..0811a5f3010f4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -9,15 +9,19 @@ package org.opensearch.cluster.routing.remote; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -27,16 +31,36 @@ * @opensearch.internal */ public interface RemoteRoutingTableService extends LifecycleComponent { - public static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = - new DiffableUtils.NonDiffableValueSerializer() { + + public static final DiffableUtils.DiffableValueSerializer CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER = + new DiffableUtils.DiffableValueSerializer() { + @Override + public IndexRoutingTable read(StreamInput in, String key) throws IOException { + return IndexRoutingTable.readFrom(in); + } + @Override public void write(IndexRoutingTable value, StreamOutput out) throws IOException { value.writeTo(out); } @Override - public IndexRoutingTable read(StreamInput in, String key) throws IOException { - return IndexRoutingTable.readFrom(in); + public Diff readDiff(StreamInput in, String key) throws IOException { + return IndexRoutingTable.readDiffFrom(in); + } + + @Override + public Diff diff(IndexRoutingTable currentState, IndexRoutingTable previousState) { + List diffs = new ArrayList<>(); + for (Map.Entry entry : currentState.getShards().entrySet()) { + Integer index = entry.getKey(); + IndexShardRoutingTable currentShardRoutingTable = entry.getValue(); + IndexShardRoutingTable previousShardRoutingTable = previousState.shard(index); + if (previousShardRoutingTable == null || !previousShardRoutingTable.equals(currentShardRoutingTable)) { + diffs.add(currentShardRoutingTable); + } + } + return new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(diffs); } }; @@ -48,6 +72,12 @@ void getAsyncIndexRoutingReadAction( LatchedActionListener latchedActionListener ); + void getAsyncIndexRoutingTableDiffReadAction( + String clusterUUID, + String uploadedFilename, + LatchedActionListener latchedActionListener + ); + List getUpdatedIndexRoutingTableMetadata( List updatedIndicesRouting, List allIndicesRouting @@ -66,6 +96,14 @@ void getAsyncIndexRoutingWriteAction( LatchedActionListener latchedActionListener ); + void getAsyncIndexRoutingDiffWriteAction( + String clusterUUID, + long term, + long version, + Map> indexRoutingTableDiff, + LatchedActionListener latchedActionListener + ); + List getAllUploadedIndicesRouting( ClusterMetadataManifest previousManifest, List indicesRoutingUploaded, @@ -74,4 +112,6 @@ List getAllUploadedIndicesRouting public void deleteStaleIndexRoutingPaths(List stalePaths) throws IOException; + public void deleteStaleIndexRoutingDiffPaths(List stalePaths) throws IOException; + } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 3a66419b1dc20..71815b6ee324c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -44,6 +44,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { public static final int CODEC_V2 = 2; // In Codec V2, there are separate metadata files rather than a single global metadata file, // also we introduce index routing-metadata, diff and other attributes as part of manifest // required for state publication + public static final int CODEC_V3 = 3; // In Codec V3, we have introduced new diff field in diff-manifest's routing_table_diff private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -109,6 +110,10 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields .clusterStateCustomMetadataMap(clusterStateCustomMetadata(fields)); } + private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) { + return manifestV2Builder(fields); + } + private static long term(Object[] fields) { return (long) fields[0]; } @@ -226,12 +231,18 @@ private static ClusterStateDiffManifest diffManifest(Object[] fields) { fields -> manifestV2Builder(fields).build() ); - private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V2; + private static final ConstructingObjectParser PARSER_V3 = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> manifestV3Builder(fields).build() + ); + + private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V3; static { declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); declareParser(PARSER_V2, CODEC_V2); + declareParser(PARSER_V3, CODEC_V3); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { @@ -309,7 +320,7 @@ private static void declareParser(ConstructingObjectParser ClusterStateDiffManifest.fromXContent(p), + (p, c) -> ClusterStateDiffManifest.fromXContent(p, codec_version), DIFF_MANIFEST ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java index aca53c92781e4..ab7fa1fddf4bf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java @@ -32,8 +32,8 @@ import static org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer.getAbstractInstance; import static org.opensearch.cluster.DiffableUtils.getStringKeySerializer; -import static org.opensearch.cluster.routing.remote.RemoteRoutingTableService.CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; /** * Manifest of diff between two cluster states @@ -53,6 +53,7 @@ public class ClusterStateDiffManifest implements ToXContentFragment, Writeable { private static final String METADATA_CUSTOM_DIFF_FIELD = "metadata_custom_diff"; private static final String UPSERTS_FIELD = "upserts"; private static final String DELETES_FIELD = "deletes"; + private static final String DIFF_FIELD = "diff"; private static final String CLUSTER_BLOCKS_UPDATED_FIELD = "cluster_blocks_diff"; private static final String DISCOVERY_NODES_UPDATED_FIELD = "discovery_nodes_diff"; private static final String ROUTING_TABLE_DIFF = "routing_table_diff"; @@ -72,11 +73,17 @@ public class ClusterStateDiffManifest implements ToXContentFragment, Writeable { private final boolean discoveryNodesUpdated; private final List indicesRoutingUpdated; private final List indicesRoutingDeleted; + private String indicesRoutingDiffPath; private final boolean hashesOfConsistentSettingsUpdated; private final List clusterStateCustomUpdated; private final List clusterStateCustomDeleted; - public ClusterStateDiffManifest(ClusterState state, ClusterState previousState) { + public ClusterStateDiffManifest( + ClusterState state, + ClusterState previousState, + DiffableUtils.MapDiff> routingTableIncrementalDiff, + String indicesRoutingDiffPath + ) { fromStateUUID = previousState.stateUUID(); toStateUUID = state.stateUUID(); coordinationMetadataUpdated = !Metadata.isCoordinationMetadataEqual(state.metadata(), previousState.metadata()); @@ -103,17 +110,13 @@ public ClusterStateDiffManifest(ClusterState state, ClusterState previousState) customMetadataUpdated.addAll(customDiff.getUpserts().keySet()); customMetadataDeleted = customDiff.getDeletes(); - DiffableUtils.MapDiff> routingTableDiff = DiffableUtils.diff( - previousState.getRoutingTable().getIndicesRouting(), - state.getRoutingTable().getIndicesRouting(), - DiffableUtils.getStringKeySerializer(), - CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER - ); - indicesRoutingUpdated = new ArrayList<>(); - routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k)); - - indicesRoutingDeleted = routingTableDiff.getDeletes(); + indicesRoutingDeleted = new ArrayList<>(); + this.indicesRoutingDiffPath = indicesRoutingDiffPath; + if (routingTableIncrementalDiff != null) { + routingTableIncrementalDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k)); + indicesRoutingDeleted.addAll(routingTableIncrementalDiff.getDeletes()); + } hashesOfConsistentSettingsUpdated = !state.metadata() .hashesOfConsistentSettings() .equals(previousState.metadata().hashesOfConsistentSettings()); @@ -126,6 +129,7 @@ public ClusterStateDiffManifest(ClusterState state, ClusterState previousState) clusterStateCustomUpdated = new ArrayList<>(clusterStateCustomDiff.getDiffs().keySet()); clusterStateCustomUpdated.addAll(clusterStateCustomDiff.getUpserts().keySet()); clusterStateCustomDeleted = clusterStateCustomDiff.getDeletes(); + List indicie1s = indicesRoutingUpdated; } public ClusterStateDiffManifest( @@ -143,6 +147,7 @@ public ClusterStateDiffManifest( boolean discoveryNodesUpdated, List indicesRoutingUpdated, List indicesRoutingDeleted, + String indicesRoutingDiffPath, boolean hashesOfConsistentSettingsUpdated, List clusterStateCustomUpdated, List clusterStateCustomDeleted @@ -164,6 +169,7 @@ public ClusterStateDiffManifest( this.hashesOfConsistentSettingsUpdated = hashesOfConsistentSettingsUpdated; this.clusterStateCustomUpdated = Collections.unmodifiableList(clusterStateCustomUpdated); this.clusterStateCustomDeleted = Collections.unmodifiableList(clusterStateCustomDeleted); + this.indicesRoutingDiffPath = indicesRoutingDiffPath; } public ClusterStateDiffManifest(StreamInput in) throws IOException { @@ -184,6 +190,7 @@ public ClusterStateDiffManifest(StreamInput in) throws IOException { this.hashesOfConsistentSettingsUpdated = in.readBoolean(); this.clusterStateCustomUpdated = in.readStringList(); this.clusterStateCustomDeleted = in.readStringList(); + this.indicesRoutingDiffPath = in.readString(); } @Override @@ -237,6 +244,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.value(index); } builder.endArray(); + if (indicesRoutingDiffPath != null) { + builder.field(DIFF_FIELD, indicesRoutingDiffPath); + } builder.endObject(); builder.startObject(CLUSTER_STATE_CUSTOM_DIFF_FIELD); builder.startArray(UPSERTS_FIELD); @@ -253,7 +263,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public static ClusterStateDiffManifest fromXContent(XContentParser parser) throws IOException { + public static ClusterStateDiffManifest fromXContent(XContentParser parser, long codec_version) throws IOException { Builder builder = new Builder(); if (parser.currentToken() == null) { // fresh parser? move to next token parser.nextToken(); @@ -341,6 +351,11 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw case DELETES_FIELD: builder.indicesRoutingDeleted(convertListToString(parser.listOrderedMap())); break; + case DIFF_FIELD: + if (codec_version >= CODEC_V3) { + builder.indicesRoutingDiffPath(parser.textOrNull()); + } + break; default: throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); } @@ -456,6 +471,10 @@ public List getIndicesRoutingUpdated() { return indicesRoutingUpdated; } + public String getIndicesRoutingDiffPath() { + return indicesRoutingDiffPath; + } + public List getIndicesRoutingDeleted() { return indicesRoutingDeleted; } @@ -468,6 +487,10 @@ public List getClusterStateCustomDeleted() { return clusterStateCustomDeleted; } + public void setIndicesRoutingDiffPath(String indicesRoutingDiffPath) { + this.indicesRoutingDiffPath = indicesRoutingDiffPath; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -489,7 +512,8 @@ public boolean equals(Object o) { && Objects.equals(indicesRoutingUpdated, that.indicesRoutingUpdated) && Objects.equals(indicesRoutingDeleted, that.indicesRoutingDeleted) && Objects.equals(clusterStateCustomUpdated, that.clusterStateCustomUpdated) - && Objects.equals(clusterStateCustomDeleted, that.clusterStateCustomDeleted); + && Objects.equals(clusterStateCustomDeleted, that.clusterStateCustomDeleted) + && Objects.equals(indicesRoutingDiffPath, that.indicesRoutingDiffPath); } @Override @@ -538,6 +562,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(hashesOfConsistentSettingsUpdated); out.writeStringCollection(clusterStateCustomUpdated); out.writeStringCollection(clusterStateCustomDeleted); + out.writeString(indicesRoutingDiffPath); } /** @@ -560,6 +585,7 @@ public static class Builder { private boolean discoveryNodesUpdated; private List indicesRoutingUpdated; private List indicesRoutingDeleted; + private String indicesRoutingDiff; private boolean hashesOfConsistentSettingsUpdated; private List clusterStateCustomUpdated; private List clusterStateCustomDeleted; @@ -650,6 +676,11 @@ public Builder indicesRoutingDeleted(List indicesRoutingDeleted) { return this; } + public Builder indicesRoutingDiffPath(String indicesRoutingDiffPath) { + this.indicesRoutingDiff = indicesRoutingDiffPath; + return this; + } + public Builder clusterStateCustomUpdated(List clusterStateCustomUpdated) { this.clusterStateCustomUpdated = clusterStateCustomUpdated; return this; @@ -676,6 +707,7 @@ public ClusterStateDiffManifest build() { discoveryNodesUpdated, indicesRoutingUpdated, indicesRoutingDeleted, + indicesRoutingDiff, hashesOfConsistentSettingsUpdated, clusterStateCustomUpdated, clusterStateCustomDeleted diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index 99235bc96bfe3..8691187c7fbfa 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -179,6 +179,7 @@ void deleteClusterMetadata( Set staleGlobalMetadataPaths = new HashSet<>(); Set staleEphemeralAttributePaths = new HashSet<>(); Set staleIndexRoutingPaths = new HashSet<>(); + Set staleIndexRoutingDiffPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest( clusterName, @@ -222,6 +223,10 @@ void deleteClusterMetadata( clusterMetadataManifest.getIndicesRouting() .forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename())); } + if (clusterMetadataManifest.getDiffManifest() != null + && clusterMetadataManifest.getDiffManifest().getIndicesRoutingDiffPath() != null) { + filesToKeep.add(clusterMetadataManifest.getDiffManifest().getIndicesRoutingDiffPath()); + } }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = remoteManifestManager.fetchRemoteClusterMetadataManifest( @@ -264,6 +269,18 @@ void deleteClusterMetadata( } }); } + if (clusterMetadataManifest.getDiffManifest() != null + && clusterMetadataManifest.getDiffManifest().getIndicesRoutingDiffPath() != null) { + if (!filesToKeep.contains(clusterMetadataManifest.getDiffManifest().getIndicesRoutingDiffPath())) { + staleIndexRoutingDiffPaths.add(clusterMetadataManifest.getDiffManifest().getIndicesRoutingDiffPath()); + logger.debug( + () -> new ParameterizedMessage( + "Indices routing diff paths in stale manifest: {}", + clusterMetadataManifest.getDiffManifest().getIndicesRoutingDiffPath() + ) + ); + } + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { String fileName = RemoteClusterStateUtils.getFormattedIndexFileName(uploadedIndexMetadata.getUploadedFilename()); @@ -316,6 +333,15 @@ void deleteClusterMetadata( ); remoteStateStats.indexRoutingFilesCleanupAttemptFailed(); } + try { + remoteRoutingTableService.deleteStaleIndexRoutingDiffPaths(new ArrayList<>(staleIndexRoutingDiffPaths)); + } catch (IOException e) { + logger.error( + () -> new ParameterizedMessage("Error while deleting stale index routing diff files {}", staleIndexRoutingDiffPaths), + e + ); + remoteStateStats.indicesRoutingDiffFileCleanupAttemptFailed(); + } } catch (IllegalStateException e) { logger.error("Error while fetching Remote Cluster Metadata manifests", e); } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b34641f77f607..674279f2251bd 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -14,6 +14,7 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.CoordinationMetadata; @@ -26,6 +27,7 @@ import org.opensearch.cluster.node.DiscoveryNodes.Builder; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; @@ -56,6 +58,7 @@ import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; +import org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -234,13 +237,21 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat isPublicationEnabled, isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(), isPublicationEnabled, - remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()) + remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()), + null + ); + + ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest( + clusterState, + ClusterState.EMPTY_STATE, + null, + null ); final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( clusterState, uploadedMetadataResults, previousClusterUUID, - new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE), + clusterStateDiffManifest, false ); @@ -330,10 +341,13 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName()); } - final DiffableUtils.MapDiff> routingTableDiff = remoteRoutingTableService - .getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable()); final List indicesRoutingToUpload = new ArrayList<>(); - routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v)); + final DiffableUtils.MapDiff> routingTableIncrementalDiff = + remoteRoutingTableService.getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable()); + + Map> indexRoutingTableDiffs = routingTableIncrementalDiff.getDiffs(); + routingTableIncrementalDiff.getDiffs().forEach((k, v) -> indicesRoutingToUpload.add(clusterState.getRoutingTable().index(k))); + routingTableIncrementalDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v)); UploadedMetadataResults uploadedMetadataResults; // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, @@ -369,7 +383,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( updateTransientSettingsMetadata, clusterStateCustomsDiff.getUpserts(), updateHashesOfConsistentSettings, - indicesRoutingToUpload + indicesRoutingToUpload, + indexRoutingTableDiffs ); // update the map if the metadata was uploaded @@ -411,14 +426,23 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults.uploadedIndicesRoutingMetadata = remoteRoutingTableService.getAllUploadedIndicesRouting( previousManifest, uploadedMetadataResults.uploadedIndicesRoutingMetadata, - routingTableDiff.getDeletes() + routingTableIncrementalDiff.getDeletes() + ); + + ClusterStateDiffManifest clusterStateDiffManifest = new ClusterStateDiffManifest( + clusterState, + previousClusterState, + routingTableIncrementalDiff, + uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata != null + ? uploadedMetadataResults.uploadedIndicesRoutingDiffMetadata.getUploadedFilename() + : null ); final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( clusterState, uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), - new ClusterStateDiffManifest(clusterState, previousClusterState), + clusterStateDiffManifest, false ); @@ -488,13 +512,15 @@ UploadedMetadataResults writeMetadataInParallel( boolean uploadTransientSettingMetadata, Map clusterStateCustomToUpload, boolean uploadHashesOfConsistentSettings, - List indicesRoutingToUpload + List indicesRoutingToUpload, + Map> indexRoutingTableDiff ) throws IOException { assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; int totalUploadTasks = indexToUpload.size() + indexMetadataUploadListeners.size() + customToUpload.size() + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0) - + clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size(); + + clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size() + + (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty() ? 1 : 0); CountDownLatch latch = new CountDownLatch(totalUploadTasks); List uploadTasks = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); Map results = new ConcurrentHashMap<>(totalUploadTasks); @@ -664,6 +690,16 @@ UploadedMetadataResults writeMetadataInParallel( listener ); }); + if (indexRoutingTableDiff != null && !indexRoutingTableDiff.isEmpty()) { + uploadTasks.add(RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE); + remoteRoutingTableService.getAsyncIndexRoutingDiffWriteAction( + clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), + indexRoutingTableDiff, + listener + ); + } invokeIndexMetadataUploadListeners(indexToUpload, prevIndexMetadataByName, latch, exceptionList); try { @@ -710,6 +746,8 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) && uploadedMetadata.getComponent().contains(INDEX_ROUTING_METADATA_PREFIX)) { response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); + } else if (RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE.equals(name)) { + response.uploadedIndicesRoutingDiffMetadata = (UploadedMetadataAttribute) uploadedMetadata; } else if (name.startsWith(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- String custom = name.split(DELIMITER)[0].split(CUSTOM_DELIMITER)[1]; @@ -979,16 +1017,18 @@ ClusterState readClusterStateInParallel( List indicesRoutingToRead, boolean readHashesOfConsistentSettings, Map clusterStateCustomToRead, + boolean readIndexRoutingTableDiff, boolean includeEphemeral ) throws IOException { int totalReadTasks = indicesToRead.size() + customToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size() - + indicesRoutingToRead.size(); + + indicesRoutingToRead.size() + (readIndexRoutingTableDiff ? 1 : 0); CountDownLatch latch = new CountDownLatch(totalReadTasks); List readResults = Collections.synchronizedList(new ArrayList<>()); List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); + AtomicReference readIndexRoutingTableDiffResults = new AtomicReference<>(); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(response -> { @@ -1031,6 +1071,25 @@ ClusterState readClusterStateInParallel( ); } + LatchedActionListener routingTableDiffLatchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(response -> { + logger.debug("Successfully read routing table diff component from remote"); + readIndexRoutingTableDiffResults.set(response); + }, ex -> { + logger.error("Failed to read routing table diff from remote", ex); + exceptionList.add(ex); + }), + latch + ); + + if (readIndexRoutingTableDiff) { + remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction( + clusterUUID, + manifest.getDiffManifest().getIndicesRoutingDiffPath(), + routingTableDiffLatchedActionListener + ); + } + for (Map.Entry entry : customToRead.entrySet()) { remoteGlobalMetadataManager.readAsync( entry.getValue().getAttributeName(), @@ -1233,6 +1292,14 @@ ClusterState readClusterStateInParallel( readIndexRoutingTableResults.forEach( indexRoutingTable -> indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable) ); + RoutingTableIncrementalDiff routingTableDiff = readIndexRoutingTableDiffResults.get(); + if (routingTableDiff != null) { + routingTableDiff.getDiffs().forEach((key, diff) -> { + IndexRoutingTable previousIndexRoutingTable = indicesRouting.get(key); + IndexRoutingTable updatedTable = diff.apply(previousIndexRoutingTable); + indicesRouting.put(key, updatedTable); + }); + } clusterStateBuilder.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting)); return clusterStateBuilder.build(); @@ -1261,6 +1328,7 @@ public ClusterState getClusterStateForManifest( includeEphemeral ? manifest.getIndicesRouting() : emptyList(), includeEphemeral && manifest.getHashesOfConsistentSettings() != null, includeEphemeral ? manifest.getClusterStateCustomMap() : emptyMap(), + false, includeEphemeral ); } else { @@ -1281,6 +1349,7 @@ public ClusterState getClusterStateForManifest( emptyList(), false, emptyMap(), + false, false ); Metadata.Builder mb = Metadata.builder(remoteGlobalMetadataManager.getGlobalMetadata(manifest.getClusterUUID(), manifest)); @@ -1337,6 +1406,9 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C updatedIndexRouting, diff.isHashesOfConsistentSettingsUpdated(), updatedClusterStateCustom, + manifest.getDiffManifest() != null + && manifest.getDiffManifest().getIndicesRoutingDiffPath() != null + && !manifest.getDiffManifest().getIndicesRoutingDiffPath().isEmpty(), true ); ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index f2b93c3784407..74cb838286961 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -88,6 +88,7 @@ public static class UploadedMetadataResults { ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks; List uploadedIndicesRoutingMetadata; ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedIndicesRoutingDiffMetadata; public UploadedMetadataResults( List uploadedIndexMetadata, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java index 36d107a99d258..efd73e11e46b5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -20,15 +20,18 @@ public class RemotePersistenceStats extends PersistedStateStats { static final String CLEANUP_ATTEMPT_FAILED_COUNT = "cleanup_attempt_failed_count"; static final String INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "index_routing_files_cleanup_attempt_failed_count"; + static final String INDICES_ROUTING_DIFF_FILES_CLEANUP_ATTEMPT_FAILED_COUNT = "indices_routing_diff_files_cleanup_attempt_failed_count"; static final String REMOTE_UPLOAD = "remote_upload"; private AtomicLong cleanupAttemptFailedCount = new AtomicLong(0); private AtomicLong indexRoutingFilesCleanupAttemptFailedCount = new AtomicLong(0); + private AtomicLong indicesRoutingDiffFilesCleanupAttemptFailedCount = new AtomicLong(0); public RemotePersistenceStats() { super(REMOTE_UPLOAD); addToExtendedFields(CLEANUP_ATTEMPT_FAILED_COUNT, cleanupAttemptFailedCount); addToExtendedFields(INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indexRoutingFilesCleanupAttemptFailedCount); + addToExtendedFields(INDICES_ROUTING_DIFF_FILES_CLEANUP_ATTEMPT_FAILED_COUNT, indicesRoutingDiffFilesCleanupAttemptFailedCount); } public void cleanUpAttemptFailed() { @@ -46,4 +49,12 @@ public void indexRoutingFilesCleanupAttemptFailed() { public long getIndexRoutingFilesCleanupAttemptFailedCount() { return indexRoutingFilesCleanupAttemptFailedCount.get(); } + + public void indicesRoutingDiffFileCleanupAttemptFailed() { + indexRoutingFilesCleanupAttemptFailedCount.incrementAndGet(); + } + + public long getIndicesRoutingDiffFileCleanupAttemptFailedCount() { + return indexRoutingFilesCleanupAttemptFailedCount.get(); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java index 1dc56712d4ab5..acaae3173315a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -35,7 +35,7 @@ public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEnt public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; - public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2; + public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; public static final String COMMITTED = "C"; public static final String PUBLISHED = "P"; @@ -50,6 +50,9 @@ public class RemoteClusterMetadataManifest extends AbstractRemoteWritableBlobEnt public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V1 = new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V2 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2); + /** * Manifest format compatible with codec v2, where we introduced codec versions/global metadata. */ @@ -149,6 +152,8 @@ private ChecksumBlobStoreFormat getClusterMetadataManif long codecVersion = getManifestCodecVersion(); if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { return CLUSTER_METADATA_MANIFEST_FORMAT; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V2) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V2; } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { return CLUSTER_METADATA_MANIFEST_FORMAT_V1; } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java new file mode 100644 index 0000000000000..e876d939490d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.cluster.Diff; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; + +/** + * Represents a incremental difference between {@link org.opensearch.cluster.routing.RoutingTable} objects that can be serialized and deserialized. + * This class is responsible for writing and reading the differences between RoutingTables to and from an input/output stream. + */ +public class RemoteRoutingTableDiff extends AbstractRemoteWritableBlobEntity { + private final RoutingTableIncrementalDiff routingTableIncrementalDiff; + + private long term; + private long version; + + public static final String ROUTING_TABLE_DIFF = "routing-table-diff"; + + public static final String ROUTING_TABLE_DIFF_METADATA_PREFIX = "routingTableDiff--"; + + public static final String ROUTING_TABLE_DIFF_FILE = "routing_table_diff"; + private static final String codec = "RemoteRoutingTableDiff"; + public static final String ROUTING_TABLE_DIFF_PATH_TOKEN = "routing-table-diff"; + + public static final int VERSION = 1; + + public static final ChecksumWritableBlobStoreFormat REMOTE_ROUTING_TABLE_DIFF_FORMAT = + new ChecksumWritableBlobStoreFormat<>(codec, RoutingTableIncrementalDiff::readFrom); + + /** + * Constructs a new RemoteRoutingTableDiff with the given differences. + * + * @param routingTableIncrementalDiff a RoutingTableIncrementalDiff object containing the differences of {@link IndexRoutingTable}. + * @param clusterUUID the cluster UUID. + * @param compressor the compressor to be used. + * @param term the term of the routing table. + * @param version the version of the routing table. + */ + public RemoteRoutingTableDiff( + RoutingTableIncrementalDiff routingTableIncrementalDiff, + String clusterUUID, + Compressor compressor, + long term, + long version + ) { + super(clusterUUID, compressor); + this.routingTableIncrementalDiff = routingTableIncrementalDiff; + this.term = term; + this.version = version; + } + + /** + * Constructs a new RemoteRoutingTableDiff with the given differences. + * + * @param routingTableIncrementalDiff a RoutingTableIncrementalDiff object containing the differences of {@link IndexRoutingTable}. + * @param clusterUUID the cluster UUID. + * @param compressor the compressor to be used. + */ + public RemoteRoutingTableDiff(RoutingTableIncrementalDiff routingTableIncrementalDiff, String clusterUUID, Compressor compressor) { + super(clusterUUID, compressor); + this.routingTableIncrementalDiff = routingTableIncrementalDiff; + } + + /** + * Constructs a new RemoteIndexRoutingTableDiff with the given blob name, cluster UUID, and compressor. + * + * @param blobName the name of the blob. + * @param clusterUUID the cluster UUID. + * @param compressor the compressor to be used. + */ + public RemoteRoutingTableDiff(String blobName, String clusterUUID, Compressor compressor) { + super(clusterUUID, compressor); + this.routingTableIncrementalDiff = null; + this.blobName = blobName; + } + + /** + * Gets the map of differences of {@link IndexRoutingTable}. + * + * @return a map containing the differences. + */ + public Map> getDiffs() { + assert routingTableIncrementalDiff != null; + return routingTableIncrementalDiff.getDiffs(); + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(ROUTING_TABLE_DIFF_PATH_TOKEN), ROUTING_TABLE_DIFF_METADATA_PREFIX); + } + + @Override + public String getType() { + return ROUTING_TABLE_DIFF; + } + + @Override + public String generateBlobFileName() { + if (blobFileName == null) { + blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + return blobFileName; + } + + @Override + public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new ClusterMetadataManifest.UploadedMetadataAttribute(ROUTING_TABLE_DIFF_FILE, blobName); + } + + @Override + public InputStream serialize() throws IOException { + assert routingTableIncrementalDiff != null; + return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(routingTableIncrementalDiff, generateBlobFileName(), getCompressor()) + .streamInput(); + } + + @Override + public RoutingTableIncrementalDiff deserialize(InputStream in) throws IOException { + return REMOTE_ROUTING_TABLE_DIFF_FORMAT.deserialize(blobName, Streams.readFully(in)); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index ddbfd5965fc5f..51c02302a232c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -12,12 +12,15 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; @@ -50,8 +53,11 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -69,6 +75,10 @@ import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.REMOTE_ROUTING_TABLE_DIFF_FORMAT; +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE; +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_METADATA_PREFIX; +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_PATH_TOKEN; import static org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64; import static org.opensearch.index.remote.RemoteStoreEnums.PathType.HASHED_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -281,10 +291,14 @@ public void testGetIndicesRoutingMapDiffShardChanged() { DiffableUtils.MapDiff> diff = remoteRoutingTableService .getIndicesRoutingMapDiff(routingTable, routingTable2); - assertEquals(1, diff.getUpserts().size()); - assertNotNull(diff.getUpserts().get(indexName)); - assertEquals(noOfShards + 1, diff.getUpserts().get(indexName).getShards().size()); - assertEquals(noOfReplicas + 1, diff.getUpserts().get(indexName).getShards().get(0).getSize()); + assertEquals(0, diff.getUpserts().size()); + assertEquals(1, diff.getDiffs().size()); + assertNotNull(diff.getDiffs().get(indexName)); + assertEquals(noOfShards + 1, diff.getDiffs().get(indexName).apply(routingTable.indicesRouting().get(indexName)).shards().size()); + assertEquals( + noOfReplicas + 1, + diff.getDiffs().get(indexName).apply(routingTable.indicesRouting().get(indexName)).getShards().get(0).getSize() + ); assertEquals(0, diff.getDeletes().size()); final IndexMetadata indexMetadata3 = new IndexMetadata.Builder(indexName).settings( @@ -296,11 +310,14 @@ public void testGetIndicesRoutingMapDiffShardChanged() { RoutingTable routingTable3 = RoutingTable.builder().addAsNew(indexMetadata3).build(); diff = remoteRoutingTableService.getIndicesRoutingMapDiff(routingTable2, routingTable3); - assertEquals(1, diff.getUpserts().size()); - assertNotNull(diff.getUpserts().get(indexName)); - assertEquals(noOfShards + 1, diff.getUpserts().get(indexName).getShards().size()); - assertEquals(noOfReplicas + 2, diff.getUpserts().get(indexName).getShards().get(0).getSize()); - + assertEquals(0, diff.getUpserts().size()); + assertEquals(1, diff.getDiffs().size()); + assertNotNull(diff.getDiffs().get(indexName)); + assertEquals(noOfShards + 1, diff.getDiffs().get(indexName).apply(routingTable.indicesRouting().get(indexName)).shards().size()); + assertEquals( + noOfReplicas + 2, + diff.getDiffs().get(indexName).apply(routingTable.indicesRouting().get(indexName)).getShards().get(0).getSize() + ); assertEquals(0, diff.getDeletes().size()); } @@ -320,10 +337,10 @@ public void testGetIndicesRoutingMapDiffShardDetailChanged() { DiffableUtils.MapDiff> diff = remoteRoutingTableService .getIndicesRoutingMapDiff(routingTable, routingTable2); - assertEquals(1, diff.getUpserts().size()); - assertNotNull(diff.getUpserts().get(indexName)); - assertEquals(noOfShards, diff.getUpserts().get(indexName).getShards().size()); - assertEquals(noOfReplicas + 1, diff.getUpserts().get(indexName).getShards().get(0).getSize()); + assertEquals(1, diff.getDiffs().size()); + assertNotNull(diff.getDiffs().get(indexName)); + assertEquals(noOfShards, diff.getDiffs().get(indexName).apply(routingTable.indicesRouting().get(indexName)).shards().size()); + assertEquals(0, diff.getUpserts().size()); assertEquals(0, diff.getDeletes().size()); } @@ -552,6 +569,44 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { assertEquals(clusterState.getRoutingTable().getIndicesRouting().get(indexName), indexRoutingTable); } + public void testGetAsyncIndexRoutingTableDiffReadAction() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState currentState = createClusterState(indexName); + + // Get the IndexRoutingTable from the current state + IndexRoutingTable indexRoutingTable = currentState.routingTable().index(indexName); + Map shardRoutingTables = indexRoutingTable.getShards(); + + RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff indexRoutingTableDiff = + new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(new ArrayList<>(shardRoutingTables.values())); + + // Create the map for RoutingTableIncrementalDiff + Map> diffs = new HashMap<>(); + diffs.put(indexName, indexRoutingTableDiff); + + RoutingTableIncrementalDiff diff = new RoutingTableIncrementalDiff(diffs); + + String uploadedFileName = String.format(Locale.ROOT, "routing-table-diff/" + indexName); + when(blobContainer.readBlob(indexName)).thenReturn( + REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(diff, uploadedFileName, compressor).streamInput() + ); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction( + "cluster-uuid", + uploadedFileName, + new LatchedActionListener<>(listener, latch) + ); + latch.await(); + + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + RoutingTableIncrementalDiff resultDiff = listener.getResult(); + assertEquals(diff.getDiffs().size(), resultDiff.getDiffs().size()); + } + public void testGetAsyncIndexRoutingWriteAction() throws Exception { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); @@ -604,6 +659,68 @@ public void testGetAsyncIndexRoutingWriteAction() throws Exception { assertThat(RemoteStoreUtils.invertLong(fileNameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); } + public void testGetAsyncIndexRoutingDiffWriteAction() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState currentState = createClusterState(indexName); + + // Get the IndexRoutingTable from the current state + IndexRoutingTable indexRoutingTable = currentState.routingTable().index(indexName); + Map shardRoutingTables = indexRoutingTable.getShards(); + + RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff indexRoutingTableDiff = + new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(new ArrayList<>(shardRoutingTables.values())); + + // Create the map for RoutingTableIncrementalDiff + Map> diffs = new HashMap<>(); + diffs.put(indexName, indexRoutingTableDiff); + + // RoutingTableIncrementalDiff diff = new RoutingTableIncrementalDiff(diffs); + + Iterable remotePath = new BlobPath().add("base-path") + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(currentState.getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add("cluster-state") + .add(currentState.metadata().clusterUUID()) + .add(ROUTING_TABLE_DIFF_PATH_TOKEN); + + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), eq(remotePath), anyString(), eq(WritePriority.URGENT), any(ActionListener.class)); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteRoutingTableService.getAsyncIndexRoutingDiffWriteAction( + currentState.metadata().clusterUUID(), + currentState.term(), + currentState.version(), + diffs, + new LatchedActionListener<>(listener, latch) + ); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + + assertEquals(ROUTING_TABLE_DIFF_FILE, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(6, pathTokens.length); + assertEquals(pathTokens[0], "base-path"); + String[] fileNameTokens = pathTokens[5].split(DELIMITER); + + assertEquals(4, fileNameTokens.length); + assertEquals(ROUTING_TABLE_DIFF_METADATA_PREFIX, fileNameTokens[0]); + assertEquals(RemoteStoreUtils.invertLong(1L), fileNameTokens[1]); + assertEquals(RemoteStoreUtils.invertLong(2L), fileNameTokens[2]); + assertThat(RemoteStoreUtils.invertLong(fileNameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); + } + public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { List updatedIndicesRouting = new ArrayList<>(); List indicesRouting = randomUploadedIndexMetadataList(); @@ -687,4 +804,26 @@ public void testDeleteStaleIndexRoutingPathsThrowsIOException() throws IOExcepti verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); } + public void testDeleteStaleIndexRoutingDiffPaths() throws IOException { + doNothing().when(blobContainer).deleteBlobsIgnoringIfNotExists(any()); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + remoteRoutingTableService.doStart(); + remoteRoutingTableService.deleteStaleIndexRoutingDiffPaths(stalePaths); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } + + public void testDeleteStaleIndexRoutingDiffPathsThrowsIOException() throws IOException { + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + List stalePaths = Arrays.asList("path1", "path2"); + // Simulate an IOException + doThrow(new IOException("test exception")).when(blobContainer).deleteBlobsIgnoringIfNotExists(Mockito.anyList()); + + remoteRoutingTableService.doStart(); + IOException thrown = assertThrows(IOException.class, () -> { + remoteRoutingTableService.deleteStaleIndexRoutingDiffPaths(stalePaths); + }); + assertEquals("test exception", thrown.getMessage()); + verify(blobContainer).deleteBlobsIgnoringIfNotExists(stalePaths); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 256161af1a3e2..8a6dd6bc96e72 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -10,9 +10,11 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -29,9 +31,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import org.mockito.Mockito; + import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS; @@ -157,7 +162,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { .opensearchVersion(Version.CURRENT) .nodeId("B10RX1f5RJenMQvYccCgSQ") .committed(true) - .codecVersion(ClusterMetadataManifest.CODEC_V2) + .codecVersion(ClusterMetadataManifest.CODEC_V3) .indices(randomUploadedIndexMetadataList()) .previousClusterUUID("yfObdx8KSMKKrXf8UyHhM") .clusterUUIDCommitted(true) @@ -191,7 +196,9 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { .diffManifest( new ClusterStateDiffManifest( RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), - ClusterState.EMPTY_STATE + ClusterState.EMPTY_STATE, + null, + "indicesRoutingDiffPath" ) ) .build(); @@ -523,7 +530,75 @@ public void testClusterMetadataManifestXContentV2() throws IOException { .diffManifest( new ClusterStateDiffManifest( RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), - ClusterState.EMPTY_STATE + ClusterState.EMPTY_STATE, + null, + null + ) + ) + .build(); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + + public void testClusterMetadataManifestXContentV3() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); + final DiffableUtils.MapDiff> routingTableIncrementalDiff = Mockito.mock( + DiffableUtils.MapDiff.class + ); + ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(1L) + .clusterUUID("test-cluster-uuid") + .stateUUID("test-state-uuid") + .opensearchVersion(Version.CURRENT) + .nodeId("test-node-id") + .committed(false) + .codecVersion(ClusterMetadataManifest.CODEC_V3) + .indices(Collections.singletonList(uploadedIndexMetadata)) + .previousClusterUUID("prev-cluster-uuid") + .clusterUUIDCommitted(true) + .coordinationMetadata(uploadedMetadataAttribute) + .settingMetadata(uploadedMetadataAttribute) + .templatesMetadata(uploadedMetadataAttribute) + .customMetadataMap( + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + CUSTOM_METADATA + CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + CUSTOM_METADATA + CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + CUSTOM_METADATA + CUSTOM_DELIMITER + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) + ) + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ) + .routingTableVersion(1L) + .indicesRouting(Collections.singletonList(uploadedIndexMetadata)) + .discoveryNodesMetadata(uploadedMetadataAttribute) + .clusterBlocksMetadata(uploadedMetadataAttribute) + .transientSettingsMetadata(uploadedMetadataAttribute) + .hashesOfConsistentSettings(uploadedMetadataAttribute) + .clusterStateCustomMetadataMap(Collections.emptyMap()) + .diffManifest( + new ClusterStateDiffManifest( + RemoteClusterStateServiceTests.generateClusterStateWithOneIndex().build(), + ClusterState.EMPTY_STATE, + routingTableIncrementalDiff, + uploadedMetadataAttribute.getUploadedFilename() ) ) .build(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index ec7e3c1ce81d3..b86f23f3d37aa 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -50,6 +50,7 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion; @@ -296,6 +297,74 @@ public void testDeleteClusterMetadata() throws IOException { verify(remoteRoutingTableService).deleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename())); } + public void testDeleteStaleIndicesRoutingDiffFile() throws IOException { + String clusterUUID = "clusterUUID"; + String clusterName = "test-cluster"; + List inactiveBlobs = Arrays.asList(new PlainBlobMetadata("manifest1.dat", 1L)); + List activeBlobs = Arrays.asList(new PlainBlobMetadata("manifest2.dat", 1L)); + + UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata"); + UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata"); + UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata"); + UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute( + COORDINATION_METADATA, + "coordination_metadata_updated" + ); + + UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1__2"); + UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2__2"); + List indicesRouting1 = List.of(index1Metadata); + List indicesRouting2 = List.of(index2Metadata); + ClusterStateDiffManifest diffManifest1 = ClusterStateDiffManifest.builder().indicesRoutingDiffPath("index1RoutingDiffPath").build(); + ClusterStateDiffManifest diffManifest2 = ClusterStateDiffManifest.builder().indicesRoutingDiffPath("index2RoutingDiffPath").build(); + + ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadata) + .settingMetadata(settingMetadata) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .diffManifest(diffManifest1) + .build(); + ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1) + .indices(List.of(index2Metadata)) + .indicesRouting(indicesRouting2) + .diffManifest(diffManifest2) + .build(); + + BlobContainer blobContainer = mock(BlobContainer.class); + doThrow(IOException.class).when(blobContainer).delete(); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + remoteClusterStateCleanupManager.start(); + when(remoteManifestManager.getManifestFolderPath(eq(clusterName), eq(clusterUUID))).thenReturn( + new BlobPath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID).add(MANIFEST) + ); + when(remoteManifestManager.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( + manifest2, + manifest1 + ); + remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager( + remoteClusterStateService, + clusterService, + remoteRoutingTableService + ); + remoteClusterStateCleanupManager.start(); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + verify(remoteRoutingTableService).deleteStaleIndexRoutingDiffPaths(List.of("index1RoutingDiffPath")); + } + public void testDeleteClusterMetadataNoOpsRoutingTableService() throws IOException { String clusterUUID = "clusterUUID"; String clusterName = "test-cluster"; @@ -515,6 +584,83 @@ public void testIndexRoutingFilesCleanupFailureStats() throws Exception { }); } + public void testIndicesRoutingDiffFilesCleanupFailureStats() throws Exception { + String clusterUUID = "clusterUUID"; + String clusterName = "test-cluster"; + List inactiveBlobs = Arrays.asList(new PlainBlobMetadata("manifest1.dat", 1L)); + List activeBlobs = Arrays.asList(new PlainBlobMetadata("manifest2.dat", 1L)); + + UploadedMetadataAttribute coordinationMetadata = new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination_metadata"); + UploadedMetadataAttribute templateMetadata = new UploadedMetadataAttribute(TEMPLATES_METADATA, "template_metadata"); + UploadedMetadataAttribute settingMetadata = new UploadedMetadataAttribute(SETTING_METADATA, "settings_metadata"); + UploadedMetadataAttribute coordinationMetadataUpdated = new UploadedMetadataAttribute( + COORDINATION_METADATA, + "coordination_metadata_updated" + ); + + UploadedIndexMetadata index1Metadata = new UploadedIndexMetadata("index1", "indexUUID1", "index_metadata1__2"); + UploadedIndexMetadata index2Metadata = new UploadedIndexMetadata("index2", "indexUUID2", "index_metadata2__2"); + List indicesRouting1 = List.of(index1Metadata); + List indicesRouting2 = List.of(index2Metadata); + ClusterStateDiffManifest diffManifest1 = ClusterStateDiffManifest.builder().indicesRoutingDiffPath("index1RoutingDiffPath").build(); + ClusterStateDiffManifest diffManifest2 = ClusterStateDiffManifest.builder().indicesRoutingDiffPath("index2RoutingDiffPath").build(); + + ClusterMetadataManifest manifest1 = ClusterMetadataManifest.builder() + .indices(List.of(index1Metadata)) + .coordinationMetadata(coordinationMetadataUpdated) + .templatesMetadata(templateMetadata) + .settingMetadata(settingMetadata) + .clusterTerm(1L) + .stateVersion(1L) + .codecVersion(CODEC_V3) + .stateUUID(randomAlphaOfLength(10)) + .clusterUUID(clusterUUID) + .nodeId("nodeA") + .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .previousClusterUUID(ClusterState.UNKNOWN_UUID) + .committed(true) + .routingTableVersion(0L) + .indicesRouting(indicesRouting1) + .diffManifest(diffManifest1) + .build(); + ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder(manifest1) + .indices(List.of(index2Metadata)) + .indicesRouting(indicesRouting2) + .diffManifest(diffManifest2) + .build(); + + BlobContainer blobContainer = mock(BlobContainer.class); + doThrow(IOException.class).when(blobContainer).delete(); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + + BlobPath blobPath = new BlobPath().add("random-path"); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + remoteClusterStateCleanupManager.start(); + when(remoteManifestManager.getManifestFolderPath(eq(clusterName), eq(clusterUUID))).thenReturn( + new BlobPath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID).add(MANIFEST) + ); + when(remoteManifestManager.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( + manifest1, + manifest2 + ); + doNothing().when(remoteRoutingTableService).deleteStaleIndexRoutingDiffPaths(any()); + + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + assertBusy(() -> { + // wait for stats to get updated + assertNotNull(remoteClusterStateCleanupManager.getStats()); + assertEquals(0, remoteClusterStateCleanupManager.getStats().getIndicesRoutingDiffFileCleanupAttemptFailedCount()); + }); + + doThrow(IOException.class).when(remoteRoutingTableService).deleteStaleIndexRoutingPaths(any()); + remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs); + assertBusy(() -> { + // wait for stats to get updated + assertNotNull(remoteClusterStateCleanupManager.getStats()); + assertEquals(1, remoteClusterStateCleanupManager.getStats().getIndicesRoutingDiffFileCleanupAttemptFailedCount()); + }); + } + public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception { BlobContainer blobContainer = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(blobContainer); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 6c764585c48e7..59ca62dff2aa7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -535,14 +535,15 @@ public void testTimeoutWhileWritingManifestFile() throws IOException { anyBoolean(), anyMap(), anyBoolean(), - anyList() + anyList(), + anyMap() ) ).thenReturn(new RemoteClusterStateUtils.UploadedMetadataResults()); RemoteStateTransferException ex = expectThrows( RemoteStateTransferException.class, () -> spiedService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); - assertTrue(ex.getMessage().contains("Timed out waiting for transfer of manifest file to complete")); + assertTrue(ex.getMessage().contains("Timed out waiting for transfer of following metadata to complete")); } public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException { @@ -634,7 +635,8 @@ public void testWriteMetadataInParallelIncompleteUpload() throws IOException { true, clusterState.getCustoms(), true, - emptyList() + emptyList(), + null ) ); assertTrue(exception.getMessage().startsWith("Some metadata components were not uploaded successfully")); @@ -684,7 +686,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { eq(false), eq(Collections.emptyMap()), eq(false), - eq(Collections.emptyList()) + eq(Collections.emptyList()), + eq(Collections.emptyMap()) ); assertThat(manifestInfo.getManifestFileName(), notNullValue()); @@ -764,7 +767,8 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I eq(false), eq(Collections.emptyMap()), eq(true), - Mockito.anyList() + anyList(), + eq(Collections.emptyMap()) ); assertThat(manifestInfo.getManifestFileName(), notNullValue()); @@ -811,7 +815,8 @@ public void testTimeoutWhileWritingMetadata() throws IOException { true, emptyMap(), true, - emptyList() + emptyList(), + null ) ); assertTrue(exception.getMessage().startsWith("Timed out waiting for transfer of following metadata to complete")); @@ -862,6 +867,7 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException eq(manifest.getIndicesRouting()), eq(true), eq(manifest.getClusterStateCustomMap()), + eq(false), eq(true) ); } @@ -911,7 +917,9 @@ public void testGetClusterStateForManifest_ExcludeEphemeral() throws IOException eq(emptyList()), eq(false), eq(emptyMap()), + eq(false), eq(false) + ); } @@ -958,6 +966,7 @@ public void testGetClusterStateFromManifest_CodecV1() throws IOException { eq(emptyList()), eq(false), eq(emptyMap()), + eq(false), eq(false) ); verify(mockedGlobalMetadataManager, times(1)).getGlobalMetadata(eq(manifest.getClusterUUID()), eq(manifest)); @@ -1281,6 +1290,7 @@ public void testReadClusterStateInParallel_TimedOut() throws IOException { emptyList(), true, emptyMap(), + false, true ) ); @@ -1312,6 +1322,7 @@ public void testReadClusterStateInParallel_ExceptionDuringRead() throws IOExcept emptyList(), true, emptyMap(), + false, true ) ); @@ -1418,6 +1429,7 @@ public void testReadClusterStateInParallel_UnexpectedResult() throws IOException emptyList(), true, newClusterStateCustoms, + false, true ) ); @@ -1652,6 +1664,7 @@ public void testReadClusterStateInParallel_Success() throws IOException { emptyList(), true, newClusterStateCustoms, + false, true ); @@ -2745,6 +2758,108 @@ public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOExcep assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); } + public void testWriteIncrementalMetadataSuccessWithRoutingTableDiff() throws IOException { + initializeRoutingTable(); + final ClusterState clusterState = generateClusterStateWithOneIndex("test-index", 5, 1, false).nodes( + nodesWithLocalNodeClusterManager() + ).build(); + mockBlobStoreObjects(); + List indices = new ArrayList<>(); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + INDEX_ROUTING_METADATA_PREFIX + ); + indices.add(uploadedIndiceRoutingMetadata); + final ClusterState previousClusterState = generateClusterStateWithOneIndex("test-index", 5, 1, true).nodes( + nodesWithLocalNodeClusterManager() + ).build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ).getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(clusterState.term()) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getDiffManifest().getIndicesRoutingDiffPath(), notNullValue()); + } + + public void testWriteIncrementalMetadataSuccessWithRoutingTableDiffNull() throws IOException { + initializeRoutingTable(); + final ClusterState clusterState = generateClusterStateWithOneIndex("test-index", 5, 1, false).nodes( + nodesWithLocalNodeClusterManager() + ).build(); + mockBlobStoreObjects(); + List indices = new ArrayList<>(); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + INDEX_ROUTING_METADATA_PREFIX + ); + indices.add(uploadedIndiceRoutingMetadata); + final ClusterState previousClusterState = generateClusterStateWithOneIndex("test-index2", 5, 1, false).nodes( + nodesWithLocalNodeClusterManager() + ).build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ).getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(clusterState.term()) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getDiffManifest().getIndicesRoutingDiffPath(), nullValue()); + } + private void initializeRoutingTable() { Settings newSettings = Settings.builder() .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") @@ -3217,6 +3332,54 @@ static ClusterState.Builder generateClusterStateWithOneIndex() { .routingTable(RoutingTable.builder().addAsNew(indexMetadata).version(1L).build()); } + public static ClusterState.Builder generateClusterStateWithOneIndex( + String indexName, + int primaryShards, + int replicaShards, + boolean addAsNew + ) { + + final Index index = new Index(indexName, "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(primaryShards) + .numberOfReplicas(replicaShards) + .build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final Settings settings = Settings.builder().put("mock-settings", true).build(); + final TemplatesMetadata templatesMetadata = TemplatesMetadata.builder() + .put(IndexTemplateMetadata.builder("template1").settings(idxSettings).patterns(List.of("test*")).build()) + .build(); + final CustomMetadata1 customMetadata1 = new CustomMetadata1("custom-metadata-1"); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + if (addAsNew) { + routingTableBuilder.addAsNew(indexMetadata); + } else { + routingTableBuilder.addAsRecovery(indexMetadata); + } + + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder() + .version(randomNonNegativeLong()) + .put(indexMetadata, true) + .clusterUUID("cluster-uuid") + .coordinationMetadata(coordinationMetadata) + .persistentSettings(settings) + .templates(templatesMetadata) + .hashesOfConsistentSettings(Map.of("key1", "value1", "key2", "value2")) + .putCustom(customMetadata1.getWriteableName(), customMetadata1) + .build() + ) + .routingTable(routingTableBuilder.version(1L).build()); + } + static ClusterState.Builder generateClusterStateWithAllAttributes() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() @@ -3296,7 +3459,7 @@ static ClusterMetadataManifest.Builder generateClusterMetadataManifestWithAllAtt ); } - static DiscoveryNodes nodesWithLocalNodeClusterManager() { + public static DiscoveryNodes nodesWithLocalNodeClusterManager() { final DiscoveryNode localNode = new DiscoveryNode("cluster-manager-id", buildNewFakeTransportAddress(), Version.CURRENT); return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").add(localNode).build(); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java index 897b2f5eeb25d..f89619a09cd52 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/ClusterStateDiffManifestTests.java @@ -10,6 +10,7 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; @@ -17,6 +18,7 @@ import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; @@ -40,7 +42,11 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.Version.CURRENT; import static org.opensearch.cluster.ClusterState.EMPTY_STATE; +import static org.opensearch.cluster.routing.remote.RemoteRoutingTableService.CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER; import static org.opensearch.core.common.transport.TransportAddress.META_ADDRESS; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; +import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; +import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; public class ClusterStateDiffManifestTests extends OpenSearchTestCase { @@ -114,11 +120,70 @@ public void testClusterStateDiffManifestXContent() throws IOException { diffManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { - final ClusterStateDiffManifest parsedManifest = ClusterStateDiffManifest.fromXContent(parser); + final ClusterStateDiffManifest parsedManifest = ClusterStateDiffManifest.fromXContent(parser, CODEC_V3); assertEquals(diffManifest, parsedManifest); } } + public void testClusterStateWithRoutingTableDiffInDiffManifestXContent() throws IOException { + ClusterState initialState = generateClusterStateWithOneIndex("test-index", 5, 1, true).nodes(nodesWithLocalNodeClusterManager()) + .build(); + + ClusterState updatedState = generateClusterStateWithOneIndex("test-index", 5, 2, false).nodes(nodesWithLocalNodeClusterManager()) + .build(); + + ClusterStateDiffManifest diffManifest = verifyRoutingTableDiffManifest(initialState, updatedState); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + diffManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterStateDiffManifest parsedManifest = ClusterStateDiffManifest.fromXContent(parser, CODEC_V3); + assertEquals(diffManifest, parsedManifest); + } + } + + public void testClusterStateWithRoutingTableDiffInDiffManifestXContent1() throws IOException { + ClusterState initialState = generateClusterStateWithOneIndex("test-index", 5, 1, true).nodes(nodesWithLocalNodeClusterManager()) + .build(); + + ClusterState updatedState = generateClusterStateWithOneIndex("test-index-1", 5, 2, false).nodes(nodesWithLocalNodeClusterManager()) + .build(); + + ClusterStateDiffManifest diffManifest = verifyRoutingTableDiffManifest(initialState, updatedState); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + diffManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterStateDiffManifest parsedManifest = ClusterStateDiffManifest.fromXContent(parser, CODEC_V3); + assertEquals(diffManifest, parsedManifest); + } + } + + private ClusterStateDiffManifest verifyRoutingTableDiffManifest(ClusterState previousState, ClusterState currentState) { + // Create initial and updated IndexRoutingTable maps + Map initialRoutingTableMap = previousState.getRoutingTable().indicesRouting(); + Map updatedRoutingTableMap = currentState.getRoutingTable().indicesRouting(); + + DiffableUtils.MapDiff> routingTableIncrementalDiff = DiffableUtils.diff( + initialRoutingTableMap, + updatedRoutingTableMap, + DiffableUtils.getStringKeySerializer(), + CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER + ); + ClusterStateDiffManifest manifest = new ClusterStateDiffManifest( + currentState, + previousState, + routingTableIncrementalDiff, + "indicesRoutingDiffPath" + ); + assertEquals("indicesRoutingDiffPath", manifest.getIndicesRoutingDiffPath()); + assertEquals(routingTableIncrementalDiff.getUpserts().size(), manifest.getIndicesRoutingUpdated().size()); + assertEquals(routingTableIncrementalDiff.getDeletes().size(), manifest.getIndicesRoutingDeleted().size()); + return manifest; + } + private ClusterStateDiffManifest updateAndVerifyState( ClusterState initialState, List indicesToAdd, @@ -191,7 +256,7 @@ private ClusterStateDiffManifest updateAndVerifyState( } ClusterState updatedClusterState = clusterStateBuilder.metadata(metadataBuilder.build()).build(); - ClusterStateDiffManifest manifest = new ClusterStateDiffManifest(updatedClusterState, initialState); + ClusterStateDiffManifest manifest = new ClusterStateDiffManifest(updatedClusterState, initialState, null, null); assertEquals(indicesToAdd.stream().map(im -> im.getIndex().getName()).collect(toList()), manifest.getIndicesUpdated()); assertEquals(indicesToRemove, manifest.getIndicesDeleted()); assertEquals(new ArrayList<>(customsToAdd.keySet()), manifest.getCustomMetadataUpdated()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java new file mode 100644 index 0000000000000..6ffa7fc5cded8 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTableDiffTests.java @@ -0,0 +1,317 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.routingtable; + +import org.opensearch.Version; +import org.opensearch.cluster.Diff; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.RoutingTableIncrementalDiff; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_FILE; +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_METADATA_PREFIX; +import static org.opensearch.gateway.remote.routingtable.RemoteRoutingTableDiff.ROUTING_TABLE_DIFF_PATH_TOKEN; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteIndexRoutingTableDiffTests extends OpenSearchTestCase { + + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long STATE_VERSION = 3L; + private static final long STATE_TERM = 2L; + private String clusterUUID; + private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; + private ClusterSettings clusterSettings; + private Compressor compressor; + + private String clusterName; + private NamedWriteableRegistry namedWriteableRegistry; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + this.clusterUUID = "test-cluster-uuid"; + this.blobStoreTransferService = mock(BlobStoreTransferService.class); + this.blobStoreRepository = mock(BlobStoreRepository.class); + BlobPath blobPath = new BlobPath().add("/path"); + when(blobStoreRepository.basePath()).thenReturn(blobPath); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + this.clusterName = "test-cluster-name"; + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdown(); + } + + public void testClusterUUID() { + Map> diffs = new HashMap<>(); + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).initializeAsNew(indexMetadata).build(); + + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertEquals(remoteDiffForUpload.clusterUUID(), clusterUUID); + + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + assertEquals(remoteDiffForDownload.clusterUUID(), clusterUUID); + } + + public void testFullBlobName() { + Map> diffs = new HashMap<>(); + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).initializeAsNew(indexMetadata).build(); + + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteDiffForUpload.getFullBlobName(), nullValue()); + + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteDiffForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); + } + + public void testBlobFileName() { + Map> diffs = new HashMap<>(); + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).initializeAsNew(indexMetadata).build(); + + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteDiffForUpload.getBlobFileName(), nullValue()); + + RemoteRoutingTableDiff remoteDiffForDownload = new RemoteRoutingTableDiff(TEST_BLOB_NAME, clusterUUID, compressor); + assertThat(remoteDiffForDownload.getBlobFileName(), is(TEST_BLOB_FILE_NAME)); + } + + public void testBlobPathParameters() { + Map> diffs = new HashMap<>(); + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).initializeAsNew(indexMetadata).build(); + + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + assertThat(remoteDiffForUpload.getBlobFileName(), nullValue()); + + BlobPathParameters params = remoteDiffForUpload.getBlobPathParameters(); + assertThat(params.getPathTokens(), is(List.of(ROUTING_TABLE_DIFF_PATH_TOKEN))); + String expectedPrefix = ROUTING_TABLE_DIFF_METADATA_PREFIX; + assertThat(params.getFilePrefix(), is(expectedPrefix)); + } + + public void testGenerateBlobFileName() { + Map> diffs = new HashMap<>(); + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).initializeAsNew(indexMetadata).build(); + + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + String blobFileName = remoteDiffForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split("__"); + assertEquals(ROUTING_TABLE_DIFF_METADATA_PREFIX, nameTokens[0]); + assertEquals(RemoteStoreUtils.invertLong(STATE_TERM), nameTokens[1]); + assertEquals(RemoteStoreUtils.invertLong(STATE_VERSION), nameTokens[2]); + assertThat(RemoteStoreUtils.invertLong(nameTokens[3]), lessThanOrEqualTo(System.currentTimeMillis())); + } + + public void testGetUploadedMetadata() throws IOException { + Map> diffs = new HashMap<>(); + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + .build(); + + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).initializeAsNew(indexMetadata).build(); + + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + remoteDiffForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadataAttribute = remoteDiffForUpload.getUploadedMetadata(); + assertEquals(ROUTING_TABLE_DIFF_FILE, uploadedMetadataAttribute.getComponent()); + } + + public void testStreamOperations() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + int numberOfShards = randomIntBetween(1, 10); + int numberOfReplicas = randomIntBetween(1, 10); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index(indexName)).build(); + Map> diffs = new HashMap<>(); + + initialRoutingTable.getIndicesRouting().values().forEach(indexRoutingTable -> { + diffs.put(indexName, indexRoutingTable.diff(indexRoutingTable)); + RoutingTableIncrementalDiff routingTableIncrementalDiff = new RoutingTableIncrementalDiff(diffs); + + RemoteRoutingTableDiff remoteDiffForUpload = new RemoteRoutingTableDiff( + routingTableIncrementalDiff, + clusterUUID, + compressor, + STATE_TERM, + STATE_VERSION + ); + + assertThrows(AssertionError.class, remoteDiffForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteDiffForUpload.serialize()) { + remoteDiffForUpload.setFullBlobName(BlobPath.cleanPath()); + assertThat(inputStream.available(), greaterThan(0)); + + routingTableIncrementalDiff = remoteDiffForUpload.deserialize(inputStream); + assertEquals(remoteDiffForUpload.getDiffs().size(), routingTableIncrementalDiff.getDiffs().size()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } +}