Prevent remote upload before CM starts the primary shard
Signed-off-by: Ashish Singh <[email protected]>
ashking94 committed Nov 25, 2023
1 parent 1fca1a7 commit d9e39bc
Showing 3 changed files with 22 additions and 14 deletions.
13 changes: 11 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -3881,10 +3881,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
circuitBreakerService,
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
- () -> getOperationPrimaryTerm(),
+ this::getOperationPrimaryTerm,
tombstoneDocSupplier(),
isReadOnlyReplica,
- replicationTracker::isPrimaryMode,
+ this::isStartedPrimary,
translogFactorySupplier.apply(indexSettings, shardRouting),
isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for
// timeseries
@@ -3899,6 +3899,15 @@ public boolean isRemoteTranslogEnabled() {
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
}

/**
* This checks whether we are in a state to upload to the remote store. Until the cluster-manager informs the shard through
* cluster state, the shard will not be in STARTED state. This method is used to prevent pre-emptive segment or
* translog uploads.
*/
public boolean isStartedPrimary() {
return getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED;
}

/**
* @return true if segment reverse search optimization is enabled for time series based workload.
*/
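For reference, a minimal self-contained sketch (not part of this commit) contrasting the old "primary mode and not closed" style of check with the new started-primary check; the enum and method names below are hypothetical stand-ins for IndexShardState, ReplicationTracker#isPrimaryMode and the new isStartedPrimary():

// Illustrative sketch only: shows why the old condition could allow uploads from a
// POST_RECOVERY primary, while the new one waits until the cluster-manager has started the shard.
import java.util.List;

public class StartedPrimaryCheckSketch {

    enum ShardState { RECOVERING, POST_RECOVERY, STARTED, CLOSED }

    // Roughly the old readiness condition.
    static boolean oldIsReady(boolean primaryMode, ShardState state) {
        return primaryMode && state != ShardState.CLOSED;
    }

    // Roughly the new condition introduced by isStartedPrimary().
    static boolean newIsReady(boolean primaryMode, ShardState state) {
        return primaryMode && state == ShardState.STARTED;
    }

    public static void main(String[] args) {
        for (ShardState state : List.of(ShardState.POST_RECOVERY, ShardState.STARTED)) {
            System.out.printf("state=%s old=%b new=%b%n", state, oldIsReady(true, state), newIsReady(true, state));
        }
        // Prints: state=POST_RECOVERY old=true new=false
        //         state=STARTED old=true new=true
    }
}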
server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -489,8 +489,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
* @return true iff the shard is a started primary, or is a primary being recovered from local shards or a snapshot.
*/
private boolean isReadyForUpload() {
- boolean isReady = (indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED)
- || isLocalOrSnapshotRecovery();
+ boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery();

if (isReady == false) {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
@@ -503,11 +502,11 @@ if (indexShard.getEngineOrNull() != null) {
if (indexShard.getEngineOrNull() != null) {
sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName());
}
- if (isLocalOrSnapshotRecovery() == false) {
+ if (indexShard.recoveryState() != null) {
sb.append(" recoverySourceType=").append(indexShard.recoveryState().getRecoverySource().getType());
sb.append(" primary=").append(indexShard.shardRouting.primary());
}
- logger.trace(sb.toString());
+ logger.info(sb.toString());
}
return isReady;
}
@@ -516,8 +515,8 @@ private boolean isLocalOrSnapshotRecovery() {
// In this case, when the primary mode is false, we still need to upload segments to the remote store.
// This is required in case of snapshot/shrink/split/clone recoveries, where we need to durably persist
// all segments to the remote store before completing the recovery to ensure durability.

return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary())
&& indexShard.recoveryState() != null
&& (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
|| indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT);
}
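As an aside, a simplified standalone sketch (not from the repository) of how the started-primary check and the local/snapshot-recovery exception compose in isReadyForUpload(); the enum here is a hypothetical stand-in for RecoverySource.Type:

// Illustrative sketch only: a started primary may upload, and so may a recovering primary
// whose recovery source is LOCAL_SHARDS (shrink/split/clone) or SNAPSHOT.
public class UploadReadinessSketch {

    enum RecoverySourceType { EMPTY_STORE, EXISTING_STORE, PEER, SNAPSHOT, LOCAL_SHARDS }

    static boolean isReadyForUpload(boolean startedPrimary, boolean recoveringPrimary, RecoverySourceType source) {
        boolean localOrSnapshotRecovery = recoveringPrimary
            && (source == RecoverySourceType.LOCAL_SHARDS || source == RecoverySourceType.SNAPSHOT);
        return startedPrimary || localOrSnapshotRecovery;
    }

    public static void main(String[] args) {
        // A peer-recovering primary must not upload yet; a snapshot-restoring primary must.
        System.out.println(isReadyForUpload(false, true, RecoverySourceType.PEER));     // false
        System.out.println(isReadyForUpload(false, true, RecoverySourceType.SNAPSHOT)); // true
    }
}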
server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
@@ -319,7 +319,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception {
// This is the case of isRetry=false, shouldRetry=false
// Succeed on 1st attempt
int succeedOnAttempt = 1;
- // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
+ // We spy on IndexShard.isStartedPrimary() to validate that we have tried running the remote refresh as many times as expected.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
@@ -340,7 +340,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
// This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false
// Succeed on 2nd attempt
int succeedOnAttempt = 2;
- // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
+ // We spy on IndexShard.isStartedPrimary() to validate that we have tried running the remote refresh as many times as expected.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
@@ -364,7 +364,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception {
int succeedOnAttempt = 1;
int checkpointPublishSucceedOnAttempt = 2;
- // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
+ // We spy on IndexShard.isStartedPrimary() to validate that we have tried running the remote refresh as many times as expected.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 6 as during a successful upload IndexShard.getEngine() is hit thrice and here we are running the flow twice
@@ -386,7 +386,7 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception {
// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
int succeedOnAttempt = 3;
- // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
+ // We spy on IndexShard.isStartedPrimary() to validate that we have tried running the remote refresh as many times as expected.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
// Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down
@@ -478,6 +478,7 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
IndexShard shard = mock(IndexShard.class);
Store store = mock(Store.class);
when(shard.store()).thenReturn(store);
when(shard.state()).thenReturn(IndexShardState.STARTED);
when(store.directory()).thenReturn(indexShard.store().directory());

// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
@@ -499,13 +500,12 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
when(shard.getThreadPool()).thenReturn(threadPool);

// Mock indexShard.isStartedPrimary()

doAnswer(invocation -> {
if (Objects.nonNull(refreshCountLatch)) {
refreshCountLatch.countDown();
}
- return indexShard.getReplicationTracker();
- }).when(shard).getReplicationTracker();
+ return true;
+ }).when(shard).isStartedPrimary();

AtomicLong counter = new AtomicLong();
// Mock indexShard.getSegmentInfosSnapshot()
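For readers unfamiliar with the stubbing pattern used above, a standalone sketch (not part of the test file) of counting invocations of a stubbed method with Mockito's doAnswer and a CountDownLatch; the Greeter interface is a hypothetical stand-in for IndexShard:

// Illustrative sketch only: the stub both returns a canned value and counts down a latch
// on every call, so the test can assert how many times the method was hit.
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountingStubSketch {

    interface Greeter {
        boolean greet();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        Greeter greeter = mock(Greeter.class);
        doAnswer(invocation -> {
            latch.countDown(); // record every invocation of the stub
            return true;       // stubbed return value, like isStartedPrimary() returning true
        }).when(greeter).greet();

        greeter.greet();
        greeter.greet();
        // The latch reaches zero once the stub has been invoked the expected number of times.
        System.out.println(latch.await(1, TimeUnit.SECONDS)); // true
    }
}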
