From bf30f0cc930b124d8dd50ceead2876dc4f8ade27 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 5 Jul 2024 19:04:00 +0530 Subject: [PATCH] Create interface RemoteEntitiesManager - Add javadocs - fix after rebase Signed-off-by: Shivansh Arora --- .../remote/AbstractRemoteEntitiesManager.java | 91 ++++++++++++++ .../common/remote/RemoteEntitiesManager.java | 50 ++++++++ .../RemoteClusterStateAttributesManager.java | 48 ++------ .../remote/RemoteClusterStateService.java | 76 ++++++++---- .../remote/RemoteGlobalMetadataManager.java | 54 +++------ .../remote/RemoteIndexMetadataManager.java | 86 ++++++-------- .../AbstractRemoteEntitiesManagerTests.java | 65 ++++++++++ ...oteClusterStateAttributesManagerTests.java | 35 +++--- .../RemoteGlobalMetadataManagerTests.java | 111 ++++++++++-------- .../RemoteIndexMetadataManagerTests.java | 38 +++--- 10 files changed, 427 insertions(+), 227 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java create mode 100644 server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java create mode 100644 server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java new file mode 100644 index 0000000000000..9023c2d95902d --- /dev/null +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * An abstract class that provides a base implementation for managing remote entities in the remote store. + */ +public abstract class AbstractRemoteEntitiesManager implements RemoteEntitiesManager { + /** + * A map that stores the remote writable entity stores, keyed by the entity type. + */ + protected final Map remoteWritableEntityStores = new HashMap<>(); + + /** + * Retrieves the remote writable entity store for the given entity. + * + * @param entity the entity for which the store is requested + * @return the remote writable entity store for the given entity + * @throws IllegalArgumentException if the entity type is unknown + */ + protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { + RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); + if (remoteStore == null) { + throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); + } + return remoteStore; + } + + /** + * Returns an ActionListener for handling the write operation for the specified component, remote object, and latched action listener. + * + * @param component the component for which the write operation is performed + * @param remoteObject the remote object to be written + * @param latchedActionListener the latched action listener to be notified when the write operation completes + * @return an ActionListener for handling the write operation + */ + protected abstract ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ); + + /** + * Returns an ActionListener for handling the read operation for the specified component, + * remote object, and latched action listener. + * + * @param component the component for which the read operation is performed + * @param remoteObject the remote object to be read + * @param latchedActionListener the latched action listener to be notified when the read operation completes + * @return an ActionListener for handling the read operation + */ + protected abstract ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ); + + @Override + public CheckedRunnable getAsyncWriteRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ) { + return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, latchedActionListener)); + } + + @Override + public CheckedRunnable getAsyncReadRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ) { + return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, latchedActionListener)); + } +} diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java b/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java new file mode 100644 index 0000000000000..f01d3ab509fbc --- /dev/null +++ b/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +import java.io.IOException; + +/** + * The RemoteEntitiesManager interface provides async read and write methods for managing remote entities in the remote store + */ +public interface RemoteEntitiesManager { + + /** + * Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity. + * + * @param component the component for which the read operation is performed + * @param entity the entity to be read + * @param latchedActionListener the listener to be notified when the read operation completes + * @return a CheckedRunnable that performs the asynchronous read operation + */ + CheckedRunnable getAsyncReadRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ); + + /** + * Returns a CheckedRunnable that performs an asynchronous write operation for the specified component and entity. + * + * @param component the component for which the write operation is performed + * @param entity the entity to be written + * @param latchedActionListener the listener to be notified when the write operation completes + * @return a CheckedRunnable that performs the asynchronous write operation + */ + CheckedRunnable getAsyncWriteRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 8f986423587d7..22a00ee5d6353 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -12,9 +12,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; -import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; @@ -26,9 +25,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Map; /** @@ -36,12 +33,11 @@ * * @opensearch.internal */ -public class RemoteClusterStateAttributesManager { +public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesManager { public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; public static final String DISCOVERY_NODES = "nodes"; public static final String CLUSTER_BLOCKS = "blocks"; public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; - private final Map remoteWritableEntityStores; private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( @@ -52,7 +48,6 @@ public class RemoteClusterStateAttributesManager { ThreadPool threadpool ) { this.namedWriteableRegistry = namedWriteableRegistry; - this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteClusterStateBlobStore<>( @@ -85,46 +80,28 @@ public class RemoteClusterStateAttributesManager { ); } - /** - * Allows async upload of Cluster State Attribute components to remote - */ - CheckedRunnable getAsyncMetadataWriteAction( - String component, - AbstractRemoteWritableBlobEntity blobEntity, - LatchedActionListener latchedActionListener - ) { - return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener)); - } - - private ActionListener getActionListener( + @Override + protected ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener latchedActionListener ) { return ActionListener.wrap( resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex)) + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } - private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { - RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); - if (remoteStore == null) { - throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); - } - return remoteStore; - } - - public CheckedRunnable getAsyncMetadataReadAction( + @Override + protected ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity blobEntity, - LatchedActionListener listener + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener ) { - final ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), - listener::onFailure + return ActionListener.wrap( + response -> latchedActionListener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); - return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); } public DiffableUtils.MapDiff> getUpdatedCustoms( @@ -158,4 +135,5 @@ public DiffableUtils.MapDiff { uploadTasks.put( indexMetadata.getIndex().getName(), - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction(indexMetadata, clusterState.metadata().clusterUUID(), listener) + remoteIndexMetadataManager.getAsyncWriteRunnable( + indexMetadata.getIndex().getName(), + new RemoteIndexMetadata( + indexMetadata, + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener + ) ); }); clusterStateCustomToUpload.forEach((key, value) -> { uploadTasks.put( key, - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( CLUSTER_STATE_CUSTOM, new RemoteClusterStateCustoms( value, @@ -1015,7 +1030,16 @@ private ClusterState readClusterStateInParallel( for (UploadedIndexMetadata indexMetadata : indicesToRead) { asyncMetadataReadActions.add( - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(clusterUUID, indexMetadata.getUploadedFilename(), listener) + remoteIndexMetadataManager.getAsyncReadRunnable( + indexMetadata.getIndexName(), + new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener + ) ); } @@ -1042,7 +1066,8 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : customToRead.entrySet()) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + entry.getValue().getAttributeName(), new RemoteCustomMetadata( entry.getValue().getUploadedFilename(), entry.getKey(), @@ -1050,7 +1075,6 @@ private ClusterState readClusterStateInParallel( blobStoreRepository.getCompressor(), namedWriteableRegistry ), - entry.getValue().getAttributeName(), listener ) ); @@ -1058,14 +1082,14 @@ private ClusterState readClusterStateInParallel( if (readCoordinationMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + COORDINATION_METADATA, new RemoteCoordinationMetadata( manifest.getCoordinationMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - COORDINATION_METADATA, listener ) ); @@ -1073,14 +1097,14 @@ private ClusterState readClusterStateInParallel( if (readSettingsMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + SETTING_METADATA, new RemotePersistentSettingsMetadata( manifest.getSettingsMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - SETTING_METADATA, listener ) ); @@ -1088,14 +1112,14 @@ private ClusterState readClusterStateInParallel( if (readTransientSettingsMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + TRANSIENT_SETTING_METADATA, new RemoteTransientSettingsMetadata( manifest.getTransientSettingsMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - TRANSIENT_SETTING_METADATA, listener ) ); @@ -1103,14 +1127,14 @@ private ClusterState readClusterStateInParallel( if (readTemplatesMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + TEMPLATES_METADATA, new RemoteTemplatesMetadata( manifest.getTemplatesMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - TEMPLATES_METADATA, listener ) ); @@ -1118,7 +1142,7 @@ private ClusterState readClusterStateInParallel( if (readDiscoveryNodes) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( DISCOVERY_NODES, new RemoteDiscoveryNodes( manifest.getDiscoveryNodesMetadata().getUploadedFilename(), @@ -1132,7 +1156,7 @@ private ClusterState readClusterStateInParallel( if (readClusterBlocks) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( CLUSTER_BLOCKS, new RemoteClusterBlocks( manifest.getClusterBlocksMetadata().getUploadedFilename(), @@ -1146,13 +1170,13 @@ private ClusterState readClusterStateInParallel( if (readHashesOfConsistentSettings) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + HASHES_OF_CONSISTENT_SETTINGS, new RemoteHashesOfConsistentSettings( manifest.getHashesOfConsistentSettings().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor() ), - HASHES_OF_CONSISTENT_SETTINGS, listener ) ); @@ -1160,7 +1184,7 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( // pass component name as cluster-state-custom--, so that we can interpret it later String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), new RemoteClusterStateCustoms( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 2c5aad99adc0c..3c00c4cc2d94a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -17,9 +17,8 @@ import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -43,7 +42,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; @@ -56,7 +54,7 @@ * * @opensearch.internal */ -public class RemoteGlobalMetadataManager { +public class RemoteGlobalMetadataManager extends AbstractRemoteEntitiesManager { public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -70,7 +68,6 @@ public class RemoteGlobalMetadataManager { public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; private volatile TimeValue globalMetadataUploadTimeout; - private Map remoteWritableEntityStores; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; private final NamedWriteableRegistry namedWriteableRegistry; @@ -87,7 +84,6 @@ public class RemoteGlobalMetadataManager { this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.namedWriteableRegistry = namedWriteableRegistry; - this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, new RemoteClusterStateBlobStore<>( @@ -161,46 +157,28 @@ public class RemoteGlobalMetadataManager { clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); } - /** - * Allows async upload of Metadata components to remote - */ - CheckedRunnable getAsyncMetadataWriteAction( - AbstractRemoteWritableBlobEntity writeEntity, - LatchedActionListener latchedActionListener - ) { - return (() -> getStore(writeEntity).writeAsync(writeEntity, getActionListener(writeEntity, latchedActionListener))); - } - - private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { - RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); - if (remoteStore == null) { - throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); - } - return remoteStore; - } - - private ActionListener getActionListener( - AbstractRemoteWritableBlobEntity remoteBlobStoreObject, + @Override + protected ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener latchedActionListener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteBlobStoreObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure( - new RemoteStateTransferException("Upload failed for " + remoteBlobStoreObject.getType(), ex) - ) + resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } - CheckedRunnable getAsyncMetadataReadAction( - AbstractRemoteWritableBlobEntity readEntity, - String componentName, - LatchedActionListener listener + @Override + protected ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener ) { - ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)), - listener::onFailure + return ActionListener.wrap( + response -> latchedActionListener.onResponse(new RemoteReadResult(response, remoteObject.getType(), component)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); - return () -> getStore(readEntity).readAsync(readEntity, actionListener); } Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index c595f19279354..b4dce486128b9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -10,8 +10,8 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.remote.RemoteWritableEntityStore; +import org.opensearch.common.remote.AbstractRemoteEntitiesManager; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; @@ -33,7 +33,7 @@ * * @opensearch.internal */ -public class RemoteIndexMetadataManager { +public class RemoteIndexMetadataManager extends AbstractRemoteEntitiesManager { public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -45,7 +45,6 @@ public class RemoteIndexMetadataManager { Setting.Property.Deprecated ); - private final RemoteWritableEntityStore indexMetadataBlobStore; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; @@ -58,12 +57,15 @@ public RemoteIndexMetadataManager( BlobStoreTransferService blobStoreTransferService, ThreadPool threadpool ) { - this.indexMetadataBlobStore = new RemoteClusterStateBlobStore<>( - blobStoreTransferService, - blobStoreRepository, - clusterName, - threadpool, - ThreadPool.Names.REMOTE_STATE_READ + this.remoteWritableEntityStores.put( + RemoteIndexMetadata.INDEX, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) ); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.compressor = blobStoreRepository.getCompressor(); @@ -71,45 +73,6 @@ public RemoteIndexMetadataManager( clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); } - /** - * Allows async Upload of IndexMetadata to remote - * - * @param indexMetadata {@link IndexMetadata} to upload - * @param latchedActionListener listener to respond back on after upload finishes - */ - CheckedRunnable getAsyncIndexMetadataWriteAction( - IndexMetadata indexMetadata, - String clusterUUID, - LatchedActionListener latchedActionListener - ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); - ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteIndexMetadata.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().getName(), ex)) - ); - return () -> indexMetadataBlobStore.writeAsync(remoteIndexMetadata, completionListener); - } - - CheckedRunnable getAsyncIndexMetadataReadAction( - String clusterUUID, - String uploadedFilename, - LatchedActionListener latchedActionListener - ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata( - RemoteClusterStateUtils.getFormattedIndexFileName(uploadedFilename), - clusterUUID, - compressor, - namedXContentRegistry - ); - ActionListener actionListener = ActionListener.wrap( - response -> latchedActionListener.onResponse( - new RemoteReadResult(response, RemoteIndexMetadata.INDEX, response.getIndex().getName()) - ), - latchedActionListener::onFailure - ); - return () -> indexMetadataBlobStore.readAsync(remoteIndexMetadata, actionListener); - } - /** * Fetch index metadata from remote cluster state * @@ -124,7 +87,7 @@ IndexMetadata getIndexMetadata(ClusterMetadataManifest.UploadedIndexMetadata upl namedXContentRegistry ); try { - return indexMetadataBlobStore.read(remoteIndexMetadata); + return (IndexMetadata) getStore(remoteIndexMetadata).read(remoteIndexMetadata); } catch (IOException e) { throw new IllegalStateException( String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), @@ -141,4 +104,27 @@ private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeo this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; } + @Override + protected ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { + return ActionListener.wrap( + resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + ); + } + + @Override + protected ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { + return ActionListener.wrap( + response -> latchedActionListener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + ); + } } diff --git a/server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java new file mode 100644 index 0000000000000..53ebf488bbe60 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class AbstractRemoteEntitiesManagerTests extends OpenSearchTestCase { + public void testGetStoreWithKnownEntityType() { + AbstractRemoteEntitiesManager manager = new ConcreteRemoteEntitiesManager(); + String knownEntityType = "knownType"; + RemoteWritableEntityStore mockStore = mock(RemoteWritableEntityStore.class); + manager.remoteWritableEntityStores.put(knownEntityType, mockStore); + AbstractRemoteWritableBlobEntity mockEntity = mock(AbstractRemoteWritableBlobEntity.class); + when(mockEntity.getType()).thenReturn(knownEntityType); + + RemoteWritableEntityStore store = manager.getStore(mockEntity); + verify(mockEntity).getType(); + assertEquals(mockStore, store); + } + + public void testGetStoreWithUnknownEntityType() { + AbstractRemoteEntitiesManager manager = new ConcreteRemoteEntitiesManager(); + String unknownEntityType = "unknownType"; + AbstractRemoteWritableBlobEntity mockEntity = mock(AbstractRemoteWritableBlobEntity.class); + when(mockEntity.getType()).thenReturn(unknownEntityType); + + assertThrows(IllegalArgumentException.class, () -> manager.getStore(mockEntity)); + verify(mockEntity, times(2)).getType(); + } + + private static class ConcreteRemoteEntitiesManager extends AbstractRemoteEntitiesManager { + @Override + protected ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { + return null; + } + + @Override + protected ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { + return null; + } + } +} diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index fe9ed57fa77b8..afa5e5ed36161 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -108,7 +108,7 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); doAnswer(invocationOnMock -> { @@ -118,7 +118,7 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch) @@ -141,7 +141,7 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( @@ -150,7 +150,7 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch) @@ -166,7 +166,7 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.getClusterManagerNodeId()); } - public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(clusterBlocks, VERSION, CLUSTER_UUID, compressor); doAnswer(invocationOnMock -> { @@ -176,7 +176,7 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch) @@ -199,7 +199,7 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( @@ -209,7 +209,7 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch) @@ -227,7 +227,7 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I } } - public void testGetAsyncMetadataWriteAction_Custom() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_Custom() throws IOException, InterruptedException { Custom custom = getClusterStateCustom(); RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( custom, @@ -244,7 +244,7 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final TestCapturingListener listener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(listener, latch) @@ -267,7 +267,7 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_Custom() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedException { Custom custom = getClusterStateCustom(); String fileName = randomAlphaOfLength(10); RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( @@ -282,7 +282,7 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(capturingListener, latch) @@ -295,7 +295,7 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_Exception() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_Exception() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); @@ -308,7 +308,7 @@ public void testGetAsyncMetadataWriteAction_Exception() throws IOException, Inte TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) @@ -319,21 +319,22 @@ public void testGetAsyncMetadataWriteAction_Exception() throws IOException, Inte assertEquals(ioException, capturingListener.getFailure().getCause()); } - public void testGetAsyncMetadataReadAction_Exception() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_Exception() throws IOException, InterruptedException { String fileName = randomAlphaOfLength(10); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(fileName, CLUSTER_UUID, compressor); Exception ioException = new IOException("mock test exception"); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener capturingListener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) ).run(); latch.await(); assertNull(capturingListener.getResult()); - assertEquals(ioException, capturingListener.getFailure()); + assertEquals(ioException, capturingListener.getFailure().getCause()); + assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException); } public void testGetUpdatedCustoms() { diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index c543f986b3e86..2ada062e972bd 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -157,7 +157,7 @@ public void testGlobalMetadataUploadWaitTimeSetting() { assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); } - public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Exception { + public void testGetAsyncReadRunnable_CoordinationMetadata() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); String fileName = randomAlphaOfLength(10); RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( @@ -172,9 +172,9 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - coordinationMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( COORDINATION_METADATA, + coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -185,7 +185,7 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti assertEquals(COORDINATION_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Exception { + public void testGetAsyncWriteRunnable_CoordinationMetadata() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( coordinationMetadata, @@ -202,8 +202,11 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable( + COORDINATION_METADATA, + remoteCoordinationMetadata, + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -223,7 +226,7 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception { + public void testGetAsyncReadRunnable_PersistentSettings() throws Exception { Settings settingsMetadata = getSettings(); String fileName = randomAlphaOfLength(10); RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( @@ -239,11 +242,8 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - persistentSettings, - SETTING_METADATA, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.getAsyncReadRunnable(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -252,7 +252,7 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception assertEquals(SETTING_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exception { + public void testGetAsyncWriteRunnable_PersistentSettings() throws Exception { Settings settingsMetadata = getSettings(); RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( settingsMetadata, @@ -268,7 +268,11 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable( + SETTING_METADATA, + persistentSettings, + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getFailure()); @@ -289,7 +293,7 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception { + public void testGetAsyncReadRunnable_TransientSettings() throws Exception { Settings settingsMetadata = getSettings(); String fileName = randomAlphaOfLength(10); RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( @@ -305,9 +309,9 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - transientSettings, + remoteGlobalMetadataManager.getAsyncReadRunnable( TRANSIENT_SETTING_METADATA, + transientSettings, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -318,7 +322,7 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception { + public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { Settings settingsMetadata = getSettings(); RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( settingsMetadata, @@ -334,7 +338,11 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable( + TRANSIENT_SETTING_METADATA, + transientSettings, + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -354,7 +362,7 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws Exception { + public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); String fileName = randomAlphaOfLength(10); RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForDownload = new RemoteHashesOfConsistentSettings( @@ -368,9 +376,9 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - hashesOfConsistentSettingsForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( HASHES_OF_CONSISTENT_SETTINGS, + hashesOfConsistentSettingsForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -381,7 +389,7 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws Exception { + public void testGetAsyncWriteRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForUpload = new RemoteHashesOfConsistentSettings( hashesOfConsistentSettings, @@ -396,7 +404,8 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + remoteGlobalMetadataManager.getAsyncWriteRunnable( + HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForUpload, new LatchedActionListener<>(listener, latch) ).run(); @@ -419,7 +428,7 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception { + public void testGetAsyncReadRunnable_TemplatesMetadata() throws Exception { TemplatesMetadata templatesMetadata = getTemplatesMetadata(); String fileName = randomAlphaOfLength(10); RemoteTemplatesMetadata templatesMetadataForDownload = new RemoteTemplatesMetadata( @@ -433,9 +442,9 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - templatesMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( TEMPLATES_METADATA, + templatesMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -446,7 +455,7 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception assertEquals(TEMPLATES_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception { + public void testGetAsyncWriteRunnable_TemplatesMetadata() throws Exception { TemplatesMetadata templatesMetadata = getTemplatesMetadata(); RemoteTemplatesMetadata templateMetadataForUpload = new RemoteTemplatesMetadata( templatesMetadata, @@ -462,8 +471,11 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable( + TEMPLATES_METADATA, + templateMetadataForUpload, + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -483,7 +495,7 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { + public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { Metadata.Custom customMetadata = getCustomMetadata(); String fileName = randomAlphaOfLength(10); RemoteCustomMetadata customMetadataForDownload = new RemoteCustomMetadata( @@ -498,9 +510,9 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - customMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( IndexGraveyard.TYPE, + customMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -511,7 +523,7 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { assertEquals(IndexGraveyard.TYPE, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { + public void testGetAsyncWriteRunnable_CustomMetadata() throws Exception { Metadata.Custom customMetadata = getCustomMetadata(); RemoteCustomMetadata customMetadataForUpload = new RemoteCustomMetadata( customMetadata, @@ -528,8 +540,11 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable( + customMetadataForUpload.getType(), + customMetadataForUpload, + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -549,7 +564,7 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { + public void testGetAsyncReadRunnable_GlobalMetadata() throws Exception { Metadata metadata = getGlobalMetadata(); String fileName = randomAlphaOfLength(10); RemoteGlobalMetadata globalMetadataForDownload = new RemoteGlobalMetadata(fileName, CLUSTER_UUID, compressor, xContentRegistry); @@ -558,9 +573,9 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - globalMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( GLOBAL_METADATA, + globalMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -571,7 +586,7 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { assertEquals(GLOBAL_METADATA, listener.getResult().getComponentName()); } - public void testGetReadMetadataAsyncAction_IOException() throws Exception { + public void testGetAsyncReadRunnable_IOException() throws Exception { String fileName = randomAlphaOfLength(10); RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( fileName, @@ -583,18 +598,19 @@ public void testGetReadMetadataAsyncAction_IOException() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - coordinationMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( COORDINATION_METADATA, + coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); - assertEquals(ioException, listener.getFailure()); + assertEquals(ioException, listener.getFailure().getCause()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); } - public void testGetAsyncMetadataWriteAction_IOException() throws Exception { + public void testGetAsyncWriteRunnable_IOException() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( coordinationMetadata, @@ -612,8 +628,11 @@ public void testGetAsyncMetadataWriteAction_IOException() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable( + COORDINATION_METADATA, + remoteCoordinationMetadata, + new LatchedActionListener<>(listener, latch) + ).run(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); assertTrue(listener.getFailure() instanceof RemoteStateTransferException); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java index 817fc7b55d09a..c12306b956046 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -24,6 +24,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -83,7 +84,7 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { + public void testGetAsyncWriteRunnable_Success() throws Exception { IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); BlobStore blobStore = mock(BlobStore.class); @@ -97,9 +98,9 @@ public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction( - indexMetadata, - "cluster-uuid", + remoteIndexMetadataManager.getAsyncWriteRunnable( + INDEX, + new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -116,7 +117,7 @@ public void testGetAsyncIndexMetadataWriteAction_Success() throws Exception { assertTrue(pathTokens[6].startsWith(expectedFilePrefix)); } - public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception { + public void testGetAsyncWriteRunnable_IOFailure() throws Exception { IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); BlobContainer blobContainer = mock(AsyncMultiStreamBlobContainer.class); BlobStore blobStore = mock(BlobStore.class); @@ -129,9 +130,9 @@ public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction( - indexMetadata, - "cluster-uuid", + remoteIndexMetadataManager.getAsyncWriteRunnable( + INDEX, + new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -140,7 +141,7 @@ public void testGetAsyncIndexMetadataWriteAction_IOFailure() throws Exception { assertTrue(listener.getFailure() instanceof RemoteStateTransferException); } - public void testGetAsyncIndexMetadataReadAction_Success() throws Exception { + public void testGetAsyncReadRunnable_Success() throws Exception { IndexMetadata indexMetadata = getIndexMetadata(randomAlphaOfLength(10), randomBoolean(), randomAlphaOfLength(10)); String fileName = randomAlphaOfLength(10); fileName = fileName + DELIMITER + '2'; @@ -150,15 +151,18 @@ public void testGetAsyncIndexMetadataReadAction_Success() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction("cluster-uuid", fileName, new LatchedActionListener<>(listener, latch)) - .run(); + remoteIndexMetadataManager.getAsyncReadRunnable( + INDEX, + new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); assertEquals(indexMetadata, listener.getResult().getObj()); } - public void testGetAsyncIndexMetadataReadAction_IOFailure() throws Exception { + public void testGetAsyncReadRunnable_IOFailure() throws Exception { String fileName = randomAlphaOfLength(10); fileName = fileName + DELIMITER + '2'; Exception exception = new IOException("testing failure"); @@ -166,12 +170,16 @@ public void testGetAsyncIndexMetadataReadAction_IOFailure() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction("cluster-uuid", fileName, new LatchedActionListener<>(listener, latch)) - .run(); + remoteIndexMetadataManager.getAsyncReadRunnable( + INDEX, + new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), + new LatchedActionListener<>(listener, latch) + ).run(); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); - assertEquals(exception, listener.getFailure()); + assertEquals(exception, listener.getFailure().getCause()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); } private IndexMetadata getIndexMetadata(String name, @Nullable Boolean writeIndex, String... aliases) {