diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index f3f245ee9f8f0..d7ebc54598b37 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -15,7 +15,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.remote.RemoteWritableEntityStore; @@ -102,16 +101,16 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( + public void getAsyncIndexRoutingWriteAction( String clusterUUID, long term, long version, @@ -128,7 +127,7 @@ public CheckedRunnable getAsyncIndexRoutingWriteAction( ) ); - return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener); + remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener); } /** @@ -156,7 +155,7 @@ public List getAllUploadedIndices } @Override - public CheckedRunnable getAsyncIndexRoutingReadAction( + public void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, LatchedActionListener latchedActionListener @@ -169,7 +168,7 @@ public CheckedRunnable getAsyncIndexRoutingReadAction( RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); + remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 4636e492df28f..e6e68e01e761f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -12,7 +12,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -39,7 +38,7 @@ public DiffableUtils.MapDiff getAsyncIndexRoutingWriteAction( + public void getAsyncIndexRoutingWriteAction( String clusterUUID, long term, long version, @@ -47,7 +46,6 @@ public CheckedRunnable getAsyncIndexRoutingWriteAction( LatchedActionListener latchedActionListener ) { // noop - return () -> {}; } @Override @@ -61,13 +59,12 @@ public List getAllUploadedIndices } @Override - public CheckedRunnable getAsyncIndexRoutingReadAction( + public void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, LatchedActionListener latchedActionListener ) { // noop - return () -> {}; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d319123bc2cee..0b0b4bb7dbc84 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -12,7 +12,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -43,7 +42,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { List getIndicesRouting(RoutingTable routingTable); - CheckedRunnable getAsyncIndexRoutingReadAction( + void getAsyncIndexRoutingReadAction( String clusterUUID, String uploadedFilename, LatchedActionListener latchedActionListener @@ -59,7 +58,7 @@ DiffableUtils.MapDiff> RoutingTable after ); - CheckedRunnable getAsyncIndexRoutingWriteAction( + void getAsyncIndexRoutingWriteAction( String clusterUUID, long term, long version, diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index 7680b4a420aef..a72794e23426a 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -8,12 +8,10 @@ package org.opensearch.common.remote; -import org.opensearch.common.CheckedRunnable; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.model.RemoteReadResult; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -71,20 +69,16 @@ protected abstract ActionListener getReadActionListener( ); @Override - public CheckedRunnable asyncWrite( + public void writeAsync( String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener ) { - return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener)); + getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener)); } @Override - public CheckedRunnable asyncRead( - String component, - AbstractRemoteWritableBlobEntity entity, - ActionListener listener - ) { - return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener)); + public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener) { + getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener)); } } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java index bcaabac837b1f..7693d1b5284bd 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java @@ -8,43 +8,40 @@ package org.opensearch.common.remote; -import org.opensearch.common.CheckedRunnable; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.model.RemoteReadResult; -import java.io.IOException; - /** * The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store */ public interface RemoteWritableEntityManager { /** - * Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity. + * Performs an asynchronous read operation for the specified component and entity. * * @param component the component for which the read operation is performed * @param entity the entity to be read - * @param listener the listener to be notified when the read operation completes - * @return a CheckedRunnable that performs the asynchronous read operation + * @param listener the listener to be notified when the read operation completes. + * The listener's {@link ActionListener#onResponse(Object)} method + * is called with a {@link RemoteReadResult} object containing the + * read data on successful read. The + * {@link ActionListener#onFailure(Exception)} method is called with + * an exception if the read operation fails. */ - CheckedRunnable asyncRead( - String component, - AbstractRemoteWritableBlobEntity entity, - ActionListener listener - ); + void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener); /** - * Returns a CheckedRunnable that performs an asynchronous write operation for the specified component and entity. + * Performs an asynchronous write operation for the specified component and entity. * * @param component the component for which the write operation is performed * @param entity the entity to be written - * @param listener the listener to be notified when the write operation completes - * @return a CheckedRunnable that performs the asynchronous write operation + * @param listener the listener to be notified when the write operation completes. + * The listener's {@link ActionListener#onResponse(Object)} method + * is called with a {@link UploadedMetadata} object containing the + * uploaded metadata on successful write. The + * {@link ActionListener#onFailure(Exception)} method is called with + * an exception if the write operation fails. */ - CheckedRunnable asyncWrite( - String component, - AbstractRemoteWritableBlobEntity entity, - ActionListener listener - ); + void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 0464571a2c6f3..402fd14128627 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -29,7 +29,6 @@ import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobStore; @@ -497,7 +496,7 @@ UploadedMetadataResults writeMetadataInParallel( + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0) + clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size(); CountDownLatch latch = new CountDownLatch(totalUploadTasks); - Map> uploadTasks = new ConcurrentHashMap<>(totalUploadTasks); + List uploadTasks = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); Map results = new ConcurrentHashMap<>(totalUploadTasks); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); @@ -516,182 +515,155 @@ UploadedMetadataResults writeMetadataInParallel( ); if (uploadSettingsMetadata) { - uploadTasks.put( + uploadTasks.add(SETTING_METADATA); + remoteGlobalMetadataManager.writeAsync( SETTING_METADATA, - remoteGlobalMetadataManager.asyncWrite( - SETTING_METADATA, - new RemotePersistentSettingsMetadata( - clusterState.metadata().persistentSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemotePersistentSettingsMetadata( + clusterState.metadata().persistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadTransientSettingMetadata) { - uploadTasks.put( + uploadTasks.add(TRANSIENT_SETTING_METADATA); + remoteGlobalMetadataManager.writeAsync( TRANSIENT_SETTING_METADATA, - remoteGlobalMetadataManager.asyncWrite( - TRANSIENT_SETTING_METADATA, - new RemoteTransientSettingsMetadata( - clusterState.metadata().transientSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteTransientSettingsMetadata( + clusterState.metadata().transientSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadCoordinationMetadata) { - uploadTasks.put( + uploadTasks.add(COORDINATION_METADATA); + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, - remoteGlobalMetadataManager.asyncWrite( - COORDINATION_METADATA, - new RemoteCoordinationMetadata( - clusterState.metadata().coordinationMetadata(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteCoordinationMetadata( + clusterState.metadata().coordinationMetadata(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadTemplateMetadata) { - uploadTasks.put( + uploadTasks.add(TEMPLATES_METADATA); + remoteGlobalMetadataManager.writeAsync( TEMPLATES_METADATA, - remoteGlobalMetadataManager.asyncWrite( - TEMPLATES_METADATA, - new RemoteTemplatesMetadata( - clusterState.metadata().templatesMetadata(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteTemplatesMetadata( + clusterState.metadata().templatesMetadata(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadDiscoveryNodes) { - uploadTasks.put( - DISCOVERY_NODES, - remoteClusterStateAttributesManager.asyncWrite( - RemoteDiscoveryNodes.DISCOVERY_NODES, - new RemoteDiscoveryNodes( - clusterState.nodes(), - clusterState.version(), - clusterState.stateUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + uploadTasks.add(DISCOVERY_NODES); + remoteClusterStateAttributesManager.writeAsync( + RemoteDiscoveryNodes.DISCOVERY_NODES, + new RemoteDiscoveryNodes( + clusterState.nodes(), + clusterState.version(), + clusterState.stateUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } if (uploadClusterBlock) { - uploadTasks.put( - CLUSTER_BLOCKS, - remoteClusterStateAttributesManager.asyncWrite( - RemoteClusterBlocks.CLUSTER_BLOCKS, - new RemoteClusterBlocks( - clusterState.blocks(), - clusterState.version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + uploadTasks.add(CLUSTER_BLOCKS); + remoteClusterStateAttributesManager.writeAsync( + RemoteClusterBlocks.CLUSTER_BLOCKS, + new RemoteClusterBlocks( + clusterState.blocks(), + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } if (uploadHashesOfConsistentSettings) { - uploadTasks.put( + uploadTasks.add(HASHES_OF_CONSISTENT_SETTINGS); + remoteGlobalMetadataManager.writeAsync( HASHES_OF_CONSISTENT_SETTINGS, - remoteGlobalMetadataManager.asyncWrite( - HASHES_OF_CONSISTENT_SETTINGS, - new RemoteHashesOfConsistentSettings( - (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + new RemoteHashesOfConsistentSettings( + (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } customToUpload.forEach((key, value) -> { String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); - uploadTasks.put( + uploadTasks.add(customComponent); + remoteGlobalMetadataManager.writeAsync( customComponent, - remoteGlobalMetadataManager.asyncWrite( - customComponent, - new RemoteCustomMetadata( - value, - key, - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + new RemoteCustomMetadata( + value, + key, + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); }); indexToUpload.forEach(indexMetadata -> { - uploadTasks.put( + uploadTasks.add(indexMetadata.getIndex().getName()); + remoteIndexMetadataManager.writeAsync( indexMetadata.getIndex().getName(), - remoteIndexMetadataManager.asyncWrite( - indexMetadata.getIndex().getName(), - new RemoteIndexMetadata( - indexMetadata, - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteIndexMetadata( + indexMetadata, + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); }); clusterStateCustomToUpload.forEach((key, value) -> { - uploadTasks.put( - key, - remoteClusterStateAttributesManager.asyncWrite( - CLUSTER_STATE_CUSTOM, - new RemoteClusterStateCustoms( - value, - key, - clusterState.version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + uploadTasks.add(key); + remoteClusterStateAttributesManager.writeAsync( + CLUSTER_STATE_CUSTOM, + new RemoteClusterStateCustoms( + value, + key, + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); }); indicesRoutingToUpload.forEach(indexRoutingTable -> { - uploadTasks.put( - INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), - remoteRoutingTableService.getAsyncIndexRoutingWriteAction( - clusterState.metadata().clusterUUID(), - clusterState.term(), - clusterState.version(), - indexRoutingTable, - listener - ) + uploadTasks.add(INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName()); + remoteRoutingTableService.getAsyncIndexRoutingWriteAction( + clusterState.metadata().clusterUUID(), + clusterState.term(), + clusterState.version(), + indexRoutingTable, + listener ); }); - - // start async upload of all required metadata files - for (CheckedRunnable uploadTask : uploadTasks.values()) { - uploadTask.run(); - } invokeIndexMetadataUploadListeners(indexToUpload, prevIndexMetadataByName, latch, exceptionList); try { @@ -701,7 +673,7 @@ UploadedMetadataResults writeMetadataInParallel( String.format( Locale.ROOT, "Timed out waiting for transfer of following metadata to complete - %s", - String.join(", ", uploadTasks.keySet()) + String.join(", ", uploadTasks) ) ); exceptionList.forEach(ex::addSuppressed); @@ -710,11 +682,7 @@ UploadedMetadataResults writeMetadataInParallel( } catch (InterruptedException ex) { exceptionList.forEach(ex::addSuppressed); RemoteStateTransferException exception = new RemoteStateTransferException( - String.format( - Locale.ROOT, - "Timed out waiting for transfer of metadata to complete - %s", - String.join(", ", uploadTasks.keySet()) - ), + String.format(Locale.ROOT, "Timed out waiting for transfer of metadata to complete - %s", String.join(", ", uploadTasks)), ex ); Thread.currentThread().interrupt(); @@ -722,11 +690,7 @@ UploadedMetadataResults writeMetadataInParallel( } if (!exceptionList.isEmpty()) { RemoteStateTransferException exception = new RemoteStateTransferException( - String.format( - Locale.ROOT, - "Exception during transfer of following metadata to Remote - %s", - String.join(", ", uploadTasks.keySet()) - ) + String.format(Locale.ROOT, "Exception during transfer of following metadata to Remote - %s", String.join(", ", uploadTasks)) ); exceptionList.forEach(exception::addSuppressed); throw exception; @@ -1013,7 +977,6 @@ ClusterState readClusterStateInParallel( + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size() + indicesRoutingToRead.size(); CountDownLatch latch = new CountDownLatch(totalReadTasks); - List> asyncMetadataReadActions = new ArrayList<>(); List readResults = Collections.synchronizedList(new ArrayList<>()); List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); @@ -1027,17 +990,15 @@ ClusterState readClusterStateInParallel( }), latch); for (UploadedIndexMetadata indexMetadata : indicesToRead) { - asyncMetadataReadActions.add( - remoteIndexMetadataManager.asyncRead( - indexMetadata.getIndexName(), - new RemoteIndexMetadata( - RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteIndexMetadataManager.readAsync( + indexMetadata.getIndexName(), + new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } @@ -1053,154 +1014,130 @@ ClusterState readClusterStateInParallel( ); for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { - asyncMetadataReadActions.add( - remoteRoutingTableService.getAsyncIndexRoutingReadAction( - clusterUUID, - indexRouting.getUploadedFilename(), - routingTableLatchedActionListener - ) + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + clusterUUID, + indexRouting.getUploadedFilename(), + routingTableLatchedActionListener ); } for (Map.Entry entry : customToRead.entrySet()) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - entry.getValue().getAttributeName(), - new RemoteCustomMetadata( - entry.getValue().getUploadedFilename(), - entry.getKey(), - clusterUUID, - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + entry.getValue().getAttributeName(), + new RemoteCustomMetadata( + entry.getValue().getUploadedFilename(), + entry.getKey(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); } if (readCoordinationMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - COORDINATION_METADATA, - new RemoteCoordinationMetadata( - manifest.getCoordinationMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + COORDINATION_METADATA, + new RemoteCoordinationMetadata( + manifest.getCoordinationMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readSettingsMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - SETTING_METADATA, - new RemotePersistentSettingsMetadata( - manifest.getSettingsMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + SETTING_METADATA, + new RemotePersistentSettingsMetadata( + manifest.getSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readTransientSettingsMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - TRANSIENT_SETTING_METADATA, - new RemoteTransientSettingsMetadata( - manifest.getTransientSettingsMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + TRANSIENT_SETTING_METADATA, + new RemoteTransientSettingsMetadata( + manifest.getTransientSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readTemplatesMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - TEMPLATES_METADATA, - new RemoteTemplatesMetadata( - manifest.getTemplatesMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + TEMPLATES_METADATA, + new RemoteTemplatesMetadata( + manifest.getTemplatesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readDiscoveryNodes) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.asyncRead( - DISCOVERY_NODES, - new RemoteDiscoveryNodes( - manifest.getDiscoveryNodesMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + DISCOVERY_NODES, + new RemoteDiscoveryNodes( + manifest.getDiscoveryNodesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } if (readClusterBlocks) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.asyncRead( - CLUSTER_BLOCKS, - new RemoteClusterBlocks( - manifest.getClusterBlocksMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + CLUSTER_BLOCKS, + new RemoteClusterBlocks( + manifest.getClusterBlocksMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } if (readHashesOfConsistentSettings) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - HASHES_OF_CONSISTENT_SETTINGS, - new RemoteHashesOfConsistentSettings( - manifest.getHashesOfConsistentSettings().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + HASHES_OF_CONSISTENT_SETTINGS, + new RemoteHashesOfConsistentSettings( + manifest.getHashesOfConsistentSettings().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.asyncRead( - // pass component name as cluster-state-custom--, so that we can interpret it later - String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), - new RemoteClusterStateCustoms( - entry.getValue().getUploadedFilename(), - entry.getValue().getAttributeName(), - clusterUUID, - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + // pass component name as cluster-state-custom--, so that we can interpret it later + String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), + new RemoteClusterStateCustoms( + entry.getValue().getUploadedFilename(), + entry.getValue().getAttributeName(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); } - for (CheckedRunnable asyncMetadataReadAction : asyncMetadataReadActions) { - asyncMetadataReadAction.run(); - } - try { if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { RemoteStateTransferException exception = new RemoteStateTransferException( diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 564c7f7aed304..f66e096e9b548 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -543,7 +543,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { "cluster-uuid", uploadedFileName, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); @@ -584,7 +584,7 @@ public void testGetAsyncIndexRoutingWriteAction() throws Exception { clusterState.version(), clusterState.getRoutingTable().indicesRouting().get(indexName), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index fa6412b95574c..4ef459e6657a1 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -117,8 +117,7 @@ public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, Inter .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncWrite(DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.writeAsync(DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -146,8 +145,7 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncRead(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.readAsync(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -169,8 +167,7 @@ public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, Interr .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncWrite(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.writeAsync(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -199,8 +196,7 @@ public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, Interru CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncRead(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.readAsync(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -231,11 +227,11 @@ public void testGetAsyncWriteRunnable_Custom() throws IOException, InterruptedEx .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final TestCapturingListener listener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.asyncWrite( + remoteClusterStateAttributesManager.writeAsync( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -269,11 +265,11 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.asyncRead( + remoteClusterStateAttributesManager.readAsync( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getFailure()); assertNotNull(capturingListener.getResult()); @@ -295,11 +291,11 @@ public void testGetAsyncWriteRunnable_Exception() throws IOException, Interrupte TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.asyncWrite( + remoteClusterStateAttributesManager.writeAsync( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getResult()); assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException); @@ -313,11 +309,11 @@ public void testGetAsyncReadRunnable_Exception() throws IOException, Interrupted when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener capturingListener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncRead( + remoteClusterStateAttributesManager.readAsync( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getResult()); assertEquals(ioException, capturingListener.getFailure().getCause()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index ebd3488d06007..c9ebc62d5b73c 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -781,14 +781,18 @@ public void testGetClusterStateForManifest_IncludeEphemeral() throws IOException ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( LatchedActionListener.class ); - when(mockedIndexManager.getAsyncIndexMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedGlobalMetadataManager.getAsyncMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedClusterStateAttributeManager.getAsyncMetadataReadAction(anyString(), any(), listenerArgumentCaptor.capture())) - .thenReturn(() -> listenerArgumentCaptor.getValue().onResponse(mockedResult)); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedIndexManager).readAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedGlobalMetadataManager).readAsync(any(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedClusterStateAttributeManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); RemoteClusterStateService mockService = spy(remoteClusterStateService); mockService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); @@ -823,14 +827,18 @@ public void testGetClusterStateForManifest_ExcludeEphemeral() throws IOException ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( LatchedActionListener.class ); - when(mockedIndexManager.getAsyncIndexMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedGlobalMetadataManager.getAsyncMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(mockedResult) - ); - when(mockedClusterStateAttributeManager.getAsyncMetadataReadAction(anyString(), any(), listenerArgumentCaptor.capture())) - .thenReturn(() -> listenerArgumentCaptor.getValue().onResponse(mockedResult)); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedIndexManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedGlobalMetadataManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(mockedResult); + return null; + }).when(mockedClusterStateAttributeManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); when(mockedResult.getComponent()).thenReturn(COORDINATION_METADATA); remoteClusterStateService.setRemoteIndexMetadataManager(mockedIndexManager); remoteClusterStateService.setRemoteGlobalMetadataManager(mockedGlobalMetadataManager); @@ -877,9 +885,10 @@ public void testGetClusterStateFromManifest_CodecV1() throws IOException { ArgumentCaptor> listenerArgumentCaptor = ArgumentCaptor.forClass( LatchedActionListener.class ); - when(mockedIndexManager.getAsyncIndexMetadataReadAction(any(), anyString(), listenerArgumentCaptor.capture())).thenReturn( - () -> listenerArgumentCaptor.getValue().onResponse(new RemoteReadResult(indexMetadata, INDEX, INDEX)) - ); + doAnswer(invocation -> { + listenerArgumentCaptor.getValue().onResponse(new RemoteReadResult(indexMetadata, INDEX, INDEX)); + return null; + }).when(mockedIndexManager).readAsync(anyString(), any(), listenerArgumentCaptor.capture()); when(mockedGlobalMetadataManager.getGlobalMetadata(anyString(), eq(manifest))).thenReturn(Metadata.EMPTY_METADATA); RemoteClusterStateService spiedService = spy(remoteClusterStateService); spiedService.getClusterStateForManifest(ClusterName.DEFAULT.value(), manifest, NODE_ID, true); @@ -1258,7 +1267,7 @@ public void testReadClusterStateInParallel_ExceptionDuringRead() throws IOExcept ); assertEquals("Exception during reading cluster state from remote", exception.getMessage()); assertTrue(exception.getSuppressed().length > 0); - assertEquals(mockException, exception.getSuppressed()[0]); + assertEquals(mockException, exception.getSuppressed()[0].getCause()); } public void testReadClusterStateInParallel_UnexpectedResult() throws IOException { @@ -1322,19 +1331,20 @@ public void testReadClusterStateInParallel_UnexpectedResult() throws IOException RemoteIndexMetadataManager mockIndexMetadataManager = mock(RemoteIndexMetadataManager.class); CheckedRunnable mockRunnable = mock(CheckedRunnable.class); ArgumentCaptor> latchCapture = ArgumentCaptor.forClass(LatchedActionListener.class); - when(mockIndexMetadataManager.getAsyncIndexMetadataReadAction(anyString(), anyString(), latchCapture.capture())).thenReturn( - mockRunnable - ); + doAnswer(invocation -> { + latchCapture.getValue().onResponse(mockResult); + return null; + }).when(mockIndexMetadataManager).readAsync(anyString(), any(), latchCapture.capture()); RemoteGlobalMetadataManager mockGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); - when(mockGlobalMetadataManager.getAsyncMetadataReadAction(any(), anyString(), latchCapture.capture())).thenReturn(mockRunnable); + doAnswer(invocation -> { + latchCapture.getValue().onResponse(mockResult); + return null; + }).when(mockGlobalMetadataManager).readAsync(any(), any(), latchCapture.capture()); RemoteClusterStateAttributesManager mockClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); - when(mockClusterStateAttributeManager.getAsyncMetadataReadAction(anyString(), any(), latchCapture.capture())).thenReturn( - mockRunnable - ); - doAnswer(invocationOnMock -> { + doAnswer(invocation -> { latchCapture.getValue().onResponse(mockResult); return null; - }).when(mockRunnable).run(); + }).when(mockClusterStateAttributeManager).readAsync(anyString(), any(), latchCapture.capture()); when(mockResult.getComponent()).thenReturn("mock-result"); remoteClusterStateService.start(); remoteClusterStateService.setRemoteIndexMetadataManager(mockIndexMetadataManager); @@ -1363,56 +1373,56 @@ public void testReadClusterStateInParallel_UnexpectedResult() throws IOException ); assertEquals("Unknown component: mock-result", exception.getMessage()); newIndicesToRead.forEach( - uploadedIndexMetadata -> verify(mockIndexMetadataManager, times(1)).getAsyncIndexMetadataReadAction( - eq(previousClusterState.getMetadata().clusterUUID()), - eq(uploadedIndexMetadata.getUploadedFilename()), + uploadedIndexMetadata -> verify(mockIndexMetadataManager, times(1)).readAsync( + eq("test-index-1"), + argThat(new BlobNameMatcher(uploadedIndexMetadata.getUploadedFilename())), any() ) ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(COORDINATION_METADATA), + argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(SETTING_METADATA), + argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(TRANSIENT_SETTING_METADATA), + argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(TEMPLATES_METADATA), + argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), any() ); - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(HASHES_OF_CONSISTENT_SETTINGS), + argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), any() ); newCustomMetadataMap.keySet().forEach(uploadedCustomMetadataKey -> { - verify(mockGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), + verify(mockGlobalMetadataManager, times(1)).readAsync( eq(uploadedCustomMetadataKey), + argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), any() ); }); - verify(mockClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockClusterStateAttributeManager, times(1)).readAsync( eq(DISCOVERY_NODES), argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), any() ); - verify(mockClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockClusterStateAttributeManager, times(1)).readAsync( eq(CLUSTER_BLOCKS), argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), any() ); newClusterStateCustoms.keySet().forEach(uploadedClusterStateCustomMetadataKey -> { - verify(mockClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockClusterStateAttributeManager, times(1)).readAsync( eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, uploadedClusterStateCustomMetadataKey)), argThat(new BlobNameMatcher(newClusterStateCustoms.get(uploadedClusterStateCustomMetadataKey).getUploadedFilename())), any() @@ -1495,131 +1505,80 @@ public void testReadClusterStateInParallel_Success() throws IOException { RemoteGlobalMetadataManager mockedGlobalMetadataManager = mock(RemoteGlobalMetadataManager.class); RemoteClusterStateAttributesManager mockedClusterStateAttributeManager = mock(RemoteClusterStateAttributesManager.class); - when( - mockedIndexManager.getAsyncIndexMetadataReadAction( - eq(manifest.getClusterUUID()), - eq(indexFilename), - any(LatchedActionListener.class) - ) - ).thenAnswer(invocationOnMock -> { + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(newIndexMetadata, INDEX, "test-index-1") - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(customMetadataFilename)), - eq("custom_md_3"), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(newIndexMetadata, INDEX, "test-index-1")); + return null; + }).when(mockedIndexManager).readAsync(eq("test-index-1"), argThat(new BlobNameMatcher(indexFilename)), any(LatchedActionListener.class)); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(customMetadata3, CUSTOM_METADATA, "custom_md_3") - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), - eq(COORDINATION_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(customMetadata3, CUSTOM_METADATA, "custom_md_3")); + return null; + }).when(mockedGlobalMetadataManager).readAsync(eq("custom_md_3"), argThat(new BlobNameMatcher(customMetadataFilename)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult(updatedCoordinationMetadata, COORDINATION_METADATA, COORDINATION_METADATA) ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), - eq(SETTING_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(COORDINATION_METADATA), argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedPersistentSettings, SETTING_METADATA, SETTING_METADATA) - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), - eq(TRANSIENT_SETTING_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedPersistentSettings, SETTING_METADATA, SETTING_METADATA)); + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(SETTING_METADATA), argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult(updatedTransientSettings, TRANSIENT_SETTING_METADATA, TRANSIENT_SETTING_METADATA) ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), - eq(TEMPLATES_METADATA), - any() - ) - ).thenAnswer(invocationOnMock -> { + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(TRANSIENT_SETTING_METADATA), argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedTemplateMetadata, TEMPLATES_METADATA, TEMPLATES_METADATA) - ); - }); - when( - mockedGlobalMetadataManager.getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), - eq(HASHES_OF_CONSISTENT_SETTINGS), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedTemplateMetadata, TEMPLATES_METADATA, TEMPLATES_METADATA)); + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(TEMPLATES_METADATA), argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult(updatedHashesOfConsistentSettings, HASHES_OF_CONSISTENT_SETTINGS, HASHES_OF_CONSISTENT_SETTINGS) ); - }); - when( - mockedClusterStateAttributeManager.getAsyncMetadataReadAction( - eq(DISCOVERY_NODES), - argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), - any() - ) - ).thenAnswer(invocationOnMock -> { + return null; + }).when(mockedGlobalMetadataManager) + .readAsync(eq(HASHES_OF_CONSISTENT_SETTINGS), argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedDiscoveryNodes, CLUSTER_STATE_ATTRIBUTE, DISCOVERY_NODES) - ); - }); - when( - mockedClusterStateAttributeManager.getAsyncMetadataReadAction( - eq(CLUSTER_BLOCKS), - argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedDiscoveryNodes, CLUSTER_STATE_ATTRIBUTE, DISCOVERY_NODES)); + return null; + }).when(mockedClusterStateAttributeManager) + .readAsync(eq(DISCOVERY_NODES), argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( - new RemoteReadResult(updatedClusterBlocks, CLUSTER_STATE_ATTRIBUTE, CLUSTER_BLOCKS) - ); - }); - when( - mockedClusterStateAttributeManager.getAsyncMetadataReadAction( - eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, updatedClusterStateCustom3.getWriteableName())), - argThat(new BlobNameMatcher(clusterStateCustomFilename)), - any() - ) - ).thenAnswer(invocationOnMock -> { + latchedActionListener.onResponse(new RemoteReadResult(updatedClusterBlocks, CLUSTER_STATE_ATTRIBUTE, CLUSTER_BLOCKS)); + return null; + }).when(mockedClusterStateAttributeManager) + .readAsync(eq(CLUSTER_BLOCKS), argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), any()); + doAnswer(invocationOnMock -> { LatchedActionListener latchedActionListener = invocationOnMock.getArgument(2, LatchedActionListener.class); - return (CheckedRunnable) () -> latchedActionListener.onResponse( + latchedActionListener.onResponse( new RemoteReadResult( updatedClusterStateCustom3, CLUSTER_STATE_ATTRIBUTE, String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, updatedClusterStateCustom3.getWriteableName()) ) ); - }); + return null; + }).when(mockedClusterStateAttributeManager) + .readAsync( + eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, updatedClusterStateCustom3.getWriteableName())), + argThat(new BlobNameMatcher(clusterStateCustomFilename)), + any() + ); remoteClusterStateService.start(); remoteClusterStateService.setRemoteIndexMetadataManager(mockedIndexManager); @@ -1665,56 +1624,56 @@ public void testReadClusterStateInParallel_Success() throws IOException { uploadedClusterStateCustomMap.keySet().forEach(key -> assertTrue(updatedClusterState.customs().containsKey(key))); assertEquals(updatedClusterStateCustom3, updatedClusterState.custom("custom_3")); newIndicesToRead.forEach( - uploadedIndexMetadata -> verify(mockedIndexManager, times(1)).getAsyncIndexMetadataReadAction( - eq(previousClusterState.getMetadata().clusterUUID()), - eq(uploadedIndexMetadata.getUploadedFilename()), + uploadedIndexMetadata -> verify(mockedIndexManager, times(1)).readAsync( + eq("test-index-1"), + argThat(new BlobNameMatcher(uploadedIndexMetadata.getUploadedFilename())), any() ) ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(COORDINATION_METADATA), + argThat(new BlobNameMatcher(COORDINATION_METADATA_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(SETTING_METADATA), + argThat(new BlobNameMatcher(PERSISTENT_SETTINGS_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(TRANSIENT_SETTING_METADATA), + argThat(new BlobNameMatcher(TRANSIENT_SETTINGS_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(TEMPLATES_METADATA), + argThat(new BlobNameMatcher(TEMPLATES_METADATA_FILENAME)), any() ); - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(HASHES_OF_CONSISTENT_SETTINGS), + argThat(new BlobNameMatcher(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)), any() ); newCustomMetadataMap.keySet().forEach(uploadedCustomMetadataKey -> { - verify(mockedGlobalMetadataManager, times(1)).getAsyncMetadataReadAction( - argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), + verify(mockedGlobalMetadataManager, times(1)).readAsync( eq(uploadedCustomMetadataKey), + argThat(new BlobNameMatcher(newCustomMetadataMap.get(uploadedCustomMetadataKey).getUploadedFilename())), any() ); }); - verify(mockedClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockedClusterStateAttributeManager, times(1)).readAsync( eq(DISCOVERY_NODES), argThat(new BlobNameMatcher(DISCOVERY_NODES_FILENAME)), any() ); - verify(mockedClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockedClusterStateAttributeManager, times(1)).readAsync( eq(CLUSTER_BLOCKS), argThat(new BlobNameMatcher(CLUSTER_BLOCKS_FILENAME)), any() ); newClusterStateCustoms.keySet().forEach(uploadedClusterStateCustomMetadataKey -> { - verify(mockedClusterStateAttributeManager, times(1)).getAsyncMetadataReadAction( + verify(mockedClusterStateAttributeManager, times(1)).readAsync( eq(String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, uploadedClusterStateCustomMetadataKey)), argThat(new BlobNameMatcher(newClusterStateCustoms.get(uploadedClusterStateCustomMetadataKey).getUploadedFilename())), any() diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 98248fb3c6065..a2da1e8b0fdb2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -173,11 +173,11 @@ public void testGetAsyncReadRunnable_CoordinationMetadata() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( COORDINATION_METADATA, coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -203,11 +203,11 @@ public void testGetAsyncWriteRunnable_CoordinationMetadata() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -243,7 +243,7 @@ public void testGetAsyncReadRunnable_PersistentSettings() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.readAsync(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -268,7 +268,7 @@ public void testGetAsyncWriteRunnable_PersistentSettings() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.writeAsync(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); @@ -305,8 +305,7 @@ public void testGetAsyncReadRunnable_TransientSettings() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.readAsync(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -331,8 +330,7 @@ public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -366,11 +364,11 @@ public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Excepti TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -394,11 +392,11 @@ public void testGetAsyncWriteRunnable_HashesOfConsistentSettings() throws Except .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForUpload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -432,11 +430,11 @@ public void testGetAsyncReadRunnable_TemplatesMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( TEMPLATES_METADATA, templatesMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -461,8 +459,7 @@ public void testGetAsyncWriteRunnable_TemplatesMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -497,8 +494,7 @@ public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(IndexGraveyard.TYPE, customMetadataForDownload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.readAsync(IndexGraveyard.TYPE, customMetadataForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -524,11 +520,11 @@ public void testGetAsyncWriteRunnable_CustomMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( customMetadataForUpload.getType(), customMetadataForUpload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -557,8 +553,7 @@ public void testGetAsyncReadRunnable_GlobalMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(GLOBAL_METADATA, globalMetadataForDownload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.readAsync(GLOBAL_METADATA, globalMetadataForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -579,11 +574,11 @@ public void testGetAsyncReadRunnable_IOException() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( COORDINATION_METADATA, coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); @@ -609,11 +604,11 @@ public void testGetAsyncWriteRunnable_IOException() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch) - ).run(); + ); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); assertTrue(listener.getFailure() instanceof RemoteStateTransferException); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java index ae8deab7decb1..76c5792677ea0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -98,11 +98,11 @@ public void testGetAsyncWriteRunnable_Success() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.asyncWrite( + remoteIndexMetadataManager.writeAsync( INDEX, new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); @@ -130,11 +130,11 @@ public void testGetAsyncWriteRunnable_IOFailure() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.asyncWrite( + remoteIndexMetadataManager.writeAsync( INDEX, new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); @@ -151,11 +151,11 @@ public void testGetAsyncReadRunnable_Success() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.asyncRead( + remoteIndexMetadataManager.readAsync( INDEX, new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -170,11 +170,11 @@ public void testGetAsyncReadRunnable_IOFailure() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.asyncRead( + remoteIndexMetadataManager.readAsync( INDEX, new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure());