Skip to content

Commit

Permalink
add tests + fix failing tests
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Jun 26, 2023
1 parent 03d87ae commit 1aceeda
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,21 @@ public class NRTReplicationEngineTests extends EngineTestCase {
Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()
);

private static final IndexSettings REMOTE_STORE_INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo")
.build()
);

public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
Expand All @@ -74,7 +84,7 @@ public void testEngineWritesOpsToTranslog() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 500),
Expand Down Expand Up @@ -115,7 +125,7 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -131,13 +141,36 @@ public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOExcept
}
}

public void testUpdateSegments_replicaReceivesSISWithHigherGen_remoteStoreEnabled() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS)
) {
// assume we start at the same gen.
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration());

// flush the primary engine - we don't need any segments, just force a new commit point.
engine.flush(true, true);
assertEquals(3, engine.getLatestSegmentInfos().getGeneration());

// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
}
}

public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException {
// if the replica is already at segments_N that is received, it will commit segments_N+1.
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
nrtEngine.getLatestSegmentInfos().changed();
nrtEngine.getLatestSegmentInfos().changed();
Expand All @@ -160,12 +193,42 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti
}
}

public void testUpdateSegments_replicaReceivesSISWithLowerGen_remoteStoreEnabled() throws IOException {
// if the replica is already at segments_N that is received, it will commit segments_N+1.
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS)
) {
nrtEngine.getLatestSegmentInfos().changed();
nrtEngine.getLatestSegmentInfos().changed();
// commit the infos to push us to segments_3.
nrtEngine.commitSegmentInfos();
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());

// update the replica with segments_2 from the primary.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());

nrtEngine.updateSegments(primaryInfos);
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration());
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
assert (primaryInfos.getVersion() < nrtEngine.getLastCommittedSegmentInfos().getVersion());

nrtEngine.close();
assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
}
}

public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -184,12 +247,38 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
}
}

public void testUpdateSegments_replicaCommitsFirstReceivedInfos_remoteStoreEnabled() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, REMOTE_STORE_INDEX_SETTINGS)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
// bump the latest infos version a couple of times so that we can assert the correct version after commit.
engine.getLatestSegmentInfos().changed();
engine.getLatestSegmentInfos().changed();
assertNotEquals(nrtEngine.getLatestSegmentInfos().getVersion(), engine.getLatestSegmentInfos().getVersion());

// update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed.
final SegmentInfos primaryInfos = engine.getLatestSegmentInfos();
assertEquals(2, primaryInfos.getGeneration());
nrtEngine.updateSegments(primaryInfos);

// When remote store is enabled, we don't commit on replicas since all segments are durably persisted in the store
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
assert (primaryInfos.getVersion() > lastCommittedSegmentInfos.getVersion());
}
}

public void testRefreshOnNRTEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());
Expand All @@ -211,7 +300,7 @@ public void testTrimTranslogOps() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 100),
Expand Down Expand Up @@ -247,7 +336,7 @@ public void testCommitSegmentInfos() throws Exception {

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, defaultSettings)
) {
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
.stream()
Expand Down Expand Up @@ -282,18 +371,11 @@ public void testCommitSegmentInfos() throws Exception {
}
}

private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings)
throws IOException {
Lucene.cleanLuceneIndex(store.directory());
final Path translogDir = createTempDir();
final EngineConfig replicaConfig = config(
defaultSettings,
store,
translogDir,
NoMergePolicy.INSTANCE,
null,
null,
globalCheckpoint::get
);
final EngineConfig replicaConfig = config(settings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
if (Lucene.indexExists(store.directory()) == false) {
store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.junit.Assert;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -58,6 +61,8 @@ public void testStartSequenceForReplicaRecovery() throws Exception {
final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata())
.primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1)
.build();
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
closeShards(replica);
shards.removeReplica(replica);

Expand Down Expand Up @@ -108,6 +113,12 @@ public IndexShard indexShard() {
shards.flush();
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);

storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) newReplicaShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
}
}

Expand Down Expand Up @@ -139,6 +150,12 @@ public void testNoTranslogHistoryTransferred() throws Exception {
shards.flush();
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
Expand Down Expand Up @@ -38,6 +41,11 @@ public void testReplicaSyncingFromRemoteStore() throws IOException {

replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false);
assertDocs(replicaShard, "1", "2");

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primaryShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
storeDirectory = ((FilterDirectory) ((FilterDirectory) replicaShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
closeShards(primaryShard, replicaShard);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.indices.recovery;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -63,6 +66,15 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
// Step 7 - Check retention lease does not exist for the replica shard
assertEquals(1, primary.getRetentionLeases().leases().size());
assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry())));

Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) primary.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica1.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);

storeDirectory = ((FilterDirectory) ((FilterDirectory) replica2.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
}
}
}

0 comments on commit 1aceeda

Please sign in to comment.