Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recovery chunk size setting #13997

Merged
merged 15 commits into from
Jun 10, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -156,7 +155,7 @@
import static java.util.stream.Collectors.toList;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -187,7 +186,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(
MockTransportService.TestPlugin.class,
MockFSIndexStore.TestPlugin.class,
RecoverySettingsChunkSizePlugin.class,
TestAnalysisPlugin.class,
InternalSettingsPlugin.class,
MockEngineFactoryPlugin.class
Expand Down Expand Up @@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
// small chunks
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand All @@ -278,7 +276,10 @@ private void restoreRecoverySpeed() {
.setTransientSettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
.put(
INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(),
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY)
)
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
Expand All @@ -61,7 +60,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand All @@ -81,7 +80,7 @@ public static Collection<Object[]> parameters() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class);
}

/**
Expand All @@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception {
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
Settings.builder()
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
)
.get()
.isAcknowledged()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ public class RecoverySettings {
);

// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
"indices.recovery.chunk_size",
new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES),
new ByteSizeValue(1, ByteSizeUnit.BYTES),
new ByteSizeValue(100, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
Expand All @@ -193,7 +200,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile ByteSizeValue chunkSize;
private volatile TimeValue internalRemoteUploadTimeout;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -221,6 +228,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
Expand All @@ -239,11 +247,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
clusterSettings.addSettingsUpdateConsumer(
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
this::setInternalActionRetryTimeout
);

}

public RateLimiter recoveryRateLimiter() {
Expand Down Expand Up @@ -286,10 +294,7 @@ public ByteSizeValue getChunkSize() {
return chunkSize;
}

public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
public void setChunkSize(ByteSizeValue chunkSize) {
this.chunkSize = chunkSize;
}

Expand Down
5 changes: 0 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,6 @@ protected Node(
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
{
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
b.bind(PeerRecoverySourceService.class)
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
b.bind(PeerRecoveryTargetService.class)
Expand Down Expand Up @@ -1447,10 +1446,6 @@ protected TransportService newTransportService(
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
}

protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
// Noop in production, overridden by tests
}

/**
* The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() {
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
}

public void testChunkSize() {
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
);
assertEquals(chunkSize, recoverySettings.getChunkSize());
}

public void testInternalActionRetryTimeout() {
long duration = between(1, 1000);
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
startingSeqNo
);
long fileChunkSizeInBytes = randomBoolean()
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
: randomIntBetween(1, 10 * 1024 * 1024);
final Settings settings = Settings.builder()
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.env.Environment;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.MockScriptService;
import org.opensearch.script.ScriptContext;
Expand Down Expand Up @@ -236,13 +235,6 @@ protected TransportService newTransportService(
}
}

@Override
astute-decipher marked this conversation as resolved.
Show resolved Hide resolved
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
}
}

@Override
protected ClusterInfoService newClusterInfoService(
Settings settings,
Expand Down

This file was deleted.

Loading