From b5299f13e0ca9a5f6979e8cb50137682e777b095 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 25 Oct 2023 14:46:44 +0530 Subject: [PATCH] Delete corrupted file to re-download from remote store (#10891) --------- Signed-off-by: Sachin Kale Co-authored-by: Sachin Kale --- .../opensearch/index/shard/IndexShard.java | 5 +- .../index/shard/IndexShardTests.java | 52 +++++++++++++++++++ .../org/opensearch/test/CorruptionUtils.java | 2 +- 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3c348035ebbdd..5b6257084e440 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4962,7 +4962,8 @@ private String copySegmentFiles( return segmentNFile; } - private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { + // Visible for testing + boolean localDirectoryContains(Directory localDirectory, String file, long checksum) throws IOException { try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { if (checksum == CodecUtil.retrieveChecksum(indexInput)) { return true; @@ -4981,6 +4982,8 @@ private boolean localDirectoryContains(Directory localDirectory, String file, lo logger.debug("File {} does not exist in local FS, downloading from remote store", file); } catch (IOException e) { logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file); + // For any other exception on reading checksum, we delete the file to re-download again + localDirectory.deleteFile(file); } return false; } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index fa3cf7676f55c..f5f8cd1dcfb3f 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -32,6 +32,7 @@ package org.opensearch.index.shard; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -45,6 +46,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.tests.mockfile.ExtrasFS; import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.util.BytesRef; @@ -91,6 +93,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.core.util.FileSystemUtils; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; @@ -163,11 +166,13 @@ import org.junit.Assert; import java.io.IOException; +import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; @@ -4907,6 +4912,53 @@ public void testRecordsForceMerges() throws IOException { closeShards(shard); } + public void testLocalDirectoryContains() throws IOException { + IndexShard indexShard = newStartedShard(true); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + indexDoc(indexShard, "_doc", Integer.toString(i)); + } + flushShard(indexShard); + indexShard.store().incRef(); + Directory localDirectory = indexShard.store().directory(); + Path shardPath = indexShard.shardPath().getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); + Path tempDir = createTempDir(); + for (String file : localDirectory.listAll()) { + if (file.equals("write.lock") || file.startsWith("extra")) { + continue; + } + boolean corrupted = randomBoolean(); + long checksum = 0; + try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { + checksum = CodecUtil.retrieveChecksum(indexInput); + } + if (corrupted) { + Files.copy(shardPath.resolve(file), tempDir.resolve(file)); + try (FileChannel raf = FileChannel.open(shardPath.resolve(file), StandardOpenOption.READ, StandardOpenOption.WRITE)) { + CorruptionUtils.corruptAt(shardPath.resolve(file), raf, (int) (raf.size() - 8)); + } + } + if (corrupted == false) { + assertTrue(indexShard.localDirectoryContains(localDirectory, file, checksum)); + } else { + assertFalse(indexShard.localDirectoryContains(localDirectory, file, checksum)); + assertFalse(Files.exists(shardPath.resolve(file))); + } + } + try (Stream files = Files.list(tempDir)) { + files.forEach(p -> { + try { + Files.copy(p, shardPath.resolve(p.getFileName())); + } catch (IOException e) { + // Ignore + } + }); + } + FileSystemUtils.deleteSubDirectories(tempDir); + indexShard.store().decRef(); + closeShards(indexShard); + } + private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker tracker) { tracker.addUploadBytesStarted(30L); tracker.addUploadBytesSucceeded(10L); diff --git a/test/framework/src/main/java/org/opensearch/test/CorruptionUtils.java b/test/framework/src/main/java/org/opensearch/test/CorruptionUtils.java index 0dce5e78bf91f..67522bb618cf1 100644 --- a/test/framework/src/main/java/org/opensearch/test/CorruptionUtils.java +++ b/test/framework/src/main/java/org/opensearch/test/CorruptionUtils.java @@ -121,7 +121,7 @@ public static void corruptFile(Random random, Path... files) throws IOException } } - static void corruptAt(Path path, FileChannel channel, int position) throws IOException { + public static void corruptAt(Path path, FileChannel channel, int position) throws IOException { // read channel.position(position); long filePointer = channel.position();