Handle exceptions on file read
Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Oct 25, 2023
1 parent 6779633 commit 0514aad
Showing 2 changed files with 153 additions and 37 deletions.
@@ -35,7 +35,6 @@
import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@@ -195,7 +194,13 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter(this::validateLocalChecksum)
.filter((storeFileMetadata) -> {
try {
return validateLocalChecksum(storeFileMetadata);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

@@ -232,7 +237,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return missingFiles;
}

private boolean validateLocalChecksum(StoreFileMetadata file) {
private boolean validateLocalChecksum(StoreFileMetadata file) throws IOException {
try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
if (file.checksum().equals(checksum)) {
@@ -243,7 +248,11 @@ private boolean validateLocalChecksum(StoreFileMetadata file) {
return false;
}
} catch (IOException e) {
throw new UncheckedIOException("Error reading " + file, e);
logger.warn("Error reading " + file, e);
// Delete the file on exception so that it can be re-downloaded. This is safe to do as the file is on
// disk but not referenced by any reader.
indexShard.store().directory().deleteFile(file.name());
return false;
}
}

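To make the new control flow easier to follow, here is a minimal standalone sketch (not part of the commit) of the validate-or-delete pattern introduced above. It assumes Lucene's Directory/IndexInput/CodecUtil APIs; the class name and method signature are illustrative, and the checksum rendering inlines what OpenSearch's Store.digestToString(long) does (Long.toString(digest, Character.MAX_RADIX)):

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

import java.io.IOException;

final class LocalChecksumValidator {
    /**
     * Returns true only when the local copy of the file has a readable Lucene footer whose
     * checksum matches the expected value. An unreadable (e.g. partially written) file is
     * deleted so that the next replication round re-downloads it instead of later failing
     * with FileAlreadyExistsException; this is safe because the file is on disk but not
     * referenced by any reader.
     */
    static boolean validateOrDelete(Directory directory, String fileName, String expectedChecksum) throws IOException {
        try (IndexInput in = directory.openInput(fileName, IOContext.DEFAULT)) {
            long checksum = CodecUtil.retrieveChecksum(in); // throws on a corrupt or truncated footer
            return expectedChecksum.equals(Long.toString(checksum, Character.MAX_RADIX));
        } catch (IOException e) {
            directory.deleteFile(fileName); // unreadable copy removed so it is re-downloaded
            return false;
        }
    }
}

Returning false instead of rethrowing keeps an unreadable file in the re-download set; deleteFile itself can still throw IOException, which is why validateLocalChecksum keeps its throws clause and the stream filter in getFiles wraps the call in a RuntimeException.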
@@ -31,18 +31,20 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.CorruptionUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

@@ -371,37 +373,9 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
Runnable[] runAfterGetFiles = { () -> { throw new RuntimeException("Simulated"); }, () -> {} };
AtomicInteger index = new AtomicInteger(0);
RemoteStoreReplicationSource testRSReplicationSource = new RemoteStoreReplicationSource(replica) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
runAfterGetFiles[index.getAndIncrement()].run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
when(sourceFactory.get(any())).thenReturn(testRSReplicationSource);
when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch latch = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
@@ -439,7 +413,8 @@ public void onReplicationFailure(
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

// Start next round of segment replication
// Start next round of segment replication; this time no exception is thrown, so the replica commits the copied files
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
@@ -471,6 +446,138 @@ public void onReplicationFailure(
}
}

/**
* This test validates that partially written (non-readable) files on disk are deleted rather than causing a
* corruption failure via FileAlreadyExistsException. It does so by copying files in a first round of segment
* replication without committing, corrupting one segment file, and ensuring that the corrupt file is deleted and
* that the next round of segment replication goes through without any issues.
*/
public void testNoFailuresOnPartialFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

final int docCount = 10;
shards.indexDocs(docCount);
primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);

when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch latch = new CountDownLatch(1);

// Start first round of segment replication. This should fail with the simulated error, leaving the replica
// with files in its local store that are not in the active reader.
final SegmentReplicationTarget target = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail("Replication should fail with simulated error");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
assertFalse(sendShardFailure);
latch.countDown();
}
}
);
latch.await();
Set<String> onDiskFiles = new HashSet<>(Arrays.asList(replica.store().directory().listAll()));
onDiskFiles.removeIf(name -> EXCLUDE_FILES.contains(name) || name.startsWith(IndexFileNames.SEGMENTS));
List<String> activeFiles = replica.getSegmentMetadataMap()
.values()
.stream()
.map(metadata -> metadata.name())
.collect(Collectors.toList());
assertTrue("Files should not be committed", activeFiles.isEmpty());
assertEquals("Files should be copied to disk", false, onDiskFiles.isEmpty());
assertEquals(target.state().getStage(), SegmentReplicationState.Stage.GET_FILES);

logger.info("--> Primary store {}", Arrays.toString(primary.store().directory().listAll()));
logger.info("--> Replica store {}", Arrays.toString(replica.store().directory().listAll()));
logger.info(
"--> Replica reader {}",
replica.getSegmentMetadataMap().values().stream().map(m -> m.name()).collect(Collectors.toList())
);

String fileToCorrupt = null;
// Corrupt one data file
Path shardPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
for (String file : replica.store().directory().listAll()) {
if (file.equals("write.lock") || file.startsWith("extra") || file.startsWith("segment")) {
continue;
}
fileToCorrupt = file;
logger.info("--> Corrupting file {}", fileToCorrupt);
try (FileChannel raf = FileChannel.open(shardPath.resolve(file), StandardOpenOption.READ, StandardOpenOption.WRITE)) {
CorruptionUtils.corruptAt(shardPath.resolve(file), raf, (int) (raf.size() - 8));
}
break;
}
Assert.assertNotNull(fileToCorrupt);

// Ingest more data and start next round of segment replication
shards.indexDocs(docCount);
primary.refresh("Post corruption");
replicateSegments(primary, List.of(replica));
activeFiles = replica.getSegmentMetadataMap().values().stream().map(metadata -> metadata.name()).collect(Collectors.toList());
assertTrue("Replica should have consistent disk & reader", activeFiles.containsAll(onDiskFiles));

assertDocCount(primary, 2 * docCount);
assertDocCount(replica, 2 * docCount);

final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap());
assertTrue(diff.missing.isEmpty());
assertTrue(diff.different.isEmpty());

// clean up
shards.removeReplica(replica);
closeShards(replica);
}
}

private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) {
return new RemoteStoreReplicationSource(shard) {
@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
super.getCheckpointMetadata(replicationId, checkpoint, listener);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
postGetFilesRunnable.run();
}

@Override
public String getDescription() {
return "TestRemoteStoreReplicationSource";
}
};
}

@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met.
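For completeness, the corruption step that drives testNoFailuresOnPartialFiles can be approximated with plain NIO. The sketch below is a simplified stand-in for the test framework's CorruptionUtils.corruptAt helper: it flips one byte 8 bytes before EOF, inside the Lucene codec footer, so that a later CodecUtil.retrieveChecksum call on the file fails and the new catch block in validateLocalChecksum deletes it. The class and method names are illustrative:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FooterCorrupter {
    // Flips one byte near EOF, inside the Lucene codec footer (magic + algorithm + CRC-32),
    // so that the replication target treats this file as non-reusable on the next round.
    static void corruptFooter(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            long position = channel.size() - 8; // same offset the test passes to corruptAt
            ByteBuffer buf = ByteBuffer.allocate(1);
            channel.read(buf, position);
            byte flipped = (byte) ~buf.get(0); // invert the byte so the stored value no longer matches
            channel.write(ByteBuffer.wrap(new byte[] { flipped }), position);
        }
    }
}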
