Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jul 15, 2024
1 parent 7017bb6 commit f469906
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 409 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -150,14 +149,14 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* Async action for writing one {@code IndexRoutingTable} to remote store
*
* @param clusterState current cluster state
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
* @return returns runnable async action
* @param clusterBasePath base path for remote file
*/
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
public void writeAsync(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Expand Down Expand Up @@ -187,7 +186,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
)
);

return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
uploadIndex(indexRouting, fileName, blobContainer, completionListener);
}

/**
Expand Down Expand Up @@ -274,7 +273,7 @@ private void uploadIndex(
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
Expand All @@ -284,7 +283,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readAsync(
readAsync(
blobContainer,
blobFileName,
index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -42,14 +41,13 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

@Override
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
public void writeAsync(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
) {
// noop
return () -> {};
}

@Override
Expand All @@ -63,13 +61,12 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
return () -> {};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
Expand Down Expand Up @@ -46,7 +45,7 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {

List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable);

CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
void getAsyncIndexRoutingReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
Expand All @@ -62,7 +61,7 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
RoutingTable after
);

CheckedRunnable<IOException> getIndexRoutingAsyncAction(
void writeAsync(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@

package org.opensearch.common.remote;

import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -71,20 +69,16 @@ protected abstract ActionListener<Object> getReadActionListener(
);

@Override
public CheckedRunnable<IOException> asyncWrite(
public void writeAsync(
String component,
AbstractRemoteWritableBlobEntity entity,
ActionListener<ClusterMetadataManifest.UploadedMetadata> listener
) {
return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener));
getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, listener));
}

@Override
public CheckedRunnable<IOException> asyncRead(
String component,
AbstractRemoteWritableBlobEntity entity,
ActionListener<RemoteReadResult> listener
) {
return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener));
public void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener) {
getStore(entity).readAsync(entity, getReadActionListener(component, entity, listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,40 @@

package org.opensearch.common.remote;

import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;

/**
* The RemoteWritableEntityManager interface provides async read and write methods for managing remote entities in the remote store
*/
public interface RemoteWritableEntityManager {

/**
* Returns a CheckedRunnable that performs an asynchronous read operation for the specified component and entity.
* Performs an asynchronous read operation for the specified component and entity.
*
* @param component the component for which the read operation is performed
* @param entity the entity to be read
* @param listener the listener to be notified when the read operation completes
* @return a CheckedRunnable that performs the asynchronous read operation
* @param listener the listener to be notified when the read operation completes.
* The listener's {@link ActionListener#onResponse(Object)} method
* is called with a {@link RemoteReadResult} object containing the
* read data on successful read. The
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the read operation fails.
*/
CheckedRunnable<IOException> asyncRead(
String component,
AbstractRemoteWritableBlobEntity entity,
ActionListener<RemoteReadResult> listener
);
void readAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<RemoteReadResult> listener);

/**
* Returns a CheckedRunnable that performs an asynchronous write operation for the specified component and entity.
* Performs an asynchronous write operation for the specified component and entity.
*
* @param component the component for which the write operation is performed
* @param entity the entity to be written
* @param listener the listener to be notified when the write operation completes
* @return a CheckedRunnable that performs the asynchronous write operation
* @param listener the listener to be notified when the write operation completes.
* The listener's {@link ActionListener#onResponse(Object)} method
* is called with a {@link UploadedMetadata} object containing the
* uploaded metadata on successful write. The
* {@link ActionListener#onFailure(Exception)} method is called with
* an exception if the write operation fails.
*/
CheckedRunnable<IOException> asyncWrite(
String component,
AbstractRemoteWritableBlobEntity entity,
ActionListener<UploadedMetadata> listener
);
void writeAsync(String component, AbstractRemoteWritableBlobEntity entity, ActionListener<UploadedMetadata> listener);
}
Loading

0 comments on commit f469906

Please sign in to comment.