[Backport 2.x] Add wait in replica recovery for allocation id to propagate on source node #15786

Closed
wants to merge 1 commit into from
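This backport changes how the peer recovery source handlers react when the target replica's allocation id is not yet present in the source node's copy of the routing table. Instead of failing the recovery immediately with DelayRecoveryException, the handler now retries the routing-table lookup with exponential backoff and only throws once the retries are exhausted. The standalone sketch below shows just that retry shape; the names lookupTarget, maxAttempts, and initialDelayMillis are illustrative and not part of the OpenSearch API, and the real implementation in the diff uses BackoffPolicy and runs each lookup under the primary permit.

import java.util.function.Supplier;

// Illustrative only: retry a lookup a bounded number of times with growing delays,
// and fail if the value never shows up (mirrors the shape of waitForAssignmentPropagate below).
final class WaitForPropagationSketch {
    static <T> T waitFor(Supplier<T> lookupTarget, int maxAttempts, long initialDelayMillis) throws InterruptedException {
        long delayMillis = initialDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T value = lookupTarget.get();          // e.g. routingTable.getByAllocationId(targetAllocationId)
            if (value != null) {
                return value;                      // assignment has propagated to this node
            }
            Thread.sleep(delayMillis);             // back off before the next attempt
            delayMillis *= 2;                      // simplified exponential growth
        }
        throw new IllegalStateException("target not visible after " + maxAttempts + " attempts");
    }
}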
LocalStorePeerRecoverySourceHandler.java
@@ -12,15 +12,12 @@
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.SetOnce;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
import org.opensearch.index.seqno.RetentionLeases;
@@ -58,21 +55,7 @@ public LocalStorePeerRecoverySourceHandler(
@Override
protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure) throws IOException {
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
if (targetShardRouting == null) {
logger.debug(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
waitForAssignmentPropagate(retentionLeaseRef);
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
resources.add(retentionLock);
final long startingSeqNo;
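In LocalStorePeerRecoverySourceHandler the inline RunUnderPrimaryPermit block that validated the target allocation id (and threw DelayRecoveryException on the first miss) moves into the shared waitForAssignmentPropagate helper shown in the next file. The caller keeps the same contract: it passes a SetOnce that the helper fills with the peer-recovery retention lease under the primary permit, and presumably reads it back later in the method (the later read is outside the truncated hunk above). A sketch of that caller-side pattern, using only the types already imported in this file:

// Sketch of the caller side (not a new API; these calls appear in the diff above):
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
waitForAssignmentPropagate(retentionLeaseRef);                 // retries the routing-table lookup; throws DelayRecoveryException if the target never appears
final RetentionLease retentionLease = retentionLeaseRef.get(); // may be null when no peer-recovery lease exists yet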
RecoverySourceHandler.java
@@ -42,10 +42,14 @@
import org.opensearch.LegacyESVersion;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StopWatch;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
@@ -60,6 +64,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
import org.opensearch.index.seqno.RetentionLeases;
@@ -80,12 +85,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;
@@ -192,6 +199,50 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
protected abstract void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure)
throws IOException;

/*
Waits for the cluster state assigning the replica to the target node to propagate to this (source) node
*/
void waitForAssignmentPropagate(SetOnce<RetentionLease> retentionLeaseRef) {
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 5);
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
while (backoffDelayIterator.hasNext()) {
RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
if (targetShardRouting.get() == null) {
logger.info(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
Thread.sleep(backoffDelayIterator.next().millis());
}
if (targetShardRouting.get() != null) {
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was "
+ targetShardRouting;
retentionLeaseRef.set(
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))
);
}

},
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
shard,
cancellableThreads,
logger
);

if (targetShardRouting.get() != null) {
return;
}
}
if (targetShardRouting.get() != null) {
return;
}
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}

protected void finalizeStepAndCompleteFuture(
long startingSeqNo,
StepListener<SendSnapshotResult> sendSnapshotStep,
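The helper above bounds its work with BackoffPolicy.exponentialBackoff(1000 ms, 5): each pass through the loop re-reads the routing table under the primary permit, and a backoff delay is consumed (and slept) only while the target allocation id is still missing, so at most five lookups happen before the DelayRecoveryException (the tests below verify exactly that with times(5)). A self-contained illustration of such an iterator-bounded retry loop; the plain list of delays merely stands in for BackoffPolicy, whose exact delay progression is not reproduced here:

import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

// Illustrative only: one lookup per configured attempt, sleeping between misses.
final class BackoffLoopSketch {
    static boolean waitUntil(BooleanSupplier targetVisible, List<Long> backoffDelaysMillis) throws InterruptedException {
        Iterator<Long> delays = backoffDelaysMillis.iterator();   // the iterator bounds the number of attempts
        while (delays.hasNext()) {
            if (targetVisible.getAsBoolean()) {                   // e.g. routingTable.getByAllocationId(...) != null
                return true;                                      // propagated: stop retrying
            }
            Thread.sleep(delays.next());                          // consume one backoff delay per miss
        }
        return false;                                             // attempts exhausted: caller should fail the recovery
    }
}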
RemoteStorePeerRecoverySourceHandler.java
@@ -10,11 +10,13 @@

import org.apache.lucene.index.IndexCommit;
import org.opensearch.action.StepListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.RunUnderPrimaryPermit;
@@ -48,7 +50,8 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
// A replica of an index with remote translog does not require the translogs locally and keeps receiving the
// updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed
// and there is no translog replay done.

final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
waitForAssignmentPropagate(retentionLeaseRef);
final StepListener<SendFileResult> sendFileStep = new StepListener<>();
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
@@ -102,4 +105,5 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,

finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure);
}

}
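RemoteStorePeerRecoverySourceHandler gains the same guard before its file-based recovery. Here the SetOnce is only a vehicle for the helper's signature: as far as the shown hunks go, the retention lease value is never read after the wait, because the remote-store path performs no translog replay. A sketch of that minimal usage (same calls as in the diff above, shown in isolation):

// Sketch: the remote-store path only cares about the propagation check, not the lease value.
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>(); // filled by the helper, apparently not read again here
waitForAssignmentPropagate(retentionLeaseRef);                     // throws DelayRecoveryException if the target never appears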
RecoverySourceHandlerTests.java
@@ -52,6 +52,8 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Numbers;
import org.opensearch.common.Randomness;
import org.opensearch.common.SetOnce;
@@ -141,6 +143,8 @@
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
@@ -746,6 +750,206 @@ void phase2(
assertFalse(phase2Called.get());
}

/*
If the replica allocation id is not reflected in the source node's routing table even after retries,
recovery should fail
*/
public void testThrowExceptionOnNoTargetInRouting() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.isRelocatedPrimary()).thenReturn(false);
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
when(routingTable.getByAllocationId(anyString())).thenReturn(null);
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
doAnswer(invocation -> {
((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any());

final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random()))
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
);
if (randomBoolean()) {
indexMetadata.state(IndexMetadata.State.CLOSE);
}
when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY));

final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
threadPool,
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8),
between(1, 8)
) {

@Override
void phase1(
IndexCommit snapshot,
long startingSeqNo,
IntSupplier translogOps,
ActionListener<SendFileResult> listener,
boolean skipCreateRetentionLeaseStep
) {
phase1Called.set(true);
super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
}

@Override
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
prepareTargetForTranslogCalled.set(true);
super.prepareTargetForTranslog(totalTranslogOps, listener);
}

@Override
void phase2(
long startingSeqNo,
long endingSeqNo,
Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes,
RetentionLeases retentionLeases,
long mappingVersion,
ActionListener<SendSnapshotResult> listener
) throws IOException {
phase2Called.set(true);
super.phase2(
startingSeqNo,
endingSeqNo,
snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
listener
);
}

};
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
expectThrows(DelayRecoveryException.class, () -> {
handler.recoverToTarget(future);
future.actionGet();
});
verify(routingTable, times(5)).getByAllocationId(null);
assertFalse(phase1Called.get());
assertFalse(prepareTargetForTranslogCalled.get());
assertFalse(phase2Called.get());
}

/*
Tests that recovery proceeds when the replica allocation id appears in the source node's routing table after one retry
*/
public void testTargetInRoutingInSecondAttempt() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.isRelocatedPrimary()).thenReturn(false);
when(shard.getRetentionLeases()).thenReturn(mock(RetentionLeases.class));
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
final ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.initializing()).thenReturn(true);
when(shardRouting.currentNodeId()).thenReturn("node");
when(routingTable.getByAllocationId(any())).thenReturn(null, shardRouting);
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
doAnswer(invocation -> {
((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any());

final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random()))
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
);
if (randomBoolean()) {
indexMetadata.state(IndexMetadata.State.CLOSE);
}
when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY));

final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
threadPool,
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8),
between(1, 8)
) {

@Override
void phase1(
IndexCommit snapshot,
long startingSeqNo,
IntSupplier translogOps,
ActionListener<SendFileResult> listener,
boolean skipCreateRetentionLeaseStep
) {
phase1Called.set(true);
super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
}

@Override
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
prepareTargetForTranslogCalled.set(true);
super.prepareTargetForTranslog(totalTranslogOps, listener);
}

@Override
void phase2(
long startingSeqNo,
long endingSeqNo,
Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes,
RetentionLeases retentionLeases,
long mappingVersion,
ActionListener<SendSnapshotResult> listener
) throws IOException {
phase2Called.set(true);
super.phase2(
startingSeqNo,
endingSeqNo,
snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
listener
);
}

};
handler.waitForAssignmentPropagate(new SetOnce<>());
verify(routingTable, times(2)).getByAllocationId(null);
}

public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
final CancellableThreads cancellableThreads = new CancellableThreads();
final IndexShard shard = mock(IndexShard.class);
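The two tests added above simulate cluster-state propagation purely with Mockito. testThrowExceptionOnNoTargetInRouting stubs getByAllocationId to always return null and verifies five lookups followed by a DelayRecoveryException; testTargetInRoutingInSecondAttempt uses consecutive stubbing, thenReturn(null, shardRouting), so the routing entry only appears on the second call, and verifies exactly two lookups. A minimal, self-contained sketch of that consecutive-stubbing pattern; the Registry interface and its values are made up for illustration:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// Illustrative only: consecutive stubbing returns null on the first call and a value afterwards,
// which is enough to model "not yet propagated" followed by "propagated on retry".
interface Registry {
    String lookup(String id);
}

class ConsecutiveStubbingSketch {
    void demo() {
        Registry registry = mock(Registry.class);
        when(registry.lookup("alloc-id")).thenReturn(null, "routing-entry");

        assert registry.lookup("alloc-id") == null;                 // first attempt: miss
        assert "routing-entry".equals(registry.lookup("alloc-id")); // second attempt: hit

        verify(registry, times(2)).lookup("alloc-id");              // exactly two lookups happened
    }
}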