diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 3e1112ae3069b..8f24196640b17 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -46,11 +46,22 @@ public class NRTReplicationEngineTests extends EngineTestCase { Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() ); + private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "100ms") + .build() + ); + public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); @@ -74,7 +85,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { List operations = generateHistoryOnReplica( between(1, 500), @@ -115,7 +126,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { // assume we start at the same gen. assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -131,13 +142,36 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept } } + public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS) + ) { + // assume we start at the same gen. + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration()); + + // flush the primary engine - we don't need any segments, just force a new commit point. + engine.flush(true, true); + assertEquals(3, engine.getLatestSegmentInfos().getGeneration()); + + // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + } + } + public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException { // if the replica is already at segments_N that is received, it will commit segments_N+1. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { nrtEngine.getLatestSegmentInfos().changed(); nrtEngine.getLatestSegmentInfos().changed(); @@ -160,12 +194,42 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti } } + public void testUpdateSegments_replicaReceivesSISWithLowerGen_remoteStoreEnabled() throws IOException { + // if the replica is already at segments_N that is received, it will commit segments_N+1. + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS) + ) { + nrtEngine.getLatestSegmentInfos().changed(); + nrtEngine.getLatestSegmentInfos().changed(); + // commit the infos to push us to segments_3. + nrtEngine.commitSegmentInfos(); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); + + // update the replica with segments_2 from the primary. + final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); + assertEquals(2, primaryInfos.getGeneration()); + + nrtEngine.updateSegments(primaryInfos); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); + assert (primaryInfos.getVersion() < nrtEngine.getLastCommittedSegmentInfos().getVersion()); + + nrtEngine.close(); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + } + } + public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -184,12 +248,38 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep } } + public void testUpdateSegments_replicaCommitsFirstReceivedInfos_remoteStoreEnabled() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS) + ) { + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + // bump the latest infos version a couple of times so that we can assert the correct version after commit. + engine.getLatestSegmentInfos().changed(); + engine.getLatestSegmentInfos().changed(); + assertNotEquals(nrtEngine.getLatestSegmentInfos().getVersion(), engine.getLatestSegmentInfos().getVersion()); + + // update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed. + final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); + assertEquals(2, primaryInfos.getGeneration()); + nrtEngine.updateSegments(primaryInfos); + + // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store + final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); + assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); + assert (primaryInfos.getVersion() > lastCommittedSegmentInfos.getVersion()); + } + } + public void testRefreshOnNRTEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -211,7 +301,7 @@ public void testTrimTranslogOps() throws Exception { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings); ) { List operations = generateHistoryOnReplica( between(1, 100), @@ -247,7 +337,7 @@ public void testCommitSegmentInfos() throws Exception { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) ) { List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()) .stream() @@ -282,18 +372,11 @@ public void testCommitSegmentInfos() throws Exception { } } - private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { + private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings) + throws IOException { Lucene.cleanLuceneIndex(store.directory()); final Path translogDir = createTempDir(); - final EngineConfig replicaConfig = config( - defaultSettings, - store, - translogDir, - NoMergePolicy.INSTANCE, - null, - null, - globalCheckpoint::get - ); + final EngineConfig replicaConfig = config(settings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); if (Lucene.indexExists(store.directory()) == false) { store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion); final String translogUuid = Translog.createEmptyTranslog( diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 7a2f67e206f74..d86ed3f38801b 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -8,6 +8,9 @@ package org.opensearch.index.shard; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.junit.Assert; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; @@ -57,6 +60,8 @@ public void testStartSequenceForReplicaRecovery() throws Exception { final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata()) .primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1) .build(); + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); closeShards(replica); shards.removeReplica(replica); @@ -107,6 +112,12 @@ public IndexShard indexShard() { shards.flush(); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs + moreDocs); + + storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + + storeDirectory = ((FilterDirectory) ((FilterDirectory) newReplicaShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); } } @@ -138,6 +149,12 @@ public void testNoTranslogHistoryTransferred() throws Exception { shards.flush(); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs + moreDocs); + + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + + storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); } } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java index cae15a2c53f3f..4074563645376 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java @@ -8,6 +8,9 @@ package org.opensearch.index.shard; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.NRTReplicationEngineFactory; @@ -38,6 +41,11 @@ public void testReplicaSyncingFromRemoteStore() throws IOException { replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false); assertDocs(replicaShard, "1", "2"); + + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primaryShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + storeDirectory = ((FilterDirectory) ((FilterDirectory) replicaShard.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); closeShards(primaryShard, replicaShard); } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index ff251f42ab21b..6452906bfdf98 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -8,6 +8,9 @@ package org.opensearch.indices.recovery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.NRTReplicationEngineFactory; @@ -62,6 +65,15 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { // Step 7 - Check retention lease does not exist for the replica shard assertEquals(1, primary.getRetentionLeases().leases().size()); assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry()))); + + Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + + storeDirectory = ((FilterDirectory) ((FilterDirectory) replica1.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); + + storeDirectory = ((FilterDirectory) ((FilterDirectory) replica2.store().directory()).getDelegate()).getDelegate(); + ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); } } }