From 11ded3ed36ecd8bcc3bb1ad8f244dbaee4b29e15 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 3 Jun 2024 04:13:52 +0530 Subject: [PATCH] Create Remote Object managers and use them in orchestration from RemoteClusterStateService Signed-off-by: Shivansh Arora --- .../remote/RemoteClusterStateServiceIT.java | 12 +- .../remote/RemoteClusterStateService.java | 981 +++--------------- .../remote/RemoteClusterStateUtils.java | 102 ++ .../remote/RemoteGlobalMetadataManager.java | 352 +++++++ .../remote/RemoteIndexMetadataManager.java | 133 +++ .../gateway/remote/RemoteManifestManager.java | 313 ++++++ .../remote/ClusterMetadataManifestTests.java | 37 +- ...RemoteClusterStateCleanupManagerTests.java | 37 +- .../RemoteClusterStateServiceTests.java | 215 +--- .../RemoteGlobalMetadataManagerTests.java | 63 ++ .../RemoteIndexMetadataManagerTests.java | 62 ++ .../remote/RemoteManifestManagerTests.java | 106 ++ 12 files changed, 1387 insertions(+), 1026 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java create mode 100644 server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java index ab2f0f0080566..bf23d2b50616b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteClusterStateServiceIT.java @@ -26,13 +26,13 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; -import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 01ffd8f1cca46..e485908547d8a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -11,19 +11,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.blobstore.BlobMetadata; -import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -35,40 +31,41 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; -import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Base64; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; -import static java.util.Objects.requireNonNull; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.UploadedMetadataResults; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.clusterUUIDContainer; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -78,99 +75,8 @@ */ public class RemoteClusterStateService implements Closeable { - public static final String METADATA_NAME_FORMAT = "%s.dat"; - - public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; - - public static final String DELIMITER = "__"; - public static final String CUSTOM_DELIMITER = "--"; - private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); - - public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); - - public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); - - public static final Setting INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( - "cluster.remote_store.state.index_metadata.upload_timeout", - INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( - "cluster.remote_store.state.global_metadata.upload_timeout", - GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( - "cluster.remote_store.state.metadata_manifest.upload_timeout", - METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "index-metadata", - METADATA_NAME_FORMAT, - IndexMetadata::fromXContent - ); - - public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "metadata", - METADATA_NAME_FORMAT, - Metadata::fromXContent - ); - - public static final ChecksumBlobStoreFormat COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "coordination", - METADATA_NAME_FORMAT, - CoordinationMetadata::fromXContent - ); - - public static final ChecksumBlobStoreFormat SETTINGS_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "settings", - METADATA_NAME_FORMAT, - Settings::fromXContent - ); - - public static final ChecksumBlobStoreFormat TEMPLATES_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "templates", - METADATA_NAME_FORMAT, - TemplatesMetadata::fromXContent - ); - - public static final ChecksumBlobStoreFormat CUSTOM_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "custom", - METADATA_NAME_FORMAT, - null // no need of reader here, as this object is only used to write/serialize the object - ); - - /** - * Manifest format compatible with older codec v0, where codec version was missing. - */ - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V0 = - new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV0); - - /** - * Manifest format compatible with older codec v1, where codec versions/global metadata was introduced. - */ - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V1 = - new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV1); - - /** - * Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files - */ - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( - "cluster-metadata-manifest", - METADATA_MANIFEST_NAME_FORMAT, - ClusterMetadataManifest::fromXContent - ); - /** * Used to specify if cluster state metadata should be published to remote store */ @@ -181,18 +87,6 @@ public class RemoteClusterStateService implements Closeable { Property.Final ); - public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; - public static final String INDEX_PATH_TOKEN = "index"; - public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; - public static final String MANIFEST_PATH_TOKEN = "manifest"; - public static final String MANIFEST_FILE_PREFIX = "manifest"; - public static final String METADATA_FILE_PREFIX = "metadata"; - public static final String COORDINATION_METADATA = "coordination"; - public static final String SETTING_METADATA = "settings"; - public static final String TEMPLATES_METADATA = "templates"; - public static final String CUSTOM_METADATA = "custom"; - public static final int SPLITED_MANIFEST_FILE_LENGTH = 6; // file name manifest__term__version__C/P__timestamp__codecversion - private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -200,21 +94,20 @@ public class RemoteClusterStateService implements Closeable { private final ThreadPool threadpool; private final List indexMetadataUploadListeners; private BlobStoreRepository blobStoreRepository; - private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; - private volatile TimeValue indexMetadataUploadTimeout; - private volatile TimeValue globalMetadataUploadTimeout; - private volatile TimeValue metadataManifestUploadTimeout; - private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; private final RemotePersistenceStats remoteStateStats; + private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; + private RemoteIndexMetadataManager remoteIndexMetadataManager; + private RemoteGlobalMetadataManager remoteGlobalMetadataManager; + private RemoteManifestManager remoteManifestManager; + private ClusterSettings clusterSettings; + private BlobStoreTransferService blobStoreTransferService; private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}]"; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; - public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2; - public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2; // ToXContent Params with gateway mode. // We are using gateway context mode to persist all custom metadata. @@ -241,15 +134,9 @@ public RemoteClusterStateService( this.settings = settings; this.relativeTimeNanosSupplier = relativeTimeNanosSupplier; this.threadpool = threadPool; - ClusterSettings clusterSettings = clusterService.getClusterSettings(); + clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); - this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); - this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); - this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); - clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); - clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); - clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; @@ -278,14 +165,11 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri true, true ); - final ClusterMetadataManifest manifest = uploadManifest( + + final ClusterMetadataManifest manifest = remoteManifestManager.uploadManifest( clusterState, - uploadedMetadataResults.uploadedIndexMetadata, + uploadedMetadataResults, previousClusterUUID, - uploadedMetadataResults.uploadedCoordinationMetadata, - uploadedMetadataResults.uploadedSettingsMetadata, - uploadedMetadataResults.uploadedTemplatesMetadata, - uploadedMetadataResults.uploadedCustomMetadataMap, false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -329,15 +213,16 @@ public ClusterMetadataManifest writeIncrementalMetadata( assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); final Map customsToBeDeletedFromRemote = new HashMap<>(previousManifest.getCustomMetadataMap()); - final Map customsToUpload = getUpdatedCustoms(clusterState, previousClusterState); + final Map customsToUpload = remoteGlobalMetadataManager.getUpdatedCustoms( + clusterState, + previousClusterState + ); final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); for (final String custom : clusterState.metadata().customs().keySet()) { // remove all the customs which are present currently customsToBeDeletedFromRemote.remove(custom); } - final Map indicesToBeDeletedFromRemote = new HashMap<>(previousClusterState.metadata().indices()); - int numIndicesUpdated = 0; int numIndicesUnchanged = 0; final Map allUploadedIndexMetadata = previousManifest.getIndices() @@ -379,6 +264,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; + final Map previousStateIndexMetadataByName = new HashMap<>(); + for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { + previousStateIndexMetadataByName.put(indexMetadata.getIndex().getName(), indexMetadata); + } + uploadedMetadataResults = writeMetadataInParallel( clusterState, toUpload, @@ -398,16 +288,24 @@ public ClusterMetadataManifest writeIncrementalMetadata( customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); - final ClusterMetadataManifest manifest = uploadManifest( + if (!updateCoordinationMetadata) { + uploadedMetadataResults.uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata(); + } + if (!updateSettingsMetadata) { + uploadedMetadataResults.uploadedSettingsMetadata = previousManifest.getSettingsMetadata(); + } + if (!updateTemplatesMetadata) { + uploadedMetadataResults.uploadedTemplatesMetadata = previousManifest.getTemplatesMetadata(); + } + if (!firstUploadForSplitGlobalMetadata && customsToUpload.isEmpty()) { + uploadedMetadataResults.uploadedCustomMetadataMap = previousManifest.getCustomMetadataMap(); + } + uploadedMetadataResults.uploadedIndexMetadata = new ArrayList<>(allUploadedIndexMetadata.values()); + + final ClusterMetadataManifest manifest = remoteManifestManager.uploadManifest( clusterState, - new ArrayList<>(allUploadedIndexMetadata.values()), + uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), - updateCoordinationMetadata ? uploadedMetadataResults.uploadedCoordinationMetadata : previousManifest.getCoordinationMetadata(), - updateSettingsMetadata ? uploadedMetadataResults.uploadedSettingsMetadata : previousManifest.getSettingsMetadata(), - updateTemplatesMetadata ? uploadedMetadataResults.uploadedTemplatesMetadata : previousManifest.getTemplatesMetadata(), - firstUploadForSplitGlobalMetadata || !customsToUpload.isEmpty() - ? allUploadedCustomMap - : previousManifest.getCustomMetadataMap(), false ); @@ -450,12 +348,11 @@ private UploadedMetadataResults writeMetadataInParallel( boolean uploadSettingsMetadata, boolean uploadTemplateMetadata ) throws IOException { - assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; int totalUploadTasks = indexToUpload.size() + indexMetadataUploadListeners.size() + customToUpload.size() + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0); CountDownLatch latch = new CountDownLatch(totalUploadTasks); - Map> uploadTasks = new HashMap<>(totalUploadTasks); - Map results = new HashMap<>(totalUploadTasks); + Map> uploadTasks = new ConcurrentHashMap<>(totalUploadTasks); + Map results = new ConcurrentHashMap<>(totalUploadTasks); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); LatchedActionListener listener = new LatchedActionListener<>( @@ -475,36 +372,36 @@ private UploadedMetadataResults writeMetadataInParallel( if (uploadSettingsMetadata) { uploadTasks.put( SETTING_METADATA, - getAsyncMetadataWriteAction( - clusterState, - SETTING_METADATA, - SETTINGS_METADATA_FORMAT, + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( clusterState.metadata().persistentSettings(), - listener + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + listener, + null ) ); } if (uploadCoordinationMetadata) { uploadTasks.put( COORDINATION_METADATA, - getAsyncMetadataWriteAction( - clusterState, - COORDINATION_METADATA, - COORDINATION_METADATA_FORMAT, + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( clusterState.metadata().coordinationMetadata(), - listener + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + listener, + null ) ); } if (uploadTemplateMetadata) { uploadTasks.put( TEMPLATES_METADATA, - getAsyncMetadataWriteAction( - clusterState, - TEMPLATES_METADATA, - TEMPLATES_METADATA_FORMAT, + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( clusterState.metadata().templatesMetadata(), - listener + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + listener, + null ) ); } @@ -512,22 +409,30 @@ private UploadedMetadataResults writeMetadataInParallel( String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); uploadTasks.put( customComponent, - getAsyncMetadataWriteAction(clusterState, customComponent, CUSTOM_METADATA_FORMAT, value, listener) + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + value, + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + listener, + key + ) ); }); indexToUpload.forEach(indexMetadata -> { - uploadTasks.put(indexMetadata.getIndex().getName(), getIndexMetadataAsyncAction(clusterState, indexMetadata, listener)); + uploadTasks.put( + indexMetadata.getIndex().getName(), + remoteIndexMetadataManager.getIndexMetadataAsyncAction(indexMetadata, clusterState.metadata().clusterUUID(), listener) + ); }); // start async upload of all required metadata files for (CheckedRunnable uploadTask : uploadTasks.values()) { uploadTask.run(); } - invokeIndexMetadataUploadListeners(indexToUpload, prevIndexMetadataByName, latch, exceptionList); try { - if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { + if (latch.await(remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { // TODO: We should add metrics where transfer is timing out. [Issue: #10687] RemoteStateTransferException ex = new RemoteStateTransferException( String.format( @@ -644,79 +549,14 @@ private ActionListener getIndexMetadataUploadActionListener( ); } - /** - * Allows async Upload of IndexMetadata to remote - * - * @param clusterState current ClusterState - * @param indexMetadata {@link IndexMetadata} to upload - * @param latchedActionListener listener to respond back on after upload finishes - */ - private CheckedRunnable getIndexMetadataAsyncAction( - ClusterState clusterState, - IndexMetadata indexMetadata, - LatchedActionListener latchedActionListener - ) { - final BlobContainer indexMetadataContainer = indexMetadataContainer( - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID(), - indexMetadata.getIndexUUID() - ); - final String indexMetadataFilename = indexMetadataFileName(indexMetadata); - ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse( - new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataContainer.path().buildAsString() + indexMetadataFilename - ) - ), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().toString(), ex)) - ); - - return () -> INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority( - indexMetadata, - indexMetadataContainer, - indexMetadataFilename, - blobStoreRepository.getCompressor(), - completionListener, - FORMAT_PARAMS - ); - } - - /** - * Allows async upload of Metadata components to remote - */ - - private CheckedRunnable getAsyncMetadataWriteAction( - ClusterState clusterState, - String component, - ChecksumBlobStoreFormat componentMetadataBlobStore, - ToXContent componentMetadata, - LatchedActionListener latchedActionListener - ) { - final BlobContainer globalMetadataContainer = globalMetadataContainer( - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ); - final String componentMetadataFilename = metadataAttributeFileName(component, clusterState.metadata().version()); - ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse(new UploadedMetadataAttribute(component, componentMetadataFilename)), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, ex)) - ); - return () -> componentMetadataBlobStore.writeAsyncWithUrgentPriority( - componentMetadata, - globalMetadataContainer, - componentMetadataFilename, - blobStoreRepository.getCompressor(), - completionListener, - FORMAT_PARAMS - ); - } - public RemoteClusterStateCleanupManager getCleanupManager() { return remoteClusterStateCleanupManager; } + public RemoteManifestManager getRemoteManifestManager() { + return remoteManifestManager; + } + @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -726,14 +566,18 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat return null; } assert previousManifest != null : "Last cluster metadata manifest is not set"; - ClusterMetadataManifest committedManifest = uploadManifest( - clusterState, + UploadedMetadataResults uploadedMetadataResults = new UploadedMetadataResults( previousManifest.getIndices(), - previousManifest.getPreviousClusterUUID(), + previousManifest.getCustomMetadataMap(), previousManifest.getCoordinationMetadata(), previousManifest.getSettingsMetadata(), - previousManifest.getTemplatesMetadata(), - previousManifest.getCustomMetadataMap(), + previousManifest.getTemplatesMetadata() + ); + + ClusterMetadataManifest committedManifest = remoteManifestManager.uploadManifest( + clusterState, + uploadedMetadataResults, + previousManifest.getPreviousClusterUUID(), true ); if (!previousManifest.isClusterUUIDCommitted() && committedManifest.isClusterUUIDCommitted()) { @@ -743,6 +587,26 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat return committedManifest; } + /** + * Fetch latest ClusterMetadataManifest from remote state store + * + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return ClusterMetadataManifest + */ + public Optional getLatestClusterMetadataManifest(String clusterName, String clusterUUID) { + return remoteManifestManager.getLatestClusterMetadataManifest(clusterName, clusterUUID); + } + + public Optional getClusterMetadataManifestByTermVersion( + String clusterName, + String clusterUUID, + long term, + long version + ) { + return remoteManifestManager.getClusterMetadataManifestByTermVersion(clusterName, clusterUUID, term, version); + } + @Override public void close() throws IOException { remoteClusterStateCleanupManager.close(); @@ -760,95 +624,34 @@ public void start() { final Repository repository = repositoriesService.get().repository(remoteStoreRepo); assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; + String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); + remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( + blobStoreRepository, + clusterSettings, + threadpool, + getBlobStoreTransferService(), + clusterName + ); + remoteIndexMetadataManager = new RemoteIndexMetadataManager( + blobStoreRepository, + clusterSettings, + threadpool, + clusterName, + getBlobStoreTransferService() + ); + remoteManifestManager = new RemoteManifestManager( + getBlobStoreTransferService(), + blobStoreRepository, + clusterSettings, + nodeId, + threadpool, + clusterName + ); remoteClusterStateCleanupManager.start(); } - private ClusterMetadataManifest uploadManifest( - ClusterState clusterState, - List uploadedIndexMetadata, - String previousClusterUUID, - UploadedMetadataAttribute uploadedCoordinationMetadata, - UploadedMetadataAttribute uploadedSettingsMetadata, - UploadedMetadataAttribute uploadedTemplatesMetadata, - Map uploadedCustomMetadataMap, - boolean committed - ) throws IOException { - synchronized (this) { - final String manifestFileName = getManifestFileName( - clusterState.term(), - clusterState.version(), - committed, - MANIFEST_CURRENT_CODEC_VERSION - ); - final ClusterMetadataManifest manifest = new ClusterMetadataManifest( - clusterState.term(), - clusterState.getVersion(), - clusterState.metadata().clusterUUID(), - clusterState.stateUUID(), - Version.CURRENT, - nodeId, - committed, - MANIFEST_CURRENT_CODEC_VERSION, - null, - uploadedIndexMetadata, - previousClusterUUID, - clusterState.metadata().clusterUUIDCommitted(), - uploadedCoordinationMetadata, - uploadedSettingsMetadata, - uploadedTemplatesMetadata, - uploadedCustomMetadataMap - ); - writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); - return manifest; - } - } - - private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) - throws IOException { - AtomicReference result = new AtomicReference(); - AtomicReference exceptionReference = new AtomicReference(); - - final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); - - // latch to wait until upload is not finished - CountDownLatch latch = new CountDownLatch(1); - - LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { - logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully.")); - }, ex -> { exceptionReference.set(ex); }), latch); - - getClusterMetadataManifestBlobStoreFormat(fileName).writeAsyncWithUrgentPriority( - uploadManifest, - metadataManifestContainer, - fileName, - blobStoreRepository.getCompressor(), - completionListener, - FORMAT_PARAMS - ); - - try { - if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { - RemoteStateTransferException ex = new RemoteStateTransferException( - String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete") - ); - throw ex; - } - } catch (InterruptedException ex) { - RemoteStateTransferException exception = new RemoteStateTransferException( - String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"), - ex - ); - Thread.currentThread().interrupt(); - throw exception; - } - if (exceptionReference.get() != null) { - throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get()); - } - logger.debug( - "Metadata manifest file [{}] written during [{}] phase. ", - fileName, - uploadManifest.isCommitted() ? "commit" : "publish" - ); + private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { + this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } ThreadPool getThreadpool() { @@ -859,199 +662,19 @@ BlobStore getBlobStore() { return blobStoreRepository.blobStore(); } - private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX - return blobStoreRepository.blobStore() - .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(INDEX_PATH_TOKEN).add(indexUUID)); - } - - private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/ - return blobStoreRepository.blobStore() - .blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add(GLOBAL_METADATA_PATH_TOKEN)); - } - - private BlobContainer manifestContainer(String clusterName, String clusterUUID) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest - return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); - } - - BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) { - return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); - } - - private BlobContainer clusterUUIDContainer(String clusterName) { - return blobStoreRepository.blobStore() - .blobContainer( - blobStoreRepository.basePath() - .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) - .add(CLUSTER_STATE_PATH_TOKEN) - ); - } - - private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { - this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; - } - - private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { - this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; - } - - private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) { - this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout; - } - - private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) { - this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout; - } - - private Map getUpdatedCustoms(ClusterState currentState, ClusterState previousState) { - if (Metadata.isCustomMetadataEqual(previousState.metadata(), currentState.metadata())) { - return new HashMap<>(); - } - Map updatedCustom = new HashMap<>(); - Set currentCustoms = new HashSet<>(currentState.metadata().customs().keySet()); - for (Map.Entry cursor : previousState.metadata().customs().entrySet()) { - if (cursor.getValue().context().contains(Metadata.XContentContext.GATEWAY)) { - if (currentCustoms.contains(cursor.getKey()) - && !cursor.getValue().equals(currentState.metadata().custom(cursor.getKey()))) { - // If the custom metadata is updated, we need to upload the new version. - updatedCustom.put(cursor.getKey(), currentState.metadata().custom(cursor.getKey())); - } - currentCustoms.remove(cursor.getKey()); - } - } - for (String custom : currentCustoms) { - Metadata.Custom cursor = currentState.metadata().custom(custom); - if (cursor.context().contains(Metadata.XContentContext.GATEWAY)) { - updatedCustom.put(custom, cursor); - } - } - return updatedCustom; - } - - public TimeValue getIndexMetadataUploadTimeout() { - return this.indexMetadataUploadTimeout; - } - - public TimeValue getGlobalMetadataUploadTimeout() { - return this.globalMetadataUploadTimeout; - } - - public TimeValue getMetadataManifestUploadTimeout() { - return this.metadataManifestUploadTimeout; - } - - static String getManifestFileName(long term, long version, boolean committed, int codecVersion) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ - return String.join( - DELIMITER, - MANIFEST_PATH_TOKEN, - RemoteStoreUtils.invertLong(term), - RemoteStoreUtils.invertLong(version), - (committed ? "C" : "P"), // C for committed and P for published - RemoteStoreUtils.invertLong(System.currentTimeMillis()), - String.valueOf(codecVersion) // Keep the codec version at last place only, during read we reads last place to - // determine codec version. - ); - } - - static String indexMetadataFileName(IndexMetadata indexMetadata) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index//metadata______ - return String.join( - DELIMITER, - METADATA_FILE_PREFIX, - RemoteStoreUtils.invertLong(indexMetadata.getVersion()), - RemoteStoreUtils.invertLong(System.currentTimeMillis()), - String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION) // Keep the codec version at last place only, during read we reads last - // place to determine codec version. - ); - } - - private static String globalMetadataFileName(Metadata metadata) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/metadata______ - return String.join( - DELIMITER, - METADATA_FILE_PREFIX, - RemoteStoreUtils.invertLong(metadata.version()), - RemoteStoreUtils.invertLong(System.currentTimeMillis()), - String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) - ); - } - - private static String metadataAttributeFileName(String componentPrefix, Long metadataVersion) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ - return String.join( - DELIMITER, - componentPrefix, - RemoteStoreUtils.invertLong(metadataVersion), - RemoteStoreUtils.invertLong(System.currentTimeMillis()), - String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) - ); - } - - BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { - return getCusterMetadataBasePath(clusterName, clusterUUID).add(MANIFEST_PATH_TOKEN); - } - - /** - * Fetch latest index metadata from remote cluster state - * - * @param clusterUUID uuid of cluster state to refer to in remote - * @param clusterName name of the cluster - * @param clusterMetadataManifest manifest file of cluster - * @return {@code Map} latest IndexUUID to IndexMetadata map - */ - private Map getIndexMetadataMap( - String clusterName, - String clusterUUID, - ClusterMetadataManifest clusterMetadataManifest - ) { - assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) - : "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch."; - Map remoteIndexMetadata = new HashMap<>(); - for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) { - IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata); - remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata); - } - return remoteIndexMetadata; - } - - /** - * Fetch index metadata from remote cluster state - * - * @param clusterUUID uuid of cluster state to refer to in remote - * @param clusterName name of the cluster - * @param uploadedIndexMetadata {@link UploadedIndexMetadata} contains details about remote location of index metadata - * @return {@link IndexMetadata} - */ - private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, UploadedIndexMetadata uploadedIndexMetadata) { - BlobContainer blobContainer = indexMetadataContainer(clusterName, clusterUUID, uploadedIndexMetadata.getIndexUUID()); - try { - String[] splitPath = uploadedIndexMetadata.getUploadedFilename().split("/"); - return INDEX_METADATA_FORMAT.read( - blobContainer, - splitPath[splitPath.length - 1], - blobStoreRepository.getNamedXContentRegistry() - ); - } catch (IOException e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), - e - ); - } - } - /** * Fetch latest ClusterState from remote, including global metadata, index metadata and cluster state version * - * @param clusterUUID uuid of cluster state to refer to in remote * @param clusterName name of the cluster + * @param clusterUUID uuid of cluster state to refer to in remote * @return {@link IndexMetadata} */ - public ClusterState getLatestClusterState(String clusterName, String clusterUUID) { - Optional clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); + public ClusterState getLatestClusterState(String clusterName, String clusterUUID) throws IOException { + start(); + Optional clusterMetadataManifest = remoteManifestManager.getLatestClusterMetadataManifest( + clusterName, + clusterUUID + ); if (clusterMetadataManifest.isEmpty()) { throw new IllegalStateException( String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID) @@ -1059,10 +682,10 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID } // Fetch Global Metadata - Metadata globalMetadata = getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get()); + Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get()); // Fetch Index Metadata - Map indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get()); + Map indices = remoteIndexMetadataManager.getIndexMetadataMap(clusterUUID, clusterMetadataManifest.get()); Map indexMetadataMap = new HashMap<>(); indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); }); @@ -1073,154 +696,6 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID .build(); } - private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { - String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName(); - try { - // Fetch Global metadata - if (globalMetadataFileName != null) { - String[] splitPath = globalMetadataFileName.split("/"); - return GLOBAL_METADATA_FORMAT.read( - globalMetadataContainer(clusterName, clusterUUID), - splitPath[splitPath.length - 1], - blobStoreRepository.getNamedXContentRegistry() - ); - } else if (clusterMetadataManifest.hasMetadataAttributesFiles()) { - CoordinationMetadata coordinationMetadata = getCoordinationMetadata( - clusterName, - clusterUUID, - clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename() - ); - Settings settingsMetadata = getSettingsMetadata( - clusterName, - clusterUUID, - clusterMetadataManifest.getSettingsMetadata().getUploadedFilename() - ); - TemplatesMetadata templatesMetadata = getTemplatesMetadata( - clusterName, - clusterUUID, - clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename() - ); - Metadata.Builder builder = new Metadata.Builder(); - builder.coordinationMetadata(coordinationMetadata); - builder.persistentSettings(settingsMetadata); - builder.templates(templatesMetadata); - clusterMetadataManifest.getCustomMetadataMap() - .forEach( - (key, value) -> builder.putCustom( - key, - getCustomsMetadata(clusterName, clusterUUID, value.getUploadedFilename(), key) - ) - ); - return builder.build(); - } else { - return Metadata.EMPTY_METADATA; - } - } catch (IOException e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Error while downloading Global Metadata - %s", globalMetadataFileName), - e - ); - } - } - - private CoordinationMetadata getCoordinationMetadata(String clusterName, String clusterUUID, String coordinationMetadataFileName) { - try { - // Fetch Coordination metadata - if (coordinationMetadataFileName != null) { - String[] splitPath = coordinationMetadataFileName.split("/"); - return COORDINATION_METADATA_FORMAT.read( - globalMetadataContainer(clusterName, clusterUUID), - splitPath[splitPath.length - 1], - blobStoreRepository.getNamedXContentRegistry() - ); - } else { - return CoordinationMetadata.EMPTY_METADATA; - } - } catch (IOException e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Error while downloading Coordination Metadata - %s", coordinationMetadataFileName), - e - ); - } - } - - private Settings getSettingsMetadata(String clusterName, String clusterUUID, String settingsMetadataFileName) { - try { - // Fetch Settings metadata - if (settingsMetadataFileName != null) { - String[] splitPath = settingsMetadataFileName.split("/"); - return SETTINGS_METADATA_FORMAT.read( - globalMetadataContainer(clusterName, clusterUUID), - splitPath[splitPath.length - 1], - blobStoreRepository.getNamedXContentRegistry() - ); - } else { - return Settings.EMPTY; - } - } catch (IOException e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Error while downloading Settings Metadata - %s", settingsMetadataFileName), - e - ); - } - } - - private TemplatesMetadata getTemplatesMetadata(String clusterName, String clusterUUID, String templatesMetadataFileName) { - try { - // Fetch Templates metadata - if (templatesMetadataFileName != null) { - String[] splitPath = templatesMetadataFileName.split("/"); - return TEMPLATES_METADATA_FORMAT.read( - globalMetadataContainer(clusterName, clusterUUID), - splitPath[splitPath.length - 1], - blobStoreRepository.getNamedXContentRegistry() - ); - } else { - return TemplatesMetadata.EMPTY_METADATA; - } - } catch (IOException e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Error while downloading Templates Metadata - %s", templatesMetadataFileName), - e - ); - } - } - - private Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, String customMetadataFileName, String custom) { - requireNonNull(customMetadataFileName); - try { - // Fetch Custom metadata - String[] splitPath = customMetadataFileName.split("/"); - ChecksumBlobStoreFormat customChecksumBlobStoreFormat = new ChecksumBlobStoreFormat<>( - "custom", - METADATA_NAME_FORMAT, - (parser -> Metadata.Custom.fromXContent(parser, custom)) - ); - return customChecksumBlobStoreFormat.read( - globalMetadataContainer(clusterName, clusterUUID), - splitPath[splitPath.length - 1], - blobStoreRepository.getNamedXContentRegistry() - ); - } catch (IOException e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Error while downloading Custom Metadata - %s", customMetadataFileName), - e - ); - } - } - - /** - * Fetch latest ClusterMetadataManifest from remote state store - * - * @param clusterUUID uuid of cluster state to refer to in remote - * @param clusterName name of the cluster - * @return ClusterMetadataManifest - */ - public Optional getLatestClusterMetadataManifest(String clusterName, String clusterUUID) { - Optional latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID); - return latestManifestFileName.map(s -> fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, s)); - } - /** * Fetch the previous cluster UUIDs from remote state store and return the most recent valid cluster UUID * @@ -1230,7 +705,10 @@ public Optional getLatestClusterMetadataManifest(String public String getLastKnownUUIDFromRemote(String clusterName) { try { Set clusterUUIDs = getAllClusterUUIDs(clusterName); - Map latestManifests = getLatestManifestForAllClusterUUIDs(clusterName, clusterUUIDs); + Map latestManifests = remoteManifestManager.getLatestManifestForAllClusterUUIDs( + clusterName, + clusterUUIDs + ); List validChain = createClusterChain(latestManifests, clusterName); if (validChain.isEmpty()) { return ClusterState.UNKNOWN_UUID; @@ -1244,30 +722,21 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } + private BlobStoreTransferService getBlobStoreTransferService() { + if (blobStoreTransferService == null) { + blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); + } + return blobStoreTransferService; + } + Set getAllClusterUUIDs(String clusterName) throws IOException { - Map clusterUUIDMetadata = clusterUUIDContainer(clusterName).children(); + Map clusterUUIDMetadata = clusterUUIDContainer(blobStoreRepository, clusterName).children(); if (clusterUUIDMetadata == null) { return Collections.emptySet(); } return Collections.unmodifiableSet(clusterUUIDMetadata.keySet()); } - private Map getLatestManifestForAllClusterUUIDs(String clusterName, Set clusterUUIDs) { - Map manifestsByClusterUUID = new HashMap<>(); - for (String clusterUUID : clusterUUIDs) { - try { - Optional manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); - manifest.ifPresent(clusterMetadataManifest -> manifestsByClusterUUID.put(clusterUUID, clusterMetadataManifest)); - } catch (Exception e) { - throw new IllegalStateException( - String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID), - e - ); - } - } - return manifestsByClusterUUID; - } - /** * This method creates a valid cluster UUID chain. * @@ -1329,7 +798,6 @@ private List createClusterChain(final Map trimClusterUUIDs( @@ -1346,7 +814,7 @@ private Map trimClusterUUIDs( if (!ClusterState.UNKNOWN_UUID.equals(currentManifest.getPreviousClusterUUID())) { ClusterMetadataManifest previousManifest = trimmedUUIDs.get(currentManifest.getPreviousClusterUUID()); if (isMetadataEqual(currentManifest, previousManifest, clusterName) - && isGlobalMetadataEqual(currentManifest, previousManifest, clusterName)) { + && remoteGlobalMetadataManager.isGlobalMetadataEqual(currentManifest, previousManifest, clusterName)) { trimmedUUIDs.remove(clusterUUID); } } @@ -1361,14 +829,22 @@ private boolean isMetadataEqual(ClusterMetadataManifest first, ClusterMetadataMa } final Map secondIndices = second.getIndices() .stream() - .collect(Collectors.toMap(md -> md.getIndexName(), Function.identity())); + .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); for (UploadedIndexMetadata uploadedIndexMetadata : first.getIndices()) { - final IndexMetadata firstIndexMetadata = getIndexMetadata(clusterName, first.getClusterUUID(), uploadedIndexMetadata); + final IndexMetadata firstIndexMetadata = remoteIndexMetadataManager.getIndexMetadata( + uploadedIndexMetadata, + first.getClusterUUID(), + first.getCodecVersion() + ); final UploadedIndexMetadata secondUploadedIndexMetadata = secondIndices.get(uploadedIndexMetadata.getIndexName()); if (secondUploadedIndexMetadata == null) { return false; } - final IndexMetadata secondIndexMetadata = getIndexMetadata(clusterName, second.getClusterUUID(), secondUploadedIndexMetadata); + final IndexMetadata secondIndexMetadata = remoteIndexMetadataManager.getIndexMetadata( + secondUploadedIndexMetadata, + second.getClusterUUID(), + second.getCodecVersion() + ); if (firstIndexMetadata.equals(secondIndexMetadata) == false) { return false; } @@ -1376,107 +852,10 @@ private boolean isMetadataEqual(ClusterMetadataManifest first, ClusterMetadataMa return true; } - private boolean isGlobalMetadataEqual(ClusterMetadataManifest first, ClusterMetadataManifest second, String clusterName) { - Metadata secondGlobalMetadata = getGlobalMetadata(clusterName, second.getClusterUUID(), second); - Metadata firstGlobalMetadata = getGlobalMetadata(clusterName, first.getClusterUUID(), first); - return Metadata.isGlobalResourcesMetadataEquals(firstGlobalMetadata, secondGlobalMetadata); - } - private boolean isValidClusterUUID(ClusterMetadataManifest manifest) { return manifest.isClusterUUIDCommitted(); } - /** - * Fetch ClusterMetadataManifest files from remote state store in order - * - * @param clusterUUID uuid of cluster state to refer to in remote - * @param clusterName name of the cluster - * @param limit max no of files to fetch - * @return all manifest file names - */ - private List getManifestFileNames(String clusterName, String clusterUUID, int limit) throws IllegalStateException { - try { - - /* - {@link BlobContainer#listBlobsByPrefixInSortedOrder} will list the latest manifest file first - as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures - when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top. - */ - return manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder( - MANIFEST_FILE_PREFIX + DELIMITER, - limit, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ); - } catch (IOException e) { - throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e); - } - } - - /** - * Fetch latest ClusterMetadataManifest file from remote state store - * - * @param clusterUUID uuid of cluster state to refer to in remote - * @param clusterName name of the cluster - * @return latest ClusterMetadataManifest filename - */ - private Optional getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException { - List manifestFilesMetadata = getManifestFileNames(clusterName, clusterUUID, 1); - if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) { - return Optional.of(manifestFilesMetadata.get(0).name()); - } - logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID); - return Optional.empty(); - } - - /** - * Fetch ClusterMetadataManifest from remote state store - * - * @param clusterUUID uuid of cluster state to refer to in remote - * @param clusterName name of the cluster - * @return ClusterMetadataManifest - */ - ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) - throws IllegalStateException { - try { - return getClusterMetadataManifestBlobStoreFormat(filename).read( - manifestContainer(clusterName, clusterUUID), - filename, - blobStoreRepository.getNamedXContentRegistry() - ); - } catch (IOException e) { - throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e); - } - } - - private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat(String fileName) { - long codecVersion = getManifestCodecVersion(fileName); - if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { - return CLUSTER_METADATA_MANIFEST_FORMAT; - } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { - return CLUSTER_METADATA_MANIFEST_FORMAT_V1; - } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { - return CLUSTER_METADATA_MANIFEST_FORMAT_V0; - } - - throw new IllegalArgumentException("Cluster metadata manifest file is corrupted, don't have valid codec version"); - } - - private int getManifestCodecVersion(String fileName) { - String[] splitName = fileName.split(DELIMITER); - if (splitName.length == SPLITED_MANIFEST_FILE_LENGTH) { - return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version. - } else if (splitName.length < SPLITED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0 - // is used. - return ClusterMetadataManifest.CODEC_V0; - } else { - throw new IllegalArgumentException("Manifest file name is corrupted"); - } - } - - public static String encodeString(String content) { - return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); - } - public void writeMetadataFailed() { getStats().stateFailed(); } @@ -1498,34 +877,4 @@ public RemoteStateTransferException(String errorDesc, Throwable cause) { public RemotePersistenceStats getStats() { return remoteStateStats; } - - private static class UploadedMetadataResults { - List uploadedIndexMetadata; - Map uploadedCustomMetadataMap; - UploadedMetadataAttribute uploadedCoordinationMetadata; - UploadedMetadataAttribute uploadedSettingsMetadata; - UploadedMetadataAttribute uploadedTemplatesMetadata; - - public UploadedMetadataResults( - List uploadedIndexMetadata, - Map uploadedCustomMetadataMap, - UploadedMetadataAttribute uploadedCoordinationMetadata, - UploadedMetadataAttribute uploadedSettingsMetadata, - UploadedMetadataAttribute uploadedTemplatesMetadata - ) { - this.uploadedIndexMetadata = uploadedIndexMetadata; - this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; - this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; - this.uploadedSettingsMetadata = uploadedSettingsMetadata; - this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; - } - - public UploadedMetadataResults() { - this.uploadedIndexMetadata = new ArrayList<>(); - this.uploadedCustomMetadataMap = new HashMap<>(); - this.uploadedCoordinationMetadata = null; - this.uploadedSettingsMetadata = null; - this.uploadedTemplatesMetadata = null; - } - } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java new file mode 100644 index 0000000000000..74a59b6ff6f44 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public class RemoteClusterStateUtils { + public static final String METADATA_NAME_FORMAT = "%s.dat"; + public static final String METADATA_FILE_PREFIX = "metadata"; + public static final String CLUSTER_STATE_PATH_TOKEN = "cluster-state"; + public static final String GLOBAL_METADATA_PATH_TOKEN = "global-metadata"; + public static final String DELIMITER = "__"; + + // ToXContent Params with gateway mode. + // We are using gateway context mode to persist all custom metadata. + public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams( + Map.of(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_GATEWAY) + ); + + public static BlobPath getCusterMetadataBasePath(BlobStoreRepository blobStoreRepository, String clusterName, String clusterUUID) { + return blobStoreRepository.basePath().add(encodeString(clusterName)).add(CLUSTER_STATE_PATH_TOKEN).add(clusterUUID); + } + + public static String encodeString(String content) { + return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); + } + + public static String getFormattedFileName(String fileName, int codecVersion) { + return String.format(Locale.ROOT, METADATA_NAME_FORMAT, fileName); + } + + static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepository, String clusterName) { + return blobStoreRepository.blobStore() + .blobContainer( + blobStoreRepository.basePath() + .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add(CLUSTER_STATE_PATH_TOKEN) + ); + } + + /** + * Exception for Remote state transfer. + */ + public static class RemoteStateTransferException extends RuntimeException { + + public RemoteStateTransferException(String errorDesc) { + super(errorDesc); + } + + public RemoteStateTransferException(String errorDesc, Throwable cause) { + super(errorDesc, cause); + } + } + + public static class UploadedMetadataResults { + List uploadedIndexMetadata; + Map uploadedCustomMetadataMap; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata; + ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata; + + public UploadedMetadataResults( + List uploadedIndexMetadata, + Map uploadedCustomMetadataMap, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedCoordinationMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata, + ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata + ) { + this.uploadedIndexMetadata = uploadedIndexMetadata; + this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; + this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; + this.uploadedSettingsMetadata = uploadedSettingsMetadata; + this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; + } + + public UploadedMetadataResults() { + this.uploadedIndexMetadata = new ArrayList<>(); + this.uploadedCustomMetadataMap = new HashMap<>(); + this.uploadedCoordinationMetadata = null; + this.uploadedSettingsMetadata = null; + this.uploadedTemplatesMetadata = null; + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java new file mode 100644 index 0000000000000..2d84461907d52 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -0,0 +1,352 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.Metadata.Custom; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.gateway.remote.model.AbstractRemoteBlobObject; +import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; +import org.opensearch.gateway.remote.model.RemoteCoordinationMetadataBlobStore; +import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteCustomMetadataBlobStore; +import org.opensearch.gateway.remote.model.RemotePersistentSettingsBlobStore; +import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; +import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; +import org.opensearch.gateway.remote.model.RemoteTemplatesMetadataBlobStore; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static java.util.Objects.requireNonNull; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; + +public class RemoteGlobalMetadataManager { + + public static final String COORDINATION_METADATA = "coordination"; + public static final String SETTING_METADATA = "settings"; + public static final String TEMPLATES_METADATA = "templates"; + public static final String CUSTOM_METADATA = "custom"; + public static final String CUSTOM_DELIMITER = "--"; + + public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.global_metadata.upload_timeout", + GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final ChecksumBlobStoreFormat GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "metadata", + METADATA_NAME_FORMAT, + Metadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "coordination", + METADATA_NAME_FORMAT, + CoordinationMetadata::fromXContent + ); + + public static final ChecksumBlobStoreFormat SETTINGS_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "settings", + METADATA_NAME_FORMAT, + Settings::fromXContent + ); + + public static final ChecksumBlobStoreFormat TEMPLATES_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( + "templates", + METADATA_NAME_FORMAT, + TemplatesMetadata::fromXContent + ); + + public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; + + private final BlobStoreRepository blobStoreRepository; + private final ThreadPool threadPool; + + private volatile TimeValue globalMetadataUploadTimeout; + private final BlobStoreTransferService blobStoreTransferService; + private final String clusterName; + private final RemoteCoordinationMetadataBlobStore coordinationMetadataBlobStore; + private final RemotePersistentSettingsBlobStore persistentSettingsBlobStore; + private final RemoteTemplatesMetadataBlobStore templatesMetadataBlobStore; + private final RemoteCustomMetadataBlobStore customMetadataBlobStore; + + RemoteGlobalMetadataManager( + BlobStoreRepository blobStoreRepository, + ClusterSettings clusterSettings, + ThreadPool threadPool, + BlobStoreTransferService blobStoreTransferService, + String clusterName + ) { + this.blobStoreRepository = blobStoreRepository; + this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING); + this.threadPool = threadPool; + this.blobStoreTransferService = blobStoreTransferService; + this.clusterName = clusterName; + this.coordinationMetadataBlobStore = new RemoteCoordinationMetadataBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadPool + ); + this.persistentSettingsBlobStore = new RemotePersistentSettingsBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadPool + ); + this.templatesMetadataBlobStore = new RemoteTemplatesMetadataBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadPool + ); + this.customMetadataBlobStore = new RemoteCustomMetadataBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadPool + ); + clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); + } + + /** + * Allows async upload of Metadata components to remote + */ + CheckedRunnable getAsyncMetadataWriteAction( + Object objectToUpload, + long metadataVersion, + String clusterUUID, + LatchedActionListener latchedActionListener, + String customType + ) { + if (objectToUpload instanceof CoordinationMetadata) { + RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( + (CoordinationMetadata) objectToUpload, + metadataVersion, + clusterUUID, + blobStoreRepository + ); + return () -> coordinationMetadataBlobStore.writeAsync( + remoteCoordinationMetadata, + getActionListener(remoteCoordinationMetadata, latchedActionListener) + ); + } else if (objectToUpload instanceof Settings) { + RemotePersistentSettingsMetadata remotePersistentSettingsMetadata = new RemotePersistentSettingsMetadata( + (Settings) objectToUpload, + metadataVersion, + clusterUUID, + blobStoreRepository + ); + return () -> persistentSettingsBlobStore.writeAsync( + remotePersistentSettingsMetadata, + getActionListener(remotePersistentSettingsMetadata, latchedActionListener) + ); + } else if (objectToUpload instanceof TemplatesMetadata) { + RemoteTemplatesMetadata remoteTemplatesMetadata = new RemoteTemplatesMetadata( + (TemplatesMetadata) objectToUpload, + metadataVersion, + clusterUUID, + blobStoreRepository + ); + return () -> templatesMetadataBlobStore.writeAsync( + remoteTemplatesMetadata, + getActionListener(remoteTemplatesMetadata, latchedActionListener) + ); + } else if (objectToUpload instanceof Custom) { + RemoteCustomMetadata remoteCustomMetadata = new RemoteCustomMetadata( + (Custom) objectToUpload, + customType, + metadataVersion, + clusterUUID, + blobStoreRepository + ); + return () -> customMetadataBlobStore.writeAsync( + remoteCustomMetadata, + getActionListener(remoteCustomMetadata, latchedActionListener) + ); + } + throw new RemoteStateTransferException("Remote object cannot be created for " + objectToUpload.getClass()); + } + + private ActionListener getActionListener( + AbstractRemoteBlobObject remoteBlobStoreObject, + LatchedActionListener latchedActionListener + ) { + return ActionListener.wrap( + resp -> latchedActionListener.onResponse(remoteBlobStoreObject.getUploadedMetadata()), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed", ex)) + ); + } + + Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { + String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName(); + try { + // Fetch Global metadata + if (globalMetadataFileName != null) { + String[] splitPath = globalMetadataFileName.split("/"); + return GLOBAL_METADATA_FORMAT.read( + globalMetadataContainer(clusterName, clusterUUID), + splitPath[splitPath.length - 1], + blobStoreRepository.getNamedXContentRegistry() + ); + } else if (clusterMetadataManifest.hasMetadataAttributesFiles()) { + CoordinationMetadata coordinationMetadata = CoordinationMetadata.EMPTY_METADATA; + Settings settingsMetadata = Settings.EMPTY; + TemplatesMetadata templatesMetadata = TemplatesMetadata.EMPTY_METADATA; + if (clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename() != null) { + coordinationMetadata = getMetadataAttribute( + new RemoteCoordinationMetadata( + clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository + ), + coordinationMetadataBlobStore + ); + } + if (clusterMetadataManifest.getSettingsMetadata().getUploadedFilename() != null) { + settingsMetadata = getMetadataAttribute( + new RemotePersistentSettingsMetadata( + clusterMetadataManifest.getSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository + ), + persistentSettingsBlobStore + ); + } + if (clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename() != null) { + templatesMetadata = getMetadataAttribute( + new RemoteTemplatesMetadata( + clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository + ), + templatesMetadataBlobStore + ); + } + Metadata.Builder builder = new Metadata.Builder(); + builder.coordinationMetadata(coordinationMetadata); + builder.persistentSettings(settingsMetadata); + builder.templates(templatesMetadata); + builder.clusterUUID(clusterMetadataManifest.getClusterUUID()); + builder.clusterUUIDCommitted(clusterMetadataManifest.isClusterUUIDCommitted()); + clusterMetadataManifest + .getCustomMetadataMap() + .forEach((key, value) -> + builder.putCustom( + key, + getMetadataAttribute( + new RemoteCustomMetadata( + value.getUploadedFilename(), + key, + clusterUUID, + blobStoreRepository + ), + customMetadataBlobStore + ) + ) + ); + return builder.build(); + } else { + return Metadata.EMPTY_METADATA; + } + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading Global Metadata - %s", globalMetadataFileName), + e + ); + } + } + + public ToXContent getMetadataAttribute(RemoteObject object, RemoteObjectStore store) { + try { + if (object != null) { + return store.read(object); + } else { + return null; + } + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading Metadata Attribute - %s", object.get()), + e + ); + } + } + + Map getUpdatedCustoms(ClusterState currentState, ClusterState previousState) { + if (Metadata.isCustomMetadataEqual(previousState.metadata(), currentState.metadata())) { + return new HashMap<>(); + } + Map updatedCustom = new HashMap<>(); + Set currentCustoms = new HashSet<>(currentState.metadata().customs().keySet()); + for (Map.Entry cursor : previousState.metadata().customs().entrySet()) { + if (cursor.getValue().context().contains(Metadata.XContentContext.GATEWAY)) { + if (currentCustoms.contains(cursor.getKey()) + && !cursor.getValue().equals(currentState.metadata().custom(cursor.getKey()))) { + // If the custom metadata is updated, we need to upload the new version. + updatedCustom.put(cursor.getKey(), currentState.metadata().custom(cursor.getKey())); + } + currentCustoms.remove(cursor.getKey()); + } + } + for (String custom : currentCustoms) { + Metadata.Custom cursor = currentState.metadata().custom(custom); + if (cursor.context().contains(Metadata.XContentContext.GATEWAY)) { + updatedCustom.put(custom, cursor); + } + } + return updatedCustom; + } + + private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/ + return blobStoreRepository.blobStore() + .blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID).add(GLOBAL_METADATA_PATH_TOKEN)); + } + + boolean isGlobalMetadataEqual(ClusterMetadataManifest first, ClusterMetadataManifest second, String clusterName) { + Metadata secondGlobalMetadata = getGlobalMetadata(clusterName, second.getClusterUUID(), second); + Metadata firstGlobalMetadata = getGlobalMetadata(clusterName, first.getClusterUUID(), first); + return Metadata.isGlobalResourcesMetadataEquals(firstGlobalMetadata, secondGlobalMetadata); + } + + private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) { + this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout; + } + + public TimeValue getGlobalMetadataUploadTimeout() { + return this.globalMetadataUploadTimeout; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java new file mode 100644 index 0000000000000..819b9498a3854 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; +import org.opensearch.gateway.remote.model.RemoteIndexMetadataBlobStore; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; + +public class RemoteIndexMetadataManager { + + public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.index_metadata.upload_timeout", + INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final BlobStoreRepository blobStoreRepository; + + private final RemoteIndexMetadataBlobStore indexMetadataBlobStore; + + private volatile TimeValue indexMetadataUploadTimeout; + + public RemoteIndexMetadataManager( + BlobStoreRepository blobStoreRepository, + ClusterSettings clusterSettings, + ThreadPool threadPool, + String clusterName, + BlobStoreTransferService blobStoreTransferService + ) { + indexMetadataBlobStore = new RemoteIndexMetadataBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + this.blobStoreRepository = blobStoreRepository; + this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); + clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); + } + + /** + * Allows async Upload of IndexMetadata to remote + * + * @param indexMetadata {@link IndexMetadata} to upload + * @param latchedActionListener listener to respond back on after upload finishes + */ + CheckedRunnable getIndexMetadataAsyncAction( + IndexMetadata indexMetadata, + String clusterUUID, + LatchedActionListener latchedActionListener + ) { + RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse(remoteIndexMetadata.getUploadedMetadata()), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().getName(), ex)) + ); + return () -> indexMetadataBlobStore.writeAsync(remoteIndexMetadata, completionListener); + } + + /** + * Fetch index metadata from remote cluster state + * + * @param uploadedIndexMetadata {@link ClusterMetadataManifest.UploadedIndexMetadata} contains details about remote location of index metadata + * @return {@link IndexMetadata} + */ + IndexMetadata getIndexMetadata( + ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata, + String clusterUUID, + int manifestCodecVersion + ) { + RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedFileName(uploadedIndexMetadata.getUploadedFilename(), manifestCodecVersion), + clusterUUID, + blobStoreRepository + ); + try { + return indexMetadataBlobStore.read(remoteIndexMetadata); + } catch (IOException e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), + e + ); + } + } + + /** + * Fetch latest index metadata from remote cluster state + * + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterMetadataManifest manifest file of cluster + * @return {@code Map} latest IndexUUID to IndexMetadata map + */ + Map getIndexMetadataMap(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { + assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) + : "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch."; + Map remoteIndexMetadata = new HashMap<>(); + for (ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) { + IndexMetadata indexMetadata = getIndexMetadata(uploadedIndexMetadata, clusterUUID, clusterMetadataManifest.getCodecVersion()); + remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata); + } + return remoteIndexMetadata; + } + + public TimeValue getIndexMetadataUploadTimeout() { + return this.indexMetadataUploadTimeout; + } + + private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { + this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java new file mode 100644 index 0000000000000..78ea4772027f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -0,0 +1,313 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobMetadata; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifestBlobStore; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; + +public class RemoteManifestManager { + + public static final TimeValue METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.metadata_manifest.upload_timeout", + METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final BlobStoreRepository blobStoreRepository; + private volatile TimeValue metadataManifestUploadTimeout; + private final String nodeId; + private final RemoteClusterMetadataManifestBlobStore manifestBlobStore; + private static final Logger logger = LogManager.getLogger(RemoteManifestManager.class); + + RemoteManifestManager( + BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, + ClusterSettings clusterSettings, + String nodeId, + ThreadPool threadPool, + String clusterName + ) { + this.blobStoreRepository = blobStoreRepository; + this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING); + this.nodeId = nodeId; + manifestBlobStore = new RemoteClusterMetadataManifestBlobStore( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadPool + ); + clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); + } + + ClusterMetadataManifest uploadManifest( + ClusterState clusterState, + RemoteClusterStateUtils.UploadedMetadataResults uploadedMetadataResult, + String previousClusterUUID, + boolean committed + ) { + synchronized (this) { + ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder(); + manifestBuilder.clusterTerm(clusterState.term()) + .stateVersion(clusterState.getVersion()) + .clusterUUID(clusterState.metadata().clusterUUID()) + .stateUUID(clusterState.stateUUID()) + .opensearchVersion(Version.CURRENT) + .nodeId(nodeId) + .committed(committed) + .codecVersion(RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION) + .indices(uploadedMetadataResult.uploadedIndexMetadata) + .previousClusterUUID(previousClusterUUID) + .clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted()) + .coordinationMetadata(uploadedMetadataResult.uploadedCoordinationMetadata) + .settingMetadata(uploadedMetadataResult.uploadedSettingsMetadata) + .templatesMetadata(uploadedMetadataResult.uploadedTemplatesMetadata) + .customMetadataMap(uploadedMetadataResult.uploadedCustomMetadataMap); + final ClusterMetadataManifest manifest = manifestBuilder.build(); + writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest); + return manifest; + } + } + + private void writeMetadataManifest(String clusterUUID, ClusterMetadataManifest uploadManifest) { + AtomicReference exceptionReference = new AtomicReference(); + + // latch to wait until upload is not finished + CountDownLatch latch = new CountDownLatch(1); + + LatchedActionListener completionListener = new LatchedActionListener<>(ActionListener.wrap(resp -> { + logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully.")); + }, ex -> { exceptionReference.set(ex); }), latch); + + RemoteClusterMetadataManifest remoteClusterMetadataManifest = new RemoteClusterMetadataManifest( + uploadManifest, + clusterUUID, + blobStoreRepository + ); + manifestBlobStore.writeAsync(remoteClusterMetadataManifest, completionListener); + + try { + if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete") + ); + throw ex; + } + } catch (InterruptedException ex) { + RemoteStateTransferException exception = new RemoteStateTransferException( + String.format(Locale.ROOT, "Timed out waiting for transfer of manifest file to complete - %s"), + ex + ); + Thread.currentThread().interrupt(); + throw exception; + } + if (exceptionReference.get() != null) { + throw new RemoteStateTransferException(exceptionReference.get().getMessage(), exceptionReference.get()); + } + logger.debug( + "Metadata manifest file [{}] written during [{}] phase. ", + remoteClusterMetadataManifest.getBlobFileName(), + uploadManifest.isCommitted() ? "commit" : "publish" + ); + } + + /** + * Fetch latest ClusterMetadataManifest from remote state store + * + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return ClusterMetadataManifest + */ + public Optional getLatestClusterMetadataManifest(String clusterName, String clusterUUID) { + Optional latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID); + return latestManifestFileName.map(s -> fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, s)); + } + + /** + * Fetch the cluster metadata manifest using term and version + * @param clusterName uuid of cluster state to refer to in remote + * @param clusterUUID name of the cluster + * @param term election term of the cluster + * @param version cluster state version number + * @return ClusterMetadataManifest + */ + public Optional getClusterMetadataManifestByTermVersion( + String clusterName, + String clusterUUID, + long term, + long version + ) { + Optional manifestFileName = getManifestFileNameByTermVersion(clusterName, clusterUUID, term, version); + return manifestFileName.map(s -> fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, s)); + } + + /** + * Fetch ClusterMetadataManifest from remote state store + * + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return ClusterMetadataManifest + */ + ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, String clusterUUID, String filename) + throws IllegalStateException { + try { + String fullBlobName = getManifestFolderPath(clusterName, clusterUUID).buildAsString() + filename; + RemoteClusterMetadataManifest remoteClusterMetadataManifest = new RemoteClusterMetadataManifest( + fullBlobName, + clusterUUID, + blobStoreRepository + ); + return manifestBlobStore.read(remoteClusterMetadataManifest); + } catch (IOException e) { + throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e); + } + } + + Map getLatestManifestForAllClusterUUIDs(String clusterName, Set clusterUUIDs) { + Map manifestsByClusterUUID = new HashMap<>(); + for (String clusterUUID : clusterUUIDs) { + try { + Optional manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); + manifest.ifPresent(clusterMetadataManifest -> manifestsByClusterUUID.put(clusterUUID, clusterMetadataManifest)); + } catch (Exception e) { + throw new IllegalStateException( + String.format(Locale.ROOT, "Exception in fetching manifest for clusterUUID: %s", clusterUUID), + e + ); + } + } + return manifestsByClusterUUID; + } + + private BlobContainer manifestContainer(String clusterName, String clusterUUID) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest + return blobStoreRepository.blobStore().blobContainer(getManifestFolderPath(clusterName, clusterUUID)); + } + + BlobPath getManifestFolderPath(String clusterName, String clusterUUID) { + return getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID).add( + RemoteClusterMetadataManifest.MANIFEST_PATH_TOKEN + ); + } + + public TimeValue getMetadataManifestUploadTimeout() { + return this.metadataManifestUploadTimeout; + } + + private void setMetadataManifestUploadTimeout(TimeValue newMetadataManifestUploadTimeout) { + this.metadataManifestUploadTimeout = newMetadataManifestUploadTimeout; + } + + /** + * Fetch ClusterMetadataManifest files from remote state store in order + * + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @param limit max no of files to fetch + * @return all manifest file names + */ + private List getManifestFileNames(String clusterName, String clusterUUID, String filePrefix, int limit) + throws IllegalStateException { + try { + + /* + {@link BlobContainer#listBlobsByPrefixInSortedOrder} will list the latest manifest file first + as the manifest file name generated via {@link RemoteClusterStateService#getManifestFileName} ensures + when sorted in LEXICOGRAPHIC order the latest uploaded manifest file comes on top. + */ + return manifestContainer(clusterName, clusterUUID).listBlobsByPrefixInSortedOrder( + filePrefix, + limit, + BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC + ); + } catch (IOException e) { + throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e); + } + } + + static String getManifestFilePrefixForTermVersion(long term, long version) { + return String.join( + DELIMITER, + RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version) + ) + DELIMITER; + } + + /** + * Fetch latest ClusterMetadataManifest file from remote state store + * + * @param clusterUUID uuid of cluster state to refer to in remote + * @param clusterName name of the cluster + * @return latest ClusterMetadataManifest filename + */ + private Optional getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException { + List manifestFilesMetadata = getManifestFileNames( + clusterName, + clusterUUID, + RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX + DELIMITER, + 1 + ); + if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) { + return Optional.of(manifestFilesMetadata.get(0).name()); + } + logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID); + return Optional.empty(); + } + + private Optional getManifestFileNameByTermVersion(String clusterName, String clusterUUID, long term, long version) + throws IllegalStateException { + final String filePrefix = getManifestFilePrefixForTermVersion(term, version); + List manifestFilesMetadata = getManifestFileNames(clusterName, clusterUUID, filePrefix, 1); + if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) { + return Optional.of(manifestFilesMetadata.get(0).name()); + } + logger.info( + "No manifest file present in remote store for cluster name: {}, cluster UUID: {}, term: {}, version: {}", + clusterName, + clusterUUID, + term, + version + ); + return Optional.empty(); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 0b3cd49140939..9e11ab97c9f31 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -33,6 +33,11 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; public class ClusterMetadataManifestTests extends OpenSearchTestCase { @@ -104,22 +109,18 @@ public void testClusterMetadataManifestXContent() throws IOException { Collections.singletonList(uploadedIndexMetadata), "prev-cluster-uuid", true, - new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file"), + new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination-file"), + new UploadedMetadataAttribute(SETTING_METADATA, "setting-file"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "templates-file"), Collections.unmodifiableList( Arrays.asList( new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + CUSTOM_METADATA + CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, "custom--repositories-file" ), + new UploadedMetadataAttribute(CUSTOM_METADATA + CUSTOM_DELIMITER + IndexGraveyard.TYPE, "custom--index_graveyard-file"), new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, - "custom--index_graveyard-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER - + WeightedRoutingMetadata.TYPE, + CUSTOM_METADATA + CUSTOM_DELIMITER + WeightedRoutingMetadata.TYPE, "custom--weighted_routing_netadata-file" ) ) @@ -150,22 +151,18 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { randomUploadedIndexMetadataList(), "yfObdx8KSMKKrXf8UyHhM", true, - new UploadedMetadataAttribute(RemoteClusterStateService.COORDINATION_METADATA, "coordination-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.SETTING_METADATA, "setting-file"), - new UploadedMetadataAttribute(RemoteClusterStateService.TEMPLATES_METADATA, "templates-file"), + new UploadedMetadataAttribute(COORDINATION_METADATA, "coordination-file"), + new UploadedMetadataAttribute(SETTING_METADATA, "setting-file"), + new UploadedMetadataAttribute(TEMPLATES_METADATA, "templates-file"), Collections.unmodifiableList( Arrays.asList( new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, + CUSTOM_METADATA + CUSTOM_DELIMITER + RepositoriesMetadata.TYPE, "custom--repositories-file" ), + new UploadedMetadataAttribute(CUSTOM_METADATA + CUSTOM_DELIMITER + IndexGraveyard.TYPE, "custom--index_graveyard-file"), new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER + IndexGraveyard.TYPE, - "custom--index_graveyard-file" - ), - new UploadedMetadataAttribute( - RemoteClusterStateService.CUSTOM_METADATA + RemoteClusterStateService.CUSTOM_DELIMITER - + WeightedRoutingMetadata.TYPE, + CUSTOM_METADATA + CUSTOM_DELIMITER + WeightedRoutingMetadata.TYPE, "custom--weighted_routing_netadata-file" ) ) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 24fd1b164a4ff..1ce57b0a006b0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -54,18 +54,19 @@ import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS; import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES; -import static org.opensearch.gateway.remote.RemoteClusterStateService.CLUSTER_STATE_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.encodeString; import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -205,13 +206,9 @@ public void testDeleteClusterMetadata() throws IOException { .build(); ClusterMetadataManifest manifest5 = ClusterMetadataManifest.builder(manifest4).templatesMetadata(templateMetadataUpdated).build(); - when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn( - manifest4, - manifest5, - manifest1, - manifest2, - manifest3 - ); + when( + remoteClusterStateService.getRemoteManifestManager().fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any()) + ).thenReturn(manifest4, manifest5, manifest1, manifest2, manifest3); BlobContainer container = mock(BlobContainer.class); when(blobStore.blobContainer(any())).thenReturn(container); doNothing().when(container).deleteBlobsIgnoringIfNotExists(any()); @@ -282,7 +279,7 @@ public void testDeleteStaleClusterUUIDs() throws IOException { ).thenReturn(List.of(new PlainBlobMetadata("mainfest3", 1L))); Set uuids = new HashSet<>(Arrays.asList("cluster-uuid1", "cluster-uuid2", "cluster-uuid3")); when(remoteClusterStateService.getAllClusterUUIDs(any())).thenReturn(uuids); - when(remoteClusterStateService.getCusterMetadataBasePath(any(), any())).then( + when(getCusterMetadataBasePath(any(), any(), any())).then( invocationOnMock -> blobPath.add(encodeString(invocationOnMock.getArgument(0))) .add(CLUSTER_STATE_PATH_TOKEN) .add((String) invocationOnMock.getArgument(1)) @@ -306,11 +303,11 @@ public void testRemoteStateCleanupFailureStats() throws IOException { BlobPath blobPath = new BlobPath().add("random-path"); when((blobStoreRepository.basePath())).thenReturn(blobPath); remoteClusterStateCleanupManager.start(); - remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", List.of("cluster-uuid1")); + remoteClusterStateCleanupManager.deleteStaleUUIDsClusterMetadata("cluster1", Arrays.asList("cluster-uuid1")); try { assertBusy(() -> { // wait for stats to get updated - assertNotNull(remoteClusterStateCleanupManager.getStats()); + assertTrue(remoteClusterStateCleanupManager.getStats() != null); assertEquals(0, remoteClusterStateCleanupManager.getStats().getSuccessCount()); assertEquals(1, remoteClusterStateCleanupManager.getStats().getCleanupAttemptFailedCount()); }); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 5f0c371a3137e..45d347792f2eb 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -42,8 +42,10 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; import org.opensearch.index.remote.RemoteIndexPathUploader; -import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; @@ -84,15 +86,13 @@ import org.mockito.ArgumentMatchers; import static java.util.stream.Collectors.toList; -import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateService.FORMAT_PARAMS; -import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; +import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -117,6 +117,7 @@ public class RemoteClusterStateServiceTests extends OpenSearchTestCase { private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; private BlobStore blobStore; + private Settings settings; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before @@ -136,7 +137,7 @@ public void setup() { "remote_store_repository" ); - Settings settings = Settings.builder() + settings = Settings.builder() .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") .put(stateRepoTypeAttributeKey, FsRepository.TYPE) .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") @@ -296,7 +297,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { .provideStream(0) .getInputStream() .readAllBytes(); - IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize( + IndexMetadata writtenIndexMetadata = RemoteIndexMetadata.INDEX_METADATA_FORMAT.deserialize( capturedWriteContext.get("metadata").getFileName(), blobStoreRepository.getNamedXContentRegistry(), new BytesArray(writtenBytes) @@ -335,7 +336,7 @@ public void run() { remoteClusterStateService.start(); assertThrows( - RemoteClusterStateService.RemoteStateTransferException.class, + RemoteStateTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); } @@ -368,7 +369,7 @@ public void testTimeoutWhileWritingManifestFile() throws IOException { try { remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); } catch (Exception e) { - assertTrue(e instanceof RemoteClusterStateService.RemoteStateTransferException); + assertTrue(e instanceof RemoteStateTransferException); assertTrue(e.getMessage().contains("Timed out waiting for transfer of following metadata to complete")); } } @@ -389,7 +390,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx remoteClusterStateService.start(); assertThrows( - RemoteClusterStateService.RemoteStateTransferException.class, + RemoteStateTransferException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); @@ -904,24 +905,6 @@ private void verifyMetadataAttributeOnlyUpdated( assertions.accept(initialManifest, manifestAfterMetadataUpdate); } - public void testReadLatestMetadataManifestFailedIOException() throws IOException { - final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); - - BlobContainer blobContainer = mockBlobStoreObjects(); - when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) - .thenThrow(IOException.class); - - remoteClusterStateService.start(); - Exception e = assertThrows( - IllegalStateException.class, - () -> remoteClusterStateService.getLatestClusterMetadataManifest( - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ) - ); - assertEquals(e.getMessage(), "Error while fetching latest manifest file for remote cluster state"); - } - public void testReadLatestMetadataManifestFailedNoManifestFileInRemote() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); @@ -976,10 +959,10 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE remoteClusterStateService.start(); assertEquals( - remoteClusterStateService.getLatestClusterState(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()) - .getMetadata() - .getIndices() - .size(), + remoteClusterStateService.getLatestClusterState( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ).getMetadata().getIndices().size(), 0 ); } @@ -1062,10 +1045,10 @@ public void testReadGlobalMetadata() throws IOException { .stateUUID("state-uuid") .clusterUUID("cluster-uuid") .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) - .coordinationMetadata(new UploadedMetadataAttribute(COORDINATION_METADATA, "mock-coordination-file")) - .settingMetadata(new UploadedMetadataAttribute(SETTING_METADATA, "mock-setting-file")) - .templatesMetadata(new UploadedMetadataAttribute(TEMPLATES_METADATA, "mock-templates-file")) - .put(IndexGraveyard.TYPE, new UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" +IndexGraveyard.TYPE+ "-file")) + .coordinationMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(COORDINATION_METADATA, "mock-coordination-file")) + .settingMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(SETTING_METADATA, "mock-setting-file")) + .templatesMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(TEMPLATES_METADATA, "mock-templates-file")) + .put(IndexGraveyard.TYPE, new ClusterMetadataManifest.UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" +IndexGraveyard.TYPE+ "-file")) .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") @@ -1074,7 +1057,7 @@ public void testReadGlobalMetadata() throws IOException { Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build(); mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expactedMetadata); - ClusterState newClusterState = remoteClusterStateService.getLatestClusterState( + ClusterState newClusterState = remoteClusterStateService.getLatestClusterState( clusterState.getClusterName().value(), clusterState.metadata().clusterUUID() ); @@ -1112,7 +1095,7 @@ public void testReadGlobalMetadataIOException() throws IOException { BlobContainer blobContainer = mockBlobStoreObjects(); mockBlobContainerForGlobalMetadata(blobContainer, expectedManifest, expactedMetadata); - when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(globalIndexMetadataName))).thenThrow( + when(blobContainer.readBlob(RemoteGlobalMetadataManager.GLOBAL_METADATA_FORMAT.blobName(globalIndexMetadataName))).thenThrow( FileNotFoundException.class ); @@ -1290,94 +1273,6 @@ public void testRemoteStateStats() throws IOException { assertEquals(0, remoteClusterStateService.getStats().getFailedCount()); } - public void testFileNames() { - final Index index = new Index("test-index", "index-uuid"); - final Settings idxSettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) - .build(); - final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - - String indexMetadataFileName = RemoteClusterStateService.indexMetadataFileName(indexMetadata); - String[] splittedIndexMetadataFileName = indexMetadataFileName.split(DELIMITER); - assertThat(indexMetadataFileName.split(DELIMITER).length, is(4)); - assertThat(splittedIndexMetadataFileName[0], is(METADATA_FILE_PREFIX)); - assertThat(splittedIndexMetadataFileName[1], is(RemoteStoreUtils.invertLong(indexMetadata.getVersion()))); - assertThat(splittedIndexMetadataFileName[3], is(String.valueOf(INDEX_METADATA_CURRENT_CODEC_VERSION))); - - verifyManifestFileNameWithCodec(MANIFEST_CURRENT_CODEC_VERSION); - verifyManifestFileNameWithCodec(ClusterMetadataManifest.CODEC_V1); - verifyManifestFileNameWithCodec(ClusterMetadataManifest.CODEC_V0); - } - - private void verifyManifestFileNameWithCodec(int codecVersion) { - int term = randomIntBetween(5, 10); - int version = randomIntBetween(5, 10); - String manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, true, codecVersion); - assertThat(manifestFileName.split(DELIMITER).length, is(6)); - String[] splittedName = manifestFileName.split(DELIMITER); - assertThat(splittedName[0], is(MANIFEST_FILE_PREFIX)); - assertThat(splittedName[1], is(RemoteStoreUtils.invertLong(term))); - assertThat(splittedName[2], is(RemoteStoreUtils.invertLong(version))); - assertThat(splittedName[3], is("C")); - assertThat(splittedName[5], is(String.valueOf(codecVersion))); - - manifestFileName = RemoteClusterStateService.getManifestFileName(term, version, false, codecVersion); - splittedName = manifestFileName.split(DELIMITER); - assertThat(splittedName[3], is("P")); - } - - public void testIndexMetadataUploadWaitTimeSetting() { - // verify default value - assertEquals( - RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, - remoteClusterStateService.getIndexMetadataUploadTimeout() - ); - - // verify update index metadata upload timeout - int indexMetadataUploadTimeout = randomIntBetween(1, 10); - Settings newSettings = Settings.builder() - .put("cluster.remote_store.state.index_metadata.upload_timeout", indexMetadataUploadTimeout + "s") - .build(); - clusterSettings.applySettings(newSettings); - assertEquals(indexMetadataUploadTimeout, remoteClusterStateService.getIndexMetadataUploadTimeout().seconds()); - } - - public void testMetadataManifestUploadWaitTimeSetting() { - // verify default value - assertEquals( - RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT, - remoteClusterStateService.getMetadataManifestUploadTimeout() - ); - - // verify update metadata manifest upload timeout - int metadataManifestUploadTimeout = randomIntBetween(1, 10); - Settings newSettings = Settings.builder() - .put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s") - .build(); - clusterSettings.applySettings(newSettings); - assertEquals(metadataManifestUploadTimeout, remoteClusterStateService.getMetadataManifestUploadTimeout().seconds()); - } - - public void testGlobalMetadataUploadWaitTimeSetting() { - // verify default value - assertEquals( - RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, - remoteClusterStateService.getGlobalMetadataUploadTimeout() - ); - - // verify update global metadata upload timeout - int globalMetadataUploadTimeout = randomIntBetween(1, 10); - Settings newSettings = Settings.builder() - .put("cluster.remote_store.state.global_metadata.upload_timeout", globalMetadataUploadTimeout + "s") - .build(); - clusterSettings.applySettings(newSettings); - assertEquals(globalMetadataUploadTimeout, remoteClusterStateService.getGlobalMetadataUploadTimeout().seconds()); - } - private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { mockObjectsForGettingPreviousClusterUUID(clusterUUIDsPointers, false, Collections.emptyMap()); } @@ -1624,7 +1519,7 @@ private void mockBlobContainer( when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) .thenReturn(Arrays.asList(blobMetadata)); - BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( + BytesReference bytes = RemoteClusterMetadataManifest.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( clusterMetadataManifest, manifestFileName, blobStoreRepository.getCompressor(), @@ -1640,7 +1535,7 @@ private void mockBlobContainer( } String fileName = uploadedIndexMetadata.getUploadedFilename(); when(blobContainer.readBlob(fileName + ".dat")).thenAnswer((invocationOnMock) -> { - BytesReference bytesIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.serialize( + BytesReference bytesIndexMetadata = RemoteIndexMetadata.INDEX_METADATA_FORMAT.serialize( indexMetadata, fileName, blobStoreRepository.getCompressor(), @@ -1662,15 +1557,10 @@ private void mockBlobContainerForGlobalMetadata( int codecVersion = clusterMetadataManifest.getCodecVersion(); String mockManifestFileName = "manifest__1__2__C__456__" + codecVersion; BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1); - when( - blobContainer.listBlobsByPrefixInSortedOrder( - "manifest" + RemoteClusterStateService.DELIMITER, - 1, - BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC - ) - ).thenReturn(Arrays.asList(blobMetadata)); + when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) + .thenReturn(Arrays.asList(blobMetadata)); - BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( + BytesReference bytes = RemoteClusterMetadataManifest.CLUSTER_METADATA_MANIFEST_FORMAT.serialize( clusterMetadataManifest, mockManifestFileName, blobStoreRepository.getCompressor(), @@ -1679,22 +1569,21 @@ private void mockBlobContainerForGlobalMetadata( when(blobContainer.readBlob(mockManifestFileName)).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes())); if (codecVersion >= ClusterMetadataManifest.CODEC_V2) { String coordinationFileName = getFileNameFromPath(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename()); - when(blobContainer.readBlob(RemoteClusterStateService.COORDINATION_METADATA_FORMAT.blobName(coordinationFileName))).thenAnswer( - (invocationOnMock) -> { - BytesReference bytesReference = RemoteClusterStateService.COORDINATION_METADATA_FORMAT.serialize( + when(blobContainer.readBlob(RemoteGlobalMetadataManager.COORDINATION_METADATA_FORMAT.blobName(coordinationFileName))) + .thenAnswer((invocationOnMock) -> { + BytesReference bytesReference = RemoteGlobalMetadataManager.COORDINATION_METADATA_FORMAT.serialize( metadata.coordinationMetadata(), coordinationFileName, blobStoreRepository.getCompressor(), FORMAT_PARAMS ); return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); - } - ); + }); String settingsFileName = getFileNameFromPath(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename()); - when(blobContainer.readBlob(RemoteClusterStateService.SETTINGS_METADATA_FORMAT.blobName(settingsFileName))).thenAnswer( + when(blobContainer.readBlob(RemoteGlobalMetadataManager.SETTINGS_METADATA_FORMAT.blobName(settingsFileName))).thenAnswer( (invocationOnMock) -> { - BytesReference bytesReference = RemoteClusterStateService.SETTINGS_METADATA_FORMAT.serialize( + BytesReference bytesReference = RemoteGlobalMetadataManager.SETTINGS_METADATA_FORMAT.serialize( metadata.persistentSettings(), settingsFileName, blobStoreRepository.getCompressor(), @@ -1705,9 +1594,9 @@ private void mockBlobContainerForGlobalMetadata( ); String templatesFileName = getFileNameFromPath(clusterMetadataManifest.getTemplatesMetadata().getUploadedFilename()); - when(blobContainer.readBlob(RemoteClusterStateService.TEMPLATES_METADATA_FORMAT.blobName(templatesFileName))).thenAnswer( + when(blobContainer.readBlob(RemoteGlobalMetadataManager.TEMPLATES_METADATA_FORMAT.blobName(templatesFileName))).thenAnswer( (invocationOnMock) -> { - BytesReference bytesReference = RemoteClusterStateService.TEMPLATES_METADATA_FORMAT.serialize( + BytesReference bytesReference = RemoteGlobalMetadataManager.TEMPLATES_METADATA_FORMAT.serialize( metadata.templatesMetadata(), templatesFileName, blobStoreRepository.getCompressor(), @@ -1725,23 +1614,21 @@ private void mockBlobContainerForGlobalMetadata( for (Map.Entry entry : customFileMap.entrySet()) { String custom = entry.getKey(); String fileName = entry.getValue(); - when(blobContainer.readBlob(RemoteClusterStateService.CUSTOM_METADATA_FORMAT.blobName(fileName))).thenAnswer( - (invocation) -> { - BytesReference bytesReference = RemoteClusterStateService.CUSTOM_METADATA_FORMAT.serialize( - metadata.custom(custom), - fileName, - blobStoreRepository.getCompressor(), - FORMAT_PARAMS - ); - return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); - } - ); + when(blobContainer.readBlob(RemoteCustomMetadata.CUSTOM_METADATA_FORMAT.blobName(fileName))).thenAnswer((invocation) -> { + BytesReference bytesReference = RemoteCustomMetadata.CUSTOM_METADATA_FORMAT.serialize( + metadata.custom(custom), + fileName, + blobStoreRepository.getCompressor(), + FORMAT_PARAMS + ); + return new ByteArrayInputStream(bytesReference.streamInput().readAllBytes()); + }); } } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/"); - when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))) + when(blobContainer.readBlob(RemoteGlobalMetadataManager.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))) .thenAnswer((invocationOnMock) -> { - BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize( + BytesReference bytesGlobalMetadata = RemoteGlobalMetadataManager.GLOBAL_METADATA_FORMAT.serialize( metadata, "global-metadata-file", blobStoreRepository.getCompressor(), @@ -1757,7 +1644,7 @@ private String getFileNameFromPath(String filePath) { return splitPath[splitPath.length - 1]; } - private static ClusterState.Builder generateClusterStateWithGlobalMetadata() { + static ClusterState.Builder generateClusterStateWithGlobalMetadata() { final Settings clusterSettings = Settings.builder().put("cluster.blocks.read_only", true).build(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java new file mode 100644 index 0000000000000..dc9d55ddec7c6 --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import static org.mockito.Mockito.mock; + +public class RemoteGlobalMetadataManagerTests extends OpenSearchTestCase { + private RemoteGlobalMetadataManager remoteGlobalMetadataManager; + private ClusterSettings clusterSettings; + private BlobStoreRepository blobStoreRepository; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); + remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( + blobStoreRepository, + clusterSettings, + threadPool, + blobStoreTransferService, + "test-cluster" + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + public void testGlobalMetadataUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT, + remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout() + ); + + // verify update global metadata upload timeout + int globalMetadataUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.global_metadata.upload_timeout", globalMetadataUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java new file mode 100644 index 0000000000000..a6660d663401a --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.junit.After; +import org.junit.Before; + +import static org.mockito.Mockito.mock; + +public class RemoteIndexMetadataManagerTests extends OpenSearchTestCase { + private RemoteIndexMetadataManager remoteIndexMetadataManager; + private BlobStoreRepository blobStoreRepository; + private ClusterSettings clusterSettings; + private BlobStoreTransferService blobStoreTransferService; + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + blobStoreTransferService = mock(BlobStoreTransferService.class); + remoteIndexMetadataManager = new RemoteIndexMetadataManager( + blobStoreRepository, + clusterSettings, + new TestThreadPool("test"), + "cluster-name", + blobStoreTransferService + ); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + public void testIndexMetadataUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT, + remoteIndexMetadataManager.getIndexMetadataUploadTimeout() + ); + + // verify update index metadata upload timeout + int indexMetadataUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.index_metadata.upload_timeout", indexMetadataUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(indexMetadataUploadTimeout, remoteIndexMetadataManager.getIndexMetadataUploadTimeout().seconds()); + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java new file mode 100644 index 0000000000000..cb873552ca62d --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.generateClusterStateWithOneIndex; +import static org.opensearch.gateway.remote.RemoteClusterStateServiceTests.nodesWithLocalNodeClusterManager; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteManifestManagerTests extends OpenSearchTestCase { + private RemoteManifestManager remoteManifestManager; + private ClusterSettings clusterSettings; + private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; + private BlobStoreTransferService blobStoreTransferService; + + @Before + public void setup() { + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + remoteManifestManager = new RemoteManifestManager( + blobStoreTransferService, + blobStoreRepository, + clusterSettings, + "test-node-id", + new TestThreadPool("test"), + "test-cluster-name" + ); + blobStoreTransferService = mock(BlobStoreTransferService.class); + blobStore = mock(BlobStore.class); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + public void testMetadataManifestUploadWaitTimeSetting() { + // verify default value + assertEquals( + RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_DEFAULT, + remoteManifestManager.getMetadataManifestUploadTimeout() + ); + + // verify update metadata manifest upload timeout + int metadataManifestUploadTimeout = randomIntBetween(1, 10); + Settings newSettings = Settings.builder() + .put("cluster.remote_store.state.metadata_manifest.upload_timeout", metadataManifestUploadTimeout + "s") + .build(); + clusterSettings.applySettings(newSettings); + assertEquals(metadataManifestUploadTimeout, remoteManifestManager.getMetadataManifestUploadTimeout().seconds()); + } + + public void testReadLatestMetadataManifestFailedIOException() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + + BlobContainer blobContainer = mockBlobStoreObjects(); + when(blobContainer.listBlobsByPrefixInSortedOrder("manifest" + DELIMITER, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC)) + .thenThrow(IOException.class); + + Exception e = assertThrows( + IllegalStateException.class, + () -> remoteManifestManager.getLatestClusterMetadataManifest( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() + ) + ); + assertEquals(e.getMessage(), "Error while fetching latest manifest file for remote cluster state"); + } + + private BlobContainer mockBlobStoreObjects() { + final BlobPath blobPath = mock(BlobPath.class); + when((blobStoreRepository.basePath())).thenReturn(blobPath); + when(blobPath.add(anyString())).thenReturn(blobPath); + when(blobPath.buildAsString()).thenReturn("/blob/path/"); + final BlobContainer blobContainer = mock(BlobContainer.class); + when(blobContainer.path()).thenReturn(blobPath); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + return blobContainer; + } +}