Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor remote-routing-table service inline with remote state interfaces #14668

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Print reason why parent task was cancelled ([#14604](https://github.com/opensearch-project/OpenSearch/issues/14604))
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,24 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.index.Index;
import org.opensearch.core.compress.Compressor;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -52,12 +41,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
Expand All @@ -67,64 +54,29 @@
*/
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {

/**
* This setting is used to set the remote routing table store blob store path type strategy.
*/
public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_type",
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
RemoteStoreEnums.PathType::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* This setting is used to set the remote routing table store blob store path hash algorithm strategy.
* This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
*/
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>(
"cluster.remote_store.routing_table.path_hash_algo",
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
RemoteStoreEnums.PathHashAlgorithm::parseString,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private Compressor compressor;
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;
private ThreadPool threadPool;
private final ThreadPool threadPool;
private final String clusterName;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadpool
ThreadPool threadpool,
String clusterName
) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING);
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
this.threadPool = threadpool;
}

private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
this.pathType = pathType;
}

private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
this.pathHashAlgo = pathHashAlgo;
this.clusterName = clusterName;
this.clusterSettings = clusterSettings;
}

public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
Expand All @@ -151,43 +103,32 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param term current term
* @param version current version
* @param clusterUUID current cluster UUID
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @param clusterBasePath base path for remote file
* @return returns runnable async action
*/
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {

BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
BlobPath path = pathType.path(
RemoteStorePathStrategy.PathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
pathHashAlgo
);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version());
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(
indexRouting.getIndex().getName(),
indexRouting.getIndex().getUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_METADATA_PREFIX
)
),
resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()),
ex -> latchedActionListener.onFailure(
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
)
);

return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener);
return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
}

/**
Expand All @@ -214,111 +155,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
return new ArrayList<>(allUploadedIndicesRouting.values());
}

private void uploadIndex(
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
IndexRoutingTable indexRouting,
String fileName,
BlobContainer blobContainer,
ActionListener<Void> completionListener
) {
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting);
BytesReference bytesInput = null;
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
indexRoutingInput.writeTo(streamOutput);
bytesInput = streamOutput.bytes();
} catch (IOException e) {
logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
return;
}

if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
try {
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
completionListener.onResponse(null);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
}
return;
}

try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) {
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
null,
false
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
} catch (IOException e) {
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
completionListener.onFailure(e);
}
} catch (IOException e) {
logger.error(
"Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]",
indexRouting,
e
);
completionListener.onFailure(e);
}
}

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx + 1);
BlobContainer blobContainer = blobStoreRepository.blobStore()
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));

return () -> readAsync(
blobContainer,
blobFileName,
index,
threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ),
ActionListener.wrap(
response -> latchedActionListener.onResponse(response.getIndexRoutingTable()),
latchedActionListener::onFailure
)
ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
latchedActionListener::onResponse,
latchedActionListener::onFailure
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
);
}

private void readAsync(
BlobContainer blobContainer,
String name,
Index index,
ExecutorService executorService,
ActionListener<RemoteIndexRoutingTable> listener
) {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, index));
} catch (Exception e) {
listener.onFailure(e);
}
});
}
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);

private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
try {
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
} catch (IOException | AssertionError e) {
logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e);
throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
}

@Override
Expand All @@ -335,16 +186,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
}).collect(Collectors.toList());
}

private String getIndexRoutingFileName(long term, long version) {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
}

@Override
protected void doClose() throws IOException {
if (blobStoreRepository != null) {
Expand All @@ -362,6 +203,16 @@ protected void doStart() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
compressor = blobStoreRepository.getCompressor();

this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository,
clusterName,
threadPool,
ThreadPool.Names.REMOTE_STATE_READ,
clusterSettings
Arpit-Bandejiya marked this conversation as resolved.
Show resolved Hide resolved
);
}

@Override
Expand All @@ -377,5 +228,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
throw e;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@
package org.opensearch.cluster.routing.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand All @@ -42,11 +39,12 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
}

@Override
public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
BlobPath clusterBasePath
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
// noop
return () -> {};
Expand All @@ -64,8 +62,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
Expand Down
Loading
Loading