From 1aa6f3d44a74d2014fd457d3969601c66e6cb0e0 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 5 Jul 2024 19:04:00 +0530 Subject: [PATCH] Create interface RemoteEntitiesManager Signed-off-by: Shivansh Arora --- .../remote/AbstractRemoteEntitiesManager.java | 61 ++++++++++ .../common/remote/RemoteEntitiesManager.java | 30 +++++ .../RemoteClusterStateAttributesManager.java | 48 +++----- .../remote/RemoteClusterStateService.java | 76 ++++++++----- .../remote/RemoteGlobalMetadataManager.java | 54 +++------ .../remote/RemoteIndexMetadataManager.java | 107 ++++++------------ ...oteClusterStateAttributesManagerTests.java | 32 +++--- .../RemoteGlobalMetadataManagerTests.java | 77 ++++++------- 8 files changed, 261 insertions(+), 224 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java create mode 100644 server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java new file mode 100644 index 0000000000000..3da3526be7226 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public abstract class AbstractRemoteEntitiesManager implements RemoteEntitiesManager { + protected final Map remoteWritableEntityStores = new HashMap<>(); + + protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { + RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); + if (remoteStore == null) { + throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); + } + return remoteStore; + } + + protected abstract ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ); + + protected abstract ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ); + + @Override + public CheckedRunnable getAsyncWriteRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ) { + return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, latchedActionListener)); + } + + @Override + public CheckedRunnable getAsyncReadRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ) { + return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, latchedActionListener)); + } +} diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java b/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java new file mode 100644 index 0000000000000..d947e91be12c5 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; + +import java.io.IOException; + +public interface RemoteEntitiesManager { + CheckedRunnable getAsyncReadRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ); + + CheckedRunnable getAsyncWriteRunnable( + String component, + AbstractRemoteWritableBlobEntity entity, + LatchedActionListener latchedActionListener + ); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 8f986423587d7..22a00ee5d6353 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -12,9 +12,8 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; -import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; @@ -26,9 +25,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Map; /** @@ -36,12 +33,11 @@ * * @opensearch.internal */ -public class RemoteClusterStateAttributesManager { +public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesManager { public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; public static final String DISCOVERY_NODES = "nodes"; public static final String CLUSTER_BLOCKS = "blocks"; public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; - private final Map remoteWritableEntityStores; private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( @@ -52,7 +48,6 @@ public class RemoteClusterStateAttributesManager { ThreadPool threadpool ) { this.namedWriteableRegistry = namedWriteableRegistry; - this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteClusterStateBlobStore<>( @@ -85,46 +80,28 @@ public class RemoteClusterStateAttributesManager { ); } - /** - * Allows async upload of Cluster State Attribute components to remote - */ - CheckedRunnable getAsyncMetadataWriteAction( - String component, - AbstractRemoteWritableBlobEntity blobEntity, - LatchedActionListener latchedActionListener - ) { - return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener)); - } - - private ActionListener getActionListener( + @Override + protected ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener latchedActionListener ) { return ActionListener.wrap( resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex)) + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } - private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { - RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); - if (remoteStore == null) { - throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); - } - return remoteStore; - } - - public CheckedRunnable getAsyncMetadataReadAction( + @Override + protected ActionListener getReadActionListener( String component, - AbstractRemoteWritableBlobEntity blobEntity, - LatchedActionListener listener + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener ) { - final ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), - listener::onFailure + return ActionListener.wrap( + response -> latchedActionListener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); - return () -> getStore(blobEntity).readAsync(blobEntity, actionListener); } public DiffableUtils.MapDiff> getUpdatedCustoms( @@ -158,4 +135,5 @@ public DiffableUtils.MapDiff { uploadTasks.put( indexMetadata.getIndex().getName(), - remoteIndexMetadataManager.getAsyncIndexMetadataWriteAction(indexMetadata, clusterState.metadata().clusterUUID(), listener) + remoteIndexMetadataManager.getAsyncWriteRunnable( + indexMetadata.getIndex().getName(), + new RemoteIndexMetadata( + indexMetadata, + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener + ) ); }); clusterStateCustomToUpload.forEach((key, value) -> { uploadTasks.put( key, - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( CLUSTER_STATE_CUSTOM, new RemoteClusterStateCustoms( value, @@ -1015,7 +1030,16 @@ private ClusterState readClusterStateInParallel( for (UploadedIndexMetadata indexMetadata : indicesToRead) { asyncMetadataReadActions.add( - remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(clusterUUID, indexMetadata.getUploadedFilename(), listener) + remoteIndexMetadataManager.getAsyncReadRunnable( + indexMetadata.getIndexName(), + new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener + ) ); } @@ -1042,7 +1066,8 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : customToRead.entrySet()) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + entry.getValue().getAttributeName(), new RemoteCustomMetadata( entry.getValue().getUploadedFilename(), entry.getKey(), @@ -1050,7 +1075,6 @@ private ClusterState readClusterStateInParallel( blobStoreRepository.getCompressor(), namedWriteableRegistry ), - entry.getValue().getAttributeName(), listener ) ); @@ -1058,14 +1082,14 @@ private ClusterState readClusterStateInParallel( if (readCoordinationMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + COORDINATION_METADATA, new RemoteCoordinationMetadata( manifest.getCoordinationMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - COORDINATION_METADATA, listener ) ); @@ -1073,14 +1097,14 @@ private ClusterState readClusterStateInParallel( if (readSettingsMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + SETTING_METADATA, new RemotePersistentSettingsMetadata( manifest.getSettingsMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - SETTING_METADATA, listener ) ); @@ -1088,14 +1112,14 @@ private ClusterState readClusterStateInParallel( if (readTransientSettingsMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + TRANSIENT_SETTING_METADATA, new RemoteTransientSettingsMetadata( manifest.getTransientSettingsMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - TRANSIENT_SETTING_METADATA, listener ) ); @@ -1103,14 +1127,14 @@ private ClusterState readClusterStateInParallel( if (readTemplatesMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + TEMPLATES_METADATA, new RemoteTemplatesMetadata( manifest.getTemplatesMetadata().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor(), blobStoreRepository.getNamedXContentRegistry() ), - TEMPLATES_METADATA, listener ) ); @@ -1118,7 +1142,7 @@ private ClusterState readClusterStateInParallel( if (readDiscoveryNodes) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( DISCOVERY_NODES, new RemoteDiscoveryNodes( manifest.getDiscoveryNodesMetadata().getUploadedFilename(), @@ -1132,7 +1156,7 @@ private ClusterState readClusterStateInParallel( if (readClusterBlocks) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( CLUSTER_BLOCKS, new RemoteClusterBlocks( manifest.getClusterBlocksMetadata().getUploadedFilename(), @@ -1146,13 +1170,13 @@ private ClusterState readClusterStateInParallel( if (readHashesOfConsistentSettings) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncMetadataReadAction( + remoteGlobalMetadataManager.getAsyncReadRunnable( + HASHES_OF_CONSISTENT_SETTINGS, new RemoteHashesOfConsistentSettings( manifest.getHashesOfConsistentSettings().getUploadedFilename(), clusterUUID, blobStoreRepository.getCompressor() ), - HASHES_OF_CONSISTENT_SETTINGS, listener ) ); @@ -1160,7 +1184,7 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( // pass component name as cluster-state-custom--, so that we can interpret it later String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), new RemoteClusterStateCustoms( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 2c5aad99adc0c..3c00c4cc2d94a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -17,9 +17,8 @@ import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; -import org.opensearch.common.remote.RemoteWritableEntityStore; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -43,7 +42,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; @@ -56,7 +54,7 @@ * * @opensearch.internal */ -public class RemoteGlobalMetadataManager { +public class RemoteGlobalMetadataManager extends AbstractRemoteEntitiesManager { public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -70,7 +68,6 @@ public class RemoteGlobalMetadataManager { public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 1; private volatile TimeValue globalMetadataUploadTimeout; - private Map remoteWritableEntityStores; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; private final NamedWriteableRegistry namedWriteableRegistry; @@ -87,7 +84,6 @@ public class RemoteGlobalMetadataManager { this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.namedWriteableRegistry = namedWriteableRegistry; - this.remoteWritableEntityStores = new HashMap<>(); this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, new RemoteClusterStateBlobStore<>( @@ -161,46 +157,28 @@ public class RemoteGlobalMetadataManager { clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); } - /** - * Allows async upload of Metadata components to remote - */ - CheckedRunnable getAsyncMetadataWriteAction( - AbstractRemoteWritableBlobEntity writeEntity, - LatchedActionListener latchedActionListener - ) { - return (() -> getStore(writeEntity).writeAsync(writeEntity, getActionListener(writeEntity, latchedActionListener))); - } - - private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) { - RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType()); - if (remoteStore == null) { - throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]"); - } - return remoteStore; - } - - private ActionListener getActionListener( - AbstractRemoteWritableBlobEntity remoteBlobStoreObject, + @Override + protected ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, LatchedActionListener latchedActionListener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteBlobStoreObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure( - new RemoteStateTransferException("Upload failed for " + remoteBlobStoreObject.getType(), ex) - ) + resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } - CheckedRunnable getAsyncMetadataReadAction( - AbstractRemoteWritableBlobEntity readEntity, - String componentName, - LatchedActionListener listener + @Override + protected ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener ) { - ActionListener actionListener = ActionListener.wrap( - response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)), - listener::onFailure + return ActionListener.wrap( + response -> latchedActionListener.onResponse(new RemoteReadResult(response, remoteObject.getType(), component)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); - return () -> getStore(readEntity).readAsync(readEntity, actionListener); } Metadata getGlobalMetadata(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index a84161b202a22..b4dce486128b9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -10,8 +10,8 @@ import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.common.remote.RemoteWritableEntityStore; +import org.opensearch.common.remote.AbstractRemoteEntitiesManager; +import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; @@ -26,17 +26,14 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.HashMap; import java.util.Locale; -import java.util.Map; -import java.util.Objects; /** * A Manager which provides APIs to write and read Index Metadata to remote store * * @opensearch.internal */ -public class RemoteIndexMetadataManager { +public class RemoteIndexMetadataManager extends AbstractRemoteEntitiesManager { public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -48,7 +45,6 @@ public class RemoteIndexMetadataManager { Setting.Property.Deprecated ); - private final RemoteWritableEntityStore indexMetadataBlobStore; private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; @@ -61,12 +57,15 @@ public RemoteIndexMetadataManager( BlobStoreTransferService blobStoreTransferService, ThreadPool threadpool ) { - this.indexMetadataBlobStore = new RemoteClusterStateBlobStore<>( - blobStoreTransferService, - blobStoreRepository, - clusterName, - threadpool, - ThreadPool.Names.REMOTE_STATE_READ + this.remoteWritableEntityStores.put( + RemoteIndexMetadata.INDEX, + new RemoteClusterStateBlobStore<>( + blobStoreTransferService, + blobStoreRepository, + clusterName, + threadpool, + ThreadPool.Names.REMOTE_STATE_READ + ) ); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.compressor = blobStoreRepository.getCompressor(); @@ -74,45 +73,6 @@ public RemoteIndexMetadataManager( clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); } - /** - * Allows async Upload of IndexMetadata to remote - * - * @param indexMetadata {@link IndexMetadata} to upload - * @param latchedActionListener listener to respond back on after upload finishes - */ - CheckedRunnable getAsyncIndexMetadataWriteAction( - IndexMetadata indexMetadata, - String clusterUUID, - LatchedActionListener latchedActionListener - ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(indexMetadata, clusterUUID, compressor, namedXContentRegistry); - ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteIndexMetadata.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().getName(), ex)) - ); - return () -> indexMetadataBlobStore.writeAsync(remoteIndexMetadata, completionListener); - } - - CheckedRunnable getAsyncIndexMetadataReadAction( - String clusterUUID, - String uploadedFilename, - LatchedActionListener latchedActionListener - ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata( - RemoteClusterStateUtils.getFormattedIndexFileName(uploadedFilename), - clusterUUID, - compressor, - namedXContentRegistry - ); - ActionListener actionListener = ActionListener.wrap( - response -> latchedActionListener.onResponse( - new RemoteReadResult(response, RemoteIndexMetadata.INDEX, response.getIndex().getName()) - ), - latchedActionListener::onFailure - ); - return () -> indexMetadataBlobStore.readAsync(remoteIndexMetadata, actionListener); - } - /** * Fetch index metadata from remote cluster state * @@ -127,7 +87,7 @@ IndexMetadata getIndexMetadata(ClusterMetadataManifest.UploadedIndexMetadata upl namedXContentRegistry ); try { - return indexMetadataBlobStore.read(remoteIndexMetadata); + return (IndexMetadata) getStore(remoteIndexMetadata).read(remoteIndexMetadata); } catch (IOException e) { throw new IllegalStateException( String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), @@ -136,24 +96,6 @@ IndexMetadata getIndexMetadata(ClusterMetadataManifest.UploadedIndexMetadata upl } } - /** - * Fetch latest index metadata from remote cluster state - * - * @param clusterMetadataManifest manifest file of cluster - * @param clusterUUID uuid of cluster state to refer to in remote - * @return {@code Map} latest IndexUUID to IndexMetadata map - */ - Map getIndexMetadataMap(String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { - assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) - : "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch."; - Map remoteIndexMetadata = new HashMap<>(); - for (ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) { - IndexMetadata indexMetadata = getIndexMetadata(uploadedIndexMetadata, clusterUUID); - remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata); - } - return remoteIndexMetadata; - } - public TimeValue getIndexMetadataUploadTimeout() { return this.indexMetadataUploadTimeout; } @@ -162,4 +104,27 @@ private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeo this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; } + @Override + protected ActionListener getWriteActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { + return ActionListener.wrap( + resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + ); + } + + @Override + protected ActionListener getReadActionListener( + String component, + AbstractRemoteWritableBlobEntity remoteObject, + LatchedActionListener latchedActionListener + ) { + return ActionListener.wrap( + response -> latchedActionListener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)), + ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + ); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index fe9ed57fa77b8..0e4ae06d624f8 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -108,7 +108,7 @@ public void tearDown() throws Exception { threadPool.shutdown(); } - public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); doAnswer(invocationOnMock -> { @@ -118,7 +118,7 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch) @@ -141,7 +141,7 @@ public void testGetAsyncMetadataWriteAction_DiscoveryNodes() throws IOException, assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( @@ -150,7 +150,7 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch) @@ -166,7 +166,7 @@ public void testGetAsyncMetadataReadAction_DiscoveryNodes() throws IOException, assertEquals(discoveryNodes.getClusterManagerNodeId(), readDiscoveryNodes.getClusterManagerNodeId()); } - public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(clusterBlocks, VERSION, CLUSTER_UUID, compressor); doAnswer(invocationOnMock -> { @@ -176,7 +176,7 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch) @@ -199,7 +199,7 @@ public void testGetAsyncMetadataWriteAction_ClusterBlocks() throws IOException, assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, InterruptedException { ClusterBlocks clusterBlocks = randomClusterBlocks(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( @@ -209,7 +209,7 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch) @@ -227,7 +227,7 @@ public void testGetAsyncMetadataReadAction_ClusterBlocks() throws IOException, I } } - public void testGetAsyncMetadataWriteAction_Custom() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_Custom() throws IOException, InterruptedException { Custom custom = getClusterStateCustom(); RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( custom, @@ -244,7 +244,7 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final TestCapturingListener listener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(listener, latch) @@ -267,7 +267,7 @@ public void testGetAsyncMetadataWriteAction_Custom() throws IOException, Interru assertEquals(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetAsyncMetadataReadAction_Custom() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedException { Custom custom = getClusterStateCustom(); String fileName = randomAlphaOfLength(10); RemoteClusterStateCustoms remoteClusterStateCustoms = new RemoteClusterStateCustoms( @@ -282,7 +282,7 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(capturingListener, latch) @@ -295,7 +295,7 @@ public void testGetAsyncMetadataReadAction_Custom() throws IOException, Interrup assertEquals(CLUSTER_STATE_CUSTOM, capturingListener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_Exception() throws IOException, InterruptedException { + public void testGetAsyncWriteRunnable_Exception() throws IOException, InterruptedException { DiscoveryNodes discoveryNodes = getDiscoveryNodes(); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(discoveryNodes, VERSION, CLUSTER_UUID, compressor); @@ -308,7 +308,7 @@ public void testGetAsyncMetadataWriteAction_Exception() throws IOException, Inte TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncMetadataWriteAction( + remoteClusterStateAttributesManager.getAsyncWriteRunnable( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) @@ -319,14 +319,14 @@ public void testGetAsyncMetadataWriteAction_Exception() throws IOException, Inte assertEquals(ioException, capturingListener.getFailure().getCause()); } - public void testGetAsyncMetadataReadAction_Exception() throws IOException, InterruptedException { + public void testGetAsyncReadRunnable_Exception() throws IOException, InterruptedException { String fileName = randomAlphaOfLength(10); RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(fileName, CLUSTER_UUID, compressor); Exception ioException = new IOException("mock test exception"); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener capturingListener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncMetadataReadAction( + remoteClusterStateAttributesManager.getAsyncReadRunnable( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index c543f986b3e86..3d3eebe9ed161 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -157,7 +157,7 @@ public void testGlobalMetadataUploadWaitTimeSetting() { assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); } - public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Exception { + public void testGetAsyncReadRunnable_CoordinationMetadata() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); String fileName = randomAlphaOfLength(10); RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( @@ -172,9 +172,9 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - coordinationMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( COORDINATION_METADATA, + coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -185,7 +185,7 @@ public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Excepti assertEquals(COORDINATION_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Exception { + public void testGetAsyncWriteRunnable_CoordinationMetadata() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( coordinationMetadata, @@ -202,7 +202,7 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + remoteGlobalMetadataManager.getAsyncWriteRunnable(COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) .run(); latch.await(); assertNull(listener.getFailure()); @@ -223,7 +223,7 @@ public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Except assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception { + public void testGetAsyncReadRunnable_PersistentSettings() throws Exception { Settings settingsMetadata = getSettings(); String fileName = randomAlphaOfLength(10); RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( @@ -239,9 +239,9 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - persistentSettings, + remoteGlobalMetadataManager.getAsyncReadRunnable( SETTING_METADATA, + persistentSettings, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -252,7 +252,7 @@ public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception assertEquals(SETTING_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exception { + public void testGetAsyncWriteRunnable_PersistentSettings() throws Exception { Settings settingsMetadata = getSettings(); RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( settingsMetadata, @@ -268,7 +268,7 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); latch.await(); assertNull(listener.getFailure()); @@ -289,7 +289,7 @@ public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exceptio assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception { + public void testGetAsyncReadRunnable_TransientSettings() throws Exception { Settings settingsMetadata = getSettings(); String fileName = randomAlphaOfLength(10); RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( @@ -305,9 +305,9 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - transientSettings, + remoteGlobalMetadataManager.getAsyncReadRunnable( TRANSIENT_SETTING_METADATA, + transientSettings, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -318,7 +318,7 @@ public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception { + public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { Settings settingsMetadata = getSettings(); RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( settingsMetadata, @@ -334,7 +334,7 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.getAsyncWriteRunnable(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -354,7 +354,7 @@ public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws Exception { + public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); String fileName = randomAlphaOfLength(10); RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForDownload = new RemoteHashesOfConsistentSettings( @@ -368,9 +368,9 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - hashesOfConsistentSettingsForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( HASHES_OF_CONSISTENT_SETTINGS, + hashesOfConsistentSettingsForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -381,7 +381,7 @@ public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws E assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws Exception { + public void testGetAsyncWriteRunnable_HashesOfConsistentSettings() throws Exception { DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForUpload = new RemoteHashesOfConsistentSettings( hashesOfConsistentSettings, @@ -396,7 +396,8 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + remoteGlobalMetadataManager.getAsyncWriteRunnable( + HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForUpload, new LatchedActionListener<>(listener, latch) ).run(); @@ -419,7 +420,7 @@ public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception { + public void testGetAsyncReadRunnable_TemplatesMetadata() throws Exception { TemplatesMetadata templatesMetadata = getTemplatesMetadata(); String fileName = randomAlphaOfLength(10); RemoteTemplatesMetadata templatesMetadataForDownload = new RemoteTemplatesMetadata( @@ -433,9 +434,9 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - templatesMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( TEMPLATES_METADATA, + templatesMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -446,7 +447,7 @@ public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception assertEquals(TEMPLATES_METADATA, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception { + public void testGetAsyncWriteRunnable_TemplatesMetadata() throws Exception { TemplatesMetadata templatesMetadata = getTemplatesMetadata(); RemoteTemplatesMetadata templateMetadataForUpload = new RemoteTemplatesMetadata( templatesMetadata, @@ -462,7 +463,7 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) + remoteGlobalMetadataManager.getAsyncWriteRunnable(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) .run(); latch.await(); assertNull(listener.getFailure()); @@ -483,7 +484,7 @@ public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { + public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { Metadata.Custom customMetadata = getCustomMetadata(); String fileName = randomAlphaOfLength(10); RemoteCustomMetadata customMetadataForDownload = new RemoteCustomMetadata( @@ -498,9 +499,9 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - customMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( IndexGraveyard.TYPE, + customMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -511,7 +512,7 @@ public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { assertEquals(IndexGraveyard.TYPE, listener.getResult().getComponentName()); } - public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { + public void testGetAsyncWriteRunnable_CustomMetadata() throws Exception { Metadata.Custom customMetadata = getCustomMetadata(); RemoteCustomMetadata customMetadataForUpload = new RemoteCustomMetadata( customMetadata, @@ -528,7 +529,7 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, new LatchedActionListener<>(listener, latch)) + remoteGlobalMetadataManager.getAsyncWriteRunnable(customMetadataForUpload.getType(), customMetadataForUpload, new LatchedActionListener<>(listener, latch)) .run(); latch.await(); assertNull(listener.getFailure()); @@ -549,7 +550,7 @@ public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); } - public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { + public void testGetAsyncReadRunnable_GlobalMetadata() throws Exception { Metadata metadata = getGlobalMetadata(); String fileName = randomAlphaOfLength(10); RemoteGlobalMetadata globalMetadataForDownload = new RemoteGlobalMetadata(fileName, CLUSTER_UUID, compressor, xContentRegistry); @@ -558,9 +559,9 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - globalMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( GLOBAL_METADATA, + globalMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -571,7 +572,7 @@ public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { assertEquals(GLOBAL_METADATA, listener.getResult().getComponentName()); } - public void testGetReadMetadataAsyncAction_IOException() throws Exception { + public void testGetAsyncReadRunnable_IOException() throws Exception { String fileName = randomAlphaOfLength(10); RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( fileName, @@ -583,9 +584,9 @@ public void testGetReadMetadataAsyncAction_IOException() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataReadAction( - coordinationMetadataForDownload, + remoteGlobalMetadataManager.getAsyncReadRunnable( COORDINATION_METADATA, + coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) ).run(); latch.await(); @@ -594,7 +595,7 @@ public void testGetReadMetadataAsyncAction_IOException() throws Exception { assertEquals(ioException, listener.getFailure()); } - public void testGetAsyncMetadataWriteAction_IOException() throws Exception { + public void testGetAsyncWriteRunnable_IOException() throws Exception { CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( coordinationMetadata, @@ -612,7 +613,7 @@ public void testGetAsyncMetadataWriteAction_IOException() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + remoteGlobalMetadataManager.getAsyncWriteRunnable(COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) .run(); assertNull(listener.getResult()); assertNotNull(listener.getFailure());