Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add cancellation support in RemoteStoreReplicationSource #9234

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Suite-scoped integration tests exercising segment replication when the index is
 * backed by a remote store. Each test indexes a random number of documents and then
 * disrupts the cluster (node restart, index deletion, full restart) to verify the
 * replication path stays healthy.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class SegmentReplicationWithRemoteStoreSuiteIT extends SegmentReplicationBaseIT {

    /** Name of the filesystem repository backing the remote store for this suite. */
    private static final String REPOSITORY_NAME = "test-remote-store-repo";

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        // Layer the remote-store repository settings on top of the base-class defaults.
        final Settings base = super.nodeSettings(nodeOrdinal);
        return Settings.builder().put(base).put(remoteStoreClusterSettings(REPOSITORY_NAME)).build();
    }

    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
        // Register the filesystem repository the remote store writes to, then create
        // the index under test.
        final Settings.Builder repositorySettings = Settings.builder().put("location", randomRepoPath().toAbsolutePath());
        assertAcked(clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(repositorySettings));
        createIndex(INDEX_NAME);
    }

    @Override
    protected Settings featureFlagSettings() {
        // Both remote store and experimental segment replication must be enabled.
        final Settings.Builder flags = Settings.builder().put(super.featureFlagSettings());
        flags.put(FeatureFlags.REMOTE_STORE, "true");
        flags.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true");
        return flags.build();
    }

    @Override
    public Settings indexSettings() {
        // Keep the randomized shard/replica counts chosen by OpenSearchIntegTestCase
        // and force SEGMENT replication for the index.
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards())
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas())
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();
    }

    @After
    public void teardown() {
        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
    }

    /** Indexes {@code count} documents with ids {@code 0..count-1} into the test index. */
    private void ingestDocuments(int count) throws Exception {
        for (int docId = 0; docId < count; docId++) {
            client().prepareIndex(INDEX_NAME).setId(Integer.toString(docId)).setSource("field", "value" + docId).execute().get();
        }
    }

    public void testBasicReplication() throws Exception {
        final int docCount = scaledRandomIntBetween(10, 50);
        ingestDocuments(docCount);
        refresh();
        ensureGreen(INDEX_NAME);
        verifyStoreContent();
    }

    public void testDropRandomNodeDuringReplication() throws Exception {
        internalCluster().ensureAtLeastNumDataNodes(2);
        internalCluster().startClusterManagerOnlyNodes(1);

        final int docCount = scaledRandomIntBetween(10, 50);
        ingestDocuments(docCount);
        refresh();

        // Bounce a random data node while replication work may still be in flight.
        internalCluster().restartRandomDataNode();

        ensureYellow(INDEX_NAME);
        // Index one more document (id == docCount) while the cluster is degraded.
        client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get();
        internalCluster().startDataOnlyNode();
        client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
    }

    public void testDeleteIndexWhileReplicating() throws Exception {
        internalCluster().startClusterManagerOnlyNode();
        final int docCount = scaledRandomIntBetween(10, 50);
        ingestDocuments(docCount);
        refresh(INDEX_NAME);
        // Delete the index immediately after the refresh that triggers replication.
        client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
    }

    public void testFullRestartDuringReplication() throws Exception {
        internalCluster().startNode();
        final int docCount = scaledRandomIntBetween(10, 50);
        ingestDocuments(docCount);
        refresh(INDEX_NAME);
        // Restart every node; the index must recover to green afterwards.
        internalCluster().fullRestart();
        ensureGreen(INDEX_NAME);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ public void startReplication(ActionListener<Void> listener) {
logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
// Get list of files to copy from this checkpoint.
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
cancellableThreads.checkForCancel();
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
cancellableThreads.execute(() -> source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener));

checkpointInfoListener.whenComplete(checkpointInfo -> {
final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
state.setStage(SegmentReplicationState.Stage.GET_FILES);
cancellableThreads.checkForCancel();
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener);
cancellableThreads.execute(
() -> source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, indexShard, getFilesListener)
);
}, listener::onFailure);

getFilesListener.whenComplete(response -> {
Expand All @@ -175,7 +175,6 @@ public void startReplication(ActionListener<Void> listener) {
}

private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
logger.trace(() -> new ParameterizedMessage("Replication diff for checkpoint {} {}", checkpointInfo.getCheckpoint(), diff));
Expand All @@ -201,7 +200,6 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
}

private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
// Handle empty SegmentInfos bytes for recovering replicas
if (checkpointInfoResponse.getInfosBytes() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.RemoteStoreReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;

import java.io.IOException;
Expand All @@ -36,6 +45,9 @@
import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests {

Expand Down Expand Up @@ -79,6 +91,56 @@ public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws
testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true);
}

/**
 * Verifies that closing the replica shard while checkpoint metadata is being fetched
 * cancels the ongoing replication before any segment files are requested.
 */
public void testCloseShardWhileGettingCheckpoint() throws Exception {
    final String mapping = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": {} }";
    try (ReplicationGroup shards = createGroup(1, getIndexSettings(), mapping, new NRTReplicationEngineFactory(), createTempDir())) {
        shards.startAll();
        final IndexShard primary = shards.getPrimary();
        final IndexShard replica = shards.getReplicas().get(0);
        primary.refresh("Test");

        final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
        final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
        // Close the shard just as the checkpoint metadata call begins; the file-fetch
        // phase must never run once the replication has been cancelled.
        final Runnable closeShardOnCheckpoint = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
        final Runnable failOnGetFiles = () -> Assert.fail("Should not have been executed");
        final TestRSReplicationSource source = new TestRSReplicationSource(replica, closeShardOnCheckpoint, failOnGetFiles);
        when(sourceFactory.get(any())).thenReturn(source);

        startReplicationAndAssertCancellation(replica, primary, targetService);
        shards.removeReplica(replica);
        closeShards(replica);
    }
}

/**
 * Verifies that closing the replica shard while segment files are being copied
 * cancels the in-flight replication.
 */
public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception {
    try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
        shards.startAll();
        final IndexShard primary = shards.getPrimary();
        final IndexShard replica = shards.getReplicas().get(0);

        shards.indexDocs(10);
        primary.refresh("Test");

        final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
        final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
        // Let the checkpoint phase proceed untouched, then close the shard right
        // before the segment-file copy begins.
        final Runnable noOpOnCheckpoint = () -> {};
        final Runnable closeShardOnGetFiles = () -> targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
        final TestRSReplicationSource source = new TestRSReplicationSource(replica, noOpOnCheckpoint, closeShardOnGetFiles);
        when(sourceFactory.get(any())).thenReturn(source);

        startReplicationAndAssertCancellation(replica, primary, targetService);
        shards.removeReplica(replica);
        closeShards(replica);
    }
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception {
try (
ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir())
Expand Down Expand Up @@ -351,3 +413,42 @@ private void assertSingleSegmentFile(IndexShard shard, String fileName) throws I
assertEquals(segmentsFileNames.stream().findFirst().get(), fileName);
}
}

