From e828c180b28fd96b3121e92613fc0879004e791f Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sat, 13 Apr 2024 21:58:17 -0700 Subject: [PATCH 1/2] Fix flakiness with SegmentReplicationSuiteIT (#11977) * Fix SegmentReplicationSuiteIT This test fails because of a race during shard/node shutdown with node-node replication. Fixed by properly synchronizing creation of new replication events with cancellation and cancelling after shards are closed. Signed-off-by: Marc Handalian * Remove CopyState caching from OngoingSegmentReplications. This change removes the responsibility of caching CopyState inside of OngoingSegmentReplications. 1. CopyState was originally cached to prevent frequent disk reads while building segment metadata. This is now cached lower down in IndexShard and is not required here. 2. Change prepareForReplication method to return SegmentReplicationSourceHandler directly 3. Move responsibility of creating and clearing CopyState to the handler. Signed-off-by: Marc Handalian * Fix comment for afterIndexShardClosed method. Signed-off-by: Marc Handalian * Fix comment on beforeIndexShardClosed Signed-off-by: Marc Handalian * Remove unnecessary method from OngoingSegmentReplications Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- .../SegmentReplicationSuiteIT.java | 3 +- .../replication/CheckpointInfoResponse.java | 6 + .../OngoingSegmentReplications.java | 185 ++++-------------- .../SegmentReplicationSourceHandler.java | 44 +++-- .../SegmentReplicationSourceService.java | 12 +- .../indices/replication/common/CopyState.java | 33 ++-- .../SegmentReplicationIndexShardTests.java | 16 +- .../OngoingSegmentReplicationsTests.java | 73 ++----- .../SegmentReplicationSourceHandlerTests.java | 26 +-- .../SegmentReplicationTargetServiceTests.java | 5 +- .../replication/common/CopyStateTests.java | 13 +- .../index/shard/IndexShardTestCase.java | 4 +- 12 files changed, 125 insertions(+), 295 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 8c045c1560dd3..27b65432e0bac 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -16,7 +15,6 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9499") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { @@ -64,6 +62,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { ensureYellow(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index 9fd3b7f3afb80..24b744bebc53d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -40,6 +40,12 @@ public CheckpointInfoResponse( this.infosBytes = infosBytes; } + public CheckpointInfoResponse(final ReplicationCheckpoint checkpoint, final byte[] infosBytes) { + this.checkpoint = checkpoint; + this.infosBytes = infosBytes; + this.metadataMap = checkpoint.getMetadataMap(); + } + public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new); diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 33967c0203516..6b99b3c0b0696 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -21,12 +21,10 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,7 +34,6 @@ /** * Manages references to ongoing segrep events on a node. * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. - * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. * * @opensearch.internal */ @@ -45,7 +42,6 @@ class OngoingSegmentReplications { private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class); private final RecoverySettings recoverySettings; private final IndicesService indicesService; - private final Map copyStateMap; private final Map allocationIdToHandlers; /** @@ -57,46 +53,9 @@ class OngoingSegmentReplications { OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { this.indicesService = indicesService; this.recoverySettings = recoverySettings; - this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap(); } - /* - Operations on the {@link #copyStateMap} member. - */ - - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - // we incref the copyState for every replica that is using this checkpoint. - // decref will happen when copy completes. - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(checkpoint, indexShard); - - /* - Use the checkpoint from the request as the key in the map, rather than - the checkpoint from the created CopyState. This maximizes cache hits - if replication targets make a request with an older checkpoint. - Replication targets are expected to fetch the checkpoint in the response - CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; - } - } - /** * Start sending files to the replica. * @@ -114,12 +73,10 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId()); - if (sourceHandler != null) { - removeCopyState(sourceHandler.getCopyState()); - } - }); + final ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> allocationIdToHandlers.remove(request.getTargetAllocationId()) + ); handler.sendFiles(request, wrappedListener); } else { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -127,38 +84,32 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry"); - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()) == false; - allocationIdToHandlers.put(request.getTargetAllocationId(), newHandler); - } - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()); - return copyState; + SegmentReplicationSourceHandler prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) { + return allocationIdToHandlers.computeIfAbsent(request.getTargetAllocationId(), aId -> { + try { + // From the checkpoint's shard ID, fetch the IndexShard + final ShardId shardId = request.getCheckpoint().getShardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + return new SegmentReplicationSourceHandler( + request.getTargetNode(), + fileChunkWriter, + indexShard, + request.getTargetAllocationId(), + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } catch (IOException e) { + throw new UncheckedIOException("Error creating replication handler", e); + } + }); } /** @@ -167,8 +118,8 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(IndexShard shard, String reason) { - cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + void cancel(IndexShard shard, String reason) { + cancelHandlers(handler -> handler.shardId().equals(shard.shardId()), reason); } /** @@ -177,11 +128,10 @@ synchronized void cancel(IndexShard shard, String reason) { * @param allocationId {@link String} - Allocation ID. * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(String allocationId, String reason) { + void cancel(String allocationId, String reason) { final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); if (handler != null) { handler.cancel(reason); - removeCopyState(handler.getCopyState()); } } @@ -194,14 +144,6 @@ void cancelReplication(DiscoveryNode node) { cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left"); } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. - */ - boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); - } - int size() { return allocationIdToHandlers.size(); } @@ -211,58 +153,20 @@ Map getHandlers() { return allocationIdToHandlers; } - int cachedCopyStateSize() { - return copyStateMap.size(); - } - - private SegmentReplicationSourceHandler createTargetHandler( - DiscoveryNode node, - CopyState copyState, - String allocationId, - FileChunkWriter fileChunkWriter - ) { - return new SegmentReplicationSourceHandler( - node, - fileChunkWriter, - copyState.getShard().getThreadPool(), - copyState, - allocationId, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks() - ); - } - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); - } - - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); - } - - /** - * Remove a CopyState. Intended to be called after a replication event completes. - * This method will remove a copyState from the copyStateMap only if its refCount hits 0. - * - * @param copyState {@link CopyState} + * Clear handlers for any allocationIds not in sync. + * @param shardId {@link ShardId} + * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. */ - private synchronized void removeCopyState(CopyState copyState) { - if (copyState.decRef() == true) { - copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); - } + void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { + cancelHandlers( + (handler) -> handler.shardId().equals(shardId) && inSyncAllocationIds.contains(handler.getAllocationId()) == false, + "Shard is no longer in-sync with the primary" + ); } /** * Remove handlers from allocationIdToHandlers map based on a filter predicate. - * This will also decref the handler's CopyState reference. */ private void cancelHandlers(Predicate predicate, String reason) { final List allocationIds = allocationIdToHandlers.values() @@ -278,17 +182,4 @@ private void cancelHandlers(Predicate p cancel(allocationId, reason); } } - - /** - * Clear copystate and target handlers for any non insync allocationIds. - * @param shardId {@link ShardId} - * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. - */ - public void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { - cancelHandlers( - (handler) -> handler.getCopyState().getShard().shardId().equals(shardId) - && inSyncAllocationIds.contains(handler.getAllocationId()) == false, - "Shard is no longer in-sync with the primary" - ); - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 674c09311c645..bb64d6b0c60b6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -18,16 +18,18 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -54,48 +56,46 @@ class SegmentReplicationSourceHandler { private final AtomicBoolean isReplicating = new AtomicBoolean(); private final DiscoveryNode targetNode; private final String allocationId; - private final FileChunkWriter writer; /** * Constructor. * - * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param targetNode {@link DiscoveryNode} target node where files should be sent. * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. - * @param threadPool {@link ThreadPool} Thread pool. - * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param shard {@link IndexShard} The primary shard local to this node. * @param fileChunkSizeInBytes {@link Integer} * @param maxConcurrentFileChunks {@link Integer} */ SegmentReplicationSourceHandler( DiscoveryNode targetNode, FileChunkWriter writer, - ThreadPool threadPool, - CopyState copyState, + IndexShard shard, String allocationId, int fileChunkSizeInBytes, int maxConcurrentFileChunks - ) { + ) throws IOException { this.targetNode = targetNode; - this.shard = copyState.getShard(); + this.shard = shard; this.logger = Loggers.getLogger( SegmentReplicationSourceHandler.class, - copyState.getShard().shardId(), + shard.shardId(), "sending segments to " + targetNode.getName() ); this.segmentFileTransferHandler = new SegmentFileTransferHandler( - copyState.getShard(), + shard, targetNode, writer, logger, - threadPool, + shard.getThreadPool(), cancellableThreads, fileChunkSizeInBytes, maxConcurrentFileChunks ); this.allocationId = allocationId; - this.copyState = copyState; + this.copyState = new CopyState(shard); this.writer = writer; + resources.add(copyState); } /** @@ -109,6 +109,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene if (request.getFilesToFetch().isEmpty()) { // before completion, alert the primary of the replica's state. shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); + IOUtils.closeWhileHandlingException(copyState); listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); return; } @@ -183,10 +184,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene public void cancel(String reason) { writer.cancel(); cancellableThreads.cancel(reason); - } - - CopyState getCopyState() { - return copyState; + IOUtils.closeWhileHandlingException(copyState); } public boolean isReplicating() { @@ -200,4 +198,16 @@ public DiscoveryNode getTargetNode() { public String getAllocationId() { return allocationId; } + + public ReplicationCheckpoint getCheckpoint() { + return copyState.getCheckpoint(); + } + + public byte[] getInfosBytes() { + return copyState.getInfosBytes(); + } + + public ShardId shardId() { + return shard.shardId(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index a393faabae0ea..ca89741d5bb55 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -29,7 +29,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -126,16 +125,17 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan new AtomicLong(0), (throttleTime) -> {} ); - final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); - channel.sendResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + final SegmentReplicationSourceHandler handler = ongoingSegmentReplications.prepareForReplication( + request, + segmentSegmentFileChunkWriter ); + channel.sendResponse(new CheckpointInfoResponse(handler.getCheckpoint(), handler.getInfosBytes())); timer.stop(); logger.trace( new ParameterizedMessage( "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", request.getReplicationId(), - copyState.getCheckpoint(), + handler.getCheckpoint(), request.getTargetNode().getId(), timer.time() ) @@ -217,7 +217,7 @@ protected void doClose() throws IOException { /** * - * Cancels any replications on this node to a replica shard that is about to be closed. + * Before a primary shard is closed, cancel any ongoing replications to release incref'd segments. */ @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 3b7ae2af80ca0..7d3eb9083208b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -13,11 +13,11 @@ import org.apache.lucene.store.ByteBuffersIndexOutput; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; -import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; @@ -28,28 +28,21 @@ * * @opensearch.internal */ -public class CopyState extends AbstractRefCounted { +public class CopyState implements Closeable { private final GatedCloseable segmentInfosRef; - /** ReplicationCheckpoint requested */ - private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Map metadataMap; private final byte[] infosBytes; private final IndexShard shard; - public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { - super("CopyState-" + shard.shardId()); - this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + public CopyState(IndexShard shard) throws IOException { this.shard = shard; final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard .getLatestSegmentInfosAndCheckpoint(); this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1(); this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { @@ -58,21 +51,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.infosBytes = buffer.toArrayCopy(); } - @Override - protected void closeInternal() { - try { - segmentInfosRef.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } public Map getMetadataMap() { - return metadataMap; + return replicationCheckpoint.getMetadataMap(); } public byte[] getInfosBytes() { @@ -83,7 +67,12 @@ public IndexShard getShard() { return shard; } - public ReplicationCheckpoint getRequestedReplicationCheckpoint() { - return requestedReplicationCheckpoint; + @Override + public void close() throws IOException { + try { + segmentInfosRef.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index e93d266dcab4c..2311fc582616f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1019,25 +1019,15 @@ protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCo } protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { - final CopyState copyState; - try { - copyState = new CopyState( - ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), - primary + try (final CopyState copyState = new CopyState(primary)) { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); throw new UncheckedIOException(e); } - - try { - listener.onResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); - } finally { - copyState.decRef(); - } } protected void startReplicationAndAssertCancellation( diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 44e2653cf01da..eb27850000bdd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -25,7 +25,6 @@ import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import org.junit.Assert; @@ -106,25 +105,21 @@ public void testPrepareAndSendSegments() throws IOException { final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { listener.onResponse(null); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(handler.getCheckpoint().getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, replications.size()); } @@ -148,14 +143,11 @@ public void testCancelReplication() throws IOException { // this shouldn't be called in this test. Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); } public void testCancelReplication_AfterSendFilesStarts() throws IOException, InterruptedException { @@ -174,14 +166,13 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int // cancel the replication as soon as the writer starts sending files. replications.cancel(replica.routingEntry().allocationId().getId(), "Test"); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @@ -193,9 +184,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { @Override public void onFailure(Exception e) { assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); latch.countDown(); } }); @@ -219,8 +208,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -230,15 +218,11 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { ); replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); replications.cancelReplication(replicaDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(secondReplica); } @@ -280,8 +264,7 @@ public void testShardAlreadyReplicatingToNode() throws IOException { listener.onResponse(null); }; replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); } public void testStartReplicationWithNoFilesToFetch() throws IOException { @@ -296,10 +279,8 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { // mock the FileChunkWriter so we can assert its ever called. final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class); // Prepare for replication step - and ensure copyState is added to cache. - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -314,8 +295,6 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { assertEquals(Collections.emptyList(), getSegmentFilesResponse.files); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); verifyNoInteractions(segmentSegmentFileChunkWriter); } @@ -340,8 +319,7 @@ public void testCancelAllReplicationsForShard() throws IOException { testCheckpoint ); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -351,15 +329,11 @@ public void testCancelAllReplicationsForShard() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.cancel(primary, "Test"); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -372,8 +346,7 @@ public void testCancelForMissingIds() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final String replica_2AllocationId = replica_2.routingEntry().allocationId().getId(); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( @@ -384,23 +357,17 @@ public void testCancelForMissingIds() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); assertTrue(replications.getHandlers().containsKey(replicaAllocationId)); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); replications.clearOutOfSyncIds(primary.shardId(), Set.of(replica_2AllocationId)); - assertEquals(1, copyState.refCount()); assertEquals(1, replications.size()); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.clearOutOfSyncIds(primary.shardId(), Collections.emptySet()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -409,11 +376,8 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - - final SegmentReplicationSourceHandler handler = replications.getHandlers().get(replicaAllocationId); - assertEquals(handler.getCopyState(), copyState); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); + assertEquals(handler, replications.getHandlers().get(replicaAllocationId)); ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( testCheckpoint.getShardId(), @@ -430,11 +394,10 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { secondCheckpoint ); - final CopyState secondCopyState = replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - final SegmentReplicationSourceHandler secondHandler = replications.getHandlers().get(replicaAllocationId); - assertEquals(secondHandler.getCopyState(), secondCopyState); - assertEquals("New copy state is incref'd", 1, secondCopyState.refCount()); - assertEquals("Old copy state is cleaned up", 0, copyState.refCount()); - + final SegmentReplicationSourceHandler secondHandler = replications.prepareForReplication( + secondRequest, + mock(FileChunkWriter.class) + ); + assertEquals(secondHandler, replications.getHandlers().get(replicaAllocationId)); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index d586767290797..901dc28794cfc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -68,18 +68,16 @@ public void testSendFiles() throws IOException { chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); + final List expectedFiles = List.copyOf(handler.getCheckpoint().getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -106,12 +104,10 @@ public void testSendFiles_emptyRequest() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -148,12 +144,11 @@ public void testSendFileFails() throws IOException { ); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 @@ -180,19 +175,18 @@ public void onFailure(Exception e) { assertEquals(e.getClass(), OpenSearchException.class); } }); - copyState.decRef(); + copyState.close(); } public void testReplicationAlreadyRunning() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -217,12 +211,10 @@ public void testCancelReplication() throws IOException, InterruptedException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 3c72dda2d8b5d..f06d5595afcd5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -273,10 +273,7 @@ public void getCheckpointMetadata( ) { try { blockGetCheckpointMetadata.await(); - final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId(), primaryShard.getLatestReplicationCheckpoint().getCodec()), - primaryShard - ); + final CopyState copyState = new CopyState(primaryShard); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index df180a8ab1007..0b30486038e3a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; @@ -54,13 +53,7 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - CopyState copyState = new CopyState( - ReplicationCheckpoint.empty( - mockIndexShard.shardId(), - new CodecService(null, mockIndexShard.indexSettings(), null).codec("default").getName() - ), - mockIndexShard - ); + CopyState copyState = new CopyState(mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -86,7 +79,9 @@ public static IndexShard createMockIndexShard() throws IOException { mockShard.getOperationPrimaryTerm(), 0L, 0L, - Codec.getDefault().getName() + 0L, + Codec.getDefault().getName(), + SI_SNAPSHOT.asMap() ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 4dd4c734a1701..6b609d8af62a1 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1638,12 +1638,10 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - try { - final CopyState copyState = new CopyState(primaryShard.getLatestReplicationCheckpoint(), primaryShard); + try (final CopyState copyState = new CopyState(primaryShard)) { listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); - copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); From 6bc04b49a0031e37466001469abcfd593e454813 Mon Sep 17 00:00:00 2001 From: maxliu <48641774+Ferrari248@users.noreply.github.com> Date: Sun, 14 Apr 2024 13:02:52 +0800 Subject: [PATCH 2/2] [segment replication] decouple the rateLimiter of segrep and recovery (#12959) * [segment replication] decouple the rateLimiter of segrep and recovery (12939) add setting "segrep.max_bytes_per_sec" Signed-off-by: maxliu * [segment replication] decouple the rateLimiter of segrep and recovery (12939) use setting "indices.replication.max_bytes_per_sec" if enable "indices.replication.use_individual_rate_limiter" Signed-off-by: maxliu * [segment replication] decouple the rateLimiter of segrep and recovery (12939) setting "indices.replication.max_bytes_per_sec" takes effect when not negative Signed-off-by: maxliu * [segment replication] decouple the rateLimiter of segrep and recovery (#12939) add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative Signed-off-by: maxliu Adds change log Signed-off-by: maxliu --------- Signed-off-by: maxliu --- CHANGELOG.md | 1 + .../common/settings/ClusterSettings.java | 1 + .../recovery/PeerRecoveryTargetService.java | 8 +- .../indices/recovery/RecoverySettings.java | 79 +++++++++++++++---- .../recovery/RemoteRecoveryTargetHandler.java | 3 +- .../RemoteSegmentFileChunkWriter.java | 10 ++- .../SegmentReplicationSourceService.java | 3 +- .../SegmentReplicationTargetService.java | 2 +- .../blobstore/BlobStoreRepository.java | 4 +- .../RecoverySettingsDynamicUpdateTests.java | 24 +++++- 10 files changed, 107 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b41f1cd4ca01..d336b87197e1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) - [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531)) - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) +- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 004973e50d43a..ef74a794a9975 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index c24840d0c1333..6279a8ec3646c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -575,7 +575,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + recoveryTarget.handleFileChunk( + request, + recoveryTarget, + bytesSinceLastPause, + recoverySettings.recoveryRateLimiter(), + listener + ); } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 53b42347aa30d..8f9da6babdd99 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -65,6 +65,16 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Individual speed setting for segment replication, default -1B to reuse the setting of recovery. + */ + public static final Setting INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + "indices.replication.max_bytes_per_sec", + new ByteSizeValue(-1), + Property.Dynamic, + Property.NodeScope + ); + /** * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node. */ @@ -169,11 +179,13 @@ public class RecoverySettings { // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1. public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); - private volatile ByteSizeValue maxBytesPerSec; + private volatile ByteSizeValue recoveryMaxBytesPerSec; + private volatile ByteSizeValue replicationMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; private volatile int maxConcurrentRemoteStoreStreams; - private volatile SimpleRateLimiter rateLimiter; + private volatile SimpleRateLimiter recoveryRateLimiter; + private volatile SimpleRateLimiter replicationRateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; private volatile TimeValue activityTimeout; @@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings); this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); - this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; + this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } + this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); + updateReplicationRateLimiter(); - logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); + logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec); this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer( @@ -227,8 +242,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } - public RateLimiter rateLimiter() { - return rateLimiter; + public RateLimiter recoveryRateLimiter() { + return recoveryRateLimiter; + } + + public RateLimiter replicationRateLimiter() { + return replicationRateLimiter; } public TimeValue retryDelayNetwork() { @@ -294,14 +313,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout this.internalRemoteUploadTimeout = internalRemoteUploadTimeout; } - private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { - this.maxBytesPerSec = maxBytesPerSec; - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; - } else if (rateLimiter != null) { - rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac()); + private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) { + this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec; + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; + } else if (recoveryRateLimiter != null) { + recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } + if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter(); + } + + private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) { + this.replicationMaxBytesPerSec = replicationMaxBytesPerSec; + updateReplicationRateLimiter(); + } + + private void updateReplicationRateLimiter() { + if (replicationMaxBytesPerSec.getBytes() >= 0) { + if (replicationMaxBytesPerSec.getBytes() == 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac()); + } + } else { // when replicationMaxBytesPerSec = -1B, use setting of recovery + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 37227596fdfe7..4d4c20f778ef3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler( shardId, PeerRecoveryTargetService.Actions.FILE_CHUNK, requestSeqNoGenerator, - onSourceThrottle + onSourceThrottle, + recoverySettings::recoveryRateLimiter ); this.remoteStoreEnabled = remoteStoreEnabled; } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index b52fe66816098..179e497565326 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; /** * This class handles sending file chunks over the transport layer to a target shard. @@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { private final AtomicLong requestSeqNoGenerator; private final RetryableTransportClient retryableTransportClient; private final ShardId shardId; - private final RecoverySettings recoverySettings; private final long replicationId; private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final TransportRequestOptions fileChunkRequestOptions; private final Consumer onSourceThrottle; + private final Supplier rateLimiterSupplier; private final String action; public RemoteSegmentFileChunkWriter( @@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter( ShardId shardId, String action, AtomicLong requestSeqNoGenerator, - Consumer onSourceThrottle + Consumer onSourceThrottle, + Supplier rateLimiterSupplier ) { this.replicationId = replicationId; - this.recoverySettings = recoverySettings; this.retryableTransportClient = retryableTransportClient; this.shardId = shardId; this.requestSeqNoGenerator = requestSeqNoGenerator; this.onSourceThrottle = onSourceThrottle; + this.rateLimiterSupplier = rateLimiterSupplier; this.fileChunkRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) @@ -78,7 +80,7 @@ public void writeFileChunk( // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); + final RateLimiter rl = rateLimiterSupplier.get(); if (rl != null) { long bytes = bytesSinceLastPause.addAndGet(content.length()); if (bytes > rl.getMinPauseCheckBytes()) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index ca89741d5bb55..21fd066b8be2f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -123,7 +123,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan request.getCheckpoint().getShardId(), SegmentReplicationTargetService.Actions.FILE_CHUNK, new AtomicLong(0), - (throttleTime) -> {} + (throttleTime) -> {}, + recoverySettings::replicationRateLimiter ); final SegmentReplicationSourceHandler handler = ongoingSegmentReplications.prepareForReplication( request, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 4942d39cfa48a..fbd7ab7cea346 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { final SegmentReplicationTarget target = ref.get(); final ActionListener listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener); } } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index ce2ffd8bf3fb4..1a5701d9204ef 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3162,7 +3162,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers( public InputStream maybeRateLimitRestores(InputStream stream) { return maybeRateLimit( maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE ); @@ -3185,7 +3185,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 75639661f539d..2793d446d66c8 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -35,6 +35,8 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.TimeUnit; @@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() { clusterSettings.applySettings( Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); - assertEquals(null, recoverySettings.rateLimiter()); + assertNull(recoverySettings.recoveryRateLimiter()); + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() + ); + assertNull(recoverySettings.replicationRateLimiter()); + } + + public void testSetReplicationMaxBytesPerSec() { + assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) + .build() + ); + assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB)) + .build() + ); + assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); } public void testRetryDelayStateSync() {