From 9d7343990e8b092ed7abb28a078256b47f0e6e7b Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 7 Jun 2024 19:49:54 +0530 Subject: [PATCH] Use InputStreams rather than XContent for serialization Signed-off-by: Shivansh Arora --- .../cluster/block/ClusterBlocks.java | 16 + .../remote/RemoteRoutingTableService.java | 30 +- .../remote/ClusterMetadataManifest.java | 9 +- .../remote/ClusterStateDiffManifest.java | 55 ++-- .../RemoteClusterStateAttributesManager.java | 96 ++++-- .../remote/RemoteClusterStateService.java | 39 ++- .../remote/RemoteClusterStateUtils.java | 112 +++++++ .../remote/model/RemoteClusterBlocks.java | 62 ++-- .../model/RemoteClusterStateCustoms.java | 71 +++-- .../remote/model/RemoteDiscoveryNodes.java | 57 ++-- .../RemoteHashesOfConsistentSettings.java | 60 ++-- .../RemoteTransientSettingsMetadata.java | 51 ++-- .../cluster/block/ClusterBlockTests.java | 2 +- .../remote/ClusterMetadataManifestTests.java | 285 +++++++++--------- .../model/RemoteClusterBlocksTests.java | 215 +++++++++++++ .../RemoteTransientSettingsMetadataTests.java | 200 ++++++++++++ 16 files changed, 996 insertions(+), 364 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadataTests.java diff --git a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java index 02a20b7681ba7..377e8204e9f2f 100644 --- a/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/opensearch/cluster/block/ClusterBlocks.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.Predicate; @@ -325,6 +326,21 @@ public static Diff readDiffFrom(StreamInput in) throws IOExceptio return AbstractDiffable.readDiffFrom(ClusterBlocks::readFrom, in); } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterBlocks that = (ClusterBlocks) o; + return Objects.equals(global, that.global) + && Objects.equals(indicesBlocks, that.indicesBlocks) + && Objects.equals(levelHolders, that.levelHolders); + } + + @Override + public int hashCode() { + return Objects.hash(global, indicesBlocks, levelHolders); + } + /** * An immutable level holder. * diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 8e43e1c5f86b6..058fc3bff772a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -42,17 +42,18 @@ public class RemoteRoutingTableService extends AbstractLifecycleComponent { private final Supplier repositoriesService; private BlobStoreRepository blobStoreRepository; - private static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer() { - @Override - public void write(IndexRoutingTable value, StreamOutput out) throws IOException { - value.writeTo(out); - } - - @Override - public IndexRoutingTable read(StreamInput in, String key) throws IOException { - return IndexRoutingTable.readFrom(in); - } - }; + private static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer() { + @Override + public void write(IndexRoutingTable value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + @Override + public IndexRoutingTable read(StreamInput in, String key) throws IOException { + return IndexRoutingTable.readFrom(in); + } + }; public RemoteRoutingTableService(Supplier repositoriesService, Settings settings) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; @@ -60,8 +61,10 @@ public RemoteRoutingTableService(Supplier repositoriesServi this.settings = settings; } - - public static DiffableUtils.MapDiff> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) { + public static DiffableUtils.MapDiff> getIndicesRoutingMapDiff( + RoutingTable before, + RoutingTable after + ) { return DiffableUtils.diff( before.getIndicesRouting(), after.getIndicesRouting(), @@ -70,7 +73,6 @@ public static DiffableUtils.MapDiff(); for (String custom : state.metadata().customs().keySet()) { - if (!previousState.metadata().customs().containsKey(custom) || !state.metadata().customs().get(custom).equals(previousState.metadata().customs().get(custom))) { + if (!previousState.metadata().customs().containsKey(custom) + || !state.metadata().customs().get(custom).equals(previousState.metadata().customs().get(custom))) { customMetadataUpdated.add(custom); } } @@ -91,14 +90,16 @@ public class ClusterStateDiffManifest implements ToXContentObject { } } - DiffableUtils.MapDiff> routingTableDiff = RemoteRoutingTableService.getIndicesRoutingMapDiff(previousState.getRoutingTable(), - state.getRoutingTable()); + DiffableUtils.MapDiff> routingTableDiff = RemoteRoutingTableService + .getIndicesRoutingMapDiff(previousState.getRoutingTable(), state.getRoutingTable()); indicesRoutingUpdated = new ArrayList<>(); - routingTableDiff.getUpserts().forEach((k,v) -> indicesRoutingUpdated.add(k)); + routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingUpdated.add(k)); indicesRoutingDeleted = routingTableDiff.getDeletes(); - hashesOfConsistentSettingsUpdated = !state.metadata().hashesOfConsistentSettings().equals(previousState.metadata().hashesOfConsistentSettings()); + hashesOfConsistentSettingsUpdated = !state.metadata() + .hashesOfConsistentSettings() + .equals(previousState.metadata().hashesOfConsistentSettings()); clusterStateCustomUpdated = new ArrayList<>(); clusterStateCustomDeleted = new ArrayList<>(); for (String custom : state.customs().keySet()) { @@ -126,8 +127,8 @@ public ClusterStateDiffManifest( List indicesDeleted, boolean clusterBlocksUpdated, boolean discoveryNodesUpdated, - ListindicesRoutingUpdated, - ListindicesRoutingDeleted, + List indicesRoutingUpdated, + List indicesRoutingDeleted, boolean hashesOfConsistentSettingsUpdated, List clusterStateCustomUpdated, List clusterStateCustomDeleted @@ -267,10 +268,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw token = parser.nextToken(); switch (currentFieldName) { case UPSERTS_FIELD: - builder.indicesUpdated(parseStringList(parser)); + builder.indicesUpdated(convertListToString(parser.listOrderedMap())); break; case DELETES_FIELD: - builder.indicesDeleted(parseStringList(parser)); + builder.indicesDeleted(convertListToString(parser.listOrderedMap())); break; default: throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); @@ -282,10 +283,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw token = parser.nextToken(); switch (currentFieldName) { case UPSERTS_FIELD: - builder.customMetadataUpdated(parseStringList(parser)); + builder.customMetadataUpdated(convertListToString(parser.listOrderedMap())); break; case DELETES_FIELD: - builder.customMetadataDeleted(parseStringList(parser)); + builder.customMetadataDeleted(convertListToString(parser.listOrderedMap())); break; default: throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); @@ -304,10 +305,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw parser.nextToken(); switch (currentFieldName) { case UPSERTS_FIELD: - builder.indicesRoutingUpdated(parseStringList(parser)); + builder.indicesRoutingUpdated(convertListToString(parser.listOrderedMap())); break; case DELETES_FIELD: - builder.indicesRoutingDeleted(parseStringList(parser)); + builder.indicesRoutingDeleted(convertListToString(parser.listOrderedMap())); break; default: throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); @@ -319,10 +320,10 @@ public static ClusterStateDiffManifest fromXContent(XContentParser parser) throw parser.nextToken(); switch (currentFieldName) { case UPSERTS_FIELD: - builder.clusterStateCustomUpdated(parseStringList(parser)); + builder.clusterStateCustomUpdated(convertListToString(parser.listOrderedMap())); break; case DELETES_FIELD: - builder.clusterStateCustomDeleted(parseStringList(parser)); + builder.clusterStateCustomDeleted(convertListToString(parser.listOrderedMap())); break; default: throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); @@ -371,6 +372,14 @@ public List findRemovedIndices(Map indices, Map convertListToString(List list) { + List convertedList = new ArrayList<>(); + for (Object o : list) { + convertedList.add(o.toString()); + } + return convertedList; + } + public List findUpdatedIndices(Map indices, Map previousIndices) { List updatedIndices = new ArrayList<>(); for (String index : indices.keySet()) { @@ -385,8 +394,8 @@ public List findUpdatedIndices(Map indices, Map getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { List deletedIndicesRouting = new ArrayList<>(); - for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) { - if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) { + for (IndexRoutingTable previousIndexRouting : previousRoutingTable.getIndicesRouting().values()) { + if (!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) { // Latest Routing Table does not have entry for the index which means the index is deleted deletedIndicesRouting.add(previousIndexRouting.getIndex().getName()); } @@ -396,12 +405,14 @@ public List getIndicesRoutingDeleted(RoutingTable previousRoutingTable, public List getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { List updatedIndicesRouting = new ArrayList<>(); - for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) { - if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) { + for (IndexRoutingTable currentIndicesRouting : currentRoutingTable.getIndicesRouting().values()) { + if (!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) { // Latest Routing Table does not have entry for the index which means the index is created updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); } else { - if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) { + if (previousRoutingTable.getIndicesRouting() + .get(currentIndicesRouting.getIndex().getName()) + .equals(currentIndicesRouting)) { // if the latest routing table has the same routing table as the previous routing table, then the index is not updated continue; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 4a4b0c79b21a9..b1c9cf44b26d5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -8,32 +8,33 @@ package org.opensearch.gateway.remote; -import java.io.IOException; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; -import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; +import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; import org.opensearch.gateway.remote.model.RemoteReadResult; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; -import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; public class RemoteClusterStateAttributesManager { public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; @@ -45,14 +46,22 @@ public class RemoteClusterStateAttributesManager { private final RemoteClusterStateBlobStore customsBlobStore; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; + private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( - RemoteClusterStateBlobStore clusterBlocksBlobStore, RemoteClusterStateBlobStore discoveryNodesBlobStore, RemoteClusterStateBlobStore customsBlobStore, Compressor compressor, NamedXContentRegistry namedXContentRegistry) { + RemoteClusterStateBlobStore clusterBlocksBlobStore, + RemoteClusterStateBlobStore discoveryNodesBlobStore, + RemoteClusterStateBlobStore customsBlobStore, + Compressor compressor, + NamedXContentRegistry namedXContentRegistry, + NamedWriteableRegistry namedWriteableRegistry + ) { this.clusterBlocksBlobStore = clusterBlocksBlobStore; this.discoveryNodesBlobStore = discoveryNodesBlobStore; this.customsBlobStore = customsBlobStore; this.compressor = compressor; this.namedXContentRegistry = namedXContentRegistry; + this.namedWriteableRegistry = namedWriteableRegistry; } /** @@ -65,10 +74,25 @@ CheckedRunnable getAsyncMetadataWriteAction( LatchedActionListener latchedActionListener ) { if (componentData instanceof DiscoveryNodes) { - RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes((DiscoveryNodes)componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry); - return () -> discoveryNodesBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); + RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes( + (DiscoveryNodes) componentData, + clusterState.version(), + clusterState.metadata().clusterUUID(), + compressor, + namedXContentRegistry + ); + return () -> discoveryNodesBlobStore.writeAsync( + remoteObject, + getActionListener(component, remoteObject, latchedActionListener) + ); } else if (componentData instanceof ClusterBlocks) { - RemoteClusterBlocks remoteObject = new RemoteClusterBlocks((ClusterBlocks) componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry); + RemoteClusterBlocks remoteObject = new RemoteClusterBlocks( + (ClusterBlocks) componentData, + clusterState.version(), + clusterState.metadata().clusterUUID(), + compressor, + namedXContentRegistry + ); return () -> clusterBlocksBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); } else if (componentData instanceof ClusterState.Custom) { RemoteClusterStateCustoms remoteObject = new RemoteClusterStateCustoms( @@ -77,19 +101,22 @@ CheckedRunnable getAsyncMetadataWriteAction( clusterState.version(), clusterState.metadata().clusterUUID(), compressor, - namedXContentRegistry + namedXContentRegistry, + namedWriteableRegistry ); return () -> customsBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); } else { - throw new RemoteStateTransferException("Remote object not found for "+ componentData.getClass()); + throw new RemoteStateTransferException("Remote object not found for " + componentData.getClass()); } } - private ActionListener getActionListener(String component, AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener latchedActionListener) { + private ActionListener getActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse( - remoteObject.getUploadedMetadata() - ), + resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex)) ); } @@ -101,19 +128,48 @@ public CheckedRunnable getAsyncMetadataReadAction( String uploadedFilename, LatchedActionListener listener ) { - final ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure); + final ActionListener actionListener = ActionListener.wrap( + response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), + listener::onFailure + ); if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) { - RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, compressor, namedXContentRegistry); + RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes( + uploadedFilename, + clusterUUID, + compressor, + namedXContentRegistry + ); return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener); } else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) { - RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, compressor, namedXContentRegistry); + RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks( + uploadedFilename, + clusterUUID, + compressor, + namedXContentRegistry + ); return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener); } else if (component.equals(CLUSTER_STATE_CUSTOM)) { - final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure); - RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(uploadedFilename, componentName, clusterUUID, compressor, namedXContentRegistry); + final ActionListener customActionListener = ActionListener.wrap( + response -> listener.onResponse( + new RemoteReadResult( + (ToXContent) response, + CLUSTER_STATE_ATTRIBUTE, + String.join(CUSTOM_DELIMITER, component, componentName) + ) + ), + listener::onFailure + ); + RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( + uploadedFilename, + componentName, + clusterUUID, + compressor, + namedXContentRegistry, + namedWriteableRegistry + ); return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener); } else { - throw new RemoteStateTransferException("Remote object not found for "+ component); + throw new RemoteStateTransferException("Remote object not found for " + component); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..50487315d5867 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -790,27 +790,24 @@ private ClusterMetadataManifest uploadManifest( committed, MANIFEST_CURRENT_CODEC_VERSION ); - final ClusterMetadataManifest manifest = new ClusterMetadataManifest( - clusterState.term(), - clusterState.getVersion(), - clusterState.metadata().clusterUUID(), - clusterState.stateUUID(), - Version.CURRENT, - nodeId, - committed, - MANIFEST_CURRENT_CODEC_VERSION, - null, - uploadedIndexMetadata, - previousClusterUUID, - clusterState.metadata().clusterUUIDCommitted(), - uploadedCoordinationMetadata, - uploadedSettingsMetadata, - uploadedTemplatesMetadata, - uploadedCustomMetadataMap, - clusterState.routingTable().version(), - // TODO: Add actual list of changed indices routing with index routing upload flow. - new ArrayList<>() - ); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(clusterState.term()) + .stateVersion(clusterState.getVersion()) + .clusterUUID(clusterState.metadata().clusterUUID()) + .stateUUID(clusterState.stateUUID()) + .opensearchVersion(Version.CURRENT) + .nodeId(nodeId) + .committed(committed) + .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .indices(uploadedIndexMetadata) + .previousClusterUUID(previousClusterUUID) + .clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted()) + .coordinationMetadata(uploadedCoordinationMetadata) + .settingMetadata(uploadedSettingsMetadata) + .templatesMetadata(uploadedTemplatesMetadata) + .customMetadataMap(uploadedCustomMetadataMap) + .routingTableVersion(clusterState.routingTable().version()) + .build(); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); return manifest; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index 500d1af0211e8..99c2e33a964e5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -8,16 +8,128 @@ package org.opensearch.gateway.remote; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; /** * Utility class for Remote Cluster State */ public class RemoteClusterStateUtils { + public static final String METADATA_NAME_FORMAT = "%s.dat"; + public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; + public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; + public static final String CLUSTER_STATE_EPHEMERAL_PATH_TOKEN = "ephemeral"; + public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; + public static final String DELIMITER = "__"; + public static final String CUSTOM_DELIMITER = "--"; public static final String PATH_DELIMITER = "/"; + // ToXContent Params with gateway mode. + // We are using gateway context mode to persist all custom metadata. + public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams( + Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY) + ); + + public static BlobPath getCusterMetadataBasePath(BlobStoreRepository blobStoreRepository, String clusterName, String clusterUUID) { + return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); + } + public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } + + public static String getFormattedFileName(String fileName, int codecVersion) { + if (codecVersion < ClusterMetadataManifest.CODEC_V3) { + return String.format(Locale.ROOT, METADATA_NAME_FORMAT, fileName); + } + return fileName; + } + + static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepository, String clusterName) { + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add(CLUSTER_STATE_PATH_TOKEN) + ); + } + + /** + * Exception for Remote state transfer. + */ + public static class RemoteStateTransferException extends RuntimeException { + + public RemoteStateTransferException(String errorDesc) { + super(errorDesc); + } + + public RemoteStateTransferException(String errorDesc, Throwable cause) { + super(errorDesc, cause); + } + } + + public static class UploadedMetadataResults { + List uploadedIndexMetadata; + Map uploadedCustomMetadataMap; + Map uploadedClusterStateCustomMetadataMap; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks; + List uploadedIndicesRoutingMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings; + + public UploadedMetadataResults( + List uploadedIndexMetadata, + Map uploadedCustomMetadataMap, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedTransientSettingsMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks, + List uploadedIndicesRoutingMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedHashesOfConsistentSettings, + Map uploadedClusterStateCustomMap + ) { + this.uploadedIndexMetadata = uploadedIndexMetadata; + this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; + this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; + this.uploadedSettingsMetadata = uploadedSettingsMetadata; + this.uploadedTransientSettingsMetadata = uploadedTransientSettingsMetadata; + this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; + this.uploadedDiscoveryNodes = uploadedDiscoveryNodes; + this.uploadedClusterBlocks = uploadedClusterBlocks; + this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata; + this.uploadedHashesOfConsistentSettings = uploadedHashesOfConsistentSettings; + this.uploadedClusterStateCustomMetadataMap = uploadedClusterStateCustomMap; + } + + public UploadedMetadataResults() { + this.uploadedIndexMetadata = new ArrayList<>(); + this.uploadedCustomMetadataMap = new HashMap<>(); + this.uploadedCoordinationMetadata = null; + this.uploadedSettingsMetadata = null; + this.uploadedTransientSettingsMetadata = null; + this.uploadedTemplatesMetadata = null; + this.uploadedDiscoveryNodes = null; + this.uploadedClusterBlocks = null; + this.uploadedIndicesRoutingMetadata = new ArrayList<>(); + this.uploadedHashesOfConsistentSettings = null; + this.uploadedClusterStateCustomMetadataMap = new HashMap<>(); + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 29bde27db150e..779961058350f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -8,24 +8,27 @@ package org.opensearch.gateway.remote.model; -import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; - -import java.io.IOException; -import java.io.InputStream; -import java.util.List; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.core.common.bytes.BytesReference.toBytes; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; /** * Wrapper class for uploading/downloading {@link ClusterBlocks} to/from remote blob store @@ -33,30 +36,35 @@ public class RemoteClusterBlocks extends AbstractRemoteWritableBlobEntity { public static final String CLUSTER_BLOCKS = "blocks"; - public static final ChecksumBlobStoreFormat CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>( - "blocks", - METADATA_NAME_FORMAT, - ClusterBlocks::fromXContent - ); private ClusterBlocks clusterBlocks; private long stateVersion; - public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion, String clusterUUID, - final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteClusterBlocks( + final ClusterBlocks clusterBlocks, + long stateVersion, + String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.clusterBlocks = clusterBlocks; this.stateVersion = stateVersion; } - public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteClusterBlocks( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.blobName = blobName; } @Override public BlobPathParameters getBlobPathParameters() { - return new BlobPathParameters(List.of("transient"), CLUSTER_BLOCKS); + return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), CLUSTER_BLOCKS); } @Override @@ -79,24 +87,16 @@ public UploadedMetadata getUploadedMetadata() { return new UploadedMetadataAttribute(CLUSTER_BLOCKS, blobName); } - @Override - public void set(final ClusterBlocks clusterBlocks) { - this.clusterBlocks = clusterBlocks; - } - - @Override - public ClusterBlocks get() { - return clusterBlocks; - } - - @Override public InputStream serialize() throws IOException { - return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + clusterBlocks.writeTo(bytesStreamOutput); + return bytesStreamOutput.bytes().streamInput(); } @Override public ClusterBlocks deserialize(final InputStream inputStream) throws IOException { - return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream))); + return ClusterBlocks.readFrom(in); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index fffbcfab4e141..7691c11f0e32f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -11,54 +11,64 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState.Custom; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import java.io.IOException; import java.io.InputStream; import java.util.List; +import static org.opensearch.cluster.ClusterState.FeatureAware.shouldSerialize; +import static org.opensearch.core.common.bytes.BytesReference.toBytes; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; -import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity { public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom"; - public final ChecksumBlobStoreFormat clusterStateCustomBlobStoreFormat; private long stateVersion; private String customType; private ClusterState.Custom custom; + private final NamedWriteableRegistry namedWriteableRegistry; - public RemoteClusterStateCustoms(final ClusterState.Custom custom, final String customType, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteClusterStateCustoms( + final ClusterState.Custom custom, + final String customType, + final long stateVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry, + final NamedWriteableRegistry namedWriteableRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.stateVersion = stateVersion; this.customType = customType; this.custom = custom; - this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>( - CLUSTER_STATE_CUSTOM, - METADATA_NAME_FORMAT, - parser -> ClusterState.Custom.fromXContent(parser, customType) - ); + this.namedWriteableRegistry = namedWriteableRegistry; } - public RemoteClusterStateCustoms(final String blobName, final String customType, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteClusterStateCustoms( + final String blobName, + final String customType, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry, + final NamedWriteableRegistry namedWriteableRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.blobName = blobName; this.customType = customType; - this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>( - CLUSTER_STATE_CUSTOM, - METADATA_NAME_FORMAT, - parser -> ClusterState.Custom.fromXContent(parser, customType) - ); + this.namedWriteableRegistry = namedWriteableRegistry; } @Override @@ -83,26 +93,27 @@ public String generateBlobFileName() { @Override public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { assert blobName != null; - return new ClusterMetadataManifest.UploadedMetadataAttribute(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, customType), blobName); - } - - @Override - public void set(Custom custom) { - this.custom = custom; - } - - @Override - public ClusterState.Custom get() { - return custom; + return new ClusterMetadataManifest.UploadedMetadataAttribute( + String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, customType), + blobName + ); } @Override public InputStream serialize() throws IOException { - return clusterStateCustomBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput outputStream = new BytesStreamOutput(); + if (shouldSerialize(outputStream, custom)) { + outputStream.writeNamedWriteable(custom); + } + return outputStream.bytes().streamInput(); } @Override public ClusterState.Custom deserialize(final InputStream inputStream) throws IOException { - return clusterStateCustomBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( + new BytesStreamInput(toBytes(Streams.readFully(inputStream))), + this.namedWriteableRegistry + ); + return in.readNamedWriteable(Custom.class); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 34aca7767a0b1..9bf0ca95ac9a9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -8,25 +8,26 @@ package org.opensearch.gateway.remote.model; -import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; - -import java.io.IOException; -import java.io.InputStream; -import java.util.List; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.core.common.bytes.BytesReference.toBytes; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; /** * Wrapper class for uploading/downloading {@link DiscoveryNodes} to/from remote blob store @@ -34,22 +35,28 @@ public class RemoteDiscoveryNodes extends AbstractRemoteWritableBlobEntity { public static final String DISCOVERY_NODES = "nodes"; - public static final ChecksumBlobStoreFormat DISCOVERY_NODES_FORMAT = new ChecksumBlobStoreFormat<>( - "nodes", - METADATA_NAME_FORMAT, - DiscoveryNodes::fromXContent - ); private DiscoveryNodes discoveryNodes; private long stateVersion; - public RemoteDiscoveryNodes(final DiscoveryNodes discoveryNodes, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteDiscoveryNodes( + final DiscoveryNodes discoveryNodes, + final long stateVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.discoveryNodes = discoveryNodes; this.stateVersion = stateVersion; } - public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteDiscoveryNodes( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.blobName = blobName; } @@ -79,23 +86,15 @@ public UploadedMetadata getUploadedMetadata() { return new UploadedMetadataAttribute(DISCOVERY_NODES, blobName); } - @Override - public void set(final DiscoveryNodes discoveryNodes) { - this.discoveryNodes = discoveryNodes; - } - - @Override - public DiscoveryNodes get() { - return discoveryNodes; - } - @Override public InputStream serialize() throws IOException { - return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput outputStream = new BytesStreamOutput(); + discoveryNodes.writeTo(outputStream); + return outputStream.bytes().streamInput(); } @Override public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException { - return DISCOVERY_NODES_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + return DiscoveryNodes.readFrom(new BytesStreamInput(toBytes(Streams.readFully(inputStream))), null); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index 3dfa42139bb02..c762591e28940 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -8,42 +8,51 @@ package org.opensearch.gateway.remote.model; -import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; - -import java.io.IOException; -import java.io.InputStream; -import java.util.List; import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest; -import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.core.common.bytes.BytesReference.toBytes; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; public class RemoteHashesOfConsistentSettings extends AbstractRemoteWritableBlobEntity { public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings"; - public static final ChecksumBlobStoreFormat HASHES_OF_CONSISTENT_SETTINGS_FORMAT = new ChecksumBlobStoreFormat<>( - HASHES_OF_CONSISTENT_SETTINGS, - METADATA_NAME_FORMAT, - DiffableStringMap::fromXContent - ); private DiffableStringMap hashesOfConsistentSettings; private long metadataVersion; - public RemoteHashesOfConsistentSettings(final DiffableStringMap hashesOfConsistentSettings, final long metadataVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + + public RemoteHashesOfConsistentSettings( + final DiffableStringMap hashesOfConsistentSettings, + final long metadataVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.metadataVersion = metadataVersion; this.hashesOfConsistentSettings = hashesOfConsistentSettings; } - public RemoteHashesOfConsistentSettings(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteHashesOfConsistentSettings( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.blobName = blobName; } @@ -72,23 +81,16 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { return new ClusterMetadataManifest.UploadedMetadataAttribute(HASHES_OF_CONSISTENT_SETTINGS, blobName); } - @Override - public void set(final DiffableStringMap hashesOfConsistentSettings) { - this.hashesOfConsistentSettings = hashesOfConsistentSettings; - } - - @Override - public DiffableStringMap get() { - return hashesOfConsistentSettings; - } - @Override public InputStream serialize() throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + hashesOfConsistentSettings.writeTo(bytesStreamOutput); + return bytesStreamOutput.bytes().streamInput(); } @Override public DiffableStringMap deserialize(final InputStream inputStream) throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + StreamInput in = new BytesStreamInput(toBytes(Streams.readFully(inputStream))); + return DiffableStringMap.readFrom(in); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java index 75b6cfc3a765a..fe32b95f5e957 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java @@ -8,13 +8,6 @@ package org.opensearch.gateway.remote.model; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; -import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_CURRENT_CODEC_VERSION; - -import java.io.IOException; -import java.io.InputStream; -import java.util.List; import org.opensearch.common.io.Streams; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.remote.BlobPathParameters; @@ -27,6 +20,15 @@ import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + /** * Wrapper class for uploading/downloading transient {@link Settings} to/from remote blob store */ @@ -43,20 +45,31 @@ public class RemoteTransientSettingsMetadata extends AbstractRemoteWritableBlobE private Settings transientSettings; private long metadataVersion; - public RemoteTransientSettingsMetadata(final Settings transientSettings, final long metadataVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteTransientSettingsMetadata( + final Settings transientSettings, + final long metadataVersion, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.transientSettings = transientSettings; this.metadataVersion = metadataVersion; } - public RemoteTransientSettingsMetadata(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + public RemoteTransientSettingsMetadata( + final String blobName, + final String clusterUUID, + final Compressor compressor, + final NamedXContentRegistry namedXContentRegistry + ) { super(clusterUUID, compressor, namedXContentRegistry); this.blobName = blobName; } @Override public BlobPathParameters getBlobPathParameters() { - return new BlobPathParameters(List.of("global-metadata"), TRANSIENT_SETTING_METADATA); + return new BlobPathParameters(List.of(GLOBAL_METADATA_PATH_TOKEN), TRANSIENT_SETTING_METADATA); } @Override @@ -72,20 +85,14 @@ public String generateBlobFileName() { return blobFileName; } - @Override - public void set(final Settings settings) { - this.transientSettings = settings; - } - - @Override - public Settings get() { - return transientSettings; - } - @Override public InputStream serialize() throws IOException { - return SETTINGS_METADATA_FORMAT.serialize(transientSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) - .streamInput(); + return SETTINGS_METADATA_FORMAT.serialize( + transientSettings, + generateBlobFileName(), + getCompressor(), + RemoteClusterStateUtils.FORMAT_PARAMS + ).streamInput(); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/block/ClusterBlockTests.java b/server/src/test/java/org/opensearch/cluster/block/ClusterBlockTests.java index 04e04bd96a7d3..0f049f99b49c0 100644 --- a/server/src/test/java/org/opensearch/cluster/block/ClusterBlockTests.java +++ b/server/src/test/java/org/opensearch/cluster/block/ClusterBlockTests.java @@ -136,7 +136,7 @@ public void testGetIndexBlockWithId() { assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, OpenSearchTestCase::randomInt)), nullValue()); } - private ClusterBlock randomClusterBlock() { + public static ClusterBlock randomClusterBlock() { final String uuid = randomBoolean() ? UUIDs.randomBase64UUID() : null; final List levels = Arrays.asList(ClusterBlockLevel.values()); return new ClusterBlock( diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index d1f559eb75f85..21893257b0dc5 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -91,42 +91,43 @@ public void testClusterMetadataManifestXContentV1() throws IOException { public void testClusterMetadataManifestXContent() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); - ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( - 1L, - 1L, - "test-cluster-uuid", - "test-state-uuid", - Version.CURRENT, - "test-node-id", - false, - ClusterMetadataManifest.CODEC_V3, - null, - Collections.singletonList(uploadedIndexMetadata), - "prev-cluster-uuid", - true, - new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file"), - Collections.unmodifiableList( - Arrays.asList( - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, - "custom--repositories-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, - "custom--index_graveyard-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER - + WeightedRoutingMetadata.TYPE, - "custom--weighted_routing_netadata-file" + ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(1L) + .clusterUUID("test-cluster-uuid") + .stateUUID("test-state-uuid") + .opensearchVersion(Version.CURRENT) + .nodeId("test-node-id") + .committed(false) + .codecVersion(ClusterMetadataManifest.CODEC_V3) + .indices(Collections.singletonList(uploadedIndexMetadata)) + .previousClusterUUID("prev-cluster-uuid") + .clusterUUIDCommitted(true) + .coordinationMetadata(new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file")) + .settingMetadata(new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file")) + .templatesMetadata(new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file")) + .customMetadataMap( + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) ) - ) - ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), - 1L, - randomUploadedIndexMetadataList() - ); + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ) + .routingTableVersion(1L) + .build(); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -139,42 +140,43 @@ public void testClusterMetadataManifestXContent() throws IOException { } public void testClusterMetadataManifestSerializationEqualsHashCode() { - ClusterMetadataManifest initialManifest = new ClusterMetadataManifest( - 1337L, - 7L, - "HrYF3kP5SmSPWtKlWhnNSA", - "6By9p9G0Rv2MmFYJcPAOgA", - Version.CURRENT, - "B10RX1f5RJenMQvYccCgSQ", - true, - 2, - null, - randomUploadedIndexMetadataList(), - "yfObdx8KSMKKrXf8UyHhM", - true, - new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file"), - Collections.unmodifiableList( - Arrays.asList( - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, - "custom--repositories-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, - "custom--index_graveyard-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER - + WeightedRoutingMetadata.TYPE, - "custom--weighted_routing_netadata-file" + ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() + .clusterTerm(1337L) + .stateVersion(7L) + .clusterUUID("HrYF3kP5SmSPWtKlWhnNSA") + .stateUUID("6By9p9G0Rv2MmFYJcPAOgA") + .opensearchVersion(Version.CURRENT) + .nodeId("B10RX1f5RJenMQvYccCgSQ") + .committed(true) + .codecVersion(ClusterMetadataManifest.CODEC_V3) + .indices(randomUploadedIndexMetadataList()) + .previousClusterUUID("yfObdx8KSMKKrXf8UyHhM") + .clusterUUIDCommitted(true) + .coordinationMetadata(new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file")) + .settingMetadata(new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file")) + .templatesMetadata(new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file")) + .customMetadataMap( + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) ) - ) - ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), - 1L, - randomUploadedIndexMetadataList() - ); + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ) + .routingTableVersion(1L) + .build(); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( initialManifest, @@ -316,42 +318,43 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { public void testClusterMetadataManifestXContentV2() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); - ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( - 1L, - 1L, - "test-cluster-uuid", - "test-state-uuid", - Version.CURRENT, - "test-node-id", - false, - ClusterMetadataManifest.CODEC_V2, - null, - Collections.singletonList(uploadedIndexMetadata), - "prev-cluster-uuid", - true, - uploadedMetadataAttribute, - uploadedMetadataAttribute, - uploadedMetadataAttribute, - Collections.unmodifiableList( - Arrays.asList( - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, - "custom--repositories-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, - "custom--index_graveyard-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER - + WeightedRoutingMetadata.TYPE, - "custom--weighted_routing_netadata-file" + ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(1L) + .clusterUUID("test-cluster-uuid") + .stateUUID("test-state-uuid") + .opensearchVersion(Version.CURRENT) + .nodeId("test-node-id") + .committed(false) + .codecVersion(ClusterMetadataManifest.CODEC_V3) + .indices(Collections.singletonList(uploadedIndexMetadata)) + .previousClusterUUID("prev-cluster-uuid") + .clusterUUIDCommitted(true) + .coordinationMetadata(uploadedMetadataAttribute) + .settingMetadata(uploadedMetadataAttribute) + .templatesMetadata(uploadedMetadataAttribute) + .customMetadataMap( + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) ) - ) - ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), - 0, - new ArrayList<>() - ); + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ) + .routingTableVersion(0) + .build(); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -366,42 +369,44 @@ public void testClusterMetadataManifestXContentV2() throws IOException { public void testClusterMetadataManifestXContentV3() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); - ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( - 1L, - 1L, - "test-cluster-uuid", - "test-state-uuid", - Version.CURRENT, - "test-node-id", - false, - ClusterMetadataManifest.CODEC_V3, - null, - Collections.singletonList(uploadedIndexMetadata), - "prev-cluster-uuid", - true, - uploadedMetadataAttribute, - uploadedMetadataAttribute, - uploadedMetadataAttribute, - Collections.unmodifiableList( - Arrays.asList( - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, - "custom--repositories-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, - "custom--index_graveyard-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER - + WeightedRoutingMetadata.TYPE, - "custom--weighted_routing_netadata-file" + ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(1L) + .clusterUUID("test-cluster-uuid") + .stateUUID("test-state-uuid") + .opensearchVersion(Version.CURRENT) + .nodeId("test-node-id") + .committed(false) + .codecVersion(ClusterMetadataManifest.CODEC_V3) + .indices(Collections.singletonList(uploadedIndexMetadata)) + .previousClusterUUID("prev-cluster-uuid") + .clusterUUIDCommitted(true) + .coordinationMetadata(uploadedMetadataAttribute) + .settingMetadata(uploadedMetadataAttribute) + .templatesMetadata(uploadedMetadataAttribute) + .customMetadataMap( + Collections.unmodifiableList( + Arrays.asList( + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + RepositoriesMetadata.TYPE, + "custom--repositories-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, + "custom--index_graveyard-file" + ), + new UploadedMetadataAttribute( + RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + + WeightedRoutingMetadata.TYPE, + "custom--weighted_routing_netadata-file" + ) ) - ) - ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), - 1L, - Collections.singletonList(uploadedIndexMetadata) - ); + ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())) + ) + .routingTableVersion(1L) + .indicesRouting(Collections.singletonList(uploadedIndexMetadata)) + .build(); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java new file mode 100644 index 0000000000000..fffdffa0c2373 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksTests.java @@ -0,0 +1,215 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.indices.IndicesModule; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.cluster.block.ClusterBlockTests.randomClusterBlock; +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; + +public class RemoteClusterBlocksTests extends OpenSearchTestCase { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long METADATA_VERSION = 3L; + private String clusterUUID; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + + @Before + public void setup() { + this.clusterUUID = "test-cluster-uuid"; + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + } + + public void testClusterUUID() { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); + + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); + } + + public void testFullBlobName() { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertNull(remoteObjectForUpload.getFullBlobName()); + + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForDownload.getFullBlobName(), TEST_BLOB_NAME); + } + + public void testBlobFileName() { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertNull(remoteObjectForUpload.getBlobFileName()); + + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForDownload.getBlobFileName(), TEST_BLOB_FILE_NAME); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/cluster-blocks"; + RemoteClusterBlocks remoteObjectForDownload = new RemoteClusterBlocks(uploadedFile, clusterUUID, compressor, namedXContentRegistry); + assertArrayEquals(remoteObjectForDownload.getBlobPathTokens(), new String[] { "user", "local", "opensearch", "cluster-blocks" }); + } + + public void testBlobPathParameters() { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertEquals(params.getPathTokens(), List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN)); + assertEquals(params.getFilePrefix(), CLUSTER_BLOCKS); + } + + public void testGenerateBlobFileName() { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertEquals(nameTokens[0], CLUSTER_BLOCKS); + assertEquals(RemoteStoreUtils.invertLong(nameTokens[1]), METADATA_VERSION); + assertTrue(RemoteStoreUtils.invertLong(nameTokens[2]) <= System.currentTimeMillis()); + assertEquals(nameTokens[3], String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION)); + + } + + public void testGetUploadedMetadata() throws IOException { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertEquals(uploadedMetadata.getComponent(), CLUSTER_BLOCKS); + assertEquals(uploadedMetadata.getUploadedFilename(), remoteObjectForUpload.getFullBlobName()); + } + } + + public void testSerDe() throws IOException { + ClusterBlocks clusterBlocks = randomClusterBlocks(); + RemoteClusterBlocks remoteObjectForUpload = new RemoteClusterBlocks( + clusterBlocks, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertTrue(inputStream.available() > 0); + ClusterBlocks readClusterBlocks = remoteObjectForUpload.deserialize(inputStream); + assertEquals(clusterBlocks.global(), readClusterBlocks.global()); + assertEquals(clusterBlocks.indices().keySet(), readClusterBlocks.indices().keySet()); + for (String index : clusterBlocks.indices().keySet()) { + assertEquals(clusterBlocks.indices().get(index), readClusterBlocks.indices().get(index)); + } + + } + } + + private ClusterBlocks randomClusterBlocks() { + ClusterBlocks.Builder builder = ClusterBlocks.builder(); + int randomGlobalBlocks = randomIntBetween(0, 10); + for (int i = 0; i < randomGlobalBlocks; i++) { + builder.addGlobalBlock(randomClusterBlock()); + } + + int randomIndices = randomIntBetween(0, 10); + for (int i = 0; i < randomIndices; i++) { + int randomIndexBlocks = randomIntBetween(0, 10); + for (int j = 0; j < randomIndexBlocks; j++) { + builder.addIndexBlock("index-" + i, randomClusterBlock()); + } + } + return builder.build(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadataTests.java new file mode 100644 index 0000000000000..4061ab4a6b5ef --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadataTests.java @@ -0,0 +1,200 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.network.NetworkModule; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.indices.IndicesModule; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; + +public class RemoteTransientSettingsMetadataTests extends OpenSearchTestCase { + private static final String TEST_BLOB_NAME = "/test-path/test-blob-name"; + private static final String TEST_BLOB_PATH = "test-path"; + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long METADATA_VERSION = 3L; + private String clusterUUID; + private Compressor compressor; + private NamedXContentRegistry namedXContentRegistry; + + @Before + public void setup() { + this.clusterUUID = "test-cluster-uuid"; + compressor = new NoneCompressor(); + namedXContentRegistry = new NamedXContentRegistry( + Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + ClusterModule.getNamedXWriteables().stream() + ).flatMap(Function.identity()).collect(toList()) + ); + } + + public void testClusterUUID() { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForUpload.clusterUUID(), clusterUUID); + + RemoteTransientSettingsMetadata remoteObjectForDownload = new RemoteTransientSettingsMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForDownload.clusterUUID(), clusterUUID); + } + + public void testFullBlobName() { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertNull(remoteObjectForUpload.getFullBlobName()); + + RemoteTransientSettingsMetadata remoteObjectForDownload = new RemoteTransientSettingsMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForDownload.getFullBlobName(), TEST_BLOB_NAME); + } + + public void testBlobFileName() { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertNull(remoteObjectForUpload.getBlobFileName()); + + RemoteTransientSettingsMetadata remoteObjectForDownload = new RemoteTransientSettingsMetadata( + TEST_BLOB_NAME, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertEquals(remoteObjectForDownload.getBlobFileName(), TEST_BLOB_FILE_NAME); + } + + public void testBlobPathTokens() { + String uploadedFile = "user/local/opensearch/settings"; + RemoteTransientSettingsMetadata remoteObjectForDownload = new RemoteTransientSettingsMetadata( + uploadedFile, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertArrayEquals(remoteObjectForDownload.getBlobPathTokens(), new String[] { "user", "local", "opensearch", "settings" }); + } + + public void testBlobPathParameters() { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); + assertEquals(params.getPathTokens(), List.of(RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN)); + assertEquals(params.getFilePrefix(), TRANSIENT_SETTING_METADATA); + } + + public void testGenerateBlobFileName() { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + String blobFileName = remoteObjectForUpload.generateBlobFileName(); + String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); + assertEquals(nameTokens[0], TRANSIENT_SETTING_METADATA); + assertEquals(RemoteStoreUtils.invertLong(nameTokens[1]), METADATA_VERSION); + assertTrue(RemoteStoreUtils.invertLong(nameTokens[2]) <= System.currentTimeMillis()); + assertEquals(nameTokens[3], String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION)); + + } + + public void testGetUploadedMetadata() throws IOException { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); + + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(new BlobPath().add(TEST_BLOB_PATH)); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = remoteObjectForUpload.getUploadedMetadata(); + assertEquals(uploadedMetadata.getComponent(), TRANSIENT_SETTING_METADATA); + assertEquals(uploadedMetadata.getUploadedFilename(), remoteObjectForUpload.getFullBlobName()); + } + } + + public void testSerDe() throws IOException { + Settings settings = getSettings(); + RemoteTransientSettingsMetadata remoteObjectForUpload = new RemoteTransientSettingsMetadata( + settings, + METADATA_VERSION, + clusterUUID, + compressor, + namedXContentRegistry + ); + try (InputStream inputStream = remoteObjectForUpload.serialize()) { + remoteObjectForUpload.setFullBlobName(BlobPath.cleanPath()); + assertTrue(inputStream.available() > 0); + Settings readsettings = remoteObjectForUpload.deserialize(inputStream); + assertEquals(readsettings, settings); + } + } + + private Settings getSettings() { + return Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build(); + } +}