From d4dd71c6472aa071889dcd3081acae082832ffc8 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 30 Aug 2022 12:56:39 -0700 Subject: [PATCH] [Backport 2.x] Support shard promotion with Segment Replication. (#4135) (#4325) * Support shard promotion with Segment Replication. This change adds basic failover support with segment replication. Once selected, a replica will commit and reopen a writeable engine. Signed-off-by: Marc Handalian * Add check to ensure a closed shard does not publish checkpoints. Signed-off-by: Marc Handalian * Clean up in SegmentReplicationIT. Signed-off-by: Marc Handalian * PR feedback. Signed-off-by: Marc Handalian * Fix merge conflict. Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 148 +++++++++++++++++- .../org/opensearch/index/IndexService.java | 2 +- .../index/engine/NRTReplicationEngine.java | 17 ++ .../shard/CheckpointRefreshListener.java | 2 +- .../opensearch/index/shard/IndexShard.java | 46 ++++-- .../org/opensearch/index/store/Store.java | 42 +++++ .../engine/NRTReplicationEngineTests.java | 72 +++++++-- .../SegmentReplicationIndexShardTests.java | 80 +++++++++- 9 files changed, 378 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1022d34b2723a..d463d4a370d72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased] ### Added - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) +- Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325) ### Changed - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 617c090cd06aa..b843b99c097a8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -19,7 +19,10 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.Index; @@ -30,6 +33,7 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; @@ -73,6 +77,109 @@ protected boolean addMockInternalEngine() { return false; } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + refresh(INDEX_NAME); + + waitForReplicaUpdate(); + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + // index another doc but don't refresh, we will ensure this is searchable once replica is promoted. + client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + // stop the primary node - we only have one shard on here. + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + + final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); + assertNotNull(replicaShardRouting); + assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + + // assert we can index into the new primary. + client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); + + // start another node, index another doc and replicate. + String nodeC = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + assertHitCount(client(nodeC).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 4); + assertSegmentStats(REPLICA_COUNT); + } + + public void testRestartPrimary() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + assertEquals(getNodeContainingPrimaryShard().getName(), primary); + + final int initialDocCount = 1; + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + refresh(INDEX_NAME); + + waitForReplicaUpdate(); + assertDocCounts(initialDocCount, replica, primary); + + internalCluster().restartNode(primary); + ensureGreen(INDEX_NAME); + + assertEquals(getNodeContainingPrimaryShard().getName(), replica); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertDocCounts(initialDocCount, replica, primary); + assertSegmentStats(REPLICA_COUNT); + } + + public void testCancelPrimaryAllocation() throws Exception { + // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + final int initialDocCount = 1; + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + refresh(INDEX_NAME); + + waitForReplicaUpdate(); + assertDocCounts(initialDocCount, replica, primary); + + final IndexShard indexShard = getIndexShard(primary); + client().admin() + .cluster() + .prepareReroute() + .add(new CancelAllocationCommand(INDEX_NAME, indexShard.shardId().id(), primary, true)) + .execute() + .actionGet(); + ensureGreen(INDEX_NAME); + + assertEquals(getNodeContainingPrimaryShard().getName(), replica); + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + + assertDocCounts(initialDocCount, replica, primary); + assertSegmentStats(REPLICA_COUNT); + } + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); @@ -240,9 +347,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); - final Index index = resolveIndex(INDEX_NAME); - IndexShard primaryShard = getIndexShard(index, primaryNode); - IndexShard replicaShard = getIndexShard(index, replicaNode); + IndexShard primaryShard = getIndexShard(primaryNode); + IndexShard replicaShard = getIndexShard(replicaNode); assertEquals( primaryShard.translogStats().estimatedNumberOfOperations(), replicaShard.translogStats().estimatedNumberOfOperations() @@ -351,8 +457,7 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException { final ShardRouting replicaShardRouting = shardSegment.getShardRouting(); ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState(); final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId()); - final Index index = resolveIndex(INDEX_NAME); - IndexShard indexShard = getIndexShard(index, replicaNode.getName()); + IndexShard indexShard = getIndexShard(replicaNode.getName()); final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory()); // calls to readCommit will fail if a valid commit point and all its segments are not in the store. SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName); @@ -392,7 +497,8 @@ private void waitForReplicaUpdate() throws Exception { }); } - private IndexShard getIndexShard(Index index, String node) { + private IndexShard getIndexShard(String node) { + final Index index = resolveIndex(INDEX_NAME); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); IndexService indexService = indicesService.indexServiceSafe(index); final Optional shardId = indexService.shardIds().stream().findFirst(); @@ -409,7 +515,8 @@ private List getShardSegments(IndicesSegmentResponse indicesSeg } private Map getLatestSegments(ShardSegments segments) { - final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get(); + final Optional generation = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare); + final Long latestPrimaryGen = generation.get(); return segments.getSegments() .stream() .filter(s -> s.getGeneration() == latestPrimaryGen) @@ -419,4 +526,31 @@ private Map getLatestSegments(ShardSegments segments) { private Map> segmentsByShardType(ShardSegments[] replicationGroupSegments) { return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); } + + @Nullable + private ShardRouting getShardRoutingForNodeName(String nodeName) { + final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) { + for (ShardRouting shardRouting : shardRoutingTable.activeShards()) { + final String nodeId = shardRouting.currentNodeId(); + final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId); + if (discoveryNode.getName().equals(nodeName)) { + return shardRouting; + } + } + } + return null; + } + + private void assertDocCounts(int expectedDocCount, String... nodeNames) { + for (String node : nodeNames) { + assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount); + } + } + + private DiscoveryNode getNodeContainingPrimaryShard() { + final ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard(); + return state.nodes().resolveNode(primaryShard.currentNodeId()); + } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 0a6d1501f2bea..929883027fa59 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -533,7 +533,7 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null + this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 800e6d87c4b81..04385c4ac6c3b 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -129,6 +129,23 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } + /** + * Persist the latest live SegmentInfos. + * + * This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary. + * + * TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used. + * + * + * @throws IOException - When there is an IO error committing the SegmentInfos. + */ + public void commitSegmentInfos() throws IOException { + // TODO: This method should wait for replication events to finalize. + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + translogManager.syncTranslog(); + } + @Override public String getHistoryUUID() { return loadHistoryUUID(lastCommittedSegmentInfos.userData); diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 96d74bea85920..fb046e2310d93 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) { + if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f3ad41d56687b..3babd32d12525 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -164,7 +164,6 @@ import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -240,6 +239,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final GlobalCheckpointListeners globalCheckpointListeners; private final PendingReplicationActions pendingReplicationActions; private final ReplicationTracker replicationTracker; + private final SegmentReplicationCheckpointPublisher checkpointPublisher; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -304,7 +304,6 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; - private final ReferenceManager.RefreshListener checkpointRefreshListener; public IndexShard( final ShardRouting shardRouting, @@ -410,11 +409,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); - if (checkpointPublisher != null) { - this.checkpointRefreshListener = new CheckpointRefreshListener(this, checkpointPublisher); - } else { - this.checkpointRefreshListener = null; - } + this.checkpointPublisher = checkpointPublisher; } public ThreadPool getThreadPool() { @@ -614,6 +609,11 @@ public void updateShardState( + newRouting; assert getOperationPrimaryTerm() == newPrimaryTerm; try { + if (indexSettings.isSegRepEnabled()) { + // this Shard's engine was read only, we need to update its engine before restoring local history from xlog. + assert newRouting.primary() && currentRouting.primary() == false; + promoteNRTReplicaToPrimary(); + } replicationTracker.activatePrimaryMode(getLocalCheckpoint()); ensurePeerRecoveryRetentionLeasesExist(); /* @@ -3220,11 +3220,11 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } }; - final List internalRefreshListener; - if (this.checkpointRefreshListener != null) { - internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); - } else { - internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + final List internalRefreshListener = new ArrayList<>(); + internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + + if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { + internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } return this.engineConfigFactory.newEngineConfig( @@ -4108,4 +4108,26 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() { public GatedCloseable getSegmentInfosSnapshot() { return getEngine().getSegmentInfosSnapshot(); } + + /** + * With segment replication enabled - prepare the shard's engine to be promoted as the new primary. + * + * If this shard is currently using a replication engine, this method: + * 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point. + * InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos + * that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion. + * 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be + * any ack'd writes that were not copied to this replica before promotion. + */ + private void promoteNRTReplicaToPrimary() { + assert shardRouting.primary() && indexSettings.isSegRepEnabled(); + getReplicationEngine().ifPresentOrElse(engine -> { + try { + engine.commitSegmentInfos(); + resetEngineToGlobalCheckpoint(); + } catch (IOException e) { + throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e); + } + }, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); }); + } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 163717ad94c2c..58598ab2d08f4 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -121,6 +121,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; +import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -799,6 +800,47 @@ public void beforeClose() { shardLock.setDetails("closing shard"); } + /** + * This method should only be used with Segment Replication. + * Perform a commit from a live {@link SegmentInfos}. Replica engines with segrep do not have an IndexWriter and Lucene does not currently + * have the ability to create a writer directly from a SegmentInfos object. To promote the replica as a primary and avoid reindexing, we must first commit + * on the replica so that it can be opened with a writeable engine. Further, InternalEngine currently invokes `trimUnsafeCommits` which reverts the engine to a previous safeCommit where the max seqNo is less than or equal + * to the current global checkpoint. It is likely that the replica has a maxSeqNo that is higher than the global cp and a new commit will be wiped. + * + * To get around these limitations, this method first creates an IndexCommit directly from SegmentInfos, it then + * uses an appending IW to create an IndexCommit from the commit created on SegmentInfos. + * This ensures that 1. All files in the new commit are fsynced and 2. Deletes older commit points so the only commit to start from is our new commit. + * + * @param latestSegmentInfos {@link SegmentInfos} The latest active infos + * @param maxSeqNo The engine's current maxSeqNo + * @param processedCheckpoint The engine's current processed checkpoint. + * @throws IOException when there is an IO error committing. + */ + public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, long processedCheckpoint) throws IOException { + assert indexSettings.isSegRepEnabled(); + metadataLock.writeLock().lock(); + try { + final Map userData = new HashMap<>(latestSegmentInfos.getUserData()); + userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(processedCheckpoint)); + userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); + latestSegmentInfos.setUserData(userData, true); + latestSegmentInfos.commit(directory()); + + // similar to TrimUnsafeCommits, create a commit with an appending IW, this will delete old commits and ensure all files + // associated with the SegmentInfos.commit are fsynced. + final List existingCommits = DirectoryReader.listCommits(directory); + assert existingCommits.isEmpty() == false : "Expected at least one commit but none found"; + final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1); + assert latestSegmentInfos.getSegmentsFileName().equals(lastIndexCommit.getSegmentsFileName()); + try (IndexWriter writer = newAppendingIndexWriter(directory, lastIndexCommit)) { + writer.setLiveCommitData(lastIndexCommit.getUserData().entrySet()); + writer.commit(); + } + } finally { + metadataLock.writeLock().unlock(); + } + } + /** * A store directory * diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 29fff69a433a4..de5198de9a29e 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -12,18 +12,25 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.hamcrest.MatcherAssert; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.search.Queries; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.Store; import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.IndexSettingsModule; import java.io.IOException; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -31,6 +38,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; public class NRTReplicationEngineTests extends EngineTestCase { @@ -111,8 +120,9 @@ public void testUpdateSegments() throws Exception { engine.refresh("test"); - nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint()); - assertMatchingSegmentsAndCheckpoints(nrtEngine); + final SegmentInfos latestPrimaryInfos = engine.getLatestSegmentInfos(); + nrtEngine.updateSegments(latestPrimaryInfos, engine.getProcessedLocalCheckpoint()); + assertMatchingSegmentsAndCheckpoints(nrtEngine, latestPrimaryInfos); // assert a doc from the operations exists. final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null); @@ -143,8 +153,9 @@ public void testUpdateSegments() throws Exception { ); } - nrtEngine.updateSegments(engine.getLastCommittedSegmentInfos(), engine.getProcessedLocalCheckpoint()); - assertMatchingSegmentsAndCheckpoints(nrtEngine); + final SegmentInfos primaryInfos = engine.getLastCommittedSegmentInfos(); + nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint()); + assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos); assertEquals( assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().getGeneration().translogFileGeneration, @@ -205,14 +216,57 @@ public void testTrimTranslogOps() throws Exception { } } - private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine) throws IOException { + public void testCommitSegmentInfos() throws Exception { + // This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints + // stored in user data. + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() + ); + try ( + final Store nrtEngineStore = createStore(indexSettings, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()) + .stream() + .filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX)) + .collect(Collectors.toList()); + for (Engine.Operation op : operations) { + applyOperation(nrtEngine, op); + } + + final SegmentInfos previousInfos = nrtEngine.getLatestSegmentInfos(); + LocalCheckpointTracker localCheckpointTracker = nrtEngine.getLocalCheckpointTracker(); + final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); + final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); + nrtEngine.commitSegmentInfos(); + + // ensure getLatestSegmentInfos returns an updated infos ref with correct userdata. + final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); + assertEquals(previousInfos.getGeneration(), latestSegmentInfos.getLastGeneration()); + Map userData = latestSegmentInfos.getUserData(); + assertEquals(processedCheckpoint, localCheckpointTracker.getProcessedCheckpoint()); + assertEquals(maxSeqNo, Long.parseLong(userData.get(MAX_SEQ_NO))); + assertEquals(processedCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))); + + // read infos from store and assert userdata + final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(nrtEngineStore.directory()); + final SegmentInfos committedInfos = SegmentInfos.readCommit(nrtEngineStore.directory(), lastCommitSegmentsFileName); + userData = committedInfos.getUserData(); + assertEquals(processedCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))); + assertEquals(maxSeqNo, Long.parseLong(userData.get(MAX_SEQ_NO))); + } + } + + private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine, SegmentInfos expectedSegmentInfos) + throws IOException { assertEquals(engine.getPersistedLocalCheckpoint(), nrtEngine.getPersistedLocalCheckpoint()); assertEquals(engine.getProcessedLocalCheckpoint(), nrtEngine.getProcessedLocalCheckpoint()); assertEquals(engine.getLocalCheckpointTracker().getMaxSeqNo(), nrtEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(engine.getLatestSegmentInfos().files(true), nrtEngine.getLatestSegmentInfos().files(true)); - assertEquals(engine.getLatestSegmentInfos().getUserData(), nrtEngine.getLatestSegmentInfos().getUserData()); - assertEquals(engine.getLatestSegmentInfos().getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); - assertEquals(engine.segments(true), nrtEngine.segments(true)); + assertEquals(expectedSegmentInfos.files(true), nrtEngine.getLatestSegmentInfos().files(true)); + assertEquals(expectedSegmentInfos.getUserData(), nrtEngine.getLatestSegmentInfos().getUserData()); + assertEquals(expectedSegmentInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); } private void assertSearcherHits(Engine engine, int hits) { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 4f6d86c13e12c..23371a39871c7 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -16,6 +16,8 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.InternalEngine; +import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; @@ -26,10 +28,12 @@ import java.io.IOException; import java.util.List; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { @@ -177,4 +181,76 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { closeShards(shard); } + public void testNRTReplicaPromotedAsPrimary() throws Exception { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard oldPrimary = shards.getPrimary(); + final IndexShard nextPrimary = shards.getReplicas().get(0); + final IndexShard replica = shards.getReplicas().get(1); + + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + + // refresh and copy the segments over. + oldPrimary.refresh("Test"); + replicateSegments(oldPrimary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + assertDocCounts(oldPrimary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, numDocs); + } + + // 2. Create ops that are in the replica's xlog, not in the index. + // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs + // persisted. + final int totalDocs = numDocs + shards.indexDocs(randomInt(10)); + + assertDocCounts(oldPrimary, totalDocs, totalDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, totalDocs, numDocs); + } + + // promote the replica + shards.syncGlobalCheckpoint(); + assertEquals(totalDocs, nextPrimary.translogStats().estimatedNumberOfOperations()); + shards.promoteReplicaToPrimary(nextPrimary); + + // close and start the oldPrimary as a replica. + oldPrimary.close("demoted", false); + oldPrimary.store().close(); + oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); + shards.recoverReplica(oldPrimary); + + assertEquals(NRTReplicationEngine.class, oldPrimary.getEngine().getClass()); + assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); + assertDocCounts(nextPrimary, totalDocs, totalDocs); + assertEquals(0, nextPrimary.translogStats().estimatedNumberOfOperations()); + + // refresh and push segments to our other replica. + nextPrimary.refresh("test"); + replicateSegments(nextPrimary, asList(replica)); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + } + } + + /** + * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because + * it asserts point in time seqNos are relative to the doc counts. + */ + private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCount, int expectedSearchableDocCount) throws IOException { + assertDocCount(indexShard, expectedSearchableDocCount); + // assigned seqNos start at 0, so assert max & local seqNos are 1 less than our persisted doc count. + assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getMaxSeqNo()); + assertEquals(expectedPersistedDocCount - 1, indexShard.seqNoStats().getLocalCheckpoint()); + // processed cp should be 1 less than our searchable doc count. + assertEquals(expectedSearchableDocCount - 1, indexShard.getProcessedLocalCheckpoint()); + } }