diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index da6a18a1bc938..a9f7a2e70884c 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private final boolean shouldCommit; private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED; @@ -113,6 +114,7 @@ public void onAfterTranslogSync() { engineConfig.getPrimaryModeSupplier() ); this.translogManager = translogManagerRef; + this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false; } catch (IOException e) { IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef); throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -138,17 +140,12 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep ensureOpen(); final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); - boolean remoteStoreEnabled = engineConfig.getIndexSettings().isRemoteStoreEnabled(); - readerManager.updateSegments(infos, remoteStoreEnabled); + readerManager.updateSegments(infos); // Commit and roll the translog when we receive a different generation than what was last received. // lower/higher gens are possible from a new primary that was just elected. if (incomingGeneration != lastReceivedGen) { - if (remoteStoreEnabled == false) { - commitSegmentInfos(); - } else { - this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - } + commitSegmentInfos(); translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } @@ -168,7 +165,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep * @throws IOException - When there is an IO error committing the SegmentInfos. */ private void commitSegmentInfos(SegmentInfos infos) throws IOException { - store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + if (shouldCommit) { + store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + } this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); translogManager.syncTranslog(); } @@ -389,7 +388,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { : "Either the write lock must be held or the engine must be currently be failing itself"; try { // if remote store is enabled, all segments durably persisted - if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) { + if (shouldCommit) { final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); /* This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied @@ -399,6 +398,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; latestSegmentInfos.changed(); commitSegmentInfos(latestSegmentInfos); + } else { + store.directory().sync(List.of(store.directory().listAll())); + store.directory().syncMetaData(); } IOUtils.close(readerManager, translogManager, store::decRef); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 671e3e64962d8..268ba1a436393 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -96,15 +96,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re * Update this reader's segments and refresh. * * @param infos {@link SegmentInfos} infos - * @param remoteStoreEnabled true if remote store is enabled * @throws IOException - When Refresh fails with an IOException. */ - public synchronized void updateSegments(SegmentInfos infos, boolean remoteStoreEnabled) throws IOException { + public synchronized void updateSegments(SegmentInfos infos) throws IOException { // roll over the currentInfo's generation, this ensures the on-disk gen - // is always increased (in the case remote store is not enabled) - if (remoteStoreEnabled == false) { - infos.updateGeneration(currentInfos); - } + // is always increased. + infos.updateGeneration(currentInfos); currentInfos = infos; maybeRefresh(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4f61e987a3dd2..3dd8a673bf779 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4643,16 +4643,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re indexInput, remoteSegmentMetadata.getGeneration() ); + // Replicas never need a local commit if (shouldCommit) { - // Replicas never need a local commit if (this.shardRouting.primary()) { long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N, // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the - // latest - // commit. + // latest commit. Optional localMaxSegmentInfos = localSegmentFiles.stream() .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 2af801ad9a7c6..a289c8f8a04b7 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false, false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 57bc529193b26..64fe42493c686 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -52,8 +52,6 @@ public class NRTReplicationEngineTests extends EngineTestCase { Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") .build() ); @@ -61,7 +59,7 @@ public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); @@ -85,7 +83,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica( between(1, 500), @@ -126,7 +124,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { // assume we start at the same gen. assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -161,7 +159,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnable // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store nrtEngine.updateSegments(engine.getLatestSegmentInfos()); assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); - assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); } } @@ -171,7 +169,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { nrtEngine.getLatestSegmentInfos().changed(); nrtEngine.getLatestSegmentInfos().changed(); @@ -198,7 +196,7 @@ public void testSimultaneousEngineCloseAndCommit() throws IOException, Interrupt final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { CountDownLatch latch = new CountDownLatch(1); Thread commitThread = new Thread(() -> { @@ -231,7 +229,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -255,7 +253,7 @@ public void testRefreshOnNRTEngine() throws IOException { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -277,7 +275,7 @@ public void testTrimTranslogOps() throws Exception { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); ) { List operations = generateHistoryOnReplica( between(1, 100), @@ -313,7 +311,7 @@ public void testCommitSegmentInfos() throws Exception { try ( final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings) + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()) .stream() @@ -365,4 +363,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, } return new NRTReplicationEngine(replicaConfig); } + + private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { + return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 3953005b591da..aef90483b4bbd 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -8,9 +8,6 @@ package org.opensearch.index.shard; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.junit.Assert; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; @@ -61,8 +58,6 @@ public void testStartSequenceForReplicaRecovery() throws Exception { final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata()) .primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1) .build(); - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); closeShards(replica); shards.removeReplica(replica); @@ -113,12 +108,6 @@ public IndexShard indexShard() { shards.flush(); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs + moreDocs); - - storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - - storeDirectory = ((FilterDirectory) ((FilterDirectory) newReplicaShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); } } @@ -150,12 +139,6 @@ public void testNoTranslogHistoryTransferred() throws Exception { shards.flush(); replicateSegments(primary, shards.getReplicas()); shards.assertAllEqual(numDocs + moreDocs); - - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - - storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); } } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java index 4074563645376..cae15a2c53f3f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java @@ -8,9 +8,6 @@ package org.opensearch.index.shard; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.NRTReplicationEngineFactory; @@ -41,11 +38,6 @@ public void testReplicaSyncingFromRemoteStore() throws IOException { replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false); assertDocs(replicaShard, "1", "2"); - - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primaryShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - storeDirectory = ((FilterDirectory) ((FilterDirectory) replicaShard.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); closeShards(primaryShard, replicaShard); } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 9ae3ae7ccaee2..96022315743c2 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -8,9 +8,6 @@ package org.opensearch.indices.recovery; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; @@ -66,15 +63,6 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { // Step 7 - Check retention lease does not exist for the replica shard assertEquals(1, primary.getRetentionLeases().leases().size()); assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry()))); - - Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - - storeDirectory = ((FilterDirectory) ((FilterDirectory) replica1.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); - - storeDirectory = ((FilterDirectory) ((FilterDirectory) replica2.store().directory()).getDelegate()).getDelegate(); - ((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false); } } }