diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 2519a4ee48051..8f520c0e007bb 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -197,9 +197,9 @@ private synchronized void syncSegments(boolean isRetry) { long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime(); + final AtomicBoolean shouldRetry = new AtomicBoolean(true); try { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { this.primaryTerm = indexShard.getOperationPrimaryTerm(); this.remoteDirectory.init(); @@ -252,7 +252,6 @@ private synchronized void syncSegments(boolean isRetry) { ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(Void unused) { - boolean shouldRetry = true; try { // Start metadata file upload uploadMetadata(localSegmentsPostRefresh, segmentInfos); @@ -266,27 +265,18 @@ public void onResponse(Void unused) { ); // At this point since we have uploaded new segments, segment infos and segment metadata file, // along with marking minSeqNoToKeep, upload has succeeded completely. - shouldRetry = false; + shouldRetry.set(false); } catch (Exception e) { // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried // in the next refresh. This should not affect durability of the indexed data after remote trans-log // integration. logger.warn("Exception in post new segment upload actions", e); - } finally { - doComplete(shouldRetry); } } @Override public void onFailure(Exception e) { logger.warn("Exception while uploading new segments to the remote segment store", e); - doComplete(true); - } - - private void doComplete(boolean shouldRetry) { - // Update the segment tracker with the final upload status as seen at the end - updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS); - afterSegmentsSync(isRetry, shouldRetry); } }, latch); @@ -305,6 +295,8 @@ private void doComplete(boolean shouldRetry) { } catch (Throwable t) { logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); } + updateFinalStatusInSegmentTracker(shouldRetry.get() == false, bytesBeforeUpload, startTimeInNS); + afterSegmentsSync(isRetry, shouldRetry.get()); } /** @@ -517,7 +509,7 @@ private void updateLocalSizeMapAndTracker(Collection segmentFiles) { segmentTracker.setLatestLocalFileNameLengthMap(latestFileNameSizeOnLocalMap); } - private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { + private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { if (uploadStatus) { long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload; long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 395ecba442e86..d3e8d961337cc 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -581,30 +581,32 @@ public void uploadMetadata( RemoteSegmentMetadata.CURRENT_VERSION ); try { - IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT); - Map uploadedSegments = new HashMap<>(); - for (String file : segmentFiles) { - if (segmentsUploadedToRemoteStore.containsKey(file)) { - uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString()); - } else { - throw new NoSuchFileException(file); + try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) { + Map uploadedSegments = new HashMap<>(); + for (String file : segmentFiles) { + if (segmentsUploadedToRemoteStore.containsKey(file)) { + uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString()); + } else { + throw new NoSuchFileException(file); + } } - } - ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput(); - segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos")); - byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy(); - - metadataStreamWrapper.writeStream( - indexOutput, - new RemoteSegmentMetadata( - RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments), - segmentInfoSnapshotByteArray, - primaryTerm, - segmentInfosSnapshot.getGeneration() - ) - ); - indexOutput.close(); + ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput(); + segmentInfosSnapshot.write( + new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos") + ); + byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy(); + + metadataStreamWrapper.writeStream( + indexOutput, + new RemoteSegmentMetadata( + RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments), + segmentInfoSnapshotByteArray, + primaryTerm, + segmentInfosSnapshot.getGeneration() + ) + ); + } storeDirectory.sync(Collections.singleton(metadataFilename)); remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT); } finally { diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 782450286cb90..6f38f080e5035 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -13,7 +13,6 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.tests.store.BaseDirectoryWrapper; -import org.apache.lucene.tests.util.LuceneTestCase; import org.junit.After; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; @@ -48,7 +47,6 @@ import static org.mockito.Mockito.when; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8549") public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { private IndexShard indexShard; private ClusterService clusterService;