From fa8aec486c40b2e5a8e682651a098f3a95baca5e Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Wed, 14 Jun 2023 10:23:17 -0700
Subject: [PATCH] [Remote Store] Ensure metadata file is deleted from local fs store during metadata upload failures (#7458) (#8062)

* Ensure metadata file is deleted from local store during metadata upload failures or success

* Move finally inside synchronized block

---------

(cherry picked from commit 00526f2477d229b3a7ef4bb81bd8c9a32fb7415d)

Signed-off-by: bansvaru
Signed-off-by: github-actions[bot]
Co-authored-by: github-actions[bot]
---
 .../store/RemoteSegmentStoreDirectory.java    | 65 ++++++++++++-------
 .../RemoteStoreRefreshListenerTests.java      | 53 ++++++++++++++-
 2 files changed, 94 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
index 93fa4b7eff7b7..93db8c191937d 100644
--- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
+++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java
@@ -28,6 +28,7 @@ import org.opensearch.common.io.VersionedCodecStreamWrapper;
 import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.NoSuchFileException;
 import java.util.Map;
@@ -464,32 +465,50 @@ public void uploadMetadata(
                 segmentInfosSnapshot.getGeneration(),
                 this.commonFilenameSuffix
             );
-            IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
-            Map<String, String> uploadedSegments = new HashMap<>();
-            for (String file : segmentFiles) {
-                if (segmentsUploadedToRemoteStore.containsKey(file)) {
-                    uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
-                } else {
-                    throw new NoSuchFileException(file);
+            try {
+                IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
+                Map<String, String> uploadedSegments = new HashMap<>();
+                for (String file : segmentFiles) {
+                    if (segmentsUploadedToRemoteStore.containsKey(file)) {
+                        uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
+                    } else {
+                        throw new NoSuchFileException(file);
+                    }
                 }
+
+                ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
+                segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos"));
+                byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();
+
+                metadataStreamWrapper.writeStream(
+                    indexOutput,
+                    new RemoteSegmentMetadata(
+                        RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
+                        segmentInfoSnapshotByteArray,
+                        segmentInfosSnapshot.getGeneration()
+                    )
+                );
+                indexOutput.close();
+                storeDirectory.sync(Collections.singleton(metadataFilename));
+                remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
+            } finally {
+                tryAndDeleteLocalFile(metadataFilename, storeDirectory);
+            }
+        }
+    }
 
-            ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
-            segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos"));
-            byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();
-
-            metadataStreamWrapper.writeStream(
-                indexOutput,
-                new RemoteSegmentMetadata(
-                    RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
-                    segmentInfoSnapshotByteArray,
-                    segmentInfosSnapshot.getGeneration()
-                )
-            );
-            indexOutput.close();
-            storeDirectory.sync(Collections.singleton(metadataFilename));
-            remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
-            storeDirectory.deleteFile(metadataFilename);
+    /**
+     * Try to delete file from local store. Fails silently on failures.
+     * @param filename name of the file to be deleted
+     */
+    private void tryAndDeleteLocalFile(String filename, Directory directory) {
+        try {
+            logger.trace("Deleting file: " + filename);
+            directory.deleteFile(filename);
+        } catch (NoSuchFileException | FileNotFoundException e) {
+            logger.trace("Exception while deleting. Missing file : " + filename, e);
+        } catch (IOException e) {
+            logger.warn("Exception while deleting: " + filename, e);
         }
     }
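
Note on the change above: the local metadata file is now written and copied to the remote directory inside a try block, and the local copy is removed in finally, so it disappears whether the upload succeeds or any earlier step throws. Below is a minimal, self-contained sketch of that shape using plain java.nio APIs; the names MetadataUploadSketch, uploadMetadata and tryAndDeleteLocalFile here are illustrative stand-ins, not the OpenSearch implementation.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical sketch of the upload-then-clean-up shape introduced above: the local metadata
// file is always deleted in a finally block, whether the remote copy succeeds or a step throws.
class MetadataUploadSketch {

    void uploadMetadata(Path localDir, Path remoteDir, String metadataFilename, byte[] payload) throws IOException {
        Path localFile = localDir.resolve(metadataFilename);
        try {
            Files.write(localFile, payload);                             // write metadata locally
            Files.copy(localFile, remoteDir.resolve(metadataFilename));  // "upload" to the remote store
        } finally {
            tryAndDeleteLocalFile(localFile);                            // always clean up the local copy
        }
    }

    // Deletes quietly: a missing file is expected on some paths; other IO errors are only reported.
    private void tryAndDeleteLocalFile(Path localFile) {
        try {
            Files.delete(localFile);
        } catch (NoSuchFileException e) {
            // already gone, nothing to do
        } catch (IOException e) {
            System.err.println("Could not delete " + localFile + ": " + e);
        }
    }
}

The design point mirrored here is that cleanup is deliberately non-fatal: a missing file is ignored and any other IOException is only reported, so local cleanup can never turn a successful upload into a failure.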
diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
index ce86a17cd0b23..b698b1a30ce20 100644
--- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java
@@ -40,8 +40,10 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
 
@@ -277,6 +279,32 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
         assertEquals(1, segmentTracker.getTotalUploadsFailed());
     }
 
+    /**
+     * Tests the retry flow after snapshot and metadata files have been uploaded to the remote store in the failed attempt.
+     * Snapshot and metadata files created in the failed attempt should not break the retry.
+     * @throws Exception
+     */
+    public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception {
+        int succeedOnAttempt = 1;
+        int checkpointPublishSucceedOnAttempt = 2;
+        // We spy on IndexShard.getReplicationTracker() to validate that we have tried running the remote flow as many times as expected.
+        CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
+        // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
+        // Value has been set as 3 because IndexShard.getEngine() is hit thrice during a successful upload, and here we run the flow twice.
+        CountDownLatch successLatch = new CountDownLatch(3);
+        CountDownLatch reachedCheckpointPublishLatch = new CountDownLatch(0);
+        mockIndexShardWithRetryAndScheduleRefresh(
+            succeedOnAttempt,
+            refreshCountLatch,
+            successLatch,
+            checkpointPublishSucceedOnAttempt,
+            reachedCheckpointPublishLatch
+        );
+        assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
+        assertBusy(() -> assertEquals(0, successLatch.getCount()));
+        assertBusy(() -> assertEquals(0, reachedCheckpointPublishLatch.getCount()));
+    }
+
     public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
         // This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
         // Succeed on 3rd attempt
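
Note on the new test above: it induces a failure on the first refresh attempt (via the checkpoint publisher, mocked further down in this patch) and then uses CountDownLatch counts polled with assertBusy to prove that the retry reached the terminal success path. A standalone sketch of that latch-based verification pattern follows; RetryLatchSketch and flakyUpload are hypothetical names, and a plain latch await stands in for assertBusy.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the latch-based verification used by the test: the first attempt fails,
// the retry succeeds, and the latch reaching zero within a timeout proves the success path ran.
class RetryLatchSketch {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch successLatch = new CountDownLatch(1);
        AtomicInteger attempts = new AtomicInteger();

        Runnable flakyUpload = () -> {
            if (attempts.incrementAndGet() == 1) {
                throw new RuntimeException("induced failure on the first attempt");
            }
            successLatch.countDown(); // terminal success path reached on the retry
        };

        // First attempt fails; a listener-style retry runs the same operation again.
        try {
            flakyUpload.run();
        } catch (RuntimeException e) {
            flakyUpload.run();
        }

        // Rough equivalent of assertBusy(() -> assertEquals(0, successLatch.getCount())).
        if (!successLatch.await(5, TimeUnit.SECONDS)) {
            throw new AssertionError("the upload never reached the success path");
        }
        System.out.println("retry reached the success path");
    }
}

Keeping the success signal in a latch rather than a boolean lets the assertion wait for asynchronous retries instead of racing them.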
@@ -338,6 +366,17 @@ private Tuple<RemoteRefreshSegmentPressureService, RemoteStoreRefreshListener> m
         int succeedOnAttempt,
         CountDownLatch refreshCountLatch,
         CountDownLatch successLatch
+    ) throws IOException {
+        CountDownLatch noOpLatch = new CountDownLatch(0);
+        return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch);
+    }
+
+    private Tuple<RemoteRefreshSegmentPressureService, RemoteStoreRefreshListener> mockIndexShardWithRetryAndScheduleRefresh(
+        int succeedOnAttempt,
+        CountDownLatch refreshCountLatch,
+        CountDownLatch successLatch,
+        int succeedCheckpointPublishOnAttempt,
+        CountDownLatch reachedCheckpointPublishLatch
     ) throws IOException {
         // Create index shard that we will be using to mock different methods in IndexShard for the unit test
         indexShard = newStartedShard(
@@ -400,6 +439,18 @@ private Tuple<RemoteRefreshSegmentPressureService, RemoteStoreRefreshListener> m
             return indexShard.getEngine();
         }).when(shard).getEngine();
 
+        SegmentReplicationCheckpointPublisher emptyCheckpointPublisher = spy(SegmentReplicationCheckpointPublisher.EMPTY);
+        AtomicLong checkpointPublisherCounter = new AtomicLong();
+        doAnswer(invocation -> {
+            if (checkpointPublisherCounter.incrementAndGet() <= succeedCheckpointPublishOnAttempt - 1) {
+                throw new RuntimeException("Inducing failure after snapshot info snapshot to test if snapshot info file is deleted");
+            }
+            if (Objects.nonNull(reachedCheckpointPublishLatch)) {
+                reachedCheckpointPublishLatch.countDown();
+            }
+            return null;
+        }).when(emptyCheckpointPublisher).publish(any(), any());
+
         clusterService = new ClusterService(
             Settings.EMPTY,
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
@@ -414,7 +465,7 @@ private Tuple<RemoteRefreshSegmentPressureService, RemoteStoreRefreshListener> m
         remoteRefreshSegmentPressureService.afterIndexShardCreated(shard);
         RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
             shard,
-            SegmentReplicationCheckpointPublisher.EMPTY,
+            emptyCheckpointPublisher,
             remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
         );
         refreshListener.afterRefresh(true);
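
Note on the mocking above: the checkpoint publisher is a Mockito spy whose publish() is stubbed with doAnswer so that it throws until a configured attempt is reached, which is what forces the first refresh attempt to fail after the metadata upload and exercises the new cleanup path. A minimal standalone sketch of that failure-injection recipe follows; the Publisher class and failingUntil helper are hypothetical stand-ins rather than OpenSearch types.

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a no-op publisher such as SegmentReplicationCheckpointPublisher.EMPTY.
class Publisher {
    public void publish(String checkpoint) {
        // no-op by default
    }
}

class FailingPublisherSketch {

    // Returns a spy that throws on every call before `succeedOnAttempt` and is a no-op from then on,
    // mirroring how the test forces the checkpoint publish to fail on the first refresh attempt only.
    static Publisher failingUntil(int succeedOnAttempt) {
        Publisher publisher = spy(new Publisher());
        AtomicLong calls = new AtomicLong();
        doAnswer(invocation -> {
            if (calls.incrementAndGet() <= succeedOnAttempt - 1) {
                throw new RuntimeException("induced checkpoint publish failure");
            }
            return null; // void method: nothing to return once publishing "succeeds"
        }).when(publisher).publish(anyString());
        return publisher;
    }
}

Counting calls with an AtomicLong inside the Answer keeps the stub stateful across invocations, so the same spy can fail the first call and behave as a no-op afterwards without being re-stubbed.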