Skip to content

Commit

Permalink
Memoize isOnRemoteNode in index settings and refactor as well (opense…
Browse files Browse the repository at this point in the history
…arch-project#12994)

* Memoize isOnRemoteNode in index settings and rename it as well
---------
Signed-off-by: Gaurav Bafna <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
gbbafna authored and shiv0408 committed Apr 25, 2024
1 parent 37e7868 commit 45f8bcd
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 24 deletions.
8 changes: 5 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ public static IndexMergePolicy fromString(String text) {

private volatile String defaultSearchPipeline;
private final boolean widenIndexSortType;
private final boolean assignedOnRemoteNode;

/**
* The maximum age of a retention lease before it is considered expired.
Expand Down Expand Up @@ -986,6 +987,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
* Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them.
*/
widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0);
assignedOnRemoteNode = RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());

setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
Expand Down Expand Up @@ -1231,7 +1233,7 @@ public int getNumberOfReplicas() {
* proper index setting during the migration.
*/
public boolean isSegRepEnabledOrRemoteNode() {
return ReplicationType.SEGMENT.equals(replicationType) || isRemoteNode();
return ReplicationType.SEGMENT.equals(replicationType) || isAssignedOnRemoteNode();
}

public boolean isSegRepLocalEnabled() {
Expand All @@ -1249,8 +1251,8 @@ public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

public boolean isRemoteNode() {
return RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
public boolean isAssignedOnRemoteNode() {
return assignedOnRemoteNode;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
during promotion.
*/
if (engineConfig.getIndexSettings().isRemoteStoreEnabled() == false
&& engineConfig.getIndexSettings().isRemoteNode() == false) {
&& engineConfig.getIndexSettings().isAssignedOnRemoteNode() == false) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public RemoteStoreStatsTrackerFactory(ClusterService clusterService, Settings se

@Override
public void afterIndexShardCreated(IndexShard indexShard) {
if (indexShard.indexSettings().isRemoteStoreEnabled() == false && indexShard.indexSettings().isRemoteNode() == false) {
if (indexShard.indexSettings().isRemoteStoreEnabled() == false && indexShard.indexSettings().isAssignedOnRemoteNode() == false) {
return;
}
ShardId shardId = indexShard.shardId();
Expand Down
26 changes: 13 additions & 13 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public IndexShard(
logger,
threadPool,
this::getEngine,
indexSettings.isRemoteNode(),
indexSettings.isAssignedOnRemoteNode(),
() -> getRemoteTranslogUploadBufferInterval(remoteStoreSettings::getClusterRemoteTranslogBufferInterval)
);
this.mapperService = mapperService;
Expand Down Expand Up @@ -1469,7 +1469,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
SegmentsStats segmentsStats = getEngine().segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
segmentsStats.addBitsetMemoryInBytes(shardBitsetFilterCache.getMemorySizeInBytes());
// Populate remote_store stats only if the index is remote store backed
if (indexSettings().isRemoteNode()) {
if (indexSettings().isAssignedOnRemoteNode()) {
segmentsStats.addRemoteSegmentStats(
new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats())
);
Expand All @@ -1491,7 +1491,7 @@ public FieldDataStats fieldDataStats(String... fields) {
public TranslogStats translogStats() {
TranslogStats translogStats = getEngine().translogManager().getTranslogStats();
// Populate remote_store stats only if the index is remote store backed
if (indexSettings.isRemoteNode()) {
if (indexSettings.isAssignedOnRemoteNode()) {
translogStats.addRemoteTranslogStats(
new RemoteTranslogStats(remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardId).stats())
);
Expand Down Expand Up @@ -1530,7 +1530,7 @@ public void flush(FlushRequest request) {
* {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
*/
public void trimTranslog() {
if (indexSettings.isRemoteNode()) {
if (indexSettings.isAssignedOnRemoteNode()) {
return;
}
verifyNotClosed();
Expand Down Expand Up @@ -2050,7 +2050,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003
*/
public RemoteSegmentStoreDirectory getRemoteDirectory() {
assert indexSettings.isRemoteNode();
assert indexSettings.isAssignedOnRemoteNode();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
Expand All @@ -2063,7 +2063,7 @@ public RemoteSegmentStoreDirectory getRemoteDirectory() {
* is in sync with local
*/
public boolean isRemoteSegmentStoreInSync() {
assert indexSettings.isRemoteNode();
assert indexSettings.isAssignedOnRemoteNode();
try {
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
if (directory.readLatestMetadataFile() != null) {
Expand Down Expand Up @@ -2102,7 +2102,7 @@ public void waitForRemoteStoreSync() {
Calls onProgress on seeing an increased file count on remote
*/
public void waitForRemoteStoreSync(Runnable onProgress) {
assert indexSettings.isRemoteNode();
assert indexSettings.isAssignedOnRemoteNode();
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
int segmentUploadeCount = 0;
if (shardRouting.primary() == false) {
Expand Down Expand Up @@ -2277,7 +2277,7 @@ public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
* @return the starting sequence number from which the recovery should start.
*/
private long recoverLocallyUptoLastCommit() {
assert indexSettings.isRemoteNode() : "Remote translog store is not enabled";
assert indexSettings.isAssignedOnRemoteNode() : "Remote translog store is not enabled";
long seqNo;
validateLocalRecoveryState();

Expand Down Expand Up @@ -3540,7 +3540,7 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(allocati
}

private void postActivatePrimaryMode() {
if (indexSettings.isRemoteNode()) {
if (indexSettings.isAssignedOnRemoteNode()) {
// We make sure to upload translog (even if it does not contain any operations) to remote translog.
// This helps to get a consistent state in remote store where both remote segment store and remote
// translog contains data.
Expand Down Expand Up @@ -4010,7 +4010,7 @@ public boolean enableUploadToRemoteTranslog() {
}

private boolean hasOneRemoteSegmentSyncHappened() {
assert indexSettings.isRemoteNode();
assert indexSettings.isAssignedOnRemoteNode();
// We upload remote translog only after one remote segment upload in case of migration
RemoteSegmentStoreDirectory rd = getRemoteDirectory();
AtomicBoolean segment_n_uploaded = new AtomicBoolean(false);
Expand Down Expand Up @@ -4624,7 +4624,7 @@ public final boolean isSearchIdle() {
public final boolean isSearchIdleSupported() {
// If the index is remote store backed, then search idle is not supported. This is to ensure that async refresh
// task continues to upload to remote store periodically.
if (isRemoteTranslogEnabled() || indexSettings.isRemoteNode()) {
if (isRemoteTranslogEnabled() || indexSettings.isAssignedOnRemoteNode()) {
return false;
}
return indexSettings.isSegRepEnabledOrRemoteNode() == false || indexSettings.getNumberOfReplicas() == 0;
Expand Down Expand Up @@ -5263,9 +5263,9 @@ enum ShardMigrationState {
}

static ShardMigrationState getShardMigrationState(IndexSettings indexSettings, boolean shouldSeed) {
if (indexSettings.isRemoteNode() && indexSettings.isRemoteStoreEnabled()) {
if (indexSettings.isAssignedOnRemoteNode() && indexSettings.isRemoteStoreEnabled()) {
return REMOTE_NON_MIGRATING;
} else if (indexSettings.isRemoteNode()) {
} else if (indexSettings.isAssignedOnRemoteNode()) {
return shouldSeed ? REMOTE_MIGRATING_UNSEEDED : REMOTE_MIGRATING_SEEDED;
}
return ShardMigrationState.DOCREP_NON_MIGRATING;
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ public void beforeClose() {
* @throws IOException when there is an IO error committing.
*/
public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, long processedCheckpoint) throws IOException {
assert indexSettings.isSegRepEnabledOrRemoteNode() || indexSettings.isRemoteNode();
assert indexSettings.isSegRepEnabledOrRemoteNode() || indexSettings.isAssignedOnRemoteNode();
metadataLock.writeLock().lock();
try {
final Map<String, String> userData = new HashMap<>(latestSegmentInfos.getUserData());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ TranslogWriter createWriter(
tragedy,
persistedSequenceNumberConsumer,
bigArrays,
indexSettings.isRemoteNode()
indexSettings.isAssignedOnRemoteNode()
);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
if (idxSettings.isRemoteSnapshot()) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
}
if (idxSettings.isSegRepEnabledOrRemoteNode() || idxSettings.isRemoteNode()) {
if (idxSettings.isSegRepEnabledOrRemoteNode() || idxSettings.isAssignedOnRemoteNode()) {
return new NRTReplicationEngineFactory();
}
return new InternalEngineFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
}
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false
&& indexShard.indexSettings().isRemoteNode();
&& indexShard.indexSettings().isAssignedOnRemoteNode();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog || hasRemoteSegmentStore) == false;
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
Expand Down
11 changes: 11 additions & 0 deletions server/src/test/java/org/opensearch/index/IndexSettingsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1053,4 +1053,15 @@ public void testDefaultSearchPipeline() throws Exception {
settings.updateIndexMetadata(metadata);
assertEquals("foo", settings.getDefaultSearchPipeline());
}

public void testIsOnRemoteNode() {
Version version = VersionUtils.getPreviousVersion();
Settings theSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, version)
.put(IndexMetadata.SETTING_INDEX_UUID, "0xdeadbeef")
.build();
Settings nodeSettings = Settings.builder().put("node.attr.remote_store.translog.repository", "my-repo-1").build();
IndexSettings settings = newIndexSettings(newIndexMeta("index", theSettings), nodeSettings);
assertTrue("Index should be on remote node", settings.isAssignedOnRemoteNode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ protected IndexShard newShard(
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null;
RepositoriesService mockRepoSvc = mock(RepositoriesService.class);

if (indexSettings.isRemoteStoreEnabled() || indexSettings.isRemoteNode()) {
if (indexSettings.isRemoteStoreEnabled() || indexSettings.isAssignedOnRemoteNode()) {
String remoteStoreRepository = indexSettings.getRemoteStoreRepository();
// remote path via setting a repository . This is a hack used for shards are created using reset .
// since we can't get remote path from IndexShard directly, we are using repository to store it .
Expand Down Expand Up @@ -1498,7 +1498,7 @@ private SegmentReplicationTargetService prepareForReplication(

SegmentReplicationSourceFactory sourceFactory = null;
SegmentReplicationTargetService targetService;
if (primaryShard.indexSettings.isRemoteStoreEnabled() || primaryShard.indexSettings.isRemoteNode()) {
if (primaryShard.indexSettings.isRemoteStoreEnabled() || primaryShard.indexSettings.isAssignedOnRemoteNode()) {
RecoverySettings recoverySettings = new RecoverySettings(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down

0 comments on commit 45f8bcd

Please sign in to comment.