Skip to content

Commit

Permalink
Create interface RemoteEntitiesManager
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jul 5, 2024
1 parent 501a702 commit 4e731d0
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 170 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public abstract class AbstractRemoteEntitiesManager implements RemoteEntitiesManager {
protected final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores = new HashMap<>();

protected RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

protected abstract ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
);

protected abstract ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<RemoteReadResult> latchedActionListener
);

@Override
public CheckedRunnable<IOException> getAsyncWriteRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> getStore(entity).writeAsync(entity, getWriteActionListener(component, entity, latchedActionListener));
}

@Override
public CheckedRunnable<IOException> getAsyncReadRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<RemoteReadResult> latchedActionListener
) {
return () -> getStore(entity).readAsync(entity, getReadActionListener(component, entity, latchedActionListener));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;
import org.opensearch.gateway.remote.model.RemoteReadResult;

import java.io.IOException;

public interface RemoteEntitiesManager {
CheckedRunnable<IOException> getAsyncReadRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<RemoteReadResult> latchedActionListener
);

CheckedRunnable<IOException> getAsyncWriteRunnable(
String component,
AbstractRemoteWritableBlobEntity entity,
LatchedActionListener<UploadedMetadata> latchedActionListener
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.DiffableUtils.NonDiffableValueSerializer;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.remote.AbstractRemoteEntitiesManager;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
Expand All @@ -26,22 +25,19 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* A Manager which provides APIs to upload and download attributes of ClusterState to the {@link RemoteClusterStateBlobStore}
*
* @opensearch.internal
*/
public class RemoteClusterStateAttributesManager {
public class RemoteClusterStateAttributesManager extends AbstractRemoteEntitiesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final Map<String, RemoteWritableEntityStore> remoteWritableEntityStores;
private final NamedWriteableRegistry namedWriteableRegistry;

RemoteClusterStateAttributesManager(
Expand All @@ -52,7 +48,6 @@ public class RemoteClusterStateAttributesManager {
ThreadPool threadpool
) {
this.namedWriteableRegistry = namedWriteableRegistry;
this.remoteWritableEntityStores = new HashMap<>();
this.remoteWritableEntityStores.put(
RemoteDiscoveryNodes.DISCOVERY_NODES,
new RemoteClusterStateBlobStore<>(
Expand Down Expand Up @@ -85,46 +80,28 @@ public class RemoteClusterStateAttributesManager {
);
}

/**
* Allows async upload of Cluster State Attribute components to remote
*/
CheckedRunnable<IOException> getAsyncMetadataWriteAction(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return () -> getStore(blobEntity).writeAsync(blobEntity, getActionListener(component, blobEntity, latchedActionListener));
}

private ActionListener<Void> getActionListener(
@Override
protected ActionListener<Void> getWriteActionListener(
String component,
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteObject.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException(component, remoteObject, ex))
ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed for " + component, remoteObject, ex))
);
}

private RemoteWritableEntityStore getStore(AbstractRemoteWritableBlobEntity entity) {
RemoteWritableEntityStore remoteStore = remoteWritableEntityStores.get(entity.getType());
if (remoteStore == null) {
throw new IllegalArgumentException("Unknown entity type [" + entity.getType() + "]");
}
return remoteStore;
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
@Override
protected ActionListener<Object> getReadActionListener(
String component,
AbstractRemoteWritableBlobEntity blobEntity,
LatchedActionListener<RemoteReadResult> listener
AbstractRemoteWritableBlobEntity remoteObject,
LatchedActionListener<RemoteReadResult> latchedActionListener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
return ActionListener.wrap(
response -> latchedActionListener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Download failed for " + component, remoteObject, ex))
);
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
}

public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> getUpdatedCustoms(
Expand Down Expand Up @@ -158,4 +135,5 @@ public DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterSta
NonDiffableValueSerializer.getAbstractInstance()
);
}

}
Loading

0 comments on commit 4e731d0

Please sign in to comment.