Skip to content

Commit

Permalink
Optimize GC flow with pinned timestamps
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 13, 2024
1 parent 4223fab commit 1a26e5c
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestIssueLogging;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;

/**
 * Integration tests for remote translog garbage collection when the pinned timestamp
 * feature is enabled on the cluster.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase {
    static final String INDEX_NAME = "remote-store-test-idx-1";

    /**
     * Extends the base node settings by switching on
     * {@link RemoteStoreSettings#CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED}.
     */
    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal))
            .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
            .build();
    }

    /** Listener that ignores both the response and any failure. */
    ActionListener<Void> noOpActionListener = new ActionListener<>() {
        @Override
        public void onResponse(Void unused) {}

        @Override
        public void onFailure(Exception e) {}
    };

    /**
     * Blocks until the first element of {@link RemoteStorePinnedTimestampService#getPinnedTimestamps()}
     * is strictly greater than the wall-clock time captured when this method was entered, or until
     * 10 one-second waits have elapsed, whichever comes first.
     * NOTE(review): the first tuple element is presumably the last fetch timestamp of the async
     * pinned timestamp update task - confirm against the service.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException {
        long currentTime = System.currentTimeMillis();
        int maxRetry = 10;
        while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
            Thread.sleep(1000);
            maxRetry--;
        }
    }

    /**
     * Counts the direct children of {@code directory}.
     * <p>
     * {@link Files#list(Path)} returns a stream backed by an open directory handle, so the stream
     * is closed via try-with-resources. This matters here because the caller polls repeatedly.
     *
     * @param directory directory whose entries are counted
     * @return number of entries directly under {@code directory}
     * @throws Exception if the directory cannot be listed
     */
    private static int countDirectoryEntries(Path directory) throws Exception {
        try (var entries = Files.list(directory)) {
            return entries.collect(Collectors.toList()).size();
        }
    }

    /**
     * Indexes a handful of documents into a live index with no pinned timestamps and verifies that
     * the remote translog data and metadata directories each end up holding exactly one file.
     */
    public void testLiveIndexNoPinnedTimestamps() throws Exception {
        prepareCluster(1, 1, Settings.EMPTY);
        Settings indexSettings = Settings.builder()
            .put(remoteStoreIndexSettings(0, 1))
            // Do not retain any extra translog generations on the remote store.
            .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0)
            .build();
        createIndex(INDEX_NAME, indexSettings);
        ensureYellowAndNoInitializingShards(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

        RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
            RemoteStorePinnedTimestampService.class,
            primaryNodeName(INDEX_NAME)
        );

        // Refresh pinned timestamps every second so the per-document wait below stays short.
        remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

        int numDocs = randomIntBetween(5, 10);
        for (int i = 0; i < numDocs; i++) {
            keepPinnedTimestampSchedulerUpdated();
            indexSingleDoc(INDEX_NAME, true);
        }

        String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
        String shardDataPath = getShardLevelBlobPath(
            client(),
            INDEX_NAME,
            BlobPath.cleanPath(),
            "0",
            TRANSLOG,
            DATA,
            translogPathFixedPrefix
        ).buildAsString();
        Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath);
        String shardMetadataPath = getShardLevelBlobPath(
            client(),
            INDEX_NAME,
            BlobPath.cleanPath(),
            "0",
            TRANSLOG,
            METADATA,
            translogPathFixedPrefix
        ).buildAsString();
        Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

        assertBusy(() -> {
            assertEquals(1, countDirectoryEntries(translogDataPath));
            assertEquals(1, countDirectoryEntries(translogMetadataPath));
        });
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
private long maxDeletedGenerationOnRemote = 0;

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
Expand Down Expand Up @@ -141,7 +142,7 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This is to fail fast and avoid listing md files un-necessarily.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
return;
}

Expand All @@ -152,6 +153,12 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)
return;
}

// This check keeps parity with RemoteFsTranslog. Without it, we would end up making a list call for translog metadata
// on every invocation of trimUnreferencedReaders.
if (indexDeleted == false && (minRemoteGenReferenced - maxDeletedGenerationOnRemote) < indexSettings().getRemoteTranslogExtraKeep()) {
return;
}

ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
Expand All @@ -166,7 +173,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

// Check last fetch status of pinned timestamps. If stale, return.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}
Expand Down Expand Up @@ -196,12 +203,13 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted
metadataFilesToBeDeleted
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
if (generationsToBeDeleted.isEmpty() == false) {
maxDeletedGenerationOnRemote = generationsToBeDeleted.stream().max(Long::compareTo).get();

// Delete stale generations
translogTransferManager.deleteGenerationAsync(
primaryTermSupplier.getAsLong(),
Expand Down Expand Up @@ -240,15 +248,8 @@ public void onFailure(Exception e) {
// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
boolean indexDeleted
List<String> metadataFilesToBeDeleted
) throws IOException {
long maxGenerationToBeDeleted = Long.MAX_VALUE;

if (indexDeleted == false) {
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
}

Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
Expand All @@ -262,7 +263,7 @@ protected Set<Long> getGenerationsToBeDeleted(
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
// Check if the generation is not referred by metadata file matching pinned timestamps
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
if (isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ protected void runInternal() {
try {
Map<String, BlobMetadata> pinnedTimestampList = blobContainer.listBlobs();
if (pinnedTimestampList.isEmpty()) {
logger.debug("Fetched empty pinned timestamps from remote store: {}", triggerTimestamp);
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,7 @@ public void testGetGenerationsToBeDeletedEmptyMetadataFilesNotToBeDeleted() thro
);
Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
true
metadataFilesToBeDeleted
);
Set<Long> md1Generations = LongStream.rangeClosed(4, 7).boxed().collect(Collectors.toSet());
Set<Long> md2Generations = LongStream.rangeClosed(17, 37).boxed().collect(Collectors.toSet());
Expand Down Expand Up @@ -682,8 +681,7 @@ public void testGetGenerationsToBeDeleted() throws IOException {
);
Set<Long> generations = ((RemoteFsTimestampAwareTranslog) translog).getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
true
metadataFilesToBeDeleted
);
Set<Long> md1Generations = LongStream.rangeClosed(5, 7).boxed().collect(Collectors.toSet());
Set<Long> md2Generations = LongStream.rangeClosed(17, 25).boxed().collect(Collectors.toSet());
Expand Down

0 comments on commit 1a26e5c

Please sign in to comment.