diff --git a/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java new file mode 100644 index 0000000000000..9cd79e5fa3c86 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java @@ -0,0 +1,331 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.store; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import org.opensearch.core.ParseField; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParser.Token; + +/** + * Marker file which contains the details of the uploaded entity metadata + * + * @opensearch.internal + */ +public class ClusterMetadataMarker implements Writeable, ToXContentFragment { + + private static final ParseField INDICES_FIELD = new ParseField("indices"); + private static final ParseField TERM_FIELD = new ParseField("term"); + private static final ParseField VERSION_FIELD = new ParseField("version"); + private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); + private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); + + private static Map indices(Object[] fields) { + return (Map) fields[0]; + } + + private static long term(Object[] fields) { + return (long) fields[1]; + } + + private static long version(Object[] fields) { + return (long) fields[2]; + } + + private static String clusterUUID(Object[] fields) { + return (String) fields[3]; + } + + private static String stateUUID(Object[] fields) { + return (String) fields[4]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cluster_metadata_marker", + fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields)) + ); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map uploadMetadataMap = new HashMap<>(); + while (p.nextToken() != Token.END_OBJECT) { + UploadedIndexMetadata uploadMetadata = UploadedIndexMetadata.fromXContent(p); + uploadMetadataMap.put(uploadMetadata.getIndexName(), uploadMetadata); + } + return uploadMetadataMap; + }, INDICES_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); + } + + private final Map indices; + private final long term; + private final long version; + private final String clusterUUID; + private final String stateUUID; + + public Map getIndices() { + return indices; + } + + public long getTerm() { + return term; + } + + public long getVersion() { + return version; + } + + public String getClusterUUID() { + return clusterUUID; + } + + public String getStateUUID() { + return stateUUID; + } + + public ClusterMetadataMarker( + Map indices, + long term, + long version, + String clusterUUID, + String stateUUID + ) { + this.indices = Collections.unmodifiableMap(indices); + this.term = term; + this.version = version; + this.clusterUUID = clusterUUID; + this.stateUUID = stateUUID; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(INDICES_FIELD.getPreferredName()); + { + for (UploadedIndexMetadata uploadedIndexMetadata : indices.values()) { + uploadedIndexMetadata.toXContent(builder, params); + } + } + builder.endObject(); + builder.field(TERM_FIELD.getPreferredName(), getTerm()) + .field(VERSION_FIELD.getPreferredName(), getVersion()) + .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()) + .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(indices, StreamOutput::writeString, (stream, uploadedMetadata) -> uploadedMetadata.writeTo(stream)); + out.writeVLong(term); + out.writeVLong(version); + out.writeString(clusterUUID); + out.writeString(stateUUID); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusterMetadataMarker that = (ClusterMetadataMarker) o; + return Objects.equals(indices, that.indices) + && term == that.term + && version == that.version + && Objects.equals(clusterUUID, that.clusterUUID) + && Objects.equals(stateUUID, that.stateUUID); + } + + @Override + public int hashCode() { + return Objects.hash(indices, term, version, clusterUUID, stateUUID); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Builder for ClusterMetadataMarker + * + * @opensearch.internal + */ + public static class Builder { + + private Map indices; + private long term; + private long version; + private String clusterUUID; + private String stateUUID; + + public Builder indices(Map indices) { + this.indices = indices; + return this; + } + + public Builder term(long term) { + this.term = term; + return this; + } + + public Builder version(long version) { + this.version = version; + return this; + } + + public Builder clusterUUID(String clusterUUID) { + this.clusterUUID = clusterUUID; + return this; + } + + public Builder stateUUID(String stateUUID) { + this.stateUUID = stateUUID; + return this; + } + + public Map getIndices() { + return indices; + } + + public Builder() { + indices = new HashMap<>(); + } + + public ClusterMetadataMarker build() { + return new ClusterMetadataMarker(indices, term, version, clusterUUID, stateUUID); + } + + } + + /** + * Metadata for uploaded index metadata + * + * @opensearch.internal + */ + public static class UploadedIndexMetadata implements Writeable, ToXContentFragment { + + private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); + private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); + private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + + private static String indexName(Object[] fields) { + return (String) fields[0]; + } + + private static String indexUUID(Object[] fields) { + return (String) fields[1]; + } + + private static String uploadedFilename(Object[] fields) { + return (String) fields[2]; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "uploaded_index_metadata", + fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)) + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + } + + private final String indexName; + private final String indexUUID; + private final String uploadedFilename; + + public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { + this.indexName = indexName; + this.indexUUID = indexUUID; + this.uploadedFilename = uploadedFileName; + } + + public String getUploadedFilename() { + return uploadedFilename; + } + + public String getIndexName() { + return indexName; + } + + public String getIndexUUID() { + return indexUUID; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject(getIndexName()) + .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) + .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) + .endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeString(indexUUID); + out.writeString(uploadedFilename); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final UploadedIndexMetadata that = (UploadedIndexMetadata) o; + return Objects.equals(indexName, that.indexName) + && Objects.equals(indexUUID, that.indexUUID) + && Objects.equals(uploadedFilename, that.uploadedFilename); + } + + @Override + public int hashCode() { + return Objects.hash(indexName, indexUUID, uploadedFilename); + } + + @Override + public String toString() { + return Strings.toString(MediaTypeRegistry.JSON, this); + } + + public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } +} diff --git a/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java new file mode 100644 index 0000000000000..e6cf811804987 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java @@ -0,0 +1,246 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.store; + +import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.common.Nullable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.IndicesService; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; + +/** + * A Service which provides APIs to upload and download cluster metadata from remote store. + * + * @opensearch.internal + */ +public class RemoteClusterStateService { + + public static final String METADATA_NAME_FORMAT = "%s.dat"; + + public static final String METADATA_MARKER_NAME_FORMAT = "%s"; + + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "index-metadata", + METADATA_NAME_FORMAT, + IndexMetadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>( + "cluster-metadata-marker", + METADATA_MARKER_NAME_FORMAT, + ClusterMetadataMarker::fromXContent + ); + private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + + private static final String DELIMITER = "__"; + + private final Supplier repositoriesService; + private final Settings settings; + private BlobStoreRepository blobStoreRepository; + + public RemoteClusterStateService(Supplier repositoriesService, Settings settings) { + this.repositoriesService = repositoriesService; + this.settings = settings; + } + + @Nullable + public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws IOException { + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + initializeRepository(); + if (blobStoreRepository == null) { + logger.error("Unable to set repository"); + return null; + } + + final Map allUploadedIndexMetadata = new HashMap<>(); + // todo parallel upload + // any validations before/after upload ? + for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + final String indexMetadataKey = writeIndexMetadata( + clusterState.getClusterName().value(), + clusterState.getMetadata().clusterUUID(), + indexMetadata, + indexMetadataFileName(indexMetadata) + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataKey + ); + allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + } + return uploadMarker(clusterState, allUploadedIndexMetadata); + } + + @Nullable + public ClusterMetadataMarker writeIncrementalMetadata( + ClusterState previousClusterState, + ClusterState clusterState, + ClusterMetadataMarker previousMarker + ) throws IOException { + if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { + logger.error("Local node is not elected cluster manager. Exiting"); + return null; + } + assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); + final Map indexMetadataVersionByName = new HashMap<>(); + for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { + indexMetadataVersionByName.putIfAbsent(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); + } + + int numIndicesUpdated = 0; + int numIndicesUnchanged = 0; + final Map allUploadedIndexMetadata = new HashMap<>( + previousMarker.getIndices() + ); + for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { + final Long previousVersion = indexMetadataVersionByName.get(indexMetadata.getIndex().getName()); + if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { + logger.trace( + "updating metadata for [{}], changing version from [{}] to [{}]", + indexMetadata.getIndex(), + previousVersion, + indexMetadata.getVersion() + ); + numIndicesUpdated++; + final String indexMetadataKey = writeIndexMetadata( + clusterState.getClusterName().value(), + clusterState.getMetadata().clusterUUID(), + indexMetadata, + indexMetadataFileName(indexMetadata) + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataKey + ); + allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + } else { + numIndicesUnchanged++; + } + indexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); + } + + for (String removedIndexName : indexMetadataVersionByName.keySet()) { + allUploadedIndexMetadata.remove(removedIndexName); + } + return uploadMarker(clusterState, allUploadedIndexMetadata); + } + + public ClusterState getLatestClusterState(String clusterUUID) { + // todo + return null; + } + + // Visible for testing + void initializeRepository() { + if (blobStoreRepository != null) { + return; + } + if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(settings)) { + final String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings); + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + } + + private ClusterMetadataMarker uploadMarker( + ClusterState clusterState, + Map uploadedIndexMetadata + ) throws IOException { + synchronized (this) { + final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); + final ClusterMetadataMarker marker = new ClusterMetadataMarker( + uploadedIndexMetadata, + clusterState.term(), + clusterState.getVersion(), + clusterState.metadata().clusterUUID(), + clusterState.stateUUID() + ); + writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName); + return marker; + } + } + + private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName) + throws IOException { + final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID()); + INDEX_METADATA_FORMAT.write(indexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor()); + // returning full path + return indexMetadataContainer.path().buildAsString() + fileName; + } + + private void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName) + throws IOException { + final BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID); + CLUSTER_METADATA_MARKER_FORMAT.write(marker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor()); + } + + private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID) + .add("index") + .add(indexUUID) + ); + } + + private BlobContainer markerContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add("cluster-state") + .add(clusterUUID) + .add("marker") + ); + } + + private static String getMarkerFileName(long term, long version) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker + return String.join( + DELIMITER, + "marker", + String.valueOf(Long.MAX_VALUE - term), + String.valueOf(Long.MAX_VALUE - version), + String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()) + ); + } + + private static String indexMetadataFileName(IndexMetadata indexMetadata) { + return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/store/package-info.java b/server/src/main/java/org/opensearch/cluster/store/package-info.java new file mode 100644 index 0000000000000..a36412c512a00 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/store/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing class to perform operations on remote cluster state + */ +package org.opensearch.cluster.store; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index e30876d5985f2..a01b8fe8ee05b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -675,7 +675,8 @@ public void apply(Settings value, Settings current, Settings previous) { List.of( IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING, IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING, - IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING + IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING ), List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH), List.of( diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index ad9faef067c89..346ef581c5eb9 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -52,6 +52,8 @@ import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.cluster.store.ClusterMetadataMarker; +import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.SetOnce; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; @@ -599,4 +601,79 @@ public void close() throws IOException { IOUtils.close(persistenceWriter.getAndSet(null)); } } + + /** + * Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}. + */ + public static class RemotePersistedState implements PersistedState { + + private ClusterState lastAcceptedState; + private ClusterMetadataMarker lastAcceptedMarker; + private final RemoteClusterStateService remoteClusterStateService; + + public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) { + this.remoteClusterStateService = remoteClusterStateService; + } + + @Override + public long getCurrentTerm() { + return lastAcceptedState != null ? lastAcceptedState.term() : 0L; + } + + @Override + public ClusterState getLastAcceptedState() { + return lastAcceptedState; + } + + @Override + public void setCurrentTerm(long currentTerm) { + // no-op + // For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes. + // But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required. + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + try { + if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out. + lastAcceptedState = clusterState; + return; + } else { + + } + final ClusterMetadataMarker marker; + if (shouldWriteFullClusterState(clusterState)) { + marker = remoteClusterStateService.writeFullMetadata(clusterState); + } else { + marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker); + } + lastAcceptedState = clusterState; + lastAcceptedMarker = marker; + } catch (Exception e) { + handleExceptionOnWrite(e); + } + } + + private boolean shouldWriteFullClusterState(ClusterState clusterState) { + if (lastAcceptedState == null || lastAcceptedMarker == null || lastAcceptedState.term() != clusterState.term()) { + return true; + } + return false; + } + + @Override + public void markLastAcceptedStateAsCommitted() { + // TODO + } + + @Override + public void close() throws IOException { + PersistedState.super.close(); + } + + private void handleExceptionOnWrite(Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index e7c1502191aee..e860195a691f5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -272,6 +272,16 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); + /** + * Used to specify default repo to use for translog upload for remote store backed indices + */ + public static final Setting CLUSTER_REMOTE_STATE_REPOSITORY_SETTING = Setting.simpleString( + "cluster.remote_store.state.repository", + "", + Property.NodeScope, + Property.Final + ); + /** * The node's settings. */ diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index fcb281d708b9c..2c6076d2e3d59 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -789,6 +789,10 @@ public RepositoryMetadata getMetadata() { return metadata; } + public Compressor getCompressor() { + return compressor; + } + @Override public RepositoryStats stats() { final BlobStore store = blobStore.get(); diff --git a/server/src/test/java/org/opensearch/cluster/store/ClusterMetadataMarkerTests.java b/server/src/test/java/org/opensearch/cluster/store/ClusterMetadataMarkerTests.java new file mode 100644 index 0000000000000..a3ce3a5cbf59f --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/store/ClusterMetadataMarkerTests.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.store; + +import java.io.IOException; +import java.util.Collections; +import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.test.OpenSearchTestCase; + +public class ClusterMetadataMarkerTests extends OpenSearchTestCase { + + public void testXContent() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataMarker originalMarker = new ClusterMetadataMarker( + Collections.singletonMap(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata), + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid" + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalMarker.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataMarker fromXContentMarker = ClusterMetadataMarker.fromXContent(parser); + assertEquals(originalMarker, fromXContentMarker); + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/store/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/cluster/store/RemoteClusterStateServiceTests.java new file mode 100644 index 0000000000000..0c0622fcff22b --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/store/RemoteClusterStateServiceTests.java @@ -0,0 +1,244 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.store; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.function.Supplier; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.ArgumentMatchers; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.indices.IndicesService; +import org.opensearch.repositories.FilterRepository; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; + +public class RemoteClusterStateServiceTests extends OpenSearchTestCase { + + private RemoteClusterStateService remoteClusterStateService; + private Supplier repositoriesServiceSupplier; + private RepositoriesService repositoriesService; + private BlobStoreRepository blobStoreRepository; + + @Before + public void setup() { + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + final Settings settings = Settings.builder() + .put(IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.getKey(), true) + .put(IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository") + .build(); + blobStoreRepository = mock(BlobStoreRepository.class); + when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); + remoteClusterStateService = new RemoteClusterStateService(repositoriesServiceSupplier, settings); + } + + public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { + final ClusterState clusterState = generateClusterState(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); + Assert.assertThat(marker, nullValue()); + } + + public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOException { + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .clusterManagerNodeId("cluster-manager-id") + .localNodeId("cluster-manager-id") + .build(); + final Settings settings = Settings.builder().build(); + remoteClusterStateService = spy(new RemoteClusterStateService(repositoriesServiceSupplier, settings)); + final ClusterState clusterState = ClusterState.builder(generateClusterState()).nodes(nodes).build(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); + verify(remoteClusterStateService, times(1)).initializeRepository(); + Assert.assertThat(marker, nullValue()); + } + + public void testFailWriteFullMetadataWhenRepositoryNotSet() { + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .clusterManagerNodeId("cluster-manager-id") + .localNodeId("cluster-manager-id") + .build(); + final ClusterState clusterState = ClusterState.builder(generateClusterState()).nodes(nodes).build(); + doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("remote_store_repository"); + assertThrows(RepositoryMissingException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testFailWriteFullMetadataWhenNotBlobRepository() { + final FilterRepository filterRepository = mock(FilterRepository.class); + when(repositoriesService.repository("remote_store_repository")).thenReturn(filterRepository); + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .clusterManagerNodeId("cluster-manager-id") + .localNodeId("cluster-manager-id") + .build(); + final ClusterState clusterState = ClusterState.builder(generateClusterState()).nodes(nodes).build(); + assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState)); + } + + public void testWriteFullMetadataSuccess() throws IOException { + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .clusterManagerNodeId("cluster-manager-id") + .localNodeId("cluster-manager-id") + .build(); + final ClusterState clusterState = ClusterState.builder(generateClusterState()).nodes(nodes).build(); + final BlobStore blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); + final BlobPath blobPath = mock(BlobPath.class); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobPath.add(anyString())).thenReturn(blobPath); + when(blobPath.buildAsString()).thenReturn("/blob/path/"); + final BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.path()).thenReturn(blobPath); + when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + + final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + Map indices = Map.of("test-index", uploadedIndexMetadata); + + final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + .indices(indices) + .term(1L) + .version(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(marker.getIndices().size(), is(1)); + assertThat(marker.getIndices().get("test-index").getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(marker.getIndices().get("test-index").getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(marker.getIndices().get("test-index").getUploadedFilename(), notNullValue()); + assertThat(marker.getTerm(), is(expectedMarker.getTerm())); + assertThat(marker.getVersion(), is(expectedMarker.getVersion())); + assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); + assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + } + + public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { + final ClusterState clusterState = generateClusterState(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); + Assert.assertThat(marker, nullValue()); + } + + public void testFailWriteIncrementalMetadataWhenTermChanged() { + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .clusterManagerNodeId("cluster-manager-id") + .localNodeId("cluster-manager-id") + .build(); + final ClusterState clusterState = ClusterState.builder(generateClusterState()).nodes(nodes).build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + assertThrows( + AssertionError.class, + () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null) + ); + } + + public void testWriteIncrementalMetadataSuccess() throws IOException { + final DiscoveryNodes nodes = DiscoveryNodes.builder() + .clusterManagerNodeId("cluster-manager-id") + .localNodeId("cluster-manager-id") + .build(); + final ClusterState clusterState = ClusterState.builder(generateClusterState()).nodes(nodes).build(); + final BlobStore blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); + final BlobPath blobPath = mock(BlobPath.class); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobPath.add(anyString())).thenReturn(blobPath); + when(blobPath.buildAsString()).thenReturn("/blob/path/"); + final BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.path()).thenReturn(blobPath); + when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(Collections.emptyMap()).build(); + + remoteClusterStateService.initializeRepository(); + final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousMarker + ); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + Map indices = Map.of("test-index", uploadedIndexMetadata); + + final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder() + .indices(indices) + .term(1L) + .version(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(marker.getIndices().size(), is(1)); + assertThat(marker.getIndices().get("test-index").getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(marker.getIndices().get("test-index").getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(marker.getIndices().get("test-index").getUploadedFilename(), notNullValue()); + assertThat(marker.getTerm(), is(expectedMarker.getTerm())); + assertThat(marker.getVersion(), is(expectedMarker.getVersion())); + assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID())); + assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID())); + } + + private static ClusterState generateClusterState() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder().put(indexMetadata, true).clusterUUID("cluster-uuid").coordinationMetadata(coordinationMetadata).build() + ) + .build(); + } + +}