diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index b5552ed552f09..a9f7a2e70884c 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private final boolean shouldCommit; private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED; @@ -113,6 +114,7 @@ public void onAfterTranslogSync() { engineConfig.getPrimaryModeSupplier() ); this.translogManager = translogManagerRef; + this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false; } catch (IOException e) { IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef); throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -163,7 +165,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep * @throws IOException - When there is an IO error committing the SegmentInfos. */ private void commitSegmentInfos(SegmentInfos infos) throws IOException { - store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + if (shouldCommit) { + store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + } this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); translogManager.syncTranslog(); } @@ -383,15 +387,21 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - /* - This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied - from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is - used to generate new segment file names. The ideal solution is to identify the counter from previous primary. - */ - latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; - latestSegmentInfos.changed(); - commitSegmentInfos(latestSegmentInfos); + // if remote store is enabled, all segments durably persisted + if (shouldCommit) { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + /* + This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied + from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is + used to generate new segment file names. The ideal solution is to identify the counter from previous primary. + */ + latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; + latestSegmentInfos.changed(); + commitSegmentInfos(latestSegmentInfos); + } else { + store.directory().sync(List.of(store.directory().listAll())); + store.directory().syncMetaData(); + } IOUtils.close(readerManager, translogManager, store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8541a1f5e554b..3dd8a673bf779 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4643,31 +4643,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re indexInput, remoteSegmentMetadata.getGeneration() ); + // Replicas never need a local commit if (shouldCommit) { - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs - // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N, - // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the - // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest - // commit. - Optional localMaxSegmentInfos = localSegmentFiles.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - if (localMaxSegmentInfos.isPresent() - && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - - 1) { - // If remote translog is not enabled, local translog will be created with different UUID. - // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs - // to be same. Following code block make sure to have the same UUID. - if (indexSettings.isRemoteTranslogStoreEnabled() == false) { - SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo(); - Map userData = new HashMap<>(infosSnapshot.getUserData()); - userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY)); - infosSnapshot.setUserData(userData, false); + if (this.shardRouting.primary()) { + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs + // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N, + // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the + // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the + // latest commit. + Optional localMaxSegmentInfos = localSegmentFiles.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + if (localMaxSegmentInfos.isPresent() + && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) + - 1) { + // If remote translog is not enabled, local translog will be created with different UUID. + // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs + // to be same. Following code block make sure to have the same UUID. + if (indexSettings.isRemoteTranslogStoreEnabled() == false) { + SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo(); + Map userData = new HashMap<>(infosSnapshot.getUserData()); + userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY)); + infosSnapshot.setUserData(userData, false); + } + storeDirectory.deleteFile(localMaxSegmentInfos.get()); } - storeDirectory.deleteFile(localMaxSegmentInfos.get()); + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } else { finalizeReplication(infosSnapshot); } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index e25e6ea206d84..64fe42493c686 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -47,6 +47,14 @@ public class NRTReplicationEngineTests extends EngineTestCase { Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() ); + private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") + .build() + ); + public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( @@ -132,6 +140,29 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept } } + public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try ( + final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS) + ) { + // assume we start at the same gen. + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration()); + + // flush the primary engine - we don't need any segments, just force a new commit point. + engine.flush(true, true); + assertEquals(3, engine.getLatestSegmentInfos().getGeneration()); + + // When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + } + } + public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException { // if the replica is already at segments_N that is received, it will commit segments_N+1. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -315,18 +346,11 @@ public void testCommitSegmentInfos() throws Exception { } } - private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { + private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings) + throws IOException { Lucene.cleanLuceneIndex(store.directory()); final Path translogDir = createTempDir(); - final EngineConfig replicaConfig = config( - defaultSettings, - store, - translogDir, - NoMergePolicy.INSTANCE, - null, - null, - globalCheckpoint::get - ); + final EngineConfig replicaConfig = config(settings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); if (Lucene.indexExists(store.directory()) == false) { store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion); final String translogUuid = Translog.createEmptyTranslog( @@ -339,4 +363,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, } return new NRTReplicationEngine(replicaConfig); } + + private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { + return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings); + } }