Skip to content

Commit

Permalink
Rename methods to suggested ones in review
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jul 10, 2024
1 parent bf30f0c commit 7017bb6
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.common.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
Expand All @@ -21,7 +20,7 @@
/**
* An abstract class that provides a base implementation for managing remote entities in the remote store.
*/
public abstract class AbstractRemoteEntitiesManager implements RemoteEntitiesManager {
public abstract class AbstractRemoteWritableEntityManager implements RemoteWritableEntityManager {
/**
* A map that stores the remote writable entity stores, keyed by the entity type.
*/
Expand All @@ -47,13 +46,13 @@ protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity en
*
* @param component the component for which the write operation is performed
* @param remoteObject the remote object to be written
* @param latchedActionListener the latched action listener to be notified when the write operation completes
* @param listener the listener to be notified when the write operation completes
* @return an ActionListener for handling the write operation
*/
protected abstract ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
);

/**
Expand All @@ -62,30 +61,30 @@ protected abstract ActionListener<Void> getWriteActionListener(
*
* @param component the component for which the read operation is performed
* @param remoteObject the remote object to be read
* @param latchedActionListener the latched action listener to be notified when the read operation completes
* @param listener the listener to be notified when the read operation completes
* @return an ActionListener for handling the read operation
*/
protected abstract ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<RemoteReadResult> latchedActionListener
ActionListener<RemoteReadResult> listener
);

@Override
public CheckedRunnable<IOException> getAsyncWriteRunnable(
public CheckedRunnable<IOException> asyncWrite(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, latchedActionListener));
return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener));
}

@Override
public CheckedRunnable<IOException> getAsyncReadRunnable(
public CheckedRunnable<IOException> asyncRead(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<RemoteReadResult> latchedActionListener
ActionListener<RemoteReadResult> listener
) {
return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, latchedActionListener));
return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,43 @@

package org.opensearch.common.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;

