diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 73a33a3c8803e..cc71ef816e525 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -35,6 +35,7 @@ import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import java.util.Locale; import java.util.Set; @@ -194,13 +195,7 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) // set of local files that can be reused final Set reuseFiles = diff.missing.stream() .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) - .filter((storeFileMetadata) -> { - try { - return validateLocalChecksum(storeFileMetadata); - } catch (IOException e) { - throw new RuntimeException(e); - } - }) + .filter(this::validateLocalChecksum) .map(StoreFileMetadata::name) .collect(Collectors.toSet()); @@ -237,7 +232,8 @@ private List getFiles(CheckpointInfoResponse checkpointInfo) return missingFiles; } - private boolean validateLocalChecksum(StoreFileMetadata file) throws IOException { + // declares no checked exceptions so it can be used directly as a stream filter predicate + private boolean validateLocalChecksum(StoreFileMetadata file) { try (IndexInput indexInput = indexShard.store().directory().openInput(file.name(), IOContext.DEFAULT)) { String checksum = Store.digestToString(CodecUtil.retrieveChecksum(indexInput)); if (file.checksum().equals(checksum)) { @@ -251,7 +247,11 @@ private boolean validateLocalChecksum(StoreFileMetadata file) throws IOException logger.warn("Error reading " + file, e); // Delete file on exceptions so that it can be re-downloaded. This is safe to do as this file is local only // and not referenced by reader. 
- indexShard.store().directory().deleteFile(file.name()); + try { + indexShard.store().directory().deleteFile(file.name()); + } catch (IOException ex) { + throw new UncheckedIOException("Error deleting unreadable local file " + file, ex); + } return false; } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index e9bf4122d6195..2ce0bdc607189 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -386,6 +386,7 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception { new SegmentReplicationTargetService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { + latch.countDown(); Assert.fail("Replication should fail with simulated error"); } @@ -395,9 +396,9 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { + latch.countDown(); assertFalse(sendShardFailure); logger.error("Replication error", e); - latch.countDown(); } } ); @@ -431,9 +432,9 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { + waitForSecondRound.countDown(); logger.error("Replication error", e); Assert.fail("Replication should not fail"); - waitForSecondRound.countDown(); } } ); @@ -447,10 +448,10 @@ public void onReplicationFailure( } /** - * This test validates that local non-readable (partially written, corrupt) on disk are deleted vs failing the + * This test validates that local non-readable (corrupt, partially written) files on disk are deleted vs failing the * replication event. This test mimics local files (not referenced by reader) by throwing exception post file copy and * blocking update of reader. 
Once this is done, it corrupts one segment file and ensure that file is deleted in next - * round of segment replication goes through without any issues. + * round of segment replication, which is verified by checking the doc count. */ public void testNoFailuresOnFileReads() throws Exception { try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { @@ -464,22 +465,21 @@ public void testNoFailuresOnFileReads() throws Exception { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - when(sourceFactory.get(any())).thenReturn( getRemoteStoreReplicationSource(replica, () -> { throw new RuntimeException("Simulated"); }) ); - CountDownLatch waitOnReplication = new CountDownLatch(1); + CountDownLatch waitOnReplicationCompletion = new CountDownLatch(1); // Start first round of segment replication. This should fail with simulated error but with replica having // files in its local store but not in active reader. 
- targetService.startReplication( + SegmentReplicationTarget segmentReplicationTarget = targetService.startReplication( replica, primary.getLatestReplicationCheckpoint(), new SegmentReplicationTargetService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { + waitOnReplicationCompletion.countDown(); Assert.fail("Replication should fail with simulated error"); - waitOnReplication.countDown(); } @Override @@ -488,12 +488,13 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { + waitOnReplicationCompletion.countDown(); assertFalse(sendShardFailure); - waitOnReplication.countDown(); } } ); - waitOnReplication.await(); + waitOnReplicationCompletion.await(); + assertBusy(() -> { assertEquals("Target should be closed", 0, segmentReplicationTarget.refCount()); }); String fileToCorrupt = null; // Corrupt one data file Path shardPath = replica.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);