Skip to content

Commit

Permalink
Fix bug where ReplicationListeners would not complete on cancellation. (
Browse files Browse the repository at this point in the history
#8478) (#8630)

* [Segment Replication] Fix bug where ReplicationListeners would not complete on target cancellation.

This change updates cancellation with Segment Replication to ensure all listeners are resolved.
It does this by requesting cancellation before shard closure instead of using ReplicationCollection's cancelForShard which immediately removes it from the replicationCollection.  This would cause the underlying ReplicationListener to never get invoked on close.

This change includes new tests using suite scope to catch for any open tasks.
This caught other locations where this was possible:
1. On a replica during force sync if the shard was closed while resolving its listeners, it would never call back to the primary if an exception was caught in the onDone method. - Fixed by refactoring those paths to use a ChannelActionListener and always reply to primary.
2. On the primary during forceSync, the primary would not successfully cancel before shard close during a forceSync, Fixed by wrapping the synchronous recoveryTarget::forceSync call in cancellableThreads.



PR cleanup.



Update log message



* PR feedback.



* Update server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java




* Add more tests.



---------



(cherry picked from commit 4ccbf9d)

Signed-off-by: Marc Handalian <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Suraj Singh <[email protected]>
  • Loading branch information
3 people authored Jul 11, 2023
1 parent 166ed97 commit da59ae5
Show file tree
Hide file tree
Showing 12 changed files with 623 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
Expand Down Expand Up @@ -158,13 +160,16 @@ protected void verifyStoreContent() throws Exception {
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final int primaryDocCount = getDocCountFromShard(primaryShard);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
final int replicaDocCount = getDocCountFromShard(replicaShard);
assertEquals("Doc counts should match", primaryDocCount, replicaDocCount);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
Expand All @@ -185,10 +190,30 @@ protected void verifyStoreContent() throws Exception {
}, 1, TimeUnit.MINUTES);
}

private int getDocCountFromShard(IndexShard shard) {
try (final Engine.Searcher searcher = shard.acquireSearcher("test")) {
return searcher.getDirectoryReader().numDocs();
}
}

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
* Fetch IndexShard by shardId, multiple shards per node allowed.
*/
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
return indexService.getShard(id.get());
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.junit.Before;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class SegmentReplicationSuiteIT extends SegmentReplicationBaseIT {

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
createIndex(INDEX_NAME);
}

@Override
public Settings indexSettings() {
final Settings.Builder builder = Settings.builder()
.put(super.indexSettings())
// reset shard & replica count to random values set by OpenSearchIntegTestCase.
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);

// TODO: Randomly enable remote store on these tests.
return builder.build();
}

public void testBasicReplication() throws Exception {
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh();
ensureGreen(INDEX_NAME);
verifyStoreContent();
}

public void testDropRandomNodeDuringReplication() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
internalCluster().startClusterManagerOnlyNodes(1);

final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh();

internalCluster().restartRandomDataNode();

ensureYellow(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId(Integer.toString(docCount)).setSource("field", "value" + docCount).execute().get();
internalCluster().startDataOnlyNode();
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
}

public void testDeleteIndexWhileReplicating() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).actionGet();
}

public void testFullRestartDuringReplication() throws Exception {
internalCluster().startNode();
final int docCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
internalCluster().fullRestart();
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
} else {
// Force round of segment replication to update its checkpoint to primary's
if (shard.indexSettings().isSegRepEnabled()) {
recoveryTarget.forceSegmentFileSync();
cancellableThreads.execute(recoveryTarget::forceSegmentFileSync);
}
}
stopWatch.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public enum Stage {
GET_CHECKPOINT_INFO((byte) 3),
FILE_DIFF((byte) 4),
GET_FILES((byte) 5),
FINALIZE_REPLICATION((byte) 6),
CANCELLED((byte) 7);
FINALIZE_REPLICATION((byte) 6);

private static final Stage[] STAGES = new Stage[Stage.values().length];

Expand Down Expand Up @@ -245,14 +244,6 @@ public void setStage(Stage stage) {
overallTimer.stop();
timingData.put("OVERALL", overallTimer.time());
break;
case CANCELLED:
if (this.stage == Stage.DONE) {
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
}
this.stage = Stage.CANCELLED;
overallTimer.stop();
timingData.put("OVERALL", overallTimer.time());
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
}
Expand Down
Loading

0 comments on commit da59ae5

Please sign in to comment.