diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 78d964d76fa9c..a808553656f84 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -267,7 +267,6 @@ public void testIndexing() throws Exception { * @throws Exception if index creation fail * @throws UnsupportedOperationException if cluster type is unknown */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7679") public void testIndexingWithSegRep() throws Exception { if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index c6ecd7d62c7d1..70da3b0e38472 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -611,6 +611,67 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } + public void testCancellationDuringGetCheckpointInfo() throws Exception { + cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO); + } + + public void testCancellationDuringGetSegments() throws Exception { + cancelDuringReplicaAction(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES); + } + + private void cancelDuringReplicaAction(String actionToblock) throws Exception { + // this test stubs transport calls specific to node-node replication. + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + segmentReplicationWithRemoteEnabled() + ); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); + ensureYellow(INDEX_NAME); + + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + final SegmentReplicationTargetService targetService = internalCluster().getInstance( + SegmentReplicationTargetService.class, + replicaNode + ); + final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + CountDownLatch startCancellationLatch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); + + MockTransportService primaryTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + ); + primaryTransportService.addRequestHandlingBehavior(actionToblock, (handler, request, channel, task) -> { + logger.info("action {}", actionToblock); + try { + startCancellationLatch.countDown(); + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + // index a doc and trigger replication + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + // remove the replica and ensure it is cleaned up. + startCancellationLatch.await(); + SegmentReplicationTarget target = targetService.get(replicaShard.shardId()); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + assertEquals("Replication not closed: " + target.getId(), 0, target.refCount()); + assertEquals("Store has a positive refCount", 0, replicaShard.store().refCount()); + // stop the replica, this will do additional checks on shutDown to ensure the replica and its store are closed properly + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); + latch.countDown(); + } + public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java index 8372135fc55c4..3d8d001b17ddf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -23,8 +23,6 @@ import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTarget; import org.opensearch.indices.replication.SegmentReplicationTargetService; -import org.opensearch.indices.replication.common.ReplicationCollection; -import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.SlowClusterStateProcessing; @@ -33,6 +31,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + /** * This class runs tests with remote store + segRep while blocking file downloads */ @@ -59,22 +59,18 @@ public void testCancelReplicationWhileSyncingSegments() throws Exception { indexSingleDoc(); refresh(INDEX_NAME); waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); - final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); - assertEquals(SegmentReplicationState.Stage.GET_FILES, state.getStage()); - ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( - state.getReplicationId() - ); - final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); - // close the target ref here otherwise it will hold a refcount - segmentReplicationTargetReplicationRef.close(); + SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId()); assertNotNull(segmentReplicationTarget); + assertEquals(SegmentReplicationState.Stage.GET_FILES, segmentReplicationTarget.state().getStage()); assertTrue(segmentReplicationTarget.refCount() > 0); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); - assertBusy(() -> { - assertTrue(indexShard.routingEntry().primary()); - assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); - assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); - }); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); unblockNode(REPOSITORY_NAME, replicaNode); cleanupRepo(); } @@ -85,7 +81,6 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception { final Set dataNodeNames = internalCluster().getDataNodeNames(); final String replicaNode = getNode(dataNodeNames, false); - final String primaryNode = getNode(dataNodeNames, true); SegmentReplicationTargetService targetService = internalCluster().getInstance(SegmentReplicationTargetService.class, replicaNode); ensureGreen(INDEX_NAME); @@ -94,22 +89,18 @@ public void testCancelReplicationWhileFetchingMetadata() throws Exception { indexSingleDoc(); refresh(INDEX_NAME); waitForBlock(replicaNode, REPOSITORY_NAME, TimeValue.timeValueSeconds(10)); - final SegmentReplicationState state = targetService.getOngoingEventSegmentReplicationState(indexShard.shardId()); - assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); - ReplicationCollection.ReplicationRef segmentReplicationTargetReplicationRef = targetService.get( - state.getReplicationId() - ); - final SegmentReplicationTarget segmentReplicationTarget = segmentReplicationTargetReplicationRef.get(); - // close the target ref here otherwise it will hold a refcount - segmentReplicationTargetReplicationRef.close(); + SegmentReplicationTarget segmentReplicationTarget = targetService.get(indexShard.shardId()); assertNotNull(segmentReplicationTarget); + assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, segmentReplicationTarget.state().getStage()); assertTrue(segmentReplicationTarget.refCount() > 0); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); - assertBusy(() -> { - assertTrue(indexShard.routingEntry().primary()); - assertNull(targetService.getOngoingEventSegmentReplicationState(indexShard.shardId())); - assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); - }); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)) + ); + assertNull(targetService.get(indexShard.shardId())); + assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); unblockNode(REPOSITORY_NAME, replicaNode); cleanupRepo(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index 02fc8feefd698..a17779810239a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -8,16 +8,14 @@ package org.opensearch.indices.replication; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportService; @@ -35,9 +33,7 @@ */ public class PrimaryShardReplicationSource implements SegmentReplicationSource { - private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); - - private final RetryableTransportClient transportClient; + private final TransportService transportService; private final DiscoveryNode sourceNode; private final DiscoveryNode targetNode; @@ -52,12 +48,7 @@ public PrimaryShardReplicationSource( DiscoveryNode sourceNode ) { this.targetAllocationId = targetAllocationId; - this.transportClient = new RetryableTransportClient( - transportService, - sourceNode, - recoverySettings.internalActionRetryTimeout(), - logger - ); + this.transportService = transportService; this.sourceNode = sourceNode; this.targetNode = targetNode; this.recoverySettings = recoverySettings; @@ -69,10 +60,14 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - final Writeable.Reader reader = CheckpointInfoResponse::new; - final ActionListener responseListener = ActionListener.map(listener, r -> r); final CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, targetAllocationId, targetNode, checkpoint); - transportClient.executeRetryableAction(GET_CHECKPOINT_INFO, request, responseListener, reader); + transportService.sendRequest( + sourceNode, + GET_CHECKPOINT_INFO, + request, + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionRetryTimeout()).build(), + new ActionListenerResponseHandler<>(listener, CheckpointInfoResponse::new, ThreadPool.Names.GENERIC) + ); } @Override @@ -88,8 +83,6 @@ public void getSegmentFiles( // MultiFileWriter takes care of progress tracking for downloads in this scenario // TODO: Move state management and tracking into replication methods and use chunking and data // copy mechanisms only from MultiFileWriter - final Writeable.Reader reader = GetSegmentFilesResponse::new; - final ActionListener responseListener = ActionListener.map(listener, r -> r); final GetSegmentFilesRequest request = new GetSegmentFilesRequest( replicationId, targetAllocationId, @@ -97,20 +90,17 @@ public void getSegmentFiles( filesToFetch, checkpoint ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionLongTimeout()) - .build(); - transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader); + transportService.sendRequest( + sourceNode, + GET_SEGMENT_FILES, + request, + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), + new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.GENERIC) + ); } @Override public String getDescription() { return sourceNode.getName(); } - - @Override - public void cancel() { - transportClient.cancel(); - } - } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index cc71ef816e525..af764556b7549 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -83,6 +83,16 @@ protected void closeInternal() { } } + @Override + protected void onCancel(String reason) { + try { + notifyListener(new ReplicationFailedException(reason), false); + } finally { + source.cancel(); + cancellableThreads.cancel(reason); + } + } + @Override protected String getPrefix() { return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + "."; @@ -320,16 +330,4 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) } } } - - /** - * Trigger a cancellation, this method will not close the target a subsequent call to #fail is required from target service. - */ - @Override - public void cancel(String reason) { - if (finished.get() == false) { - logger.trace(new ParameterizedMessage("Cancelling replication for target {}", description())); - cancellableThreads.cancel(reason); - source.cancel(); - } - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index d6db154a4e0e3..f28f829545d59 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -84,10 +84,6 @@ public class SegmentReplicationTargetService extends AbstractLifecycleComponent private final ClusterService clusterService; private final TransportService transportService; - public ReplicationRef get(long replicationId) { - return onGoingReplications.get(replicationId); - } - /** * The internal actions * @@ -158,6 +154,7 @@ protected void doStart() { @Override protected void doStop() { if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown"; clusterService.removeListener(this); } } @@ -201,7 +198,7 @@ public void clusterChanged(ClusterChangedEvent event) { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null && indexShard.indexSettings().isSegRepEnabled()) { - onGoingReplications.requestCancel(indexShard.shardId(), "Shard closing"); + onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing"); latestReceivedCheckpoint.remove(shardId); } } @@ -223,7 +220,7 @@ public void afterIndexShardStarted(IndexShard indexShard) { @Override public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) { if (oldRouting != null && indexShard.indexSettings().isSegRepEnabled() && oldRouting.primary() == false && newRouting.primary()) { - onGoingReplications.requestCancel(indexShard.shardId(), "Shard has been promoted to primary"); + onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary"); latestReceivedCheckpoint.remove(indexShard.shardId()); } } @@ -255,6 +252,14 @@ public SegmentReplicationState getSegmentReplicationState(ShardId shardId) { .orElseGet(() -> getlatestCompletedEventSegmentReplicationState(shardId)); } + public ReplicationRef get(long replicationId) { + return onGoingReplications.get(replicationId); + } + + public SegmentReplicationTarget get(ShardId shardId) { + return onGoingReplications.getOngoingReplicationTarget(shardId); + } + /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. @@ -454,7 +459,13 @@ protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Threa latestPublishedCheckpoint ) ); - Runnable runnable = () -> onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + Runnable runnable = () -> { + // if we retry ensure the shard is not in the process of being closed. + // it will be removed from indexService's collection before the shard is actually marked as closed. + if (indicesService.getShardOrNull(replicaShard.shardId()) != null) { + onNewCheckpoint(latestReceivedCheckpoint.get(replicaShard.shardId()), replicaShard); + } + }; // Checks if we are using same thread and forks if necessary. if (thread == Thread.currentThread()) { threadPool.generic().execute(runnable); @@ -548,9 +559,6 @@ public ReplicationRunner(long replicationId) { @Override public void onFailure(Exception e) { - try (final ReplicationRef ref = onGoingReplications.get(replicationId)) { - logger.error(() -> new ParameterizedMessage("Error during segment replication, {}", ref.get().description()), e); - } onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false); } diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index e4dd32e5c6f70..2531790ede4af 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -15,7 +15,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; @@ -27,12 +26,9 @@ import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.transport.TransportService; -import org.junit.Assert; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; @@ -165,40 +161,6 @@ public void testTransportTimeoutForGetSegmentFilesAction() { assertEquals(recoverySettings.internalActionLongTimeout(), capturedRequest.options.timeout()); } - public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( - indexShard.shardId(), - PRIMARY_TERM, - SEGMENTS_GEN, - VERSION, - Codec.getDefault().getName() - ); - StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); - replicationSource.getSegmentFiles( - REPLICATION_ID, - checkpoint, - Arrays.asList(testMetadata), - mock(IndexShard.class), - (fileName, bytesRecovered) -> {}, - new ActionListener<>() { - @Override - public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { - Assert.fail("onFailure response expected."); - } - - @Override - public void onFailure(Exception e) { - assertEquals(e.getClass(), CancellableThreads.ExecutionCancelledException.class); - latch.countDown(); - } - } - ); - replicationSource.cancel(); - latch.await(2, TimeUnit.SECONDS); - assertEquals("listener should have resolved in a failure", 0, latch.getCount()); - } - private DiscoveryNode newDiscoveryNode(String nodeName) { return new DiscoveryNode( nodeName, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 9877e4f96f018..85030faeab061 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -623,6 +623,7 @@ public void testForceSegmentSyncHandlerWithFailure_AlreadyClosedException_swallo } public void testTargetCancelledBeforeStartInvoked() { + final String cancelReason = "test"; final SegmentReplicationTarget target = new SegmentReplicationTarget( replicaShard, primaryShard.getLatestReplicationCheckpoint(), @@ -636,12 +637,12 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { // failures leave state object in last entered stage. - assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); - assertTrue(e.getCause() instanceof CancellableThreads.ExecutionCancelledException); + assertEquals(SegmentReplicationState.Stage.INIT, state.getStage()); + assertEquals(cancelReason, e.getMessage()); } } ); - target.cancel("test"); + target.cancel(cancelReason); sut.startReplication(target); }