Skip to content

Commit

Permalink
more tolerant of missing persistent copy; verify persistent length as…
Browse files Browse the repository at this point in the history
…ynchronously
  • Loading branch information
magibney committed Mar 4, 2024
1 parent 3a2d466 commit 0669700
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Arrays;
Expand Down Expand Up @@ -107,7 +108,10 @@ public CompressingDirectory(
public long fileLength(String name) throws IOException {
Path path = directoryPath.resolve(name);
File file = path.toFile();
super.fileLength(name); // to throw NoSuchFileException -- hacky
ensureOpen();
if (getPendingDeletions().contains(name)) {
throw new NoSuchFileException("file \"" + name + "\" is pending delete");
}
if (file.length() < Long.BYTES) {
return 0;
} else {
Expand Down
35 changes: 31 additions & 4 deletions solr/core/src/java/org/apache/solr/storage/TeeDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.AbstractMap;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -49,15 +51,21 @@
import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.util.IOFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TeeDirectory extends BaseDirectory {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private volatile Directory access;
private final ExecutorService ioExec;
private final AutoCloseable closeLocal;
private final IOFunction<Void, Map.Entry<String, Directory>> accessFunction;
private final IOFunction<Directory, Map.Entry<Directory, List<String>>> persistentFunction;
private volatile Directory persistent;
private final BlockingQueue<TeeDirectoryFactory.PersistentLengthVerification>
persistentLengthVerificationQueue;

/**
* This ctor (with default inline config) exists to be invoked during testing, via
Expand All @@ -66,8 +74,9 @@ public class TeeDirectory extends BaseDirectory {
public TeeDirectory(Path path, LockFactory lockFactory) throws IOException {
super(TEE_LOCK_FACTORY);
TeeDirectoryFactory.NodeLevelTeeDirectoryState ownState =
new TeeDirectoryFactory.NodeLevelTeeDirectoryState();
new TeeDirectoryFactory.NodeLevelTeeDirectoryState(64);
this.ioExec = ownState.ioExec;
this.persistentLengthVerificationQueue = ownState.persistentLengthVerificationQueue;
Directory naive = new MMapDirectory(path, lockFactory, MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
this.access = naive;
Path compressedPath = path;
Expand Down Expand Up @@ -100,13 +109,14 @@ public TeeDirectory(
Directory naive,
IOFunction<Void, Map.Entry<String, Directory>> accessFunction,
IOFunction<Directory, Map.Entry<Directory, List<String>>> persistentFunction,
ExecutorService ioExec) {
TeeDirectoryFactory.NodeLevelTeeDirectoryState nodeLevelState) {
super(TEE_LOCK_FACTORY);
this.accessFunction = accessFunction;
this.persistentFunction = persistentFunction;
this.access = naive;
this.ioExec = ioExec;
this.ioExec = nodeLevelState.ioExec;
this.closeLocal = null;
this.persistentLengthVerificationQueue = nodeLevelState.persistentLengthVerificationQueue;
}

private List<String> associatedPaths;
Expand Down Expand Up @@ -315,6 +325,19 @@ public void deleteFile(String name) throws IOException {
} finally {
try {
access.deleteFile(name);
if (th instanceof NoSuchFileException) {
// if we successfully delete the access copy, but threw `NoSuchFileException` for
// the persistent copy, then swallow the original exception.
// we expect this to happen only in the case of recovering after a disorderly shutdown
// (or similar situation?). It is expected in such cases that a file may have been
// flushed to disk for the access copy, and not for the persistent copy. In the case
// of `pending_segments_*` files, these files are ~explicitly partial, but must be
// deleted. It's not ideal that we have to be lenient here, but we kind of have to do,
// because we know there are legit situations where files can exist in access and not
// in persistent, and we must support the ability to delete such files.
log.info("swallow exception deleting missing persistent file: {}", name);
th = null;
}
} catch (NoSuchFileException ex) {
// when `persistent != null`, `access` is a special case. Since access may be on ephemeral
// storage, we should be ok with files being already absent if we're asked to delete them.
Expand Down Expand Up @@ -525,7 +548,11 @@ public void rename(String source, String dest) throws IOException {

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return access.openInput(name, context);
IndexInput ret = access.openInput(name, context);
persistentLengthVerificationQueue.offer(
new TeeDirectoryFactory.PersistentLengthVerification(
access, persistent, name, ret.length()));
return ret;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.util.RamUsageEstimator;
Expand Down Expand Up @@ -73,6 +77,8 @@ public void initCoreContainer(CoreContainer cc) {

static class NodeLevelTeeDirectoryState implements SolrMetricProducer {
final ExecutorService ioExec = ExecutorUtil.newMDCAwareCachedThreadPool("teeIOExec");
private final Future<?> lengthVerificationTask;
final BlockingQueue<PersistentLengthVerification> persistentLengthVerificationQueue;
private final Future<?> activationTask;
final LinkedBlockingQueue<AccessDirectory.LazyEntry> activationQueue =
new LinkedBlockingQueue<>();
Expand All @@ -90,7 +96,8 @@ static class NodeLevelTeeDirectoryState implements SolrMetricProducer {
final Meter priorityActivateMeter = new Meter();
final Meter activateMeter = new Meter();

NodeLevelTeeDirectoryState() {
NodeLevelTeeDirectoryState(int lengthVerificationQueueSize) {
persistentLengthVerificationQueue = new ArrayBlockingQueue<>(lengthVerificationQueueSize);
activationTask =
ioExec.submit(
() -> {
Expand Down Expand Up @@ -144,6 +151,23 @@ static class NodeLevelTeeDirectoryState implements SolrMetricProducer {
}
return null;
});
lengthVerificationTask =
ioExec.submit(
() -> {
Thread t = Thread.currentThread();
while (!t.isInterrupted()) {
PersistentLengthVerification a = null;
try {
a = persistentLengthVerificationQueue.take();
a.verify();
} catch (InterruptedException e) {
t.interrupt();
break;
} catch (Throwable th) {
log.error("error verifying persistent length {}", a);
}
}
});
}

@Override
Expand Down Expand Up @@ -199,8 +223,69 @@ public SolrMetricsContext getSolrMetricsContext() {
public void close() throws IOException {
try (Closeable c1 = SolrMetricProducer.super::close;
Closeable c2 = () -> ExecutorUtil.shutdownAndAwaitTermination(ioExec)) {
activationTask.cancel(true);
try {
lengthVerificationTask.cancel(true);
} finally {
activationTask.cancel(true);
}
}
}
}

static final class PersistentLengthVerification {
private final Directory accessDir;
private final Directory persistentDir;
private final String name;
private final long accessLength;

PersistentLengthVerification(
Directory accessDir, Directory persistentDir, String name, long accessLength) {
this.accessDir = accessDir;
this.persistentDir = persistentDir;
this.name = name;
this.accessLength = accessLength;
}

private void verify() {
try {
long l = persistentDir.fileLength(name);
if (l != accessLength) {
log.error("file length mismatch {} != {}", l, this);
}
} catch (AlreadyClosedException th) {
// swallow this; we have to defer lookup, but we know that in doing so we run the risk
// the the directory will already have been closed by the time we look up the length
} catch (NoSuchFileException e) {
try {
accessDir.fileLength(name);
log.error("file absent in persistent, present in access: {}", this);
} catch (NoSuchFileException e1) {
// this is what we expect, so just swallow it
} catch (Throwable t) {
log.warn("unable to re-verify access length {}", this, t);
}
} catch (Throwable t) {
log.warn("unable to verify persistent length {}", this, t);
}
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{name=").append(name).append(", length=").append(accessLength).append(", access=");
if (accessDir instanceof FSDirectory) {
sb.append(((FSDirectory) accessDir).getDirectory());
} else {
sb.append(accessDir);
}
sb.append(", persistent=");
if (persistentDir instanceof FSDirectory) {
sb.append(((FSDirectory) persistentDir).getDirectory());
} else {
sb.append(persistentDir);
}
sb.append("}");
return sb.toString();
}
}

Expand All @@ -216,13 +301,13 @@ public void init(NamedList<?> args) {
"nodeLevelTeeDirectoryState",
NodeLevelTeeDirectoryState.class,
(k) -> {
NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState();
NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState(4096);
ret.initializeMetrics(
cc.getMetricsHandler().getSolrMetricsContext(), "teeDirectory");
return ret;
});
} else {
nodeLevelState = new NodeLevelTeeDirectoryState();
nodeLevelState = new NodeLevelTeeDirectoryState(64);
ownNodeLevelState = nodeLevelState;
}
super.init(args);
Expand Down Expand Up @@ -297,7 +382,7 @@ public Directory create(String path, LockFactory lockFactory, DirContext dirCont
return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList());
};
return new SizeAwareDirectory(
new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState.ioExec), 0);
new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState), 0);
}

@Override
Expand Down

0 comments on commit 0669700

Please sign in to comment.