diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 692727357a88a..ebb911c739eb3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -18,6 +18,7 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.client.Requests; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; @@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception { public void testStaleCommitDeletionWithInvokeFlush() throws Exception { String dataNode = internalCluster().startNode(); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, true, INDEX_NAME); String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings()); @@ -1011,4 +1012,70 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws .get() ); } + + public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException { + internalCluster().startNodes(3); + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s")) + .get(); + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(0, 10000L, -1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s"); + createIndex(INDEX_NAME, settings.build()); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + if (randomBoolean()) { + for (int i = 0; i < randomIntBetween(1, 5); i++) { + indexSingleDoc(INDEX_NAME); + } + flushAndRefresh(INDEX_NAME); + } + // Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync. + indexSingleDoc(INDEX_NAME); + // Reduce the latch for the main thread to flush after some sleep. + latch.countDown(); + // Index another doc and in this case the flush would have happened before the sync. + indexSingleDoc(INDEX_NAME); + }).start(); + // Wait for atleast one doc to be ingested. + latch.await(); + // Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2 + // gets indexed, then it goes into the happy case where the close index happens succefully. + Thread.sleep(1000); + // Flush so that the subsequent sync or flushes are no-op. + flush(INDEX_NAME); + // Closing the index involves translog.sync and shard.flush which are now no-op. + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(10000); + ensureGreen(INDEX_NAME); + } + + public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException { + internalCluster().startNodes(3); + Settings.Builder settings = Settings.builder() + .put(remoteStoreIndexSettings(0, 10000L, -1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") + .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s"); + createIndex(INDEX_NAME, settings.build()); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + // Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync. + indexSingleDoc(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + // Reduce the latch for the main thread to flush after some sleep. + latch.countDown(); + }).start(); + // Wait for atleast one doc to be ingested. + latch.await(); + // Flush so that the subsequent sync or flushes are no-op. + flush(INDEX_NAME); + // Closing the index involves translog.sync and shard.flush which are now no-op. + client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet(); + Thread.sleep(10000); + ensureGreen(INDEX_NAME); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 82fc149cf086b..e700ce778107d 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -78,6 +78,9 @@ public class RemoteFsTranslog extends Translog { // min generation referred by last uploaded translog protected volatile long minRemoteGenReferenced; + // the max global checkpoint that has been synced + protected volatile long globalCheckpointSynced; + // clean up translog folder uploaded by previous primaries once protected final SetOnce olderPrimaryCleaned = new SetOnce<>(); @@ -437,9 +440,10 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws config.getNodeId() ).build() ) { + Checkpoint checkpoint = current.getLastSyncedCheckpoint(); return translogTransferManager.transferSnapshot( transferSnapshotProvider, - new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) + new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint) ); } finally { syncPermit.release(SYNC_PERMIT); @@ -474,7 +478,10 @@ public void sync() throws IOException { public boolean syncNeeded() { try (ReleasableLock lock = readLock.acquire()) { return current.syncNeeded() - || (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0); + || (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0) + // The below condition on GCP exists to handle global checkpoint updates during close index. + // Refer issue - https://github.com/opensearch-project/OpenSearch/issues/15989 + || (current.getLastSyncedCheckpoint().globalCheckpoint > globalCheckpointSynced); } } @@ -682,10 +689,13 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen private final long maxSeqNo; - RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo) { + private final long globalCheckpoint; + + RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo, long globalCheckpoint) { this.generation = generation; this.primaryTerm = primaryTerm; this.maxSeqNo = maxSeqNo; + this.globalCheckpoint = globalCheckpoint; } @Override @@ -693,6 +703,11 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOExcepti maxRemoteTranslogGenerationUploaded = generation; long previousMinRemoteGenReferenced = minRemoteGenReferenced; minRemoteGenReferenced = getMinFileGeneration(); + // Update the global checkpoint only if the supplied global checkpoint is greater than it + // When a new writer is created the + if (globalCheckpoint > globalCheckpointSynced) { + globalCheckpointSynced = globalCheckpoint; + } if (previousMinRemoteGenReferenced != minRemoteGenReferenced) { onMinRemoteGenReferencedChange(); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 339d876274557..03c77a9a83f57 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -1801,6 +1801,83 @@ public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException { assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload); } + public void testSyncWithGlobalCheckpointUpdate() throws IOException { + ArrayList ops = new ArrayList<>(); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 2 })); + + // Set a global checkpoint + long initialGlobalCheckpoint = 1L; + globalCheckpoint.set(initialGlobalCheckpoint); + + // Sync the translog + translog.sync(); + + // Verify that the globalCheckpointSynced is updated + assertEquals(initialGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint); + + // Update global checkpoint + long newGlobalCheckpoint = 2L; + globalCheckpoint.set(newGlobalCheckpoint); + + // Add a new operation and sync + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 })); + translog.sync(); + + // Verify that the globalCheckpointSynced is updated to the new value + assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint); + } + + public void testSyncNeededWithGlobalCheckpointUpdate() throws IOException { + ArrayList ops = new ArrayList<>(); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + + // Set initial global checkpoint + long initialGlobalCheckpoint = 0L; + globalCheckpoint.set(initialGlobalCheckpoint); + + // Sync the translog + translog.sync(); + + // Verify that sync is not needed + assertFalse(translog.syncNeeded()); + + // Update global checkpoint + long newGlobalCheckpoint = 1L; + globalCheckpoint.set(newGlobalCheckpoint); + + // Verify that sync is now needed due to global checkpoint update + assertTrue(translog.syncNeeded()); + + // Sync again + translog.sync(); + + // Verify that sync is not needed after syncing + assertFalse(translog.syncNeeded()); + } + + public void testGlobalCheckpointUpdateDuringClose() throws IOException { + ArrayList ops = new ArrayList<>(); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + + // Set initial global checkpoint + long initialGlobalCheckpoint = 0L; + globalCheckpoint.set(initialGlobalCheckpoint); + + // Sync the translog + translog.sync(); + + // Update global checkpoint + long newGlobalCheckpoint = 1L; + globalCheckpoint.set(newGlobalCheckpoint); + + // Close the translog + translog.close(); + + // Verify that the last synced checkpoint includes the updated global checkpoint + assertEquals(newGlobalCheckpoint, ((RemoteFsTranslog) translog).getLastSyncedCheckpoint().globalCheckpoint); + } + public class ThrowingBlobRepository extends FsRepository { private final Environment environment;