Skip to content

Commit

Permalink
Delete stale index routing files.
Browse files Browse the repository at this point in the history
Signed-off-by: Shailendra Singh <[email protected]>
  • Loading branch information
Shailendra Singh committed Jun 8, 2024
1 parent f873d88 commit 63fce80
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
Expand All @@ -20,6 +22,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
Expand Down Expand Up @@ -61,6 +64,15 @@ protected void doStart() {
blobStoreRepository = (BlobStoreRepository) repository;
}

public BlobStore getBlobStore() {
return blobStoreRepository.blobStore();
}

public void DeleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
logger.debug(() -> "Deleting stale index routing files from remote - " + stalePaths);
getBlobStore().blobContainer(BlobPath.cleanPath()).deleteBlobsIgnoringIfNotExists(stalePaths);
}

@Override
protected void doStop() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobMetadata;
Expand All @@ -24,11 +25,9 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;

import java.io.Closeable;
import java.io.IOException;
import java.sql.Blob;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -76,8 +75,13 @@ public class RemoteClusterStateCleanupManager implements Closeable {
private long lastCleanupAttemptStateVersion;
private final ThreadPool threadpool;
private final ClusterApplierService clusterApplierService;
private final RemoteRoutingTableService remoteRoutingTableService;

public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterStateService, ClusterService clusterService) {
public RemoteClusterStateCleanupManager(
RemoteClusterStateService remoteClusterStateService,
ClusterService clusterService,
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
Expand All @@ -86,6 +90,7 @@ public RemoteClusterStateCleanupManager(RemoteClusterStateService remoteClusterS
this.threadpool = remoteClusterStateService.getThreadpool();
// initialize with 0, a cleanup will be done when this node is elected master node and version is incremented more than threshold
this.lastCleanupAttemptStateVersion = 0;
this.remoteRoutingTableService = remoteRoutingTableService;
clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, this::updateCleanupInterval);
}

Expand Down Expand Up @@ -192,7 +197,7 @@ void deleteClusterMetadata(
.values()
.forEach(attribute -> filesToKeep.add(attribute.getUploadedFilename()));
}
if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) {
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting()
.forEach(uploadedIndicesRouting -> filesToKeep.add(uploadedIndicesRouting.getUploadedFilename()));
}
Expand Down Expand Up @@ -228,10 +233,10 @@ void deleteClusterMetadata(
attribute -> addStaleGlobalMetadataPath(attribute.getUploadedFilename(), filesToKeep, staleGlobalMetadataPaths)
);
}
if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V3) {
if (clusterMetadataManifest.getIndicesRouting() != null) {
clusterMetadataManifest.getIndicesRouting().forEach(uploadedIndicesRouting -> {
if (filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename()) == false){
staleIndexRoutingPaths.add(new BlobPath().buildAsString() + uploadedIndicesRouting.getUploadedFilename());
if (!filesToKeep.contains(uploadedIndicesRouting.getUploadedFilename())) {
staleIndexRoutingPaths.add(uploadedIndicesRouting.getUploadedFilename());
logger.debug(() -> "Indices routing paths in stale manifest: " + uploadedIndicesRouting.getUploadedFilename());
}
});
Expand All @@ -255,7 +260,9 @@ void deleteClusterMetadata(
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
if (remoteRoutingTableService != null) {
remoteRoutingTableService.DeleteStaleIndexRoutingPaths(new ArrayList<>(staleIndexRoutingPaths));
}
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -360,14 +367,6 @@ void deleteStalePaths(String clusterName, String clusterUUID, List<String> stale
);
}

void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOException {
logger.debug(() -> String.format(Locale.ROOT, "Deleting stale index routing files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
BlobPath.cleanPath(),
stalePaths
);
}

/**
* Purges all remote cluster state against provided cluster UUIDs
* @param clusterState current state of the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,15 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService);
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
: Optional.empty();
this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(
this,
clusterService,
this.remoteRoutingTableService.orElse(null)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -47,6 +48,7 @@

import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.AsyncStaleFileDeletion;
Expand Down Expand Up @@ -76,6 +78,7 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -92,6 +95,7 @@ public class RemoteClusterStateCleanupManagerTests extends OpenSearchTestCase {
private ClusterState clusterState;
private Metadata metadata;
private RemoteClusterStateService remoteClusterStateService;
private RemoteRoutingTableService remoteRoutingTableService;
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());

@Before
Expand Down Expand Up @@ -139,7 +143,13 @@ public void setup() {
when(remoteClusterStateService.getStats()).thenReturn(new RemotePersistenceStats());
when(remoteClusterStateService.getThreadpool()).thenReturn(threadPool);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService);
remoteRoutingTableService = mock(RemoteRoutingTableService.class);
when(remoteClusterStateService.getBlobStore()).thenReturn(blobStore);
remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(
remoteClusterStateService,
clusterService,
remoteRoutingTableService
);
}

@After
Expand Down Expand Up @@ -201,11 +211,15 @@ public void testDeleteClusterMetadata() throws IOException {
.settingMetadata(settingMetadataUpdated)
.build();

List<UploadedIndexMetadata> indicesRouting1 = List.of(index1Metadata, index2Metadata);
List<UploadedIndexMetadata> indicesRouting2 = List.of(index1Metadata);
UploadedIndexMetadata index3Metadata = new UploadedIndexMetadata("index3", "indexUUID3", "index_metadata3");
UploadedIndexMetadata index4Metadata = new UploadedIndexMetadata("index4", "indexUUID4", "index_metadata4");
List<UploadedIndexMetadata> indicesRouting1 = List.of(index3Metadata, index4Metadata);
List<UploadedIndexMetadata> indicesRouting2 = List.of(index4Metadata);
ClusterMetadataManifest manifest6 = ClusterMetadataManifest.builder()
.indices(List.of(index1UpdatedMetadata))
.globalMetadataFileName("global_metadata")
.indices(List.of(index1Metadata))
.coordinationMetadata(coordinationMetadataUpdated)
.templatesMetadata(templateMetadataUpdated)
.settingMetadata(settingMetadataUpdated)
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
Expand All @@ -219,8 +233,10 @@ public void testDeleteClusterMetadata() throws IOException {
.indicesRouting(indicesRouting1)
.build();
ClusterMetadataManifest manifest7 = ClusterMetadataManifest.builder()
.indices(List.of(index1UpdatedMetadata))
.globalMetadataFileName("global_metadata")
.indices(List.of(index2Metadata))
.coordinationMetadata(coordinationMetadataUpdated)
.templatesMetadata(templateMetadataUpdated)
.settingMetadata(settingMetadataUpdated)
.clusterTerm(1L)
.stateVersion(1L)
.codecVersion(CODEC_V3)
Expand All @@ -243,11 +259,11 @@ public void testDeleteClusterMetadata() throws IOException {
when(remoteClusterStateService.fetchRemoteClusterMetadataManifest(eq(clusterName), eq(clusterUUID), any())).thenReturn(
manifest4,
manifest5,
manifest7,
manifest1,
manifest2,
manifest3,
manifest6,
manifest7
manifest6
);

BlobContainer container = mock(BlobContainer.class);
Expand All @@ -269,10 +285,24 @@ public void testDeleteClusterMetadata() throws IOException {
+ ".dat"
)
);
verify(container).deleteBlobsIgnoringIfNotExists(List.of(new BlobPath().buildAsString() + index1Metadata.getUploadedFilename()));
verify(remoteRoutingTableService).DeleteStaleIndexRoutingPaths(List.of(index3Metadata.getUploadedFilename()));
Set<String> staleManifest = new HashSet<>();
inactiveBlobs.forEach(blob -> staleManifest.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blob.name()));
verify(container).deleteBlobsIgnoringIfNotExists(new ArrayList<>(staleManifest));

// Test case when remoteRoutingTableService is null
remoteRoutingTableService = mock(RemoteRoutingTableService.class);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
when(clusterState.getClusterName()).thenReturn(new ClusterName("test"));
when(metadata.clusterUUID()).thenReturn("testUUID");
when(clusterState.metadata()).thenReturn(metadata);
when(clusterApplierService.state()).thenReturn(clusterState);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);

remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(remoteClusterStateService, clusterService, null);
remoteClusterStateCleanupManager.deleteClusterMetadata(clusterName, clusterUUID, activeBlobs, inactiveBlobs);
verify(remoteRoutingTableService, never()).DeleteStaleIndexRoutingPaths(any());
}

public void testDeleteStaleClusterUUIDs() throws IOException {
Expand Down

0 comments on commit 63fce80

Please sign in to comment.