From b2aab1b36d952784f56886ce2d55fbde01b814ed Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 14 Jun 2024 10:31:48 +0530 Subject: [PATCH 1/7] Simplify updated customs logic Signed-off-by: Sooraj Sinha --- .../RemoteClusterStateAttributesManager.java | 44 ++-- .../remote/RemoteClusterStateService.java | 90 ++++---- .../remote/RemoteGlobalMetadataManager.java | 58 +++-- ...oteClusterStateAttributesManagerTests.java | 186 +++++++++++++++- .../RemoteClusterStateServiceTests.java | 62 +++++- .../RemoteGlobalMetadataManagerTests.java | 209 ++++++++++++++++++ 6 files changed, 558 insertions(+), 91 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index b052b6e1a613d..12d5daa482cd4 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -10,6 +10,8 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.RemoteWritableEntityStore; @@ -25,10 +27,9 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; /** * A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore} @@ -126,18 +127,35 @@ public CheckedRunnable getAsyncMetadataReadAction( return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); } - public Map getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) { - Map updatedCustoms = new HashMap<>(); - Set currentCustoms = new HashSet<>(clusterState.customs().keySet()); - for (Map.Entry entry : previousClusterState.customs().entrySet()) { - if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) { - updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey())); - } - currentCustoms.remove(entry.getKey()); + public DiffableUtils.MapDiff> getUpdatedCustoms( + ClusterState clusterState, + ClusterState previousClusterState, + boolean includeEphemeral, + boolean firstUploadForSplitGlobalMetadata + ) { + if (!includeEphemeral) { + // When includeEphemeral is false, we do not want store any custom objects + return DiffableUtils.diff( + Collections.emptyMap(), + Collections.emptyMap(), + DiffableUtils.getStringKeySerializer(), + NonDiffableValueSerializer.getAbstractInstance() + ); } - for (String custom : currentCustoms) { - updatedCustoms.put(custom, clusterState.customs().get(custom)); + if (firstUploadForSplitGlobalMetadata) { + // For first split global metadata upload, we want to upload all customs + return DiffableUtils.diff( + Collections.emptyMap(), + clusterState.customs(), + DiffableUtils.getStringKeySerializer(), + NonDiffableValueSerializer.getAbstractInstance() + ); } - return updatedCustoms; + return DiffableUtils.diff( + previousClusterState.customs(), + clusterState.customs(), + DiffableUtils.getStringKeySerializer(), + NonDiffableValueSerializer.getAbstractInstance() + ); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ada29fdb57c57..d062a41a416fb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -39,6 +39,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -88,6 +89,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS; @@ -159,6 +161,7 @@ public class RemoteClusterStateService implements Closeable { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; + private final boolean isPublicationEnabled; // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. @@ -201,6 +204,9 @@ public RemoteClusterStateService( threadPool ); this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService, remoteRoutingTableService); + this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) + && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) + && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); } /** @@ -221,15 +227,15 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat clusterState, new ArrayList<>(clusterState.metadata().indices().values()), emptyMap(), - clusterState.metadata().customs(), + RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled), true, true, true, - true, - true, - true, - clusterState.customs(), - true, + isPublicationEnabled, + isPublicationEnabled, + isPublicationEnabled, + isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(), + isPublicationEnabled, remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()) ); final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( @@ -285,28 +291,22 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( } assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); - final Map customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap()); - final Map customsToUpload = remoteGlobalMetadataManager.getUpdatedCustoms( - clusterState, - previousClusterState - ); - final Map clusterStateCustomsToBeDeleted = new HashMap<>( + boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); + + final DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager + .getCustomsDiff(clusterState, previousClusterState, isPublicationEnabled, firstUploadForSplitGlobalMetadata); + final DiffableUtils.MapDiff> clusterStateCustomsDiff = + remoteClusterStateAttributesManager.getUpdatedCustoms( + clusterState, + previousClusterState, + isPublicationEnabled, + firstUploadForSplitGlobalMetadata + ); + final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); + final Map allUploadedClusterStateCustomsMap = new HashMap<>( previousManifest.getClusterStateCustomMap() ); - final Map clusterStateCustomsToUpload = remoteClusterStateAttributesManager.getUpdatedCustoms( - clusterState, - previousClusterState - ); - final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); - for (final String custom : clusterState.metadata().customs().keySet()) { - // remove all the customs which are present currently - customsToBeDeletedFromRemote.remove(custom); - } final Map indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices()); - for (final String custom : clusterState.customs().keySet()) { - // remove all the custom which are present currently - clusterStateCustomsToBeDeleted.remove(custom); - } int numIndicesUpdated = 0; int numIndicesUnchanged = 0; final Map allUploadedIndexMetadata = previousManifest.getIndices() @@ -337,15 +337,14 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName()); } - DiffableUtils.MapDiff> routingTableDiff = remoteRoutingTableService + final DiffableUtils.MapDiff> routingTableDiff = remoteRoutingTableService .getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable()); - List indicesRoutingToUpload = new ArrayList<>(); + final List indicesRoutingToUpload = new ArrayList<>(); routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v)); UploadedMetadataResults uploadedMetadataResults; // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, // If file is empty and codec is 1 then write global metadata. - boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); boolean updateCoordinationMetadata = firstUploadForSplitGlobalMetadata || Metadata.isCoordinationMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; ; @@ -355,24 +354,25 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( || Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; - // ToDo: check if these needs to be updated or not - final boolean updateDiscoveryNodes = clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); - final boolean updateClusterBlocks = !clusterState.blocks().equals(previousClusterState.blocks()); - final boolean updateHashesOfConsistentSettings = firstUploadForSplitGlobalMetadata + + final boolean updateDiscoveryNodes = isPublicationEnabled + && clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); + final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks()); + final boolean updateHashesOfConsistentSettings = isPublicationEnabled && firstUploadForSplitGlobalMetadata || Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false; uploadedMetadataResults = writeMetadataInParallel( clusterState, toUpload, prevIndexMetadataByName, - firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload, + customsDiff.getUpserts(), updateCoordinationMetadata, updateSettingsMetadata, updateTemplatesMetadata, updateDiscoveryNodes, updateClusterBlocks, updateTransientSettingsMetadata, - clusterStateCustomsToUpload, + clusterStateCustomsDiff.getUpserts(), updateHashesOfConsistentSettings, indicesRoutingToUpload ); @@ -382,10 +382,11 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) ); allUploadedCustomMap.putAll(uploadedMetadataResults.uploadedCustomMetadataMap); + allUploadedClusterStateCustomsMap.putAll(uploadedMetadataResults.uploadedClusterStateCustomMetadataMap); // remove the data for removed custom/indices - customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); + customsDiff.getDeletes().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); - clusterStateCustomsToBeDeleted.keySet().forEach(allUploadedCustomMap::remove); + clusterStateCustomsDiff.getDeletes().forEach(allUploadedClusterStateCustomsMap::remove); if (!updateCoordinationMetadata) { uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata(); @@ -408,22 +409,15 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) { uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings(); } - if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) { - uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap(); - } - if (!firstUploadForSplitGlobalMetadata && clusterStateCustomsToUpload.isEmpty()) { - uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = previousManifest.getClusterStateCustomMap(); - } uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap; + uploadedMetadataResults.uploadedClusterStateCustomMetadataMap = allUploadedClusterStateCustomsMap; uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values()); - List allUploadedIndicesRouting = new ArrayList<>(); - allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting( + uploadedMetadataResults.uploadedIndicesRoutingMetadata = remoteRoutingTableService.getAllUploadedIndicesRouting( previousManifest, uploadedMetadataResults.uploadedIndicesRoutingMetadata, routingTableDiff.getDeletes() ); - uploadedMetadataResults.uploadedIndicesRoutingMetadata = allUploadedIndicesRouting; final RemoteClusterStateManifestInfo manifestDetails = remoteManifestManager.uploadManifest( clusterState, @@ -448,7 +442,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( updateCoordinationMetadata, updateSettingsMetadata, updateTemplatesMetadata, - customsToUpload.size(), + customsDiff.getUpserts().size(), indicesRoutingToUpload.size() ); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { @@ -464,7 +458,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( updateCoordinationMetadata, updateSettingsMetadata, updateTemplatesMetadata, - customsToUpload.size() + customsDiff.getUpserts().size() ); } else { logger.info("{}; {}", clusterStateUploadTimeMessage, metadataUpdateMessage); @@ -479,7 +473,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( updateCoordinationMetadata, updateSettingsMetadata, updateTemplatesMetadata, - customsToUpload.size() + customsDiff.getUpserts().size() ); } return manifestDetails; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 3053095368972..c1ff54d13e9db 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -10,9 +10,12 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata.Custom; +import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; @@ -39,11 +42,12 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Locale; import java.util.Map; -import java.util.Set; +import java.util.Map.Entry; +import java.util.stream.Collectors; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -276,29 +280,37 @@ Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMe } } - Map getUpdatedCustoms(ClusterState currentState, ClusterState previousState) { - if (Metadata.isCustomMetadataEqual(previousState.metadata(), currentState.metadata())) { - return new HashMap<>(); - } - Map updatedCustom = new HashMap<>(); - Set currentCustoms = new HashSet<>(currentState.metadata().customs().keySet()); - for (Map.Entry cursor : previousState.metadata().customs().entrySet()) { - if (cursor.getValue().context().contains(Metadata.XContentContext.GATEWAY)) { - if (currentCustoms.contains(cursor.getKey()) - && !cursor.getValue().equals(currentState.metadata().custom(cursor.getKey()))) { - // If the custom metadata is updated, we need to upload the new version. - updatedCustom.put(cursor.getKey(), currentState.metadata().custom(cursor.getKey())); - } - currentCustoms.remove(cursor.getKey()); - } + DiffableUtils.MapDiff> getCustomsDiff( + ClusterState currentState, + ClusterState previousState, + boolean firstUploadForSplitGlobalMetadata, + boolean includeEphemeral + ) { + if (firstUploadForSplitGlobalMetadata) { + // For first split global metadata upload, we want to upload all customs + return DiffableUtils.diff( + Collections.emptyMap(), + filterCustoms(currentState.metadata().customs(), includeEphemeral), + DiffableUtils.getStringKeySerializer(), + NonDiffableValueSerializer.getAbstractInstance() + ); } - for (String custom : currentCustoms) { - Metadata.Custom cursor = currentState.metadata().custom(custom); - if (cursor.context().contains(Metadata.XContentContext.GATEWAY)) { - updatedCustom.put(custom, cursor); - } + return DiffableUtils.diff( + filterCustoms(previousState.metadata().customs(), includeEphemeral), + filterCustoms(currentState.metadata().customs(), includeEphemeral), + DiffableUtils.getStringKeySerializer(), + NonDiffableValueSerializer.getAbstractInstance() + ); + } + + public static Map filterCustoms(Map customs, boolean includeEphemeral) { + if (includeEphemeral) { + return customs; } - return updatedCustom; + return customs.entrySet() + .stream() + .filter(e -> e.getValue().context().contains(XContentContext.GATEWAY)) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); } boolean isGlobalMetadataEqual(ClusterMetadataManifest first, ClusterMetadataManifest second, String clusterName) { diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 0aff1c4b0e5e2..944cf5d867fcd 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -8,7 +8,16 @@ package org.opensearch.gateway.remote; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.AbstractNamedDiffable; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterState.Custom; +import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.CheckedRunnable; @@ -16,8 +25,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; import org.opensearch.gateway.remote.model.RemoteReadResult; @@ -45,13 +57,14 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.hamcrest.Matchers.is; public class RemoteClusterStateAttributesManagerTests extends OpenSearchTestCase { private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; private BlobStoreTransferService blobStoreTransferService; private BlobStoreRepository blobStoreRepository; private Compressor compressor; - private ThreadPool threadpool = new TestThreadPool(RemoteClusterStateAttributesManagerTests.class.getName()); + private ThreadPool threadPool = new TestThreadPool(RemoteClusterStateAttributesManagerTests.class.getName()); @Before public void setup() throws Exception { @@ -65,15 +78,15 @@ public void setup() throws Exception { "test-cluster", blobStoreRepository, blobStoreTransferService, - namedWriteableRegistry, - threadpool + writableRegistry(), + threadPool ); } @After public void tearDown() throws Exception { super.tearDown(); - threadpool.shutdown(); + threadPool.shutdown(); } public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException { @@ -138,4 +151,169 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException { throw new RuntimeException(e); } } + + public void testGetUpdatedCustoms() { + Map previousCustoms = Map.of( + TestCustom1.TYPE, + new TestCustom1("data1"), + TestCustom2.TYPE, + new TestCustom2("data2"), + TestCustom3.TYPE, + new TestCustom3("data3") + ); + ClusterState previousState = ClusterState.builder(new ClusterName("test-cluster")).customs(previousCustoms).build(); + + Map currentCustoms = Map.of( + TestCustom2.TYPE, + new TestCustom2("data2"), + TestCustom3.TYPE, + new TestCustom3("data3-changed"), + TestCustom4.TYPE, + new TestCustom4("data4") + ); + + ClusterState currentState = ClusterState.builder(new ClusterName("test-cluster")).customs(currentCustoms).build(); + + DiffableUtils.MapDiff> customsDiff = + remoteClusterStateAttributesManager.getUpdatedCustoms(currentState, previousState, false, randomBoolean()); + assertThat(customsDiff.getUpserts(), is(Collections.emptyMap())); + assertThat(customsDiff.getDeletes(), is(Collections.emptyList())); + + customsDiff = remoteClusterStateAttributesManager.getUpdatedCustoms(currentState, previousState, true, true); + assertThat(customsDiff.getUpserts(), is(currentCustoms)); + assertThat(customsDiff.getDeletes(), is(Collections.emptyList())); + + Map expectedCustoms = Map.of( + TestCustom3.TYPE, + new TestCustom3("data3-changed"), + TestCustom4.TYPE, + new TestCustom4("data4") + ); + + customsDiff = remoteClusterStateAttributesManager.getUpdatedCustoms(currentState, previousState, true, false); + assertThat(customsDiff.getUpserts(), is(expectedCustoms)); + assertThat(customsDiff.getDeletes(), is(List.of(TestCustom1.TYPE))); + } + + private static abstract class AbstractTestCustom extends AbstractNamedDiffable implements ClusterState.Custom { + + private final String value; + + AbstractTestCustom(String value) { + this.value = value; + } + + AbstractTestCustom(StreamInput in) throws IOException { + this.value = in.readString(); + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(value); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + + @Override + public boolean isPrivate() { + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + AbstractTestCustom that = (AbstractTestCustom) o; + + if (!value.equals(that.value)) return false; + + return true; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + } + + private static class TestCustom1 extends AbstractTestCustom { + + private static final String TYPE = "custom_1"; + + TestCustom1(String value) { + super(value); + } + + TestCustom1(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } + + private static class TestCustom2 extends AbstractTestCustom { + + private static final String TYPE = "custom_2"; + + TestCustom2(String value) { + super(value); + } + + TestCustom2(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } + + private static class TestCustom3 extends AbstractTestCustom { + + private static final String TYPE = "custom_3"; + + TestCustom3(String value) { + super(value); + } + + TestCustom3(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } + + private static class TestCustom4 extends AbstractTestCustom { + + private static final String TYPE = "custom_4"; + + TestCustom4(String value) { + super(value); + } + + TestCustom4(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index feae97bae48e9..32e595dcf23fa 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -12,6 +12,8 @@ import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.RepositoryCleanupInProgress; +import org.opensearch.cluster.RepositoryCleanupInProgress.Entry; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.IndexMetadata; @@ -74,6 +76,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -154,6 +157,7 @@ public void setup() { .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .build(); clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -174,6 +178,8 @@ public void setup() { when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(xContentRegistry); + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -265,7 +271,46 @@ public void testWriteFullMetadataSuccess() throws IOException { assertFalse(manifest.getCustomMetadataMap().isEmpty()); } + public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()) + .customs(Map.of(RepositoryCleanupInProgress.TYPE, new RepositoryCleanupInProgress(List.of(new Entry("test-repo", 10L))))) + .build(); + mockBlobStoreObjects(); + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); + assertThat(manifest.getGlobalMetadataFileName(), nullValue()); + assertThat(manifest.getCoordinationMetadata(), notNullValue()); + assertThat(manifest.getSettingsMetadata(), notNullValue()); + assertThat(manifest.getTemplatesMetadata(), notNullValue()); + assertFalse(manifest.getCustomMetadataMap().isEmpty()); + assertThat(manifest.getClusterStateCustomMap().size(), is(1)); + assertThat(manifest.getClusterStateCustomMap().containsKey(RepositoryCleanupInProgress.TYPE), is(true)); + } + public void testWriteFullMetadataInParallelSuccess() throws IOException { + // TODO Add test with publication flag enabled final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); @@ -310,8 +355,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(11, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(11, writeContextArgumentCaptor.getAllValues().size()); + assertEquals(7, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(7, writeContextArgumentCaptor.getAllValues().size()); byte[] writtenBytes = capturedWriteContext.get("metadata") .getStreamProvider(Integer.MAX_VALUE) @@ -584,7 +629,7 @@ private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMe assertNotNull(manifest.getCoordinationMetadata()); assertNotNull(manifest.getSettingsMetadata()); assertNotNull(manifest.getTemplatesMetadata()); - assertNotEquals(0, manifest.getCustomMetadataMap().size()); + assertNotNull(manifest.getCustomMetadataMap()); assertEquals(expectedManifest.getClusterTerm(), manifest.getClusterTerm()); assertEquals(expectedManifest.getStateVersion(), manifest.getStateVersion()); @@ -1737,6 +1782,17 @@ private BlobContainer mockBlobStoreObjects(Class blobCo final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath); when(blobPath.add(anyString())).thenReturn(blobPath); + when(blobPath.iterator()).thenReturn(new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public String next() { + return null; + } + }); when(blobPath.buildAsString()).thenReturn("/blob/path/"); final BlobContainer blobContainer = mock(blobContainerClazz); when(blobContainer.path()).thenReturn(blobPath); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index f24f8ddeb1959..22f1d79882228 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -8,7 +8,14 @@ package org.opensearch.gateway.remote; +import org.opensearch.Version; import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.metadata.IndexGraveyard; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -19,15 +26,20 @@ import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.TestCustomMetadata; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -83,4 +95,201 @@ public void testGlobalMetadataUploadWaitTimeSetting() { clusterSettings.applySettings(newSettings); assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); } + + public void testGetUpdatedCustoms() { + Map previousCustoms = Map.of( + CustomMetadata1.TYPE, + new CustomMetadata1("data1"), + CustomMetadata2.TYPE, + new CustomMetadata2("data2"), + CustomMetadata3.TYPE, + new CustomMetadata3("data3") + ); + ClusterState previousState = ClusterState.builder(new ClusterName("test-cluster")) + .metadata(Metadata.builder().customs(previousCustoms)) + .build(); + + Map currentCustoms = Map.of( + CustomMetadata2.TYPE, + new CustomMetadata2("data2"), + CustomMetadata3.TYPE, + new CustomMetadata3("data3-changed"), + CustomMetadata4.TYPE, + new CustomMetadata4("data4"), + CustomMetadata5.TYPE, + new CustomMetadata5("data4") + ); + ClusterState currentState = ClusterState.builder(new ClusterName("test-cluster")) + .metadata(Metadata.builder().customs(currentCustoms)) + .build(); + + DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager + .getCustomsDiff(currentState, previousState, true, false); + Map expectedUpserts = Map.of( + CustomMetadata2.TYPE, + new CustomMetadata2("data2"), + CustomMetadata3.TYPE, + new CustomMetadata3("data3-changed"), + CustomMetadata4.TYPE, + new CustomMetadata4("data4"), + IndexGraveyard.TYPE, + IndexGraveyard.builder().build() + ); + assertThat(customsDiff.getUpserts(), is(expectedUpserts)); + assertThat(customsDiff.getDeletes(), is(List.of())); + + customsDiff = remoteGlobalMetadataManager.getCustomsDiff(currentState, previousState, false, false); + expectedUpserts = Map.of( + CustomMetadata3.TYPE, + new CustomMetadata3("data3-changed"), + CustomMetadata4.TYPE, + new CustomMetadata4("data4") + ); + assertThat(customsDiff.getUpserts(), is(expectedUpserts)); + assertThat(customsDiff.getDeletes(), is(List.of(CustomMetadata1.TYPE))); + + customsDiff = remoteGlobalMetadataManager.getCustomsDiff(currentState, previousState, true, true); + expectedUpserts = Map.of( + CustomMetadata2.TYPE, + new CustomMetadata2("data2"), + CustomMetadata3.TYPE, + new CustomMetadata3("data3-changed"), + CustomMetadata4.TYPE, + new CustomMetadata4("data4"), + CustomMetadata5.TYPE, + new CustomMetadata5("data5"), + IndexGraveyard.TYPE, + IndexGraveyard.builder().build() + ); + assertThat(customsDiff.getUpserts(), is(expectedUpserts)); + assertThat(customsDiff.getDeletes(), is(List.of())); + + customsDiff = remoteGlobalMetadataManager.getCustomsDiff(currentState, previousState, false, true); + expectedUpserts = Map.of( + CustomMetadata3.TYPE, + new CustomMetadata3("data3-changed"), + CustomMetadata4.TYPE, + new CustomMetadata4("data4"), + CustomMetadata5.TYPE, + new CustomMetadata5("data5") + ); + assertThat(customsDiff.getUpserts(), is(expectedUpserts)); + assertThat(customsDiff.getDeletes(), is(List.of(CustomMetadata1.TYPE))); + + } + + private static class CustomMetadata1 extends TestCustomMetadata { + public static final String TYPE = "custom_md_1"; + + CustomMetadata1(String data) { + super(data); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public EnumSet context() { + return EnumSet.of(Metadata.XContentContext.GATEWAY); + } + } + + private static class CustomMetadata2 extends TestCustomMetadata { + public static final String TYPE = "custom_md_2"; + + CustomMetadata2(String data) { + super(data); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public EnumSet context() { + return EnumSet.of(Metadata.XContentContext.GATEWAY); + } + } + + private static class CustomMetadata3 extends TestCustomMetadata { + public static final String TYPE = "custom_md_3"; + + CustomMetadata3(String data) { + super(data); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public EnumSet context() { + return EnumSet.of(Metadata.XContentContext.GATEWAY); + } + } + + private static class CustomMetadata4 extends TestCustomMetadata { + public static final String TYPE = "custom_md_4"; + + CustomMetadata4(String data) { + super(data); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public EnumSet context() { + return EnumSet.of(Metadata.XContentContext.GATEWAY); + } + } + + private static class CustomMetadata5 extends TestCustomMetadata { + public static final String TYPE = "custom_md_5"; + + CustomMetadata5(String data) { + super(data); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + + @Override + public EnumSet context() { + return EnumSet.of(XContentContext.API); + } + } } From 7da9fdf36b24e6b1147a2277aae4af2cbde2de39 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 14 Jun 2024 12:50:26 +0530 Subject: [PATCH 2/7] Fix unit tests Signed-off-by: Sooraj Sinha --- .../remote/RemoteClusterStateService.java | 15 ++++++++------- .../remote/RemoteClusterStateServiceTests.java | 17 +++++++++++++---- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d062a41a416fb..a8ea660ec2d76 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -292,15 +292,16 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); + boolean firstUploadForEphemeralMetadata = previousManifest.getDiscoveryNodesMetadata() != null; final DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager - .getCustomsDiff(clusterState, previousClusterState, isPublicationEnabled, firstUploadForSplitGlobalMetadata); + .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled); final DiffableUtils.MapDiff> clusterStateCustomsDiff = remoteClusterStateAttributesManager.getUpdatedCustoms( clusterState, previousClusterState, isPublicationEnabled, - firstUploadForSplitGlobalMetadata + firstUploadForEphemeralMetadata ); final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); final Map allUploadedClusterStateCustomsMap = new HashMap<>( @@ -350,7 +351,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( ; boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata || Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; - boolean updateTransientSettingsMetadata = firstUploadForSplitGlobalMetadata + boolean updateTransientSettingsMetadata = firstUploadForEphemeralMetadata || Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; @@ -358,7 +359,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( final boolean updateDiscoveryNodes = isPublicationEnabled && clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks()); - final boolean updateHashesOfConsistentSettings = isPublicationEnabled && firstUploadForSplitGlobalMetadata + final boolean updateHashesOfConsistentSettings = isPublicationEnabled && firstUploadForEphemeralMetadata || Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false; uploadedMetadataResults = writeMetadataInParallel( @@ -400,13 +401,13 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( if (!updateTemplatesMetadata) { uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata(); } - if (!updateDiscoveryNodes && !firstUploadForSplitGlobalMetadata) { + if (!updateDiscoveryNodes && !firstUploadForEphemeralMetadata) { uploadedMetadataResults.uploadedDiscoveryNodes = previousManifest.getDiscoveryNodesMetadata(); } - if (!updateClusterBlocks && !firstUploadForSplitGlobalMetadata) { + if (!updateClusterBlocks && !firstUploadForEphemeralMetadata) { uploadedMetadataResults.uploadedClusterBlocks = previousManifest.getClusterBlocksMetadata(); } - if (!updateHashesOfConsistentSettings && !firstUploadForSplitGlobalMetadata) { + if (!updateHashesOfConsistentSettings && !firstUploadForEphemeralMetadata) { uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings(); } uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 32e595dcf23fa..1b67a3bd2544f 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -133,6 +133,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private BlobStoreRepository blobStoreRepository; private BlobStore blobStore; private Settings settings; + private boolean publicationEnabled; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -178,7 +179,9 @@ public void setup() { when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(xContentRegistry); - Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + // TODO Make the publication flag parameterized + publicationEnabled = true; + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, publicationEnabled).build(); FeatureFlags.initializeFeatureFlags(nodeSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", @@ -355,8 +358,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(7, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(7, writeContextArgumentCaptor.getAllValues().size()); + assertEquals(12, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(12, writeContextArgumentCaptor.getAllValues().size()); byte[] writtenBytes = capturedWriteContext.get("metadata") .getStreamProvider(Integer.MAX_VALUE) @@ -814,6 +817,7 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { .putCustom("custom1", new CustomMetadata1("mock_custom_metadata1")) .putCustom("custom2", new CustomMetadata1("mock_custom_metadata2")) .putCustom("custom3", new CustomMetadata1("mock_custom_metadata3")) + .version(initialClusterState.metadata().version() + 1) ) .build(); @@ -829,6 +833,7 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { .putCustom("custom2", new CustomMetadata1("mock_updated_custom_metadata")) .putCustom("custom3", new CustomMetadata1("mock_custom_metadata3")) .putCustom("custom4", new CustomMetadata1("mock_custom_metadata4")) + .version(clusterState1.metadata().version() + 1) ) .build(); ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1) @@ -1358,7 +1363,11 @@ public void testRemoteStateStats() throws IOException { } public void testRemoteRoutingTableNotInitializedWhenDisabled() { - assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService); + if (publicationEnabled) { + assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); + } else { + assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService); + } } public void testRemoteRoutingTableInitializedWhenEnabled() { From 48205b6c07452a0e506f401cb0ad1d7d2831088f Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Fri, 14 Jun 2024 13:53:29 +0530 Subject: [PATCH 3/7] Fix getUpdatedCustoms test Signed-off-by: Sooraj Sinha --- .../gateway/remote/RemoteClusterStateAttributesManager.java | 4 ++-- .../gateway/remote/RemoteGlobalMetadataManagerTests.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 12d5daa482cd4..9aefbac01cf6c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -131,7 +131,7 @@ public DiffableUtils.MapDiff Date: Fri, 14 Jun 2024 14:53:28 +0530 Subject: [PATCH 4/7] use try with resources Signed-off-by: Sooraj Sinha --- .../gateway/remote/RemoteClusterStateAttributesManager.java | 2 +- .../opensearch/gateway/remote/RemoteClusterStateService.java | 2 +- .../gateway/remote/model/RemoteClusterStateBlobStore.java | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 9aefbac01cf6c..30dc281e3ae40 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -143,7 +143,7 @@ public DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java index 83326f65f0d43..1dd23443f1252 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateBlobStore.java @@ -72,7 +72,9 @@ public void writeAsync(final U entity, final ActionListener listener) { public T read(final U entity) throws IOException { // TODO Add timing logs and tracing assert entity.getFullBlobName() != null; - return entity.deserialize(transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName())); + try (InputStream inputStream = transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName())) { + return entity.deserialize(inputStream); + } } @Override From d61fe23f92c2bba876c9fcfbdd7453e07910e17a Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Sat, 15 Jun 2024 08:51:01 +0530 Subject: [PATCH 5/7] Add more assertions Signed-off-by: Sooraj Sinha --- .../RemoteClusterStateAttributesManager.java | 6 ++-- ...oteClusterStateAttributesManagerTests.java | 8 ++--- .../RemoteClusterStateServiceTests.java | 30 +++++++++++++++---- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 30dc281e3ae40..3962b7c63adad 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -130,11 +130,11 @@ public CheckedRunnable getAsyncMetadataReadAction( public DiffableUtils.MapDiff> getUpdatedCustoms( ClusterState clusterState, ClusterState previousClusterState, - boolean includeEphemeral, + boolean isWriteFull, boolean firstUploadForEphemeralMetadata ) { - if (!includeEphemeral) { - // When includeEphemeral is false, we do not want store any custom objects + if (!isWriteFull) { + // When isWriteFull is false, we do not want store any custom objects return DiffableUtils.diff( Collections.emptyMap(), Collections.emptyMap(), diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 944cf5d867fcd..41e1546ead164 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -8,9 +8,6 @@ package org.opensearch.gateway.remote; -import java.util.Collections; -import java.util.List; -import java.util.Map; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.AbstractNamedDiffable; @@ -43,6 +40,9 @@ import org.junit.Before; import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -53,11 +53,11 @@ import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodes.DISCOVERY_NODES_FORMAT; import static org.opensearch.gateway.remote.model.RemoteDiscoveryNodesTests.getDiscoveryNodes; +import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.hamcrest.Matchers.is; public class RemoteClusterStateAttributesManagerTests extends OpenSearchTestCase { private RemoteClusterStateAttributesManager remoteClusterStateAttributesManager; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1b67a3bd2544f..c8fd982fec1e1 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -179,10 +179,6 @@ public void setup() { when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(xContentRegistry); - // TODO Make the publication flag parameterized - publicationEnabled = true; - Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, publicationEnabled).build(); - FeatureFlags.initializeFeatureFlags(nodeSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -199,6 +195,9 @@ public void setup() { public void teardown() throws Exception { super.tearDown(); remoteClusterStateService.close(); + publicationEnabled = false; + Settings nodeSettings = Settings.builder().build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); threadPool.shutdown(); } @@ -272,9 +271,28 @@ public void testWriteFullMetadataSuccess() throws IOException { assertThat(manifest.getSettingsMetadata(), notNullValue()); assertThat(manifest.getTemplatesMetadata(), notNullValue()); assertFalse(manifest.getCustomMetadataMap().isEmpty()); + assertThat(manifest.getClusterBlocksMetadata(), nullValue()); + assertThat(manifest.getDiscoveryNodesMetadata(), nullValue()); + assertThat(manifest.getTransientSettingsMetadata(), nullValue()); + assertThat(manifest.getHashesOfConsistentSettings(), nullValue()); + assertThat(manifest.getClusterStateCustomMap().size(), is(0)); } public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException { + // TODO Make the publication flag parameterized + publicationEnabled = true; + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, publicationEnabled).build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + settings, + clusterService, + () -> 0L, + threadPool, + List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)), + writableRegistry() + ); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()) .customs(Map.of(RepositoryCleanupInProgress.TYPE, new RepositoryCleanupInProgress(List.of(new Entry("test-repo", 10L))))) .build(); @@ -358,8 +376,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(12, actionListenerArgumentCaptor.getAllValues().size()); - assertEquals(12, writeContextArgumentCaptor.getAllValues().size()); + assertEquals(7, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(7, writeContextArgumentCaptor.getAllValues().size()); byte[] writtenBytes = capturedWriteContext.get("metadata") .getStreamProvider(Integer.MAX_VALUE) From e7bc511f6fad53a168328995cae8b51d1e5b6005 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Sun, 16 Jun 2024 14:23:13 +0530 Subject: [PATCH 6/7] Remove first ephemeral upload check from incremental update Signed-off-by: Sooraj Sinha --- .../remote/RemoteClusterStateService.java | 22 ++++++++----------- .../remote/RemoteGlobalMetadataManager.java | 12 +++++----- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index f07fce563bc64..4a1c9c8615e39 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -292,17 +292,11 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); - boolean firstUploadForEphemeralMetadata = previousManifest.getDiscoveryNodesMetadata() == null; final DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled); final DiffableUtils.MapDiff> clusterStateCustomsDiff = - remoteClusterStateAttributesManager.getUpdatedCustoms( - clusterState, - previousClusterState, - isPublicationEnabled, - firstUploadForEphemeralMetadata - ); + remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false); final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); final Map allUploadedClusterStateCustomsMap = new HashMap<>( previousManifest.getClusterStateCustomMap() @@ -351,15 +345,17 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( ; boolean updateSettingsMetadata = firstUploadForSplitGlobalMetadata || Metadata.isSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; - boolean updateTransientSettingsMetadata = firstUploadForEphemeralMetadata - || Metadata.isTransientSettingsMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + boolean updateTransientSettingsMetadata = Metadata.isTransientSettingsMetadataEqual( + previousClusterState.metadata(), + clusterState.metadata() + ) == false; boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; final boolean updateDiscoveryNodes = isPublicationEnabled && clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks()); - final boolean updateHashesOfConsistentSettings = isPublicationEnabled && firstUploadForEphemeralMetadata + final boolean updateHashesOfConsistentSettings = isPublicationEnabled || Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false; uploadedMetadataResults = writeMetadataInParallel( @@ -401,13 +397,13 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( if (!updateTemplatesMetadata) { uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata(); } - if (!updateDiscoveryNodes && !firstUploadForEphemeralMetadata) { + if (!updateDiscoveryNodes) { uploadedMetadataResults.uploadedDiscoveryNodes = previousManifest.getDiscoveryNodesMetadata(); } - if (!updateClusterBlocks && !firstUploadForEphemeralMetadata) { + if (!updateClusterBlocks) { uploadedMetadataResults.uploadedClusterBlocks = previousManifest.getClusterBlocksMetadata(); } - if (!updateHashesOfConsistentSettings && !firstUploadForEphemeralMetadata) { + if (!updateHashesOfConsistentSettings) { uploadedMetadataResults.uploadedHashesOfConsistentSettings = previousManifest.getHashesOfConsistentSettings(); } uploadedMetadataResults.uploadedCustomMetadataMap = allUploadedCustomMap; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index c1ff54d13e9db..2c5aad99adc0c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -284,27 +284,27 @@ DiffableUtils.MapDiff> get ClusterState currentState, ClusterState previousState, boolean firstUploadForSplitGlobalMetadata, - boolean includeEphemeral + boolean isRemotePublicationEnabled ) { if (firstUploadForSplitGlobalMetadata) { // For first split global metadata upload, we want to upload all customs return DiffableUtils.diff( Collections.emptyMap(), - filterCustoms(currentState.metadata().customs(), includeEphemeral), + filterCustoms(currentState.metadata().customs(), isRemotePublicationEnabled), DiffableUtils.getStringKeySerializer(), NonDiffableValueSerializer.getAbstractInstance() ); } return DiffableUtils.diff( - filterCustoms(previousState.metadata().customs(), includeEphemeral), - filterCustoms(currentState.metadata().customs(), includeEphemeral), + filterCustoms(previousState.metadata().customs(), isRemotePublicationEnabled), + filterCustoms(currentState.metadata().customs(), isRemotePublicationEnabled), DiffableUtils.getStringKeySerializer(), NonDiffableValueSerializer.getAbstractInstance() ); } - public static Map filterCustoms(Map customs, boolean includeEphemeral) { - if (includeEphemeral) { + public static Map filterCustoms(Map customs, boolean isRemotePublicationEnabled) { + if (isRemotePublicationEnabled) { return customs; } return customs.entrySet() From ac465ba7482eaff43797e4e5d9ae02908c2ce753 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 17 Jun 2024 12:54:52 +0530 Subject: [PATCH 7/7] Modify variable name Signed-off-by: Sooraj Sinha --- .../remote/RemoteClusterStateAttributesManager.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 3962b7c63adad..8f986423587d7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -130,11 +130,11 @@ public CheckedRunnable getAsyncMetadataReadAction( public DiffableUtils.MapDiff> getUpdatedCustoms( ClusterState clusterState, ClusterState previousClusterState, - boolean isWriteFull, - boolean firstUploadForEphemeralMetadata + boolean isRemotePublicationEnabled, + boolean isFirstUpload ) { - if (!isWriteFull) { - // When isWriteFull is false, we do not want store any custom objects + if (!isRemotePublicationEnabled) { + // When isRemotePublicationEnabled is false, we do not want store any custom objects return DiffableUtils.diff( Collections.emptyMap(), Collections.emptyMap(), @@ -142,7 +142,7 @@ public DiffableUtils.MapDiff