Skip to content

Commit

Permalink
Add support to delete remote cluster state
Browse files Browse the repository at this point in the history
Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Sep 4, 2023
1 parent cf50add commit fa0f297
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,29 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -96,6 +101,7 @@ public class RemoteClusterStateService implements Closeable {
private final Supplier<RepositoriesService> repositoriesService;
private final Settings settings;
private final LongSupplier relativeTimeNanosSupplier;
private final ThreadPool threadpool;
private BlobStoreRepository blobStoreRepository;
private volatile TimeValue slowWriteLoggingThreshold;

Expand All @@ -104,13 +110,15 @@ public RemoteClusterStateService(
Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool
) {
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
this.nodeId = nodeId;
this.repositoriesService = repositoriesService;
this.settings = settings;
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
this.threadpool = threadPool;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
}
Expand Down Expand Up @@ -330,26 +338,19 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
.blobContainer(
blobStoreRepository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
.add("cluster-state")
.add(clusterUUID)
.add("index")
.add(indexUUID)
);
.blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("index").add(indexUUID));
}

private BlobContainer manifestContainer(String clusterName, String clusterUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest
return blobStoreRepository.blobStore()
.blobContainer(
blobStoreRepository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
.add("cluster-state")
.add(clusterUUID)
.add("manifest")
);
return blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("manifest"));
}

private BlobPath getCusterMetadataBasePath(String clusterName, String clusterUUID) {
return blobStoreRepository.basePath()
.add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
.add("cluster-state")
.add(clusterUUID);
}

private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
Expand Down Expand Up @@ -468,4 +469,34 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste
throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e);
}
}

/**
* Deletes entire cluster metadata content for given cluster UUIDs in remote
* @param clusterName name of the cluster
* @param clusterUUIDs clusterUUID's for which remote cluster metadata needs to be purged.
*/
public void deleteMetadataAsync(String clusterName, List<String> clusterUUIDs) {
(new BlobStoreTransferService(blobStoreRepository.blobStore(), threadpool)).deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
clusterUUIDs.stream()
.map(clusterUUID -> getCusterMetadataBasePath(clusterName, clusterUUID).toString())
.collect(Collectors.toList()),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
logger.info("Deleted Remote Cluster Metadata for clusterUUIDs {}", clusterUUIDs);
}

@Override
public void onFailure(Exception e) {
logger.error(
new ParameterizedMessage(
"Exception occurred while deleting Remote Cluster Metadata for clusterUUIDs {}",
clusterUUIDs
)
);
}
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.Supplier;

import org.mockito.ArgumentMatchers;
import org.opensearch.threadpool.TestThreadPool;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -84,6 +85,7 @@ public void setup() {
repositoriesServiceSupplier,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new TestThreadPool(getClass().getName()),
() -> 0L
);
}
Expand All @@ -103,6 +105,7 @@ public void testFailInitializationWhenRemoteStateDisabled() throws IOException {
repositoriesServiceSupplier,
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
new TestThreadPool(getClass().getName()),
() -> 0L
)
);
Expand Down

0 comments on commit fa0f297

Please sign in to comment.