diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java index 6a27508ca1289..03b48561c2af5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/ShrinkIndexIT.java @@ -96,6 +96,11 @@ protected boolean forbidPrivateIndexSettings() { return false; } + @Override + protected boolean useSegmentReplication() { + return false; + } + public void testCreateShrinkIndexToN() throws Exception { assumeFalse("https://github.com/elastic/elasticsearch/issues/34080", Constants.WINDOWS); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/SplitIndexIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/SplitIndexIT.java index a4e39bd892125..5fa8a939a76fa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/SplitIndexIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/SplitIndexIT.java @@ -95,6 +95,11 @@ protected boolean forbidPrivateIndexSettings() { return false; } + @Override + protected boolean useSegmentReplication() { + return false; + } + public void testCreateSplitIndexToN() throws IOException { int[][] possibleShardSplits = new int[][] { { 2, 4, 8 }, { 3, 6, 12 }, { 1, 2, 4 } }; int[] shardSplits = randomFrom(possibleShardSplits); diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java index 8699ce187fda9..bb78c27fc26f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeIT.java @@ -54,6 +54,11 @@ public class ForceMergeIT extends OpenSearchIntegTestCase { + @Override + protected boolean useSegmentReplication() { + return false; + } + public void testForceMergeUUIDConsistent() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); final String index = "test-index"; diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/NoClusterManagerNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/NoClusterManagerNodeIT.java index 94df4ecb1826c..87c37a1623426 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/NoClusterManagerNodeIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/NoClusterManagerNodeIT.java @@ -82,6 +82,11 @@ protected int numberOfReplicas() { return 2; } + @Override + protected boolean useSegmentReplication() { + return false; + } + @Override protected Collection> nodePlugins() { return Collections.singletonList(MockTransportService.TestPlugin.class); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/PrimaryAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/PrimaryAllocationIT.java index 0dd5f036457ad..8b2bc5da718ad 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/PrimaryAllocationIT.java @@ -109,6 +109,11 @@ protected boolean addMockInternalEngine() { return false; } + @Override + protected boolean useSegmentReplication() { + return false; + } + public void testBulkWeirdScenario() throws Exception { String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY); internalCluster().startDataOnlyNodes(2); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java index 089a91a30dd17..71a5338a7d595 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java @@ -113,6 +113,11 @@ public class DiskThresholdDeciderIT extends OpenSearchIntegTestCase { private FileSystem defaultFileSystem; + @Override + protected boolean useSegmentReplication() { + return false; + } + @Before public void installFilesystemProvider() { assertNull(defaultFileSystem); diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterDisruptionIT.java index 85a03142b1d38..50932a0900650 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/ClusterDisruptionIT.java @@ -98,6 +98,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class ClusterDisruptionIT extends AbstractDisruptionTestCase { + @Override + protected boolean useSegmentReplication() { + return false; + } + private enum ConflictMode { none, external, diff --git a/server/src/internalClusterTest/java/org/opensearch/explain/ExplainActionIT.java b/server/src/internalClusterTest/java/org/opensearch/explain/ExplainActionIT.java index 2949fa34a0795..6b3cecdf63371 100644 --- a/server/src/internalClusterTest/java/org/opensearch/explain/ExplainActionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/explain/ExplainActionIT.java @@ -59,6 +59,12 @@ import static org.hamcrest.Matchers.notNullValue; public class ExplainActionIT extends OpenSearchIntegTestCase { + + @Override + protected boolean useSegmentReplication() { + return false; + } + public void testSimple() throws Exception { assertAcked(prepareCreate("test").addAlias(new Alias("alias")).setSettings(Settings.builder().put("index.refresh_interval", -1))); ensureGreen("test"); diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/ReplicaShardAllocatorIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/ReplicaShardAllocatorIT.java index 5a429d5f7d910..bf14ed5ad25e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/ReplicaShardAllocatorIT.java @@ -78,6 +78,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class ReplicaShardAllocatorIT extends OpenSearchIntegTestCase { + @Override + protected boolean useSegmentReplication() { + return false; + } + @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java index c049c8ed2d4a6..c484204533c36 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -54,6 +54,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase { + @Override + protected boolean addMockNRTReplicationEngine() { + return false; + } + private static final int RELOCATION_COUNT = 15; public Settings indexSettings() { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 4d0b242d1f727..8ac8448bdc4b7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -126,7 +126,7 @@ public static void waitForCurrentReplicas(String index, List nodes) thro assertBusy(() -> { for (String node : nodes) { final IndexShard indexShard = getIndexShard(node, index); - indexShard.getReplicationEngine().ifPresent((engine) -> { assertFalse(engine.hasRefreshPending()); }); + indexShard.getReplicationEngineForTests().ifPresent((engine) -> { assertFalse(engine.hasRefreshPending()); }); } }); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index a0f01acd1f8e9..141f5eed93aa8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -127,6 +127,11 @@ public IndexStatsIT(Settings settings) { super(settings); } + @Override + protected boolean useSegmentReplication() { + return false; + } + @ParametersFactory public static Collection parameters() { return Arrays.asList( diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java index 4eb1cc7703735..b20bea5a23ff1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -28,6 +28,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteIndexRecoveryIT extends IndexRecoveryIT { + @Override + protected boolean useSegmentReplication() { + return false; + } + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; protected Path repositoryPath; diff --git a/server/src/internalClusterTest/java/org/opensearch/routing/SimpleRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/routing/SimpleRoutingIT.java index 80e82fa387c96..06a994843884f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/routing/SimpleRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/routing/SimpleRoutingIT.java @@ -66,6 +66,11 @@ public class SimpleRoutingIT extends OpenSearchIntegTestCase { + @Override + protected boolean useSegmentReplication() { + return false; + } + @Override protected int minimumNumberOfShards() { return 2; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 5207dab83f1d9..0be3ca6ed400f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -67,6 +67,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase { + @Override + protected boolean useSegmentReplication() { + return false; + } + @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index fd51524cbf252..20982d586c8d3 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -137,6 +137,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; + /** * The default internal engine (can be overridden by plugins) * @@ -382,7 +385,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { if (commitUserData.containsKey(Engine.MIN_RETAINED_SEQNO)) { lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(Engine.MIN_RETAINED_SEQNO)); } else { - lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; + lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(MAX_SEQ_NO)) + 1; } return new SoftDeletesPolicy( translogManager::getLastSyncedGlobalCheckpoint, @@ -1815,9 +1818,7 @@ public boolean shouldPeriodicallyFlush() { if (shouldPeriodicallyFlushAfterBigMerge.get()) { return true; } - final long localCheckpointOfLastCommit = Long.parseLong( - lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) - ); + final long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(LOCAL_CHECKPOINT_KEY)); return translogManager.shouldPeriodicallyFlush( localCheckpointOfLastCommit, config().getIndexSettings().getFlushThresholdSize().getBytes() @@ -1855,9 +1856,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { if (hasUncommittedChanges || force || shouldPeriodicallyFlush - || getProcessedLocalCheckpoint() > Long.parseLong( - lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) - )) { + || getProcessedLocalCheckpoint() > Long.parseLong(lastCommittedSegmentInfos.userData.get(LOCAL_CHECKPOINT_KEY))) { translogManager.ensureCanFlush(); try { translogManager.rollTranslogGeneration(); @@ -2516,8 +2515,8 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog */ final Map commitData = new HashMap<>(7); commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); - commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); + commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); + commitData.put(MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f50ed71bb8e54..270071491e9b2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1013,7 +1013,7 @@ private Engine.IndexResult applyIndexOperation( UNASSIGNED_SEQ_NO, 0 ); - return getEngine().index(index); + return index(engine, index); } assert opPrimaryTerm <= getOperationPrimaryTerm() : "op term [ " + opPrimaryTerm @@ -1571,6 +1571,12 @@ public Optional getReplicationEngine() { } } + public Optional getReplicationEngineForTests() { + return Optional.ofNullable(getEngineOrNull()) + .filter((engine) -> engine instanceof NRTReplicationEngine) + .map((engine) -> (NRTReplicationEngine) engine); + } + public void finalizeReplication(SegmentInfos infos) throws IOException { if (getReplicationEngine().isPresent()) { getReplicationEngine().get().updateSegments(infos); @@ -4419,7 +4425,8 @@ final long getLastSearcherAccess() { * Returns true if this shard has some scheduled refresh that is pending because of search-idle. */ public final boolean hasRefreshPending() { - return pendingRefreshLocation.get() != null; + final Boolean nrtPending = getReplicationEngine().map(NRTReplicationEngine::hasRefreshPending).orElse(false); + return pendingRefreshLocation.get() != null || nrtPending; } private void setRefreshPending(Engine engine) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index ffc4ab86661db..db61569e1ab24 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -212,7 +212,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe // if the shard is in any state if (replicaShard.state().equals(IndexShardState.CLOSED)) { // ignore if shard is closed - logger.trace(() -> "Ignoring checkpoint, Shard is closed"); + logger.info(() -> "Ignoring checkpoint, Shard is closed"); return; } updateLatestReceivedCheckpoint(receivedCheckpoint, replicaShard); @@ -281,7 +281,7 @@ public void onReplicationFailure( }); } } else { - logger.trace( + logger.info( () -> new ParameterizedMessage("Ignoring checkpoint, shard not started {} {}", receivedCheckpoint, replicaShard.state()) ); } @@ -330,7 +330,7 @@ protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaSha final TransportRequestOptions options = TransportRequestOptions.builder() .withTimeout(recoverySettings.internalActionTimeout()) .build(); - logger.trace( + logger.info( () -> new ParameterizedMessage( "Updating Primary shard that replica {}-{} is synced to checkpoint {}", replicaShard.shardId(), @@ -347,7 +347,7 @@ protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaSha final ActionListener listener = new ActionListener<>() { @Override public void onResponse(Void unused) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "Successfully updated replication checkpoint {} for replica {}", replicaShard.shardId(), @@ -386,7 +386,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) { protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) { final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId()); if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "Processing latest received checkpoint for shard {} {}", replicaShard.shardId(), @@ -448,7 +448,7 @@ void startReplication(final SegmentReplicationTarget target) { target.fail(e, false); return; } - logger.trace(() -> new ParameterizedMessage("Added new replication to collection {}", target.description())); + logger.info(() -> new ParameterizedMessage("Added new replication to collection {}", target.description())); threadPool.generic().execute(new ReplicationRunner(replicationId)); } @@ -573,7 +573,7 @@ private void forceReplication(ForceSyncRequest request, ActionListener new ParameterizedMessage( "[shardId {}] [replication id {}] Force replication Sync complete to {}, timing data: {}", shardId, diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index fff388ffc576e..32f3a6b3ebad5 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -1697,7 +1697,7 @@ protected static Collection getReplicaShards(String... node) { public static void waitForCurrentReplicas(Collection shards) throws Exception { assertBusy(() -> { for (IndexShard indexShard : shards) { - indexShard.getReplicationEngine().ifPresent((engine) -> assertFalse(engine.hasRefreshPending())); + indexShard.getReplicationEngineForTests().ifPresent((engine) -> assertFalse(engine.hasRefreshPending())); } }); }