Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] Remote translog does not honour pinned timestamp for low value of indexSettings().getRemoteTranslogExtraKeep() #16078

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
Expand All @@ -32,6 +33,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -288,6 +290,79 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception {
});
}

public void testLiveIndexWithPinnedTimestampsMultiplePrimaryTerms() throws Exception {
prepareCluster(1, 2, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(1, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

int numDocs = randomIntBetween(5, 10);
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
if (i == 2) {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
}
}

ingestDocs();

internalCluster().restartNode(primaryNodeName(INDEX_NAME));
ensureGreen(INDEX_NAME);

ingestDocs();

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");

assertBusy(() -> {
List<Path> dataFiles = Files.list(translogDataPath).collect(Collectors.toList());
assertFalse(dataFiles.isEmpty());
});
}

private void ingestDocs() {
int numDocs = randomIntBetween(15, 20);
for (int i = 0; i < numDocs; i++) {
indexSingleDoc(INDEX_NAME, false);
}

assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
flushAndRefresh(INDEX_NAME);

int numDocsPostFailover = randomIntBetween(15, 20);
for (int i = 0; i < numDocsPostFailover; i++) {
indexSingleDoc(INDEX_NAME, false);
}

flushAndRefresh(INDEX_NAME);
assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
}

public void testIndexDeletionNoPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -312,6 +313,107 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots(
// translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS);
}

public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
settings = Settings.builder()
.put(settings)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
.build();
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNodes(3, settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(4);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
clusterManagerName
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = Settings.builder()
.put(getRemoteStoreBackedIndexSettings())
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 2)
.build();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
ensureGreen(remoteStoreEnabledIndexName);

// Create 2 snapshots for primary term 1
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap1");
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap2");

// Restart current primary to change the primary term
internalCluster().restartNode(primaryNodeName(remoteStoreEnabledIndexName));
ensureGreen(remoteStoreEnabledIndexName);

// Create 2 snapshots for primary term 2
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap3");
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap4");

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1");

// Deleting snap1 will still keep files in primary term 1 due to snap2
deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap1");
assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);

// Deleting snap2 will not remove primary term 1 as we need to trigger trimUnreferencedReaders once
deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap2");
assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

// Index a doc to trigger trimUnreferencedReaders
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);

assertBusy(() -> assertFalse(Files.exists(translogPath)), 30, TimeUnit.SECONDS);
}

private void createSnapshot(String repoName, String snapshotName) {
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();

assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
}

private void deleteSnapshot(Client clusterManagerClient, String repoName, String snapshotName) {
AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(repoName, snapshotName)
.get();
assertAcked(deleteSnapshotResponse);
}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Settings settings = Settings.builder()
.put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void trimUnreferencedReaders() throws IOException {
protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) throws IOException {
if (trimLocal) {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();
super.trimUnreferencedReaders(true);
}

// Update file tracker to reflect local translog state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,17 @@ protected Releasable drainSync() {

@Override
public void trimUnreferencedReaders() throws IOException {
trimUnreferencedReaders(false);
}

protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();

if (onlyTrimLocal) {
return;
}

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
Expand Down
Loading