From b9fd7570f752b6d0b311648e0f6b901402a2708d Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Mon, 18 Mar 2024 17:22:22 -0400 Subject: [PATCH] we must also guard mmap'd access in AccessDirectory (#185) * we must also guard mmap'd access in AccessDirectory * tidy * slightly different way to alter TeeDirectory test path scoping * preferred public ctor for CompressingDirectory takes NodeLevelTeeDirectoryState * add comment about CompressingDirectory ctor * add a comment about seemingly superfluous interface `LazyLoad` --- lucene | 2 +- .../apache/solr/storage/AccessDirectory.java | 133 ++++++++++++------ .../solr/storage/CompressingDirectory.java | 21 ++- .../org/apache/solr/storage/TeeDirectory.java | 2 +- .../solr/storage/TeeDirectoryFactory.java | 14 +- 5 files changed, 118 insertions(+), 54 deletions(-) diff --git a/lucene b/lucene index cb2b7c495ac..232bc1550f9 160000 --- a/lucene +++ b/lucene @@ -1 +1 @@ -Subproject commit cb2b7c495ac9e3bb8bca39e3798e8addd16fb7ff +Subproject commit 232bc1550f921442e1c765687dce521b3aacd1df diff --git a/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java b/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java index 3f74746405d..3fa87fd4a69 100644 --- a/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java @@ -51,6 +51,7 @@ import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; @@ -212,6 +213,7 @@ private void activate(LazyEntry lazyEntry) throws IOException { if (!canonical.loaded.compareAndSet(null, super.openInput(name, IOContext.READ))) { log.warn("out-of-band loading detected for file {}", name); } + canonical.loadedLatch.countDown(); } finally { if (!success) { 
IOUtils.deleteFilesIgnoringExceptions(this, lazyName); @@ -578,14 +580,26 @@ private static ByteBufferGuard.BufferCleaner unmapHack() { } } - static final class LazyLoadInput extends IndexInput implements RandomAccessInput { + /** + * This interface is mainly useful for testing. We don't want to directly expose {@link + * LazyLoadInput} (the only implementing class, at time of writing); but we expose this interface + * to allow tests to efficiently block before checking that the filesystem has been updated (i.e., + * that the access copy of a given file is present and fully populated). + */ + public interface LazyLoad { + void blockUntilLoaded() throws InterruptedException; + } + + static final class LazyLoadInput extends IndexInput implements RandomAccessInput, LazyLoad { private final AtomicIntegerArray populatedUpTo; private final ConcurrentIntSet priorityLoad; private final AtomicReference loaded; + private final CountDownLatch loadedLatch; private final AtomicIntegerArray reserve; private final AtomicIntegerArray complete; private final ByteBufferGuard compressedGuard; private final ByteBufferGuard guard; + private final boolean[] closed; private final long length; private final boolean isClone; private final long[] blockOffsets; @@ -629,7 +643,9 @@ private LazyLoadInput( this.populatedUpTo = parent.populatedUpTo; this.priorityLoad = parent.priorityLoad; this.loaded = parent.loaded; + this.loadedLatch = parent.loadedLatch; this.guard = parent.guard; + this.closed = parent.closed; this.compressedGuard = parent.compressedGuard; this.accessPopulated = parent.accessPopulated; this.isClone = true; @@ -679,7 +695,9 @@ private LazyLoadInput( this.populatedCt = populatedCt; this.lazyCt = lazyCt; loaded = new AtomicReference<>(); + loadedLatch = new CountDownLatch(1); guard = new ByteBufferGuard("accessGuard", unmapHack()); + this.closed = new boolean[1]; compressedGuard = new ByteBufferGuard("compressedGuard", unmapHack()); try (FileChannel channel = 
FileChannel.open(source, StandardOpenOption.READ)) { long compressedFileSize = channel.size(); @@ -990,7 +1008,23 @@ private boolean loadBlock(int blockIdx, int decompressedLen) throws IOException long accessBlockStart = ((long) blockIdx) << COMPRESSION_BLOCK_SHIFT; ByteBuffer dest = accessMapped[(int) (accessBlockStart >> MAX_MAP_SHIFT)]; int destPos = (int) (accessBlockStart & MAX_MAP_MASK); - dest.clear().position(destPos).put(supply); + synchronized (closed) { + // all read/write access to `accessMapped` is protected by `guard`, but + // it costs us essentially nothing to add another layer of protection here. + // The extra protection is particularly relevant for the case of background + // loading, since it by definition takes place asynchronously, and may be + // reasonably expected to long outlive the mmap'd buffers used here (as + // opposed to normal synchronous use, which would only relatively rarely + // persist beyond the closing of the associated input). + // NOTE: "costs us nothing" because `loadBlock()` is only ever called from + // a single thread, so the only lock contention we have is the one-time + // call to `close()` on the root input. We also don't really care about + // any locking overhead here because async loading runs in the background. 
+ if (closed[0]) { + throw new AlreadyClosedException("already closed"); + } + guard.putBytes(dest.clear().position(destPos), supply); + } setReadable(blockIdx); } catch (Throwable t) { releaseWrite(blockIdx); @@ -1028,7 +1062,7 @@ private void refill(final long pos, final int compressedLen, final int blockIdx) ByteBuffer dest = accessMapped[(int) (accessBlockStart >> MAX_MAP_SHIFT)]; int destPos = (int) (accessBlockStart & MAX_MAP_MASK); dest.clear().position(destPos).limit(destPos + decompressedLen).mark(); - dest.put(supply).reset(); + guard.putBytes(dest, supply).reset(); postBuffer = dest; postBufferBaseline = destPos; setReadable(blockIdx); @@ -1066,7 +1100,8 @@ public byte readByte(final long pos) throws IOException { if (blockIdx != currentBlockIdx) { initBlock(blockIdx); } - return postBuffer.get(postBufferBaseline + (int) (absolutePos & COMPRESSION_BLOCK_MASK_LOW)); + return guard.getByte( + postBuffer, postBufferBaseline + (int) (absolutePos & COMPRESSION_BLOCK_MASK_LOW)); } @Override @@ -1082,7 +1117,7 @@ public short readShort(final long pos) throws IOException { if (remaining < Short.BYTES) { return slowReadShort(remaining, localPos); } else { - return postBuffer.getShort(localPos); + return guard.getShort(postBuffer, localPos); } } @@ -1099,7 +1134,7 @@ public int readInt(final long pos) throws IOException { if (remaining < Integer.BYTES) { return slowReadInt(remaining, localPos); } else { - return postBuffer.getInt(localPos); + return guard.getInt(postBuffer, localPos); } } @@ -1116,7 +1151,7 @@ public long readLong(final long pos) throws IOException { if (remaining < Long.BYTES) { return slowReadLong(remaining, localPos); } else { - return postBuffer.getLong(localPos); + return guard.getLong(postBuffer, localPos); } } @@ -1136,7 +1171,7 @@ private byte _readByte(final int remaining) throws IOException { } filePointer++; - return postBuffer.get(); + return guard.getByte(postBuffer); } @Override @@ -1151,20 +1186,20 @@ public void readBytes(byte[] 
dst, int offset, int len) throws IOException { if (left < len) { slowReadBytes(dst, offset, len, left); } else { - postBuffer.get(dst, offset, len); + guard.getBytes(postBuffer, dst, offset, len); } } private void slowReadBytes(final byte[] dst, int offset, int toRead, int left) throws IOException { do { - postBuffer.get(dst, offset, left); + guard.getBytes(postBuffer, dst, offset, left); toRead -= left; offset += left; refill(); left = postBuffer.remaining(); } while (left < toRead); - postBuffer.get(dst, offset, toRead); + guard.getBytes(postBuffer, dst, offset, toRead); } @Override @@ -1179,7 +1214,7 @@ public short readShort() throws IOException { return slowReadShort(remaining); } else { filePointer += Short.BYTES; - return postBuffer.getShort(); + return guard.getShort(postBuffer); } } @@ -1191,9 +1226,9 @@ private short slowReadShort(final int remaining) throws IOException { private short slowReadShort(final int remaining, final int pos) throws IOException { assert remaining == 1; - final byte b1 = postBuffer.get(pos); + final byte b1 = guard.getByte(postBuffer, pos); refill(); - final byte b2 = postBuffer.get(postBufferBaseline); + final byte b2 = guard.getByte(postBuffer, postBufferBaseline); return (short) (((b2 & 0xFF) << 8) | (b1 & 0xFF)); } @@ -1209,7 +1244,7 @@ public int readInt() throws IOException { return slowReadInt(remaining); } else { filePointer += Integer.BYTES; - return postBuffer.getInt(); + return guard.getInt(postBuffer); } } @@ -1218,7 +1253,7 @@ private int _readInt(final int remaining) throws IOException { return slowReadInt(remaining); } else { filePointer += Integer.BYTES; - return postBuffer.getInt(); + return guard.getInt(postBuffer); } } @@ -1234,28 +1269,28 @@ private int _readInt(final int remaining, final int pos) throws IOException { if (remaining < Integer.BYTES) { return slowReadInt(remaining, pos); } else { - return postBuffer.getInt(pos); + return guard.getInt(postBuffer, pos); } } private int slowReadInt(int remaining, 
int pos) throws IOException { assert remaining > 0; - final byte b1 = postBuffer.get(pos++); + final byte b1 = guard.getByte(postBuffer, pos++); if (--remaining == 0) { refill(); pos = postBufferBaseline; } - final byte b2 = postBuffer.get(pos++); + final byte b2 = guard.getByte(postBuffer, pos++); if (--remaining == 0) { refill(); pos = postBufferBaseline; } - final byte b3 = postBuffer.get(pos++); + final byte b3 = guard.getByte(postBuffer, pos++); if (--remaining == 0) { refill(); pos = postBufferBaseline; } - final byte b4 = postBuffer.get(pos); + final byte b4 = guard.getByte(postBuffer, pos); return ((b4 & 0xFF) << 24) | ((b3 & 0xFF) << 16) | ((b2 & 0xFF) << 8) | (b1 & 0xFF); } @@ -1272,7 +1307,7 @@ public long readLong() throws IOException { | (((long) _readInt(postBuffer.remaining())) << 32); } else { filePointer += Long.BYTES; - return postBuffer.getLong(); + return guard.getLong(postBuffer); } } @@ -1282,7 +1317,7 @@ public long _readLong(final int remaining) throws IOException { | (((long) _readInt(postBuffer.remaining())) << 32); } else { filePointer += Long.BYTES; - return postBuffer.getLong(); + return guard.getLong(postBuffer); } } @@ -1295,7 +1330,7 @@ private long slowReadLong(final int remaining, final int pos) throws IOException } else if (remaining == Integer.BYTES) { // aligned, so we can refill directly refill(); - l2 = postBuffer.getInt(postBufferBaseline); + l2 = guard.getInt(postBuffer, postBufferBaseline); } else { // the first _readInt will _not_ have refilled the buffer, so proceed normally l2 = _readInt(remaining - Integer.BYTES, pos + Integer.BYTES); @@ -1333,23 +1368,23 @@ public int _readVInt(int remaining) throws IOException { i |= (b & 0x0F) << 28; if ((b & 0xF0) == 0) return i; } else { - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; if (b >= 0) return b; int i = b & 0x7F; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7F) << 7; if (b >= 0) return i; - b = 
postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7F) << 14; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7F) << 21; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors: i |= (b & 0x0F) << 28; @@ -1410,46 +1445,46 @@ private long readVLong(final boolean allowNegative) throws IOException { } b = _readByte(--remaining); } else { - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; if (b >= 0) return b; i = b & 0x7FL; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 7; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 14; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 21; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 28; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 35; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 42; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 49; if (b >= 0) return i; - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; i |= (b & 0x7FL) << 56; if (b >= 0) return i; if (!allowNegative) { throw new IOException("Invalid vLong detected (negative values disallowed)"); } - b = postBuffer.get(); + b = guard.getByte(postBuffer); filePointer++; } i |= (b & 0x7FL) << 63; @@ -1476,7 +1511,7 @@ public void readLongs(final long[] dst, final int offset, final int length) thro } } else { final int position = postBuffer.position(); - longViews[position & 0x07].position(position >>> 
3).get(dst, offset, length); + guard.getLongs(longViews[position & 0x07].position(position >>> 3), dst, offset, length); filePointer += bytesRequested; postBuffer.position(position + (int) bytesRequested); } @@ -1501,7 +1536,7 @@ public void readInts(final int[] dst, final int offset, final int length) throws } } else { final int position = postBuffer.position(); - intViews[position & 0x03].position(position >>> 2).get(dst, offset, length); + guard.getInts(intViews[position & 0x03].position(position >>> 2), dst, offset, length); filePointer += bytesRequested; postBuffer.position(position + (int) bytesRequested); } @@ -1527,7 +1562,7 @@ public void readFloats(final float[] dst, final int offset, final int length) } } else { final int position = postBuffer.position(); - floatViews[position & 0x03].position(position >>> 2).get(dst, offset, length); + guard.getFloats(floatViews[position & 0x03].position(position >>> 2), dst, offset, length); filePointer += bytesRequested; postBuffer.position(position + (int) bytesRequested); } @@ -1622,7 +1657,7 @@ public String _readString() throws IOException { if (left < length) { slowReadBytes(bytes, 0, length, left); } else { - postBuffer.get(bytes, 0, length); + guard.getBytes(postBuffer, bytes, 0, length); } return new String(bytes, 0, length, StandardCharsets.UTF_8); } @@ -1685,6 +1720,13 @@ public void close() throws IOException { if (isClone) return; + synchronized (closed) { + // use this to reliably signal background loading threads that they should not + // attempt to load any more blocks. 
The only lock contention here is (potentially) + // with single-threaded background loading + closed[0] = true; + } + // tell the guard to invalidate and later unmap the bytebuffers (if supported): try (priorityLoad; IndexInput fullyLoaded = loaded.get()) { @@ -1759,5 +1801,10 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw return new LazyLoadInput("slice", this, offset, length); } } + + @Override + public void blockUntilLoaded() throws InterruptedException { + loadedLatch.await(); + } } } diff --git a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java index 70ca3bfc654..612b22c7b6d 100644 --- a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java @@ -92,8 +92,27 @@ static OpenOption getDirectOpenOption() { private final boolean useAsyncIO; private final boolean useDirectIO; + /** + * The main way that we expect {@link CompressingDirectory} to be used is in the context of {@link + * TeeDirectory}, with node-level resources (such as {@link #ioExec}) being maintained at the node + * level by {@link org.apache.solr.storage.TeeDirectoryFactory.NodeLevelTeeDirectoryState}. + * + *

<p>So the public ctor here takes {@link + * org.apache.solr.storage.TeeDirectoryFactory.NodeLevelTeeDirectoryState} as an arg instead of + * the (potentially simpler) {@link ExecutorService}, so that {@link + * org.apache.solr.storage.TeeDirectoryFactory.NodeLevelTeeDirectoryState} may be held and tracked + * outside of this package, but without exposing any of its internal fields. + */ public CompressingDirectory( - Path path, ExecutorService ioExec, boolean useAsyncIO, boolean useDirectIO) + Path path, + TeeDirectoryFactory.NodeLevelTeeDirectoryState s, + boolean useAsyncIO, + boolean useDirectIO) + throws IOException { + this(path, s.ioExec, useAsyncIO, useDirectIO); + } + + CompressingDirectory(Path path, ExecutorService ioExec, boolean useAsyncIO, boolean useDirectIO) throws IOException { super(path, FSLockFactory.getDefault()); this.blockSize = (int) (Files.getFileStore(path).getBlockSize()); diff --git a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java index 57950e689cc..04a4360e891 100644 --- a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java @@ -101,7 +101,7 @@ public TeeDirectory(Path path, LockFactory lockFactory) throws IOException { content -> { assert content == naive; content.close(); - content = new CompressingDirectory(compressedPath, ownState.ioExec, true, true); + content = new CompressingDirectory(compressedPath, ownState, true, true); return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList()); }; } diff --git a/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java index 0bf52a27b70..59000232197 100644 --- a/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java @@ -83,7 +83,7 @@ public void 
initCoreContainer(CoreContainer cc) { this.cc = new WeakReference<>(cc); } - static class NodeLevelTeeDirectoryState implements SolrMetricProducer { + public static class NodeLevelTeeDirectoryState implements SolrMetricProducer { final ExecutorService ioExec = ExecutorUtil.newMDCAwareCachedThreadPool("teeIOExec"); private final Future lengthVerificationTask; final BlockingQueue persistentLengthVerificationQueue; @@ -104,7 +104,7 @@ static class NodeLevelTeeDirectoryState implements SolrMetricProducer { final Meter priorityActivateMeter = new Meter(); final Meter activateMeter = new Meter(); - NodeLevelTeeDirectoryState(int lengthVerificationQueueSize) { + public NodeLevelTeeDirectoryState(int lengthVerificationQueueSize) { persistentLengthVerificationQueue = new ArrayBlockingQueue<>(lengthVerificationQueueSize); activationTask = ioExec.submit( @@ -332,8 +332,6 @@ public void init(NamedList args) { useAsyncIO = params.getBool("useAsyncIO", useDirectIO); } - private static final boolean TEST_CONTEXT = System.getProperty("tests.seed") != null; - static String getScopeName(String accessDir, String path) { int lastPathDelimIdx = path.lastIndexOf('/'); if (lastPathDelimIdx == -1) { @@ -342,18 +340,19 @@ static String getScopeName(String accessDir, String path) { String dirName = path.substring(path.lastIndexOf('/')); int end = path.lastIndexOf('/', lastPathDelimIdx - 1); int start = path.lastIndexOf('/', end - 1); + boolean testContext = System.getProperty("tests.seed") != null; String ret; if ("/index".equals(dirName)) { ret = path.substring(start, end); } else if (dirName.startsWith("/index.")) { // append the suffix identifier; this is a snapshot or temp index dir ret = path.substring(start, end).concat(dirName.substring("/index".length())); - } else if (TEST_CONTEXT) { + } else if (testContext) { ret = path.substring(path.lastIndexOf('/')); } else { throw new IllegalArgumentException("unexpected path: " + path); } - if (TEST_CONTEXT) { + if (testContext && 
!"disable".equals(System.getProperty("solr.teeDirectory.timeScope"))) { ret += "-" + Long.toUnsignedString(System.nanoTime(), 16); Path p = Path.of(path); if (p.startsWith(accessDir)) { @@ -390,8 +389,7 @@ public Directory create(String path, LockFactory lockFactory, DirContext dirCont assert content == naive; content.close(); content = - new CompressingDirectory( - compressedPath, nodeLevelState.ioExec, useAsyncIO, useDirectIO); + new CompressingDirectory(compressedPath, nodeLevelState, useAsyncIO, useDirectIO); return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList()); }; backing = new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState);