From 3c6a3b0f7a0e085aa88fe3600f308fb67464b2e9 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Thu, 5 Oct 2023 19:17:23 +0530 Subject: [PATCH 01/16] Upload global cluster state to remote store Signed-off-by: Dhwanil Patel --- .../remote/ClusterMetadataManifest.java | 35 ++++++- .../remote/RemoteClusterStateService.java | 91 ++++++++++++++++++- 2 files changed, 123 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 40b16f3d6323b..2974fe437e16a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -40,6 +40,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version"); private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); private static final ParseField COMMITTED_FIELD = new ParseField("committed"); + private static final ParseField GLOBAL_METADATA_FIELD = new ParseField("global_metadata"); private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); private static final ParseField CLUSTER_UUID_COMMITTED = new ParseField("cluster_uuid_committed"); @@ -84,6 +85,10 @@ private static boolean clusterUUIDCommitted(Object[] fields) { return (boolean) fields[9]; } + private static String globalMetadataFileName(Object[] fields) { + return (String) fields[10]; + } + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> new ClusterMetadataManifest( @@ -94,6 +99,7 @@ private static boolean clusterUUIDCommitted(Object[] fields) { opensearchVersion(fields), nodeId(fields), committed(fields), + globalMetadataFileName(fields), indices(fields), previousClusterUUID(fields), clusterUUIDCommitted(fields) @@ -115,8 +121,10 @@ private static boolean clusterUUIDCommitted(Object[] fields) { ); PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); + PARSER.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD); } + private final String globalMetadataFileName; private final List indices; private final long clusterTerm; private final long stateVersion; @@ -168,6 +176,10 @@ public boolean isClusterUUIDCommitted() { return clusterUUIDCommitted; } + public String getGlobalMetadataFileName() { + return globalMetadataFileName; + } + public ClusterMetadataManifest( long clusterTerm, long version, @@ -176,6 +188,7 @@ public ClusterMetadataManifest( Version opensearchVersion, String nodeId, boolean committed, + String globalMetadataFileName, List indices, String previousClusterUUID, boolean clusterUUIDCommitted @@ -187,6 +200,7 @@ public ClusterMetadataManifest( this.opensearchVersion = opensearchVersion; this.nodeId = nodeId; this.committed = committed; + this.globalMetadataFileName = globalMetadataFileName; this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; @@ -203,6 +217,11 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); + if(in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.globalMetadataFileName = in.readString(); + } else { + this.globalMetadataFileName = null; + } } public static Builder builder() { @@ -231,6 +250,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endArray(); builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID()); builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); + builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); return builder; } @@ -246,6 +266,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(indices); out.writeString(previousClusterUUID); out.writeBoolean(clusterUUIDCommitted); + if(out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeString(globalMetadataFileName); + } } @Override @@ -266,12 +289,14 @@ public boolean equals(Object o) { && Objects.equals(nodeId, that.nodeId) && Objects.equals(committed, that.committed) && Objects.equals(previousClusterUUID, that.previousClusterUUID) - && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted); + && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted) + && Objects.equals(globalMetadataFileName, that.globalMetadataFileName); } @Override public int hashCode() { return Objects.hash( + globalMetadataFileName, indices, clusterTerm, stateVersion, @@ -301,6 +326,7 @@ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws */ public static class Builder { + private String globalMetadataFileName; private List indices; private long clusterTerm; private long stateVersion; @@ -317,6 +343,11 @@ public Builder indices(List indices) { return this; } + public Builder globalMetadataFileName(String globalMetadataFileName) { + this.globalMetadataFileName = globalMetadataFileName; + return this; + } + public Builder clusterTerm(long clusterTerm) { this.clusterTerm = clusterTerm; return this; @@ -378,6 +409,7 @@ public Builder(ClusterMetadataManifest manifest) { this.opensearchVersion = manifest.opensearchVersion; this.nodeId = manifest.nodeId; this.committed = manifest.committed; + this.globalMetadataFileName = manifest.globalMetadataFileName; this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; @@ -392,6 +424,7 @@ public ClusterMetadataManifest build() { opensearchVersion, nodeId, committed, + globalMetadataFileName, indices, previousClusterUUID, clusterUUIDCommitted diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index dddc5376803a5..50dc2083d3394 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -14,11 +14,15 @@ import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.ComposableIndexTemplate; +import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -27,6 +31,8 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -55,6 +61,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -105,9 +112,11 @@ public class RemoteClusterStateService implements Closeable { private static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; private static final String INDEX_PATH_TOKEN = "index"; + private static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; private static final String MANIFEST_PATH_TOKEN = "manifest"; private static final String MANIFEST_FILE_PREFIX = "manifest"; private static final String INDEX_METADATA_FILE_PREFIX = "metadata"; + private static final String GLOBAL_METADATA_FILE_PREFIX = "global-metadata"; private final String nodeId; private final Supplier repositoriesService; @@ -159,12 +168,15 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri return null; } + // Write globalMetadata + String globalMetadataFile = writeGlobalMetadata(clusterState); + // any validations before/after upload ? final List allUploadedIndexMetadata = writeIndexMetadataParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()) ); - final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false); + final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, globalMetadataFile, false); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -203,6 +215,17 @@ public ClusterMetadataManifest writeIncrementalMetadata( return null; } assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); + + // Write Global Metadata + final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previousClusterState.metadata(), clusterState.metadata()) == false; + String globalMetadataFile; + if(updateGlobalMeta) { + globalMetadataFile = writeGlobalMetadata(clusterState); + } else { + globalMetadataFile = previousManifest.getGlobalMetadataFileName(); + } + + // Write Index Metadata final Map previousStateIndexMetadataVersionByName = new HashMap<>(); for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); @@ -245,6 +268,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), previousManifest.getPreviousClusterUUID(), + globalMetadataFile, false ); deleteStaleClusterMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), RETAINED_MANIFESTS); @@ -270,6 +294,52 @@ public ClusterMetadataManifest writeIncrementalMetadata( return manifest; } + /** + * Uploads provided ClusterState's global Metadata to remote store in parallel. + * The call is blocking so the method waits for upload to finish and then return. + * + * @param clusterState current ClusterState + * @return String file name where globalMetadata file is stored. + */ + private String writeGlobalMetadata(ClusterState clusterState) + throws IOException { + + AtomicReference result = new AtomicReference(); + final BlobContainer globalMetadataContainer = globalMetadataContainer( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ); + final String globalMetadataFilename = globalMetadataFileName(clusterState.metadata()); + + // latch to wait until upload is not finished + CountDownLatch latch = new CountDownLatch(1); + ActionListener completionListener = ActionListener.wrap( + resp -> { + result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); + latch.countDown(); + }, ex -> { + // TODO change the exception for Global Metadata + throw new IndexMetadataTransferException(ex.getMessage(), ex); + } + ); + + BlobStoreRepository.GLOBAL_METADATA_FORMAT.writeAsync( + clusterState.metadata(), + globalMetadataContainer, + globalMetadataFilename, + blobStoreRepository.getCompressor(), + completionListener + ); + + // TODO Add proper exception handling. + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return result.get(); + } + /** * Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return. * @@ -391,7 +461,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat } assert clusterState != null : "Last accepted cluster state is not set"; assert previousManifest != null : "Last cluster metadata manifest is not set"; - return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true); + return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), previousManifest.getGlobalMetadataFileName(), true); } @Override @@ -415,6 +485,7 @@ public void start() { private ClusterMetadataManifest uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, + String globalClusterMetadataFileName, String previousClusterUUID, boolean committed ) throws IOException { @@ -428,6 +499,7 @@ private ClusterMetadataManifest uploadManifest( Version.CURRENT, nodeId, committed, + globalClusterMetadataFileName, uploadedIndexMetadata, previousClusterUUID, clusterState.metadata().clusterUUIDCommitted() @@ -459,6 +531,12 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(INDEX_PATH_TOKEN).add(indexUUID)); } + private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/ + return blobStoreRepository.blobStore() + .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(GLOBAL_METADATA_PATH_TOKEN)); + } + private BlobContainer manifestContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); @@ -500,6 +578,15 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { ); } + private static String globalMetadataFileName(Metadata metadata) { + return String.join( + DELIMITER, + GLOBAL_METADATA_FILE_PREFIX, + String.valueOf(metadata.version()), + String.valueOf(System.currentTimeMillis()) + ); + } + private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); } From 3f92a8875e46959a44134582d2468ce29ad125e3 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Fri, 6 Oct 2023 15:34:22 +0530 Subject: [PATCH 02/16] Incorporated comments and completed TODO Signed-off-by: Dhwanil Patel --- .../remote/RemoteClusterStateService.java | 89 ++++++++++++++----- .../blobstore/BlobStoreRepository.java | 3 +- .../blobstore/ChecksumBlobStoreFormat.java | 14 +-- 3 files changed, 79 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 50dc2083d3394..19daa03bc0653 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -14,15 +14,12 @@ import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.ComposableIndexTemplate; -import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -32,7 +29,6 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -88,6 +84,7 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; + public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000; public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", @@ -129,6 +126,18 @@ public class RemoteClusterStateService implements Closeable { private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); + private static final int CURRENT_GLOBAL_METADATA_VERSION = 1; + + // ToXContent Params with gateway mode. + // We are using gateway context mode to persist all custom metadata. + private static final ToXContent.Params FORMAT_PARAMS; + static { + Map params = new HashMap<>(1); + params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY); + FORMAT_PARAMS = new ToXContent.MapParams(params); + } + + public RemoteClusterStateService( String nodeId, Supplier repositoriesService, @@ -313,14 +322,22 @@ private String writeGlobalMetadata(ClusterState clusterState) // latch to wait until upload is not finished CountDownLatch latch = new CountDownLatch(1); - ActionListener completionListener = ActionListener.wrap( - resp -> { - result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); - latch.countDown(); - }, ex -> { - // TODO change the exception for Global Metadata - throw new IndexMetadataTransferException(ex.getMessage(), ex); - } + + LatchedActionListener completionListener = new LatchedActionListener<>( + ActionListener.wrap( + resp -> { + logger.trace( + String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.") + ); + result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); + }, ex -> { + logger.error( + () -> new ParameterizedMessage("Exception during transfer of GlobalMetadata to Remote {}", ex.getMessage()), + ex + ); + throw new GlobalMetadataTransferException(ex.getMessage(), ex); + }), + latch ); BlobStoreRepository.GLOBAL_METADATA_FORMAT.writeAsync( @@ -328,15 +345,32 @@ private String writeGlobalMetadata(ClusterState clusterState) globalMetadataContainer, globalMetadataFilename, blobStoreRepository.getCompressor(), - completionListener + completionListener, + FORMAT_PARAMS ); - // TODO Add proper exception handling. try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + GlobalMetadataTransferException ex = new GlobalMetadataTransferException( + String.format( + Locale.ROOT, + "Timed out waiting for transfer of global metadata to complete" + ) + ); + throw ex; + } + } catch (InterruptedException ex) { + GlobalMetadataTransferException exception = new GlobalMetadataTransferException( + String.format( + Locale.ROOT, + "Timed out waiting for transfer of index metadata to complete - %s" + ), + ex + ); + Thread.currentThread().interrupt(); + throw exception; } + return result.get(); } @@ -448,7 +482,8 @@ private void writeIndexMetadataAsync( indexMetadataContainer, indexMetadataFilename, blobStoreRepository.getCompressor(), - completionListener + completionListener, + FORMAT_PARAMS ); } @@ -485,8 +520,8 @@ public void start() { private ClusterMetadataManifest uploadManifest( ClusterState clusterState, List uploadedIndexMetadata, - String globalClusterMetadataFileName, String previousClusterUUID, + String globalClusterMetadataFileName, boolean committed ) throws IOException { synchronized (this) { @@ -582,7 +617,7 @@ private static String globalMetadataFileName(Metadata metadata) { return String.join( DELIMITER, GLOBAL_METADATA_FILE_PREFIX, - String.valueOf(metadata.version()), + String.valueOf(CURRENT_GLOBAL_METADATA_VERSION), String.valueOf(System.currentTimeMillis()) ); } @@ -872,6 +907,20 @@ public IndexMetadataTransferException(String errorDesc, Throwable cause) { } } + /** + * Exception for GlobalMetadata transfer failures to remote + */ + static class GlobalMetadataTransferException extends RuntimeException { + + public GlobalMetadataTransferException(String errorDesc) { + super(errorDesc); + } + + public GlobalMetadataTransferException(String errorDesc, Throwable cause) { + super(errorDesc, cause); + } + } + /** * Purges all remote cluster state against provided cluster UUIDs * diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 490ebda24bf60..439996fe9f797 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -172,6 +172,7 @@ import java.util.stream.Stream; import static org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; +import static org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS; /** * BlobStore - based implementation of Snapshot Repository @@ -3309,7 +3310,7 @@ private void writeShardIndexBlobAtomic( () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path()) ); final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration)); - writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor), true); + writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS), true); } // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 7e1960171043a..a020acb5e11b2 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -83,7 +83,7 @@ public final class ChecksumBlobStoreFormat { // Serialization parameters to specify correct context for metadata serialization - private static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS; + public static final ToXContent.Params SNAPSHOT_ONLY_FORMAT_PARAMS; static { Map snapshotOnlyParams = new HashMap<>(); @@ -171,7 +171,7 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr */ public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException { final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor); + final BytesReference bytes = serialize(obj, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS); blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } @@ -184,13 +184,15 @@ public void write(final T obj, final BlobContainer blobContainer, final String n * @param name blob name * @param compressor whether to use compression * @param listener listener to listen to write result + * @param params ToXContent params */ public void writeAsync( final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor, - ActionListener listener + ActionListener listener, + final ToXContent.Params params ) throws IOException { if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { write(obj, blobContainer, name, compressor); @@ -198,7 +200,7 @@ public void writeAsync( return; } final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor); + final BytesReference bytes = serialize(obj, blobName, compressor, params); final String resourceDescription = "ChecksumBlobStoreFormat.writeAsync(blob=\"" + blobName + "\")"; try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { long expectedChecksum; @@ -230,7 +232,7 @@ public void writeAsync( } } - public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -254,7 +256,7 @@ public void close() throws IOException { ) ) { builder.startObject(); - obj.toXContent(builder, SNAPSHOT_ONLY_FORMAT_PARAMS); + obj.toXContent(builder, params); builder.endObject(); } CodecUtil.writeFooter(indexOutput); From c2cf6566e4c375d2199325c1a34dbeb2c71762bc Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Mon, 9 Oct 2023 18:34:59 +0530 Subject: [PATCH 03/16] Added UTs Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 1 + .../remote/ClusterMetadataManifest.java | 4 +- .../remote/RemoteClusterStateService.java | 54 ++--- .../blobstore/BlobStoreRepository.java | 7 +- .../blobstore/ChecksumBlobStoreFormat.java | 3 +- .../coordination/CoordinationStateTests.java | 1 + .../remote/ClusterMetadataManifestTests.java | 2 + .../RemoteClusterStateServiceTests.java | 195 +++++++++++++++++- .../snapshots/BlobStoreFormatTests.java | 6 +- 9 files changed, 230 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e408df2307587..7a57b4b068637 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110)) - Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618)) - Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992)) +- Upload global cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 2974fe437e16a..cdcb91539644b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -217,7 +217,7 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.indices = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new)); this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); - if(in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.globalMetadataFileName = in.readString(); } else { this.globalMetadataFileName = null; @@ -266,7 +266,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeCollection(indices); out.writeString(previousClusterUUID); out.writeBoolean(clusterUUIDCommitted); - if(out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeString(globalMetadataFileName); } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index be31ddc20fd6d..55bd2f2e974bd 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -128,16 +128,16 @@ public class RemoteClusterStateService implements Closeable { private static final int CURRENT_GLOBAL_METADATA_VERSION = 1; - // ToXContent Params with gateway mode. - // We are using gateway context mode to persist all custom metadata. - private static final ToXContent.Params FORMAT_PARAMS; + // TODO: For now using Gateway context, check if how we can cover all type of custom by default. + // ToXContent Params with gateway mode. + // We are using gateway context mode to persist all custom metadata. + public static final ToXContent.Params FORMAT_PARAMS; static { Map params = new HashMap<>(1); params.put(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY); FORMAT_PARAMS = new ToXContent.MapParams(params); } - public RemoteClusterStateService( String nodeId, Supplier repositoriesService, @@ -185,7 +185,13 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri clusterState, new ArrayList<>(clusterState.metadata().indices().values()) ); - final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, globalMetadataFile, false); + final ClusterMetadataManifest manifest = uploadManifest( + clusterState, + allUploadedIndexMetadata, + previousClusterUUID, + globalMetadataFile, + false + ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -228,7 +234,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( // Write Global Metadata final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previousClusterState.metadata(), clusterState.metadata()) == false; String globalMetadataFile; - if(updateGlobalMeta) { + if (updateGlobalMeta) { globalMetadataFile = writeGlobalMetadata(clusterState); } else { globalMetadataFile = previousManifest.getGlobalMetadataFileName(); @@ -310,8 +316,7 @@ public ClusterMetadataManifest writeIncrementalMetadata( * @param clusterState current ClusterState * @return String file name where globalMetadata file is stored. */ - private String writeGlobalMetadata(ClusterState clusterState) - throws IOException { + private String writeGlobalMetadata(ClusterState clusterState) throws IOException { AtomicReference result = new AtomicReference(); final BlobContainer globalMetadataContainer = globalMetadataContainer( @@ -323,22 +328,13 @@ private String writeGlobalMetadata(ClusterState clusterState) // latch to wait until upload is not finished CountDownLatch latch = new CountDownLatch(1); - LatchedActionListener completionListener = new LatchedActionListener<>( - ActionListener.wrap( - resp -> { - logger.trace( - String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.") - ); - result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); - }, ex -> { - logger.error( - () -> new ParameterizedMessage("Exception during transfer of GlobalMetadata to Remote {}", ex.getMessage()), - ex - ); - throw new GlobalMetadataTransferException(ex.getMessage(), ex); - }), - latch - ); + LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { + logger.trace(String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.")); + result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); + }, ex -> { + logger.error(() -> new ParameterizedMessage("Exception during transfer of GlobalMetadata to Remote {}", ex.getMessage()), ex); + throw new GlobalMetadataTransferException(ex.getMessage(), ex); + }), latch); BlobStoreRepository.GLOBAL_METADATA_FORMAT.writeAsync( clusterState.metadata(), @@ -352,19 +348,13 @@ private String writeGlobalMetadata(ClusterState clusterState) try { if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { GlobalMetadataTransferException ex = new GlobalMetadataTransferException( - String.format( - Locale.ROOT, - "Timed out waiting for transfer of global metadata to complete" - ) + String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") ); throw ex; } } catch (InterruptedException ex) { GlobalMetadataTransferException exception = new GlobalMetadataTransferException( - String.format( - Locale.ROOT, - "Timed out waiting for transfer of index metadata to complete - %s" - ), + String.format(Locale.ROOT, "Timed out waiting for transfer of index metadata to complete - %s"), ex ); Thread.currentThread().interrupt(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index faaf1f288c620..56dbc6003af82 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3336,7 +3336,12 @@ private void writeShardIndexBlobAtomic( () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path()) ); final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration)); - writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS), true); + writeAtomic( + shardContainer, + blobName, + INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor, SNAPSHOT_ONLY_FORMAT_PARAMS), + true + ); } // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index a020acb5e11b2..17cb68f798094 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -232,7 +232,8 @@ public void writeAsync( } } - public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) throws IOException { + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) + throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index f37823d2c0c7d..ce6f6119a85d9 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -938,6 +938,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep Version.CURRENT, randomAlphaOfLength(10), false, + randomAlphaOfLength(10), Collections.emptyList(), randomAlphaOfLength(10), true diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 66426c2a880a3..99e37afb4d989 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -36,6 +36,7 @@ public void testClusterMetadataManifestXContent() throws IOException { Version.CURRENT, "test-node-id", false, + "test-global-metadata-file", Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", true @@ -60,6 +61,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { Version.CURRENT, "B10RX1f5RJenMQvYccCgSQ", true, + "test-global-metadata-file", randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", true diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 6ecbc23f75bee..1e3b34c1768b8 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -65,6 +65,7 @@ import org.mockito.ArgumentMatchers; import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; @@ -228,14 +229,15 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); - assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 1); - assertEquals(writeContextArgumentCaptor.getAllValues().size(), 1); + assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 2); + assertEquals(writeContextArgumentCaptor.getAllValues().size(), 2); WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue(); byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes(); @@ -259,7 +261,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { } - public void testWriteFullMetadataInParallelFailure() throws IOException { + public void testWriteFullMetadataInParallelFailureForGlobalMetadata() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); @@ -270,6 +272,27 @@ public void testWriteFullMetadataInParallelFailure() throws IOException { return null; }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + remoteClusterStateService.start(); + assertThrows( + RemoteClusterStateService.GlobalMetadataTransferException.class, + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) + ); + } + + public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onFailure(new RuntimeException("Cannot upload to remote")); + return null; + }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + remoteClusterStateService.start(); assertThrows( RemoteClusterStateService.IndexMetadataTransferException.class, @@ -334,6 +357,145 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(Collections.emptyList()) + .globalMetadataFileName("mock-filename") + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .build(); + + assertThat(manifest.getGlobalMetadataFileName(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + } + + public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException { + // setup + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + remoteClusterStateService.start(); + + // Initial cluster state with index. + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + // Updating remote cluster state with changing index metadata + final ClusterMetadataManifest manifestAfterIndexMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + initialClusterState, + clusterState, + initialManifest + ); + + // new cluster state where only global metadata is different + Metadata newMetadata = Metadata.builder(clusterState.metadata()) + .persistentSettings(Settings.builder().put("cluster.blocks.read_only", true).build()) + .build(); + ClusterState newClusterState = ClusterState.builder(clusterState).metadata(newMetadata).build(); + + // updating remote cluster state with global metadata + final ClusterMetadataManifest manifestAfterGlobalMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + clusterState, + newClusterState, + manifestAfterIndexMetadataUpdate + ); + + // Verify that index metadata information is same in manifest files + assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(manifestAfterGlobalMetadataUpdate.getIndices().size())); + assertThat( + manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexName(), + is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexName()) + ); + assertThat( + manifestAfterIndexMetadataUpdate.getIndices().get(0).getIndexUUID(), + is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getIndexUUID()) + ); + + // since timestamp is part of file name, if file name is same we can confirm that file is not update in global metadata update + assertThat( + manifestAfterIndexMetadataUpdate.getIndices().get(0).getUploadedFilename(), + is(manifestAfterGlobalMetadataUpdate.getIndices().get(0).getUploadedFilename()) + ); + + // global metadata file would have changed + assertFalse( + manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName() + .equalsIgnoreCase(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) + ); + } + + public void testIndexMetadataNotUpdatingGlobalMetadata() throws IOException { + // setup + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + remoteClusterStateService.start(); + + // Initial cluster state with global metadata. + final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); + + // Updating remote cluster state with changing global metadata + final ClusterMetadataManifest manifestAfterGlobalMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + initialClusterState, + clusterState, + initialManifest + ); + + // new cluster state where only Index metadata is different + final IndexMetadata indexMetadata = new IndexMetadata.Builder("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(0).build(); + Metadata newMetadata = Metadata.builder(clusterState.metadata()).put(indexMetadata, true).build(); + ClusterState newClusterState = ClusterState.builder(clusterState).metadata(newMetadata).build(); + + // updating remote cluster state with index metadata + final ClusterMetadataManifest manifestAfterIndexMetadataUpdate = remoteClusterStateService.writeIncrementalMetadata( + clusterState, + newClusterState, + manifestAfterGlobalMetadataUpdate + ); + + // Verify that global metadata information is same in manifest files after updating index Metadata + // since timestamp is part of file name, if file name is same we can confirm that file is not update in index metadata update + assertThat( + manifestAfterIndexMetadataUpdate.getGlobalMetadataFileName(), + is(manifestAfterGlobalMetadataUpdate.getGlobalMetadataFileName()) + ); + + // Index metadata would have changed + assertThat(manifestAfterGlobalMetadataUpdate.getIndices().size(), is(0)); + assertThat(manifestAfterIndexMetadataUpdate.getIndices().size(), is(1)); + } + public void testReadLatestMetadataManifestFailedIOException() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); @@ -398,6 +560,7 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") + .globalMetadataFileName("global-metadata-file") .build(); BlobContainer blobContainer = mockBlobStoreObjects(); @@ -424,6 +587,7 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") + .globalMetadataFileName("global-metadata") .build(); BlobContainer blobContainer = mockBlobStoreObjects(); @@ -454,6 +618,7 @@ public void testReadLatestMetadataManifestSuccess() throws IOException { .clusterUUID("cluster-uuid") .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) + .globalMetadataFileName("global-metadata") .previousClusterUUID("prev-cluster-uuid") .build(); @@ -500,6 +665,7 @@ public void testReadLatestIndexMetadataSuccess() throws IOException { .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") + .globalMetadataFileName("global-metadata") .build(); mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata)); @@ -782,6 +948,7 @@ private ClusterMetadataManifest generateClusterMetadataManifest( .previousClusterUUID(previousClusterUUID) .committed(true) .clusterUUIDCommitted(true) + .globalMetadataFileName("test-global-metadata") .build(); } @@ -821,7 +988,8 @@ private void mockBlobContainer( BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( clusterMetadataManifest, "manifestFileName", - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + FORMAT_PARAMS ); when(blobContainer.readBlob("manifestFileName")).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes())); @@ -835,7 +1003,8 @@ private void mockBlobContainer( BytesReference bytesIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.serialize( indexMetadata, fileName, - blobStoreRepository.getCompressor() + blobStoreRepository.getCompressor(), + FORMAT_PARAMS ); when(blobContainer.readBlob(fileName + ".dat")).thenReturn( new ByteArrayInputStream(bytesIndexMetadata.streamInput().readAllBytes()) @@ -846,6 +1015,22 @@ private void mockBlobContainer( }); } + private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { + final Settings clusterSettings = Settings.builder().put("cluster.blocks.read_only", true).build(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + + return ClusterState.builder(ClusterName.DEFAULT) + .version(1L) + .stateUUID("state-uuid") + .metadata( + Metadata.builder() + .persistentSettings(clusterSettings) + .clusterUUID("cluster-uuid") + .coordinationMetadata(coordinationMetadata) + .build() + ); + } + private static ClusterState.Builder generateClusterStateWithOneIndex() { final Index index = new Index("test-index", "index-uuid"); final Settings idxSettings = Settings.builder() diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 03f0d27188027..c114b56bd0b39 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -152,14 +152,16 @@ public void onFailure(Exception e) { mockBlobContainer, "check-smile", CompressorRegistry.none(), - actionListener + actionListener, + ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS ); checksumSMILE.writeAsync( new BlobObj("checksum smile compressed"), mockBlobContainer, "check-smile-comp", CompressorRegistry.getCompressor(DeflateCompressor.NAME), - actionListener + actionListener, + ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS ); latch.await(); From c9a5632c01ca9ea8f6d764a11dfab30a6b2a77e5 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 10 Oct 2023 16:45:29 +0530 Subject: [PATCH 04/16] Add logic for clearing stale global metadata file Signed-off-by: Dhwanil Patel --- .../gateway/remote/RemoteClusterStateService.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 55bd2f2e974bd..939485b19f3f3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -128,7 +128,6 @@ public class RemoteClusterStateService implements Closeable { private static final int CURRENT_GLOBAL_METADATA_VERSION = 1; - // TODO: For now using Gateway context, check if how we can cover all type of custom by default. // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. public static final ToXContent.Params FORMAT_PARAMS; @@ -615,8 +614,9 @@ private static String globalMetadataFileName(Metadata metadata) { return String.join( DELIMITER, GLOBAL_METADATA_FILE_PREFIX, - String.valueOf(CURRENT_GLOBAL_METADATA_VERSION), - String.valueOf(System.currentTimeMillis()) + String.valueOf(metadata.version()), + String.valueOf(System.currentTimeMillis()), + String.valueOf(CURRENT_GLOBAL_METADATA_VERSION) ); } @@ -1022,6 +1022,7 @@ private void deleteClusterMetadata( Set filesToKeep = new HashSet<>(); Set staleManifestPaths = new HashSet<>(); Set staleIndexMetadataPaths = new HashSet<>(); + Set staleGlobalMetadataPaths = new HashSet<>(); activeManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( clusterName, @@ -1030,6 +1031,7 @@ private void deleteClusterMetadata( ); clusterMetadataManifest.getIndices() .forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename())); + filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName()); }); staleManifestBlobMetadata.forEach(blobMetadata -> { ClusterMetadataManifest clusterMetadataManifest = fetchRemoteClusterMetadataManifest( @@ -1038,6 +1040,10 @@ private void deleteClusterMetadata( blobMetadata.name() ); staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); + String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + + BlobStoreRepository.GLOBAL_METADATA_FORMAT.blobName(globalMetadataSplitPath[globalMetadataSplitPath.length -1])); clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { staleIndexMetadataPaths.add( @@ -1054,6 +1060,7 @@ private void deleteClusterMetadata( return; } + deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths)); deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths)); } catch (IllegalStateException e) { From 2f9fc36eda90acf8de33b0a31af18ce93893580d Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 11 Oct 2023 14:11:50 +0530 Subject: [PATCH 05/16] Fix spotless check Signed-off-by: Dhwanil Patel --- .../gateway/remote/RemoteClusterStateService.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 718e0c1c9de79..e300514371af5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -353,7 +353,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException } } catch (InterruptedException ex) { GlobalMetadataTransferException exception = new GlobalMetadataTransferException( - String.format(Locale.ROOT, "Timed out waiting for transfer of index metadata to complete - %s"), + String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete - %s"), ex ); Thread.currentThread().interrupt(); @@ -1043,8 +1043,10 @@ private void deleteClusterMetadata( staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + - BlobStoreRepository.GLOBAL_METADATA_FORMAT.blobName(globalMetadataSplitPath[globalMetadataSplitPath.length -1])); + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + BlobStoreRepository.GLOBAL_METADATA_FORMAT.blobName( + globalMetadataSplitPath[globalMetadataSplitPath.length - 1] + ) + ); clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { staleIndexMetadataPaths.add( From 0f3fea88142dd286adecd9ece41d07dfec9bf1c0 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 11 Oct 2023 16:26:01 +0530 Subject: [PATCH 06/16] Changed the file name format for global metadata Signed-off-by: Dhwanil Patel --- .../remote/RemoteClusterStateService.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index e300514371af5..dddbad69caadf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -92,6 +92,12 @@ public class RemoteClusterStateService implements Closeable { IndexMetadata::fromXContent ); + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "metadata", + METADATA_NAME_FORMAT, + Metadata::fromXContent + ); + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( "cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, @@ -112,8 +118,7 @@ public class RemoteClusterStateService implements Closeable { public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; public static final String MANIFEST_PATH_TOKEN = "manifest"; public static final String MANIFEST_FILE_PREFIX = "manifest"; - public static final String INDEX_METADATA_FILE_PREFIX = "metadata"; - public static final String GLOBAL_METADATA_FILE_PREFIX = "global-metadata"; + public static final String METADATA_FILE_PREFIX = "metadata"; private final String nodeId; private final Supplier repositoriesService; @@ -126,7 +131,7 @@ public class RemoteClusterStateService implements Closeable { private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); - private static final int CURRENT_GLOBAL_METADATA_VERSION = 1; + private static final int GLOBAL_METADATA_CODEC_VERSION = 1; // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. @@ -335,7 +340,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException throw new GlobalMetadataTransferException(ex.getMessage(), ex); }), latch); - BlobStoreRepository.GLOBAL_METADATA_FORMAT.writeAsync( + GLOBAL_METADATA_FORMAT.writeAsync( clusterState.metadata(), globalMetadataContainer, globalMetadataFilename, @@ -604,7 +609,7 @@ private static String getManifestFileNamePrefix(long term, long version) { private static String indexMetadataFileName(IndexMetadata indexMetadata) { return String.join( DELIMITER, - INDEX_METADATA_FILE_PREFIX, + METADATA_FILE_PREFIX, String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()) ); @@ -613,10 +618,10 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { private static String globalMetadataFileName(Metadata metadata) { return String.join( DELIMITER, - GLOBAL_METADATA_FILE_PREFIX, - String.valueOf(metadata.version()), - String.valueOf(System.currentTimeMillis()), - String.valueOf(CURRENT_GLOBAL_METADATA_VERSION) + METADATA_FILE_PREFIX, + RemoteStoreUtils.invertLong(metadata.version()), + RemoteStoreUtils.invertLong(GLOBAL_METADATA_CODEC_VERSION), + String.valueOf(System.currentTimeMillis()) ); } @@ -1043,7 +1048,7 @@ private void deleteClusterMetadata( staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + BlobStoreRepository.GLOBAL_METADATA_FORMAT.blobName( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( globalMetadataSplitPath[globalMetadataSplitPath.length - 1] ) ); From e146ec301c5b4f43dc75b3f030168ee563d072df Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Thu, 12 Oct 2023 17:49:06 +0530 Subject: [PATCH 07/16] Fix issue in deleting stale file Signed-off-by: Dhwanil Patel --- .../gateway/remote/RemoteClusterStateService.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index dddbad69caadf..d7f370a1d2a3a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1046,12 +1046,14 @@ private void deleteClusterMetadata( blobMetadata.name() ); staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name()); - String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); - staleGlobalMetadataPaths.add( - new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( - globalMetadataSplitPath[globalMetadataSplitPath.length - 1] - ) - ); + if (filesToKeep.contains(clusterMetadataManifest.getGlobalMetadataFileName()) == false) { + String[] globalMetadataSplitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); + staleGlobalMetadataPaths.add( + new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName( + globalMetadataSplitPath[globalMetadataSplitPath.length - 1] + ) + ); + } clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> { if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) { staleIndexMetadataPaths.add( From f52a8a4d3583ca3ba41a910bcc935cf33f0302fc Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Fri, 13 Oct 2023 11:38:53 +0530 Subject: [PATCH 08/16] Made global metadata as optional for parser Signed-off-by: Dhwanil Patel --- .../org/opensearch/gateway/remote/ClusterMetadataManifest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index cdcb91539644b..62fe862789e5e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -121,7 +121,7 @@ private static String globalMetadataFileName(Object[] fields) { ); PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); - PARSER.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), GLOBAL_METADATA_FIELD); } private final String globalMetadataFileName; From b53629c24dafe78cd44774f0c9c767ee90d9e676 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Mon, 16 Oct 2023 23:40:56 +0530 Subject: [PATCH 09/16] Use manifest codec version to handle backward compitiblity Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 4 +- .../remote/ClusterMetadataManifest.java | 92 ++++++++++++++++--- .../remote/RemoteClusterStateService.java | 50 +++++++--- .../coordination/CoordinationStateTests.java | 1 + .../remote/ClusterMetadataManifestTests.java | 31 ++++++- .../RemoteClusterStateServiceTests.java | 72 ++++++++++++++- 6 files changed, 218 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ecd394fde0fd..4ae8c33f02296 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110)) - Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618)) - Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992)) -- Upload global cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) +- Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -158,4 +158,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 62fe862789e5e..73ddae6e67ec6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -40,6 +40,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version"); private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); private static final ParseField COMMITTED_FIELD = new ParseField("committed"); + private static final ParseField CODEC_VERSION_FIELD = new ParseField("codec_version"); private static final ParseField GLOBAL_METADATA_FIELD = new ParseField("global_metadata"); private static final ParseField INDICES_FIELD = new ParseField("indices"); private static final ParseField PREVIOUS_CLUSTER_UUID = new ParseField("previous_cluster_uuid"); @@ -85,8 +86,12 @@ private static boolean clusterUUIDCommitted(Object[] fields) { return (boolean) fields[9]; } + private static int codecVersion(Object[] fields) { + return (int) fields[10]; + } + private static String globalMetadataFileName(Object[] fields) { - return (String) fields[10]; + return (String) fields[11]; } private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -99,6 +104,25 @@ private static String globalMetadataFileName(Object[] fields) { opensearchVersion(fields), nodeId(fields), committed(fields), + 1, // Default codec version + null, // null global metadata for v1 manifest files + indices(fields), + previousClusterUUID(fields), + clusterUUIDCommitted(fields) + ) + ); + + private static final ConstructingObjectParser PARSER_V2 = new ConstructingObjectParser<>( + "cluster_metadata_manifest", + fields -> new ClusterMetadataManifest( + term(fields), + version(fields), + clusterUUID(fields), + stateUUID(fields), + opensearchVersion(fields), + nodeId(fields), + committed(fields), + codecVersion(fields), globalMetadataFileName(fields), indices(fields), previousClusterUUID(fields), @@ -107,23 +131,33 @@ private static String globalMetadataFileName(Object[] fields) { ); static { - PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); - PARSER.declareObjectArray( + declareParser(PARSER, 1); + declareParser(PARSER_V2, 2); + } + + private static void declareParser(ConstructingObjectParser parser, long codec_version) { + parser.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD); + parser.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); + parser.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); + parser.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD); + parser.declareObjectArray( ConstructingObjectParser.constructorArg(), (p, c) -> UploadedIndexMetadata.fromXContent(p), INDICES_FIELD ); - PARSER.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), GLOBAL_METADATA_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), PREVIOUS_CLUSTER_UUID); + parser.declareBoolean(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_COMMITTED); + + if (codec_version == 2) { + parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD); + parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD); + } } + private final int codecVersion; private final String globalMetadataFileName; private final List indices; private final long clusterTerm; @@ -176,6 +210,10 @@ public boolean isClusterUUIDCommitted() { return clusterUUIDCommitted; } + public int getCodecVersion() { + return codecVersion; + } + public String getGlobalMetadataFileName() { return globalMetadataFileName; } @@ -188,6 +226,7 @@ public ClusterMetadataManifest( Version opensearchVersion, String nodeId, boolean committed, + int codecVersion, String globalMetadataFileName, List indices, String previousClusterUUID, @@ -200,10 +239,13 @@ public ClusterMetadataManifest( this.opensearchVersion = opensearchVersion; this.nodeId = nodeId; this.committed = committed; + this.codecVersion = codecVersion; this.globalMetadataFileName = globalMetadataFileName; this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; + + System.out.println("Codec version ==== " + this.codecVersion + " global metadata === " + this.globalMetadataFileName); } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -218,8 +260,10 @@ public ClusterMetadataManifest(StreamInput in) throws IOException { this.previousClusterUUID = in.readString(); this.clusterUUIDCommitted = in.readBoolean(); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.codecVersion = in.readInt(); this.globalMetadataFileName = in.readString(); } else { + this.codecVersion = 1; // Default codec this.globalMetadataFileName = null; } } @@ -250,7 +294,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endArray(); builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID()); builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); - builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); + if (getCodecVersion() == 2) { + builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); + builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); + } return builder; } @@ -267,6 +314,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(previousClusterUUID); out.writeBoolean(clusterUUIDCommitted); if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeInt(codecVersion); out.writeString(globalMetadataFileName); } } @@ -290,12 +338,14 @@ public boolean equals(Object o) { && Objects.equals(committed, that.committed) && Objects.equals(previousClusterUUID, that.previousClusterUUID) && Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted) - && Objects.equals(globalMetadataFileName, that.globalMetadataFileName); + && Objects.equals(globalMetadataFileName, that.globalMetadataFileName) + && Objects.equals(codecVersion, that.codecVersion); } @Override public int hashCode() { return Objects.hash( + codecVersion, globalMetadataFileName, indices, clusterTerm, @@ -316,9 +366,15 @@ public String toString() { } public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { + System.out.println(" in from x content ==== "); return PARSER.parse(parser, null); } + public static ClusterMetadataManifest fromXContentV2(XContentParser parser) throws IOException { + System.out.println(" in from x content V2 ==== "); + return PARSER_V2.parse(parser, null); + } + /** * Builder for ClusterMetadataManifest * @@ -327,6 +383,7 @@ public static ClusterMetadataManifest fromXContent(XContentParser parser) throws public static class Builder { private String globalMetadataFileName; + private int codecVersion; private List indices; private long clusterTerm; private long stateVersion; @@ -343,6 +400,11 @@ public Builder indices(List indices) { return this; } + public Builder codecVersion(int codecVersion) { + this.codecVersion = codecVersion; + return this; + } + public Builder globalMetadataFileName(String globalMetadataFileName) { this.globalMetadataFileName = globalMetadataFileName; return this; @@ -410,6 +472,7 @@ public Builder(ClusterMetadataManifest manifest) { this.nodeId = manifest.nodeId; this.committed = manifest.committed; this.globalMetadataFileName = manifest.globalMetadataFileName; + this.codecVersion = manifest.codecVersion; this.indices = new ArrayList<>(manifest.indices); this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; @@ -424,6 +487,7 @@ public ClusterMetadataManifest build() { opensearchVersion, nodeId, committed, + codecVersion, globalMetadataFileName, indices, previousClusterUUID, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d7f370a1d2a3a..557a01eec1044 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -103,6 +103,10 @@ public class RemoteClusterStateService implements Closeable { METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContent ); + + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V2 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2); + /** * Used to specify if cluster state metadata should be published to remote store */ @@ -132,6 +136,7 @@ public class RemoteClusterStateService implements Closeable { private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); private static final int GLOBAL_METADATA_CODEC_VERSION = 1; + private static final int MANIFEST_CODEC_VERSION = 2; // TODO remove this once file name change PR is merged // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. @@ -181,6 +186,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri return null; } + // TODO: we can upload global metadata and index metadata in parallel. [issue: #10645] // Write globalMetadata String globalMetadataFile = writeGlobalMetadata(clusterState); @@ -238,7 +244,9 @@ public ClusterMetadataManifest writeIncrementalMetadata( // Write Global Metadata final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previousClusterState.metadata(), clusterState.metadata()) == false; String globalMetadataFile; - if (updateGlobalMeta) { + // For migration case from codec V1 to V2, we have added null check on global metadata file, + // If file is empty and codec is 1 then write global metadata. + if (updateGlobalMeta || previousManifest.getGlobalMetadataFileName() == null) { globalMetadataFile = writeGlobalMetadata(clusterState); } else { globalMetadataFile = previousManifest.getGlobalMetadataFileName(); @@ -335,10 +343,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { logger.trace(String.format(Locale.ROOT, "GlobalMetadata uploaded successfully.")); result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); - }, ex -> { - logger.error(() -> new ParameterizedMessage("Exception during transfer of GlobalMetadata to Remote {}", ex.getMessage()), ex); - throw new GlobalMetadataTransferException(ex.getMessage(), ex); - }), latch); + }, ex -> { throw new GlobalMetadataTransferException(ex.getMessage(), ex); }), latch); GLOBAL_METADATA_FORMAT.writeAsync( clusterState.metadata(), @@ -351,6 +356,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException try { if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + // TODO: We should add metrics where transfer is timing out. GlobalMetadataTransferException ex = new GlobalMetadataTransferException( String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") ); @@ -536,6 +542,7 @@ private ClusterMetadataManifest uploadManifest( Version.CURRENT, nodeId, committed, + MANIFEST_CODEC_VERSION, globalClusterMetadataFileName, uploadedIndexMetadata, previousClusterUUID, @@ -549,7 +556,7 @@ private ClusterMetadataManifest uploadManifest( private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); - CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); + CLUSTER_METADATA_MANIFEST_FORMAT_V2.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); } private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) { @@ -603,7 +610,14 @@ private static String getManifestFileName(long term, long version) { private static String getManifestFileNamePrefix(long term, long version) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest_2147483642_2147483637 - return String.join(DELIMITER, MANIFEST_PATH_TOKEN, RemoteStoreUtils.invertLong(term), RemoteStoreUtils.invertLong(version)); + return String.join( + DELIMITER, + MANIFEST_PATH_TOKEN, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + "1", + RemoteStoreUtils.invertLong(1) + ); } private static String indexMetadataFileName(IndexMetadata indexMetadata) { @@ -621,7 +635,7 @@ private static String globalMetadataFileName(Metadata metadata) { METADATA_FILE_PREFIX, RemoteStoreUtils.invertLong(metadata.version()), RemoteStoreUtils.invertLong(GLOBAL_METADATA_CODEC_VERSION), - String.valueOf(System.currentTimeMillis()) + RemoteStoreUtils.invertLong(System.currentTimeMillis()) ); } @@ -708,7 +722,8 @@ public String getLastKnownUUIDFromRemote(String clusterName) { return validChain.get(0); } catch (IOException e) { throw new IllegalStateException( - String.format(Locale.ROOT, "Error while fetching previous UUIDs from remote store for cluster name: %s", clusterName) + String.format(Locale.ROOT, "Error while fetching previous UUIDs from remote store for cluster name: %s", clusterName), + e ); } } @@ -729,7 +744,8 @@ private Map getLatestManifestForAllClusterUUIDs manifest.ifPresent(clusterMetadataManifest -> manifestsByClusterUUID.put(clusterUUID, clusterMetadataManifest)); } catch (Exception e) { throw new IllegalStateException( - String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID) + String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID), + e ); } } @@ -895,7 +911,7 @@ private Optional getLatestManifestFileName(String clusterName, String cl private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) throws IllegalStateException { try { - return RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.read( + return getClusterMetadataManifestBlobStoreFormat(filename).read( manifestContainer(clusterName, clusterUUID), filename, blobStoreRepository.getNamedXContentRegistry() @@ -905,6 +921,18 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste } } + private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat(String fileName) { + if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. codec version 1 is used. + return CLUSTER_METADATA_MANIFEST_FORMAT; + } else { + long codecVersion = RemoteStoreUtils.invertLong(fileName.split(DELIMITER)[4]); + if (codecVersion == MANIFEST_CODEC_VERSION) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V2; + } + } + throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); + } + public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index ce6f6119a85d9..1c0dc7fc1ca2d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -938,6 +938,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep Version.CURRENT, randomAlphaOfLength(10), false, + 1, randomAlphaOfLength(10), Collections.emptyList(), randomAlphaOfLength(10), diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 99e37afb4d989..d1d9137c36853 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -36,7 +36,8 @@ public void testClusterMetadataManifestXContent() throws IOException { Version.CURRENT, "test-node-id", false, - "test-global-metadata-file", + 1, + null, Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", true @@ -52,6 +53,33 @@ public void testClusterMetadataManifestXContent() throws IOException { } } + public void testClusterMetadataManifestXContentCodecV2() throws IOException { + UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( + 1L, + 1L, + "test-cluster-uuid", + "test-state-uuid", + Version.CURRENT, + "test-node-id", + false, + 2, + "test-global-metadata-file", + Collections.singletonList(uploadedIndexMetadata), + "prev-cluster-uuid", + true + ); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + originalManifest.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV2(parser); + assertEquals(originalManifest, fromXContentManifest); + } + } + public void testClusterMetadataManifestSerializationEqualsHashCode() { ClusterMetadataManifest initialManifest = new ClusterMetadataManifest( 1337L, @@ -61,6 +89,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { Version.CURRENT, "B10RX1f5RJenMQvYccCgSQ", true, + 1, "test-global-metadata-file", randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1e3b34c1768b8..b6d1ab874cbec 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -357,6 +357,51 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + /* + * Here we will verify the migration of manifest file from codec V1 and V2. + * + * Initially codec version is 1 and global metadata is also null, we will perform index metadata update. + * In final manifest codec version should be 2 and + * global metadata should be updated, even if it was not changed in this cluster state update + */ + public void testMigrationFromCodecV1ManifestToCodecV2Manifest() throws IOException { + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .nodes(nodesWithLocalNodeClusterManager()) + .build(); + + // Update only index metadata + final IndexMetadata indexMetadata = new IndexMetadata.Builder("test").settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(0).build(); + Metadata newMetadata = Metadata.builder(previousClusterState.metadata()).put(indexMetadata, true).build(); + ClusterState newClusterState = ClusterState.builder(previousClusterState).metadata(newMetadata).build(); + + // previous manifest with codec 1 and null global metadata + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .codecVersion(1) + .globalMetadataFileName(null) + .indices(Collections.emptyList()) + .build(); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifestAfterUpdate = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + newClusterState, + previousManifest + ); + + // global metadata is updated + assertThat(manifestAfterUpdate.getGlobalMetadataFileName(), notNullValue()); + // Manifest file with codec version with 2 is updated. + assertThat(manifestAfterUpdate.getCodecVersion(), is(2)); + } + public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); @@ -365,7 +410,11 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); - final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .codecVersion(2) + .globalMetadataFileName("global-metadata-file") + .indices(Collections.emptyList()) + .build(); remoteClusterStateService.start(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( @@ -391,6 +440,9 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + /* + * Here we will verify global metadata is not uploaded again if change is only in index metadata + */ public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException { // setup mockBlobStoreObjects(); @@ -398,7 +450,11 @@ public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException { final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); - final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() + .codecVersion(2) + .globalMetadataFileName("global-metadata-file") + .indices(Collections.emptyList()) + .build(); remoteClusterStateService.start(); // Initial cluster state with index. @@ -447,6 +503,9 @@ public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException { ); } + /* + * Here we will verify index metadata is not uploaded again if change is only in global metadata + */ public void testIndexMetadataNotUpdatingGlobalMetadata() throws IOException { // setup mockBlobStoreObjects(); @@ -454,7 +513,10 @@ public void testIndexMetadataNotUpdatingGlobalMetadata() throws IOException { final ClusterState initialClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); - final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + final ClusterMetadataManifest initialManifest = ClusterMetadataManifest.builder() + .codecVersion(2) + .indices(Collections.emptyList()) + .build(); remoteClusterStateService.start(); // Initial cluster state with global metadata. @@ -561,6 +623,7 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") .globalMetadataFileName("global-metadata-file") + .codecVersion(1) .build(); BlobContainer blobContainer = mockBlobStoreObjects(); @@ -587,7 +650,6 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") - .globalMetadataFileName("global-metadata") .build(); BlobContainer blobContainer = mockBlobStoreObjects(); @@ -619,6 +681,7 @@ public void testReadLatestMetadataManifestSuccess() throws IOException { .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .globalMetadataFileName("global-metadata") + .codecVersion(1) .previousClusterUUID("prev-cluster-uuid") .build(); @@ -666,6 +729,7 @@ public void testReadLatestIndexMetadataSuccess() throws IOException { .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") .globalMetadataFileName("global-metadata") + .codecVersion(1) .build(); mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata)); From 7c77b9e64d2cb68f9cd334d3607319ee03fcc1c9 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 17 Oct 2023 12:37:57 +0530 Subject: [PATCH 10/16] Fix gradle check Signed-off-by: Dhwanil Patel --- .../opensearch/gateway/remote/ClusterMetadataManifest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 73ddae6e67ec6..2a9f74c0688b2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -244,8 +244,6 @@ public ClusterMetadataManifest( this.indices = Collections.unmodifiableList(indices); this.previousClusterUUID = previousClusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; - - System.out.println("Codec version ==== " + this.codecVersion + " global metadata === " + this.globalMetadataFileName); } public ClusterMetadataManifest(StreamInput in) throws IOException { @@ -366,12 +364,10 @@ public String toString() { } public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { - System.out.println(" in from x content ==== "); return PARSER.parse(parser, null); } public static ClusterMetadataManifest fromXContentV2(XContentParser parser) throws IOException { - System.out.println(" in from x content V2 ==== "); return PARSER_V2.parse(parser, null); } From efe4be8aba612d2e34895f6a9a3fb6b6e8bf6d97 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 17 Oct 2023 16:22:06 +0530 Subject: [PATCH 11/16] Incorporated comments Signed-off-by: Dhwanil Patel --- .../remote/ClusterMetadataManifest.java | 15 ++++++----- .../remote/RemoteClusterStateService.java | 27 ++++++++++++------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 2a9f74c0688b2..f6fe75444c467 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -33,6 +33,9 @@ */ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { + public static final int CODEC_V1 = 1; // In Codec V2 we have introduced global-metadata and codec version in Manifest file. + public static final int CODEC_V2 = 2; // In Codec V2 we have introduced global-metadata and codec version in Manifest file. + private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); @@ -104,7 +107,7 @@ private static String globalMetadataFileName(Object[] fields) { opensearchVersion(fields), nodeId(fields), committed(fields), - 1, // Default codec version + CODEC_V1, // Default codec version null, // null global metadata for v1 manifest files indices(fields), previousClusterUUID(fields), @@ -131,8 +134,8 @@ private static String globalMetadataFileName(Object[] fields) { ); static { - declareParser(PARSER, 1); - declareParser(PARSER_V2, 2); + declareParser(PARSER, CODEC_V1); + declareParser(PARSER_V2, CODEC_V2); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { @@ -151,7 +154,7 @@ private static void declareParser(ConstructingObjectParser getClusterMetadataManifestBlobStoreFormat(String fileName) { - if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. codec version 1 is used. + long codecVersion = getManifestCodecVersion(fileName); + if(codecVersion == ClusterMetadataManifest.CODEC_V1) { return CLUSTER_METADATA_MANIFEST_FORMAT; - } else { - long codecVersion = RemoteStoreUtils.invertLong(fileName.split(DELIMITER)[4]); - if (codecVersion == MANIFEST_CODEC_VERSION) { - return CLUSTER_METADATA_MANIFEST_FORMAT_V2; - } + } else if (codecVersion == ClusterMetadataManifest.CODEC_V2){ + return CLUSTER_METADATA_MANIFEST_FORMAT_V2; } + throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); } + private long getManifestCodecVersion(String fileName) { + if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. codec version 1 is used. + return ClusterMetadataManifest.CODEC_V1; + } else { + return RemoteStoreUtils.invertLong(fileName.split(DELIMITER)[4]); + } + } + public static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } From 258160db003498da5969586fc8053b2702dd3b20 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 17 Oct 2023 19:39:44 +0530 Subject: [PATCH 12/16] Incorporated comments Signed-off-by: Dhwanil Patel --- .../remote/ClusterMetadataManifest.java | 24 ++++++------ .../remote/RemoteClusterStateService.java | 37 ++++++++++++------- .../RemoteClusterStateServiceTests.java | 4 +- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index f6fe75444c467..44c2c61601b66 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -33,8 +33,8 @@ */ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { - public static final int CODEC_V1 = 1; // In Codec V2 we have introduced global-metadata and codec version in Manifest file. - public static final int CODEC_V2 = 2; // In Codec V2 we have introduced global-metadata and codec version in Manifest file. + public static final int CODEC_V0 = 0; // Default codec version, where we haven't introduced codec versions for manifest. + public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); @@ -107,15 +107,15 @@ private static String globalMetadataFileName(Object[] fields) { opensearchVersion(fields), nodeId(fields), committed(fields), - CODEC_V1, // Default codec version - null, // null global metadata for v1 manifest files + CODEC_V0, // Default codec version + null, // null global metadata for manifest files with v0 codec indices(fields), previousClusterUUID(fields), clusterUUIDCommitted(fields) ) ); - private static final ConstructingObjectParser PARSER_V2 = new ConstructingObjectParser<>( + private static final ConstructingObjectParser PARSER_V1 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> new ClusterMetadataManifest( term(fields), @@ -134,8 +134,8 @@ private static String globalMetadataFileName(Object[] fields) { ); static { - declareParser(PARSER, CODEC_V1); - declareParser(PARSER_V2, CODEC_V2); + declareParser(PARSER, CODEC_V0); + declareParser(PARSER_V1, CODEC_V1); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { @@ -154,7 +154,7 @@ private static void declareParser(ConstructingObjectParser CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( "cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContent ); - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V2 = - new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2); + /** + * Manifest format compatible with codec v1, where we introduced codec versions/global metadata. + */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V1 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); /** * Used to specify if cluster state metadata should be published to remote store @@ -136,7 +142,8 @@ public class RemoteClusterStateService implements Closeable { private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false); private static final int GLOBAL_METADATA_CODEC_VERSION = 1; - private static final int MANIFEST_CODEC_CURRENT_VERSION = ClusterMetadataManifest.CODEC_V2; // TODO remove this once file name change PR is merged + private static final int MANIFEST_CODEC_CURRENT_VERSION = ClusterMetadataManifest.CODEC_V1; // TODO remove this once file name change PR + // is merged // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. @@ -242,9 +249,12 @@ public ClusterMetadataManifest writeIncrementalMetadata( assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); // Write Global Metadata - final boolean updateGlobalMetadata = Metadata.isGlobalStateEquals(previousClusterState.metadata(), clusterState.metadata()) == false; + final boolean updateGlobalMetadata = Metadata.isGlobalStateEquals( + previousClusterState.metadata(), + clusterState.metadata() + ) == false; String globalMetadataFile; - // For migration case from codec V1 to V2, we have added null check on global metadata file, + // For migration case from codec V0 to V1, we have added null check on global metadata file, // If file is empty and codec is 1 then write global metadata. if (updateGlobalMetadata || previousManifest.getGlobalMetadataFileName() == null) { globalMetadataFile = writeGlobalMetadata(clusterState); @@ -556,7 +566,7 @@ private ClusterMetadataManifest uploadManifest( private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); - CLUSTER_METADATA_MANIFEST_FORMAT_V2.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); + CLUSTER_METADATA_MANIFEST_FORMAT_V1.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); } private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) { @@ -923,20 +933,21 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat(String fileName) { long codecVersion = getManifestCodecVersion(fileName); - if(codecVersion == ClusterMetadataManifest.CODEC_V1) { + if (codecVersion == ClusterMetadataManifest.CODEC_V0) { return CLUSTER_METADATA_MANIFEST_FORMAT; - } else if (codecVersion == ClusterMetadataManifest.CODEC_V2){ - return CLUSTER_METADATA_MANIFEST_FORMAT_V2; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V1; } throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); } - private long getManifestCodecVersion(String fileName) { - if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. codec version 1 is used. - return ClusterMetadataManifest.CODEC_V1; + private int getManifestCodecVersion(String fileName) { + if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. default codec version 0 is used. + return ClusterMetadataManifest.CODEC_V0; } else { - return RemoteStoreUtils.invertLong(fileName.split(DELIMITER)[4]); + String[] splitName = fileName.split(DELIMITER); + return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index b6d1ab874cbec..17d16701eb97f 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -358,13 +358,13 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { } /* - * Here we will verify the migration of manifest file from codec V1 and V2. + * Here we will verify the migration of manifest file from codec V0 and V1. * * Initially codec version is 1 and global metadata is also null, we will perform index metadata update. * In final manifest codec version should be 2 and * global metadata should be updated, even if it was not changed in this cluster state update */ - public void testMigrationFromCodecV1ManifestToCodecV2Manifest() throws IOException { + public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOException { mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) From 03167adde0677bac445759a0ca2b31c364b37d60 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 18 Oct 2023 11:53:45 +0530 Subject: [PATCH 13/16] FIxed UTs Signed-off-by: Dhwanil Patel --- .../remote/ClusterMetadataManifestTests.java | 8 ++++---- .../remote/RemoteClusterStateServiceTests.java | 17 +++++++---------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index d1d9137c36853..21876acdffe38 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -36,7 +36,7 @@ public void testClusterMetadataManifestXContent() throws IOException { Version.CURRENT, "test-node-id", false, - 1, + ClusterMetadataManifest.CODEC_V0, null, Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", @@ -53,7 +53,7 @@ public void testClusterMetadataManifestXContent() throws IOException { } } - public void testClusterMetadataManifestXContentCodecV2() throws IOException { + public void testClusterMetadataManifestXContentCodecV1() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( 1L, @@ -63,7 +63,7 @@ public void testClusterMetadataManifestXContentCodecV2() throws IOException { Version.CURRENT, "test-node-id", false, - 2, + ClusterMetadataManifest.CODEC_V1, "test-global-metadata-file", Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", @@ -75,7 +75,7 @@ public void testClusterMetadataManifestXContentCodecV2() throws IOException { builder.endObject(); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { - final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV2(parser); + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV1(parser); assertEquals(originalManifest, fromXContentManifest); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index b5f72d204d361..57ea73ba9619c 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -386,9 +386,9 @@ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOExcepti Metadata newMetadata = Metadata.builder(previousClusterState.metadata()).put(indexMetadata, true).build(); ClusterState newClusterState = ClusterState.builder(previousClusterState).metadata(newMetadata).build(); - // previous manifest with codec 1 and null global metadata + // previous manifest with codec 0 and null global metadata final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() - .codecVersion(1) + .codecVersion(ClusterMetadataManifest.CODEC_V0) .globalMetadataFileName(null) .indices(Collections.emptyList()) .build(); @@ -402,8 +402,8 @@ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOExcepti // global metadata is updated assertThat(manifestAfterUpdate.getGlobalMetadataFileName(), notNullValue()); - // Manifest file with codec version with 2 is updated. - assertThat(manifestAfterUpdate.getCodecVersion(), is(2)); + // Manifest file with codec version with 1 is updated. + assertThat(manifestAfterUpdate.getCodecVersion(), is(ClusterMetadataManifest.CODEC_V1)); } public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { @@ -626,8 +626,7 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") - .globalMetadataFileName("global-metadata-file") - .codecVersion(1) + .codecVersion(ClusterMetadataManifest.CODEC_V0) .build(); BlobContainer blobContainer = mockBlobStoreObjects(); @@ -684,8 +683,7 @@ public void testReadLatestMetadataManifestSuccess() throws IOException { .clusterUUID("cluster-uuid") .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) - .globalMetadataFileName("global-metadata") - .codecVersion(1) + .codecVersion(ClusterMetadataManifest.CODEC_V0) .previousClusterUUID("prev-cluster-uuid") .build(); @@ -732,8 +730,7 @@ public void testReadLatestIndexMetadataSuccess() throws IOException { .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") - .globalMetadataFileName("global-metadata") - .codecVersion(1) + .codecVersion(ClusterMetadataManifest.CODEC_V0) .build(); mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata)); From 00474a9b8474656eab0f7bdf8524c68517c467a2 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 18 Oct 2023 12:04:32 +0530 Subject: [PATCH 14/16] Fix conflicts Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 4 ---- .../gateway/remote/RemoteClusterStateServiceTests.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5635fe823c8a9..243423223ca52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -122,8 +122,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -<<<<<<< HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x -======= [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x ->>>>>>> upstream/main diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 57ea73ba9619c..f1bc916a22d92 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -68,9 +68,9 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; From 7f92d597eedb2d6c4741b647f6c81434b02e89df Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 18 Oct 2023 14:55:52 +0530 Subject: [PATCH 15/16] Incorporated comments Signed-off-by: Dhwanil Patel --- .../remote/ClusterMetadataManifest.java | 24 ++++++++------ .../remote/RemoteClusterStateService.java | 33 +++++++++++-------- .../remote/ClusterMetadataManifestTests.java | 8 ++--- .../RemoteClusterStateServiceTests.java | 14 ++++---- 4 files changed, 44 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 44c2c61601b66..58130d6fde375 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -33,7 +33,7 @@ */ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { - public static final int CODEC_V0 = 0; // Default codec version, where we haven't introduced codec versions for manifest. + public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest. public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file. private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); @@ -97,7 +97,7 @@ private static String globalMetadataFileName(Object[] fields) { return (String) fields[11]; } - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + private static final ConstructingObjectParser PARSER_V0 = new ConstructingObjectParser<>( "cluster_metadata_manifest", fields -> new ClusterMetadataManifest( term(fields), @@ -107,8 +107,8 @@ private static String globalMetadataFileName(Object[] fields) { opensearchVersion(fields), nodeId(fields), committed(fields), - CODEC_V0, // Default codec version - null, // null global metadata for manifest files with v0 codec + CODEC_V0, + null, indices(fields), previousClusterUUID(fields), clusterUUIDCommitted(fields) @@ -134,7 +134,7 @@ private static String globalMetadataFileName(Object[] fields) { ); static { - declareParser(PARSER, CODEC_V0); + declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); } @@ -154,7 +154,7 @@ private static void declareParser(ConstructingObjectParser= CODEC_V1) { parser.declareInt(ConstructingObjectParser.constructorArg(), CODEC_VERSION_FIELD); parser.declareString(ConstructingObjectParser.constructorArg(), GLOBAL_METADATA_FIELD); } @@ -295,7 +295,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endArray(); builder.field(PREVIOUS_CLUSTER_UUID.getPreferredName(), getPreviousClusterUUID()); builder.field(CLUSTER_UUID_COMMITTED.getPreferredName(), isClusterUUIDCommitted()); - if (getCodecVersion() == CODEC_V1) { + if (onOrAfterCodecVersion(CODEC_V1)) { builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion()); builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName()); } @@ -366,11 +366,15 @@ public String toString() { return Strings.toString(MediaTypeRegistry.JSON, this); } - public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); + public boolean onOrAfterCodecVersion(int codecVersion) { + return this.codecVersion >= codecVersion; + } + + public static ClusterMetadataManifest fromXContentV0(XContentParser parser) throws IOException { + return PARSER_V0.parse(parser, null); } - public static ClusterMetadataManifest fromXContentV1(XContentParser parser) throws IOException { + public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { return PARSER_V1.parse(parser, null); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index a36ba0809c421..5ae34e8e25cec 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -83,6 +83,7 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); + // TODO make this two variable as dynamic setting [issue: #10688] public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000; @@ -101,18 +102,18 @@ public class RemoteClusterStateService implements Closeable { /** * Manifest format compatible with older codec v0, where codec version was missing. */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V0 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); + + /** + * Manifest format compatible with codec v1, where we introduced codec versions/global metadata. + */ public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( "cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContent ); - /** - * Manifest format compatible with codec v1, where we introduced codec versions/global metadata. - */ - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V1 = - new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); - /** * Used to specify if cluster state metadata should be published to remote store */ @@ -129,6 +130,7 @@ public class RemoteClusterStateService implements Closeable { public static final String MANIFEST_PATH_TOKEN = "manifest"; public static final String MANIFEST_FILE_PREFIX = "manifest"; public static final String METADATA_FILE_PREFIX = "metadata"; + public static final int SPLITED_MANIFEST_FILE_LENGTH = 6; // file name manifest__term__version__C/P__timestamp__codecversion private final String nodeId; private final Supplier repositoriesService; @@ -366,7 +368,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException try { if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { - // TODO: We should add metrics where transfer is timing out. + // TODO: We should add metrics where transfer is timing out. [Issue: #10687] GlobalMetadataTransferException ex = new GlobalMetadataTransferException( String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete") ); @@ -566,7 +568,7 @@ private ClusterMetadataManifest uploadManifest( private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); - CLUSTER_METADATA_MANIFEST_FORMAT_V1.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); + CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor()); } private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) { @@ -935,21 +937,24 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat(String fileName) { long codecVersion = getManifestCodecVersion(fileName); - if (codecVersion == ClusterMetadataManifest.CODEC_V0) { + if (codecVersion == ClusterMetadataManifest.CODEC_V1) { return CLUSTER_METADATA_MANIFEST_FORMAT; - } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { - return CLUSTER_METADATA_MANIFEST_FORMAT_V1; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V0; } throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); } private int getManifestCodecVersion(String fileName) { - if (fileName.split(DELIMITER).length < 6) { // Where codec is not part of file name, i.e. default codec version 0 is used. + String[] splitName = fileName.split(DELIMITER); + if (splitName.length == SPLITED_MANIFEST_FILE_LENGTH) { + return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. + } else if (splitName.length < SPLITED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 + // is used. return ClusterMetadataManifest.CODEC_V0; } else { - String[] splitName = fileName.split(DELIMITER); - return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. + throw new IllegalArgumentException("Manifest file name is corrupted"); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 21876acdffe38..6c9a3201656d7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -26,7 +26,7 @@ public class ClusterMetadataManifestTests extends OpenSearchTestCase { - public void testClusterMetadataManifestXContent() throws IOException { + public void testClusterMetadataManifestXContentV0() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( 1L, @@ -48,12 +48,12 @@ public void testClusterMetadataManifestXContent() throws IOException { builder.endObject(); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { - final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV0(parser); assertEquals(originalManifest, fromXContentManifest); } } - public void testClusterMetadataManifestXContentCodecV1() throws IOException { + public void testClusterMetadataManifestXContent() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( 1L, @@ -75,7 +75,7 @@ public void testClusterMetadataManifestXContentCodecV1() throws IOException { builder.endObject(); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { - final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContentV1(parser); + final ClusterMetadataManifest fromXContentManifest = ClusterMetadataManifest.fromXContent(parser); assertEquals(originalManifest, fromXContentManifest); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index f1bc916a22d92..ddee3bcf10382 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -265,7 +265,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { } - public void testWriteFullMetadataInParallelFailureForGlobalMetadata() throws IOException { + public void testWriteFullMetadataFailureForGlobalMetadata() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); @@ -364,8 +364,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { /* * Here we will verify the migration of manifest file from codec V0 and V1. * - * Initially codec version is 1 and global metadata is also null, we will perform index metadata update. - * In final manifest codec version should be 2 and + * Initially codec version is 0 and global metadata is also null, we will perform index metadata update. + * In final manifest codec version should be 1 and * global metadata should be updated, even if it was not changed in this cluster state update */ public void testMigrationFromCodecV0ManifestToCodecV1Manifest() throws IOException { @@ -445,9 +445,9 @@ public void testWriteIncrementalGlobalMetadataSuccess() throws IOException { } /* - * Here we will verify global metadata is not uploaded again if change is only in index metadata + * Here we will verify index metadata is not uploaded again if change is only in global metadata */ - public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException { + public void testGlobalMetadataOnlyUpdated() throws IOException { // setup mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); @@ -508,9 +508,9 @@ public void testGlobalMetadataNotUpdatingIndexMetadata() throws IOException { } /* - * Here we will verify index metadata is not uploaded again if change is only in global metadata + * Here we will verify global metadata is not uploaded again if change is only in index metadata */ - public void testIndexMetadataNotUpdatingGlobalMetadata() throws IOException { + public void testIndexMetadataOnlyUpdated() throws IOException { // setup mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); From 1aa7bcf65c16a49157ee578f2bf2d8ffe0d4acf4 Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Wed, 18 Oct 2023 16:25:09 +0530 Subject: [PATCH 16/16] Minor comments fix Signed-off-by: Dhwanil Patel --- .../opensearch/gateway/remote/ClusterMetadataManifest.java | 4 +++- .../opensearch/gateway/remote/RemoteClusterStateService.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 58130d6fde375..97b37d9532f85 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -133,6 +133,8 @@ private static String globalMetadataFileName(Object[] fields) { ) ); + private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V1; + static { declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); @@ -375,7 +377,7 @@ public static ClusterMetadataManifest fromXContentV0(XContentParser parser) thro } public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException { - return PARSER_V1.parse(parser, null); + return CURRENT_PARSER.parse(parser, null); } /** diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 5ae34e8e25cec..2092c2a0aac3f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -937,7 +937,7 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat(String fileName) { long codecVersion = getManifestCodecVersion(fileName); - if (codecVersion == ClusterMetadataManifest.CODEC_V1) { + if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { return CLUSTER_METADATA_MANIFEST_FORMAT; } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { return CLUSTER_METADATA_MANIFEST_FORMAT_V0;