From 7017bb61271736bef08ca577ff2cb1e0c28c0d5c Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 10 Jul 2024 15:16:11 +0530 Subject: [PATCH] Rename methods to suggested ones in review Signed-off-by: Shivansh Arora --- ... AbstractRemoteWritableEntityManager.java} | 23 ++++--- ....java => RemoteWritableEntityManager.java} | 18 +++--- .../RemoteClusterStateAttributesManager.java | 19 +++--- .../remote/RemoteClusterStateService.java | 40 ++++++------- .../remote/RemoteGlobalMetadataManager.java | 17 +++--- .../remote/RemoteIndexMetadataManager.java | 17 +++--- ...ractRemoteWritableEntityManagerTests.java} | 13 ++-- ...oteClusterStateAttributesManagerTests.java | 36 ++++------- .../RemoteGlobalMetadataManagerTests.java | 60 +++++++------------ .../RemoteIndexMetadataManagerTests.java | 8 +-- 10 files changed, 106 insertions(+), 145 deletions(-) rename server/src/main/java/org/opensearch/common/remote/{AbstractRemoteEntitiesManager.java => AbstractRemoteWritableEntityManager.java} (75%) rename server/src/main/java/org/opensearch/common/remote/{RemoteEntitiesManager.java => RemoteWritableEntityManager.java} (66%) rename server/src/test/java/org/opensearch/common/remote/{AbstractRemoteEntitiesManagerTests.java => AbstractRemoteWritableEntityManagerTests.java} (78%) diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java similarity index 75% rename from server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java rename to server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index 9023c2d95902d..7680b4a420aef 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteEntitiesManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -8,7 +8,6 @@ package org.opensearch.common.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.common.CheckedRunnable; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -21,7 +20,7 @@ /** * An abstract class that provides a base implementation for managing remote entities in the remote store. */ -public abstract class AbstractRemoteEntitiesManager implements RemoteEntitiesManager { +public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager { /** * A map that stores the remote writable entity stores, keyed by the entity type. */ @@ -47,13 +46,13 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en * * @param component the component for which the write operation is performed * @param remoteObject the remote object to be written - * @param latchedActionListener the latched action listener to be notified when the write operation completes + * @param listener the listener to be notified when the write operation completes * @return an ActionListener for handling the write operation */ protected abstract ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ); /** @@ -62,30 +61,30 @@ protected abstract ActionListener getWriteActionListener( * * @param component the component for which the read operation is performed * @param remoteObject the remote object to be read - * @param latchedActionListener the latched action listener to be notified when the read operation completes + * @param listener the listener to be notified when the read operation completes * @return an ActionListener for handling the read operation */ protected abstract ActionListener getReadActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ); @Override - public CheckedRunnable getAsyncWriteRunnable( + public CheckedRunnable asyncWrite( String component, AbstractRemoteWritableBlobEntity entity, - LatchedActionListener latchedActionListener + ActionListener listener ) { - return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, latchedActionListener)); + return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener)); } @Override - public CheckedRunnable getAsyncReadRunnable( + public CheckedRunnable asyncRead( String component, AbstractRemoteWritableBlobEntity entity, - LatchedActionListener latchedActionListener + ActionListener listener ) { - return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, latchedActionListener)); + return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener)); } } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java similarity index 66% rename from server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java rename to server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java index f01d3ab509fbc..bcaabac837b1f 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteEntitiesManager.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityManager.java @@ -8,30 +8,30 @@ package org.opensearch.common.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.model.RemoteReadResult; import java.io.IOException; /** - * The RemoteEntitiesManager interface provides async read and write methods for managing remote entities in the remote store + * The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store */ -public interface RemoteEntitiesManager { +public interface RemoteWritableEntityManager { /** * Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity. * * @param component the component for which the read operation is performed * @param entity the entity to be read - * @param latchedActionListener the listener to be notified when the read operation completes + * @param listener the listener to be notified when the read operation completes * @return a CheckedRunnable that performs the asynchronous read operation */ - CheckedRunnable getAsyncReadRunnable( + CheckedRunnable asyncRead( String component, AbstractRemoteWritableBlobEntity entity, - LatchedActionListener latchedActionListener + ActionListener listener ); /** @@ -39,12 +39,12 @@ CheckedRunnable getAsyncReadRunnable( * * @param component the component for which the write operation is performed * @param entity the entity to be written - * @param latchedActionListener the listener to be notified when the write operation completes + * @param listener the listener to be notified when the write operation completes * @return a CheckedRunnable that performs the asynchronous write operation */ - CheckedRunnable getAsyncWriteRunnable( + CheckedRunnable asyncWrite( String component, AbstractRemoteWritableBlobEntity entity, - LatchedActionListener latchedActionListener + ActionListener listener ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 22a00ee5d6353..258e537577261 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -8,12 +8,11 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; -import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.AbstractRemoteWritableEntityManager; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; @@ -33,12 +32,11 @@ * * @opensearch.internal */ -public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesManager { +public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableEntityManager { public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; public static final String DISCOVERY_NODES = "nodes"; public static final String CLUSTER_BLOCKS = "blocks"; public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; - private final NamedWriteableRegistry namedWriteableRegistry; RemoteClusterStateAttributesManager( String clusterName, @@ -47,7 +45,6 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesM NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadpool ) { - this.namedWriteableRegistry = namedWriteableRegistry; this.remoteWritableEntityStores.put( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteClusterStateBlobStore<>( @@ -84,11 +81,11 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesM protected ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + resp -> listener.onResponse(remoteObject.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } @@ -96,11 +93,11 @@ protected ActionListener getWriteActionListener( protected ActionListener getReadActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return ActionListener.wrap( - response -> latchedActionListener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)), + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 30e5c58dcad5c..6959516570e9d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -519,7 +519,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadSettingsMetadata) { uploadTasks.put( SETTING_METADATA, - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( SETTING_METADATA, new RemotePersistentSettingsMetadata( clusterState.metadata().persistentSettings(), @@ -535,7 +535,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadTransientSettingMetadata) { uploadTasks.put( TRANSIENT_SETTING_METADATA, - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( TRANSIENT_SETTING_METADATA, new RemoteTransientSettingsMetadata( clusterState.metadata().transientSettings(), @@ -551,7 +551,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadCoordinationMetadata) { uploadTasks.put( COORDINATION_METADATA, - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( COORDINATION_METADATA, new RemoteCoordinationMetadata( clusterState.metadata().coordinationMetadata(), @@ -567,7 +567,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadTemplateMetadata) { uploadTasks.put( TEMPLATES_METADATA, - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( TEMPLATES_METADATA, new RemoteTemplatesMetadata( clusterState.metadata().templatesMetadata(), @@ -583,7 +583,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadDiscoveryNodes) { uploadTasks.put( DISCOVERY_NODES, - remoteClusterStateAttributesManager.getAsyncWriteRunnable( + remoteClusterStateAttributesManager.asyncWrite( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteDiscoveryNodes( clusterState.nodes(), @@ -598,7 +598,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadClusterBlock) { uploadTasks.put( CLUSTER_BLOCKS, - remoteClusterStateAttributesManager.getAsyncWriteRunnable( + remoteClusterStateAttributesManager.asyncWrite( RemoteClusterBlocks.CLUSTER_BLOCKS, new RemoteClusterBlocks( clusterState.blocks(), @@ -613,7 +613,7 @@ UploadedMetadataResults writeMetadataInParallel( if (uploadHashesOfConsistentSettings) { uploadTasks.put( HASHES_OF_CONSISTENT_SETTINGS, - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( HASHES_OF_CONSISTENT_SETTINGS, new RemoteHashesOfConsistentSettings( (DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(), @@ -629,7 +629,7 @@ UploadedMetadataResults writeMetadataInParallel( String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key); uploadTasks.put( customComponent, - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( customComponent, new RemoteCustomMetadata( value, @@ -646,7 +646,7 @@ UploadedMetadataResults writeMetadataInParallel( indexToUpload.forEach(indexMetadata -> { uploadTasks.put( indexMetadata.getIndex().getName(), - remoteIndexMetadataManager.getAsyncWriteRunnable( + remoteIndexMetadataManager.asyncWrite( indexMetadata.getIndex().getName(), new RemoteIndexMetadata( indexMetadata, @@ -662,7 +662,7 @@ UploadedMetadataResults writeMetadataInParallel( clusterStateCustomToUpload.forEach((key, value) -> { uploadTasks.put( key, - remoteClusterStateAttributesManager.getAsyncWriteRunnable( + remoteClusterStateAttributesManager.asyncWrite( CLUSTER_STATE_CUSTOM, new RemoteClusterStateCustoms( value, @@ -1030,7 +1030,7 @@ private ClusterState readClusterStateInParallel( for (UploadedIndexMetadata indexMetadata : indicesToRead) { asyncMetadataReadActions.add( - remoteIndexMetadataManager.getAsyncReadRunnable( + remoteIndexMetadataManager.asyncRead( indexMetadata.getIndexName(), new RemoteIndexMetadata( RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()), @@ -1066,7 +1066,7 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : customToRead.entrySet()) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( entry.getValue().getAttributeName(), new RemoteCustomMetadata( entry.getValue().getUploadedFilename(), @@ -1082,7 +1082,7 @@ private ClusterState readClusterStateInParallel( if (readCoordinationMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( COORDINATION_METADATA, new RemoteCoordinationMetadata( manifest.getCoordinationMetadata().getUploadedFilename(), @@ -1097,7 +1097,7 @@ private ClusterState readClusterStateInParallel( if (readSettingsMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( SETTING_METADATA, new RemotePersistentSettingsMetadata( manifest.getSettingsMetadata().getUploadedFilename(), @@ -1112,7 +1112,7 @@ private ClusterState readClusterStateInParallel( if (readTransientSettingsMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( TRANSIENT_SETTING_METADATA, new RemoteTransientSettingsMetadata( manifest.getTransientSettingsMetadata().getUploadedFilename(), @@ -1127,7 +1127,7 @@ private ClusterState readClusterStateInParallel( if (readTemplatesMetadata) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( TEMPLATES_METADATA, new RemoteTemplatesMetadata( manifest.getTemplatesMetadata().getUploadedFilename(), @@ -1142,7 +1142,7 @@ private ClusterState readClusterStateInParallel( if (readDiscoveryNodes) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncReadRunnable( + remoteClusterStateAttributesManager.asyncRead( DISCOVERY_NODES, new RemoteDiscoveryNodes( manifest.getDiscoveryNodesMetadata().getUploadedFilename(), @@ -1156,7 +1156,7 @@ private ClusterState readClusterStateInParallel( if (readClusterBlocks) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncReadRunnable( + remoteClusterStateAttributesManager.asyncRead( CLUSTER_BLOCKS, new RemoteClusterBlocks( manifest.getClusterBlocksMetadata().getUploadedFilename(), @@ -1170,7 +1170,7 @@ private ClusterState readClusterStateInParallel( if (readHashesOfConsistentSettings) { asyncMetadataReadActions.add( - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( HASHES_OF_CONSISTENT_SETTINGS, new RemoteHashesOfConsistentSettings( manifest.getHashesOfConsistentSettings().getUploadedFilename(), @@ -1184,7 +1184,7 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : clusterStateCustomToRead.entrySet()) { asyncMetadataReadActions.add( - remoteClusterStateAttributesManager.getAsyncReadRunnable( + remoteClusterStateAttributesManager.asyncRead( // pass component name as cluster-state-custom--, so that we can interpret it later String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()), new RemoteClusterStateCustoms( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 3c00c4cc2d94a..f227ef8ae20e2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -8,7 +8,6 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer; @@ -17,8 +16,8 @@ import org.opensearch.cluster.metadata.Metadata.Custom; import org.opensearch.cluster.metadata.Metadata.XContentContext; import org.opensearch.cluster.metadata.TemplatesMetadata; -import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.AbstractRemoteWritableEntityManager; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -54,7 +53,7 @@ * * @opensearch.internal */ -public class RemoteGlobalMetadataManager extends AbstractRemoteEntitiesManager { +public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityManager { public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -161,11 +160,11 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteEntitiesManager { protected ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + resp -> listener.onResponse(remoteObject.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } @@ -173,11 +172,11 @@ protected ActionListener getWriteActionListener( protected ActionListener getReadActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return ActionListener.wrap( - response -> latchedActionListener.onResponse(new RemoteReadResult(response, remoteObject.getType(), component)), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + response -> listener.onResponse(new RemoteReadResult(response, remoteObject.getType(), component)), + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index b4dce486128b9..a132ae1493686 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -8,10 +8,9 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.remote.AbstractRemoteEntitiesManager; import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity; +import org.opensearch.common.remote.AbstractRemoteWritableEntityManager; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; @@ -33,7 +32,7 @@ * * @opensearch.internal */ -public class RemoteIndexMetadataManager extends AbstractRemoteEntitiesManager { +public class RemoteIndexMetadataManager extends AbstractRemoteWritableEntityManager { public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); @@ -108,11 +107,11 @@ private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeo protected ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return ActionListener.wrap( - resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) + resp -> listener.onResponse(remoteObject.getUploadedMetadata()), + ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex)) ); } @@ -120,11 +119,11 @@ protected ActionListener getWriteActionListener( protected ActionListener getReadActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return ActionListener.wrap( - response -> latchedActionListener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)), - ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) + response -> listener.onResponse(new RemoteReadResult(response, RemoteIndexMetadata.INDEX, component)), + ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex)) ); } } diff --git a/server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java similarity index 78% rename from server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java rename to server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java index 53ebf488bbe60..73d2bf2d58473 100644 --- a/server/src/test/java/org/opensearch/common/remote/AbstractRemoteEntitiesManagerTests.java +++ b/server/src/test/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManagerTests.java @@ -8,7 +8,6 @@ package org.opensearch.common.remote; -import org.opensearch.action.LatchedActionListener; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.model.RemoteReadResult; @@ -19,9 +18,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class AbstractRemoteEntitiesManagerTests extends OpenSearchTestCase { +public class AbstractRemoteWritableEntityManagerTests extends OpenSearchTestCase { public void testGetStoreWithKnownEntityType() { - AbstractRemoteEntitiesManager manager = new ConcreteRemoteEntitiesManager(); + AbstractRemoteWritableEntityManager manager = new ConcreteRemoteWritableEntityManager(); String knownEntityType = "knownType"; RemoteWritableEntityStore mockStore = mock(RemoteWritableEntityStore.class); manager.remoteWritableEntityStores.put(knownEntityType, mockStore); @@ -34,7 +33,7 @@ public void testGetStoreWithKnownEntityType() { } public void testGetStoreWithUnknownEntityType() { - AbstractRemoteEntitiesManager manager = new ConcreteRemoteEntitiesManager(); + AbstractRemoteWritableEntityManager manager = new ConcreteRemoteWritableEntityManager(); String unknownEntityType = "unknownType"; AbstractRemoteWritableBlobEntity mockEntity = mock(AbstractRemoteWritableBlobEntity.class); when(mockEntity.getType()).thenReturn(unknownEntityType); @@ -43,12 +42,12 @@ public void testGetStoreWithUnknownEntityType() { verify(mockEntity, times(2)).getType(); } - private static class ConcreteRemoteEntitiesManager extends AbstractRemoteEntitiesManager { + private static class ConcreteRemoteWritableEntityManager extends AbstractRemoteWritableEntityManager { @Override protected ActionListener getWriteActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return null; } @@ -57,7 +56,7 @@ protected ActionListener getWriteActionListener( protected ActionListener getReadActionListener( String component, AbstractRemoteWritableBlobEntity remoteObject, - LatchedActionListener latchedActionListener + ActionListener listener ) { return null; } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index afa5e5ed36161..055935035b003 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -118,11 +118,8 @@ public void testGetAsyncWriteRunnable_DiscoveryNodes() throws IOException, Inter .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncWriteRunnable( - DISCOVERY_NODES, - remoteDiscoveryNodes, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.asyncWrite(DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -150,11 +147,8 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncReadRunnable( - DISCOVERY_NODES, - remoteObjForDownload, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.asyncRead(DISCOVERY_NODES, remoteObjForDownload, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -176,11 +170,8 @@ public void testGetAsyncWriteRunnable_ClusterBlocks() throws IOException, Interr .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final CountDownLatch latch = new CountDownLatch(1); final TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncWriteRunnable( - CLUSTER_BLOCKS, - remoteClusterBlocks, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.asyncWrite(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -209,11 +200,8 @@ public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, Interru CountDownLatch latch = new CountDownLatch(1); TestCapturingListener listener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncReadRunnable( - CLUSTER_BLOCKS, - remoteClusterBlocks, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteClusterStateAttributesManager.asyncRead(CLUSTER_BLOCKS, remoteClusterBlocks, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -244,7 +232,7 @@ public void testGetAsyncWriteRunnable_Custom() throws IOException, InterruptedEx .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); final TestCapturingListener listener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncWriteRunnable( + remoteClusterStateAttributesManager.asyncWrite( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(listener, latch) @@ -282,7 +270,7 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncReadRunnable( + remoteClusterStateAttributesManager.asyncRead( CLUSTER_STATE_CUSTOM, remoteClusterStateCustoms, new LatchedActionListener<>(capturingListener, latch) @@ -308,7 +296,7 @@ public void testGetAsyncWriteRunnable_Exception() throws IOException, Interrupte TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); - remoteClusterStateAttributesManager.getAsyncWriteRunnable( + remoteClusterStateAttributesManager.asyncWrite( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) @@ -326,7 +314,7 @@ public void testGetAsyncReadRunnable_Exception() throws IOException, Interrupted when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); CountDownLatch latch = new CountDownLatch(1); TestCapturingListener capturingListener = new TestCapturingListener<>(); - remoteClusterStateAttributesManager.getAsyncReadRunnable( + remoteClusterStateAttributesManager.asyncRead( DISCOVERY_NODES, remoteDiscoveryNodes, new LatchedActionListener<>(capturingListener, latch) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index 2ada062e972bd..6cc03a6d90d91 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -172,7 +172,7 @@ public void testGetAsyncReadRunnable_CoordinationMetadata() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( COORDINATION_METADATA, coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) @@ -202,7 +202,7 @@ public void testGetAsyncWriteRunnable_CoordinationMetadata() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch) @@ -242,8 +242,7 @@ public void testGetAsyncReadRunnable_PersistentSettings() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)) - .run(); + remoteGlobalMetadataManager.asyncRead(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -268,11 +267,7 @@ public void testGetAsyncWriteRunnable_PersistentSettings() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( - SETTING_METADATA, - persistentSettings, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.asyncWrite(SETTING_METADATA, persistentSettings, new LatchedActionListener<>(listener, latch)).run(); latch.await(); assertNull(listener.getFailure()); @@ -309,11 +304,8 @@ public void testGetAsyncReadRunnable_TransientSettings() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( - TRANSIENT_SETTING_METADATA, - transientSettings, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.asyncRead(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -338,11 +330,8 @@ public void testGetAsyncWriteRunnable_TransientSettings() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( - TRANSIENT_SETTING_METADATA, - transientSettings, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.asyncWrite(TRANSIENT_SETTING_METADATA, transientSettings, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -376,7 +365,7 @@ public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Excepti TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForDownload, new LatchedActionListener<>(listener, latch) @@ -404,7 +393,7 @@ public void testGetAsyncWriteRunnable_HashesOfConsistentSettings() throws Except .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( HASHES_OF_CONSISTENT_SETTINGS, hashesOfConsistentSettingsForUpload, new LatchedActionListener<>(listener, latch) @@ -442,7 +431,7 @@ public void testGetAsyncReadRunnable_TemplatesMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( TEMPLATES_METADATA, templatesMetadataForDownload, new LatchedActionListener<>(listener, latch) @@ -471,11 +460,8 @@ public void testGetAsyncWriteRunnable_TemplatesMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( - TEMPLATES_METADATA, - templateMetadataForUpload, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.asyncWrite(TEMPLATES_METADATA, templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -510,11 +496,8 @@ public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( - IndexGraveyard.TYPE, - customMetadataForDownload, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.asyncRead(IndexGraveyard.TYPE, customMetadataForDownload, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -540,7 +523,7 @@ public void testGetAsyncWriteRunnable_CustomMetadata() throws Exception { .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( customMetadataForUpload.getType(), customMetadataForUpload, new LatchedActionListener<>(listener, latch) @@ -573,11 +556,8 @@ public void testGetAsyncReadRunnable_GlobalMetadata() throws Exception { ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( - GLOBAL_METADATA, - globalMetadataForDownload, - new LatchedActionListener<>(listener, latch) - ).run(); + remoteGlobalMetadataManager.asyncRead(GLOBAL_METADATA, globalMetadataForDownload, new LatchedActionListener<>(listener, latch)) + .run(); latch.await(); assertNull(listener.getFailure()); assertNotNull(listener.getResult()); @@ -598,7 +578,7 @@ public void testGetAsyncReadRunnable_IOException() throws Exception { when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncReadRunnable( + remoteGlobalMetadataManager.asyncRead( COORDINATION_METADATA, coordinationMetadataForDownload, new LatchedActionListener<>(listener, latch) @@ -628,7 +608,7 @@ public void testGetAsyncWriteRunnable_IOException() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteGlobalMetadataManager.getAsyncWriteRunnable( + remoteGlobalMetadataManager.asyncWrite( COORDINATION_METADATA, remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch) diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java index c12306b956046..ae8deab7decb1 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataManagerTests.java @@ -98,7 +98,7 @@ public void testGetAsyncWriteRunnable_Success() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.getAsyncWriteRunnable( + remoteIndexMetadataManager.asyncWrite( INDEX, new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) @@ -130,7 +130,7 @@ public void testGetAsyncWriteRunnable_IOFailure() throws Exception { return null; })).when(blobStoreTransferService).uploadBlob(any(), any(), any(), eq(WritePriority.URGENT), any(ActionListener.class)); - remoteIndexMetadataManager.getAsyncWriteRunnable( + remoteIndexMetadataManager.asyncWrite( INDEX, new RemoteIndexMetadata(indexMetadata, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) @@ -151,7 +151,7 @@ public void testGetAsyncReadRunnable_Success() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.getAsyncReadRunnable( + remoteIndexMetadataManager.asyncRead( INDEX, new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch) @@ -170,7 +170,7 @@ public void testGetAsyncReadRunnable_IOFailure() throws Exception { TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); - remoteIndexMetadataManager.getAsyncReadRunnable( + remoteIndexMetadataManager.asyncRead( INDEX, new RemoteIndexMetadata(fileName, "cluster-uuid", compressor, null), new LatchedActionListener<>(listener, latch)