From d9e39bca17b1726989e2d4dded9b90a97d039570 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 25 Nov 2023 10:27:21 +0530 Subject: [PATCH] Prevent remote upload before CM starts the primary shard Signed-off-by: Ashish Singh --- .../org/opensearch/index/shard/IndexShard.java | 13 +++++++++++-- .../index/shard/RemoteStoreRefreshListener.java | 9 ++++----- .../shard/RemoteStoreRefreshListenerTests.java | 14 +++++++------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 252aa6592eae3..a70fdaaab014c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3881,10 +3881,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), + this::getOperationPrimaryTerm, tombstoneDocSupplier(), isReadOnlyReplica, - replicationTracker::isPrimaryMode, + this::isStartedPrimary, translogFactorySupplier.apply(indexSettings, shardRouting), isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for // timeseries @@ -3899,6 +3899,15 @@ public boolean isRemoteTranslogEnabled() { return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } + /** + * This checks if we are in state to upload to remote store. Until the cluster-manager informs the shard through + * cluster state, the shard will not be in STARTED state. This method is used to prevent pre-emptive segment or + * translog uploads. + */ + public boolean isStartedPrimary() { + return getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED; + } + /** * @return true if segment reverse search optimization is enabled for time series based workload. */ diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index dd40327298874..9ab2bed872a2f 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -489,8 +489,7 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException { * @return true iff primaryMode is true and index shard is not in closed state. */ private boolean isReadyForUpload() { - boolean isReady = (indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED) - || isLocalOrSnapshotRecovery(); + boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery(); if (isReady == false) { StringBuilder sb = new StringBuilder("Skipped syncing segments with"); @@ -503,11 +502,11 @@ private boolean isReadyForUpload() { if (indexShard.getEngineOrNull() != null) { sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); } - if (isLocalOrSnapshotRecovery() == false) { + if (indexShard.recoveryState() != null) { sb.append(" recoverySourceType=").append(indexShard.recoveryState().getRecoverySource().getType()); sb.append(" primary=").append(indexShard.shardRouting.primary()); } - logger.trace(sb.toString()); + logger.info(sb.toString()); } return isReady; } @@ -516,8 +515,8 @@ private boolean isLocalOrSnapshotRecovery() { // In this case when the primary mode is false, we need to upload segments to Remote Store // This is required in case of snapshots/shrink/ split/clone where we need to durable persist // all segments to remote before completing the recovery to ensure durability. - return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary()) + && indexShard.recoveryState() != null && (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS || indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 51814283c5eb3..a82950d0d72c3 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -319,7 +319,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // This is the case of isRetry=false, shouldRetry=false // Succeed on 1st attempt int succeedOnAttempt = 1; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -340,7 +340,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false // Succeed on 2nd attempt int succeedOnAttempt = 2; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -364,7 +364,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception { int succeedOnAttempt = 1; int checkpointPublishSucceedOnAttempt = 2; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 6 as during a successful upload IndexShard.getEngine() is hit thrice and here we are running the flow twice @@ -386,7 +386,7 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true // Succeed on 3rd attempt int succeedOnAttempt = 3; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -478,6 +478,7 @@ private Tuple mockIn IndexShard shard = mock(IndexShard.class); Store store = mock(Store.class); when(shard.store()).thenReturn(store); + when(shard.state()).thenReturn(IndexShardState.STARTED); when(store.directory()).thenReturn(indexShard.store().directory()); // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -499,13 +500,12 @@ private Tuple mockIn when(shard.getThreadPool()).thenReturn(threadPool); // Mock indexShard.getReplicationTracker().isPrimaryMode() - doAnswer(invocation -> { if (Objects.nonNull(refreshCountLatch)) { refreshCountLatch.countDown(); } - return indexShard.getReplicationTracker(); - }).when(shard).getReplicationTracker(); + return true; + }).when(shard).isStartedPrimary(); AtomicLong counter = new AtomicLong(); // Mock indexShard.getSegmentInfosSnapshot()