From 6162126c869a21e220738e8411c2a1277ceb94c7 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 12 Jul 2024 14:50:06 +0530 Subject: [PATCH] update async read and write method signature Signed-off-by: Shivansh Arora --- .../InternalRemoteRoutingTableService.java | 19 +- .../remote/NoopRemoteRoutingTableService.java | 7 +- .../remote/RemoteRoutingTableService.java | 5 +- .../AbstractRemoteWritableEntityManager.java | 14 +- .../remote/RemoteWritableEntityManager.java | 35 +- .../remote/RemoteClusterStateService.java | 479 ++++++++---------- .../RemoteRoutingTableServiceTests.java | 92 ++-- ...oteClusterStateAttributesManagerTests.java | 28 +- .../RemoteGlobalMetadataManagerTests.java | 51 +- .../RemoteIndexMetadataManagerTests.java | 16 +- 10 files changed, 337 insertions(+), 409 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index cc1b0713393f3..ba1814997633f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -17,7 +17,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; @@ -150,14 +149,14 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( + public void writeAsync( ClusterState clusterState, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener, @@ -187,7 +186,7 @@ public CheckedRunnable getIndexRoutingAsyncAction( ) ); - return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener); + uploadIndex(indexRouting, fileName, blobContainer, completionListener); } /** @@ -274,7 +273,7 @@ private void uploadIndex( } @Override - public CheckedRunnable getAsyncIndexRoutingReadAction( + public void getAsyncIndexRoutingReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener @@ -284,7 +283,7 @@ public CheckedRunnable getAsyncIndexRoutingReadAction( BlobContainer blobContainer = blobStoreRepository.blobStore() .blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx))); - return () -> readAsync( + readAsync( blobContainer, blobFileName, index, diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java index 6236d107d0220..8c7e3a6283734 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.index.Index; @@ -42,14 +41,13 @@ public DiffableUtils.MapDiff getIndexRoutingAsyncAction( + public void writeAsync( ClusterState clusterState, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener, BlobPath clusterBasePath ) { // noop - return () -> {}; } @Override @@ -63,13 +61,12 @@ public List getAllUploadedIndices } @Override - public CheckedRunnable getAsyncIndexRoutingReadAction( + public void getAsyncIndexRoutingReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener ) { // noop - return () -> {}; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index d455dfb58eabc..b4562940ba335 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.core.common.io.stream.StreamInput; @@ -46,7 +45,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException { List getIndicesRouting(RoutingTable routingTable); - CheckedRunnable getAsyncIndexRoutingReadAction( + void getAsyncIndexRoutingReadAction( String uploadedFilename, Index index, LatchedActionListener latchedActionListener @@ -62,7 +61,7 @@ DiffableUtils.MapDiff> RoutingTable after ); - CheckedRunnable getIndexRoutingAsyncAction( + void writeAsync( ClusterState clusterState, IndexRoutingTable indexRouting, LatchedActionListener latchedActionListener, diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index 7680b4a420aef..a72794e23426a 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -8,12 +8,10 @@ package org.opensearch.common.remote; -import org.opensearch.common.CheckedRunnable; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.model.RemoteReadResult; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -71,20 +69,16 @@ protected abstract ActionListener getReadActionListener( ); @Override - public CheckedRunnable asyncWrite( + public void writeAsync( String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener ) { - return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener)); + getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener)); } @Override - public CheckedRunnable asyncRead( - String component, - AbstractRemoteWritableBlobEntity entity, - ActionListener listener - ) { - return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener)); + public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener) { + getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener)); } } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java index bcaabac837b1f..7693d1b5284bd 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java @@ -8,43 +8,40 @@ package org.opensearch.common.remote; -import org.opensearch.common.CheckedRunnable; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.model.RemoteReadResult; -import java.io.IOException; - /** * The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store */ public interface RemoteWritableEntityManager { /** - * Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity. + * Performs an asynchronous read operation for the specified component and entity. * * @param component the component for which the read operation is performed * @param entity the entity to be read - * @param listener the listener to be notified when the read operation completes - * @return a CheckedRunnable that performs the asynchronous read operation + * @param listener the listener to be notified when the read operation completes. + * The listener's {@link ActionListener#onResponse(Object)} method + * is called with a {@link RemoteReadResult} object containing the + * read data on successful read. The + * {@link ActionListener#onFailure(Exception)} method is called with + * an exception if the read operation fails. */ - CheckedRunnable asyncRead( - String component, - AbstractRemoteWritableBlobEntity entity, - ActionListener listener - ); + void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener); /** - * Returns a CheckedRunnable that performs an asynchronous write operation for the specified component and entity. + * Performs an asynchronous write operation for the specified component and entity. * * @param component the component for which the write operation is performed * @param entity the entity to be written - * @param listener the listener to be notified when the write operation completes - * @return a CheckedRunnable that performs the asynchronous write operation + * @param listener the listener to be notified when the write operation completes. + * The listener's {@link ActionListener#onResponse(Object)} method + * is called with a {@link UploadedMetadata} object containing the + * uploaded metadata on successful write. The + * {@link ActionListener#onFailure(Exception)} method is called with + * an exception if the write operation fails. */ - CheckedRunnable asyncWrite( - String component, - AbstractRemoteWritableBlobEntity entity, - ActionListener listener - ); + void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener listener); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 6959516570e9d..8a4c1a548efda 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -30,7 +30,6 @@ import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobStore; @@ -498,7 +497,7 @@ UploadedMetadataResults writeMetadataInParallel( + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + (uploadTransientSettingMetadata ? 1 : 0) + clusterStateCustomToUpload.size() + (uploadHashesOfConsistentSettings ? 1 : 0) + indicesRoutingToUpload.size(); CountDownLatch latch = new CountDownLatch(totalUploadTasks); - Map> uploadTasks = new ConcurrentHashMap<>(totalUploadTasks); + List uploadTasks = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); Map results = new ConcurrentHashMap<>(totalUploadTasks); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalUploadTasks)); @@ -517,185 +516,158 @@ UploadedMetadataResults writeMetadataInParallel( ); if (uploadSettingsMetadata) { - uploadTasks.put( + uploadTasks.add(SETTING_METADATA); + remoteGlobalMetadataManager.writeAsync( SETTING_METADATA, - remoteGlobalMetadataManager.asyncWrite( - SETTING_METADATA, - new RemotePersistentSettingsMetadata( - clusterState.metadata().persistentSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemotePersistentSettingsMetadata( + clusterState.metadata().persistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadTransientSettingMetadata) { - uploadTasks.put( + uploadTasks.add(TRANSIENT_SETTING_METADATA); + remoteGlobalMetadataManager.writeAsync( TRANSIENT_SETTING_METADATA, - remoteGlobalMetadataManager.asyncWrite( - TRANSIENT_SETTING_METADATA, - new RemoteTransientSettingsMetadata( - clusterState.metadata().transientSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteTransientSettingsMetadata( + clusterState.metadata().transientSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadCoordinationMetadata) { - uploadTasks.put( + uploadTasks.add(COORDINATION_METADATA); + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, - remoteGlobalMetadataManager.asyncWrite( - COORDINATION_METADATA, - new RemoteCoordinationMetadata( - clusterState.metadata().coordinationMetadata(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteCoordinationMetadata( + clusterState.metadata().coordinationMetadata(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadTemplateMetadata) { - uploadTasks.put( + uploadTasks.add(TEMPLATES_METADATA); + remoteGlobalMetadataManager.writeAsync( TEMPLATES_METADATA, - remoteGlobalMetadataManager.asyncWrite( - TEMPLATES_METADATA, - new RemoteTemplatesMetadata( - clusterState.metadata().templatesMetadata(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteTemplatesMetadata( + clusterState.metadata().templatesMetadata(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (uploadDiscoveryNodes) { - uploadTasks.put( - DISCOVERY_NODES, - remoteClusterStateAttributesManager.asyncWrite( - RemoteDiscoveryNodes.DISCOVERY_NODES, - new RemoteDiscoveryNodes( - clusterState.nodes(), - clusterState.version(), - clusterState.stateUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + uploadTasks.add(DISCOVERY_NODES); + remoteClusterStateAttributesManager.writeAsync( + RemoteDiscoveryNodes.DISCOVERY_NODES, + new RemoteDiscoveryNodes( + clusterState.nodes(), + clusterState.version(), + clusterState.stateUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } if (uploadClusterBlock) { - uploadTasks.put( - CLUSTER_BLOCKS, - remoteClusterStateAttributesManager.asyncWrite( - RemoteClusterBlocks.CLUSTER_BLOCKS, - new RemoteClusterBlocks( - clusterState.blocks(), - clusterState.version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + uploadTasks.add(CLUSTER_BLOCKS); + remoteClusterStateAttributesManager.writeAsync( + RemoteClusterBlocks.CLUSTER_BLOCKS, + new RemoteClusterBlocks( + clusterState.blocks(), + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } if (uploadHashesOfConsistentSettings) { - uploadTasks.put( + uploadTasks.add(HASHES_OF_CONSISTENT_SETTINGS); + remoteGlobalMetadataManager.writeAsync( HASHES_OF_CONSISTENT_SETTINGS, - remoteGlobalMetadataManager.asyncWrite( - HASHES_OF_CONSISTENT_SETTINGS, - new RemoteHashesOfConsistentSettings( - (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor() - ), - listener - ) + new RemoteHashesOfConsistentSettings( + (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor() + ), + listener ); } customToUpload.forEach((key, value) -> { String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); - uploadTasks.put( + uploadTasks.add(customComponent); + remoteGlobalMetadataManager.writeAsync( customComponent, - remoteGlobalMetadataManager.asyncWrite( - customComponent, - new RemoteCustomMetadata( - value, - key, - clusterState.metadata().version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + new RemoteCustomMetadata( + value, + key, + clusterState.metadata().version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); }); indexToUpload.forEach(indexMetadata -> { - uploadTasks.put( + uploadTasks.add(indexMetadata.getIndex().getName()); + remoteIndexMetadataManager.writeAsync( indexMetadata.getIndex().getName(), - remoteIndexMetadataManager.asyncWrite( - indexMetadata.getIndex().getName(), - new RemoteIndexMetadata( - indexMetadata, - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + new RemoteIndexMetadata( + indexMetadata, + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); }); clusterStateCustomToUpload.forEach((key, value) -> { - uploadTasks.put( - key, - remoteClusterStateAttributesManager.asyncWrite( - CLUSTER_STATE_CUSTOM, - new RemoteClusterStateCustoms( - value, - key, - clusterState.version(), - clusterState.metadata().clusterUUID(), - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + uploadTasks.add(key); + remoteClusterStateAttributesManager.writeAsync( + CLUSTER_STATE_CUSTOM, + new RemoteClusterStateCustoms( + value, + key, + clusterState.version(), + clusterState.metadata().clusterUUID(), + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); }); indicesRoutingToUpload.forEach(indexRoutingTable -> { - uploadTasks.put( - InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), - remoteRoutingTableService.getIndexRoutingAsyncAction( - clusterState, - indexRoutingTable, - listener, - getClusterMetadataBasePath( - blobStoreRepository, - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID() - ) + uploadTasks.add(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName()); + remoteRoutingTableService.writeAsync( + clusterState, + indexRoutingTable, + listener, + getClusterMetadataBasePath( + blobStoreRepository, + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID() ) ); }); - - // start async upload of all required metadata files - for (CheckedRunnable uploadTask : uploadTasks.values()) { - uploadTask.run(); - } invokeIndexMetadataUploadListeners(indexToUpload, prevIndexMetadataByName, latch, exceptionList); try { @@ -705,7 +677,7 @@ UploadedMetadataResults writeMetadataInParallel( String.format( Locale.ROOT, "Timed out waiting for transfer of following metadata to complete - %s", - String.join(", ", uploadTasks.keySet()) + String.join(", ", uploadTasks) ) ); exceptionList.forEach(ex::addSuppressed); @@ -714,11 +686,7 @@ UploadedMetadataResults writeMetadataInParallel( } catch (InterruptedException ex) { exceptionList.forEach(ex::addSuppressed); RemoteStateTransferException exception = new RemoteStateTransferException( - String.format( - Locale.ROOT, - "Timed out waiting for transfer of metadata to complete - %s", - String.join(", ", uploadTasks.keySet()) - ), + String.format(Locale.ROOT, "Timed out waiting for transfer of metadata to complete - %s", String.join(", ", uploadTasks)), ex ); Thread.currentThread().interrupt(); @@ -726,11 +694,7 @@ UploadedMetadataResults writeMetadataInParallel( } if (!exceptionList.isEmpty()) { RemoteStateTransferException exception = new RemoteStateTransferException( - String.format( - Locale.ROOT, - "Exception during transfer of following metadata to Remote - %s", - String.join(", ", uploadTasks.keySet()) - ) + String.format(Locale.ROOT, "Exception during transfer of following metadata to Remote - %s", String.join(", ", uploadTasks)) ); exceptionList.forEach(exception::addSuppressed); throw exception; @@ -1015,7 +979,6 @@ private ClusterState readClusterStateInParallel( + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + clusterStateCustomToRead.size() + indicesRoutingToRead.size(); CountDownLatch latch = new CountDownLatch(totalReadTasks); - List> asyncMetadataReadActions = new ArrayList<>(); List readResults = Collections.synchronizedList(new ArrayList<>()); List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); @@ -1029,17 +992,15 @@ private ClusterState readClusterStateInParallel( }), latch); for (UploadedIndexMetadata indexMetadata : indicesToRead) { - asyncMetadataReadActions.add( - remoteIndexMetadataManager.asyncRead( - indexMetadata.getIndexName(), - new RemoteIndexMetadata( - RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteIndexMetadataManager.readAsync( + indexMetadata.getIndexName(), + new RemoteIndexMetadata( + RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } @@ -1055,154 +1016,130 @@ private ClusterState readClusterStateInParallel( ); for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { - asyncMetadataReadActions.add( - remoteRoutingTableService.getAsyncIndexRoutingReadAction( - indexRouting.getUploadedFilename(), - new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), - routingTableLatchedActionListener - ) + remoteRoutingTableService.getAsyncIndexRoutingReadAction( + indexRouting.getUploadedFilename(), + new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), + routingTableLatchedActionListener ); } for (Map.Entry entry : customToRead.entrySet()) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - entry.getValue().getAttributeName(), - new RemoteCustomMetadata( - entry.getValue().getUploadedFilename(), - entry.getKey(), - clusterUUID, - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + entry.getValue().getAttributeName(), + new RemoteCustomMetadata( + entry.getValue().getUploadedFilename(), + entry.getKey(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); } if (readCoordinationMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - COORDINATION_METADATA, - new RemoteCoordinationMetadata( - manifest.getCoordinationMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + COORDINATION_METADATA, + new RemoteCoordinationMetadata( + manifest.getCoordinationMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readSettingsMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - SETTING_METADATA, - new RemotePersistentSettingsMetadata( - manifest.getSettingsMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + SETTING_METADATA, + new RemotePersistentSettingsMetadata( + manifest.getSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readTransientSettingsMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - TRANSIENT_SETTING_METADATA, - new RemoteTransientSettingsMetadata( - manifest.getTransientSettingsMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + TRANSIENT_SETTING_METADATA, + new RemoteTransientSettingsMetadata( + manifest.getTransientSettingsMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readTemplatesMetadata) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - TEMPLATES_METADATA, - new RemoteTemplatesMetadata( - manifest.getTemplatesMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor(), - blobStoreRepository.getNamedXContentRegistry() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + TEMPLATES_METADATA, + new RemoteTemplatesMetadata( + manifest.getTemplatesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor(), + blobStoreRepository.getNamedXContentRegistry() + ), + listener ); } if (readDiscoveryNodes) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.asyncRead( - DISCOVERY_NODES, - new RemoteDiscoveryNodes( - manifest.getDiscoveryNodesMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + DISCOVERY_NODES, + new RemoteDiscoveryNodes( + manifest.getDiscoveryNodesMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } if (readClusterBlocks) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.asyncRead( - CLUSTER_BLOCKS, - new RemoteClusterBlocks( - manifest.getClusterBlocksMetadata().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + CLUSTER_BLOCKS, + new RemoteClusterBlocks( + manifest.getClusterBlocksMetadata().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } if (readHashesOfConsistentSettings) { - asyncMetadataReadActions.add( - remoteGlobalMetadataManager.asyncRead( - HASHES_OF_CONSISTENT_SETTINGS, - new RemoteHashesOfConsistentSettings( - manifest.getHashesOfConsistentSettings().getUploadedFilename(), - clusterUUID, - blobStoreRepository.getCompressor() - ), - listener - ) + remoteGlobalMetadataManager.readAsync( + HASHES_OF_CONSISTENT_SETTINGS, + new RemoteHashesOfConsistentSettings( + manifest.getHashesOfConsistentSettings().getUploadedFilename(), + clusterUUID, + blobStoreRepository.getCompressor() + ), + listener ); } for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { - asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.asyncRead( - // pass component name as cluster-state-custom--, so that we can interpret it later - String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), - new RemoteClusterStateCustoms( - entry.getValue().getUploadedFilename(), - entry.getValue().getAttributeName(), - clusterUUID, - blobStoreRepository.getCompressor(), - namedWriteableRegistry - ), - listener - ) + remoteClusterStateAttributesManager.readAsync( + // pass component name as cluster-state-custom--, so that we can interpret it later + String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), + new RemoteClusterStateCustoms( + entry.getValue().getUploadedFilename(), + entry.getValue().getAttributeName(), + clusterUUID, + blobStoreRepository.getCompressor(), + namedWriteableRegistry + ), + listener ); } - for (CheckedRunnable asyncMetadataReadAction : asyncMetadataReadActions) { - asyncMetadataReadAction.run(); - } - try { if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { RemoteStateTransferException exception = new RemoteStateTransferException( diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 839ebe1ff8301..66ea2dce9b606 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -19,7 +19,6 @@ import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; @@ -31,6 +30,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.common.util.TestCapturingListener; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.Index; @@ -57,6 +57,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.Supplier; import org.mockito.ArgumentCaptor; @@ -347,23 +348,23 @@ public void testGetIndicesRoutingMapDiffIndexDeleted() { assertEquals(indexName, diff.getDeletes().get(0)); } - public void testGetIndexRoutingAsyncAction() throws IOException { + public void testWriteAsync() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.writeAsync( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); + latch.await(); String expectedFilePrefix = String.join( DELIMITER, @@ -372,27 +373,33 @@ public void testGetIndexRoutingAsyncAction() throws IOException { RemoteStoreUtils.invertLong(clusterState.version()) ); verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class)); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertTrue(listener.getResult() instanceof ClusterMetadataManifest.UploadedIndexMetadata); + ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = (ClusterMetadataManifest.UploadedIndexMetadata) listener + .getResult(); + assertEquals(indexName, uploadedIndexMetadata.getIndexName()); } - public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException { + public void testWriteAsync_FailureInBlobRepo() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); + IOException exception = new IOException("testing failure"); + doThrow(exception).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.writeAsync( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); + latch.await(); String expectedFilePrefix = String.join( DELIMITER, INDEX_ROUTING_FILE_PREFIX, @@ -400,15 +407,19 @@ public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException RemoteStoreUtils.invertLong(clusterState.version()) ); verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(exception, listener.getFailure().getCause()); } - public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { + public void testWriteAsync_AsyncRepo() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); blobContainer = mock(AsyncMultiStreamBlobContainer.class); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); @@ -424,14 +435,12 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.writeAsync( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); String expectedFilePrefix = String.join( DELIMITER, @@ -439,6 +448,13 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { RemoteStoreUtils.invertLong(clusterState.term()), RemoteStoreUtils.invertLong(clusterState.version()) ); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertTrue(listener.getResult() instanceof ClusterMetadataManifest.UploadedIndexMetadata); + ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = (ClusterMetadataManifest.UploadedIndexMetadata) listener + .getResult(); + assertEquals(indexName, uploadedIndexMetadata.getIndexName()); assertEquals(1, actionListenerArgumentCaptor.getAllValues().size()); assertEquals(1, writeContextArgumentCaptor.getAllValues().size()); assertNotNull(capturedWriteContext.get("index_routing")); @@ -446,28 +462,32 @@ public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix)); } - public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException { + public void testWriteAsync_AsyncRepoFailureInRepo() throws IOException, InterruptedException { String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); ClusterState clusterState = createClusterState(indexName); BlobPath expectedPath = getPath(); - LatchedActionListener listener = mock(LatchedActionListener.class); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); blobContainer = mock(AsyncMultiStreamBlobContainer.class); when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); - doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer) + IOException exception = new IOException("Testing failure"); + doThrow(exception).when((AsyncMultiStreamBlobContainer) blobContainer) .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + remoteRoutingTableService.writeAsync( clusterState, clusterState.routingTable().getIndicesRouting().get(indexName), - listener, + new LatchedActionListener<>(listener, latch), basePath ); - assertNotNull(runnable); - runnable.run(); - verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class)); + latch.await(); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(exception, listener.getFailure().getCause()); } public void testGetAllUploadedIndicesRouting() { @@ -656,9 +676,7 @@ public void testGetAsyncIndexMetadataReadAction() throws Exception { when(blobContainer.readBlob(indexName)).thenReturn(streamOutput.bytes().streamInput()); remoteRoutingTableService.start(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); @@ -680,9 +698,7 @@ public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws when(blobContainer.readBlob(anyString())).thenReturn(streamOutput.bytes().streamInput()); remoteRoutingTableService.doStart(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); assertBusy(() -> verify(blobContainer, times(1)).readBlob(any())); assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); @@ -698,9 +714,7 @@ public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Except doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); remoteRoutingTableService.doStart(); - CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); - assertNotNull(runnable); - runnable.run(); + remoteRoutingTableService.getAsyncIndexRoutingReadAction(uploadedFileName, index, listener); assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteStateTransferException.class))); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 055935035b003..5d2b0d3e3946e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -118,8 +118,7 @@ public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, Inter .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncWrite(DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.writeAsync(DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -147,8 +146,7 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncRead(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.readAsync(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -170,8 +168,7 @@ public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, Interr .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncWrite(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.writeAsync(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -200,8 +197,7 @@ public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, Interru CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncRead(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)) - .run(); + remoteClusterStateAttributesManager.readAsync(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -232,11 +228,11 @@ public void testGetAsyncWriteRunnable_Custom() throws IOException, InterruptedEx .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final TestCapturingListener listener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.asyncWrite( + remoteClusterStateAttributesManager.writeAsync( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -270,11 +266,11 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.asyncRead( + remoteClusterStateAttributesManager.readAsync( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getFailure()); assertNotNull(capturingListener.getResult()); @@ -296,11 +292,11 @@ public void testGetAsyncWriteRunnable_Exception() throws IOException, Interrupte TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.asyncWrite( + remoteClusterStateAttributesManager.writeAsync( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getResult()); assertTrue(capturingListener.getFailure() instanceof RemoteStateTransferException); @@ -314,11 +310,11 @@ public void testGetAsyncReadRunnable_Exception() throws IOException, Interrupted when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener capturingListener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.asyncRead( + remoteClusterStateAttributesManager.readAsync( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) - ).run(); + ); latch.await(); assertNull(capturingListener.getResult()); assertEquals(ioException, capturingListener.getFailure().getCause()); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 6cc03a6d90d91..fa73affc928d0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -172,11 +172,11 @@ public void testGetAsyncReadRunnable_CoordinationMetadata() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( COORDINATION_METADATA, coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -202,11 +202,11 @@ public void testGetAsyncWriteRunnable_CoordinationMetadata() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -242,7 +242,7 @@ public void testGetAsyncReadRunnable_PersistentSettings() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.readAsync(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -267,7 +267,7 @@ public void testGetAsyncWriteRunnable_PersistentSettings() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + remoteGlobalMetadataManager.writeAsync(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); @@ -304,8 +304,7 @@ public void testGetAsyncReadRunnable_TransientSettings() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.readAsync(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -330,8 +329,7 @@ public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -365,11 +363,11 @@ public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Excepti TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -393,11 +391,11 @@ public void testGetAsyncWriteRunnable_HashesOfConsistentSettings() throws Except .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForUpload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -431,11 +429,11 @@ public void testGetAsyncReadRunnable_TemplatesMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( TEMPLATES_METADATA, templatesMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -460,8 +458,7 @@ public void testGetAsyncWriteRunnable_TemplatesMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.writeAsync(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -496,8 +493,7 @@ public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(IndexGraveyard.TYPE, customMetadataForDownload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.readAsync(IndexGraveyard.TYPE, customMetadataForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -523,11 +519,11 @@ public void testGetAsyncWriteRunnable_CustomMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( customMetadataForUpload.getType(), customMetadataForUpload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -556,8 +552,7 @@ public void testGetAsyncReadRunnable_GlobalMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead(GLOBAL_METADATA, globalMetadataForDownload, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.readAsync(GLOBAL_METADATA, globalMetadataForDownload, new LatchedActionListener<>(listener, latch)); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -578,11 +573,11 @@ public void testGetAsyncReadRunnable_IOException() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncRead( + remoteGlobalMetadataManager.readAsync( COORDINATION_METADATA, coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); @@ -608,11 +603,11 @@ public void testGetAsyncWriteRunnable_IOException() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.asyncWrite( + remoteGlobalMetadataManager.writeAsync( COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch) - ).run(); + ); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); assertTrue(listener.getFailure() instanceof RemoteStateTransferException); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java index ae8deab7decb1..76c5792677ea0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -98,11 +98,11 @@ public void testGetAsyncWriteRunnable_Success() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.asyncWrite( + remoteIndexMetadataManager.writeAsync( INDEX, new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); @@ -130,11 +130,11 @@ public void testGetAsyncWriteRunnable_IOFailure() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.asyncWrite( + remoteIndexMetadataManager.writeAsync( INDEX, new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure()); @@ -151,11 +151,11 @@ public void testGetAsyncReadRunnable_Success() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.asyncRead( + remoteIndexMetadataManager.readAsync( INDEX, new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -170,11 +170,11 @@ public void testGetAsyncReadRunnable_IOFailure() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.asyncRead( + remoteIndexMetadataManager.readAsync( INDEX, new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) - ).run(); + ); latch.await(); assertNull(listener.getResult()); assertNotNull(listener.getFailure());