From 0669700038a3f8da2e0e81541090f25692d7a9b4 Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Mon, 4 Mar 2024 10:22:42 -0500 Subject: [PATCH] more tolerant of missing persistent copy; verify persistent length asynchronously --- .../solr/storage/CompressingDirectory.java | 6 +- .../org/apache/solr/storage/TeeDirectory.java | 35 ++++++- .../solr/storage/TeeDirectoryFactory.java | 95 ++++++++++++++++++- 3 files changed, 126 insertions(+), 10 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java index a69da3dc2da..2ebfdd30232 100644 --- a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.OpenOption; import java.nio.file.Path; import java.util.Arrays; @@ -107,7 +108,10 @@ public CompressingDirectory( public long fileLength(String name) throws IOException { Path path = directoryPath.resolve(name); File file = path.toFile(); - super.fileLength(name); // to throw NoSuchFileException -- hacky + ensureOpen(); + if (getPendingDeletions().contains(name)) { + throw new NoSuchFileException("file \"" + name + "\" is pending delete"); + } if (file.length() < Long.BYTES) { return 0; } else { diff --git a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java index 28d63a9f311..8b59e0908e3 100644 --- a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java @@ -21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.AbstractMap; @@ -31,6 
+32,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -49,15 +51,21 @@ import org.apache.lucene.util.IOUtils; import org.apache.solr.common.util.CollectionUtil; import org.apache.solr.util.IOFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TeeDirectory extends BaseDirectory { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private volatile Directory access; private final ExecutorService ioExec; private final AutoCloseable closeLocal; private final IOFunction> accessFunction; private final IOFunction>> persistentFunction; private volatile Directory persistent; + private final BlockingQueue + persistentLengthVerificationQueue; /** * This ctor (with default inline config) exists to be invoked during testing, via @@ -66,8 +74,9 @@ public class TeeDirectory extends BaseDirectory { public TeeDirectory(Path path, LockFactory lockFactory) throws IOException { super(TEE_LOCK_FACTORY); TeeDirectoryFactory.NodeLevelTeeDirectoryState ownState = - new TeeDirectoryFactory.NodeLevelTeeDirectoryState(); + new TeeDirectoryFactory.NodeLevelTeeDirectoryState(64); this.ioExec = ownState.ioExec; + this.persistentLengthVerificationQueue = ownState.persistentLengthVerificationQueue; Directory naive = new MMapDirectory(path, lockFactory, MMapDirectory.DEFAULT_MAX_CHUNK_SIZE); this.access = naive; Path compressedPath = path; @@ -100,13 +109,14 @@ public TeeDirectory( Directory naive, IOFunction> accessFunction, IOFunction>> persistentFunction, - ExecutorService ioExec) { + TeeDirectoryFactory.NodeLevelTeeDirectoryState nodeLevelState) { super(TEE_LOCK_FACTORY); this.accessFunction = accessFunction; this.persistentFunction = persistentFunction; this.access = naive; - this.ioExec = ioExec; + 
this.ioExec = nodeLevelState.ioExec; this.closeLocal = null; + this.persistentLengthVerificationQueue = nodeLevelState.persistentLengthVerificationQueue; } private List associatedPaths; @@ -315,6 +325,19 @@ public void deleteFile(String name) throws IOException { } finally { try { access.deleteFile(name); + if (th instanceof NoSuchFileException) { + // if we successfully delete the access copy, but threw `NoSuchFileException` for + // the persistent copy, then swallow the original exception. + // we expect this to happen only in the case of recovering after a disorderly shutdown + // (or similar situation?). It is expected in such cases that a file may have been + // flushed to disk for the access copy, and not for the persistent copy. In the case + // of `pending_segments_*` files, these files are ~explicitly partial, but must be + // deleted. It's not ideal that we have to be lenient here, but we kind of have to be, + // because we know there are legit situations where files can exist in access and not + // in persistent, and we must support the ability to delete such files. + log.info("swallow exception deleting missing persistent file: {}", name); + th = null; + } } catch (NoSuchFileException ex) { // when `persistent != null`, `access` is a special case. Since access may be on ephemeral // storage, we should be ok with files being already absent if we're asked to delete them. 
@@ -525,7 +548,11 @@ public void rename(String source, String dest) throws IOException { @Override public IndexInput openInput(String name, IOContext context) throws IOException { - return access.openInput(name, context); + IndexInput ret = access.openInput(name, context); + persistentLengthVerificationQueue.offer( + new TeeDirectoryFactory.PersistentLengthVerification( + access, persistent, name, ret.length())); + return ret; } @Override diff --git a/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java index e2c115c3125..2f91d82be13 100644 --- a/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java @@ -29,6 +29,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -36,7 +38,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.LockFactory; import org.apache.lucene.util.RamUsageEstimator; @@ -73,6 +77,8 @@ public void initCoreContainer(CoreContainer cc) { static class NodeLevelTeeDirectoryState implements SolrMetricProducer { final ExecutorService ioExec = ExecutorUtil.newMDCAwareCachedThreadPool("teeIOExec"); + private final Future lengthVerificationTask; + final BlockingQueue persistentLengthVerificationQueue; private final Future activationTask; final LinkedBlockingQueue activationQueue = new LinkedBlockingQueue<>(); @@ -90,7 +96,8 @@ static class 
NodeLevelTeeDirectoryState implements SolrMetricProducer { final Meter priorityActivateMeter = new Meter(); final Meter activateMeter = new Meter(); - NodeLevelTeeDirectoryState() { + NodeLevelTeeDirectoryState(int lengthVerificationQueueSize) { + persistentLengthVerificationQueue = new ArrayBlockingQueue<>(lengthVerificationQueueSize); activationTask = ioExec.submit( () -> { @@ -144,6 +151,23 @@ static class NodeLevelTeeDirectoryState implements SolrMetricProducer { } return null; }); + lengthVerificationTask = + ioExec.submit( + () -> { + Thread t = Thread.currentThread(); + while (!t.isInterrupted()) { + PersistentLengthVerification a = null; + try { + a = persistentLengthVerificationQueue.take(); + a.verify(); + } catch (InterruptedException e) { + t.interrupt(); + break; + } catch (Throwable th) { + log.error("error verifying persistent length {}", a); + } + } + }); } @Override @@ -199,8 +223,69 @@ public SolrMetricsContext getSolrMetricsContext() { public void close() throws IOException { try (Closeable c1 = SolrMetricProducer.super::close; Closeable c2 = () -> ExecutorUtil.shutdownAndAwaitTermination(ioExec)) { - activationTask.cancel(true); + try { + lengthVerificationTask.cancel(true); + } finally { + activationTask.cancel(true); + } + } + } + } + + static final class PersistentLengthVerification { + private final Directory accessDir; + private final Directory persistentDir; + private final String name; + private final long accessLength; + + PersistentLengthVerification( + Directory accessDir, Directory persistentDir, String name, long accessLength) { + this.accessDir = accessDir; + this.persistentDir = persistentDir; + this.name = name; + this.accessLength = accessLength; + } + + private void verify() { + try { + long l = persistentDir.fileLength(name); + if (l != accessLength) { + log.error("file length mismatch {} != {}", l, this); + } + } catch (AlreadyClosedException th) { + // swallow this; we have to defer lookup, but we know that in doing so we run 
the risk + // that the directory will already have been closed by the time we look up the length + } catch (NoSuchFileException e) { + try { + accessDir.fileLength(name); + log.error("file absent in persistent, present in access: {}", this); + } catch (NoSuchFileException e1) { + // this is what we expect, so just swallow it + } catch (Throwable t) { + log.warn("unable to re-verify access length {}", this, t); + } + } catch (Throwable t) { + log.warn("unable to verify persistent length {}", this, t); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{name=").append(name).append(", length=").append(accessLength).append(", access="); + if (accessDir instanceof FSDirectory) { + sb.append(((FSDirectory) accessDir).getDirectory()); + } else { + sb.append(accessDir); + } + sb.append(", persistent="); + if (persistentDir instanceof FSDirectory) { + sb.append(((FSDirectory) persistentDir).getDirectory()); + } else { + sb.append(persistentDir); } + sb.append("}"); + return sb.toString(); } } @@ -216,13 +301,13 @@ public void init(NamedList args) { "nodeLevelTeeDirectoryState", NodeLevelTeeDirectoryState.class, (k) -> { - NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState(); + NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState(4096); ret.initializeMetrics( cc.getMetricsHandler().getSolrMetricsContext(), "teeDirectory"); return ret; }); } else { - nodeLevelState = new NodeLevelTeeDirectoryState(); + nodeLevelState = new NodeLevelTeeDirectoryState(64); ownNodeLevelState = nodeLevelState; } super.init(args); @@ -297,7 +382,7 @@ public Directory create(String path, LockFactory lockFactory, DirContext dirCont return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList()); }; return new SizeAwareDirectory( - new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState.ioExec), 0); + new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState), 0); } 
@Override