Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Oct 26, 2023
1 parent 1cec100 commit cc0e090
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.indices.replication.common.ReplicationTarget;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand Down Expand Up @@ -194,13 +195,7 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
// set of local files that can be reused
final Set<String> reuseFiles = diff.missing.stream()
.filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
.filter((storeFileMetadata) -> {
try {
return validateLocalChecksum(storeFileMetadata);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.filter(this::validateLocalChecksum)
.map(StoreFileMetadata::name)
.collect(Collectors.toSet());

Expand Down Expand Up @@ -237,7 +232,8 @@ private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo)
return missingFiles;
}

private boolean validateLocalChecksum(StoreFileMetadata file) throws IOException {
// pkg private for tests
private boolean validateLocalChecksum(StoreFileMetadata file) {
try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) {
String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput));
if (file.checksum().equals(checksum)) {
Expand All @@ -251,7 +247,11 @@ private boolean validateLocalChecksum(StoreFileMetadata file) throws IOException
logger.warn("Error reading " + file, e);
// Delete file on exceptions so that it can be re-downloaded. This is safe to do as this file is local only
// and not referenced by reader.
indexShard.store().directory().deleteFile(file.name());
try {
indexShard.store().directory().deleteFile(file.name());
} catch (IOException ex) {
throw new UncheckedIOException("Error reading " + file, e);
}
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
latch.countDown();
Assert.fail("Replication should fail with simulated error");
}

Expand All @@ -395,9 +396,9 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
latch.countDown();
assertFalse(sendShardFailure);
logger.error("Replication error", e);
latch.countDown();
}
}
);
Expand Down Expand Up @@ -431,9 +432,9 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
waitForSecondRound.countDown();
logger.error("Replication error", e);
Assert.fail("Replication should not fail");
waitForSecondRound.countDown();
}
}
);
Expand All @@ -447,10 +448,10 @@ public void onReplicationFailure(
}

/**
* This test validates that local non-readable (partially written, corrupt) on disk are deleted vs failing the
* This test validates that local non-readable (corrupt, partially written) files on disk are deleted rather than failing the
* replication event. This test mimics local files (not referenced by reader) by throwing exception post file copy and
* blocking update of reader. Once this is done, it corrupts one segment file and ensures that the file is deleted in the next
* round of segment replication goes through without any issues.
* round of segment replication, which is verified by checking the doc count.
*/
public void testNoFailuresOnFileReads() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
Expand All @@ -464,22 +465,21 @@ public void testNoFailuresOnFileReads() throws Exception {

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);

when(sourceFactory.get(any())).thenReturn(
getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); })
);
CountDownLatch waitOnReplication = new CountDownLatch(1);
CountDownLatch waitOnReplicationCompletion = new CountDownLatch(1);

// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
targetService.startReplication(
SegmentReplicationTarget segmentReplicationTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
waitOnReplicationCompletion.countDown();
Assert.fail("Replication should fail with simulated error");
waitOnReplication.countDown();
}

@Override
Expand All @@ -488,12 +488,13 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
waitOnReplicationCompletion.countDown();
assertFalse(sendShardFailure);
waitOnReplication.countDown();
}
}
);
waitOnReplication.await();
waitOnReplicationCompletion.await();
assertBusy(() -> { assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); });
String fileToCorrupt = null;
// Corrupt one data file
Path shardPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
Expand Down

0 comments on commit cc0e090

Please sign in to comment.