diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java index f2cb7c9c6bfc8..d2f1e6313db07 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationClusterSettingIT.java @@ -89,8 +89,8 @@ public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Ex Index index = resolveIndex(INDEX_NAME); Index anotherIndex = resolveIndex(ANOTHER_INDEX); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode); - assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false); - assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false); + assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), true); } public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception { @@ -119,8 +119,8 @@ public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Ex Index index = resolveIndex(INDEX_NAME); Index anotherIndex = resolveIndex(ANOTHER_INDEX); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode); - assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true); - assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), true); + assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), false); } public void testReplicationTypesOverrideNotAllowed_IndexAPI() { diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java index c649c4ab13e7e..b019bb57743c9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -303,7 +303,7 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio // Verify index setting isSegRepEnabled. Index index = resolveIndex(RESTORED_INDEX_NAME); IndicesService indicesService = internalCluster().getInstance(IndicesService.class); - assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false); + assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false); } /** diff --git a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java index 1b912518d7e04..fc97d67c6c3af 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/replication/TransportSegmentReplicationStatsAction.java @@ -148,7 +148,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication IndexShard indexShard = indexService.getShard(shardRouting.shardId().id()); ShardId shardId = shardRouting.shardId(); - if (indexShard.indexSettings().isSegRepEnabled() == false) { + if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) { return null; } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index e39d66d57e99b..11dc4474cfa42 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -90,7 +90,6 @@ import org.opensearch.index.shard.ShardNotInPrimaryModeException; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; -import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; @@ -494,26 +493,24 @@ public synchronized IndexShard createShard( Store remoteStore = null; boolean seedRemote = false; if (targetNode.isRemoteStoreNode()) { + final Directory remoteDirectory; if (this.indexSettings.isRemoteStoreEnabled()) { - Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); - remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path); + remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path); } else { if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) { - assert routing.primary(); + if (routing.primary() == false) { + throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId()); + } logger.info("DocRep shard {} is migrating to remote", shardId); seedRemote = true; } - RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( - () -> repositoriesService, - threadPool - ); - RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( + remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory( RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()), this.indexSettings.getUUID(), shardId ); - remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path); } + remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path); } Directory directory = directoryFactory.newDirectory(this.indexSettings, path); @@ -548,7 +545,7 @@ public synchronized IndexShard createShard( retentionLeaseSyncer, circuitBreakerService, translogFactorySupplier, - this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, + this.indexSettings.isSegRepEnabledOrRemoteNode() ? checkpointPublisher : null, remoteStore, remoteStoreStatsTrackerFactory, clusterRemoteTranslogBufferIntervalSupplier, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index b37ef3d5f5982..7e49726c259cb 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1226,16 +1226,16 @@ public int getNumberOfReplicas() { * Every shard on a remote node would also have SegRep enabled even without * proper index setting during the migration. */ - public boolean isSegRepEnabled() { + public boolean isSegRepEnabledOrRemoteNode() { return ReplicationType.SEGMENT.equals(replicationType) || isRemoteNode(); } public boolean isSegRepLocalEnabled() { - return isSegRepEnabled() && !isRemoteStoreEnabled(); + return isSegRepEnabledOrRemoteNode() && !isRemoteStoreEnabled(); } public boolean isSegRepWithRemoteEnabled() { - return isSegRepEnabled() && isRemoteStoreEnabled(); + return isSegRepEnabledOrRemoteNode() && isRemoteStoreEnabled(); } /** diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index ce38dd3bb236c..297fe093f7f4e 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -145,7 +145,9 @@ public void isSegrepLimitBreached(ShardId shardId) { final IndexService indexService = indicesService.indexService(shardId.getIndex()); if (indexService != null) { final IndexShard shard = indexService.getShard(shardId.id()); - if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) { + if (isSegmentReplicationBackpressureEnabled + && shard.indexSettings().isSegRepEnabledOrRemoteNode() + && shard.routingEntry().primary()) { validateReplicationGroup(shard); } } @@ -264,7 +266,8 @@ protected void runInternal() { stats.getShardStats().get(shardId).getReplicaStats() ); final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex()); - if (indexService.getIndexSettings() != null && indexService.getIndexSettings().isSegRepEnabled() == false) { + if (indexService.getIndexSettings() != null + && indexService.getIndexSettings().isSegRepEnabledOrRemoteNode() == false) { return; } final IndexShard primaryShard = indexService.getShard(shardId.getId()); diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index f5fc8aa1c1eea..e48a76c438057 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -45,7 +45,7 @@ public SegmentReplicationStats getStats() { Map stats = new HashMap<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { - if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) { + if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.routingEntry().primary()) { stats.putIfAbsent(indexShard.shardId(), getStatsForShard(indexShard)); } } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index bf3e10d684c94..8106b65bddeec 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -244,7 +244,7 @@ private static void doValidateCodecSettings(final String codec) { * Creates a new {@link org.opensearch.index.engine.EngineConfig} */ private EngineConfig(Builder builder) { - if (builder.isReadOnlyReplica && builder.indexSettings.isSegRepEnabled() == false) { + if (builder.isReadOnlyReplica && builder.indexSettings.isSegRepEnabledOrRemoteNode() == false) { throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled"); } this.shardId = builder.shardId; @@ -491,7 +491,7 @@ public LongSupplier getPrimaryTermSupplier() { * @return true if this engine should be wired as read only. */ public boolean isReadOnlyReplica() { - return indexSettings.isSegRepEnabled() && isReadOnlyReplica; + return indexSettings.isSegRepEnabledOrRemoteNode() && isReadOnlyReplica; } /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index a25ec95f58e05..7bacec22fc850 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -710,7 +710,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); - boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled(); + boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); if (versionValue != null) { status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); } else { @@ -1005,7 +1005,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0); } else { - boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled(); + boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { @@ -1452,7 +1452,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = DeletionStrategy.processButSkipLucene(false, delete.version()); } else { - boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled(); + boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { if (segRepEnabled) { @@ -1868,7 +1868,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // only after the active reader is updated. This ensures that a flush does not wipe out a required commit point file // while we are // in refresh listeners. - final GatedCloseable latestCommit = engineConfig.getIndexSettings().isSegRepEnabled() + final GatedCloseable latestCommit = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode() ? acquireLastIndexCommit(false) : null; commitIndexWriter(indexWriter, translogManager.getTranslogUUID()); diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 7b9c1d3aa548f..0e625e9f30320 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1059,7 +1059,7 @@ public ReplicationTracker( this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings()); this.safeCommitInfoSupplier = safeCommitInfoSupplier; this.onReplicationGroupUpdated = onReplicationGroupUpdated; - this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null; + this.latestReplicationCheckpoint = indexSettings.isSegRepEnabledOrRemoteNode() ? ReplicationCheckpoint.empty(shardId) : null; assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false; assert invariant(); } @@ -1173,7 +1173,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI * @param visibleCheckpoint the visible checkpoint */ public synchronized void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) { - assert indexSettings.isSegRepEnabled(); + assert indexSettings.isSegRepEnabledOrRemoteNode(); assert primaryMode; assert handoffInProgress == false; assert invariant(); @@ -1217,7 +1217,7 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation * @param checkpoint {@link ReplicationCheckpoint} */ public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) { - assert indexSettings.isSegRepEnabled(); + assert indexSettings.isSegRepEnabledOrRemoteNode(); if (checkpoint.equals(latestReplicationCheckpoint) == false) { this.latestReplicationCheckpoint = checkpoint; } @@ -1269,7 +1269,7 @@ && isPrimaryRelocation(allocationId) == false * @param checkpoint {@link ReplicationCheckpoint} */ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) { - assert indexSettings.isSegRepEnabled(); + assert indexSettings.isSegRepEnabledOrRemoteNode(); if (checkpoint.equals(latestReplicationCheckpoint) == false) { this.latestReplicationCheckpoint = checkpoint; } @@ -1294,7 +1294,7 @@ && isPrimaryRelocation(e.getKey()) == false * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group. */ public synchronized Set getSegmentReplicationStats() { - assert indexSettings.isSegRepEnabled(); + assert indexSettings.isSegRepEnabledOrRemoteNode(); if (primaryMode) { return this.checkpoints.entrySet() .stream() diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 351c4ac5e63f5..c447c3e45a667 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -650,7 +650,7 @@ public void updateShardState( // Flush here after relocation of primary, so that replica get all changes from new primary rather than waiting for more // docs to get indexed. - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { flush(new FlushRequest().waitIfOngoing(true).force(true)); } } else if (currentRouting.primary() @@ -730,7 +730,7 @@ public void updateShardState( + newRouting; assert getOperationPrimaryTerm() == newPrimaryTerm; try { - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { // this Shard's engine was read only, we need to update its engine before restoring local history from xlog. assert newRouting.primary() && currentRouting.primary() == false; ReplicationTimer timer = new ReplicationTimer(); @@ -750,7 +750,7 @@ public void updateShardState( } replicationTracker.activatePrimaryMode(getLocalCheckpoint()); - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { // force publish a checkpoint once in primary mode so that replicas not caught up to previous primary // are brought up to date. checkpointPublisher.publish(this, getLatestReplicationCheckpoint()); @@ -1068,7 +1068,7 @@ private Engine.IndexResult applyIndexOperation( // For Segment Replication enabled replica shards we can be skip parsing the documents as we directly copy segments from primary // shard. - if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) { + if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary() == false) { Engine.Index index = new Engine.Index( new Term(IdFieldMapper.NAME, Uid.encodeId(id)), new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getMediaType(), null), @@ -1267,7 +1267,7 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary( } public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException { - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { final Engine.Delete delete = new Engine.Delete( id, new Term(IdFieldMapper.NAME, Uid.encodeId(id)), @@ -1467,7 +1467,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats()) ); } - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { segmentsStats.addReplicationStats(getReplicationStats()); } return segmentsStats; @@ -1523,7 +1523,7 @@ public void flush(FlushRequest request) { * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details */ public void trimTranslog() { - if (isRemoteTranslogEnabled() || isMigratingToRemote()) { + if (indexSettings.isRemoteNode()) { return; } verifyNotClosed(); @@ -1688,7 +1688,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { * */ public Tuple, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() { - assert indexSettings.isSegRepEnabled(); + assert indexSettings.isSegRepEnabledOrRemoteNode(); // do not close the snapshot - caller will close it. GatedCloseable snapshot = null; @@ -1747,7 +1747,7 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th * @return - True if the shard is able to perform segment replication. */ public boolean isSegmentReplicationAllowed() { - if (indexSettings.isSegRepEnabled() == false) { + if (indexSettings.isSegRepEnabledOrRemoteNode() == false) { logger.trace("Attempting to perform segment replication when it is not enabled on the index"); return false; } @@ -2555,7 +2555,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b onNewEngine(newEngine); currentEngineReference.set(newEngine); - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { // set initial replication checkpoints into tracker. updateReplicationCheckpoint(); } @@ -2967,7 +2967,7 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, * This method should only be invoked if Segment Replication or Remote Store is not enabled. */ public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException { - assert indexSettings.isSegRepEnabled() == false + assert indexSettings.isSegRepEnabledOrRemoteNode() == false : "unsupported operation for segment replication enabled indices or remote store backed indices"; return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true); } @@ -3134,7 +3134,7 @@ public Set getReplicationStatsForTrackedReplicas() } public ReplicationStats getReplicationStats() { - if (indexSettings.isSegRepEnabled() && routingEntry().primary()) { + if (indexSettings.isSegRepEnabledOrRemoteNode() && routingEntry().primary()) { final Set stats = getReplicationStatsForTrackedReplicas(); long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); @@ -3538,7 +3538,6 @@ private void postActivatePrimaryMode() { // This helps to get a consistent state in remote store where both remote segment store and remote // translog contains data. try { - logger.info("Activating primary mode now"); getEngine().translogManager().syncTranslog(); } catch (IOException e) { logger.error("Failed to sync translog to remote from new primary", e); @@ -3921,7 +3920,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro internalRefreshListener.clear(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabledOrRemoteNode()) { internalRefreshListener.add(new ReplicationCheckpointUpdater()); } if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { @@ -3942,7 +3941,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro With segment replication enabled for primary relocation, recover replica shard initially as read only and change to a writeable engine during relocation handoff after a round of segment replication. */ - boolean isReadOnlyReplica = indexSettings.isSegRepEnabled() + boolean isReadOnlyReplica = indexSettings.isSegRepEnabledOrRemoteNode() && (shardRouting.primary() == false || (shardRouting.isRelocationTarget() && recoveryState.getStage() != RecoveryState.Stage.FINALIZE)); @@ -4326,7 +4325,7 @@ private void innerAcquireReplicaOperationPermit( ); // With Segment Replication enabled, we never want to reset a replica's engine unless // it is promoted to primary. - if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) { + if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabledOrRemoteNode() == false) { resetEngineToGlobalCheckpoint(); } else { getEngine().translogManager().rollTranslogGeneration(); @@ -4621,7 +4620,7 @@ public final boolean isSearchIdleSupported() { if (isRemoteTranslogEnabled() || indexSettings.isRemoteNode()) { return false; } - return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0; + return indexSettings.isSegRepEnabledOrRemoteNode() == false || indexSettings.getNumberOfReplicas() == 0; } /** @@ -4905,7 +4904,9 @@ public void close() throws IOException { // of truth for translog, we play all translogs that exists locally. Otherwise, the recoverUpto happens upto global checkpoint. // We also replay all local translog ops with Segment replication, because on engine swap our local translog may // hold more ops than the global checkpoint. - long recoverUpto = this.isRemoteTranslogEnabled() || indexSettings().isSegRepEnabled() ? Long.MAX_VALUE : globalCheckpoint; + long recoverUpto = this.isRemoteTranslogEnabled() || indexSettings().isSegRepEnabledOrRemoteNode() + ? Long.MAX_VALUE + : globalCheckpoint; newEngineReference.get() .translogManager() .recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), recoverUpto); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 4e17c81935549..0992d86d6f0aa 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -385,7 +385,7 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio * @return {@link Map} map file name to {@link StoreFileMetadata}. */ public Map getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException { - assert indexSettings.isSegRepEnabled() || indexSettings.isRemoteNode(); + assert indexSettings.isSegRepEnabledOrRemoteNode(); failIfCorrupted(); try { return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; @@ -893,7 +893,7 @@ public void beforeClose() { * @throws IOException when there is an IO error committing. */ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, long processedCheckpoint) throws IOException { - assert indexSettings.isSegRepEnabled() || indexSettings.isRemoteNode(); + assert indexSettings.isSegRepEnabledOrRemoteNode() || indexSettings.isRemoteNode(); metadataLock.writeLock().lock(); try { final Map userData = new HashMap<>(latestSegmentInfos.getUserData()); diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 76696ea4bcc85..9bc81c1826c2d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -947,7 +947,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) { if (idxSettings.isRemoteSnapshot()) { return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false); } - if (idxSettings.isSegRepEnabled() || idxSettings.isRemoteNode()) { + if (idxSettings.isSegRepEnabledOrRemoteNode() || idxSettings.isRemoteNode()) { return new NRTReplicationEngineFactory(); } return new InternalEngineFactory(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index 29ee097d36cac..fac6924435cf3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -161,7 +161,7 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, + "] in " + Arrays.toString(store.directory().listAll()); // With Segment Replication, we will fsync after a full commit has been received. - if (store.indexSettings().isSegRepEnabled() == false) { + if (store.indexSettings().isSegRepEnabledOrRemoteNode() == false) { store.directory().sync(Collections.singleton(temporaryFileName)); } IndexOutput remove = removeOpenIndexOutputs(name); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java index 1fcebcba5555a..30f517fda9931 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java @@ -377,7 +377,7 @@ private Tuple createRecovery request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime), - request.sourceNode().isRemoteStoreNode() || request.targetNode().isRemoteStoreNode() + shard.isRemoteTranslogEnabled() || request.targetNode().isRemoteStoreNode() ); handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings); return Tuple.tuple(handler, recoveryTarget); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 2434efb241f5f..abf9b1aaeb2cc 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -841,7 +841,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); - final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabled() + final Runnable forceSegRepRunnable = shard.indexSettings().isSegRepEnabledOrRemoteNode() || (request.sourceNode().isRemoteStoreNode() && request.targetNode().isRemoteStoreNode()) ? recoveryTarget::forceSegmentFileSync : () -> {}; @@ -857,7 +857,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis */ } else { // Force round of segment replication to update its checkpoint to primary's - if (shard.indexSettings().isSegRepEnabled()) { + if (shard.indexSettings().isSegRepEnabledOrRemoteNode()) { cancellableThreads.execute(recoveryTarget::forceSegmentFileSync); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index a8dfcf0b771af..16311d5d2cfb7 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -374,7 +374,7 @@ public void cleanFiles( // Replicas for segment replication or remote snapshot indices do not create // their own commit points and therefore do not modify the commit user data // in their store. In these cases, reuse the primary's translog UUID. - final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabled() + final boolean reuseTranslogUUID = indexShard.indexSettings().isSegRepEnabledOrRemoteNode() || indexShard.indexSettings().isRemoteSnapshot(); if (reuseTranslogUUID) { final String translogUUID = store.getMetadata().getCommitUserData().get(TRANSLOG_UUID_KEY); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 4062f9702fb3a..a393faabae0ea 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -175,7 +175,7 @@ public void clusterChanged(ClusterChangedEvent event) { // we need to ensure its state has cleared up in ongoing replications. if (event.routingTableChanged()) { for (IndexService indexService : indicesService) { - if (indexService.getIndexSettings().isSegRepEnabled()) { + if (indexService.getIndexSettings().isSegRepEnabledOrRemoteNode()) { for (IndexShard indexShard : indexService) { if (indexShard.routingEntry().primary()) { final IndexMetadata indexMetadata = indexService.getIndexSettings().getIndexMetadata(); @@ -221,7 +221,7 @@ protected void doClose() throws IOException { */ @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { - if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) { + if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) { ongoingSegmentReplications.cancel(indexShard, "shard is closed"); } } @@ -231,7 +231,10 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh */ @Override public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { - if (indexShard != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) { + if (indexShard != null + && indexShard.indexSettings().isSegRepEnabledOrRemoteNode() + && oldRouting.primary() == false + && newRouting.primary()) { ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard."); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index f28f829545d59..4942d39cfa48a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -168,7 +168,8 @@ protected void doClose() throws IOException { public void clusterChanged(ClusterChangedEvent event) { if (event.routingTableChanged()) { for (IndexService indexService : indicesService) { - if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) { + if (indexService.getIndexSettings().isSegRepEnabledOrRemoteNode() + && event.indexRoutingTableChanged(indexService.index().getName())) { for (IndexShard shard : indexService) { if (shard.routingEntry().primary() == false) { // for this shard look up its primary routing, if it has completed a relocation trigger replication @@ -197,7 +198,7 @@ public void clusterChanged(ClusterChangedEvent event) { */ @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { - if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) { + if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) { onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing"); latestReceivedCheckpoint.remove(shardId); } @@ -209,7 +210,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh */ @Override public void afterIndexShardStarted(IndexShard indexShard) { - if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary() == false) { + if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.routingEntry().primary() == false) { processLatestReceivedCheckpoint(indexShard, Thread.currentThread()); } } @@ -219,7 +220,10 @@ public void afterIndexShardStarted(IndexShard indexShard) { */ @Override public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { - if (oldRouting != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) { + if (oldRouting != null + && indexShard.indexSettings().isSegRepEnabledOrRemoteNode() + && oldRouting.primary() == false + && newRouting.primary()) { onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary"); latestReceivedCheckpoint.remove(indexShard.shardId()); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index 1c25d8c71f948..89f1ea142336e 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -381,7 +381,7 @@ private void snapshot( if (indexShard.routingEntry().primary() == false) { throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); } - if (indexShard.indexSettings().isSegRepEnabled() && indexShard.isPrimaryMode() == false) { + if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.isPrimaryMode() == false) { throw new IndexShardSnapshotFailedException( shardId, "snapshot triggered on a new primary following failover and cannot proceed until promotion is complete" diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ffc2008c14c1d..a2f9eb677c0ac 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1059,7 +1059,7 @@ protected void recoverReplica( } public Function, List> getReplicationFunc(final IndexShard target) { - return target.indexSettings().isSegRepEnabled() ? (shardList) -> { + return target.indexSettings().isSegRepEnabledOrRemoteNode() ? (shardList) -> { try { assert shardList.size() >= 2; final IndexShard primary = shardList.get(0);