Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue of red index on close for remote enabled clusters #15990

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
@@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
@@ -1011,4 +1012,73 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

/**
 * Closes an index right after an explicit flush, while a background thread is still indexing, and
 * checks that the index is green afterwards.
 * <p>
 * No translog durability override is set on the index, so the default durability applies. The
 * cluster-level remote translog buffer interval is set to 5s.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting on the latch or sleeping
 */
public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
    internalCluster().startNodes(3);
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
        .get();
    Settings.Builder settings = Settings.builder()
        .put(remoteStoreIndexSettings(0, 10000L, -1))
        .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
    createIndex(INDEX_NAME, settings.build());
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
        if (randomBoolean()) {
            // Draw the doc count once. Calling randomIntBetween in the loop condition would re-roll the
            // bound on every iteration, so the number of docs would not be a single draw from 1..5.
            final int initialDocs = randomIntBetween(1, 5);
            for (int i = 0; i < initialDocs; i++) {
                indexSingleDoc(INDEX_NAME);
            }
            flushAndRefresh(INDEX_NAME);
        }
        // Index a single doc to start the async IO processor, which leads to a wait before the next sync.
        // NOTE(review): the original comment said the wait is 10s, but the buffer interval set above is 5s - confirm.
        indexSingleDoc(INDEX_NAME);
        // Release the latch so that the main thread can flush after a short sleep.
        latch.countDown();
        // Index another doc; in this case the flush would have happened before the sync.
        indexSingleDoc(INDEX_NAME);
    }).start();
    // Wait for at least one doc to be ingested.
    latch.await();
    // Sleep for some time so that the next doc is present in the Lucene buffer. If the flush happens before
    // doc #2 gets indexed, we end up in the happy case where the index is closed successfully.
    Thread.sleep(1000);
    // Flush so that the subsequent sync or flushes are no-op.
    flush(INDEX_NAME);
    // Closing the index involves translog.sync and shard.flush which are now no-op.
    client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
    // Fixed pause before the health check.
    // NOTE(review): presumably long enough for any pending translog sync to run - confirm.
    Thread.sleep(10000);
    ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
@@ -78,6 +78,9 @@ public class RemoteFsTranslog extends Translog {
// min generation referred by last uploaded translog
protected volatile long minRemoteGenReferenced;

// The max global checkpoint that has been synced, i.e. carried by a successfully completed upload.
// It is only ever raised (in RemoteFsTranslogTransferListener#onUploadComplete) and is compared with the
// global checkpoint of the last synced checkpoint in syncNeeded().
// NOTE(review): the compare-then-assign in onUploadComplete is not atomic on its own; presumably uploads
// are serialized by the sync permit - confirm.
protected volatile long globalCheckpointSynced;

// clean up translog folder uploaded by previous primaries once
protected final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

@@ -437,9 +440,10 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
config.getNodeId()
).build()
) {
Checkpoint checkpoint = current.getLastSyncedCheckpoint();
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint)
);
} finally {
syncPermit.release(SYNC_PERMIT);
@@ -474,7 +478,10 @@ public void sync() throws IOException {
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
return current.syncNeeded()
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0);
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0)
// The below condition on GCP exists to handle global checkpoint updates during close index.
// Refer issue - https://github.com/opensearch-project/OpenSearch/issues/15989
|| (current.getLastSyncedCheckpoint().globalCheckpoint > globalCheckpointSynced);
}
}

@@ -674,16 +681,24 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen

private final long maxSeqNo;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo) {
private final long globalCheckpoint;

/**
 * Creates a listener for a single translog upload.
 *
 * @param generation       translog generation being uploaded; recorded as the max uploaded generation on success
 * @param primaryTerm      primary term of the upload, used in the completion log message
 * @param maxSeqNo         max sequence number of the upload, used in the completion log message
 * @param globalCheckpoint global checkpoint supplied by the caller; on success it replaces
 *                         {@code globalCheckpointSynced} if it is greater
 */
RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo, long globalCheckpoint) {
    this.globalCheckpoint = globalCheckpoint;
    this.maxSeqNo = maxSeqNo;
    this.primaryTerm = primaryTerm;
    this.generation = generation;
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
// Update the global checkpoint only if the supplied global checkpoint is greater than it
// When a new writer is created the
if (globalCheckpoint > globalCheckpointSynced) {
globalCheckpointSynced = globalCheckpoint;
}
logger.debug(
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}",
primaryTerm,