Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Jul 18, 2023
1 parent 262949b commit 7eb5ddb
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class NRTReplicationEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final WriteOnlyTranslogManager translogManager;
private final boolean shouldCommit;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

Expand Down Expand Up @@ -113,6 +114,7 @@ public void onAfterTranslogSync() {
engineConfig.getPrimaryModeSupplier()
);
this.translogManager = translogManagerRef;
this.shouldCommit = engineConfig.getIndexSettings().isRemoteStoreEnabled() == false;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand All @@ -138,17 +140,12 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
ensureOpen();
final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO));
final long incomingGeneration = infos.getGeneration();
boolean remoteStoreEnabled = engineConfig.getIndexSettings().isRemoteStoreEnabled();
readerManager.updateSegments(infos, remoteStoreEnabled);
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
if (remoteStoreEnabled == false) {
commitSegmentInfos();
} else {
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
}
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
translogManager.rollTranslogGeneration();
}
Expand All @@ -168,7 +165,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
if (shouldCommit) {
store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
}
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
translogManager.syncTranslog();
}
Expand Down Expand Up @@ -389,7 +388,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
// if remote store is enabled, all segments durably persisted
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false) {
if (shouldCommit) {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
/*
This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
Expand All @@ -399,6 +398,9 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
commitSegmentInfos(latestSegmentInfos);
} else {
store.directory().sync(List.of(store.directory().listAll()));
store.directory().syncMetaData();
}
IOUtils.close(readerManager, translogManager, store::decRef);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,12 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* Update this reader's segments and refresh.
*
* @param infos {@link SegmentInfos} infos
* @param remoteStoreEnabled true if remote store is enabled
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos, boolean remoteStoreEnabled) throws IOException {
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// roll over the currentInfo's generation, this ensures the on-disk gen
// is always increased (in the case remote store is not enabled)
if (remoteStoreEnabled == false) {
infos.updateGeneration(currentInfos);
}
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefresh();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4643,16 +4643,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
indexInput,
remoteSegmentMetadata.getGeneration()
);
// Replicas never need a local commit
if (shouldCommit) {
// Replicas never need a local commit
if (this.shardRouting.primary()) {
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs
// with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N,
// after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the
// policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the
// latest
// commit.
// latest commit.
Optional<String> localMaxSegmentInfos = localSegmentFiles.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, false);
indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,14 @@ public class NRTReplicationEngineTests extends EngineTestCase {
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo")
.build()
);

public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
Expand All @@ -85,7 +83,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 500),
Expand Down Expand Up @@ -126,7 +124,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand Down Expand Up @@ -161,7 +159,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnable
// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

Expand All @@ -171,7 +169,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
nrtEngine.getLatestSegmentInfos().changed();
nrtEngine.getLatestSegmentInfos().changed();
Expand All @@ -198,7 +196,7 @@ public void testSimultaneousEngineCloseAndCommit() throws IOException, Interrupt
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
CountDownLatch latch = new CountDownLatch(1);
Thread commitThread = new Thread(() -> {
Expand Down Expand Up @@ -231,7 +229,7 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -255,7 +253,7 @@ public void testRefreshOnNRTEngine() throws IOException {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -277,7 +275,7 @@ public void testTrimTranslogOps() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings);
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 100),
Expand Down Expand Up @@ -313,7 +311,7 @@ public void testCommitSegmentInfos() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
.stream()
Expand Down Expand Up @@ -365,4 +363,8 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint,
}
return new NRTReplicationEngine(replicaConfig);
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.junit.Assert;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -61,8 +58,6 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata())
.primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1)
.build();
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
closeShards(replica);
shards.removeReplica(replica);

Expand Down Expand Up @@ -113,12 +108,6 @@ public IndexShard indexShard() {
shards.flush();
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);

storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) newReplicaShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
}
}

Expand Down Expand Up @@ -150,12 +139,6 @@ public void testNoTranslogHistoryTransferred() throws Exception {
shards.flush();
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
Expand Down Expand Up @@ -41,11 +38,6 @@ public void testReplicaSyncingFromRemoteStore() throws IOException {

replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
assertDocs(replicaShard, "1", "2");

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primaryShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
storeDirectory = ((FilterDirectory) ((FilterDirectory) replicaShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
closeShards(primaryShard, replicaShard);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@

package org.opensearch.indices.recovery;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -66,15 +63,6 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
// Step 7 - Check retention lease does not exist for the replica shard
assertEquals(1, primary.getRetentionLeases().leases().size());
assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry())));

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica1.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica2.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
}
}
}

0 comments on commit 7eb5ddb

Please sign in to comment.