From 197909c71f91f1e26da7bc6c7218b53fbb73cdae Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Fri, 22 Jul 2022 09:31:17 -0700
Subject: [PATCH] [Segment Replication] Wire up segment replication with peer
 recovery and add ITs. (#3743)

* Add null check when computing max segment version.

With segment replication enabled it is possible that Lucene does not set the
SegmentInfos min segment version, leaving the default value as null.

Signed-off-by: Marc Handalian

* Update peer recovery to set the translogUUID of replicas to the UUID generated on the primary.

This change updates the UUID when the translog is created to the value stored
in the passed segment user data. This ensures that during failover the replica
can be promoted without a translog UUID mismatch against the value stored in
the commit user data.

Signed-off-by: Marc Handalian

* Wire up Segment Replication under the feature flag.

This PR wires up segment replication and adds some initial integration tests.

Signed-off-by: Marc Handalian

* Add test to ensure replicas use the primary's translog UUID with segrep.

Signed-off-by: Marc Handalian

* Update SegmentReplicationIT to assert previous commit points are valid and
SegmentInfos can be built. Fix nitpicks in PR feedback.

Signed-off-by: Marc Handalian

* Fix test with Assert.fail to include a message.

Signed-off-by: Marc Handalian
---
 .../replication/SegmentReplicationIT.java     | 306 ++++++++++++++++++
 .../opensearch/index/shard/IndexShard.java    |  16 +-
 .../org/opensearch/index/store/Store.java     |   7 +-
 .../cluster/IndicesClusterStateService.java   |  20 +-
 .../indices/recovery/RecoveryTarget.java      |  33 +-
 .../OngoingSegmentReplications.java           |   6 +-
 .../SegmentReplicationSourceFactory.java      |   8 +-
 .../SegmentReplicationSourceHandler.java      |   8 +
 .../replication/SegmentReplicationTarget.java |   7 +-
 .../SegmentReplicationTargetService.java      |  21 ++
 .../checkpoint/ReplicationCheckpoint.java     |  13 +
 .../replication/common/ReplicationTarget.java |   4 +-
 .../main/java/org/opensearch/node/Node.java   |   2 +
 ...ClusterStateServiceRandomUpdatesTests.java |   2 +
 .../indices/recovery/RecoveryTests.java       |  12 +
 .../OngoingSegmentReplicationsTests.java      |  46 ++-
 .../snapshots/SnapshotResiliencyTests.java    |   8 +
 17 files changed, 484 insertions(+), 35 deletions(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
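Segment replication is gated twice in this change: node-level by the FeatureFlags.REPLICATION_TYPE system property (the new ITs assume it is set to "true"), and per index by the replication type setting. As an illustrative sketch (not part of the patch; the variable name is hypothetical), an index opts in with settings mirroring the IT's indexSettings() override:

    // Builds index settings that request segment replication for one shard and one replica.
    // Only takes effect when the node-level replication type feature flag is enabled.
    Settings segRepIndexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
        .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
        .build();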
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
new file mode 100644
index 0000000000000..2c91eadafbee8
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -0,0 +1,306 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.apache.lucene.index.SegmentInfos;
+import org.junit.BeforeClass;
+import org.opensearch.action.admin.indices.segments.IndexShardSegments;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
+import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.Index;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.IndexService;
+import org.opensearch.index.engine.Segment;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.BackgroundIndexer;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class SegmentReplicationIT extends OpenSearchIntegTestCase {
+
+    private static final String INDEX_NAME = "test-idx-1";
+    private static final int SHARD_COUNT = 1;
+    private static final int REPLICA_COUNT = 1;
+
+    @BeforeClass
+    public static void assumeFeatureFlag() {
+        assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+        createIndex(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+
+        final int initialDocCount = scaledRandomIntBetween(0, 200);
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
+            waitForDocs(initialDocCount, indexer);
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            final int additionalDocCount = scaledRandomIntBetween(0, 200);
+            final int expectedHitCount = initialDocCount + additionalDocCount;
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            flushAndRefresh(INDEX_NAME);
+            waitForReplicaUpdate();
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
+        }
+    }
+
+    public void testReplicationAfterForceMerge() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+        createIndex(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+
+        final int initialDocCount = scaledRandomIntBetween(0, 200);
+        final int additionalDocCount = scaledRandomIntBetween(0, 200);
+        final int expectedHitCount = initialDocCount + additionalDocCount;
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
+            waitForDocs(initialDocCount, indexer);
+
+            flush(INDEX_NAME);
+            // give replication a chance to complete before asserting hit counts on each node.
+            waitForReplicaUpdate();
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            // Index a second set of docs so we can merge into one segment.
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
+            client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
+        }
+    }
+
+    public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
+        final String primaryNode = internalCluster().startNode();
+        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
+        ensureGreen(INDEX_NAME);
+
+        // Index a doc to create the first set of segments. _s1.si
+        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+        // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
+        flushAndRefresh(INDEX_NAME);
+        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+
+        // Index to create another segment
+        client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get();
+
+        // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
+        refresh(INDEX_NAME);
+
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(INDEX_NAME)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
+        );
+        final String replicaNode = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+
+        client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
+
+        waitForReplicaUpdate();
+        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
+        assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
+
+        final Index index = resolveIndex(INDEX_NAME);
+        IndexShard primaryShard = getIndexShard(index, primaryNode);
+        IndexShard replicaShard = getIndexShard(index, replicaNode);
+        assertEquals(
+            primaryShard.translogStats().estimatedNumberOfOperations(),
+            replicaShard.translogStats().estimatedNumberOfOperations()
+        );
+        assertSegmentStats(REPLICA_COUNT);
+    }
+
+    private void assertSegmentStats(int numberOfReplicas) throws IOException {
+        final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();
+
+        List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
+
+        // There will be an entry in the list for each index.
+        for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+
+            // Separate primary & replica shards ShardSegments.
+            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+            assertEquals("There should only be one primary in the replicationGroup", 1, primaryShardSegmentsList.size());
+            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+
+            assertEquals(
+                "There should be a ShardSegment entry for each replica in the replicationGroup",
+                numberOfReplicas,
+                replicaShardSegments.size()
+            );
+
+            for (ShardSegments shardSegment : replicaShardSegments) {
+                final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
+                for (Segment replicaSegment : latestReplicaSegments.values()) {
+                    final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
+                    assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
+                    assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
+                    assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
+                    assertEquals(replicaSegment.getSize(), primarySegment.getSize());
+                }
+
+                // Fetch the IndexShard for this replica and try to build its SegmentInfos from the previous commit point.
+                // This ensures the previous commit point is not wiped.
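+                // (A replica under segment replication never commits on its own; its commit point and
+                // segment files were copied from the primary, so a successful readCommit doubles as a
+                // check that replication left a complete, usable commit on disk.)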
+                final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
+                ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
+                final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
+                final Index index = resolveIndex(INDEX_NAME);
+                IndexShard indexShard = getIndexShard(index, replicaNode.getName());
+                final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
+                // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
+                SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
+            }
+        }
+    }
+
+    /**
+     * Waits until the replica is caught up to the latest primary segments gen.
+     * @throws Exception if the wait times out
+     */
+    private void waitForReplicaUpdate() throws Exception {
+        // wait until the replica has the latest segment generation.
+        assertBusy(() -> {
+            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
+                .indices()
+                .segments(new IndicesSegmentsRequest())
+                .actionGet();
+            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
+            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+                final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+                final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+                final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
+                for (ShardSegments shardSegments : replicaShardSegments) {
+                    final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
+                        .stream()
+                        .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
+                    assertTrue(isReplicaCaughtUpToPrimary);
+                }
+            }
+        });
+    }
+
+    private IndexShard getIndexShard(Index index, String node) {
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+        IndexService indexService = indicesService.indexServiceSafe(index);
+        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
+        return indexService.getShard(shardId.get());
+    }
+
+    private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
+        return indicesSegmentResponse.getIndices()
+            .values()
+            .stream() // get list of IndexSegments
+            .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
+            .map(IndexShardSegments::getShards) // get list of segments across replication group
+            .collect(Collectors.toList());
+    }
+
+    private Map<String, Segment> getLatestSegments(ShardSegments segments) {
+        final Long latestGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
+        return segments.getSegments()
+            .stream()
+            .filter(s -> s.getGeneration() == latestGen)
+            .collect(Collectors.toMap(Segment::getName, Function.identity()));
+    }
+
+    private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
+        return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index fbe81b9320de5..9694f3dd37f80 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1396,9 +1396,13 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
     }
 
     /**
-     * Returns the lastest Replication Checkpoint that shard received
+     * Returns the latest Replication Checkpoint that shard received. Shards will return an EMPTY checkpoint before
+     * the engine is opened.
      */
     public ReplicationCheckpoint getLatestReplicationCheckpoint() {
+        if (getEngineOrNull() == null) {
+            return ReplicationCheckpoint.empty(shardId);
+        }
         try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
             return Optional.ofNullable(snapshot.get())
                 .map(
@@ -1410,15 +1414,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
                         segmentInfos.getVersion()
                     )
                 )
-                .orElse(
-                    new ReplicationCheckpoint(
-                        shardId,
-                        getOperationPrimaryTerm(),
-                        SequenceNumbers.NO_OPS_PERFORMED,
-                        getProcessedLocalCheckpoint(),
-                        SequenceNumbers.NO_OPS_PERFORMED
-                    )
-                );
+                .orElse(ReplicationCheckpoint.empty(shardId));
         } catch (IOException ex) {
             throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
         }
diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java
index 2309004c0777d..6828ab7d91b2c 100644
--- a/server/src/main/java/org/opensearch/index/store/Store.java
+++ b/server/src/main/java/org/opensearch/index/store/Store.java
@@ -1003,7 +1003,12 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
                 // version is written since 3.1+: we should have already hit IndexFormatTooOld.
                 throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
             }
-            if (version.onOrAfter(maxVersion)) {
+            // With segment replication enabled, we compute metadata snapshots from the latest in memory infos.
+            // In this case we will have SegmentInfos objects fetched from the primary's reader
+            // where the minSegmentLuceneVersion can be null even though there are segments.
+            // This is because the SegmentInfos object is not read from a commit/IndexInput, which sets
+            // minSegmentLuceneVersion.
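+            // (maxVersion is seeded from the infos' minSegmentLuceneVersion, so it can start out null
+            // here; without the null check below, version.onOrAfter(maxVersion) would throw an NPE.)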
+            if (maxVersion == null || version.onOrAfter(maxVersion)) {
                 maxVersion = version;
             }
             for (String file : info.files()) {
diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
index 33d3bc2ba07b0..8884ef2cddd0a 100644
--- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
+++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java
@@ -56,6 +56,7 @@
 import org.opensearch.common.inject.Inject;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.common.util.concurrent.AbstractRunnable;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.env.ShardLockObtainFailedException;
@@ -80,6 +81,7 @@
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoveryListener;
 import org.opensearch.indices.recovery.RecoveryState;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.indices.replication.common.ReplicationState;
 import org.opensearch.repositories.RepositoriesService;
@@ -90,6 +92,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -134,7 +137,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
     private final FailedShardHandler failedShardHandler = new FailedShardHandler();
 
     private final boolean sendRefreshMapping;
-    private final List<IndexEventListener> buildInIndexListener;
+    private final List<IndexEventListener> builtInIndexListener;
     private final PrimaryReplicaSyncer primaryReplicaSyncer;
     private final Consumer<ShardId> globalCheckpointSyncer;
     private final RetentionLeaseSyncer retentionLeaseSyncer;
@@ -148,6 +151,7 @@ public IndicesClusterStateService(
         final ClusterService clusterService,
         final ThreadPool threadPool,
         final PeerRecoveryTargetService recoveryTargetService,
+        final SegmentReplicationTargetService segmentReplicationTargetService,
         final ShardStateAction shardStateAction,
         final NodeMappingRefreshAction nodeMappingRefreshAction,
         final RepositoriesService repositoriesService,
@@ -165,6 +169,7 @@ public IndicesClusterStateService(
             clusterService,
             threadPool,
             checkpointPublisher,
+            segmentReplicationTargetService,
             recoveryTargetService,
             shardStateAction,
             nodeMappingRefreshAction,
@@ -185,6 +190,7 @@ public IndicesClusterStateService(
         final ClusterService clusterService,
         final ThreadPool threadPool,
         final SegmentReplicationCheckpointPublisher checkpointPublisher,
+        final SegmentReplicationTargetService segmentReplicationTargetService,
         final PeerRecoveryTargetService recoveryTargetService,
         final ShardStateAction shardStateAction,
         final NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -198,7 +204,15 @@ public IndicesClusterStateService(
     ) {
         this.settings = settings;
         this.checkpointPublisher = checkpointPublisher;
-        this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
+
+        final List<IndexEventListener> indexEventListeners = new ArrayList<>(
+            Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService)
+        );
+        // if the segrep feature flag is not enabled, don't wire the target service as an IndexEventListener.
+        if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
+            indexEventListeners.add(segmentReplicationTargetService);
+        }
+        this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
         this.indicesService = indicesService;
         this.clusterService = clusterService;
         this.threadPool = threadPool;
@@ -514,7 +528,7 @@ private void createIndices(final ClusterState state) {
 
             AllocatedIndex<? extends Shard> indexService = null;
             try {
-                indexService = indicesService.createIndex(indexMetadata, buildInIndexListener, true);
+                indexService = indicesService.createIndex(indexMetadata, builtInIndexListener, true);
                 if (indexService.updateMapping(null, indexMetadata) && sendRefreshMapping) {
                     nodeMappingRefreshAction.nodeMappingRefresh(
                         state.nodes().getClusterManagerNode(),
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
index 426409f7a5b65..652f3c9a55f53 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java
@@ -62,10 +62,13 @@
 import org.opensearch.indices.replication.common.ReplicationCollection;
 
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;
+
 /**
  * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of
  * this class are created through {@link ReplicationCollection}.
@@ -398,13 +401,29 @@ public void cleanFiles(
             store.incRef();
             try {
                 store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetadata);
-                final String translogUUID = Translog.createEmptyTranslog(
-                    indexShard.shardPath().resolveTranslog(),
-                    globalCheckpoint,
-                    shardId(),
-                    indexShard.getPendingPrimaryTerm()
-                );
-                store.associateIndexWithNewTranslog(translogUUID);
+
+                // If Segment Replication is enabled, we need to reuse the primary's translog UUID already stored in the index.
+                // With segrep, replicas should never create their own commit points. This ensures the index and translog share
+                // the same UUID without the extra step to associate the index with a new translog.
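+                // (The UUID read below was written into the commit user data on the primary and arrived
+                // here with the copied segment files, so a promoted replica will not later hit a
+                // translog/commit UUID mismatch.)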
+                if (indexShard.indexSettings().isSegRepEnabled()) {
+                    final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY);
+                    Translog.createEmptyTranslog(
+                        indexShard.shardPath().resolveTranslog(),
+                        shardId(),
+                        globalCheckpoint,
+                        indexShard.getPendingPrimaryTerm(),
+                        translogUUID,
+                        FileChannel::open
+                    );
+                } else {
+                    final String translogUUID = Translog.createEmptyTranslog(
+                        indexShard.shardPath().resolveTranslog(),
+                        globalCheckpoint,
+                        shardId(),
+                        indexShard.getPendingPrimaryTerm()
+                    );
+                    store.associateIndexWithNewTranslog(translogUUID);
+                }
 
                 if (indexShard.getRetentionLeases().leases().isEmpty()) {
                     // if empty, may be a fresh IndexShard, so write an empty leases file to disk
diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
index 6302d364fc6d1..a9b032c98b70f 100644
--- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
+++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java
@@ -113,7 +113,11 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener {
+            final String targetAllocationId = request.getTargetAllocationId();
+            RunUnderPrimaryPermit.run(
+                () -> shard.markAllocationIdAsInSync(targetAllocationId, request.getCheckpoint().getSeqNo()),
+                shard.shardId() + " marking " + targetAllocationId + " as in sync",
+                shard,
+                cancellableThreads,
+                logger
+            );
             try {
                 future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
             } finally {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index fb68e59f3b2ef..516cfa91a787b 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -181,11 +181,8 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index f9b40d14b0d53..f699f0edba842 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -53,6 +53,27 @@ public class SegmentReplicationTargetService implements IndexEventListener {
 
     private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
 
+    // Empty implementation, only required while Segment Replication is under feature flag.
+    public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() {
+        @Override
+        public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
+            // NoOp;
+        }
+
+        @Override
+        public synchronized void onNewCheckpoint(ReplicationCheckpoint receivedCheckpoint, IndexShard replicaShard) {
+            // noOp;
+        }
+    };
+
+    // Used only for empty implementation.
+    private SegmentReplicationTargetService() {
+        threadPool = null;
+        recoverySettings = null;
+        onGoingReplications = null;
+        sourceFactory = null;
+    }
+
     /**
      * The internal actions
      *
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
index f84a65206190b..abcef1bd91944 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -12,6 +12,7 @@
 import org.opensearch.common.io.stream.StreamInput;
 import org.opensearch.common.io.stream.StreamOutput;
 import org.opensearch.common.io.stream.Writeable;
+import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.ShardId;
 
 import java.io.IOException;
@@ -30,6 +31,18 @@ public class ReplicationCheckpoint implements Writeable {
     private final long seqNo;
     private final long segmentInfosVersion;
 
+    public static ReplicationCheckpoint empty(ShardId shardId) {
+        return new ReplicationCheckpoint(shardId);
+    }
+
+    private ReplicationCheckpoint(ShardId shardId) {
+        this.shardId = shardId;
+        primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
+        segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
+        seqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
+    }
+
     public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long seqNo, long segmentInfosVersion) {
         this.shardId = shardId;
         this.primaryTerm = primaryTerm;
diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java
index 27e23ceafb15e..501ff46eeb2ff 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java
@@ -49,7 +49,6 @@ public abstract class ReplicationTarget extends AbstractRefCounted {
 
     private final long id;
     protected final AtomicBoolean finished = new AtomicBoolean();
-    private final ShardId shardId;
     protected final IndexShard indexShard;
     protected final Store store;
     protected final ReplicationListener listener;
@@ -89,7 +88,6 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn
         this.stateIndex = stateIndex;
         this.indexShard = indexShard;
         this.store = indexShard.store();
-        this.shardId = indexShard.shardId();
         // make sure the store is not released until we are done.
         this.cancellableThreads = new CancellableThreads();
         store.incRef();
@@ -131,7 +129,7 @@ public Store store() {
     }
 
     public ShardId shardId() {
-        return shardId;
+        return indexShard.shardId();
     }
 
     /**
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index eb2604af0147b..6cc8128cffd52 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -953,6 +953,8 @@ protected Node(
                     );
                     b.bind(SegmentReplicationSourceService.class)
                         .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings));
+                } else {
+                    b.bind(SegmentReplicationTargetService.class).toInstance(SegmentReplicationTargetService.NO_OP);
+                }
             }
             b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
index 903bdccef5587..939e8b2d4e782 100644
--- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
+++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java
@@ -66,6 +66,7 @@
 import org.opensearch.index.shard.PrimaryReplicaSyncer;
 import org.opensearch.index.shard.ShardId;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.threadpool.TestThreadPool;
@@ -566,6 +567,7 @@ private IndicesClusterStateService createIndicesClusterStateService(
             clusterService,
             threadPool,
             SegmentReplicationCheckpointPublisher.EMPTY,
+            SegmentReplicationTargetService.NO_OP,
             recoveryTargetService,
             shardStateAction,
             null,
diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
index 5224a54a35e96..3ea74dbf38919 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java
@@ -71,6 +71,7 @@
 import org.opensearch.index.translog.Translog;
 import org.opensearch.indices.replication.common.ReplicationListener;
 import org.opensearch.indices.replication.common.ReplicationState;
+import org.opensearch.indices.replication.common.ReplicationType;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -103,6 +104,17 @@ public void testTranslogHistoryTransferred() throws Exception {
         }
     }
 
+    public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception {
+        Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
+        try (ReplicationGroup shards = createGroup(2, settings)) {
+            shards.startAll();
+            final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID();
+            assertTrue(
+                shards.getReplicas().stream().allMatch(indexShard -> getTranslog(indexShard).getTranslogUUID().equals(expectedUUID))
+            );
+        }
+    }
+
     public void testRetentionPolicyChangeDuringRecovery() throws Exception {
         try (ReplicationGroup shards = createGroup(0)) {
             shards.startPrimary();
diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
index 260f6a13b5010..d42e75871a45a 100644
--- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java
@@ -37,6 +37,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 public class OngoingSegmentReplicationsTests extends IndexShardTestCase {
@@ -126,7 +127,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) {
             @Override
             public void onFailure(Exception e) {
                 logger.error("Unexpected failure", e);
-                Assert.fail();
+                Assert.fail("Unexpected failure from startSegmentCopy listener: " + e);
             }
         });
     }
@@ -228,4 +229,47 @@ public void testShardAlreadyReplicatingToNode() throws IOException {
         replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
         assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); });
     }
+
+    public void testStartReplicationWithNoFilesToFetch() throws IOException {
+        // create a replications object and request a checkpoint.
+        OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
+        final CheckpointInfoRequest request = new CheckpointInfoRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            testCheckpoint
+        );
+        // mock the FileChunkWriter so we can assert it is never invoked.
+        final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class);
+        // Prepare for replication step - and ensure copyState is added to cache.
+        final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
+        assertTrue(replications.isInCopyStateMap(request.getCheckpoint()));
+        assertEquals(1, replications.size());
+        assertEquals(1, copyState.refCount());
+
+        getSegmentFilesRequest = new GetSegmentFilesRequest(
+            1L,
+            replica.routingEntry().allocationId().getId(),
+            replicaDiscoveryNode,
+            Collections.emptyList(),
+            testCheckpoint
+        );
+
+        // invoke startSegmentCopy and assert our fileChunkWriter is never invoked.
+        replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) {
+                assertEquals(Collections.emptyList(), getSegmentFilesResponse.files);
+                assertEquals(0, copyState.refCount());
+                assertFalse(replications.isInCopyStateMap(request.getCheckpoint()));
+                verifyNoInteractions(segmentSegmentFileChunkWriter);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.error("Unexpected failure", e);
+                Assert.fail("Unexpected failure from startSegmentCopy listener: " + e);
+            }
+        });
+    }
 }
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
index e20fd347fc3fd..cca27366f7df0 100644
--- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java
@@ -183,6 +183,8 @@
 import org.opensearch.indices.recovery.PeerRecoverySourceService;
 import org.opensearch.indices.recovery.PeerRecoveryTargetService;
 import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
+import org.opensearch.indices.replication.SegmentReplicationTargetService;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.ingest.IngestService;
 import org.opensearch.monitor.StatusInfo;
@@ -1840,6 +1842,12 @@ public void onFailure(final Exception e) {
                 clusterService,
                 threadPool,
                 new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
+                new SegmentReplicationTargetService(
+                    threadPool,
+                    recoverySettings,
+                    transportService,
+                    new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService)
+                ),
                 shardStateAction,
                 new NodeMappingRefreshAction(transportService, metadataMappingService),
                 repositoriesService,