diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index c650edc31da8d..5e1318fe047d8 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -123,14 +123,10 @@ public void beforeRefresh() throws IOException {} @Override protected void runAfterRefreshExactlyOnce(boolean didRefresh) { - if (shouldSync(didRefresh)) { + if (shouldSync(didRefresh) && isReadyForUpload()) { segmentTracker.updateLocalRefreshTimeAndSeqNo(); try { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); - } + initializeRemoteDirectoryOnTermUpdate(); try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); updateLocalSizeMapAndTracker(localSegmentsPostRefresh); @@ -164,16 +160,12 @@ private boolean shouldSync(boolean didRefresh) { // is important to upload the zero state segments so that the restore does not break. return this.primaryTerm != indexShard.getOperationPrimaryTerm() || didRefresh - || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty(); + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty() + || isRefreshAfterCommitSafe(); } private boolean syncSegments() { - if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) { - logger.debug( - "Skipped syncing segments with primaryMode={} indexShardState={}", - indexShard.getReplicationTracker().isPrimaryMode(), - indexShard.state() - ); + if (isReadyForUpload() == false) { // Following check is required to enable retry and make sure that we do not lose this refresh event // When primary shard is restored from remote store, the recovery happens first followed by changing // primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through @@ -323,6 +315,15 @@ private boolean isRefreshAfterCommit() throws IOException { && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName))); } + private boolean isRefreshAfterCommitSafe() { + try { + return isRefreshAfterCommit(); + } catch (Exception e) { + logger.info("Exception occurred in isRefreshAfterCommitSafe", e); + } + return false; + } + void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException { final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); @@ -439,6 +440,39 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB } } + private void initializeRemoteDirectoryOnTermUpdate() throws IOException { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init(); + if (uploadedMetadata != null) { + segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet()); + } + } + } + + private boolean isReadyForUpload() { + boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED; + if (isReady == false) { + logOnSyncSkip(); + } + return isReady; + } + + private void logOnSyncSkip() { + StringBuilder sb = new StringBuilder("Skipped syncing segments with"); + if (indexShard.getReplicationTracker() != null) { + sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode()); + } + if (indexShard.state() != null) { + sb.append(" indexShardState=").append(indexShard.state()); + } + if (indexShard.getEngineOrNull() != null) { + sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); + } + logger.trace(sb.toString()); + } + /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */