From d7207e476e5b4bfebcf918fdaa2736326205e94b Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Wed, 4 Dec 2024 11:50:32 +0000 Subject: [PATCH] add a new indices.recovery.chunk_size to control the maximum allowed memory to store source documents during recovery. --- .../admin/indices/create/CloneIndexIT.java | 11 ++--- .../index/shard/IndexShardIT.java | 10 +++- .../snapshots/RestoreSnapshotIT.java | 11 ++--- .../common/settings/ClusterSettings.java | 1 + .../elasticsearch/index/engine/Engine.java | 5 +- .../index/engine/InternalEngine.java | 5 +- .../index/engine/ReadOnlyEngine.java | 5 +- .../elasticsearch/index/shard/IndexShard.java | 8 +-- .../index/shard/PrimaryReplicaSyncer.java | 2 +- .../indices/recovery/RecoverySettings.java | 14 +++++- .../recovery/RecoverySourceHandler.java | 3 +- .../index/engine/InternalEngineTests.java | 6 ++- .../engine/LuceneChangesSnapshotTests.java | 13 +++-- .../SearchBasedChangesSnapshotTests.java | 49 ++++++++++++++++--- .../IndexLevelReplicationTests.java | 19 +++++-- .../index/shard/IndexShardTests.java | 20 +++++++- .../indices/recovery/RecoveryTests.java | 4 +- .../index/engine/EngineTestCase.java | 13 ++++- .../xpack/ccr/action/ShardChangesAction.java | 12 ++++- .../ShardFollowTaskReplicationTests.java | 13 ++++- .../action/bulk/BulkShardOperationsTests.java | 11 ++++- .../index/engine/FollowingEngineTests.java | 11 ++++- 22 files changed, 196 insertions(+), 50 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CloneIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CloneIndexIT.java index b8ced275c701d..b4654134cb5f2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CloneIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CloneIndexIT.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.seqno.SeqNoStats; @@ -146,14 +145,14 @@ public void testResizeChangeSyntheticSource() { public void testResizeChangeRecoveryUseSyntheticSource() { prepareCreate("source").setSettings(indexSettings(between(1, 5), 0)) - .setMapping("@timestamp", "type=date", "host.name", "type=keyword") - .get(); + .setMapping("@timestamp", "type=date", "host.name", "type=keyword") + .get(); updateIndexSettings(Settings.builder().put("index.blocks.write", true), "source"); IllegalArgumentException error = expectThrows(IllegalArgumentException.class, () -> { indicesAdmin().prepareResizeIndex("source", "target") - .setResizeType(ResizeType.CLONE) - .setSettings(Settings.builder().put("index.recovery.use_synthetic_source", true).putNull("index.blocks.write").build()) - .get(); + .setResizeType(ResizeType.CLONE) + .setSettings(Settings.builder().put("index.recovery.use_synthetic_source", true).putNull("index.blocks.write").build()) + .get(); }); assertThat(error.getMessage(), containsString("can't change setting [index.recovery.use_synthetic_source] during resize")); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 6ffd5808cea73..870947db5bd85 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -715,7 +715,15 @@ public void testShardChangesWithDefaultDocType() throws Exception { } IndexShard shard = indexService.getShard(0); try ( - Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean(), randomBoolean()); + Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot( + "test", + 0, + numOps - 1, + true, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ); Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot() ) { List opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java index 34ee1c6bca440..5b54a0dbbb799 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.indices.InvalidIndexNameException; @@ -810,11 +809,11 @@ public void testRestoreChangeRecoveryUseSyntheticSource() { cluster().wipeIndices(indexName); var error = expectThrows(SnapshotRestoreException.class, () -> { client.admin() - .cluster() - .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap") - .setIndexSettings(Settings.builder().put("index.recovery.use_synthetic_source", true)) - .setWaitForCompletion(true) - .get(); + .cluster() + .prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "test-repo", "test-snap") + .setIndexSettings(Settings.builder().put("index.recovery.use_synthetic_source", true)) + .setWaitForCompletion(true) + .get(); }); assertThat(error.getMessage(), containsString("cannot modify setting [index.recovery.use_synthetic_source] on restore")); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index a9a9411de8e1f..20a44eed7036c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -258,6 +258,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE, + RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE, RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING, RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING, RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index edafa1ca922fb..394de0684c104 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -937,7 +937,7 @@ public boolean allowSearchIdleOptimization() { * @param source the source of the request * @param fromSeqNo the start sequence number (inclusive) * @param toSeqNo the end sequence number (inclusive) - * @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean) + * @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean, long) */ public abstract int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException; @@ -951,7 +951,8 @@ public abstract Translog.Snapshot newChangesSnapshot( long toSeqNo, boolean requiredFullRange, boolean singleConsumer, - boolean accessStats + boolean accessStats, + long maxChunkSize ) throws IOException; /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index afad793316dd6..0a470e86ef856 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -3143,7 +3143,8 @@ public Translog.Snapshot newChangesSnapshot( long toSeqNo, boolean requiredFullRange, boolean singleConsumer, - boolean accessStats + boolean accessStats, + long maxChunkSize ) throws IOException { if (enableRecoverySource == false) { throw new IllegalStateException( @@ -3162,7 +3163,7 @@ public Translog.Snapshot newChangesSnapshot( engineConfig.getMapperService().mappingLookup(), searcher, SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE, - LuceneSyntheticSourceChangesSnapshot.DEFAULT_MEMORY_SIZE, + maxChunkSize, fromSeqNo, toSeqNo, requiredFullRange, diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index d4a2fe1b57903..c1d11223fa55e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -356,7 +356,7 @@ public Closeable acquireHistoryRetentionLock() { @Override public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException { - try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true, true)) { + try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true, true, -1)) { return snapshot.totalOperations(); } } @@ -368,7 +368,8 @@ public Translog.Snapshot newChangesSnapshot( long toSeqNo, boolean requiredFullRange, boolean singleConsumer, - boolean accessStats + boolean accessStats, + long maxChunkSize ) { return Translog.Snapshot.EMPTY; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 993079a3106d7..211f26617185c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2599,7 +2599,7 @@ public long getMinRetainedSeqNo() { * @param source the source of the request * @param fromSeqNo the start sequence number (inclusive) * @param toSeqNo the end sequence number (inclusive) - * @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean) + * @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean, long) */ public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException { return getEngine().countChanges(source, fromSeqNo, toSeqNo); @@ -2618,6 +2618,7 @@ public int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOEx * @param singleConsumer true if the snapshot is accessed by only the thread that creates the snapshot. In this case, the * snapshot can enable some optimizations to improve the performance. * @param accessStats true if the stats of the snapshot is accessed via {@link Translog.Snapshot#totalOperations()} + * @param maxChunkSize The maximum allowable size, in bytes, for buffering source documents during recovery. */ public Translog.Snapshot newChangesSnapshot( String source, @@ -2625,9 +2626,10 @@ public Translog.Snapshot newChangesSnapshot( long toSeqNo, boolean requiredFullRange, boolean singleConsumer, - boolean accessStats + boolean accessStats, + long maxChunkSize ) throws IOException { - return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats); + return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats, maxChunkSize); } public List segments() { diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index f843357e056c4..1143da30c2952 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -81,7 +81,7 @@ public void resync(final IndexShard indexShard, final ActionListener // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, false, true); + snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, false, true, chunkSize.getBytes()); final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 1ec187ea4a34b..d9c8549671b05 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -399,6 +399,16 @@ public Iterator> settings() { public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); + /** + * The maximum allowable size, in bytes, for buffering source documents during recovery. + */ + public static final Setting INDICES_RECOVERY_CHUNK_SIZE = Setting.byteSizeSetting( + "indices.recovery.chunk_size", + DEFAULT_CHUNK_SIZE, + Property.NodeScope, + Property.Dynamic + ); + private volatile ByteSizeValue maxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; @@ -417,7 +427,7 @@ public Iterator> settings() { private final AdjustableSemaphore maxSnapshotFileDownloadsPerNodeSemaphore; - private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + private volatile ByteSizeValue chunkSize; private final ByteSizeValue availableNetworkBandwidth; private final ByteSizeValue availableDiskReadBandwidth; @@ -444,6 +454,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.availableNetworkBandwidth = NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.get(settings); this.availableDiskReadBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.get(settings); this.availableDiskWriteBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.get(settings); + this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE.get(settings); validateNodeBandwidthRecoverySettings(settings); this.nodeBandwidthSettingsExist = hasNodeBandwidthRecoverySettings(settings); computeMaxBytesPerSec(settings); @@ -493,6 +504,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, this::setMaxConcurrentIncomingRecoveries ); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE, this::setChunkSize); } private void computeMaxBytesPerSec(Settings settings) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 3603b984fb148..622e56f596e19 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -324,7 +324,8 @@ && isTargetSameHistory() Long.MAX_VALUE, false, false, - true + true, + chunkSizeInBytes ); resources.add(phase2Snapshot); retentionLock.close(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index c3f00747d2f31..3e3be6a315af2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -89,6 +89,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -6447,7 +6448,8 @@ protected void doRun() throws Exception { max, true, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) {} } else { @@ -7673,7 +7675,7 @@ public void testDisableRecoverySource() throws Exception { ) { IllegalStateException exc = expectThrows( IllegalStateException.class, - () -> engine.newChangesSnapshot("test", 0, 1000, true, true, true) + () -> engine.newChangesSnapshot("test", 0, 1000, true, true, true, randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes())) ); assertThat(exc.getMessage(), containsString("unavailable")); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index d78f0bed647ca..5863d2f932968 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.index.engine; import org.apache.lucene.index.NoMergePolicy; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.store.Store; @@ -66,7 +67,8 @@ public void testAccessStoredFieldsSequentially() throws Exception { between(1, smallBatch), false, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { while ((op = snapshot.next()) != null) { @@ -82,7 +84,8 @@ public void testAccessStoredFieldsSequentially() throws Exception { between(20, 100), false, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { while ((op = snapshot.next()) != null) { @@ -98,7 +101,8 @@ public void testAccessStoredFieldsSequentially() throws Exception { between(21, 100), false, true, - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { while ((op = snapshot.next()) != null) { @@ -114,7 +118,8 @@ public void testAccessStoredFieldsSequentially() throws Exception { between(21, 100), false, false, - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { while ((op = snapshot.next()) != null) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java index 9db6f224538b7..9cfa7321973a4 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SearchBasedChangesSnapshotTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; @@ -60,14 +61,34 @@ public void testBasics() throws Exception { long fromSeqNo = randomNonNegativeLong(); long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); // Empty engine - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) + ) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat( error.getMessage(), containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") ); } - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false, randomBoolean(), randomBoolean())) { + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + fromSeqNo, + toSeqNo, + false, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) + ) { assertThat(snapshot, SnapshotMatchers.size(0)); } int numOps = between(1, 100); @@ -212,7 +233,8 @@ public void testBasics() throws Exception { toSeqNo, randomBoolean(), randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); @@ -341,7 +363,8 @@ void pullOperations(InternalEngine follower) throws IOException { toSeqNo, true, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { translogHandler.run(follower, snapshot); @@ -389,7 +412,17 @@ private List drainAll(Translog.Snapshot snapshot) throws IOE public void testOverFlow() throws Exception { long fromSeqNo = randomLongBetween(0, 5); long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean())) { + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) + ) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat( error.getMessage(), @@ -439,7 +472,8 @@ public void testStats() throws Exception { toSeqNo.getAsLong(), false, randomBoolean(), - false + false, + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { IllegalStateException error = expectThrows(IllegalStateException.class, snapshot::totalOperations); @@ -457,7 +491,8 @@ public void testStats() throws Exception { toSeqNo.getAsLong(), false, randomBoolean(), - true + true, + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { assertThat(snapshot.totalOperations(), equalTo(numOps)); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 49b1362436ec7..0357d02dbbb98 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -486,7 +487,8 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { Long.MAX_VALUE, false, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); @@ -513,7 +515,8 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { Long.MAX_VALUE, false, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); @@ -608,7 +611,17 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2).get(); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); - try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), true)) { + try ( + Translog.Snapshot snapshot = replica3.newChangesSnapshot( + "test", + 0, + Long.MAX_VALUE, + false, + randomBoolean(), + true, + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) + ) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); final List expectedOps = new ArrayList<>(initOperations); expectedOps.add(op2); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index d480f7bfc8d7f..eacb4cf35a422 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1819,7 +1819,15 @@ public void testShardFieldStats() throws IOException { shard.refresh("test"); } else { // trigger internal refresh - shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), randomBoolean()).close(); + shard.newChangesSnapshot( + "test", + 0, + Long.MAX_VALUE, + false, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ).close(); } assertThat(shard.getShardFieldStats(), sameInstance(stats)); // index more docs @@ -1837,7 +1845,15 @@ public void testShardFieldStats() throws IOException { shard.refresh("test"); } else { // trigger internal refresh - shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), randomBoolean()).close(); + shard.newChangesSnapshot( + "test", + 0, + Long.MAX_VALUE, + false, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ).close(); } stats = shard.getShardFieldStats(); assertThat(stats.numSegments(), equalTo(2)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 315eaaf9ffaf1..aef58cee04899 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.VersionType; @@ -211,7 +212,8 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { Long.MAX_VALUE, false, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { assertThat(snapshot, SnapshotMatchers.size(6)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 64e9e83892bcf..46f6a0b503bfb 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -62,6 +62,7 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.IOUtils; @@ -1289,7 +1290,17 @@ public static List getDocIds(Engine engine, boolean refresh */ public static List readAllOperationsInLucene(Engine engine) throws IOException { final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), randomBoolean())) { + try ( + Translog.Snapshot snapshot = engine.newChangesSnapshot( + "test", + 0, + Long.MAX_VALUE, + false, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) + ) { Translog.Operation op; while ((op = snapshot.next()) != null) { operations.add(op); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 167c5f66300a9..8133702ab5354 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -564,7 +564,17 @@ static Translog.Operation[] getOperations( long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1); assert fromSeqNo <= toSeqNo : "invalid range from_seqno[" + fromSeqNo + "] > to_seqno[" + toSeqNo + "]"; final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = indexShard.newChangesSnapshot("ccr", fromSeqNo, toSeqNo, true, true, false)) { + try ( + Translog.Snapshot snapshot = indexShard.newChangesSnapshot( + "ccr", + fromSeqNo, + toSeqNo, + true, + true, + false, + maxBatchSize.getBytes() + ) + ) { Translog.Operation op; while ((op = snapshot.next()) != null) { operations.add(op); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 5cd9f8bc5b78c..573c66cbb614a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -755,7 +755,15 @@ private void assertConsistentHistoryBetweenLeaderAndFollower( final Map operationsOnLeader = new HashMap<>(); try ( Translog.Snapshot snapshot = leader.getPrimary() - .newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), randomBoolean()) + .newChangesSnapshot( + "test", + 0, + Long.MAX_VALUE, + false, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) ) { Translog.Operation op; while ((op = snapshot.next()) != null) { @@ -780,7 +788,8 @@ private void assertConsistentHistoryBetweenLeaderAndFollower( Long.MAX_VALUE, false, randomBoolean(), - randomBoolean() + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) ) ) { Translog.Operation op; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 4e3aea2cad205..e3f26eed0c2e9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; @@ -84,7 +85,15 @@ public void testPrimaryTermFromFollower() throws IOException { boolean accessStats = randomBoolean(); try ( - Translog.Snapshot snapshot = followerPrimary.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean(), accessStats) + Translog.Snapshot snapshot = followerPrimary.newChangesSnapshot( + "test", + 0, + Long.MAX_VALUE, + false, + randomBoolean(), + accessStats, + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) ) { if (accessStats) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e22f7fb54de76..62dc3313a1172 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; @@ -651,7 +652,15 @@ private void fetchOperations(AtomicBoolean stopped, AtomicLong lastFetchedSeqNo, final long toSeqNo = randomLongBetween(nextSeqNo, Math.min(nextSeqNo + 5, checkpoint)); try ( Translog.Snapshot snapshot = shuffleSnapshot( - leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean(), randomBoolean()) + leader.newChangesSnapshot( + "test", + fromSeqNo, + toSeqNo, + true, + randomBoolean(), + randomBoolean(), + randomLongBetween(1, ByteSizeValue.ofMb(32).getBytes()) + ) ) ) { follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes());