From 255075c0dabafb598adc50a9b1a4cbd16b212b12 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 5 Jun 2024 20:55:10 +0530 Subject: [PATCH] Add POJO classes required for cluster state publication from remote Signed-off-by: Shivansh Arora --- .../opensearch/cluster/metadata/Metadata.java | 4 + .../remote/ClusterMetadataManifest.java | 292 +++++++++- .../remote/ClusterStateDiffManifest.java | 549 ++++++++++++++++++ .../RemoteClusterStateAttributesManager.java | 134 +++++ .../remote/model/RemoteClusterBlocks.java | 102 ++++ .../model/RemoteClusterStateCustoms.java | 108 ++++ .../remote/model/RemoteDiscoveryNodes.java | 101 ++++ .../RemoteHashesOfConsistentSettings.java | 94 +++ .../remote/model/RemoteReadResult.java | 39 ++ .../RemoteTransientSettingsMetadata.java | 101 ++++ 10 files changed, 1505 insertions(+), 19 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index e1aa5626f36c1..232f900f25375 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -973,6 +973,10 @@ public static boolean isSettingsMetadataEqual(Metadata metadata1, Metadata metad return metadata1.persistentSettings.equals(metadata2.persistentSettings); } + public static boolean isTransientSettingsMetadataEqual(Metadata metadata1, Metadata metadata2) { + return metadata1.transientSettings.equals(metadata2.transientSettings); + } + public static boolean isTemplatesMetadataEqual(Metadata metadata1, Metadata metadata2) { return metadata1.templates.equals(metadata2.templates); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index b3b1bf37f8696..e2963ee228175 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -8,6 +8,8 @@ package org.opensearch.gateway.remote; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.core.ParseField; import org.opensearch.core.common.Strings; @@ -41,7 +43,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest. public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file. - public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata in manifest file. + public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata, diff and other attributes as part of manifest required for state publication private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -61,6 +63,13 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField UPLOADED_CUSTOM_METADATA = new ParseField("uploaded_custom_metadata"); private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version"); private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing"); + private static final ParseField METADATA_VERSION = new ParseField("metadata_version"); + private static final ParseField UPLOADED_TRANSIENT_SETTINGS_METADATA = new ParseField("uploaded_transient_settings_metadata"); + private static final ParseField UPLOADED_DISCOVERY_NODES_METADATA = new ParseField("uploaded_discovery_nodes_metadata"); + private static final ParseField UPLOADED_CLUSTER_BLOCKS_METADATA = new ParseField("uploaded_cluster_blocks_metadata"); + private static final ParseField UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA = new ParseField("uploaded_hashes_of_consistent_settings_metadata"); + private static final ParseField UPLOADED_CLUSTER_STATE_CUSTOM_METADATA = new ParseField("uploaded_cluster_state_custom_metadata"); + private static final ParseField DIFF_MANIFEST = new ParseField("diff_manifest"); private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) { return ClusterMetadataManifest.builder() @@ -90,9 +99,16 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields } private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) { - return manifestV2Builder(fields).codecVersion(codecVersion(fields)) + return manifestV2Builder(fields) .routingTableVersion(routingTableVersion(fields)) - .indicesRouting(indicesRouting(fields)); + .indicesRouting(indicesRouting(fields)) + .discoveryNodesMetadata(discoveryNodesMetadata(fields)) + .clusterBlocksMetadata(clusterBlocksMetadata(fields)) + .diffManifest(diffManifest(fields)) + .metadataVersion(metadataVersion(fields)) + .transientSettingsMetadata(transientSettingsMetadata(fields)) + .hashesOfConsistentSettings(hashesOfConsistentSettings(fields)) + .clusterStateCustomMetadataMap(clusterStateCustomMetadata(fields)); } private static long term(Object[] fields) { @@ -168,6 +184,35 @@ private static List indicesRouting(Object[] fields) { return (List) fields[16]; } + private static UploadedMetadataAttribute discoveryNodesMetadata(Object[] fields) { + return (UploadedMetadataAttribute) fields[17]; + } + + private static UploadedMetadataAttribute clusterBlocksMetadata(Object[] fields) { + return (UploadedMetadataAttribute) fields[18]; + } + + private static long metadataVersion(Object[] fields) { + return (long) fields[19]; + } + + private static UploadedMetadataAttribute transientSettingsMetadata(Object[] fields) { + return (UploadedMetadataAttribute) fields[20]; + } + + private static UploadedMetadataAttribute hashesOfConsistentSettings(Object[] fields) { + return (UploadedMetadataAttribute) fields[21]; + } + + private static Map clusterStateCustomMetadata(Object[] fields) { + List customs = (List) fields[22]; + return customs.stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())); + } + + private static ClusterStateDiffManifest diffManifest(Object[] fields) { + return (ClusterStateDiffManifest) fields[23]; + } + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> manifestV0Builder(fields).build() @@ -246,6 +291,37 @@ private static void declareParser(ConstructingObjectParser UploadedIndexMetadata.fromXContent(p), INDICES_ROUTING_FIELD ); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_DISCOVERY_NODES_METADATA + ); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_CLUSTER_BLOCKS_METADATA + ); + parser.declareLong(ConstructingObjectParser.constructorArg(), METADATA_VERSION); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_TRANSIENT_SETTINGS_METADATA + ); + parser.declareNamedObject( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA + ); + parser.declareNamedObjects( + ConstructingObjectParser.optionalConstructorArg(), + UploadedMetadataAttribute.PARSER, + UPLOADED_CLUSTER_STATE_CUSTOM_METADATA + ); + parser.declareObject( + ConstructingObjectParser.constructorArg(), + (p, c) -> ClusterStateDiffManifest.fromXContent(p), + DIFF_MANIFEST + ); } } @@ -267,6 +343,13 @@ private static void declareParser(ConstructingObjectParser indicesRouting; + private final long metadataVersion; + private final UploadedMetadataAttribute uploadedTransientSettingsMetadata; + private final UploadedMetadataAttribute uploadedDiscoveryNodesMetadata; + private final UploadedMetadataAttribute uploadedClusterBlocksMetadata; + private final UploadedMetadataAttribute uploadedHashesOfConsistentSettings; + private final Map uploadedClusterStateCustomMap; + private final ClusterStateDiffManifest diffManifest; public List getIndices() { return indices; @@ -332,6 +415,38 @@ public Map getCustomMetadataMap() { return uploadedCustomMetadataMap; } + public long getMetadataVersion() { + return metadataVersion; + } + + public UploadedMetadataAttribute getTransientSettingsMetadata() { + return uploadedTransientSettingsMetadata; + } + + public UploadedMetadataAttribute getDiscoveryNodesMetadata() { + return uploadedDiscoveryNodesMetadata; + } + + public UploadedMetadataAttribute getClusterBlocksMetadata() { + return uploadedClusterBlocksMetadata; + } + + public ClusterStateDiffManifest getDiffManifest() { + return diffManifest; + } + + public UploadedMetadataAttribute getDiscoverNodeMetadata() { + return uploadedDiscoveryNodesMetadata; + } + + public Map getClusterStateCustomMap() { + return uploadedClusterStateCustomMap; + } + + public UploadedMetadataAttribute getHashesOfConsistentSettings() { + return uploadedHashesOfConsistentSettings; + } + public boolean hasMetadataAttributesFiles() { return uploadedCoordinationMetadata != null || uploadedSettingsMetadata != null @@ -365,7 +480,14 @@ public ClusterMetadataManifest( UploadedMetadataAttribute uploadedTemplatesMetadata, Map uploadedCustomMetadataMap, long routingTableVersion, - List indicesRouting + List indicesRouting, + long metadataVersion, + UploadedMetadataAttribute discoveryNodesMetadata, + UploadedMetadataAttribute clusterBlocksMetadata, + UploadedMetadataAttribute uploadedTransientSettingsMetadata, + UploadedMetadataAttribute uploadedHashesOfConsistentSettings, + Map uploadedClusterStateCustomMap, + ClusterStateDiffManifest diffManifest ) { this.clusterTerm = clusterTerm; this.stateVersion = version; @@ -387,6 +509,15 @@ public ClusterMetadataManifest( this.uploadedCustomMetadataMap = Collections.unmodifiableMap( uploadedCustomMetadataMap != null ? uploadedCustomMetadataMap : new HashMap<>() ); + this.uploadedDiscoveryNodesMetadata = discoveryNodesMetadata; + this.uploadedClusterBlocksMetadata = clusterBlocksMetadata; + this.diffManifest = diffManifest; + this.metadataVersion = metadataVersion; + this.uploadedTransientSettingsMetadata = uploadedTransientSettingsMetadata; + this.uploadedHashesOfConsistentSettings = uploadedHashesOfConsistentSettings; + this.uploadedClusterStateCustomMap = Collections.unmodifiableMap( + uploadedClusterStateCustomMap != null ? uploadedClusterStateCustomMap : new HashMap<>() + ); } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -411,25 +542,36 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.globalMetadataFileName = null; this.routingTableVersion = in.readLong(); this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); - } else if (in.getVersion().onOrAfter(Version.V_2_12_0)) { - this.codecVersion = in.readInt(); - this.globalMetadataFileName = in.readString(); - this.uploadedCoordinationMetadata = null; - this.uploadedSettingsMetadata = null; - this.uploadedTemplatesMetadata = null; - this.uploadedCustomMetadataMap = null; - this.routingTableVersion = -1; - this.indicesRouting = null; + this.metadataVersion = in.readLong(); + this.uploadedDiscoveryNodesMetadata = new UploadedMetadataAttribute(in); + this.uploadedClusterBlocksMetadata = new UploadedMetadataAttribute(in); + this.uploadedTransientSettingsMetadata = new UploadedMetadataAttribute(in); + this.uploadedHashesOfConsistentSettings = new UploadedMetadataAttribute(in); + this.uploadedClusterStateCustomMap = Collections.unmodifiableMap( + in.readMap(StreamInput::readString, UploadedMetadataAttribute::new) + ); + this.diffManifest = null; } else { - this.codecVersion = CODEC_V0; // Default codec - this.globalMetadataFileName = null; + if (in.getVersion().onOrAfter(Version.V_2_12_0)) { + this.codecVersion = in.readInt(); + this.globalMetadataFileName = in.readString(); + } else { + this.codecVersion = CODEC_V0; // Default codec + this.globalMetadataFileName = null; + } this.uploadedCoordinationMetadata = null; this.uploadedSettingsMetadata = null; this.uploadedTemplatesMetadata = null; this.uploadedCustomMetadataMap = null; this.routingTableVersion = -1; this.indicesRouting = null; - } + this.uploadedDiscoveryNodesMetadata = null; + this.uploadedClusterBlocksMetadata = null; + this.diffManifest = null; + this.metadataVersion = -1; + this.uploadedTransientSettingsMetadata = null; + this.uploadedHashesOfConsistentSettings = null; + this.uploadedClusterStateCustomMap = null; } public static Builder builder() { @@ -497,6 +639,37 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } builder.endArray(); + if (getDiscoveryNodesMetadata() != null) { + builder.startObject(UPLOADED_DISCOVERY_NODES_METADATA.getPreferredName()); + getDiscoveryNodesMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getClusterBlocksMetadata() != null) { + builder.startObject(UPLOADED_CLUSTER_BLOCKS_METADATA.getPreferredName()); + getClusterBlocksMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getTransientSettingsMetadata() != null) { + builder.startObject(UPLOADED_TRANSIENT_SETTINGS_METADATA.getPreferredName()); + getTransientSettingsMetadata().toXContent(builder, params); + builder.endObject(); + } + if (getDiffManifest() != null) { + builder.startObject(DIFF_MANIFEST.getPreferredName()); + getDiffManifest().toXContent(builder, params); + builder.endObject(); + } + builder.field(METADATA_VERSION.getPreferredName(), getMetadataVersion()); + if (getHashesOfConsistentSettings() != null) { + builder.startObject(UPLOADED_HASHES_OF_CONSISTENT_SETTINGS_METADATA.getPreferredName()); + getHashesOfConsistentSettings().toXContent(builder, params); + builder.endObject(); + } + builder.startObject(UPLOADED_CLUSTER_STATE_CUSTOM_METADATA.getPreferredName()); + for (UploadedMetadataAttribute attribute : getClusterStateCustomMap().values()) { + attribute.toXContent(builder, params); + } + builder.endObject(); } return builder; } @@ -521,6 +694,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o)); out.writeLong(routingTableVersion); out.writeCollection(indicesRouting); + out.writeLong(metadataVersion); + uploadedDiscoveryNodesMetadata.writeTo(out); + uploadedClusterBlocksMetadata.writeTo(out); + uploadedTransientSettingsMetadata.writeTo(out); + uploadedHashesOfConsistentSettings.writeTo(out); + out.writeMap(uploadedClusterStateCustomMap, StreamOutput::writeString, (o, v) -> v.writeTo(o)); } else if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeInt(codecVersion); out.writeString(globalMetadataFileName); @@ -529,6 +708,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public boolean equals(Object o) { + // ToDo: update this method to contain new attributes if (this == o) { return true; } @@ -549,11 +729,22 @@ public boolean equals(Object o) { && Objects.equals(globalMetadataFileName, that.globalMetadataFileName) && Objects.equals(codecVersion, that.codecVersion) && Objects.equals(routingTableVersion, that.routingTableVersion) - && Objects.equals(indicesRouting, that.indicesRouting); + && Objects.equals(indicesRouting, that.indicesRouting) + && Objects.equals(uploadedCoordinationMetadata, that.uploadedCoordinationMetadata) + && Objects.equals(uploadedSettingsMetadata, that.uploadedSettingsMetadata) + && Objects.equals(uploadedTemplatesMetadata, that.uploadedTemplatesMetadata) + && Objects.equals(uploadedCustomMetadataMap, that.uploadedCustomMetadataMap) + && Objects.equals(metadataVersion, that.metadataVersion) + && Objects.equals(uploadedDiscoveryNodesMetadata, that.uploadedDiscoveryNodesMetadata) + && Objects.equals(uploadedClusterBlocksMetadata, that.uploadedClusterBlocksMetadata) + && Objects.equals(uploadedTransientSettingsMetadata, that.uploadedTransientSettingsMetadata) + && Objects.equals(uploadedHashesOfConsistentSettings, that.uploadedHashesOfConsistentSettings) + && Objects.equals(uploadedClusterStateCustomMap, that.uploadedClusterStateCustomMap); } @Override public int hashCode() { + // ToDo: update this method to contain new attributes return Objects.hash( codecVersion, globalMetadataFileName, @@ -568,7 +759,17 @@ public int hashCode() { previousClusterUUID, clusterUUIDCommitted, routingTableVersion, - indicesRouting + indicesRouting, + uploadedCoordinationMetadata, + uploadedSettingsMetadata, + uploadedTemplatesMetadata, + uploadedCustomMetadataMap, + metadataVersion, + uploadedDiscoveryNodesMetadata, + uploadedClusterBlocksMetadata, + uploadedTransientSettingsMetadata, + uploadedHashesOfConsistentSettings, + uploadedClusterStateCustomMap ); } @@ -622,6 +823,13 @@ public static class Builder { private boolean clusterUUIDCommitted; private long routingTableVersion; private List indicesRouting; + private long metadataVersion; + private UploadedMetadataAttribute discoveryNodesMetadata; + private UploadedMetadataAttribute clusterBlocksMetadata; + private UploadedMetadataAttribute transientSettingsMetadata; + private UploadedMetadataAttribute hashesOfConsistentSettings; + private Map clusterStateCustomMetadataMap; + private ClusterStateDiffManifest diffManifest; public Builder indices(List indices) { this.indices = indices; @@ -726,10 +934,46 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) { return this; } + public Builder metadataVersion(long metadataVersion) { + this.metadataVersion = metadataVersion; + return this; + } + + public Builder discoveryNodesMetadata(UploadedMetadataAttribute discoveryNodesMetadata) { + this.discoveryNodesMetadata = discoveryNodesMetadata; + return this; + } + + public Builder clusterBlocksMetadata(UploadedMetadataAttribute clusterBlocksMetadata) { + this.clusterBlocksMetadata = clusterBlocksMetadata; + return this; + } + + public Builder transientSettingsMetadata(UploadedMetadataAttribute settingsMetadata) { + this.transientSettingsMetadata = settingsMetadata; + return this; + } + + public Builder hashesOfConsistentSettings(UploadedMetadataAttribute hashesOfConsistentSettings) { + this.hashesOfConsistentSettings = hashesOfConsistentSettings; + return this; + } + + public Builder clusterStateCustomMetadataMap(Map clusterStateCustomMetadataMap) { + this.clusterStateCustomMetadataMap = clusterStateCustomMetadataMap; + return this; + } + + public Builder diffManifest(ClusterStateDiffManifest diffManifest) { + this.diffManifest = diffManifest; + return this; + } + public Builder() { indices = new ArrayList<>(); customMetadataMap = new HashMap<>(); indicesRouting = new ArrayList<>(); + clusterStateCustomMetadataMap = new HashMap<>(); } public Builder(ClusterMetadataManifest manifest) { @@ -751,6 +995,9 @@ public Builder(ClusterMetadataManifest manifest) { this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; this.routingTableVersion = manifest.routingTableVersion; this.indicesRouting = new ArrayList<>(manifest.indicesRouting); + this.diffManifest = manifest.diffManifest; + this.hashesOfConsistentSettings = manifest.uploadedHashesOfConsistentSettings; + this.clusterStateCustomMetadataMap = manifest.uploadedClusterStateCustomMap; } public ClusterMetadataManifest build() { @@ -772,7 +1019,14 @@ public ClusterMetadataManifest build() { templatesMetadata, customMetadataMap, routingTableVersion, - indicesRouting + indicesRouting, + metadataVersion, + discoveryNodesMetadata, + clusterBlocksMetadata, + transientSettingsMetadata, + hashesOfConsistentSettings, + clusterStateCustomMetadataMap, + diffManifest ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java new file mode 100644 index 0000000000000..ea4344d4d35e8 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateDiffManifest.java @@ -0,0 +1,549 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.core.common.Strings; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParseException; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.core.xcontent.XContentParserUtils.parseStringList; + +public class ClusterStateDiffManifest implements ToXContentObject { + private static final String FROM_STATE_UUID_FIELD = "from_state_uuid"; + private static final String TO_STATE_UUID_FIELD = "to_state_uuid"; + private static final String METADATA_DIFF_FIELD = "metadata_diff"; + private static final String COORDINATION_METADATA_UPDATED_FIELD = "coordination_metadata_diff"; + private static final String SETTINGS_METADATA_UPDATED_FIELD = "settings_metadata_diff"; + private static final String TRANSIENT_SETTINGS_METADATA_UPDATED_FIELD = "transient_settings_metadata_diff"; + private static final String TEMPLATES_METADATA_UPDATED_FIELD = "templates_metadata_diff"; + private static final String HASHES_OF_CONSISTENT_SETTINGS_UPDATED_FIELD = "hashes_of_consistent_settings_diff"; + private static final String INDICES_DIFF_FIELD = "indices_diff"; + private static final String METADATA_CUSTOM_DIFF_FIELD = "metadata_custom_diff"; + private static final String UPSERTS_FIELD = "upserts"; + private static final String DELETES_FIELD = "deletes"; + private static final String CLUSTER_BLOCKS_UPDATED_FIELD = "cluster_blocks_diff"; + private static final String DISCOVERY_NODES_UPDATED_FIELD = "discovery_nodes_diff"; + private static final String CLUSTER_STATE_CUSTOM_DIFF_FIELD = "cluster_state_custom_diff"; + + private final String fromStateUUID; + private final String toStateUUID; + private final boolean coordinationMetadataUpdated; + private final boolean settingsMetadataUpdated; + private final boolean transientSettingsMetadataUpdate; + private final boolean templatesMetadataUpdated; + private List customMetadataUpdated; + private final List customMetadataDeleted; + private final List indicesUpdated; + private final List indicesDeleted; + private final boolean clusterBlocksUpdated; + private final boolean discoveryNodesUpdated; + private final boolean hashesOfConsistentSettingsUpdated; + private final List clusterStateCustomUpdated; + private final List clusterStateCustomDeleted; + + ClusterStateDiffManifest(ClusterState state, ClusterState previousState) { + fromStateUUID = previousState.stateUUID(); + toStateUUID = state.stateUUID(); + coordinationMetadataUpdated = !Metadata.isCoordinationMetadataEqual(state.metadata(), previousState.metadata()); + settingsMetadataUpdated = !Metadata.isSettingsMetadataEqual(state.metadata(), previousState.metadata()); + transientSettingsMetadataUpdate = !Metadata.isTransientSettingsMetadataEqual(state.metadata(), previousState.metadata()); + templatesMetadataUpdated = !Metadata.isTemplatesMetadataEqual(state.metadata(), previousState.metadata()); + indicesDeleted = findRemovedIndices(state.metadata().indices(), previousState.metadata().indices()); + indicesUpdated = findUpdatedIndices(state.metadata().indices(), previousState.metadata().indices()); + clusterBlocksUpdated = !state.blocks().equals(previousState.blocks()); + discoveryNodesUpdated = state.nodes().delta(previousState.nodes()).hasChanges(); + customMetadataUpdated = new ArrayList<>(); + for (String custom : state.metadata().customs().keySet()) { + if (!previousState.metadata().customs().containsKey(custom) || !state.metadata().customs().get(custom).equals(previousState.metadata().customs().get(custom))) { + customMetadataUpdated.add(custom); + } + } + customMetadataDeleted = new ArrayList<>(); + for (String custom : previousState.metadata().customs().keySet()) { + if (state.metadata().customs().get(custom) == null) { + customMetadataDeleted.add(custom); + } + } + + hashesOfConsistentSettingsUpdated = !state.metadata().hashesOfConsistentSettings().equals(previousState.metadata().hashesOfConsistentSettings()); + clusterStateCustomUpdated = new ArrayList<>(); + clusterStateCustomDeleted = new ArrayList<>(); + for (String custom : state.customs().keySet()) { + if (!previousState.customs().containsKey(custom) || !state.customs().get(custom).equals(previousState.customs().get(custom))) { + clusterStateCustomUpdated.add(custom); + } + } + for (String custom : previousState.customs().keySet()) { + if (state.customs().get(custom) == null) { + clusterStateCustomDeleted.add(custom); + } + } + } + + public ClusterStateDiffManifest( + String fromStateUUID, + String toStateUUID, + boolean coordinationMetadataUpdated, + boolean settingsMetadataUpdated, + boolean transientSettingsMetadataUpdate, + boolean templatesMetadataUpdated, + List customMetadataUpdated, + List customMetadataDeleted, + List indicesUpdated, + List indicesDeleted, + boolean clusterBlocksUpdated, + boolean discoveryNodesUpdated, + boolean hashesOfConsistentSettingsUpdated, + List clusterStateCustomUpdated, + List clusterStateCustomDeleted + ) { + this.fromStateUUID = fromStateUUID; + this.toStateUUID = toStateUUID; + this.coordinationMetadataUpdated = coordinationMetadataUpdated; + this.settingsMetadataUpdated = settingsMetadataUpdated; + this.transientSettingsMetadataUpdate = transientSettingsMetadataUpdate; + this.templatesMetadataUpdated = templatesMetadataUpdated; + this.customMetadataUpdated = customMetadataUpdated; + this.customMetadataDeleted = customMetadataDeleted; + this.indicesUpdated = indicesUpdated; + this.indicesDeleted = indicesDeleted; + this.clusterBlocksUpdated = clusterBlocksUpdated; + this.discoveryNodesUpdated = discoveryNodesUpdated; + this.hashesOfConsistentSettingsUpdated = hashesOfConsistentSettingsUpdated; + this.clusterStateCustomUpdated = clusterStateCustomUpdated; + this.clusterStateCustomDeleted = clusterStateCustomDeleted; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + { + builder.field(FROM_STATE_UUID_FIELD, fromStateUUID); + builder.field(TO_STATE_UUID_FIELD, toStateUUID); + builder.startObject(METADATA_DIFF_FIELD); + { + builder.field(COORDINATION_METADATA_UPDATED_FIELD, coordinationMetadataUpdated); + builder.field(SETTINGS_METADATA_UPDATED_FIELD, settingsMetadataUpdated); + builder.field(TRANSIENT_SETTINGS_METADATA_UPDATED_FIELD, transientSettingsMetadataUpdate); + builder.field(TEMPLATES_METADATA_UPDATED_FIELD, templatesMetadataUpdated); + builder.startObject(INDICES_DIFF_FIELD); + builder.startArray(UPSERTS_FIELD); + for (String index : indicesUpdated) { + builder.value(index); + } + builder.endArray(); + builder.startArray(DELETES_FIELD); + for (String index : indicesDeleted) { + builder.value(index); + } + builder.endArray(); + builder.endObject(); + builder.startObject(METADATA_CUSTOM_DIFF_FIELD); + builder.startArray(UPSERTS_FIELD); + for (String custom : customMetadataUpdated) { + builder.value(custom); + } + builder.endArray(); + builder.startArray(DELETES_FIELD); + for (String custom : customMetadataDeleted) { + builder.value(custom); + } + builder.endArray(); + builder.endObject(); + builder.field(HASHES_OF_CONSISTENT_SETTINGS_UPDATED_FIELD, hashesOfConsistentSettingsUpdated); + } + builder.endObject(); + builder.field(CLUSTER_BLOCKS_UPDATED_FIELD, clusterBlocksUpdated); + builder.field(DISCOVERY_NODES_UPDATED_FIELD, discoveryNodesUpdated); + builder.startObject(CLUSTER_STATE_CUSTOM_DIFF_FIELD); + builder.startArray(UPSERTS_FIELD); + for (String custom : clusterStateCustomUpdated) { + builder.value(custom); + } + builder.endArray(); + builder.startArray(DELETES_FIELD); + for (String custom : clusterStateCustomDeleted) { + builder.value(custom); + } + builder.endArray(); + builder.endObject(); + + } + return builder; + } + + public static ClusterStateDiffManifest fromXContent(XContentParser parser) throws IOException { + Builder builder = new Builder(); + if (parser.currentToken() == null) { // fresh parser? move to next token + parser.nextToken(); + } + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + parser.nextToken(); + } + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); + String currentFieldName = parser.currentName(); + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (currentFieldName.equals(METADATA_DIFF_FIELD)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + currentFieldName = parser.currentName(); + token = parser.nextToken(); + if (token.isValue()) { + switch (currentFieldName) { + case COORDINATION_METADATA_UPDATED_FIELD: + builder.coordinationMetadataUpdated(parser.booleanValue()); + break; + case SETTINGS_METADATA_UPDATED_FIELD: + builder.settingsMetadataUpdated(parser.booleanValue()); + break; + case TRANSIENT_SETTINGS_METADATA_UPDATED_FIELD: + builder.transientSettingsMetadataUpdate(parser.booleanValue()); + break; + case TEMPLATES_METADATA_UPDATED_FIELD: + builder.templatesMetadataUpdated(parser.booleanValue()); + break; + case HASHES_OF_CONSISTENT_SETTINGS_UPDATED_FIELD: + builder.hashesOfConsistentSettingsUpdated(parser.booleanValue()); + break; + default: + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (currentFieldName.equals(INDICES_DIFF_FIELD)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + currentFieldName = parser.currentName(); + token = parser.nextToken(); + switch (currentFieldName) { + case UPSERTS_FIELD: + builder.indicesUpdated(parseStringList(parser)); + break; + case DELETES_FIELD: + builder.indicesDeleted(parseStringList(parser)); + break; + default: + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } + } else if (currentFieldName.equals(METADATA_CUSTOM_DIFF_FIELD)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + currentFieldName = parser.currentName(); + token = parser.nextToken(); + switch (currentFieldName) { + case UPSERTS_FIELD: + builder.customMetadataUpdated(parseStringList(parser)); + break; + case DELETES_FIELD: + builder.customMetadataDeleted(parseStringList(parser)); + break; + default: + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } + } else { + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } else { + throw new XContentParseException("Unexpected token [" + token + "]"); + } + } + } else if (currentFieldName.equals(CLUSTER_STATE_CUSTOM_DIFF_FIELD)) { + while ((parser.nextToken()) != XContentParser.Token.END_OBJECT) { + currentFieldName = parser.currentName(); + parser.nextToken(); + switch (currentFieldName) { + case UPSERTS_FIELD: + builder.clusterStateCustomUpdated(parseStringList(parser)); + break; + case DELETES_FIELD: + builder.clusterStateCustomDeleted(parseStringList(parser)); + break; + default: + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } + } else { + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } else if (token.isValue()) { + switch (currentFieldName) { + case FROM_STATE_UUID_FIELD: + builder.fromStateUUID(parser.text()); + break; + case TO_STATE_UUID_FIELD: + builder.toStateUUID(parser.text()); + break; + case CLUSTER_BLOCKS_UPDATED_FIELD: + builder.clusterBlocksUpdated(parser.booleanValue()); + break; + case DISCOVERY_NODES_UPDATED_FIELD: + builder.discoveryNodesUpdated(parser.booleanValue()); + break; + default: + throw new XContentParseException("Unexpected field [" + currentFieldName + "]"); + } + } else { + throw new XContentParseException("Unexpected token [" + token + "]"); + } + } + return builder.build(); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public List findRemovedIndices(Map indices, Map previousIndices) { + List removedIndices = new ArrayList<>(); + for (String index : previousIndices.keySet()) { + // index present in previous state but not in current + if (!indices.containsKey(index)) { + removedIndices.add(index); + } + } + return removedIndices; + } + + public List findUpdatedIndices(Map indices, Map previousIndices) { + List updatedIndices = new ArrayList<>(); + for (String index : indices.keySet()) { + if (!previousIndices.containsKey(index)) { + updatedIndices.add(index); + } else if (previousIndices.get(index).getVersion() != indices.get(index).getVersion()) { + updatedIndices.add(index); + } + } + return updatedIndices; + } + + public List getIndicesRoutingDeleted(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { + List deletedIndicesRouting = new ArrayList<>(); + for(IndexRoutingTable previousIndexRouting: previousRoutingTable.getIndicesRouting().values()) { + if(!currentRoutingTable.getIndicesRouting().containsKey(previousIndexRouting.getIndex().getName())) { + // Latest Routing Table does not have entry for the index which means the index is deleted + deletedIndicesRouting.add(previousIndexRouting.getIndex().getName()); + } + } + return deletedIndicesRouting; + } + + public List getIndicesRoutingUpdated(RoutingTable previousRoutingTable, RoutingTable currentRoutingTable) { + List updatedIndicesRouting = new ArrayList<>(); + for(IndexRoutingTable currentIndicesRouting: currentRoutingTable.getIndicesRouting().values()) { + if(!previousRoutingTable.getIndicesRouting().containsKey(currentIndicesRouting.getIndex().getName())) { + // Latest Routing Table does not have entry for the index which means the index is created + updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); + } else { + if(previousRoutingTable.getIndicesRouting().get(currentIndicesRouting.getIndex().getName()).equals(currentIndicesRouting)) { + // if the latest routing table has the same routing table as the previous routing table, then the index is not updated + continue; + } + updatedIndicesRouting.add(currentIndicesRouting.getIndex().getName()); + } + } + return updatedIndicesRouting; + } + + public String getFromStateUUID() { + return fromStateUUID; + } + + public String getToStateUUID() { + return toStateUUID; + } + + public boolean isCoordinationMetadataUpdated() { + return coordinationMetadataUpdated; + } + + public boolean isSettingsMetadataUpdated() { + return settingsMetadataUpdated; + } + + public boolean isTransientSettingsMetadataUpdated() { + return transientSettingsMetadataUpdate; + } + + public boolean isTemplatesMetadataUpdated() { + return templatesMetadataUpdated; + } + + public List getCustomMetadataUpdated() { + return customMetadataUpdated; + } + + public List getCustomMetadataDeleted() { + return customMetadataDeleted; + } + + public List getIndicesUpdated() { + return indicesUpdated; + } + + public List getIndicesDeleted() { + return indicesDeleted; + } + + public boolean isClusterBlocksUpdated() { + return clusterBlocksUpdated; + } + + public boolean isDiscoveryNodesUpdated() { + return discoveryNodesUpdated; + } + + public boolean isHashesOfConsistentSettingsUpdated() { + return hashesOfConsistentSettingsUpdated; + } + + public List getClusterStateCustomUpdated() { + return clusterStateCustomUpdated; + } + + public List getClusterStateCustomDeleted() { + return clusterStateCustomDeleted; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String fromStateUUID; + private String toStateUUID; + private boolean coordinationMetadataUpdated; + private boolean settingsMetadataUpdated; + private boolean transientSettingsMetadataUpdated; + private boolean templatesMetadataUpdated; + private List customMetadataUpdated; + private List customMetadataDeleted; + private List indicesUpdated; + private List indicesDeleted; + private boolean clusterBlocksUpdated; + private boolean discoveryNodesUpdated; + private boolean hashesOfConsistentSettingsUpdated; + private List clusterStateCustomUpdated; + private List clusterStateCustomDeleted; + + public Builder() {} + + public Builder fromStateUUID(String fromStateUUID) { + this.fromStateUUID = fromStateUUID; + return this; + } + + public Builder toStateUUID(String toStateUUID) { + this.toStateUUID = toStateUUID; + return this; + } + + public Builder coordinationMetadataUpdated(boolean coordinationMetadataUpdated) { + this.coordinationMetadataUpdated = coordinationMetadataUpdated; + return this; + } + + public Builder settingsMetadataUpdated(boolean settingsMetadataUpdated) { + this.settingsMetadataUpdated = settingsMetadataUpdated; + return this; + } + + public Builder transientSettingsMetadataUpdate(boolean settingsMetadataUpdated) { + this.transientSettingsMetadataUpdated = settingsMetadataUpdated; + return this; + } + + public Builder templatesMetadataUpdated(boolean templatesMetadataUpdated) { + this.templatesMetadataUpdated = templatesMetadataUpdated; + return this; + } + + public Builder hashesOfConsistentSettingsUpdated(boolean hashesOfConsistentSettingsUpdated) { + this.hashesOfConsistentSettingsUpdated = hashesOfConsistentSettingsUpdated; + return this; + } + + public Builder customMetadataUpdated(List customMetadataUpdated) { + this.customMetadataUpdated = customMetadataUpdated; + return this; + } + + public Builder customMetadataDeleted(List customMetadataDeleted) { + this.customMetadataDeleted = customMetadataDeleted; + return this; + } + + public Builder indicesUpdated(List indicesUpdated) { + this.indicesUpdated = indicesUpdated; + return this; + } + + public Builder indicesDeleted(List indicesDeleted) { + this.indicesDeleted = indicesDeleted; + return this; + } + + public Builder clusterBlocksUpdated(boolean clusterBlocksUpdated) { + this.clusterBlocksUpdated = clusterBlocksUpdated; + return this; + } + + public Builder discoveryNodesUpdated(boolean discoveryNodesUpdated) { + this.discoveryNodesUpdated = discoveryNodesUpdated; + return this; + } + + public Builder clusterStateCustomUpdated(List clusterStateCustomUpdated) { + this.clusterStateCustomUpdated = clusterStateCustomUpdated; + return this; + } + + public Builder clusterStateCustomDeleted(List clusterStateCustomDeleted) { + this.clusterStateCustomDeleted = clusterStateCustomDeleted; + return this; + } + + public ClusterStateDiffManifest build() { + return new ClusterStateDiffManifest( + fromStateUUID, + toStateUUID, + coordinationMetadataUpdated, + settingsMetadataUpdated, + transientSettingsMetadataUpdated, + templatesMetadataUpdated, + customMetadataUpdated, + customMetadataDeleted, + indicesUpdated, + indicesDeleted, + clusterBlocksUpdated, + discoveryNodesUpdated, + hashesOfConsistentSettingsUpdated, + clusterStateCustomUpdated, + clusterStateCustomDeleted + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java new file mode 100644 index 0000000000000..4a4b0c79b21a9 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import java.io.IOException; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterState.Custom; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore; +import org.opensearch.gateway.remote.model.RemoteClusterBlocks; +import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; +import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; + +public class RemoteClusterStateAttributesManager { + public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; + public static final String DISCOVERY_NODES = "nodes"; + public static final String CLUSTER_BLOCKS = "blocks"; + public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; + private final RemoteClusterStateBlobStore clusterBlocksBlobStore; + private final RemoteClusterStateBlobStore discoveryNodesBlobStore; + private final RemoteClusterStateBlobStore customsBlobStore; + private final Compressor compressor; + private final NamedXContentRegistry namedXContentRegistry; + + RemoteClusterStateAttributesManager( + RemoteClusterStateBlobStore clusterBlocksBlobStore, RemoteClusterStateBlobStore discoveryNodesBlobStore, RemoteClusterStateBlobStore customsBlobStore, Compressor compressor, NamedXContentRegistry namedXContentRegistry) { + this.clusterBlocksBlobStore = clusterBlocksBlobStore; + this.discoveryNodesBlobStore = discoveryNodesBlobStore; + this.customsBlobStore = customsBlobStore; + this.compressor = compressor; + this.namedXContentRegistry = namedXContentRegistry; + } + + /** + * Allows async upload of Cluster State Attribute components to remote + */ + CheckedRunnable getAsyncMetadataWriteAction( + ClusterState clusterState, + String component, + ToXContent componentData, + LatchedActionListener latchedActionListener + ) { + if (componentData instanceof DiscoveryNodes) { + RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes((DiscoveryNodes)componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry); + return () -> discoveryNodesBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); + } else if (componentData instanceof ClusterBlocks) { + RemoteClusterBlocks remoteObject = new RemoteClusterBlocks((ClusterBlocks) componentData, clusterState.version(), clusterState.metadata().clusterUUID(), compressor, namedXContentRegistry); + return () -> clusterBlocksBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); + } else if (componentData instanceof ClusterState.Custom) { + RemoteClusterStateCustoms remoteObject = new RemoteClusterStateCustoms( + (ClusterState.Custom) componentData, + component, + clusterState.version(), + clusterState.metadata().clusterUUID(), + compressor, + namedXContentRegistry + ); + return () -> customsBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); + } else { + throw new RemoteStateTransferException("Remote object not found for "+ componentData.getClass()); + } + } + + private ActionListener getActionListener(String component, AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener latchedActionListener) { + return ActionListener.wrap( + resp -> latchedActionListener.onResponse( + remoteObject.getUploadedMetadata() + ), + ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex)) + ); + } + + public CheckedRunnable getAsyncMetadataReadAction( + String clusterUUID, + String component, + String componentName, + String uploadedFilename, + LatchedActionListener listener + ) { + final ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure); + if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) { + RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, compressor, namedXContentRegistry); + return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener); + } else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) { + RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, compressor, namedXContentRegistry); + return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener); + } else if (component.equals(CLUSTER_STATE_CUSTOM)) { + final ActionListener customActionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, component, componentName))), listener::onFailure); + RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms(uploadedFilename, componentName, clusterUUID, compressor, namedXContentRegistry); + return () -> customsBlobStore.readAsync(remoteClusterStateCustoms, customActionListener); + } else { + throw new RemoteStateTransferException("Remote object not found for "+ component); + } + } + + public Map getUpdatedCustoms(ClusterState clusterState, ClusterState previousClusterState) { + Map updatedCustoms = new HashMap<>(); + Set currentCustoms = new HashSet<>(clusterState.customs().keySet()); + for (Map.Entry entry : previousClusterState.customs().entrySet()) { + if (currentCustoms.contains(entry.getKey()) && !entry.getValue().equals(clusterState.customs().get(entry.getKey()))) { + updatedCustoms.put(entry.getKey(), clusterState.customs().get(entry.getKey())); + } + currentCustoms.remove(entry.getKey()); + } + for (String custom : currentCustoms) { + updatedCustoms.put(custom, clusterState.customs().get(custom)); + } + return updatedCustoms; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java new file mode 100644 index 0000000000000..29bde27db150e --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +/** + * Wrapper class for uploading/downloading {@link ClusterBlocks} to/from remote blob store + */ +public class RemoteClusterBlocks extends AbstractRemoteWritableBlobEntity { + + public static final String CLUSTER_BLOCKS = "blocks"; + public static final ChecksumBlobStoreFormat CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>( + "blocks", + METADATA_NAME_FORMAT, + ClusterBlocks::fromXContent + ); + + private ClusterBlocks clusterBlocks; + private long stateVersion; + + public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion, String clusterUUID, + final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.clusterBlocks = clusterBlocks; + this.stateVersion = stateVersion; + } + + public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of("transient"), CLUSTER_BLOCKS); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/transient/______ + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(stateVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(CLUSTER_BLOCKS, blobName); + } + + @Override + public void set(final ClusterBlocks clusterBlocks) { + this.clusterBlocks = clusterBlocks; + } + + @Override + public ClusterBlocks get() { + return clusterBlocks; + } + + + @Override + public InputStream serialize() throws IOException { + return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + } + + @Override + public ClusterBlocks deserialize(final InputStream inputStream) throws IOException { + return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java new file mode 100644 index 0000000000000..fffbcfab4e141 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterState.Custom; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_DELIMITER; + +public class RemoteClusterStateCustoms extends AbstractRemoteWritableBlobEntity { + public static final String CLUSTER_STATE_CUSTOM = "cluster-state-custom"; + + public final ChecksumBlobStoreFormat clusterStateCustomBlobStoreFormat; + private long stateVersion; + private String customType; + private ClusterState.Custom custom; + + public RemoteClusterStateCustoms(final ClusterState.Custom custom, final String customType, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.stateVersion = stateVersion; + this.customType = customType; + this.custom = custom; + this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>( + CLUSTER_STATE_CUSTOM, + METADATA_NAME_FORMAT, + parser -> ClusterState.Custom.fromXContent(parser, customType) + ); + } + + public RemoteClusterStateCustoms(final String blobName, final String customType, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + this.customType = customType; + this.clusterStateCustomBlobStoreFormat = new ChecksumBlobStoreFormat<>( + CLUSTER_STATE_CUSTOM, + METADATA_NAME_FORMAT, + parser -> ClusterState.Custom.fromXContent(parser, customType) + ); + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), CLUSTER_STATE_CUSTOM); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/ephemeral/______ + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(stateVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new ClusterMetadataManifest.UploadedMetadataAttribute(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, customType), blobName); + } + + @Override + public void set(Custom custom) { + this.custom = custom; + } + + @Override + public ClusterState.Custom get() { + return custom; + } + + @Override + public InputStream serialize() throws IOException { + return clusterStateCustomBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + } + + @Override + public ClusterState.Custom deserialize(final InputStream inputStream) throws IOException { + return clusterStateCustomBlobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java new file mode 100644 index 0000000000000..34aca7767a0b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_EPHEMERAL_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +/** + * Wrapper class for uploading/downloading {@link DiscoveryNodes} to/from remote blob store + */ +public class RemoteDiscoveryNodes extends AbstractRemoteWritableBlobEntity { + + public static final String DISCOVERY_NODES = "nodes"; + public static final ChecksumBlobStoreFormat DISCOVERY_NODES_FORMAT = new ChecksumBlobStoreFormat<>( + "nodes", + METADATA_NAME_FORMAT, + DiscoveryNodes::fromXContent + ); + + private DiscoveryNodes discoveryNodes; + private long stateVersion; + + public RemoteDiscoveryNodes(final DiscoveryNodes discoveryNodes, final long stateVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.discoveryNodes = discoveryNodes; + this.stateVersion = stateVersion; + } + + public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(CLUSTER_STATE_EPHEMERAL_PATH_TOKEN), DISCOVERY_NODES); + } + + @Override + public String generateBlobFileName() { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/ephemeral/______ + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(stateVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(DISCOVERY_NODES, blobName); + } + + @Override + public void set(final DiscoveryNodes discoveryNodes) { + this.discoveryNodes = discoveryNodes; + } + + @Override + public DiscoveryNodes get() { + return discoveryNodes; + } + + @Override + public InputStream serialize() throws IOException { + return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + } + + @Override + public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException { + return DISCOVERY_NODES_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java new file mode 100644 index 0000000000000..3dfa42139bb02 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import org.opensearch.cluster.metadata.DiffableStringMap; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +public class RemoteHashesOfConsistentSettings extends AbstractRemoteWritableBlobEntity { + public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings"; + public static final ChecksumBlobStoreFormat HASHES_OF_CONSISTENT_SETTINGS_FORMAT = new ChecksumBlobStoreFormat<>( + HASHES_OF_CONSISTENT_SETTINGS, + METADATA_NAME_FORMAT, + DiffableStringMap::fromXContent + ); + + private DiffableStringMap hashesOfConsistentSettings; + private long metadataVersion; + public RemoteHashesOfConsistentSettings(final DiffableStringMap hashesOfConsistentSettings, final long metadataVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.metadataVersion = metadataVersion; + this.hashesOfConsistentSettings = hashesOfConsistentSettings; + } + + public RemoteHashesOfConsistentSettings(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of(GLOBAL_METADATA_PATH_TOKEN), HASHES_OF_CONSISTENT_SETTINGS); + } + + @Override + public String generateBlobFileName() { + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new ClusterMetadataManifest.UploadedMetadataAttribute(HASHES_OF_CONSISTENT_SETTINGS, blobName); + } + + @Override + public void set(final DiffableStringMap hashesOfConsistentSettings) { + this.hashesOfConsistentSettings = hashesOfConsistentSettings; + } + + @Override + public DiffableStringMap get() { + return hashesOfConsistentSettings; + } + + @Override + public InputStream serialize() throws IOException { + return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + } + + @Override + public DiffableStringMap deserialize(final InputStream inputStream) throws IOException { + return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java new file mode 100644 index 0000000000000..adee09eaeffef --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.core.xcontent.ToXContent; + +/** + * Container class for entity read from remote store + */ +public class RemoteReadResult { + + ToXContent obj; + String component; + String componentName; + + public RemoteReadResult(ToXContent obj, String component, String componentName) { + this.obj = obj; + this.component = component; + this.componentName = componentName; + } + + public ToXContent getObj() { + return obj; + } + + public String getComponent() { + return component; + } + + public String getComponentName() { + return componentName; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java new file mode 100644 index 0000000000000..75b6cfc3a765a --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_CURRENT_CODEC_VERSION; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import org.opensearch.common.io.Streams; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.BlobPathParameters; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +/** + * Wrapper class for uploading/downloading transient {@link Settings} to/from remote blob store + */ +public class RemoteTransientSettingsMetadata extends AbstractRemoteWritableBlobEntity { + + public static final String TRANSIENT_SETTING_METADATA = "transient-settings"; + + public static final ChecksumBlobStoreFormat SETTINGS_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "transient-settings", + METADATA_NAME_FORMAT, + Settings::fromXContent + ); + + private Settings transientSettings; + private long metadataVersion; + + public RemoteTransientSettingsMetadata(final Settings transientSettings, final long metadataVersion, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.transientSettings = transientSettings; + this.metadataVersion = metadataVersion; + } + + public RemoteTransientSettingsMetadata(final String blobName, final String clusterUUID, final Compressor compressor, final NamedXContentRegistry namedXContentRegistry) { + super(clusterUUID, compressor, namedXContentRegistry); + this.blobName = blobName; + } + + @Override + public BlobPathParameters getBlobPathParameters() { + return new BlobPathParameters(List.of("global-metadata"), TRANSIENT_SETTING_METADATA); + } + + @Override + public String generateBlobFileName() { + String blobFileName = String.join( + DELIMITER, + getBlobPathParameters().getFilePrefix(), + RemoteStoreUtils.invertLong(metadataVersion), + RemoteStoreUtils.invertLong(System.currentTimeMillis()), + String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) + ); + this.blobFileName = blobFileName; + return blobFileName; + } + + @Override + public void set(final Settings settings) { + this.transientSettings = settings; + } + + @Override + public Settings get() { + return transientSettings; + } + + @Override + public InputStream serialize() throws IOException { + return SETTINGS_METADATA_FORMAT.serialize(transientSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) + .streamInput(); + } + + @Override + public Settings deserialize(final InputStream inputStream) throws IOException { + return SETTINGS_METADATA_FORMAT.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream)); + } + + @Override + public UploadedMetadata getUploadedMetadata() { + assert blobName != null; + return new UploadedMetadataAttribute(TRANSIENT_SETTING_METADATA, blobName); + } +}