Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 12, 2024
1 parent 330b249 commit fec6cd0
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -206,7 +208,6 @@ public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception {

}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692")
public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down Expand Up @@ -242,12 +243,6 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

String numShards = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_NUMBER_OF_SHARDS);

logger.info("--> create two remote index shallow snapshots");
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
Expand All @@ -269,6 +264,14 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2"));

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

Expand All @@ -291,14 +294,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
.get();
assertAcked(deleteSnapshotResponse);

Thread.sleep(5000);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
} catch (NoSuchFileException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
int segmentFilesCountAfterDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);

logger.info("--> delete snapshot 1");
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
Expand All @@ -312,18 +314,171 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
// Delete is async. Give time for it
assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountAfterDeletingSnapshot1));
} catch (Exception e) {}
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(translogPath), lessThan(translogFilesCountBeforeDeletingSnapshot1));
} catch (Exception e) {}
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);

}

public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
settings = Settings.builder()
.put(settings)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
.build();
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNode(settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
clusterManagerName
);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, 25);

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

logger.info("--> create two remote index shallow snapshots");
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, "snap1")
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo();

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 1");

Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}

AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

// Delete is async. Give time for it
assertBusy(() -> {
try {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);
}

public void testRemoteStoreCleanupForDeletedIndexWithoutAnySnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
settings = Settings.builder()
.put(settings)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
.build();
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNode(settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
clusterManagerName
);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, 5);

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}
// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
assertBusy(() -> {
try {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
} catch (NoSuchFileException e) {
// While files are getting deleted, we encounter NoSuchFileException in RemoteStoreBaseIntegTestCase.getFileCount
// Failing the assertion for assertBusy to try again
fail();
}
});
assertBusy(() -> {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath));
});
}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Settings settings = Settings.builder()
.put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) {
return;
}

Expand Down Expand Up @@ -472,50 +472,55 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
}
}

public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
public static void cleanup(TranslogTransferManager translogTransferManager, boolean forceClean, String pinningEntity) throws IOException {
if (forceClean) {
translogTransferManager.delete();
} else {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {
});

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}
}
}
Loading

0 comments on commit fec6cd0

Please sign in to comment.