Skip to content

Commit

Permalink
Add RemoteStoreSettings class to handle remote store related settings
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Mar 21, 2024
1 parent fd458d6 commit 7dc7de5
Show file tree
Hide file tree
Showing 18 changed files with 293 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
Expand All @@ -56,7 +57,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
Expand Down Expand Up @@ -189,7 +190,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");

IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles();
int lastNMetadataFilesToKeep = indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
Expand Down Expand Up @@ -224,7 +225,7 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {

public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
Expand All @@ -243,7 +244,7 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {

public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "-1");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
Expand Down Expand Up @@ -469,7 +470,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E

private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval());
assertEquals(expectedBufferInterval, indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval());
}

private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.indices.analysis.HunspellService;
import org.opensearch.indices.breaker.BreakerSettings;
Expand Down Expand Up @@ -297,7 +298,6 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down Expand Up @@ -706,7 +706,6 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING,
Expand All @@ -723,7 +722,10 @@ public void apply(Settings value, Settings current, Settings previous) {

// Concurrent segment search settings
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
)
)
);
Expand Down
9 changes: 5 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoverySettings;
Expand Down Expand Up @@ -604,8 +605,8 @@ public IndexService newIndexService(
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
RecoverySettings recoverySettings
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -663,8 +664,8 @@ public IndexService newIndexService(
recoveryStateFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
clusterRemoteTranslogBufferIntervalSupplier,
recoverySettings
recoverySettings,
remoteStoreSettings
);
success = true;
return indexService;
Expand Down
11 changes: 6 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
Expand Down Expand Up @@ -183,8 +184,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;
private final RecoverySettings recoverySettings;
private final RemoteStoreSettings remoteStoreSettings;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -219,8 +220,8 @@ public IndexService(
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
RecoverySettings recoverySettings
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -296,8 +297,8 @@ public IndexService(
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.translogFactorySupplier = translogFactorySupplier;
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -548,9 +549,9 @@ public synchronized IndexShard createShard(
this.indexSettings.isSegRepEnabledOrRemoteNode() ? checkpointPublisher : null,
remoteStore,
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId(),
recoverySettings,
remoteStoreSettings,
seedRemote
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@
import org.opensearch.index.warmer.WarmerStats;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
Expand Down Expand Up @@ -349,6 +350,7 @@ Runnable getGlobalCheckpointSyncer() {
private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
private final RemoteStoreFileDownloader fileDownloader;
private final RecoverySettings recoverySettings;
private final RemoteStoreSettings remoteStoreSettings;
/*
On source doc rep node, It will be DOCREP_NON_MIGRATING.
On source remote node , it will be REMOTE_MIGRATING_SEEDED when relocating from remote node
Expand Down Expand Up @@ -381,9 +383,9 @@ public IndexShard(
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
final String nodeId,
final RecoverySettings recoverySettings,
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote
) throws IOException {
super(shardRouting.shardId(), indexSettings);
Expand All @@ -405,7 +407,7 @@ public IndexShard(
threadPool,
this::getEngine,
indexSettings.isRemoteNode(),
() -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier)
() -> getRemoteTranslogUploadBufferInterval(remoteStoreSettings::getClusterRemoteTranslogBufferInterval)
);
this.mapperService = mapperService;
this.indexCache = indexCache;
Expand Down Expand Up @@ -481,6 +483,7 @@ public boolean shouldCache(Query query) {
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
}
Expand Down Expand Up @@ -598,6 +601,10 @@ public RecoverySettings getRecoverySettings() {
return recoverySettings;
}

public RemoteStoreSettings getRemoteStoreSettings() {
return remoteStoreSettings;
}

public RemoteStoreFileDownloader getFileDownloader() {
return fileDownloader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private boolean syncSegments() {
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRemoteStoreSettings().getMinRemoteSegmentMetadataFiles());
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

/**
* Utility to provide a {@link RemoteStoreSettings} instance containing all defaults
*/
public final class DefaultRemoteStoreSettings {
private DefaultRemoteStoreSettings() {}

public static final RemoteStoreSettings INSTANCE = new RemoteStoreSettings(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
}
35 changes: 8 additions & 27 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,17 +249,6 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);

/**
* Used to specify the default translog buffer interval for remote store backed indexes.
*/
public static final Setting<TimeValue> CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting(
"cluster.remote_store.translog.buffer_interval",
IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
IndexSettings.MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL,
Property.NodeScope,
Property.Dynamic
);

/**
* This setting is used to set the refresh interval when the {@code index.refresh_interval} index setting is not
* provided during index creation or when the existing {@code index.refresh_interval} index setting is set as null.
Expand Down Expand Up @@ -366,7 +355,7 @@ public class IndicesService extends AbstractLifecycleComponent
private volatile boolean idFieldDataEnabled;
private volatile boolean allowExpensiveQueries;
private final RecoverySettings recoverySettings;

private final RemoteStoreSettings remoteStoreSettings;
@Nullable
private final OpenSearchThreadPoolExecutor danglingIndicesThreadPoolExecutor;
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
Expand All @@ -375,8 +364,6 @@ public class IndicesService extends AbstractLifecycleComponent
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private volatile TimeValue clusterDefaultRefreshInterval;
private volatile TimeValue clusterRemoteTranslogBufferInterval;

private final SearchRequestStats searchRequestStats;

@Override
Expand Down Expand Up @@ -411,7 +398,8 @@ public IndicesService(
SearchRequestStats searchRequestStats,
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
RecoverySettings recoverySettings,
CacheService cacheService
CacheService cacheService,
RemoteStoreSettings remoteStoreSettings
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -515,10 +503,8 @@ protected void closeInternal() {
this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate);
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval);
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
}

/**
Expand Down Expand Up @@ -923,8 +909,8 @@ private synchronized IndexService createIndexService(
remoteDirectoryFactory,
translogFactorySupplier,
this::getClusterDefaultRefreshInterval,
this::getClusterRemoteTranslogBufferInterval,
this.recoverySettings
this.recoverySettings,
this.remoteStoreSettings
);
}

Expand Down Expand Up @@ -2044,12 +2030,7 @@ private TimeValue getClusterDefaultRefreshInterval() {
return this.clusterDefaultRefreshInterval;
}

// Exclusively for testing, please do not use it elsewhere.
public TimeValue getClusterRemoteTranslogBufferInterval() {
return clusterRemoteTranslogBufferInterval;
}

private void setClusterRemoteTranslogBufferInterval(TimeValue clusterRemoteTranslogBufferInterval) {
this.clusterRemoteTranslogBufferInterval = clusterRemoteTranslogBufferInterval;
public RemoteStoreSettings getRemoteStoreSettings() {
return this.remoteStoreSettings;
}
}
Loading

0 comments on commit 7dc7de5

Please sign in to comment.