class TestRSReplicationSource extends RemoteStoreReplicationSource {

private final Thread beforeCheckpoint;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I used Threads rather than plain Runnables to more closely mimic the actual scenario, where cancellations happen on separate threads. Sample trace showing the segrep event and the cancellation invoked on separate threads (note the Thread %tid in the logs below).

[2023-08-21T19:42:05,088][INFO ][o.o.i.r.SegmentReplicationTarget] [org.opensearch.index.shard.RemoteIndexShardTests] [test][0] [Thread 42] Starting Replication Target: Id:[3] Checkpoint [ReplicationCheckpoint{shardId=[test][0], primaryTerm=88, segmentsGen=3, version=7, size=0, codec=Lucene95}] Shard:[[test][0]] Source:[TestReplicationSource] with thread opensearch[org.opensearch.index.shard.RemoteIndexShardTests][generic][T#3]
[2023-08-21T19:42:05,088][INFO ][o.o.i.r.SegmentReplicationTarget] [[Thread-5]] [test][0] [Thread 46] Cancelling replication for target Id:[3] Checkpoint [ReplicationCheckpoint{shardId=[test][0], primaryTerm=88, segmentsGen=3, version=7, size=0, codec=Lucene95}] Shard:[[test][0]] Source:[TestReplicationSource]
[2023-08-21T19:42:05,090][ERROR][o.o.i.r.SegmentReplicationTargetService] [org.opensearch.index.shard.RemoteIndexShardTests] [Thread 42] Error during segment replication, Id:[3] Checkpoint [ReplicationCheckpoint{shardId=[test][0], primaryTerm=88, segmentsGen=3, version=7, size=0, codec=Lucene95}] Shard:[[test][0]] Source:[TestReplicationSource]
...

private final Thread beforeGetFiles;

/**
 * @param indexShard       shard whose remote store backs this replication source
 * @param beforeCheckpoint hook run on its own thread just before checkpoint metadata is fetched
 * @param beforeGetFiles   hook run on its own thread just before segment files are fetched
 */
public TestRSReplicationSource(IndexShard indexShard, Runnable beforeCheckpoint, Runnable beforeGetFiles) {
super(indexShard);
// Wrap the hooks in Threads so they run concurrently with the source calls,
// mimicking real cancellations that arrive from a separate thread.
this.beforeCheckpoint = new Thread(beforeCheckpoint);
this.beforeGetFiles = new Thread(beforeGetFiles);
}

/**
 * Starts the {@code beforeCheckpoint} hook thread, then delegates to the real
 * remote-store source. The hook deliberately races with the delegate call.
 */
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
// Fire the hook concurrently (e.g. a shard close that triggers cancellation).
this.beforeCheckpoint.start();
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

/**
 * Starts the {@code beforeGetFiles} hook thread, then delegates to the real
 * remote-store source. The hook deliberately races with the delegate call.
 */
@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
// Fire the hook concurrently (e.g. a shard close that triggers cancellation).
this.beforeGetFiles.start();
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, listener);
}

/** Human-readable source name used in replication logs and failure messages. */
@Override
public String getDescription() {
return "TestReplicationSource";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.IndexSettings;
Expand All @@ -36,16 +35,13 @@
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTarget;
Expand Down Expand Up @@ -84,7 +80,6 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

Expand Down Expand Up @@ -675,58 +670,6 @@ public void testCloseShardDuringFinalize() throws Exception {
}
}

public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new TestReplicationSource() {

ActionListener<GetSegmentFilesResponse> listener;

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
resolveCheckpointInfoResponseListener(listener, primary);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
// set the listener, we will only fail it once we cancel the source.
this.listener = listener;
// shard is closing while we are copying files.
targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
}

@Override
public void cancel() {
// simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed .
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled");
listener.onFailure(exception);
}
};
when(sourceFactory.get(any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
}
}

protected SegmentReplicationTargetService newTargetService(SegmentReplicationSourceFactory sourceFactory) {
return new SegmentReplicationTargetService(
threadPool,
Expand Down
Loading