From 9c2899f42fca1f663030cb40f15bbc158afc04f9 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 13 Sep 2024 20:10:06 +0530 Subject: [PATCH] Add more tests Signed-off-by: Sachin Kale --- ...rePinnedTimestampsGarbageCollectionIT.java | 217 ++++++++++++++++-- .../RemoteFsTimestampAwareTranslog.java | 17 +- 2 files changed, 210 insertions(+), 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index fc51923dd09bd..d4c5328913779 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -8,25 +8,26 @@ package org.opensearch.remotestore; -import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.action.ActionListener; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.junit.annotations.TestIssueLogging; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.LongStream; import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; -import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; @@ -43,14 +44,6 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } - ActionListener noOpActionListener = new ActionListener<>() { - @Override - public void onResponse(Void unused) {} - - @Override - public void onFailure(Exception e) {} - }; - private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException { long currentTime = System.currentTimeMillis(); int maxRetry = 10; @@ -88,15 +81,207 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception { String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) .buildAsString(); - Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) + .buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) + .buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) + .buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(numDocs + 1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 4) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = 5; + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) + .buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) + .buildAsString(); + Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); + + assertBusy(() -> { + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(3, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); + }); + } + + public void testIndexDeletionNoPinnedTimestamps() throws Exception { + prepareCluster(1, 1, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(0, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + } + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) + .buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) .buildAsString(); Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); assertBusy(() -> { - assertEquals(1, Files.list(translogDataPath).collect(Collectors.toList()).size()); - assertEquals(1, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); + assertEquals(1, metadataFiles.size()); + + verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); + + System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + System.out.println(Files.list(translogMetadataPath).collect(Collectors.toList())); + System.out.println(Files.list(translogDataPath).collect(Collectors.toList())); + System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + + client().admin().indices().prepareDelete(INDEX_NAME).get(); + + assertBusy(() -> { + System.out.println("#######################"); + System.out.println(Files.list(translogMetadataPath).collect(Collectors.toList())); + System.out.println(Files.list(translogDataPath).collect(Collectors.toList())); + System.out.println("#######################"); + assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size()); + }, 120, TimeUnit.SECONDS); + } + +// public void testLiveIndexPinnedTimestamps() throws Exception { +// prepareCluster(1, 1, Settings.EMPTY); +// Settings indexSettings = Settings.builder() +// .put(remoteStoreIndexSettings(0, 1)) +// .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0) +// .build(); +// createIndex(INDEX_NAME, indexSettings); +// ensureYellowAndNoInitializingShards(INDEX_NAME); +// ensureGreen(INDEX_NAME); +// +// RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); +// +// RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( +// RemoteStorePinnedTimestampService.class, +// primaryNodeName(INDEX_NAME) +// ); +// +// remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); +// +// int numDocs = randomIntBetween(5, 10); +// for (int i = 0; i < numDocs; i++) { +// keepPinnedTimestampSchedulerUpdated(); +// indexSingleDoc(INDEX_NAME, true); +// } +// +// String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); +// String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix) +// .buildAsString(); +// Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); +// String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix) +// .buildAsString(); +// Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath); +// +// assertBusy(() -> { +// List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); +// assertEquals(1, metadataFiles.size()); +// +// verifyTranslogDataFileCount(metadataFiles, translogDataPath); +// }); +// } + + private void verifyTranslogDataFileCount(List metadataFiles, Path translogDataPath) throws IOException { + List mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList()); + Set generations = new HashSet<>(); + for(String mdFile : mdFiles) { + Tuple minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile); + generations.addAll(LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList())); + } + assertEquals(generations.size() * 2, Files.list(translogDataPath).collect(Collectors.toList()).size()); } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index e0466e64954ee..191ff592c02ed 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -136,7 +136,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. - if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { + if ((indexDeleted == false && startedPrimarySupplier.getAsBoolean() == false) || pauseSync.get()) { return; } @@ -146,6 +146,12 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) return; } + // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata + // call in each invocation of trimUnreferencedReaders + if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) { + return; + } + // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files // We try to acquire 2 permits and if we can not, we return from here itself. @@ -153,19 +159,13 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) return; } - // This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata - // call in each invocation of trimUnreferencedReaders - if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) { - return; - } - ActionListener> listMetadataFilesListener = new ActionListener<>() { @Override public void onResponse(List blobMetadata) { List metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()); try { - if (metadataFiles.size() <= 1) { + if (indexDeleted == false && metadataFiles.size() <= 1) { logger.debug("No stale translog metadata files found"); remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS); return; @@ -201,6 +201,7 @@ public void onResponse(List blobMetadata) { metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted); logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted); + Set generationsToBeDeleted = getGenerationsToBeDeleted( metadataFilesNotToBeDeleted, metadataFilesToBeDeleted