Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
magibney committed Mar 4, 2024
1 parent 3a2d466 commit ec1936c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Arrays;
Expand Down Expand Up @@ -107,7 +108,10 @@ public CompressingDirectory(
public long fileLength(String name) throws IOException {
Path path = directoryPath.resolve(name);
File file = path.toFile();
super.fileLength(name); // to throw NoSuchFileException -- hacky
ensureOpen();
if (getPendingDeletions().contains(name)) {
throw new NoSuchFileException("file \"" + name + "\" is pending delete");
}
if (file.length() < Long.BYTES) {
return 0;
} else {
Expand Down
35 changes: 31 additions & 4 deletions solr/core/src/java/org/apache/solr/storage/TeeDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.AbstractMap;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -49,15 +51,21 @@
import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.util.IOFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TeeDirectory extends BaseDirectory {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private volatile Directory access;
private final ExecutorService ioExec;
private final AutoCloseable closeLocal;
private final IOFunction<Void, Map.Entry<String, Directory>> accessFunction;
private final IOFunction<Directory, Map.Entry<Directory, List<String>>> persistentFunction;
private volatile Directory persistent;
private final BlockingQueue<TeeDirectoryFactory.PersistentLengthVerification>
persistentLengthVerificationQueue;

/**
* This ctor (with default inline config) exists to be invoked during testing, via
Expand All @@ -66,8 +74,9 @@ public class TeeDirectory extends BaseDirectory {
public TeeDirectory(Path path, LockFactory lockFactory) throws IOException {
super(TEE_LOCK_FACTORY);
TeeDirectoryFactory.NodeLevelTeeDirectoryState ownState =
new TeeDirectoryFactory.NodeLevelTeeDirectoryState();
new TeeDirectoryFactory.NodeLevelTeeDirectoryState(64);
this.ioExec = ownState.ioExec;
this.persistentLengthVerificationQueue = ownState.persistentLengthVerificationQueue;
Directory naive = new MMapDirectory(path, lockFactory, MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
this.access = naive;
Path compressedPath = path;
Expand Down Expand Up @@ -100,13 +109,14 @@ public TeeDirectory(
Directory naive,
IOFunction<Void, Map.Entry<String, Directory>> accessFunction,
IOFunction<Directory, Map.Entry<Directory, List<String>>> persistentFunction,
ExecutorService ioExec) {
TeeDirectoryFactory.NodeLevelTeeDirectoryState nodeLevelState) {
super(TEE_LOCK_FACTORY);
this.accessFunction = accessFunction;
this.persistentFunction = persistentFunction;
this.access = naive;
this.ioExec = ioExec;
this.ioExec = nodeLevelState.ioExec;
this.closeLocal = null;
this.persistentLengthVerificationQueue = nodeLevelState.persistentLengthVerificationQueue;
}

private List<String> associatedPaths;
Expand Down Expand Up @@ -315,6 +325,19 @@ public void deleteFile(String name) throws IOException {
} finally {
try {
access.deleteFile(name);
if (th instanceof NoSuchFileException) {
// if we successfully delete the access copy, but threw `NoSuchFileException` for
// the persistent copy, then swallow the original exception.
// we expect this to happen only in the case of recovering after a disorderly shutdown
// (or similar situation?). It is expected in such cases that a file may have been
// flushed to disk for the access copy, and not for the persistent copy. In the case

This comment has been minimized.

Copy link
@hiteshk25

hiteshk25 Mar 4, 2024

Collaborator

File flush should happen in persistent directory first. right? do we do parallel flush?

This comment has been minimized.

Copy link
@magibney

magibney Mar 4, 2024

Author Collaborator

We do parallel flush for sync'ing file content, but this fails unless the sync to both directories succeeds. This is ok because there is no "rollback" necessary (nor appropriate) in the event of a file content sync failure.

The place where order really does matter is in rename(), because the renaming of pending_segments_N to segments_N is what concludes the commit and ultimately references all the files. So we do not parallelize rename() (renaming for persistent first), and in fact we must also inline a call to persistent.syncMetaData() within TeeDirectory.rename(). This is addressed (along with related comments) in the latest commit.

// of `pending_segments_*` files, these files are ~explicitly partial, but must be
// deleted. It's not ideal that we have to be lenient here, but we kind of have to,
// because we know there are legit situations where files can exist in access and not
// in persistent, and we must support the ability to delete such files.
log.info("swallow exception deleting missing persistent file: {}", name);
th = null;
}
} catch (NoSuchFileException ex) {
// when `persistent != null`, `access` is a special case. Since access may be on ephemeral
// storage, we should be ok with files being already absent if we're asked to delete them.
Expand Down Expand Up @@ -525,7 +548,11 @@ public void rename(String source, String dest) throws IOException {

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
  // Reads are always served from the access copy; the persistent copy is only consulted
  // asynchronously by the background verification task.
  IndexInput ret = access.openInput(name, context);
  // Best-effort enqueue of a deferred check that the persistent copy's length matches the
  // length just observed in the access copy. `offer` (not `put`) so a full queue never
  // blocks the read path; dropped verifications are acceptable because this check is a
  // diagnostic safety net, not a correctness requirement.
  persistentLengthVerificationQueue.offer(
      new TeeDirectoryFactory.PersistentLengthVerification(
          access, persistent, name, ret.length()));
  return ret;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.util.RamUsageEstimator;
Expand Down Expand Up @@ -73,6 +77,8 @@ public void initCoreContainer(CoreContainer cc) {

static class NodeLevelTeeDirectoryState implements SolrMetricProducer {
final ExecutorService ioExec = ExecutorUtil.newMDCAwareCachedThreadPool("teeIOExec");
private final Future<?> lengthVerificationTask;
final BlockingQueue<PersistentLengthVerification> persistentLengthVerificationQueue;
private final Future<?> activationTask;
final LinkedBlockingQueue<AccessDirectory.LazyEntry> activationQueue =
new LinkedBlockingQueue<>();
Expand All @@ -90,7 +96,8 @@ static class NodeLevelTeeDirectoryState implements SolrMetricProducer {
final Meter priorityActivateMeter = new Meter();
final Meter activateMeter = new Meter();

NodeLevelTeeDirectoryState() {
NodeLevelTeeDirectoryState(int lengthVerificationQueueSize) {
persistentLengthVerificationQueue = new ArrayBlockingQueue<>(lengthVerificationQueueSize);
activationTask =
ioExec.submit(
() -> {
Expand Down Expand Up @@ -144,6 +151,23 @@ static class NodeLevelTeeDirectoryState implements SolrMetricProducer {
}
return null;
});
lengthVerificationTask =
ioExec.submit(
() -> {
Thread t = Thread.currentThread();
while (!t.isInterrupted()) {
PersistentLengthVerification a = null;
try {
a = persistentLengthVerificationQueue.take();
a.verify();
} catch (InterruptedException e) {
t.interrupt();
break;
} catch (Throwable th) {
log.error("error verifying persistent length {}", a);
}
}
});
}

@Override
Expand Down Expand Up @@ -199,8 +223,69 @@ public SolrMetricsContext getSolrMetricsContext() {
public void close() throws IOException {
  // try-with-resources closes in reverse declaration order: the executor is shut down
  // first, then the metrics context. Both background tasks are cancelled inside the try
  // body; the try/finally guarantees activationTask is cancelled even if cancelling
  // lengthVerificationTask throws.
  try (Closeable c1 = SolrMetricProducer.super::close;
      Closeable c2 = () -> ExecutorUtil.shutdownAndAwaitTermination(ioExec)) {
    try {
      lengthVerificationTask.cancel(true);
    } finally {
      activationTask.cancel(true);
    }
  }
}
}

static final class PersistentLengthVerification {
private final Directory accessDir;
private final Directory persistentDir;
private final String name;
private final long accessLength;

PersistentLengthVerification(
Directory accessDir, Directory persistentDir, String name, long accessLength) {
this.accessDir = accessDir;
this.persistentDir = persistentDir;
this.name = name;
this.accessLength = accessLength;
}

private void verify() {
try {
long l = persistentDir.fileLength(name);
if (l != accessLength) {
log.error("file length mismatch {} != {}", l, this);
}
} catch (AlreadyClosedException th) {
// swallow this; we have to defer lookup, but we know that in doing so we run the risk
// the the directory will already have been closed by the time we look up the length
} catch (NoSuchFileException e) {
try {
accessDir.fileLength(name);
log.error("file absent in persistent, present in access: {}", this);
} catch (NoSuchFileException e1) {
// this is what we expect, so just swallow it
} catch (Throwable t) {
log.warn("unable to re-verify access length {}", this, t);
}
} catch (Throwable t) {
log.warn("unable to verify persistent length {}", this, t);
}
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{name=").append(name).append(", length=").append(accessLength).append(", access=");
if (accessDir instanceof FSDirectory) {
sb.append(((FSDirectory) accessDir).getDirectory());
} else {
sb.append(accessDir);
}
sb.append(", persistent=");
if (persistentDir instanceof FSDirectory) {
sb.append(((FSDirectory) persistentDir).getDirectory());
} else {
sb.append(persistentDir);
}
sb.append("}");
return sb.toString();
}
}

Expand All @@ -216,13 +301,13 @@ public void init(NamedList<?> args) {
"nodeLevelTeeDirectoryState",
NodeLevelTeeDirectoryState.class,
(k) -> {
NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState();
NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState(4096);
ret.initializeMetrics(
cc.getMetricsHandler().getSolrMetricsContext(), "teeDirectory");
return ret;
});
} else {
nodeLevelState = new NodeLevelTeeDirectoryState();
nodeLevelState = new NodeLevelTeeDirectoryState(64);
ownNodeLevelState = nodeLevelState;
}
super.init(args);
Expand Down Expand Up @@ -297,7 +382,7 @@ public Directory create(String path, LockFactory lockFactory, DirContext dirCont
return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList());
};
return new SizeAwareDirectory(
new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState.ioExec), 0);
new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState), 0);
}

@Override
Expand Down

0 comments on commit ec1936c

Please sign in to comment.