Skip to content

Commit

Permalink
Fix unclosed store references with node-node segrep when primary node…
Browse files Browse the repository at this point in the history
… is unknown. (opensearch-project#16106) (opensearch-project#16435)

This PR fixes a bug with node-node pull based replication where if the replica does not know
the DiscoveryNode of its primary we would fail after constructing a
SegmentReplicationTarget that holds a store reference.  Only after replication
is started would a failure occur because the source node is null, and the target would not get cleaned up.
Push based replication already handled this case by catching any error and closing the target.
This update ensures the validation is done before constructing our PrimaryShardReplicationSource, before
any target object is created in both cases push and pull.


(cherry picked from commit 267c68e)

Signed-off-by: Marc Handalian <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 06c1a5a commit 3312eda
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOExcep
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
.put("index.refresh_interval", "40ms") // set lower interval so replica attempts replication cycles after primary is
// removed.
.build()
);
ensureYellow(TEST_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public PrimaryShardReplicationSource(
RecoverySettings recoverySettings,
DiscoveryNode sourceNode
) {
assert targetNode != null : "Target node must be set";
assert sourceNode != null : "Source node must be set";
this.targetAllocationId = targetAllocationId;
this.transportService = transportService;
this.sourceNode = sourceNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public SegmentReplicationSource get(IndexShard shard) {

private DiscoveryNode getPrimaryNode(ShardId shardId) {
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
return clusterService.state().nodes().get(primaryShard.currentNodeId());
DiscoveryNode node = clusterService.state().nodes().get(primaryShard.currentNodeId());
if (node == null) {
throw new IllegalStateException("Cannot replicate, primary shard for " + shardId + " is not allocated on any node");
}
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@

import org.apache.lucene.store.IOContext;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
Expand All @@ -19,10 +25,12 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -45,6 +53,48 @@ public class SegmentReplicatorTests extends IndexShardTestCase {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();

public void testReplicationWithUnassignedPrimary() throws Exception {
final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory());
final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory());
SegmentReplicator replicator = new SegmentReplicator(threadPool);

ClusterService cs = mock(ClusterService.class);
IndexShardRoutingTable.Builder shardRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
shardRoutingTable.addShard(replica.routingEntry());
shardRoutingTable.addShard(primary.routingEntry().moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test")));

when(cs.state()).thenReturn(buildClusterState(replica, shardRoutingTable));
replicator.setSourceFactory(new SegmentReplicationSourceFactory(mock(TransportService.class), mock(RecoverySettings.class), cs));
expectThrows(IllegalStateException.class, () -> replicator.startReplication(replica));
closeShards(replica, primary);
}

public void testReplicationWithUnknownPrimaryNode() throws Exception {
final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory());
final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory());
SegmentReplicator replicator = new SegmentReplicator(threadPool);

ClusterService cs = mock(ClusterService.class);
IndexShardRoutingTable.Builder shardRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
shardRoutingTable.addShard(replica.routingEntry());
shardRoutingTable.addShard(primary.routingEntry());

when(cs.state()).thenReturn(buildClusterState(replica, shardRoutingTable));
replicator.setSourceFactory(new SegmentReplicationSourceFactory(mock(TransportService.class), mock(RecoverySettings.class), cs));
expectThrows(IllegalStateException.class, () -> replicator.startReplication(replica));
closeShards(replica, primary);
}

private ClusterState buildClusterState(IndexShard replica, IndexShardRoutingTable.Builder indexShard) {
return ClusterState.builder(clusterService.state())
.routingTable(
RoutingTable.builder()
.add(IndexRoutingTable.builder(replica.shardId().getIndex()).addIndexShard(indexShard.build()).build())
.build()
)
.build();
}

public void testStartReplicationWithoutSourceFactory() {
ThreadPool threadpool = mock(ThreadPool.class);
ExecutorService mock = mock(ExecutorService.class);
Expand Down

0 comments on commit 3312eda

Please sign in to comment.