Skip to content

Commit

Permalink
Add wait in replica recovery for allocation id to propagate on source…
Browse files Browse the repository at this point in the history
… node (#15558) (#16097)

Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Sep 27, 2024
1 parent 6375b62 commit 5695d57
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.SetOnce;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
import org.opensearch.index.seqno.RetentionLeases;
Expand Down Expand Up @@ -58,21 +55,7 @@ public LocalStorePeerRecoverySourceHandler(
@Override
protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure) throws IOException {
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();

RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
if (targetShardRouting == null) {
logger.debug(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
waitForAssignmentPropagate(retentionLeaseRef);
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
resources.add(retentionLock);
final long startingSeqNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@
import org.opensearch.LegacyESVersion;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StopWatch;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
Expand All @@ -60,6 +64,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
import org.opensearch.index.seqno.RetentionLeases;
Expand All @@ -80,12 +85,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -192,6 +199,50 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
protected abstract void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure)
throws IOException;

/*
Waits for cluster state propagation of assignment of replica on the target node
*/
void waitForAssignmentPropagate(SetOnce<RetentionLease> retentionLeaseRef) {
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 5);
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
while (backoffDelayIterator.hasNext()) {
RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
if (targetShardRouting.get() == null) {
logger.info(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
Thread.sleep(backoffDelayIterator.next().millis());
}
if (targetShardRouting.get() != null) {
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was "
+ targetShardRouting;
retentionLeaseRef.set(
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))
);
}

},
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
shard,
cancellableThreads,
logger
);

if (targetShardRouting.get() != null) {
return;
}
}
if (targetShardRouting.get() != null) {
return;
}
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}

protected void finalizeStepAndCompleteFuture(
long startingSeqNo,
StepListener<SendSnapshotResult> sendSnapshotStep,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

import org.apache.lucene.index.IndexCommit;
import org.opensearch.action.StepListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.RunUnderPrimaryPermit;
Expand Down Expand Up @@ -48,7 +50,8 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
// A replica of an index with remote translog does not require the translogs locally and keeps receiving the
// updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed
// and there is no translog replay done.

final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
waitForAssignmentPropagate(retentionLeaseRef);
final StepListener<SendFileResult> sendFileStep = new StepListener<>();
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
Expand Down Expand Up @@ -102,4 +105,5 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,

finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Numbers;
import org.opensearch.common.Randomness;
import org.opensearch.common.SetOnce;
Expand Down Expand Up @@ -141,6 +143,8 @@
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -746,6 +750,206 @@ void phase2(
assertFalse(phase2Called.get());
}

/*
If the replica allocation id is not reflected in source nodes routing table even after retries,
recoveries should fail
*/
public void testThrowExceptionOnNoTargetInRouting() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.isRelocatedPrimary()).thenReturn(false);
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
when(routingTable.getByAllocationId(anyString())).thenReturn(null);
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
doAnswer(invocation -> {
((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any());

final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random()))
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
);
if (randomBoolean()) {
indexMetadata.state(IndexMetadata.State.CLOSE);
}
when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY));

final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
threadPool,
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8),
between(1, 8)
) {

@Override
void phase1(
IndexCommit snapshot,
long startingSeqNo,
IntSupplier translogOps,
ActionListener<SendFileResult> listener,
boolean skipCreateRetentionLeaseStep
) {
phase1Called.set(true);
super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
}

@Override
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
prepareTargetForTranslogCalled.set(true);
super.prepareTargetForTranslog(totalTranslogOps, listener);
}

@Override
void phase2(
long startingSeqNo,
long endingSeqNo,
Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes,
RetentionLeases retentionLeases,
long mappingVersion,
ActionListener<SendSnapshotResult> listener
) throws IOException {
phase2Called.set(true);
super.phase2(
startingSeqNo,
endingSeqNo,
snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
listener
);
}

};
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
expectThrows(DelayRecoveryException.class, () -> {
handler.recoverToTarget(future);
future.actionGet();
});
verify(routingTable, times(5)).getByAllocationId(null);
assertFalse(phase1Called.get());
assertFalse(prepareTargetForTranslogCalled.get());
assertFalse(phase2Called.get());
}

/*
Tests when the replica allocation id is reflected in source nodes routing table even after 1 retry
*/
public void testTargetInRoutingInSecondAttempt() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.isRelocatedPrimary()).thenReturn(false);
when(shard.getRetentionLeases()).thenReturn(mock(RetentionLeases.class));
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
final ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.initializing()).thenReturn(true);
when(shardRouting.currentNodeId()).thenReturn("node");
when(routingTable.getByAllocationId(any())).thenReturn(null, shardRouting);
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
when(replicationGroup.getRoutingTable()).thenReturn(routingTable);
when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class));
doAnswer(invocation -> {
((ActionListener<Releasable>) invocation.getArguments()[0]).onResponse(() -> {});
return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any());

final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random()))
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))
);
if (randomBoolean()) {
indexMetadata.state(IndexMetadata.State.CLOSE);
}
when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY));

final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
threadPool,
request,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
between(1, 8),
between(1, 8)
) {

@Override
void phase1(
IndexCommit snapshot,
long startingSeqNo,
IntSupplier translogOps,
ActionListener<SendFileResult> listener,
boolean skipCreateRetentionLeaseStep
) {
phase1Called.set(true);
super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep);
}

@Override
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
prepareTargetForTranslogCalled.set(true);
super.prepareTargetForTranslog(totalTranslogOps, listener);
}

@Override
void phase2(
long startingSeqNo,
long endingSeqNo,
Translog.Snapshot snapshot,
long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes,
RetentionLeases retentionLeases,
long mappingVersion,
ActionListener<SendSnapshotResult> listener
) throws IOException {
phase2Called.set(true);
super.phase2(
startingSeqNo,
endingSeqNo,
snapshot,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
listener
);
}

};
handler.waitForAssignmentPropagate(new SetOnce<>());
verify(routingTable, times(2)).getByAllocationId(null);
}

public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
final CancellableThreads cancellableThreads = new CancellableThreads();
final IndexShard shard = mock(IndexShard.class);
Expand Down
Loading

0 comments on commit 5695d57

Please sign in to comment.