Skip to content

Commit

Permalink
add a new indices.recovery.chunk_size to control the maximum allowed …
Browse files Browse the repository at this point in the history
…memory to store source documents during recovery.
  • Loading branch information
jimczi committed Dec 4, 2024
1 parent fdfb2cb commit d7207e4
Show file tree
Hide file tree
Showing 22 changed files with 196 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -146,14 +145,14 @@ public void testResizeChangeSyntheticSource() {

public void testResizeChangeRecoveryUseSyntheticSource() {
prepareCreate("source").setSettings(indexSettings(between(1, 5), 0))
.setMapping("@timestamp", "type=date", "host.name", "type=keyword")
.get();
.setMapping("@timestamp", "type=date", "host.name", "type=keyword")
.get();
updateIndexSettings(Settings.builder().put("index.blocks.write", true), "source");
IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> {
indicesAdmin().prepareResizeIndex("source", "target")
.setResizeType(ResizeType.CLONE)
.setSettings(Settings.builder().put("index.recovery.use_synthetic_source", true).putNull("index.blocks.write").build())
.get();
.setResizeType(ResizeType.CLONE)
.setSettings(Settings.builder().put("index.recovery.use_synthetic_source", true).putNull("index.blocks.write").build())
.get();
});
assertThat(error.getMessage(), containsString("can't change setting [index.recovery.use_synthetic_source] during resize"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,15 @@ public void testShardChangesWithDefaultDocType() throws Exception {
}
IndexShard shard = indexService.getShard(0);
try (
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean(), randomBoolean());
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot(
"test",
0,
numOps - 1,
true,
randomBoolean(),
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
);
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.indices.InvalidIndexNameException;
Expand Down Expand Up @@ -810,11 +809,11 @@ public void testRestoreChangeRecoveryUseSyntheticSource() {
cluster().wipeIndices(indexName);
var error = expectThrows(SnapshotRestoreException.class, () -> {
client.admin()
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap")
.setIndexSettings(Settings.builder().put("index.recovery.use_synthetic_source", true))
.setWaitForCompletion(true)
.get();
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap")
.setIndexSettings(Settings.builder().put("index.recovery.use_synthetic_source", true))
.setWaitForCompletion(true)
.get();
});
assertThat(error.getMessage(), containsString("cannot modify setting [index.recovery.use_synthetic_source] on restore"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ public boolean allowSearchIdleOptimization() {
* @param source the source of the request
* @param fromSeqNo the start sequence number (inclusive)
* @param toSeqNo the end sequence number (inclusive)
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean)
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean, long)
*/
public abstract int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException;

Expand All @@ -951,7 +951,8 @@ public abstract Translog.Snapshot newChangesSnapshot(
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats
boolean accessStats,
long maxChunkSize
) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3143,7 +3143,8 @@ public Translog.Snapshot newChangesSnapshot(
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats
boolean accessStats,
long maxChunkSize
) throws IOException {
if (enableRecoverySource == false) {
throw new IllegalStateException(
Expand All @@ -3162,7 +3163,7 @@ public Translog.Snapshot newChangesSnapshot(
engineConfig.getMapperService().mappingLookup(),
searcher,
SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
LuceneSyntheticSourceChangesSnapshot.DEFAULT_MEMORY_SIZE,
maxChunkSize,
fromSeqNo,
toSeqNo,
requiredFullRange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public Closeable acquireHistoryRetentionLock() {

@Override
public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true, true)) {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true, true, -1)) {
return snapshot.totalOperations();
}
}
Expand All @@ -368,7 +368,8 @@ public Translog.Snapshot newChangesSnapshot(
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats
boolean accessStats,
long maxChunkSize
) {
return Translog.Snapshot.EMPTY;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2599,7 +2599,7 @@ public long getMinRetainedSeqNo() {
* @param source the source of the request
* @param fromSeqNo the start sequence number (inclusive)
* @param toSeqNo the end sequence number (inclusive)
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean)
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean, long)
*/
public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException {
return getEngine().countChanges(source, fromSeqNo, toSeqNo);
Expand All @@ -2618,16 +2618,18 @@ public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOEx
* @param singleConsumer true if the snapshot is accessed by only the thread that creates the snapshot. In this case, the
* snapshot can enable some optimizations to improve the performance.
* @param accessStats true if the stats of the snapshot are accessed via {@link Translog.Snapshot#totalOperations()}
* @param maxChunkSize The maximum allowable size, in bytes, for buffering source documents during recovery.
*/
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats
boolean accessStats,
long maxChunkSize
) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats);
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats, maxChunkSize);
}

public List<Segment> segments() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, false, true);
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, false, true, chunkSize.getBytes());
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,16 @@ public Iterator<Setting<?>> settings() {

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

/**
* The maximum allowable size, in bytes, for buffering source documents during recovery.
* <p>
* Registered under the key {@code indices.recovery.chunk_size} and defaults to
* {@link #DEFAULT_CHUNK_SIZE}. The setting is node-scoped and dynamic, so it can be
* changed at runtime without restarting the node.
*/
public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE = Setting.byteSizeSetting(
"indices.recovery.chunk_size",
DEFAULT_CHUNK_SIZE,
Property.NodeScope,
Property.Dynamic
);

private volatile ByteSizeValue maxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
Expand All @@ -417,7 +427,7 @@ public Iterator<Setting<?>> settings() {

private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
private volatile ByteSizeValue chunkSize;

private final ByteSizeValue availableNetworkBandwidth;
private final ByteSizeValue availableDiskReadBandwidth;
Expand All @@ -444,6 +454,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.availableNetworkBandwidth = NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.get(settings);
this.availableDiskReadBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.get(settings);
this.availableDiskWriteBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.get(settings);
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE.get(settings);
validateNodeBandwidthRecoverySettings(settings);
this.nodeBandwidthSettingsExist = hasNodeBandwidthRecoverySettings(settings);
computeMaxBytesPerSec(settings);
Expand Down Expand Up @@ -493,6 +504,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
this::setMaxConcurrentIncomingRecoveries
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE, this::setChunkSize);
}

private void computeMaxBytesPerSec(Settings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ && isTargetSameHistory()
Long.MAX_VALUE,
false,
false,
true
true,
chunkSizeInBytes
);
resources.add(phase2Snapshot);
retentionLock.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -6447,7 +6448,8 @@ protected void doRun() throws Exception {
max,
true,
randomBoolean(),
randomBoolean()
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
)
) {}
} else {
Expand Down Expand Up @@ -7673,7 +7675,7 @@ public void testDisableRecoverySource() throws Exception {
) {
IllegalStateException exc = expectThrows(
IllegalStateException.class,
() -> engine.newChangesSnapshot("test", 0, 1000, true, true, true)
() -> engine.newChangesSnapshot("test", 0, 1000, true, true, true, randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()))
);
assertThat(exc.getMessage(), containsString("unavailable"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.index.NoMergePolicy;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -66,7 +67,8 @@ public void testAccessStoredFieldsSequentially() throws Exception {
between(1, smallBatch),
false,
randomBoolean(),
randomBoolean()
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
)
) {
while ((op = snapshot.next()) != null) {
Expand All @@ -82,7 +84,8 @@ public void testAccessStoredFieldsSequentially() throws Exception {
between(20, 100),
false,
randomBoolean(),
randomBoolean()
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
)
) {
while ((op = snapshot.next()) != null) {
Expand All @@ -98,7 +101,8 @@ public void testAccessStoredFieldsSequentially() throws Exception {
between(21, 100),
false,
true,
randomBoolean()
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
)
) {
while ((op = snapshot.next()) != null) {
Expand All @@ -114,7 +118,8 @@ public void testAccessStoredFieldsSequentially() throws Exception {
between(21, 100),
false,
false,
randomBoolean()
randomBoolean(),
randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())
)
) {
while ((op = snapshot.next()) != null) {
Expand Down
Loading

0 comments on commit d7207e4

Please sign in to comment.