Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Mar 21, 2024
1 parent d075375 commit 72cefe9
Show file tree
Hide file tree
Showing 21 changed files with 77 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public void testIndexReplicationSettingOverridesSegRepClusterSetting() throws Ex
Index index = resolveIndex(INDEX_NAME);
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), true);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), true);
}

public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Exception {
Expand Down Expand Up @@ -119,8 +119,8 @@ public void testIndexReplicationSettingOverridesDocRepClusterSetting() throws Ex
Index index = resolveIndex(INDEX_NAME);
Index anotherIndex = resolveIndex(ANOTHER_INDEX);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNode);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), true);
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabled(), false);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), true);
assertEquals(indicesService.indexService(anotherIndex).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
}

public void testReplicationTypesOverrideNotAllowed_IndexAPI() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void testSnapshotRestoreOnIndexWithSegRepClusterSetting() throws Exceptio
// Verify index setting isSegRepEnabled.
Index index = resolveIndex(RESTORED_INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabled(), false);
assertEquals(indicesService.indexService(index).getIndexSettings().isSegRepEnabledOrRemoteNode(), false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();

if (indexShard.indexSettings().isSegRepEnabled() == false) {
if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
return null;
}

Expand Down
19 changes: 8 additions & 11 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -494,26 +493,24 @@ public synchronized IndexShard createShard(
Store remoteStore = null;
boolean seedRemote = false;
if (targetNode.isRemoteStoreNode()) {
final Directory remoteDirectory;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
} else {
if (sourceNode != null && sourceNode.isRemoteStoreNode() == false) {
assert routing.primary();
if (routing.primary() == false) {
throw new IllegalStateException("Can't migrate a remote shard to replica before primary " + routing.shardId());
}
logger.info("DocRep shard {} is migrating to remote", shardId);
seedRemote = true;
}
RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
() -> repositoriesService,
threadPool
);
RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
remoteDirectory = ((RemoteSegmentStoreDirectoryFactory) remoteDirectoryFactory).newDirectory(
RemoteStoreNodeAttribute.getRemoteStoreSegmentRepo(this.indexSettings.getNodeSettings()),
this.indexSettings.getUUID(),
shardId
);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
}
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
Expand Down Expand Up @@ -548,7 +545,7 @@ public synchronized IndexShard createShard(
retentionLeaseSyncer,
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
this.indexSettings.isSegRepEnabledOrRemoteNode() ? checkpointPublisher : null,
remoteStore,
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier,
Expand Down
6 changes: 3 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1226,16 +1226,16 @@ public int getNumberOfReplicas() {
* Every shard on a remote node would also have SegRep enabled even without
* proper index setting during the migration.
*/
public boolean isSegRepEnabled() {
public boolean isSegRepEnabledOrRemoteNode() {
return ReplicationType.SEGMENT.equals(replicationType) || isRemoteNode();
}

public boolean isSegRepLocalEnabled() {
return isSegRepEnabled() && !isRemoteStoreEnabled();
return isSegRepEnabledOrRemoteNode() && !isRemoteStoreEnabled();
}

public boolean isSegRepWithRemoteEnabled() {
return isSegRepEnabled() && isRemoteStoreEnabled();
return isSegRepEnabledOrRemoteNode() && isRemoteStoreEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public void isSegrepLimitBreached(ShardId shardId) {
final IndexService indexService = indicesService.indexService(shardId.getIndex());
if (indexService != null) {
final IndexShard shard = indexService.getShard(shardId.id());
if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) {
if (isSegmentReplicationBackpressureEnabled
&& shard.indexSettings().isSegRepEnabledOrRemoteNode()
&& shard.routingEntry().primary()) {
validateReplicationGroup(shard);
}
}
Expand Down Expand Up @@ -264,7 +266,8 @@ protected void runInternal() {
stats.getShardStats().get(shardId).getReplicaStats()
);
final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex());
if (indexService.getIndexSettings() != null && indexService.getIndexSettings().isSegRepEnabled() == false) {
if (indexService.getIndexSettings() != null
&& indexService.getIndexSettings().isSegRepEnabledOrRemoteNode() == false) {
return;
}
final IndexShard primaryShard = indexService.getShard(shardId.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public SegmentReplicationStats getStats() {
Map<ShardId, SegmentReplicationPerGroupStats> stats = new HashMap<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (indexShard.indexSettings().isSegRepEnabled() && indexShard.routingEntry().primary()) {
if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() && indexShard.routingEntry().primary()) {
stats.putIfAbsent(indexShard.shardId(), getStatsForShard(indexShard));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private static void doValidateCodecSettings(final String codec) {
* Creates a new {@link org.opensearch.index.engine.EngineConfig}
*/
private EngineConfig(Builder builder) {
if (builder.isReadOnlyReplica && builder.indexSettings.isSegRepEnabled() == false) {
if (builder.isReadOnlyReplica && builder.indexSettings.isSegRepEnabledOrRemoteNode() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
}
this.shardId = builder.shardId;
Expand Down Expand Up @@ -491,7 +491,7 @@ public LongSupplier getPrimaryTermSupplier() {
* @return true if this engine should be wired as read only.
*/
public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
return indexSettings.isSegRepEnabledOrRemoteNode() && isReadOnlyReplica;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
if (versionValue != null) {
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
} else {
Expand Down Expand Up @@ -1005,7 +1005,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes;
plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0);
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
versionMap.enforceSafeAccess();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
Expand Down Expand Up @@ -1452,7 +1452,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
plan = DeletionStrategy.processButSkipLucene(false, delete.version());
} else {
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled();
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
if (segRepEnabled) {
Expand Down Expand Up @@ -1868,7 +1868,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// only after the active reader is updated. This ensures that a flush does not wipe out a required commit point file
// while we are
// in refresh listeners.
final GatedCloseable<IndexCommit> latestCommit = engineConfig.getIndexSettings().isSegRepEnabled()
final GatedCloseable<IndexCommit> latestCommit = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode()
? acquireLastIndexCommit(false)
: null;
commitIndexWriter(indexWriter, translogManager.getTranslogUUID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ public ReplicationTracker(
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
this.onReplicationGroupUpdated = onReplicationGroupUpdated;
this.latestReplicationCheckpoint = indexSettings.isSegRepEnabled() ? ReplicationCheckpoint.empty(shardId) : null;
this.latestReplicationCheckpoint = indexSettings.isSegRepEnabledOrRemoteNode() ? ReplicationCheckpoint.empty(shardId) : null;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
Expand Down Expand Up @@ -1173,7 +1173,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI
* @param visibleCheckpoint the visible checkpoint
*/
public synchronized void updateVisibleCheckpointForShard(final String allocationId, final ReplicationCheckpoint visibleCheckpoint) {
assert indexSettings.isSegRepEnabled();
assert indexSettings.isSegRepEnabledOrRemoteNode();
assert primaryMode;
assert handoffInProgress == false;
assert invariant();
Expand Down Expand Up @@ -1217,7 +1217,7 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void setLatestReplicationCheckpoint(ReplicationCheckpoint checkpoint) {
assert indexSettings.isSegRepEnabled();
assert indexSettings.isSegRepEnabledOrRemoteNode();
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
Expand Down Expand Up @@ -1269,7 +1269,7 @@ && isPrimaryRelocation(allocationId) == false
* @param checkpoint {@link ReplicationCheckpoint}
*/
public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpoint) {
assert indexSettings.isSegRepEnabled();
assert indexSettings.isSegRepEnabledOrRemoteNode();
if (checkpoint.equals(latestReplicationCheckpoint) == false) {
this.latestReplicationCheckpoint = checkpoint;
}
Expand All @@ -1294,7 +1294,7 @@ && isPrimaryRelocation(e.getKey()) == false
* V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group.
*/
public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats() {
assert indexSettings.isSegRepEnabled();
assert indexSettings.isSegRepEnabledOrRemoteNode();
if (primaryMode) {
return this.checkpoints.entrySet()
.stream()
Expand Down
Loading

0 comments on commit 72cefe9

Please sign in to comment.