Skip to content

Commit

Permalink
[Backport 2.x] Refactor remote-routing-table service inline with remo…
Browse files Browse the repository at this point in the history
…te state interfaces (#14668) (#14862)

Signed-off-by: Arpit Bandejiya <[email protected]>
  • Loading branch information
Arpit-Bandejiya authored Jul 22, 2024
1 parent 3b0ea65 commit 61ca56e
Show file tree
Hide file tree
Showing 19 changed files with 788 additions and 673 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))

### Dependencies
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,23 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.Index;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -51,12 +40,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
Expand All @@ -66,64 +53,29 @@
*/
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {

/**
* This setting is used to set the remote routing table store blob store path type strategy.
*/
public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_type",
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
RemoteStoreEnums.PathType::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* This setting is used to set the remote routing table store blob store path hash algorithm strategy.
* This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_hash_algo",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private Compressor compressor;
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;
private ThreadPool threadPool;
private final ThreadPool threadPool;
private final String clusterName;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadpool
ThreadPool threadpool,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING);
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
this.threadPool = threadpool;
}

private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
this.pathHashAlgo = pathHashAlgo;
this.clusterName = clusterName;
this.clusterSettings = clusterSettings;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
Expand All @@ -150,43 +102,31 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting

/**
* Async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param term current term
* @param version current version
* @param clusterUUID current cluster UUID
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
*/
@Override
public void getIndexRoutingAsyncAction(
ClusterState clusterState,
public void getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {

BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
BlobPath path = pathType.path(
RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
pathHashAlgo
);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version());
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(
indexRouting.getIndex().getName(),
indexRouting.getIndex().getUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_METADATA_PREFIX
)
),
resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
)
);

uploadIndex(indexRouting, fileName, blobContainer, completionListener);
remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
}

/**
Expand All @@ -213,111 +153,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
return new ArrayList<>(allUploadedIndicesRouting.values());
}

private void uploadIndex(
IndexRoutingTable indexRouting,
String fileName,
BlobContainer blobContainer,
ActionListener<Void> completionListener
) {
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting);
BytesReference bytesInput = null;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
indexRoutingInput.writeTo(streamOutput);
bytesInput = streamOutput.bytes();
} catch (IOException e) {
logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
return;
}

if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
try {
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
completionListener.onResponse(null);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
}
return;
}

try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
}
} catch (IOException e) {
logger.error(
"Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]",
indexRouting,
e
);
completionListener.onFailure(e);
}
}

@Override
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx + 1);
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

readAsync(
blobContainer,
blobFileName,
index,
threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ),
ActionListener.wrap(
response -> latchedActionListener.onResponse(response.getIndexRoutingTable()),
latchedActionListener::onFailure
)
ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
latchedActionListener::onResponse,
latchedActionListener::onFailure
);
}

private void readAsync(
BlobContainer blobContainer,
String name,
Index index,
ExecutorService executorService,
ActionListener<RemoteIndexRoutingTable> listener
) {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, index));
} catch (Exception e) {
listener.onFailure(e);
}
});
}
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);

private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
try {
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
} catch (IOException | AssertionError e) {
logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e);
throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
}

@Override
Expand All @@ -334,16 +184,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
}).collect(Collectors.toList());
}

private String getIndexRoutingFileName(long term, long version) {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
Expand All @@ -361,6 +201,16 @@ protected void doStart() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
compressor = blobStoreRepository.getCompressor();

this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
);
}

@Override
Expand All @@ -376,5 +226,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
package org.opensearch.cluster.routing.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand All @@ -41,11 +38,12 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

@Override
public void getIndexRoutingAsyncAction(
ClusterState clusterState,
public void getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
// noop
}
Expand All @@ -62,8 +60,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices

@Override
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
Expand Down
Loading

0 comments on commit 61ca56e

Please sign in to comment.