diff --git a/CHANGELOG.md b/CHANGELOG.md index 94298373cd3ca..103be38bfba09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) - [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531)) - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) +- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - Add cluster setting to dynamically disable filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) ### Dependencies diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java index 8c045c1560dd3..27b65432e0bac 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationSuiteIT.java @@ -8,7 +8,6 @@ package org.opensearch.indices.replication; -import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -16,7 +15,6 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Before; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9499") @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT { @@ -64,6 +62,7 @@ public void testDropRandomNodeDuringReplication() throws Exception { ensureYellow(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get(); internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet(); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ddebe773d4e65..65415e325569c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index c24840d0c1333..6279a8ec3646c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -575,7 +575,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + recoveryTarget.handleFileChunk( + request, + recoveryTarget, + bytesSinceLastPause, + recoverySettings.recoveryRateLimiter(), + listener + ); } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 53b42347aa30d..8f9da6babdd99 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -65,6 +65,16 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Individual speed setting for segment replication, default -1B to reuse the setting of recovery. + */ + public static final Setting INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + "indices.replication.max_bytes_per_sec", + new ByteSizeValue(-1), + Property.Dynamic, + Property.NodeScope + ); + /** * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node. */ @@ -169,11 +179,13 @@ public class RecoverySettings { // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1. public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); - private volatile ByteSizeValue maxBytesPerSec; + private volatile ByteSizeValue recoveryMaxBytesPerSec; + private volatile ByteSizeValue replicationMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; private volatile int maxConcurrentRemoteStoreStreams; - private volatile SimpleRateLimiter rateLimiter; + private volatile SimpleRateLimiter recoveryRateLimiter; + private volatile SimpleRateLimiter replicationRateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; private volatile TimeValue activityTimeout; @@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings); this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); - this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; + this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } + this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); + updateReplicationRateLimiter(); - logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); + logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec); this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer( @@ -227,8 +242,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } - public RateLimiter rateLimiter() { - return rateLimiter; + public RateLimiter recoveryRateLimiter() { + return recoveryRateLimiter; + } + + public RateLimiter replicationRateLimiter() { + return replicationRateLimiter; } public TimeValue retryDelayNetwork() { @@ -294,14 +313,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout this.internalRemoteUploadTimeout = internalRemoteUploadTimeout; } - private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { - this.maxBytesPerSec = maxBytesPerSec; - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; - } else if (rateLimiter != null) { - rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac()); + private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) { + this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec; + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; + } else if (recoveryRateLimiter != null) { + recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } + if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter(); + } + + private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) { + this.replicationMaxBytesPerSec = replicationMaxBytesPerSec; + updateReplicationRateLimiter(); + } + + private void updateReplicationRateLimiter() { + if (replicationMaxBytesPerSec.getBytes() >= 0) { + if (replicationMaxBytesPerSec.getBytes() == 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac()); + } + } else { // when replicationMaxBytesPerSec = -1B, use setting of recovery + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 37227596fdfe7..4d4c20f778ef3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler( shardId, PeerRecoveryTargetService.Actions.FILE_CHUNK, requestSeqNoGenerator, - onSourceThrottle + onSourceThrottle, + recoverySettings::recoveryRateLimiter ); this.remoteStoreEnabled = remoteStoreEnabled; } diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java index 9fd3b7f3afb80..24b744bebc53d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -40,6 +40,12 @@ public CheckpointInfoResponse( this.infosBytes = infosBytes; } + public CheckpointInfoResponse(final ReplicationCheckpoint checkpoint, final byte[] infosBytes) { + this.checkpoint = checkpoint; + this.infosBytes = infosBytes; + this.metadataMap = checkpoint.getMetadataMap(); + } + public CheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new); diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 33967c0203516..6b99b3c0b0696 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -21,12 +21,10 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,7 +34,6 @@ /** * Manages references to ongoing segrep events on a node. * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. - * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. * * @opensearch.internal */ @@ -45,7 +42,6 @@ class OngoingSegmentReplications { private static final Logger logger = LogManager.getLogger(OngoingSegmentReplications.class); private final RecoverySettings recoverySettings; private final IndicesService indicesService; - private final Map copyStateMap; private final Map allocationIdToHandlers; /** @@ -57,46 +53,9 @@ class OngoingSegmentReplications { OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { this.indicesService = indicesService; this.recoverySettings = recoverySettings; - this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap(); } - /* - Operations on the {@link #copyStateMap} member. - */ - - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - // we incref the copyState for every replica that is using this checkpoint. - // decref will happen when copy completes. - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(checkpoint, indexShard); - - /* - Use the checkpoint from the request as the key in the map, rather than - the checkpoint from the created CopyState. This maximizes cache hits - if replication targets make a request with an older checkpoint. - Replication targets are expected to fetch the checkpoint in the response - CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; - } - } - /** * Start sending files to the replica. * @@ -114,12 +73,10 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { - final SegmentReplicationSourceHandler sourceHandler = allocationIdToHandlers.remove(request.getTargetAllocationId()); - if (sourceHandler != null) { - removeCopyState(sourceHandler.getCopyState()); - } - }); + final ActionListener wrappedListener = ActionListener.runBefore( + listener, + () -> allocationIdToHandlers.remove(request.getTargetAllocationId()) + ); handler.sendFiles(request, wrappedListener); } else { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -127,38 +84,32 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry"); - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()) == false; - allocationIdToHandlers.put(request.getTargetAllocationId(), newHandler); - } - assert allocationIdToHandlers.containsKey(request.getTargetAllocationId()); - return copyState; + SegmentReplicationSourceHandler prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) { + return allocationIdToHandlers.computeIfAbsent(request.getTargetAllocationId(), aId -> { + try { + // From the checkpoint's shard ID, fetch the IndexShard + final ShardId shardId = request.getCheckpoint().getShardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + return new SegmentReplicationSourceHandler( + request.getTargetNode(), + fileChunkWriter, + indexShard, + request.getTargetAllocationId(), + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } catch (IOException e) { + throw new UncheckedIOException("Error creating replication handler", e); + } + }); } /** @@ -167,8 +118,8 @@ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter f * @param shard {@link IndexShard} * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(IndexShard shard, String reason) { - cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason); + void cancel(IndexShard shard, String reason) { + cancelHandlers(handler -> handler.shardId().equals(shard.shardId()), reason); } /** @@ -177,11 +128,10 @@ synchronized void cancel(IndexShard shard, String reason) { * @param allocationId {@link String} - Allocation ID. * @param reason {@link String} - Reason for the cancel */ - synchronized void cancel(String allocationId, String reason) { + void cancel(String allocationId, String reason) { final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId); if (handler != null) { handler.cancel(reason); - removeCopyState(handler.getCopyState()); } } @@ -194,14 +144,6 @@ void cancelReplication(DiscoveryNode node) { cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left"); } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. - */ - boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); - } - int size() { return allocationIdToHandlers.size(); } @@ -211,58 +153,20 @@ Map getHandlers() { return allocationIdToHandlers; } - int cachedCopyStateSize() { - return copyStateMap.size(); - } - - private SegmentReplicationSourceHandler createTargetHandler( - DiscoveryNode node, - CopyState copyState, - String allocationId, - FileChunkWriter fileChunkWriter - ) { - return new SegmentReplicationSourceHandler( - node, - fileChunkWriter, - copyState.getShard().getThreadPool(), - copyState, - allocationId, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks() - ); - } - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); - } - - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); - } - - /** - * Remove a CopyState. Intended to be called after a replication event completes. - * This method will remove a copyState from the copyStateMap only if its refCount hits 0. - * - * @param copyState {@link CopyState} + * Clear handlers for any allocationIds not in sync. + * @param shardId {@link ShardId} + * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. */ - private synchronized void removeCopyState(CopyState copyState) { - if (copyState.decRef() == true) { - copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); - } + void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { + cancelHandlers( + (handler) -> handler.shardId().equals(shardId) && inSyncAllocationIds.contains(handler.getAllocationId()) == false, + "Shard is no longer in-sync with the primary" + ); } /** * Remove handlers from allocationIdToHandlers map based on a filter predicate. - * This will also decref the handler's CopyState reference. */ private void cancelHandlers(Predicate predicate, String reason) { final List allocationIds = allocationIdToHandlers.values() @@ -278,17 +182,4 @@ private void cancelHandlers(Predicate p cancel(allocationId, reason); } } - - /** - * Clear copystate and target handlers for any non insync allocationIds. - * @param shardId {@link ShardId} - * @param inSyncAllocationIds {@link List} of in-sync allocation Ids. - */ - public void clearOutOfSyncIds(ShardId shardId, Set inSyncAllocationIds) { - cancelHandlers( - (handler) -> handler.getCopyState().getShard().shardId().equals(shardId) - && inSyncAllocationIds.contains(handler.getAllocationId()) == false, - "Shard is no longer in-sync with the primary" - ); - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index b52fe66816098..179e497565326 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; /** * This class handles sending file chunks over the transport layer to a target shard. @@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { private final AtomicLong requestSeqNoGenerator; private final RetryableTransportClient retryableTransportClient; private final ShardId shardId; - private final RecoverySettings recoverySettings; private final long replicationId; private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final TransportRequestOptions fileChunkRequestOptions; private final Consumer onSourceThrottle; + private final Supplier rateLimiterSupplier; private final String action; public RemoteSegmentFileChunkWriter( @@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter( ShardId shardId, String action, AtomicLong requestSeqNoGenerator, - Consumer onSourceThrottle + Consumer onSourceThrottle, + Supplier rateLimiterSupplier ) { this.replicationId = replicationId; - this.recoverySettings = recoverySettings; this.retryableTransportClient = retryableTransportClient; this.shardId = shardId; this.requestSeqNoGenerator = requestSeqNoGenerator; this.onSourceThrottle = onSourceThrottle; + this.rateLimiterSupplier = rateLimiterSupplier; this.fileChunkRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) @@ -78,7 +80,7 @@ public void writeFileChunk( // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); + final RateLimiter rl = rateLimiterSupplier.get(); if (rl != null) { long bytes = bytesSinceLastPause.addAndGet(content.length()); if (bytes > rl.getMinPauseCheckBytes()) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 674c09311c645..bb64d6b0c60b6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -18,16 +18,18 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; -import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -54,48 +56,46 @@ class SegmentReplicationSourceHandler { private final AtomicBoolean isReplicating = new AtomicBoolean(); private final DiscoveryNode targetNode; private final String allocationId; - private final FileChunkWriter writer; /** * Constructor. * - * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param targetNode {@link DiscoveryNode} target node where files should be sent. * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. - * @param threadPool {@link ThreadPool} Thread pool. - * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param shard {@link IndexShard} The primary shard local to this node. * @param fileChunkSizeInBytes {@link Integer} * @param maxConcurrentFileChunks {@link Integer} */ SegmentReplicationSourceHandler( DiscoveryNode targetNode, FileChunkWriter writer, - ThreadPool threadPool, - CopyState copyState, + IndexShard shard, String allocationId, int fileChunkSizeInBytes, int maxConcurrentFileChunks - ) { + ) throws IOException { this.targetNode = targetNode; - this.shard = copyState.getShard(); + this.shard = shard; this.logger = Loggers.getLogger( SegmentReplicationSourceHandler.class, - copyState.getShard().shardId(), + shard.shardId(), "sending segments to " + targetNode.getName() ); this.segmentFileTransferHandler = new SegmentFileTransferHandler( - copyState.getShard(), + shard, targetNode, writer, logger, - threadPool, + shard.getThreadPool(), cancellableThreads, fileChunkSizeInBytes, maxConcurrentFileChunks ); this.allocationId = allocationId; - this.copyState = copyState; + this.copyState = new CopyState(shard); this.writer = writer; + resources.add(copyState); } /** @@ -109,6 +109,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene if (request.getFilesToFetch().isEmpty()) { // before completion, alert the primary of the replica's state. shard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), copyState.getCheckpoint()); + IOUtils.closeWhileHandlingException(copyState); listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); return; } @@ -183,10 +184,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene public void cancel(String reason) { writer.cancel(); cancellableThreads.cancel(reason); - } - - CopyState getCopyState() { - return copyState; + IOUtils.closeWhileHandlingException(copyState); } public boolean isReplicating() { @@ -200,4 +198,16 @@ public DiscoveryNode getTargetNode() { public String getAllocationId() { return allocationId; } + + public ReplicationCheckpoint getCheckpoint() { + return copyState.getCheckpoint(); + } + + public byte[] getInfosBytes() { + return copyState.getInfosBytes(); + } + + public ShardId shardId() { + return shard.shardId(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index a393faabae0ea..21fd066b8be2f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -29,7 +29,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -124,18 +123,20 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan request.getCheckpoint().getShardId(), SegmentReplicationTargetService.Actions.FILE_CHUNK, new AtomicLong(0), - (throttleTime) -> {} + (throttleTime) -> {}, + recoverySettings::replicationRateLimiter ); - final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); - channel.sendResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) + final SegmentReplicationSourceHandler handler = ongoingSegmentReplications.prepareForReplication( + request, + segmentSegmentFileChunkWriter ); + channel.sendResponse(new CheckpointInfoResponse(handler.getCheckpoint(), handler.getInfosBytes())); timer.stop(); logger.trace( new ParameterizedMessage( "[replication id {}] Source node sent checkpoint info [{}] to target node [{}], timing: {}", request.getReplicationId(), - copyState.getCheckpoint(), + handler.getCheckpoint(), request.getTargetNode().getId(), timer.time() ) @@ -217,7 +218,7 @@ protected void doClose() throws IOException { /** * - * Cancels any replications on this node to a replica shard that is about to be closed. + * Before a primary shard is closed, cancel any ongoing replications to release incref'd segments. */ @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 4942d39cfa48a..fbd7ab7cea346 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { final SegmentReplicationTarget target = ref.get(); final ActionListener listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener); } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 3b7ae2af80ca0..7d3eb9083208b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -13,11 +13,11 @@ import org.apache.lucene.store.ByteBuffersIndexOutput; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; -import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Map; @@ -28,28 +28,21 @@ * * @opensearch.internal */ -public class CopyState extends AbstractRefCounted { +public class CopyState implements Closeable { private final GatedCloseable segmentInfosRef; - /** ReplicationCheckpoint requested */ - private final ReplicationCheckpoint requestedReplicationCheckpoint; /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; - private final Map metadataMap; private final byte[] infosBytes; private final IndexShard shard; - public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { - super("CopyState-" + shard.shardId()); - this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + public CopyState(IndexShard shard) throws IOException { this.shard = shard; final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard .getLatestSegmentInfosAndCheckpoint(); this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1(); this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); - this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); // resource description and name are not used, but resource description cannot be null try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { @@ -58,21 +51,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar this.infosBytes = buffer.toArrayCopy(); } - @Override - protected void closeInternal() { - try { - segmentInfosRef.close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - public ReplicationCheckpoint getCheckpoint() { return replicationCheckpoint; } public Map getMetadataMap() { - return metadataMap; + return replicationCheckpoint.getMetadataMap(); } public byte[] getInfosBytes() { @@ -83,7 +67,12 @@ public IndexShard getShard() { return shard; } - public ReplicationCheckpoint getRequestedReplicationCheckpoint() { - return requestedReplicationCheckpoint; + @Override + public void close() throws IOException { + try { + segmentInfosRef.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index ce2ffd8bf3fb4..1a5701d9204ef 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3162,7 +3162,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers( public InputStream maybeRateLimitRestores(InputStream stream) { return maybeRateLimit( maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE ); @@ -3185,7 +3185,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index e93d266dcab4c..2311fc582616f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1019,25 +1019,15 @@ protected void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCo } protected void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { - final CopyState copyState; - try { - copyState = new CopyState( - ReplicationCheckpoint.empty(primary.shardId, primary.getLatestReplicationCheckpoint().getCodec()), - primary + try (final CopyState copyState = new CopyState(primary)) { + listener.onResponse( + new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState"); throw new UncheckedIOException(e); } - - try { - listener.onResponse( - new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) - ); - } finally { - copyState.decRef(); - } } protected void startReplicationAndAssertCancellation( diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 75639661f539d..2793d446d66c8 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -35,6 +35,8 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.TimeUnit; @@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() { clusterSettings.applySettings( Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); - assertEquals(null, recoverySettings.rateLimiter()); + assertNull(recoverySettings.recoveryRateLimiter()); + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() + ); + assertNull(recoverySettings.replicationRateLimiter()); + } + + public void testSetReplicationMaxBytesPerSec() { + assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) + .build() + ); + assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB)) + .build() + ); + assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); } public void testRetryDelayStateSync() { diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 44e2653cf01da..eb27850000bdd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -25,7 +25,6 @@ import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.transport.TransportService; import org.junit.Assert; @@ -106,25 +105,21 @@ public void testPrepareAndSendSegments() throws IOException { final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { listener.onResponse(null); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - assertEquals(copyState.getMetadataMap().size(), getSegmentFilesResponse.files.size()); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(handler.getCheckpoint().getMetadataMap().size(), getSegmentFilesResponse.files.size()); assertEquals(0, replications.size()); } @@ -148,14 +143,11 @@ public void testCancelReplication() throws IOException { // this shouldn't be called in this test. Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); } public void testCancelReplication_AfterSendFilesStarts() throws IOException, InterruptedException { @@ -174,14 +166,13 @@ public void testCancelReplication_AfterSendFilesStarts() throws IOException, Int // cancel the replication as soon as the writer starts sending files. replications.cancel(replica.routingEntry().allocationId().getId(), "Test"); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), replicaDiscoveryNode, - new ArrayList<>(copyState.getMetadataMap().values()), + new ArrayList<>(handler.getCheckpoint().getMetadataMap().values()), testCheckpoint ); replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { @@ -193,9 +184,7 @@ public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { @Override public void onFailure(Exception e) { assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); latch.countDown(); } }); @@ -219,8 +208,7 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { Assert.fail(); }; - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -230,15 +218,11 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { ); replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); replications.cancelReplication(primaryDiscoveryNode); replications.cancelReplication(replicaDiscoveryNode); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(secondReplica); } @@ -280,8 +264,7 @@ public void testShardAlreadyReplicatingToNode() throws IOException { listener.onResponse(null); }; replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); } public void testStartReplicationWithNoFilesToFetch() throws IOException { @@ -296,10 +279,8 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { // mock the FileChunkWriter so we can assert its ever called. final FileChunkWriter segmentSegmentFileChunkWriter = mock(FileChunkWriter.class); // Prepare for replication step - and ensure copyState is added to cache. - final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); - assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); - assertEquals(1, copyState.refCount()); getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -314,8 +295,6 @@ public void testStartReplicationWithNoFilesToFetch() throws IOException { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { assertEquals(Collections.emptyList(), getSegmentFilesResponse.files); - assertEquals(0, copyState.refCount()); - assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); verifyNoInteractions(segmentSegmentFileChunkWriter); } @@ -340,8 +319,7 @@ public void testCancelAllReplicationsForShard() throws IOException { testCheckpoint ); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( 1L, @@ -351,15 +329,11 @@ public void testCancelAllReplicationsForShard() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.cancel(primary, "Test"); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -372,8 +346,7 @@ public void testCancelForMissingIds() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); final String replica_2AllocationId = replica_2.routingEntry().allocationId().getId(); final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( @@ -384,23 +357,17 @@ public void testCancelForMissingIds() throws IOException { ); replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - assertEquals(2, copyState.refCount()); assertEquals(2, replications.size()); assertTrue(replications.getHandlers().containsKey(replicaAllocationId)); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); replications.clearOutOfSyncIds(primary.shardId(), Set.of(replica_2AllocationId)); - assertEquals(1, copyState.refCount()); assertEquals(1, replications.size()); assertTrue(replications.getHandlers().containsKey(replica_2AllocationId)); - assertEquals(1, replications.cachedCopyStateSize()); // cancel the primary's ongoing replications. replications.clearOutOfSyncIds(primary.shardId(), Collections.emptySet()); - assertEquals(0, copyState.refCount()); assertEquals(0, replications.size()); - assertEquals(0, replications.cachedCopyStateSize()); closeShards(replica_2); } @@ -409,11 +376,8 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { final String replicaAllocationId = replica.routingEntry().allocationId().getId(); final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, replicaAllocationId, primaryDiscoveryNode, testCheckpoint); - final CopyState copyState = replications.prepareForReplication(request, mock(FileChunkWriter.class)); - - final SegmentReplicationSourceHandler handler = replications.getHandlers().get(replicaAllocationId); - assertEquals(handler.getCopyState(), copyState); - assertEquals(1, copyState.refCount()); + final SegmentReplicationSourceHandler handler = replications.prepareForReplication(request, mock(FileChunkWriter.class)); + assertEquals(handler, replications.getHandlers().get(replicaAllocationId)); ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( testCheckpoint.getShardId(), @@ -430,11 +394,10 @@ public void testPrepareForReplicationAlreadyReplicating() throws IOException { secondCheckpoint ); - final CopyState secondCopyState = replications.prepareForReplication(secondRequest, mock(FileChunkWriter.class)); - final SegmentReplicationSourceHandler secondHandler = replications.getHandlers().get(replicaAllocationId); - assertEquals(secondHandler.getCopyState(), secondCopyState); - assertEquals("New copy state is incref'd", 1, secondCopyState.refCount()); - assertEquals("Old copy state is cleaned up", 0, copyState.refCount()); - + final SegmentReplicationSourceHandler secondHandler = replications.prepareForReplication( + secondRequest, + mock(FileChunkWriter.class) + ); + assertEquals(secondHandler, replications.getHandlers().get(replicaAllocationId)); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index d586767290797..901dc28794cfc 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -68,18 +68,16 @@ public void testSendFiles() throws IOException { chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 ); - final List expectedFiles = List.copyOf(copyState.getMetadataMap().values()); + final List expectedFiles = List.copyOf(handler.getCheckpoint().getMetadataMap().values()); final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, @@ -106,12 +104,10 @@ public void testSendFiles_emptyRequest() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -148,12 +144,11 @@ public void testSendFileFails() throws IOException { ); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 @@ -180,19 +175,18 @@ public void onFailure(Exception e) { assertEquals(e.getClass(), OpenSearchException.class); } }); - copyState.decRef(); + copyState.close(); } public void testReplicationAlreadyRunning() throws IOException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + final CopyState copyState = new CopyState(primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, replica.routingEntry().allocationId().getId(), 5000, 1 @@ -217,12 +211,10 @@ public void testCancelReplication() throws IOException, InterruptedException { chunkWriter = mock(FileChunkWriter.class); final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); - final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( localNode, chunkWriter, - threadPool, - copyState, + primary, primary.routingEntry().allocationId().getId(), 5000, 1 diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 3c72dda2d8b5d..f06d5595afcd5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -273,10 +273,7 @@ public void getCheckpointMetadata( ) { try { blockGetCheckpointMetadata.await(); - final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId(), primaryShard.getLatestReplicationCheckpoint().getCodec()), - primaryShard - ); + final CopyState copyState = new CopyState(primaryShard); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index df180a8ab1007..0b30486038e3a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -18,7 +18,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; -import org.opensearch.index.codec.CodecService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.Store; @@ -54,13 +53,7 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - CopyState copyState = new CopyState( - ReplicationCheckpoint.empty( - mockIndexShard.shardId(), - new CodecService(null, mockIndexShard.indexSettings(), null).codec("default").getName() - ), - mockIndexShard - ); + CopyState copyState = new CopyState(mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -86,7 +79,9 @@ public static IndexShard createMockIndexShard() throws IOException { mockShard.getOperationPrimaryTerm(), 0L, 0L, - Codec.getDefault().getName() + 0L, + Codec.getDefault().getName(), + SI_SNAPSHOT.asMap() ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 4dd4c734a1701..6b609d8af62a1 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1638,12 +1638,10 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - try { - final CopyState copyState = new CopyState(primaryShard.getLatestReplicationCheckpoint(), primaryShard); + try (final CopyState copyState = new CopyState(primaryShard)) { listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); - copyState.decRef(); } catch (IOException e) { logger.error("Unexpected error computing CopyState", e); Assert.fail("Failed to compute copyState");