/**
* The RemoteEntitiesManager interface provides async read and write methods for managing remote entities in the remote store
* The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store
*/
public interface RemoteEntitiesManager {
public interface RemoteWritableEntityManager {

/**
* Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity.
*
* @param component the component for which the read operation is performed
* @param entity the entity to be read
* @param latchedActionListener the listener to be notified when the read operation completes
* @param listener the listener to be notified when the read operation completes
* @return a CheckedRunnable that performs the asynchronous read operation
*/
CheckedRunnable<IOException> getAsyncReadRunnable(
CheckedRunnable<IOException> asyncRead(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<RemoteReadResult> latchedActionListener
ActionListener<RemoteReadResult> listener
);

/**
* Returns a CheckedRunnable that performs an asynchronous write operation for the specified component and entity.
*
* @param component the component for which the write operation is performed
* @param entity the entity to be written
* @param latchedActionListener the listener to be notified when the write operation completes
* @param listener the listener to be notified when the write operation completes
* @return a CheckedRunnable that performs the asynchronous write operation
*/
CheckedRunnable<IOException> getAsyncWriteRunnable(
CheckedRunnable<IOException> asyncWrite(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<UploadedMetadata> latchedActionListener
ActionListener<UploadedMetadata> listener
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.remote.AbstractRemoteEntitiesManager;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.AbstractRemoteWritableEntityManager;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
Expand All @@ -33,12 +32,11 @@
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesManager {
public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableEntityManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
String clusterName,
Expand All @@ -47,7 +45,6 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesM
NamedWriteableRegistry namedWriteableRegistry,
ThreadPool threadpool
) {
this.namedWriteableRegistry = namedWriteableRegistry;
this.remoteWritableEntityStores.put(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteClusterStateBlobStore<>(
Expand Down Expand Up @@ -84,23 +81,23 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesM
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
resp -> listener.onResponse(remoteObject.getUploadedMetadata()),
ex -> listener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
);
}

@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<RemoteReadResult> latchedActionListener
ActionListener<RemoteReadResult> listener
) {
return ActionListener.wrap(
response -> latchedActionListener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
ex -> listener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadSettingsMetadata) {
uploadTasks.put(
SETTING_METADATA,
remoteGlobalMetadataManager.getAsyncWriteRunnable(
remoteGlobalMetadataManager.asyncWrite(
SETTING_METADATA,
new RemotePersistentSettingsMetadata(
clusterState.metadata().persistentSettings(),
Expand All @@ -535,7 +535,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadTransientSettingMetadata) {
uploadTasks.put(
TRANSIENT_SETTING_METADATA,
remoteGlobalMetadataManager.getAsyncWriteRunnable(
remoteGlobalMetadataManager.asyncWrite(
TRANSIENT_SETTING_METADATA,
new RemoteTransientSettingsMetadata(
clusterState.metadata().transientSettings(),
Expand All @@ -551,7 +551,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadCoordinationMetadata) {
uploadTasks.put(
COORDINATION_METADATA,
remoteGlobalMetadataManager.getAsyncWriteRunnable(
remoteGlobalMetadataManager.asyncWrite(
COORDINATION_METADATA,
new RemoteCoordinationMetadata(
clusterState.metadata().coordinationMetadata(),
Expand All @@ -567,7 +567,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadTemplateMetadata) {
uploadTasks.put(
TEMPLATES_METADATA,
remoteGlobalMetadataManager.getAsyncWriteRunnable(
remoteGlobalMetadataManager.asyncWrite(
TEMPLATES_METADATA,
new RemoteTemplatesMetadata(
clusterState.metadata().templatesMetadata(),
Expand All @@ -583,7 +583,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadDiscoveryNodes) {
uploadTasks.put(
DISCOVERY_NODES,
remoteClusterStateAttributesManager.getAsyncWriteRunnable(
remoteClusterStateAttributesManager.asyncWrite(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteDiscoveryNodes(
clusterState.nodes(),
Expand All @@ -598,7 +598,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadClusterBlock) {
uploadTasks.put(
CLUSTER_BLOCKS,
remoteClusterStateAttributesManager.getAsyncWriteRunnable(
remoteClusterStateAttributesManager.asyncWrite(
RemoteClusterBlocks.CLUSTER_BLOCKS,
new RemoteClusterBlocks(
clusterState.blocks(),
Expand All @@ -613,7 +613,7 @@ UploadedMetadataResults writeMetadataInParallel(
if (uploadHashesOfConsistentSettings) {
uploadTasks.put(
HASHES_OF_CONSISTENT_SETTINGS,
remoteGlobalMetadataManager.getAsyncWriteRunnable(
remoteGlobalMetadataManager.asyncWrite(
HASHES_OF_CONSISTENT_SETTINGS,
new RemoteHashesOfConsistentSettings(
(DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings(),
Expand All @@ -629,7 +629,7 @@ UploadedMetadataResults writeMetadataInParallel(
String customComponent = String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, key);
uploadTasks.put(
customComponent,
remoteGlobalMetadataManager.getAsyncWriteRunnable(
remoteGlobalMetadataManager.asyncWrite(
customComponent,
new RemoteCustomMetadata(
value,
Expand All @@ -646,7 +646,7 @@ UploadedMetadataResults writeMetadataInParallel(
indexToUpload.forEach(indexMetadata -> {
uploadTasks.put(
indexMetadata.getIndex().getName(),
remoteIndexMetadataManager.getAsyncWriteRunnable(
remoteIndexMetadataManager.asyncWrite(
indexMetadata.getIndex().getName(),
new RemoteIndexMetadata(
indexMetadata,
Expand All @@ -662,7 +662,7 @@ UploadedMetadataResults writeMetadataInParallel(
clusterStateCustomToUpload.forEach((key, value) -> {
uploadTasks.put(
key,
remoteClusterStateAttributesManager.getAsyncWriteRunnable(
remoteClusterStateAttributesManager.asyncWrite(
CLUSTER_STATE_CUSTOM,
new RemoteClusterStateCustoms(
value,
Expand Down Expand Up @@ -1030,7 +1030,7 @@ private ClusterState readClusterStateInParallel(

for (UploadedIndexMetadata indexMetadata : indicesToRead) {
asyncMetadataReadActions.add(
remoteIndexMetadataManager.getAsyncReadRunnable(
remoteIndexMetadataManager.asyncRead(
indexMetadata.getIndexName(),
new RemoteIndexMetadata(
RemoteClusterStateUtils.getFormattedIndexFileName(indexMetadata.getUploadedFilename()),
Expand Down Expand Up @@ -1066,7 +1066,7 @@ private ClusterState readClusterStateInParallel(

for (Map.Entry<String, UploadedMetadataAttribute> entry : customToRead.entrySet()) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncReadRunnable(
remoteGlobalMetadataManager.asyncRead(
entry.getValue().getAttributeName(),
new RemoteCustomMetadata(
entry.getValue().getUploadedFilename(),
Expand All @@ -1082,7 +1082,7 @@ private ClusterState readClusterStateInParallel(

if (readCoordinationMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncReadRunnable(
remoteGlobalMetadataManager.asyncRead(
COORDINATION_METADATA,
new RemoteCoordinationMetadata(
manifest.getCoordinationMetadata().getUploadedFilename(),
Expand All @@ -1097,7 +1097,7 @@ private ClusterState readClusterStateInParallel(

if (readSettingsMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncReadRunnable(
remoteGlobalMetadataManager.asyncRead(
SETTING_METADATA,
new RemotePersistentSettingsMetadata(
manifest.getSettingsMetadata().getUploadedFilename(),
Expand All @@ -1112,7 +1112,7 @@ private ClusterState readClusterStateInParallel(

if (readTransientSettingsMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncReadRunnable(
remoteGlobalMetadataManager.asyncRead(
TRANSIENT_SETTING_METADATA,
new RemoteTransientSettingsMetadata(
manifest.getTransientSettingsMetadata().getUploadedFilename(),
Expand All @@ -1127,7 +1127,7 @@ private ClusterState readClusterStateInParallel(

if (readTemplatesMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncReadRunnable(
remoteGlobalMetadataManager.asyncRead(
TEMPLATES_METADATA,
new RemoteTemplatesMetadata(
manifest.getTemplatesMetadata().getUploadedFilename(),
Expand All @@ -1142,7 +1142,7 @@ private ClusterState readClusterStateInParallel(

if (readDiscoveryNodes) {
asyncMetadataReadActions.add(
remoteClusterStateAttributesManager.getAsyncReadRunnable(
remoteClusterStateAttributesManager.asyncRead(
DISCOVERY_NODES,
new RemoteDiscoveryNodes(
manifest.getDiscoveryNodesMetadata().getUploadedFilename(),
Expand All @@ -1156,7 +1156,7 @@ private ClusterState readClusterStateInParallel(

if (readClusterBlocks) {
asyncMetadataReadActions.add(
remoteClusterStateAttributesManager.getAsyncReadRunnable(
remoteClusterStateAttributesManager.asyncRead(
CLUSTER_BLOCKS,
new RemoteClusterBlocks(
manifest.getClusterBlocksMetadata().getUploadedFilename(),
Expand All @@ -1170,7 +1170,7 @@ private ClusterState readClusterStateInParallel(

if (readHashesOfConsistentSettings) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncReadRunnable(
remoteGlobalMetadataManager.asyncRead(
HASHES_OF_CONSISTENT_SETTINGS,
new RemoteHashesOfConsistentSettings(
manifest.getHashesOfConsistentSettings().getUploadedFilename(),
Expand All @@ -1184,7 +1184,7 @@ private ClusterState readClusterStateInParallel(

for (Map.Entry<String, UploadedMetadataAttribute> entry : clusterStateCustomToRead.entrySet()) {
asyncMetadataReadActions.add(
remoteClusterStateAttributesManager.getAsyncReadRunnable(
remoteClusterStateAttributesManager.asyncRead(
// pass component name as cluster-state-custom--<custom_name>, so that we can interpret it later
String.join(CUSTOM_DELIMITER, CLUSTER_STATE_CUSTOM, entry.getKey()),
new RemoteClusterStateCustoms(
Expand Down
Loading

0 comments on commit 7017bb6

Please sign in to comment.