diff --git a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java index b860d4608c3..2eedfcf40d8 100644 --- a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java +++ b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java @@ -166,6 +166,18 @@ public long size(Directory directory) throws IOException { return sizeOfDirectory(directory); } + /** + * @param directory to calculate size of + * @return size in bytes on disk, regardless of compression, etc + * @throws IOException on low level IO error + */ + public long onDiskSize(Directory directory) throws IOException { + if (directory instanceof OnDiskSizeDirectory) { + return onDiskSizeOfDirectory(directory); + } + return sizeOfDirectory(directory); + } + /** * @param path to calculate size of * @return size in bytes @@ -267,6 +279,12 @@ public boolean isAbsolute(String path) { public interface SizeAware { long size() throws IOException; + + long onDiskSize() throws IOException; + } + + public interface OnDiskSizeDirectory { + long onDiskFileLength(String name) throws IOException; } public static long sizeOfDirectory(Directory directory) throws IOException { @@ -295,6 +313,35 @@ public static long sizeOf(Directory directory, String file) throws IOException { } } + public static long onDiskSizeOfDirectory(Directory directory) throws IOException { + if (directory instanceof SizeAware) { + return ((SizeAware) directory).onDiskSize(); + } + final String[] files = directory.listAll(); + long size = 0; + + for (final String file : files) { + size += onDiskSizeOf(directory, file); + if (size < 0) { + break; + } + } + + return size; + } + + public static long onDiskSizeOf(Directory directory, String file) throws IOException { + if (directory instanceof DirectoryFactory.OnDiskSizeDirectory) { + try { + return ((OnDiskSizeDirectory) directory).onDiskFileLength(file); + } catch (IOException e) { + // could be a race, file no longer exists, access denied, is a directory, etc. 
+ return 0; + } + } + return directory.fileLength(file); + } + /** Delete the files in the Directory */ public static boolean empty(Directory dir) { boolean isSuccess = true; diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 351aafbf729..10022e650dd 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -530,11 +530,49 @@ public long getIndexSize() { } } + public long getOnDiskSize() { + SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); + if (requestInfo != null) { + return (Long) + requestInfo + .getReq() + .getContext() + .computeIfAbsent(cachedOnDiskIndexSizeKeyName(), key -> calculateOnDiskSize()); + } else { + return calculateOnDiskSize(); + } + } + + private long calculateOnDiskSize() { + Directory dir; + long size = 0; + try { + if (directoryFactory.exists(getIndexDir())) { + dir = + directoryFactory.get( + getIndexDir(), DirContext.DEFAULT, solrConfig.indexConfig.lockType); + try { + size = directoryFactory.onDiskSize(dir); + } finally { + directoryFactory.release(dir); + } + } + } catch (IOException e) { + log.error("IO error while trying to get the size of the Directory", e); + } + return size; + } + private String cachedIndexSizeKeyName() { // avoid collision when we put index sizes for multiple cores in the same metrics request return "indexSize_" + getName(); } + private String cachedOnDiskIndexSizeKeyName() { + // avoid collision when we put index sizes for multiple cores in the same metrics request + return "onDiskIndexSize_" + getName(); + } + public int getSegmentCount() { try { return withSearcher( diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java index bf82ae38c4e..e93a7d2536f 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java @@ -380,8 +380,14 @@ public static NamedList getCoreStatus( SimpleOrderedMap indexInfo = LukeRequestHandler.getIndexInfo(searcher.get().getIndexReader()); long size = core.getIndexSize(); + long onDiskSize = core.getOnDiskSize(); + String readableSize = NumberUtils.readableSize(size); + String readableOnDiskSize = + size == onDiskSize ? 
readableSize : NumberUtils.readableSize(onDiskSize); indexInfo.add("sizeInBytes", size); indexInfo.add("size", NumberUtils.readableSize(size)); + indexInfo.add("onDiskSizeInBytes", onDiskSize); + indexInfo.add("onDiskSize", readableOnDiskSize); info.add("index", indexInfo); } finally { searcher.decref(); diff --git a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java index bec32a42078..98130efcc18 100644 --- a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java @@ -39,8 +39,10 @@ import org.apache.lucene.store.OutputStreamDataOutput; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.compress.LZ4; +import org.apache.solr.core.DirectoryFactory; -public class CompressingDirectory extends FSDirectory { +public class CompressingDirectory extends FSDirectory + implements DirectoryFactory.OnDiskSizeDirectory { /** * Reference to {@code com.sun.nio.file.ExtendedOpenOption.DIRECT} by reflective class and enum @@ -167,6 +169,11 @@ public long fileLength(String name) throws IOException { return readLengthFromHeader(path); } + @Override + public long onDiskFileLength(String name) throws IOException { + return Files.size(directoryPath.resolve(name)); + } + public static long readLengthFromHeader(Path path) throws IOException { if (Files.size(path) < Long.BYTES) { return 0; @@ -243,7 +250,11 @@ public int transferTo(DataOutput out) throws IOException { } } - static final class DirectIOIndexOutput extends IndexOutput { + public interface SizeReportingIndexOutput { + long getBytesWritten(); + } + + static final class DirectIOIndexOutput extends IndexOutput implements SizeReportingIndexOutput { private final byte[] compressBuffer = new byte[COMPRESSION_BLOCK_SIZE]; private final LZ4.FastCompressionHashTable ht = new LZ4.FastCompressionHashTable(); private final ByteBuffer preBuffer; @@ -264,6 +275,7 @@ static final class DirectIOIndexOutput extends IndexOutput { static final int HEADER_SIZE = 16; // 16 bytes private long filePos; + private long bytesWritten; private boolean isOpen; private final DirectBufferPool initialBlockBufferPool; @@ -296,6 +308,7 @@ public DirectIOIndexOutput( preBuffer = ByteBuffer.wrap(compressBuffer); this.initialBlockBufferPool = initialBlockBufferPool; initialBlock = initialBlockBufferPool.get(); + bytesWritten = HEADER_SIZE; // allocate space for the header buffer.position(buffer.position() + HEADER_SIZE); @@ -340,6 +353,7 @@ private void dump() throws IOException { LZ4.compressWithDictionary(compressBuffer, 0, 0, COMPRESSION_BLOCK_SIZE, out, ht); int nextBlockSize = out.resetSize(); + bytesWritten += nextBlockSize; blockDeltas.writeZInt(nextBlockSize - prevBlockSize); prevBlockSize = nextBlockSize; filePos += COMPRESSION_BLOCK_SIZE; @@ -355,6 +369,7 @@ private void flush() throws IOException { LZ4.compressWithDictionary(compressBuffer, 0, 0, preBufferRemaining, out, ht); } int blockMapFooterSize = blockDeltas.transferTo(out); + bytesWritten += out.resetSize(); if (wroteBlock) { writeHelper.flush(buffer, true); initialBlock.putLong(0, filePos); @@ -377,6 +392,11 @@ private void flush() throws IOException { } } + @Override + public long getBytesWritten() { + return bytesWritten; + } + private final SizeTrackingDataOutput out = new SizeTrackingDataOutput(); private void writeBlock() throws IOException { diff --git 
a/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java b/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java index e0627475283..20608700a77 100644 --- a/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java @@ -17,7 +17,6 @@ package org.apache.solr.storage; -import com.carrotsearch.hppc.procedures.LongObjectProcedure; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Set; @@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory; public class SizeAwareDirectory extends FilterDirectory - implements DirectoryFactory.SizeAware, Accountable { + implements DirectoryFactory.SizeAware, Accountable, DirectoryFactory.OnDiskSizeDirectory { @SuppressWarnings("rawtypes") private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(SizeAwareDirectory.class) @@ -50,15 +49,41 @@ public class SizeAwareDirectory extends FilterDirectory private boolean initialized = false; private volatile long reconciledTimeNanos; private volatile LongAdder size = new LongAdder(); - private volatile LongObjectProcedure sizeWriter = (size, name) -> this.size.add(size); + private volatile LongAdder onDiskSize = new LongAdder(); + private volatile SizeWriter sizeWriter = + (size, onDiskSize, name) -> { + this.size.add(size); + this.onDiskSize.add(onDiskSize); + }; - private final ConcurrentHashMap fileSizeMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap fileSizeMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap liveOutputs = new ConcurrentHashMap<>(); @SuppressWarnings({"unchecked", "rawtypes"}) - private final Future[] computingSize = new Future[1]; + private final Future[] computingSize = new Future[1]; + + private interface SizeWriter { + void apply(long size, long onDiskSize, String name); + } + + private static class Sizes implements Accountable { + private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Sizes.class); + + long size; + long onDiskSize; + + Sizes(long size, long onDiskSize) { + this.size = size; + this.onDiskSize = onDiskSize; + } + + @Override + public long ramBytesUsed() { + return RAM_BYTES_USED; + } + } public SizeAwareDirectory(Directory in, long reconcileTTLNanos) { super(in); @@ -79,10 +104,10 @@ public long ramBytesUsed() { @Override public long fileLength(String name) throws IOException { - Long ret = fileSizeMap.get(name); + Sizes ret = fileSizeMap.get(name); SizeAccountingIndexOutput live; if (ret != null) { - return ret; + return ret.size; } else if ((live = liveOutputs.get(name)) != null) { return live.backing.getFilePointer(); } else { @@ -91,6 +116,30 @@ public long fileLength(String name) throws IOException { } } + @Override + public long onDiskFileLength(String name) throws IOException { + Sizes ret = fileSizeMap.get(name); + SizeAccountingIndexOutput live; + if (ret != null) { + return ret.onDiskSize; + } else if ((live = liveOutputs.get(name)) != null) { + if (live.backing instanceof CompressingDirectory.SizeReportingIndexOutput) { + return ((CompressingDirectory.SizeReportingIndexOutput) live.backing).getBytesWritten(); + } else if (in instanceof DirectoryFactory.OnDiskSizeDirectory) { + // backing IndexOutput does not allow us to get onDiskSize + return 0; + } else { + return live.backing.getFilePointer(); + } + } else if (in instanceof DirectoryFactory.OnDiskSizeDirectory) { + // fallback delegate to wrapped Directory + return 
((DirectoryFactory.OnDiskSizeDirectory) in).onDiskFileLength(name); + } else { + // directory does not implement onDiskSize + return in.fileLength(name); + } + } + @Override public long size() throws IOException { Integer reconcileThreshold = CoreAdminHandler.getReconcileThreshold(); @@ -99,8 +148,24 @@ public long size() throws IOException { || System.nanoTime() - reconciledTimeNanos < reconcileTTLNanos)) { return size.sum(); } - CompletableFuture weCompute; - Future theyCompute; + return initSize().size; + } + + @Override + public long onDiskSize() throws IOException { + Integer reconcileThreshold = CoreAdminHandler.getReconcileThreshold(); + if (initialized + && (reconcileThreshold == null + || System.nanoTime() - reconciledTimeNanos < reconcileTTLNanos)) { + return onDiskSize.sum(); + } + return initSize().onDiskSize; + } + + private Sizes initSize() throws IOException { + Integer reconcileThreshold = CoreAdminHandler.getReconcileThreshold(); + CompletableFuture weCompute; + Future theyCompute; synchronized (computingSize) { theyCompute = computingSize[0]; if (theyCompute == null) { @@ -121,30 +186,35 @@ public long size() throws IOException { final String[] files = in.listAll(); LongAdder recomputeSize = new LongAdder(); + LongAdder recomputeOnDiskSize = new LongAdder(); Set recomputed = ConcurrentHashMap.newKeySet(); - LongObjectProcedure dualSizeWriter = - (fileSize, name) -> { + SizeWriter dualSizeWriter = + (fileSize, onDiskFileSize, name) -> { size.add(fileSize); - if (fileSize >= 0 || recomputed.remove(name)) { + onDiskSize.add(onDiskFileSize); + if (fileSize >= 0 || onDiskFileSize >= 0 || recomputed.remove(name)) { // if it's a removal, we only want to adjust if we've already // incorporated this file in our count! recomputeSize.add(fileSize); + recomputeOnDiskSize.add(onDiskFileSize); } }; sizeWriter = dualSizeWriter; for (final String file : files) { - long fileSize; + Sizes sizes; recomputed.add(file); SizeAccountingIndexOutput liveOutput = liveOutputs.get(file); if (liveOutput != null) { // get fileSize already written at this moment - fileSize = liveOutput.setSizeWriter(dualSizeWriter); + sizes = liveOutput.setSizeWriter(dualSizeWriter); } else { - fileSize = DirectoryFactory.sizeOf(in, file); + long fileSize = DirectoryFactory.sizeOf(in, file); + long onDiskFileSize = DirectoryFactory.onDiskSizeOf(in, file); + sizes = new Sizes(fileSize, onDiskFileSize); if (fileSize > 0) { // whether the file exists or not, we don't care about it if it has zero size. // more often though, 0 size means the file isn't there. - fileSizeMap.put(file, fileSize); + fileSizeMap.put(file, sizes); if (DirectoryFactory.sizeOf(in, file) == 0) { // during reconciliation, we have to check for file presence _after_ adding // to the map, to prevent a race condition that could leak entries into `fileSizeMap` @@ -152,19 +222,23 @@ public long size() throws IOException { } } } - recomputeSize.add(fileSize); + recomputeSize.add(sizes.size); + recomputeOnDiskSize.add(sizes.onDiskSize); // TODO: do we really need to check for overflow here? 
// if (recomputeSize < 0) { // break; // } } - long ret = recomputeSize.sum(); - long extant = size.sum(); - long diff = extant - ret; + Sizes ret = new Sizes(recomputeSize.sum(), recomputeOnDiskSize.sum()); + Sizes extant = new Sizes(size.sum(), onDiskSize.sum()); + long diff = extant.size - ret.size; + long onDiskDiff = extant.onDiskSize - ret.onDiskSize; boolean initializing = !initialized; - if (!initializing && Math.abs(diff) < reconcileThreshold) { - double ratio = (double) extant / ret; + if (!initializing + && Math.abs(diff) < reconcileThreshold + && Math.abs(onDiskDiff) < reconcileThreshold) { + double ratio = (double) extant.size / ret.size; if (log.isInfoEnabled()) { log.info( "no need to reconcile (diff {}; ratio {}; overhead {}; sizes {}/{}/{})", @@ -178,9 +252,14 @@ public long size() throws IOException { ret = extant; } else { // swap the new objects into place - LongObjectProcedure replaceSizeWriter = (size, name) -> recomputeSize.add(size); + SizeWriter replaceSizeWriter = + (size, onDiskSize, name) -> { + recomputeSize.add(size); + recomputeOnDiskSize.add(onDiskSize); + }; sizeWriter = replaceSizeWriter; size = recomputeSize; + onDiskSize = recomputeOnDiskSize; for (SizeAccountingIndexOutput liveOutput : liveOutputs.values()) { liveOutput.setSizeWriter(replaceSizeWriter); } @@ -190,17 +269,19 @@ public long size() throws IOException { if (log.isInfoEnabled()) { log.info( "initialized heap-tracked size {} (overhead: {})", - RamUsageEstimator.humanReadableUnits(ret), + RamUsageEstimator.humanReadableUnits(ret.size), RamUsageEstimator.humanReadableUnits(ramBytesUsed())); } } else { - double ratio = (double) extant / ret; + double ratio = (double) extant.size / ret.size; + double onDiskRatio = (double) extant.onDiskSize / ret.onDiskSize; log.warn( - "reconcile size {} => {} (diff {}; ratio {}; overhead {})", + "reconcile size {} => {} (diff {}; ratio {}; onDiskRatio {}; overhead {})", extant, ret, humanReadableByteDiff(diff), ratio, + onDiskRatio, RamUsageEstimator.humanReadableUnits(ramBytesUsed())); } } @@ -228,9 +309,9 @@ public void deleteFile(String name) throws IOException { try { in.deleteFile(name); } finally { - Long fileSize = fileSizeMap.remove(name); + Sizes fileSize = fileSizeMap.remove(name); if (fileSize != null) { - sizeWriter.apply(-fileSize, name); + sizeWriter.apply(-fileSize.size, -fileSize.onDiskSize, name); } } } @@ -239,7 +320,7 @@ public void deleteFile(String name) throws IOException { public IndexOutput createOutput(String name, IOContext context) throws IOException { SizeAccountingIndexOutput ret = new SizeAccountingIndexOutput( - name, in.createOutput(name, context), fileSizeMap, liveOutputs, sizeWriter); + name, in.createOutput(name, context), fileSizeMap, liveOutputs, sizeWriter, in); liveOutputs.put(name, ret); return ret; } @@ -250,7 +331,7 @@ public IndexOutput createTempOutput(String prefix, String suffix, IOContext cont IndexOutput backing = in.createTempOutput(prefix, suffix, context); String name = backing.getName(); SizeAccountingIndexOutput ret = - new SizeAccountingIndexOutput(name, backing, fileSizeMap, liveOutputs, sizeWriter); + new SizeAccountingIndexOutput(name, backing, fileSizeMap, liveOutputs, sizeWriter, in); liveOutputs.put(name, ret); return ret; } @@ -264,45 +345,70 @@ private static final class SizeAccountingIndexOutput extends IndexOutput impleme private final IndexOutput backing; - private final ConcurrentHashMap fileSizeMap; + private final Directory backingDirectory; + + private final ConcurrentHashMap fileSizeMap; 
private final ConcurrentHashMap liveOutputs; - private volatile LongObjectProcedure sizeWriter; + private volatile SizeWriter sizeWriter; + + private long lastBytesWritten = 0; private SizeAccountingIndexOutput( String name, IndexOutput backing, - ConcurrentHashMap fileSizeMap, + ConcurrentHashMap fileSizeMap, ConcurrentHashMap liveOutputs, - LongObjectProcedure sizeWriter) { + SizeWriter sizeWriter, + Directory backingDirectory) { super("byteSize(" + name + ")", name); this.name = name; this.backing = backing; this.liveOutputs = liveOutputs; this.sizeWriter = sizeWriter; this.fileSizeMap = fileSizeMap; + this.backingDirectory = backingDirectory; } - public long setSizeWriter(LongObjectProcedure sizeWriter) { + public Sizes setSizeWriter(SizeWriter sizeWriter) { if (this.sizeWriter == sizeWriter) { - return 0; + return new Sizes(0, 0); } else { // NOTE: there's an unavoidable race condition between these two // lines, and this ordering may occasionally overestimate directory size. this.sizeWriter = sizeWriter; - return backing.getFilePointer(); + long onDiskSize; + if (backing instanceof CompressingDirectory.SizeReportingIndexOutput) { + onDiskSize = ((CompressingDirectory.SizeReportingIndexOutput) backing).getBytesWritten(); + } else if (backingDirectory instanceof DirectoryFactory.OnDiskSizeDirectory) { + onDiskSize = 0; + } else { + onDiskSize = getFilePointer(); + } + return new Sizes(this.getFilePointer(), onDiskSize); } } @Override @SuppressWarnings("try") public void close() throws IOException { - try (backing) { - fileSizeMap.put(name, backing.getFilePointer()); - } finally { - liveOutputs.remove(name); + backing.close(); + + long onDiskSize; + if (backing instanceof CompressingDirectory.SizeReportingIndexOutput) { + long finalBytesWritten = getBytesWritten(backing); + onDiskSize = finalBytesWritten; + // logical size should already be set through writeByte(s), but we need to finalize the + // on-disk size here + sizeWriter.apply(0, finalBytesWritten - lastBytesWritten, name); + } else if (backingDirectory instanceof DirectoryFactory.OnDiskSizeDirectory) { + onDiskSize = 0; + } else { + onDiskSize = getFilePointer(); } + fileSizeMap.put(name, new Sizes(backing.getFilePointer(), onDiskSize)); + liveOutputs.remove(name); } @Override @@ -318,13 +424,24 @@ public long getChecksum() throws IOException { @Override public void writeByte(byte b) throws IOException { backing.writeByte(b); - sizeWriter.apply(1, name); + long postBytesWritten = getBytesWritten(backing); + sizeWriter.apply(1, postBytesWritten - lastBytesWritten, name); + lastBytesWritten = postBytesWritten; + } + + private long getBytesWritten(IndexOutput out) { + if (backing instanceof CompressingDirectory.SizeReportingIndexOutput) { + return ((CompressingDirectory.SizeReportingIndexOutput) backing).getBytesWritten(); + } + return 0; } @Override public void writeBytes(byte[] b, int offset, int length) throws IOException { backing.writeBytes(b, offset, length); - sizeWriter.apply(length, name); + long postBytesWritten = getBytesWritten(backing); + sizeWriter.apply(length, postBytesWritten - lastBytesWritten, name); + lastBytesWritten = postBytesWritten; } @Override @@ -337,7 +454,7 @@ public long ramBytesUsed() { @Override public void rename(String source, String dest) throws IOException { in.rename(source, dest); - Long extant = fileSizeMap.put(dest, fileSizeMap.remove(source)); + Sizes extant = fileSizeMap.put(dest, fileSizeMap.remove(source)); assert extant == null; // it's illegal for dest to already exist } } diff --git 
a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java index 2ae3521f0e2..5f0346ba443 100644 --- a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java @@ -51,11 +51,12 @@ import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.util.IOUtils; import org.apache.solr.common.util.CollectionUtil; +import org.apache.solr.core.DirectoryFactory; import org.apache.solr.util.IOFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TeeDirectory extends BaseDirectory { +public class TeeDirectory extends BaseDirectory implements DirectoryFactory.OnDiskSizeDirectory { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -64,7 +65,7 @@ public class TeeDirectory extends BaseDirectory { private final AutoCloseable closeLocal; private final IOFunction> accessFunction; private final IOFunction>> persistentFunction; - private volatile Directory persistent; + private volatile CompressingDirectory persistent; private final BlockingQueue persistentLengthVerificationQueue; @@ -127,7 +128,7 @@ private void init() throws IOException { if (this.persistent == null) { List buildAssociatedPaths = new ArrayList<>(3); Map.Entry> persistentEntry = persistentFunction.apply(access); - this.persistent = persistentEntry.getKey(); + this.persistent = (CompressingDirectory) persistentEntry.getKey(); Path persistentFSPath = ((FSDirectory) persistent).getDirectory(); buildAssociatedPaths.addAll(persistentEntry.getValue()); Map.Entry accessEntry = accessFunction.apply(null); @@ -371,6 +372,11 @@ public long fileLength(String name) throws IOException { return access.fileLength(name); } + @Override + public long onDiskFileLength(String name) throws IOException { + return persistent.onDiskFileLength(name); + } + @Override @SuppressWarnings("try") public IndexOutput createOutput(String name, IOContext context) throws IOException { @@ -409,7 +415,8 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti return new TeeIndexOutput(a, b); } - private static final class TeeIndexOutput extends IndexOutput { + private static final class TeeIndexOutput extends IndexOutput + implements CompressingDirectory.SizeReportingIndexOutput { private final IndexOutput primary; private final IndexOutput secondary; @@ -450,6 +457,15 @@ public long getFilePointer() { public long getChecksum() throws IOException { return primary.getChecksum(); } + + @Override + public long getBytesWritten() { + if (secondary instanceof CompressingDirectory.SizeReportingIndexOutput) { + return ((CompressingDirectory.SizeReportingIndexOutput) secondary).getBytesWritten(); + } else { + return getFilePointer(); + } + } } @Override diff --git a/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java b/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java new file mode 100644 index 00000000000..b8465d8c79a --- /dev/null +++ b/solr/core/src/test/org/apache/solr/storage/SizeAwareDirectoryTest.java @@ -0,0 +1,297 @@ +package org.apache.solr.storage; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import 
org.apache.lucene.store.IndexOutput; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.DirectoryFactory; +import org.junit.Before; +import org.junit.Test; + +public class SizeAwareDirectoryTest extends SolrTestCaseJ4 { + + private final ConcurrentHashMap activeFiles = new ConcurrentHashMap<>(); + private final ConcurrentHashMap deletedFiles = new ConcurrentHashMap<>(); + private String path; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + activeFiles.clear(); + deletedFiles.clear(); + path = createTempDir().toString() + "/somedir"; + } + + @Test + public void testSizeTracking() throws Exception { + // after onDiskSize has been init(), the size should be correct using the LongAdder sum in + // SizeAwareDirectory + CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory(); + try (dirFac) { + dirFac.initCoreContainer(null); + dirFac.init(new NamedList<>()); + + Directory dir = + dirFac.get(path, DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_SINGLE); + try { + try (IndexOutput file = dir.createOutput("test_file", IOContext.DEFAULT)) { + file.writeInt(42); + } // implicitly close file + + long expectedDiskSize = Files.size(Paths.get(path + "/test_file")); + assertEquals( + "directory size should be equal to on disk size of test files", + expectedDiskSize, + dirFac.onDiskSize(dir)); + + try (IndexOutput file = dir.createOutput("test_file2", IOContext.DEFAULT)) { + file.writeInt(84); + } // implicitly close file + + expectedDiskSize = + Files.size(Paths.get(path + "/test_file")) + + Files.size(Paths.get(path + "/test_file2")); + assertEquals( + "directory size should be equal to on disk size of test files", + expectedDiskSize, + dirFac.onDiskSize(dir)); + } finally { + dirFac.release(dir); + } + } + } + + @Test + public void testDelete() throws Exception { + // write a file, then another, then delete one of the files - the onDiskSize should update + // correctly + CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory(); + try (dirFac) { + dirFac.initCoreContainer(null); + dirFac.init(new NamedList<>()); + + Directory dir = + dirFac.get(path, DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_SINGLE); + try { + // small file first to initSize() + try (IndexOutput file = dir.createOutput("test_file", IOContext.DEFAULT)) { + file.writeInt(42); + } // implicitly close file + + long expectedDiskSize = Files.size(Paths.get(path + "/test_file")); + assertEquals( + "directory size should be equal to on disk size of test files", + expectedDiskSize, + dirFac.onDiskSize(dir)); + + writeBlockSizeFile(dir, "test_file2"); + + expectedDiskSize = + Files.size(Paths.get(path + "/test_file")) + + Files.size(Paths.get(path + "/test_file2")); + assertEquals( + "directory size should be equal to on disk size of test files", + expectedDiskSize, + dirFac.onDiskSize(dir)); + + deleteFile(dir, "test_file2"); + expectedDiskSize = Files.size(Paths.get(path + "/test_file")); + assertEquals( + "directory size should be equal to on disk size of test files", + expectedDiskSize, + dirFac.onDiskSize(dir)); + } finally { + dirFac.release(dir); + } + } + } + + @Test + public void testSimultaneous() throws Exception { + CompressingDirectoryFactory dirFac = new CompressingDirectoryFactory(); + try (dirFac) { + dirFac.initCoreContainer(null); + dirFac.init(new NamedList<>()); + + Directory dir = + dirFac.get(path, DirectoryFactory.DirContext.DEFAULT, 
DirectoryFactory.LOCK_TYPE_SINGLE); + try { + String createPrefix = "test_first_"; + Random r = new Random(42); + int numCreateThreads = 100; + List createThreads = getCreateThreads(dir, numCreateThreads, r, createPrefix); + startThreads(createThreads); + waitForThreads(createThreads); + + List createAndDeleteThreads = + getCreateThreads(dir, numCreateThreads, r, "test_second_"); + // randomly delete 10 percent of the files created above, while also creating some files + createAndDeleteThreads.addAll( + getRandomDeleteThreads(dir, activeFiles.size() / 10, createPrefix, r)); + startThreads(createAndDeleteThreads); + waitForThreads(createAndDeleteThreads); + + long expected = expectedDiskSizeForFiles(); + long actual = dirFac.onDiskSize(dir); + assertEquals( + "directory size should be equal to on disk size of test files", expected, actual); + } finally { + dirFac.release(dir); + } + } + } + + private void waitForThreads(List threads) { + for (int i = 0; i < threads.size(); i++) { + try { + threads.get(i).join(); + } catch (InterruptedException e) { + System.out.println("Thread was interrupted"); + fail("thread was interrupted " + i); + } + } + } + + private List getCreateThreads(Directory dir, int numThreads, Random r, String prefix) { + List threads = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + String name = prefix + i; + int size = r.nextInt(100000000) + CompressingDirectory.COMPRESSION_BLOCK_SIZE + 1; + threads.add(new Thread(new CreateFileTask(dir, name, size))); + } + return threads; + } + + private List getRandomDeleteThreads( + Directory dir, int numThreads, String prefix, Random r) { + List activeFilesFiltered = + activeFiles.keySet().stream() + .filter(s -> s.startsWith(prefix)) + .collect(Collectors.toList()); + // can't delete more files than exist + assertTrue(activeFilesFiltered.size() >= numThreads); + List filesToDelete = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + int index = r.nextInt(activeFilesFiltered.size()); + filesToDelete.add(activeFilesFiltered.remove(index)); + } + + List threads = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + threads.add(new Thread(new DeleteFileTask(dir, filesToDelete.get(i)))); + } + return threads; + } + + private void startThreads(List threads) { + for (Thread t : threads) { + t.start(); + } + } + + private long expectedDiskSizeForFiles() throws Exception { + long fileSize = 0; + for (String name : activeFiles.keySet()) { + fileSize += Files.size(Paths.get(path + "/" + name)); + } + return fileSize; + } + + private class CreateFileTask implements Runnable { + final String name; + final int size; + final Directory dir; + + public CreateFileTask(Directory dir, String name, int size) { + this.name = name; + this.size = size; + this.dir = dir; + } + + @Override + public void run() { + try { + writeRandomFileOfSize(dir, name, size); + } catch (Exception e) { + fail("exception writing file" + name); + } + } + } + + private class DeleteFileTask implements Runnable { + final String name; + final Directory dir; + + public DeleteFileTask(Directory dir, String name) { + this.dir = dir; + this.name = name; + } + + @Override + public void run() { + try { + deleteFile(dir, name); + } catch (Exception e) { + fail("exception deleting file" + name); + } + } + } + + private void deleteFile(Directory dir, String name) throws Exception { + dir.deleteFile(name); + activeFiles.remove(name); + addToDeletedFiles(name); + } + + private void writeBlockSizeFile(Directory dir, String name) throws Exception { + try 
(IndexOutput file = dir.createOutput(name, IOContext.DEFAULT)) { + // write some small things first to force past blocksize boundary + file.writeInt(42); + file.writeInt(84); + // write a giant blocksize thing to force compression with dump() + Random random = new Random(42); + int blocksize = CompressingDirectory.COMPRESSION_BLOCK_SIZE; + byte[] byteArray = new byte[blocksize]; + random.nextBytes(byteArray); + file.writeBytes(byteArray, blocksize); + } // implicitly close file + addToActiveFiles(name); + } + + private void writeRandomFileOfSize(Directory dir, String name, int size) throws Exception { + try (IndexOutput file = dir.createOutput(name, IOContext.DEFAULT)) { + // write a giant blocksize thing to force compression with dump() + Random random = new Random(42); + int bufferSize = 4096; // Chunk size + byte[] buffer = new byte[bufferSize]; + int remainingBytes = size; + + while (remainingBytes > 0) { + int bytesToWrite = Math.min(remainingBytes, bufferSize); + random.nextBytes(buffer); + + file.writeBytes(buffer, bytesToWrite); + + remainingBytes -= bytesToWrite; + } + } // implicitly close file + addToActiveFiles(name); + } + + private void addToActiveFiles(String name) { + activeFiles.put(name, true); + } + + private void addToDeletedFiles(String name) { + deletedFiles.put(name, true); + } +}
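For context, the new DirectoryFactory.OnDiskSizeDirectory hook composes with any Directory that can report physical file sizes: DirectoryFactory.onDiskSizeOf(dir, file) uses onDiskFileLength(name) when the directory implements the interface and falls back to fileLength(name) otherwise, and onDiskSizeOfDirectory(dir) prefers SizeAware.onDiskSize() before summing per file. A minimal sketch, assuming only the interface and static helpers introduced in this patch (the class name RawSizeDirectory and the wrapped FSDirectory are hypothetical, for illustration only):

    import java.io.IOException;
    import java.nio.file.Files;
    import org.apache.lucene.store.FSDirectory;
    import org.apache.lucene.store.FilterDirectory;
    import org.apache.solr.core.DirectoryFactory;

    // Illustrative sketch, not part of this patch: a FilterDirectory that reports the
    // raw number of bytes each file occupies on disk, independent of the logical
    // (possibly uncompressed) length returned by fileLength(name).
    public class RawSizeDirectory extends FilterDirectory
        implements DirectoryFactory.OnDiskSizeDirectory {

      private final FSDirectory fsDir;

      public RawSizeDirectory(FSDirectory in) {
        super(in);
        this.fsDir = in;
      }

      @Override
      public long onDiskFileLength(String name) throws IOException {
        // Physical size of the named file as stored in the filesystem.
        return Files.size(fsDir.getDirectory().resolve(name));
      }
    }

With such a wrapper, DirectoryFactory.onDiskSizeOfDirectory(dir) sums Files.size(...) per file while sizeOfDirectory(dir) continues to report logical lengths, mirroring how CompressingDirectory, SizeAwareDirectory, and TeeDirectory opt in to on-disk size reporting in this change.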