Skip to content

Commit

Permalink
[Remote Store] Ensure metadata file is deleted from local fs store du…
Browse files Browse the repository at this point in the history
…ring metadata upload failures (#7458) (#8062)

* Ensure metadata file is deleted from local store during metadata upload failures or success
* move finally inside synchronized block

---------


(cherry picked from commit 00526f2)

Signed-off-by: bansvaru <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 5123e21 commit fa8aec4
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Map;
Expand Down Expand Up @@ -464,32 +465,50 @@ public void uploadMetadata(
segmentInfosSnapshot.getGeneration(),
this.commonFilenameSuffix
);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
try {
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
}
}

ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos"));
byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();

metadataStreamWrapper.writeStream(
indexOutput,
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
segmentInfosSnapshot.getGeneration()
)
);
indexOutput.close();
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
} finally {
tryAndDeleteLocalFile(metadataFilename, storeDirectory);
}
}
}

ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos"));
byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();

metadataStreamWrapper.writeStream(
indexOutput,
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
segmentInfosSnapshot.getGeneration()
)
);
indexOutput.close();
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
storeDirectory.deleteFile(metadataFilename);
/**
 * Best-effort deletion of a file from the local store; never propagates a failure
 * to the caller. A file that is already gone is logged at trace level (expected on
 * retries), while any other I/O problem is logged at warn level and swallowed.
 * @param filename name of the file to be deleted
 * @param directory local directory from which the file is deleted
 */
private void tryAndDeleteLocalFile(String filename, Directory directory) {
    try {
        logger.trace("Deleting file: " + filename);
        directory.deleteFile(filename);
    } catch (FileNotFoundException | NoSuchFileException missing) {
        // Missing file is not an error here: a previous attempt may have cleaned it up already.
        logger.trace("Exception while deleting. Missing file : " + filename, missing);
    } catch (IOException ioFailure) {
        logger.warn("Exception while deleting: " + filename, ioFailure);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;

Expand Down Expand Up @@ -277,6 +279,32 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
assertEquals(1, segmentTracker.getTotalUploadsFailed());
}

/**
 * Tests the retry flow when the first attempt fails after the snapshot and metadata
 * files were already uploaded to the remote store: leftovers from the failed attempt
 * must not break the retry.
 * @throws Exception
 */
public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception {
    // Counts down once per refresh attempt; the upload itself succeeds on attempt 1.
    CountDownLatch refreshLatch = new CountDownLatch(1);
    // Counts down on IndexShard.getEngine() hits in the terminal (success) path; the
    // code under test reaches it three times on a successful upload.
    CountDownLatch uploadSuccessLatch = new CountDownLatch(3);
    // Checkpoint publish is induced to fail on attempt 1 and succeed on attempt 2.
    CountDownLatch checkpointPublishLatch = new CountDownLatch(0);
    mockIndexShardWithRetryAndScheduleRefresh(
        1,
        refreshLatch,
        uploadSuccessLatch,
        2,
        checkpointPublishLatch
    );
    assertBusy(() -> assertEquals(0, refreshLatch.getCount()));
    assertBusy(() -> assertEquals(0, uploadSuccessLatch.getCount()));
    assertBusy(() -> assertEquals(0, checkpointPublishLatch.getCount()));
}

public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
Expand Down Expand Up @@ -338,6 +366,17 @@ private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> m
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch
) throws IOException {
CountDownLatch noOpLatch = new CountDownLatch(0);
return mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch, 1, noOpLatch);
}

private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> mockIndexShardWithRetryAndScheduleRefresh(
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
Expand Down Expand Up @@ -400,6 +439,18 @@ private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> m
return indexShard.getEngine();
}).when(shard).getEngine();

SegmentReplicationCheckpointPublisher emptyCheckpointPublisher = spy(SegmentReplicationCheckpointPublisher.EMPTY);
AtomicLong checkpointPublisherCounter = new AtomicLong();
doAnswer(invocation -> {
if (checkpointPublisherCounter.incrementAndGet() <= succeedCheckpointPublishOnAttempt - 1) {
throw new RuntimeException("Inducing failure after snapshot info snapshot to test if snapshot info file is deleted");
}
if (Objects.nonNull(reachedCheckpointPublishLatch)) {
reachedCheckpointPublishLatch.countDown();
}
return null;
}).when(emptyCheckpointPublisher).publish(any(), any());

clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
Expand All @@ -414,7 +465,7 @@ private Tuple<RemoteStoreRefreshListener, RemoteRefreshSegmentPressureService> m
remoteRefreshSegmentPressureService.afterIndexShardCreated(shard);
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(
shard,
SegmentReplicationCheckpointPublisher.EMPTY,
emptyCheckpointPublisher,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(indexShard.shardId())
);
refreshListener.afterRefresh(true);
Expand Down

0 comments on commit fa8aec4

Please sign in to comment.