diff --git a/.github/workflows/solr-test.yml b/.github/workflows/solr-test.yml
index 14de91109ac..5df67ba7e5b 100644
--- a/.github/workflows/solr-test.yml
+++ b/.github/workflows/solr-test.yml
@@ -35,4 +35,4 @@ jobs:
       - name: Initialize gradle settings
         run: ./gradlew localSettings
       - name: Test Solr
-        run: ./gradlew test
\ No newline at end of file
+        run: ./gradlew test -Ptests.directory=org.apache.solr.storage.TeeDirectory
diff --git a/lucene b/lucene
index e96815ed519..cb2b7c495ac 160000
--- a/lucene
+++ b/lucene
@@ -1 +1 @@
-Subproject commit e96815ed5193ce2409e523adcb88ddba47ca3d01
+Subproject commit cb2b7c495ac9e3bb8bca39e3798e8addd16fb7ff
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index a1e9012aaf0..2d43a3c8f57 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -116,6 +116,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList resu
       }
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+      params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
       params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
       params.set(CoreAdminParams.DELETE_DATA_DIR, true);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 99f503258b8..2aae7e733b9 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1265,7 +1265,11 @@ public void shutdown() {
       // Now clear all the cores that are being operated upon.
       solrCores.close();
 
-      objectCache.clear();
+      try {
+        objectCache.close();
+      } catch (IOException ex) {
+        log.warn("error closing ObjectCache", ex);
+      }
 
       // It's still possible that one of the pending dynamic load operation is waiting, so wake it
       // up if so. Since all the pending operations queues have been drained, there should be
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index c32fde81593..582e507aec7 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -42,6 +42,7 @@
 import static org.apache.solr.handler.ReplicationHandler.SIZE;
 import static org.apache.solr.handler.ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
@@ -79,6 +80,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -96,6 +98,7 @@
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
@@ -434,6 +437,7 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication)
    * @return true on success, false if follower is already in sync
    * @throws IOException if an exception occurs
    */
+  @SuppressWarnings("try")
   IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload)
       throws IOException, InterruptedException {
@@ -624,7 +628,15 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreRel
               .getDirectoryFactory()
               .get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
 
-    try {
+    Lock lock = tmpIndexDir.obtainLock(IndexWriter.WRITE_LOCK_NAME);
+    AtomicBoolean releasedLock = new AtomicBoolean();
+
+    try (Closeable releaseLock =
+        () -> {
+          if (releasedLock.compareAndSet(false, true)) {
+            lock.close();
+          }
+        }) {
       // We will compare all the index files from the leader vs the index files on disk to see if
       // there is a mismatch in the metadata. If there is a mismatch for the same index file then
@@ -770,6 +782,7 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreRel
           }
         }
         if (isFullCopyNeeded) {
+          releaseLock.close();
           solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
         }
diff --git a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
index 9bbfa4b9049..c3f0f3a2499 100644
--- a/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
+++ b/solr/core/src/java/org/apache/solr/handler/RestoreCore.java
@@ -30,9 +30,11 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.Lock;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.SolrCore;
@@ -82,6 +84,7 @@ public Boolean call() throws Exception {
     return doRestore();
   }
 
+  @SuppressWarnings("try")
   public boolean doRestore() throws Exception {
     SimpleDateFormat dateFormat = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT);
     String restoreIndexName = "restore." + dateFormat.format(new Date());
@@ -99,44 +102,47 @@ public boolean doRestore() throws Exception {
                 DirectoryFactory.DirContext.DEFAULT,
                 core.getSolrConfig().indexConfig.lockType);
 
-    // Prefer local copy.
-    indexDir =
-        core.getDirectoryFactory()
-            .get(
-                indexDirPath,
-                DirectoryFactory.DirContext.DEFAULT,
-                core.getSolrConfig().indexConfig.lockType);
-    Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
-    // Move all files from backupDir to restoreIndexDir
-    for (String filename : repository.listAllFiles()) {
-      checkInterrupted();
-      try {
-        if (indexDirFiles.contains(filename)) {
-          Checksum cs = repository.checksum(filename);
-          IndexFetcher.CompareResult compareResult;
-          if (cs == null) {
-            compareResult = new IndexFetcher.CompareResult();
-            compareResult.equal = false;
+    try (Lock lock = restoreIndexDir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+      // Prefer local copy.
+      indexDir =
+          core.getDirectoryFactory()
+              .get(
+                  indexDirPath,
+                  DirectoryFactory.DirContext.DEFAULT,
+                  core.getSolrConfig().indexConfig.lockType);
+      Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
+      // Move all files from backupDir to restoreIndexDir
+      for (String filename : repository.listAllFiles()) {
+        checkInterrupted();
+        try {
+          if (indexDirFiles.contains(filename)) {
+            Checksum cs = repository.checksum(filename);
+            IndexFetcher.CompareResult compareResult;
+            if (cs == null) {
+              compareResult = new IndexFetcher.CompareResult();
+              compareResult.equal = false;
+            } else {
+              compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
+            }
+            if (!compareResult.equal
+                || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
+                    filename, cs.size, compareResult))) {
+              repository.repoCopy(filename, restoreIndexDir);
+            } else {
+              // prefer local copy
+              repository.localCopy(indexDir, filename, restoreIndexDir);
+            }
           } else {
-            compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
-          }
-          if (!compareResult.equal
-              || (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
-                  filename, cs.size, compareResult))) {
             repository.repoCopy(filename, restoreIndexDir);
-          } else {
-            // prefer local copy
-            repository.localCopy(indexDir, filename, restoreIndexDir);
           }
-        } else {
-          repository.repoCopy(filename, restoreIndexDir);
+        } catch (Exception e) {
+          log.warn("Exception while restoring the backup index ", e);
+          throw new SolrException(
+              SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
         }
-      } catch (Exception e) {
-        log.warn("Exception while restoring the backup index ", e);
-        throw new SolrException(
-            SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
       }
     }
+
     log.debug("Switching directories");
     core.modifyIndexProps(restoreIndexName);
diff --git a/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java b/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java
new file mode 100644
index 00000000000..3f74746405d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/storage/AccessDirectory.java
@@ -0,0 +1,1763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.solr.storage; + +import static org.apache.solr.storage.CompressingDirectory.BLOCK_SIZE_ESTIMATE; +import static org.apache.solr.storage.CompressingDirectory.COMPRESSION_BLOCK_MASK_LOW; +import static org.apache.solr.storage.CompressingDirectory.COMPRESSION_BLOCK_SHIFT; +import static org.apache.solr.storage.CompressingDirectory.COMPRESSION_BLOCK_SIZE; +import static org.apache.solr.storage.CompressingDirectory.DirectIOIndexOutput.HEADER_SIZE; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UncheckedIOException; +import java.lang.invoke.MethodHandles; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.IntUnaryOperator; +import java.util.stream.Collectors; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.ByteBufferGuard; +import org.apache.lucene.store.ByteBufferIndexInput; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.MMapDirectory; +import org.apache.lucene.store.MappedByteBufferIndexInputProvider; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.util.BitUtil; +import org.apache.lucene.util.CollectionUtil; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.automaton.Automaton; +import org.apache.lucene.util.automaton.ByteRunAutomaton; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.apache.lucene.util.automaton.Operations; +import org.apache.lucene.util.automaton.RegExp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AccessDirectory extends MMapDirectory { + + /** + * Determines chunk size for mmapping files. `1` yields 1g chunks, but this may be set higher to + * stress test buffer boundaries (as low as 15, which yields the min chunk size of 64k, equal to + * {@link CompressingDirectory#COMPRESSION_BLOCK_SIZE}). 
+ */ + private static final int MAP_BUF_DIVIDE_SHIFT = 1; + + public static final int MAX_MAP_SIZE = Integer.MIN_VALUE >>> MAP_BUF_DIVIDE_SHIFT; + public static final int MAX_MAP_MASK = MAX_MAP_SIZE - 1; + public static final int MAX_MAP_SHIFT = Integer.numberOfTrailingZeros(MAX_MAP_SIZE); + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final AtomicInteger RESERVATION = new AtomicInteger(); + + private final Path path; + private final Path compressedPath; + private final LinkedBlockingQueue activationQueue; + private final ConcurrentHashMap priorityActivate; + private final LongAdder rawCt; + private final LongAdder loadedCt; + private final LongAdder populatedCt; + private final LongAdder lazyCt; + private final LongAdder lazyMapSize; + private final LongAdder lazyMapDiskUsage; + private final LongAdder lazyLoadedBlockBytes; + + public AccessDirectory( + Path path, + LockFactory lockFactory, + Path compressedPath, + TeeDirectoryFactory.NodeLevelTeeDirectoryState s) + throws IOException { + super(path, lockFactory); + this.path = path; + this.compressedPath = compressedPath; + this.activationQueue = s.activationQueue; + this.priorityActivate = s.priorityActivate; + this.rawCt = s.rawCt; + this.loadedCt = s.loadedCt; + this.populatedCt = s.populatedCt; + this.lazyCt = s.lazyCt; + this.lazyMapSize = s.lazyMapSize; + this.lazyMapDiskUsage = s.lazyMapDiskUsage; + this.lazyLoadedBlockBytes = s.lazyLoadedBlockBytes; + } + + @Override + public long fileLength(String name) throws IOException { + try { + return super.fileLength(name); + } catch (NoSuchFileException | FileNotFoundException ex) { + LazyEntry lazy = open(name); + if (lazy == null) { + throw ex; + } else { + return lazy.input.length(); + } + } + } + + @Override + public void rename(String source, String dest) throws IOException { + try { + super.rename(source, dest); + } catch (IOException ex) { + LazyEntry lazy = activeLazy.get(source); + if (lazy == null) { + super.rename(source, dest); // will usually throw another copy of same exception + return; + } + if (isClosed) { + throw new AlreadyClosedException("already closed"); + } + LazyEntry extant = activeLazy.putIfAbsent(dest, lazy); + if (extant != null) { + throw new IllegalStateException( + "already have a mapping for " + dest + " => " + extant.lazyName); + } + if (!activeLazy.remove(source, lazy)) { + activeLazy.remove(dest); + throw new IllegalStateException("source is mapped to a new value"); + } + lazy.canonicalName = dest; + } + } + + @Override + public void sync(Collection names) throws IOException { + ensureOpen(); + + for (String name : names) { + try { + fsync(name); + } catch (NoSuchFileException | FileNotFoundException ex) { + LazyEntry lazy = open(name); + if (lazy == null) { + throw ex; + } else { + try { + fsync(lazy.lazyName); + } catch (IOException ex1) { + // try one more time with the original name in case the file was in the + // process of being renamed + fsync(name); + } + } + } + } + + // to trigger `maybeDeletePendingFiles()` + super.sync(Collections.emptySet()); + } + + private volatile boolean isClosed = false; + private final ConcurrentHashMap activeLazy = new ConcurrentHashMap<>(); + + private void activate(LazyEntry lazyEntry) throws IOException { + LazyLoadInput canonical = lazyEntry.input; + String lazyName = lazyEntry.lazyName; + String name = lazyEntry.canonicalName; + long fileSize = canonical.length(); + boolean success = false; + try { + 
super.sync(Collections.singleton(lazyName)); + super.rename(lazyName, name); + success = true; + syncMetaData(); + if (!canonical.loaded.compareAndSet(null, super.openInput(name, IOContext.READ))) { + log.warn("out-of-band loading detected for file {}", name); + } + } finally { + if (!success) { + IOUtils.deleteFilesIgnoringExceptions(this, lazyName); + } + if (!activeLazy.remove(name, lazyEntry)) { + log.warn("out-of-band lazy removal detected for file {}", name); + } else { + lazyMapSize.decrement(); + lazyMapDiskUsage.add(-fileSize); + } + } + } + + static final class LazyEntry { + private String canonicalName; + private final String lazyName; + private final LazyLoadInput input; + private final WeakReference dir; + private final int[] from = new int[1]; + + private LazyEntry( + String canonicalName, String lazyName, LazyLoadInput input, AccessDirectory dir) { + this.canonicalName = canonicalName; + this.lazyName = lazyName; + this.input = input; + this.dir = new WeakReference<>(dir); + } + + public int load() throws IOException { + AccessDirectory dir = this.dir.get(); + if (dir == null) { + return -1; + } + long offset = ((long) from[0]) << COMPRESSION_BLOCK_SHIFT; + + // NOTE: regular activate (what we're doing here) should bypass `slice`, since + // we don't want crosstalk with `priorityActivate`, `populated()`, etc. + LazyLoadInput in = new LazyLoadInput("activation", input, offset, input.length - offset); + + final int ret = in.load(from); + if (ret >= 0) { + return ret; + } + // we are finished, so finally activate the file + dir.activate(this); + return ret; + } + } + + private static final String LAZY_TMP_FILE_SUFFIX = "lazy"; + private static final String LAZY_TMP_FILE_REGEX = + ".*_" + LAZY_TMP_FILE_SUFFIX + "_[0-9a-z]+\\.tmp"; + private static final int PATTERN_LOWEST = '.'; + private static final int PATTERN_HIGHEST = 'z'; + private static final ByteRunAutomaton LAZY_TMP_FILE_AUTOMATON; + private static final int LAZY_TMP_FILE_AUTOMATON_ACCEPT_STATE; + + static { + // reverse the pattern automaton and step through in reverse + Automaton a = + Operations.reverse( + new RegExp(LAZY_TMP_FILE_REGEX).toAutomaton(Operations.DEFAULT_DETERMINIZE_WORK_LIMIT)); + LAZY_TMP_FILE_AUTOMATON = + new CompiledAutomaton(a, null, true, Operations.DEFAULT_DETERMINIZE_WORK_LIMIT, false) + .runAutomaton; + int s = 0; + String matchingFilename = "_lazy_1.tmp"; + for (int i = matchingFilename.length() - 1; i >= 0; i--) { + char c = matchingFilename.charAt(i); + s = LAZY_TMP_FILE_AUTOMATON.step(s, c & 0x7f); + if (LAZY_TMP_FILE_AUTOMATON.isAccept(s)) { + break; + } + } + if (!LAZY_TMP_FILE_AUTOMATON.isAccept(s)) { + throw new AssertionError(); + } + // because the pattern is non-branching, we know there is exactly one accept state + LAZY_TMP_FILE_AUTOMATON_ACCEPT_STATE = s; + } + + static int lazyTmpFileSuffixStartIdx(String filename) { + int s = 0; + for (int i = filename.length() - 1; i >= 0; i--) { + char c = filename.charAt(i); + if (c < PATTERN_LOWEST || c > PATTERN_HIGHEST) { + // this allows us to use char input with byte-based ByteRunAutomaton + return -1; + } + if ((s = LAZY_TMP_FILE_AUTOMATON.step(s, c & 0x7f)) == -1) { + return -1; + } + if (s == LAZY_TMP_FILE_AUTOMATON_ACCEPT_STATE) { + return i; + } + } + return -1; + } + + @SuppressWarnings("try") + private LazyEntry open(String name) throws IOException { + LazyEntry lazyEntry; + try { + boolean[] weComputed = new boolean[1]; + lazyEntry = + activeLazy.computeIfAbsent( + name, + (k) -> { + String lazyName = null; + LazyEntry ret = 
null; + try (IndexOutput out = + super.createTempOutput(name, LAZY_TMP_FILE_SUFFIX, IOContext.DEFAULT)) { + // create a stub + lazyName = out.getName(); + ret = + new LazyEntry( + name, + lazyName, + new LazyLoadInput( + compressedPath.resolve(name), + path.resolve(lazyName), + rawCt, + loadedCt, + populatedCt, + lazyCt, + priorityActivate, + lazyLoadedBlockBytes), + this); + weComputed[0] = true; + lazyMapDiskUsage.add(ret.input.length()); + lazyMapSize.increment(); + if (isClosed) { + try (Closeable c = ret.input) { + lazyMapDiskUsage.add(-ret.input.length()); + lazyMapSize.decrement(); + } + ret = null; + throw new AlreadyClosedException("Directory already closed"); + } + return ret; + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (ret == null && lazyName != null) { + try { + super.deleteFile(lazyName); + } catch (IOException e) { + // another exception has been thrown; we can ignore this one + } + } + } + }); + if (weComputed[0]) { + activationQueue.add(lazyEntry); + } + return lazyEntry; + } catch (UncheckedIOException e) { + throw e.getCause(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + try { + IndexInput ret = super.openInput(name, context); + rawCt.increment(); + return ret; + } catch (NoSuchFileException | FileNotFoundException ex) { + try { + LazyEntry lazy = open(name); + if (lazy == null) { + throw ex; + } else { + return lazy.input.clone(); + } + } catch (IOException ex1) { + ex1.addSuppressed(ex); + throw ex1; + } + } + } + + @Override + @SuppressWarnings("try") + public void deleteFile(String name) throws IOException { + try (Closeable c = () -> super.deleteFile(name)) { + LazyEntry lazyRemoved = activeLazy.remove(name); + if (lazyRemoved != null) { + long fileSize = lazyRemoved.input.length(); + try (lazyRemoved.input) { + super.deleteFile(lazyRemoved.lazyName); + } finally { + lazyMapDiskUsage.add(-fileSize); + lazyMapSize.decrement(); + } + } + } + } + + @Override + public synchronized void close() throws IOException { + isClosed = true; + try { + // try to remove and delete all the active lazy files + Iterator iter = activeLazy.values().iterator(); + while (iter.hasNext()) { + LazyEntry e = iter.next(); + try (e.input) { + IOUtils.deleteFilesIgnoringExceptions(this, e.lazyName); + iter.remove(); + lazyMapSize.decrement(); + lazyMapDiskUsage.add(-e.input.length()); + } catch (Exception ex) { + log.warn("exception closing lazy input for {}", e.canonicalName, ex); + } + } + } finally { + if (!activeLazy.isEmpty()) { + log.error( + "found residual lazy input after close: {}", + activeLazy.values().stream().map((e) -> e.canonicalName).collect(Collectors.toList())); + activeLazy.clear(); + } + super.close(); + } + } + + private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); + + private static final int COMPRESSION_TO_MAP_TRANSLATE_SHIFT = + MAX_MAP_SHIFT - COMPRESSION_BLOCK_SHIFT; + + static class ConcurrentIntSet + implements Comparable, Callable, AutoCloseable { + private volatile boolean closed = false; + private final String tmpPath; + private final int lastIdx; + private final int blocksSize; + private final AtomicIntegerArray bits; + private final ConcurrentHashMap priorityActivate; + private LazyLoadInput in; + + ConcurrentIntSet( + String tmpPath, + int size, + int blocksSize, + LazyLoadInput in, + ConcurrentHashMap priorityActivate) { + this.tmpPath = tmpPath; + this.lastIdx = size - 1; + this.blocksSize = 
((blocksSize - 1) / Integer.SIZE) + 1; + bits = new AtomicIntegerArray(this.blocksSize); + this.priorityActivate = priorityActivate; + this.in = in; + } + + boolean add(int val) { + int alignment = val & RESERVE_MASK; + int extant = bits.getAndUpdate(val >> RESERVE_SHIFT, COMPLETE_SET[alignment]); + try { + return (extant & RESERVE_MASKS[alignment]) == 0; + } finally { + priorityActivate.putIfAbsent(this, Boolean.TRUE); + } + } + + @Override + public Integer call() throws IOException { + LazyLoadInput in = this.in; + if (in == null) { + return 0; + } + int loadedCt = 0; + int blockIdx = 0; + assert lastIdx == in.lastBlockIdx; + do { + int blockVal; + while ((blockVal = bits.getAndSet(blockIdx, 0)) == 0) { + if (++blockIdx >= blocksSize) { + return loadedCt; + } + // keep advancing + } + int mask = Integer.MIN_VALUE; + int blockIdxBaseline = + blockIdx << RESERVE_SHIFT; // the first of group of indexes into `complete` array + int i = 0; + do { + if ((blockVal & mask) != 0) { + // get the actual base compressed block idx + int compressedBlockBaseline = (blockIdxBaseline + i) << RESERVE_SHIFT; + for (int idx = compressedBlockBaseline, lim = compressedBlockBaseline + Integer.SIZE; + idx < lim; + idx++) { + int decompressedLen; + if (idx < lastIdx) { + decompressedLen = COMPRESSION_BLOCK_SIZE; + } else if (idx == lastIdx) { + decompressedLen = in.lastBlockDecompressedLen; + } else { + return loadedCt; + } + if (closed) { + return loadedCt; + } else if (in.loadBlock(idx, decompressedLen)) { + loadedCt++; + } + } + } + i++; + } while ((mask >>>= 1) != 0); + } while (++blockIdx < blocksSize); + return loadedCt; + } + + @Override + public int hashCode() { + return tmpPath.hashCode() ^ ConcurrentIntSet.class.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return tmpPath.equals(((ConcurrentIntSet) obj).tmpPath); + } + + @Override + public int compareTo(ConcurrentIntSet o) { + return tmpPath.compareTo(o.tmpPath); + } + + @Override + public void close() { + closed = true; + in = null; + } + } + + private static final int RESERVE_SHIFT = 5; + private static final int RESERVE_MASK = (1 << RESERVE_SHIFT) - 1; + private static final int[] RESERVE_MASKS = new int[Integer.SIZE]; + private static final IntUnaryOperator[] RESERVE_RELEASE = new IntUnaryOperator[Integer.SIZE]; + private static final IntUnaryOperator[] COMPLETE_SET = new IntUnaryOperator[Integer.SIZE]; + + static { + int mask = 1; + for (int i = Integer.SIZE - 1; i >= 0; i--) { + RESERVE_MASKS[i] = mask; + int invert = ~mask; + int maskF = mask; + RESERVE_RELEASE[i] = (j) -> j & invert; + COMPLETE_SET[i] = (j) -> j | maskF; + mask <<= 1; + } + } + + private static ByteBufferGuard.BufferCleaner unmapHack() { + Object hack = MappedByteBufferIndexInputProvider.unmapHackImpl(); + if (hack instanceof ByteBufferGuard.BufferCleaner) { + return (ByteBufferGuard.BufferCleaner) hack; + } else { + throw new UnsupportedOperationException("unmap not available"); + } + } + + static final class LazyLoadInput extends IndexInput implements RandomAccessInput { + private final AtomicIntegerArray populatedUpTo; + private final ConcurrentIntSet priorityLoad; + private final AtomicReference loaded; + private final AtomicIntegerArray reserve; + private final AtomicIntegerArray complete; + private final ByteBufferGuard compressedGuard; + private final ByteBufferGuard guard; + private final long length; + private final boolean isClone; + private final long[] blockOffsets; + private final int blockCount; + private final int lastBlockIdx; + private 
final int lastBlockDecompressedLen; + private ByteBuffer[] mapped; + private ByteBuffer[] accessMapped; + + private final IndexInput accessPopulated; + + private final long offset; + private final long sliceLength; + + private long seekPos = -1; + private long filePointer = 0; + private ByteBuffer postBuffer = EMPTY; + private int postBufferBaseline; + private int currentBlockIdx = -1; + + private final LongBuffer[][] multiLongViews; + private final IntBuffer[][] multiIntViews; + private final FloatBuffer[][] multiFloatViews; + private static final LongBuffer EMPTY_LONGBUFFER = LongBuffer.allocate(0); + private static final IntBuffer EMPTY_INTBUFFER = IntBuffer.allocate(0); + private static final FloatBuffer EMPTY_FLOATBUFFER = FloatBuffer.allocate(0); + private LongBuffer[] longViews; + private IntBuffer[] intViews; + private FloatBuffer[] floatViews; + + private final LongAdder rawCt; + private final LongAdder loadedCt; + private final LongAdder populatedCt; + private final LongAdder lazyCt; + private final LongAdder lazyLoadedBlockBytes; + + private LazyLoadInput( + String resourceDescription, LazyLoadInput parent, long offset, long length) { + super(resourceDescription); + this.lazyLoadedBlockBytes = parent.lazyLoadedBlockBytes; + this.populatedUpTo = parent.populatedUpTo; + this.priorityLoad = parent.priorityLoad; + this.loaded = parent.loaded; + this.guard = parent.guard; + this.compressedGuard = parent.compressedGuard; + this.accessPopulated = parent.accessPopulated; + this.isClone = true; + this.length = parent.length; + this.blockOffsets = parent.blockOffsets; + this.blockCount = parent.blockCount; + this.lastBlockIdx = parent.lastBlockIdx; + this.lastBlockDecompressedLen = parent.lastBlockDecompressedLen; + this.reserve = parent.reserve; + this.complete = parent.complete; + this.offset = parent.offset + offset; + this.seekPos = this.offset; + this.sliceLength = length; + this.multiFloatViews = parent.multiFloatViews; + this.multiLongViews = parent.multiLongViews; + this.multiIntViews = parent.multiIntViews; + ByteBuffer[] parentMapped = parent.mapped; + this.mapped = new ByteBuffer[parentMapped.length]; + for (int i = parentMapped.length - 1; i >= 0; i--) { + mapped[i] = parentMapped[i].duplicate(); + } + ByteBuffer[] parentAccessMapped = parent.accessMapped; + this.accessMapped = new ByteBuffer[parentAccessMapped.length]; + for (int i = parentAccessMapped.length - 1; i >= 0; i--) { + accessMapped[i] = parentAccessMapped[i].duplicate().order(ByteOrder.LITTLE_ENDIAN); + } + this.rawCt = parent.rawCt; + this.loadedCt = parent.loadedCt; + this.populatedCt = parent.populatedCt; + this.lazyCt = parent.lazyCt; + } + + private LazyLoadInput( + Path source, + Path lazy, + LongAdder rawCt, + LongAdder loadedCt, + LongAdder populatedCt, + LongAdder lazyCt, + ConcurrentHashMap priorityActivate, + LongAdder lazyLoadedBlockBytes) + throws IOException { + super("lazy"); + this.lazyLoadedBlockBytes = lazyLoadedBlockBytes; + this.rawCt = rawCt; + this.loadedCt = loadedCt; + this.populatedCt = populatedCt; + this.lazyCt = lazyCt; + loaded = new AtomicReference<>(); + guard = new ByteBufferGuard("accessGuard", unmapHack()); + compressedGuard = new ByteBufferGuard("compressedGuard", unmapHack()); + try (FileChannel channel = FileChannel.open(source, StandardOpenOption.READ)) { + long compressedFileSize = channel.size(); + mapped = + new MappedByteBuffer[Math.toIntExact(((compressedFileSize - 1) >> MAX_MAP_SHIFT) + 1)]; + long pos = 0; + long limit = MAX_MAP_SIZE; + for (int i = 0, lim = 
mapped.length; i < lim; i++) { + int size = (int) (Math.min(limit, compressedFileSize) - pos); + mapped[i] = channel.map(FileChannel.MapMode.READ_ONLY, pos, size); + pos = limit; + limit += MAX_MAP_SIZE; + } + + isClone = false; + long size = channel.size(); + if (size == 0) { + length = 0; + blockOffsets = null; + blockCount = 0; + lastBlockIdx = -1; + lastBlockDecompressedLen = 0; + reserve = null; + complete = null; + priorityLoad = null; + populatedUpTo = null; + } else { + ByteBuffer initial = mapped[0]; + length = guard.getLong(initial, 0); + if (length >> COMPRESSION_BLOCK_SHIFT > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "file too long " + Long.toHexString(length) + ", " + source); + } + int blockDeltaFooterSize = guard.getInt(initial, Long.BYTES); + int cTypeId = guard.getByte(initial, HEADER_SIZE - Integer.BYTES) & 0xff; + if (cTypeId != CompressingDirectory.COMPRESSION_TYPE.id) { + throw new IllegalArgumentException("unrecognized compression type id: " + cTypeId); + } + int cBlockTypeId = guard.getByte(initial, HEADER_SIZE - Integer.BYTES + 1) & 0xff; + if (cBlockTypeId != CompressingDirectory.COMPRESSION_TYPE.id) { + throw new IllegalArgumentException( + "unrecognized compression block type id: " + cBlockTypeId); + } + byte[] footer = new byte[blockDeltaFooterSize]; + long blockDeltaFooterOffset = size - blockDeltaFooterSize; + + // TODO: read this from mapped instead? + channel.read(ByteBuffer.wrap(footer), blockDeltaFooterOffset); + ByteArrayDataInput in = new ByteArrayDataInput(footer); + + long blockOffset = HEADER_SIZE; + int lastBlockSize = BLOCK_SIZE_ESTIMATE; + blockCount = (int) (((length - 1) >> COMPRESSION_BLOCK_SHIFT) + 1); + blockOffsets = new long[blockCount + 1]; + lastBlockIdx = blockCount - 1; + lastBlockDecompressedLen = (int) (((length - 1) & COMPRESSION_BLOCK_MASK_LOW) + 1); + blockOffsets[0] = blockOffset; + for (int i = 1; i < blockCount; i++) { + int delta = in.readZInt(); + int nextBlockSize = lastBlockSize + delta; + blockOffset += nextBlockSize; + blockOffsets[i] = blockOffset; + lastBlockSize = nextBlockSize; + } + blockOffsets[blockCount] = blockDeltaFooterOffset; + int reserveSize = ((blockCount - 1) / Integer.SIZE) + 1; + reserve = new AtomicIntegerArray(reserveSize); + complete = new AtomicIntegerArray(reserveSize); + priorityLoad = + new ConcurrentIntSet( + lazy.toString(), blockCount, reserveSize, this, priorityActivate); + populatedUpTo = new AtomicIntegerArray(reserveSize); + populatedUpTo.set(0, -1); // every index must initially have a value less than itself. 
+ } + } + + this.offset = 0; + this.sliceLength = length; + + try (RandomAccessFile raf = new RandomAccessFile(lazy.toFile(), "rw")) { + // allocate the proper amount of space + raf.setLength(length); + FileChannel channel = raf.getChannel(); + int mapChunkCount = Math.toIntExact(((length - 1) >> MAX_MAP_SHIFT) + 1); + accessMapped = new MappedByteBuffer[mapChunkCount]; + multiLongViews = new LongBuffer[mapChunkCount][]; + multiIntViews = new IntBuffer[mapChunkCount][]; + multiFloatViews = new FloatBuffer[mapChunkCount][]; + long pos = 0; + long limit = MAX_MAP_SIZE; + for (int i = 0, lim = accessMapped.length; i < lim; i++) { + int size = (int) (Math.min(limit, length) - pos); + MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_WRITE, pos, size); + accessMapped[i] = mbb; + LongBuffer[] lbb = new LongBuffer[Long.BYTES]; + IntBuffer[] ibb = new IntBuffer[Integer.BYTES]; + FloatBuffer[] fbb = new FloatBuffer[Float.BYTES]; + multiLongViews[i] = lbb; + multiIntViews[i] = ibb; + multiFloatViews[i] = fbb; + mbb.order(ByteOrder.LITTLE_ENDIAN); + for (int j = Long.BYTES - 1; j >= 0; j--) { + mbb.position(j); + lbb[j] = mbb.asLongBuffer(); + if (j < Integer.BYTES) { + ibb[j] = mbb.asIntBuffer(); + fbb[j] = mbb.asFloatBuffer(); + } + } + pos = limit; + limit += MAX_MAP_SIZE; + } + } + boolean padCopy = (length & MAX_MAP_SHIFT) == 0; + ByteBuffer[] copy = new ByteBuffer[padCopy ? accessMapped.length + 1 : accessMapped.length]; + for (int i = accessMapped.length - 1; i >= 0; i--) { + copy[i] = accessMapped[i].duplicate().order(ByteOrder.LITTLE_ENDIAN); + } + if (padCopy) { + // ByteBufferIndexInput seems to overallocate number of buffers on chunk size boundaries, + // so add padding empty buffer to avoid creating problems. + copy[accessMapped.length] = ByteBuffer.allocateDirect(0).order(ByteOrder.LITTLE_ENDIAN); + } + this.accessPopulated = + ByteBufferIndexInput.newInstance("accessPopulated", copy, length, MAX_MAP_SHIFT, guard); + } + + private boolean acquireWrite(int blockIdx) { + final int reserveBlockIdx = blockIdx >>> RESERVE_SHIFT; + final int reserveMask = RESERVE_MASKS[blockIdx & RESERVE_MASK]; + int witness = reserve.get(reserveBlockIdx); + int expected; + while (((expected = witness) & reserveMask) == 0) { + witness = reserve.compareAndExchange(reserveBlockIdx, expected, expected | reserveMask); + if (witness == expected) { + return true; + } + } + return false; + } + + private void releaseWrite(int blockIdx) { + reserve.updateAndGet(blockIdx >>> RESERVE_SHIFT, RESERVE_RELEASE[blockIdx & RESERVE_MASK]); + } + + private void setReadable(int blockIdx) { + complete.updateAndGet(blockIdx >>> RESERVE_SHIFT, COMPLETE_SET[blockIdx & RESERVE_MASK]); + } + + private boolean acquireRead(int blockIdx) { + final int reserveBlockIdx = blockIdx >>> RESERVE_SHIFT; + final int reserveMask = RESERVE_MASKS[blockIdx & RESERVE_MASK]; + while ((complete.get(reserveBlockIdx) & reserveMask) == 0) { + // keep checking + Thread.yield(); + if ((reserve.get(reserveBlockIdx) & reserveMask) != reserveMask) { + // the computation we were waiting for must have failed + return false; + } + // TODO: this should be ok to busy-wait, but maybe re-evaluate based on stats? 
+ } + return true; + } + + private boolean populated(long offset, long length) { + int startBlockIdx = (int) (offset >> COMPRESSION_BLOCK_SHIFT); + int lastBlockIdx = (int) ((offset + length - 1) >> COMPRESSION_BLOCK_SHIFT); + final int startCompleteBlockIdx = startBlockIdx >>> RESERVE_SHIFT; + final int lastCompleteBlockIdx = lastBlockIdx >>> RESERVE_SHIFT; + final int firstSubsequentPartialBlock = populatedUpTo.get(startCompleteBlockIdx); + if (firstSubsequentPartialBlock > lastCompleteBlockIdx) { + // we've already checked and recorded that this is a populated range + return true; + } else if (firstSubsequentPartialBlock < lastCompleteBlockIdx) { + // we haven't checked the full range + final boolean registerRun; + if (firstSubsequentPartialBlock >= startCompleteBlockIdx) { + // the first block is populated + registerRun = true; + } else { + // the first block is not populated + final int startCompleteBlockVal = complete.get(startCompleteBlockIdx); + registerRun = + startCompleteBlockVal == -1 + || (blockCount <= Integer.SIZE + && Integer.bitCount(startCompleteBlockVal) == blockCount); + if (!registerRun) { + priorityLoad.add(startCompleteBlockIdx); + final int startMask = -1 >>> (startBlockIdx & RESERVE_MASK); // not sign-extended + if ((startCompleteBlockVal & startMask) != startMask) { + return false; + } + } + } + for (int i = Math.max(startCompleteBlockIdx + 1, firstSubsequentPartialBlock); + i < lastCompleteBlockIdx; + i++) { + if (complete.get(i) != -1) { + priorityLoad.add(i); + if (registerRun) { + registerRun(startCompleteBlockIdx, i); + } + return false; + } + } + if (registerRun) { + registerRun(startCompleteBlockIdx, lastCompleteBlockIdx); + } + } + final int lastMask = Integer.MIN_VALUE >> (lastBlockIdx & RESERVE_MASK); // sign-extended + if ((complete.get(lastCompleteBlockIdx) & lastMask) == lastMask) { + return true; + } else { + priorityLoad.add(lastCompleteBlockIdx); + return false; + } + } + + private void registerRun(int startCompleteBlockIdx, int upTo) { + int expected = populatedUpTo.get(startCompleteBlockIdx); + int witnessUpTo; + while (upTo > expected + && (witnessUpTo = populatedUpTo.compareAndExchange(startCompleteBlockIdx, expected, upTo)) + != expected) { + expected = witnessUpTo; + } + } + + private void actualSeek(final long pos) throws IOException { + filePointer = pos; + final int blockIdx = (int) (pos >> COMPRESSION_BLOCK_SHIFT); + if (blockIdx != currentBlockIdx) { + initBlock(blockIdx); + } + postBuffer.position(postBufferBaseline + (int) (pos & COMPRESSION_BLOCK_MASK_LOW)); + } + + private void initBlock(int blockIdx) throws IOException { + // NOTE: it is very important to keep this method small so that it can be inlined. 
+ if (blockIdx > lastBlockIdx) { + throw new EOFException(); + } + final long blockOffset = blockOffsets[blockIdx]; + final int compressedLen = (int) (blockOffsets[blockIdx + 1] - blockOffset); + refill(blockOffset, compressedLen, blockIdx); + } + + private void refill() throws IOException { + final int blockIdx = currentBlockIdx + 1; + if (blockIdx > lastBlockIdx) { + throw new EOFException(); + } + final long blockOffset = blockOffsets[blockIdx]; + final int compressedLen = (int) (blockOffsets[blockIdx + 1] - blockOffset); + refill(blockOffset, compressedLen, blockIdx); + } + + private ByteBuffer supply(long pos, int compressedLen, int decompressedLen) throws IOException { + final byte[] preBuffer = new byte[compressedLen]; + final byte[] decompressBuffer = + new byte[decompressedLen + 7]; // +7 overhead for optimal decompression + ByteBuffer bb = mapped[(int) (pos >> MAX_MAP_SHIFT)].position((int) (pos & MAX_MAP_MASK)); + int offset = 0; + int left = bb.remaining(); + int toRead = compressedLen; + RESERVATION.incrementAndGet(); + try { + while (left < toRead) { + compressedGuard.getBytes(bb, preBuffer, offset, left); + toRead -= left; + offset += left; + pos += left; + bb = mapped[(int) (pos >> MAX_MAP_SHIFT)].position((int) (pos & MAX_MAP_MASK)); + left = bb.remaining(); + } + compressedGuard.getBytes(bb, preBuffer, offset, toRead); + } finally { + RESERVATION.decrementAndGet(); + } + CompressingDirectory.decompress(preBuffer, 0, decompressedLen, decompressBuffer, 0); + lazyLoadedBlockBytes.add(decompressedLen); + return ByteBuffer.wrap( + decompressBuffer, + 0, + decompressedLen); // usually decompressedLen == COMPRESSION_BLOCK_SIZE + } + + private boolean loadBlock(int blockIdx, int decompressedLen) throws IOException { + if (!acquireWrite(blockIdx)) { + return false; + } else { + try { + final long blockOffset = blockOffsets[blockIdx]; + final int compressedLen = (int) (blockOffsets[blockIdx + 1] - blockOffset); + int reservationCt; + while ((reservationCt = RESERVATION.get()) > 0) { + // Yes, we are busy-waiting here. But we can't afford synchronization in the + // block-cache supply method, which is the code that modifies `reservation` + // ... whereas we really don't care about the performance of this method, it's + // async, and best-effort only. + try { + Thread.sleep((long) reservationCt << 3); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + ByteBuffer supply = supply(blockOffset, compressedLen, decompressedLen); + long accessBlockStart = ((long) blockIdx) << COMPRESSION_BLOCK_SHIFT; + ByteBuffer dest = accessMapped[(int) (accessBlockStart >> MAX_MAP_SHIFT)]; + int destPos = (int) (accessBlockStart & MAX_MAP_MASK); + dest.clear().position(destPos).put(supply); + setReadable(blockIdx); + } catch (Throwable t) { + releaseWrite(blockIdx); + throw t; // this should really never happen; if it does, just bail + } + return true; + } + } + + private int load(int[] from) throws IOException { + int ret = 0; + for (int blockIdx = from[0]; blockIdx <= lastBlockIdx; blockIdx++) { + do { + boolean isLastBlock = blockIdx == lastBlockIdx; + if (loadBlock( + blockIdx, isLastBlock ? lastBlockDecompressedLen : COMPRESSION_BLOCK_SIZE)) { + if (++ret >= Integer.SIZE) { + from[0] = blockIdx + 1; + return isLastBlock ? ~ret : ret; + } + } + } while (!acquireRead(blockIdx)); + } + return ~ret; + } + + private void refill(final long pos, final int compressedLen, final int blockIdx) + throws IOException { + final int decompressedLen = + blockIdx == lastBlockIdx ? 
lastBlockDecompressedLen : COMPRESSION_BLOCK_SIZE; + if (acquireWrite(blockIdx)) { + try { + ByteBuffer supply = supply(pos, compressedLen, decompressedLen); + long accessBlockStart = ((long) blockIdx) << COMPRESSION_BLOCK_SHIFT; + ByteBuffer dest = accessMapped[(int) (accessBlockStart >> MAX_MAP_SHIFT)]; + int destPos = (int) (accessBlockStart & MAX_MAP_MASK); + dest.clear().position(destPos).limit(destPos + decompressedLen).mark(); + dest.put(supply).reset(); + postBuffer = dest; + postBufferBaseline = destPos; + setReadable(blockIdx); + } catch (Throwable t) { + releaseWrite(blockIdx); + throw t; + } + } else if (acquireRead(blockIdx)) { + long accessBlockStart = ((long) blockIdx) << COMPRESSION_BLOCK_SHIFT; + ByteBuffer dest = accessMapped[(int) (accessBlockStart >> MAX_MAP_SHIFT)]; + int destPos = (int) (accessBlockStart & MAX_MAP_MASK); + dest.clear().position(destPos).limit(destPos + decompressedLen); + postBuffer = dest; + postBufferBaseline = destPos; + } else { + postBuffer = supply(pos, compressedLen, decompressedLen); + postBufferBaseline = postBuffer.position(); + postBuffer.order(ByteOrder.LITTLE_ENDIAN); + } + if (currentBlockIdx != -1 + && currentBlockIdx >> COMPRESSION_TO_MAP_TRANSLATE_SHIFT + != blockIdx >> COMPRESSION_TO_MAP_TRANSLATE_SHIFT) { + // views have gone out of scope + floatViews = null; + intViews = null; + longViews = null; + } + currentBlockIdx = blockIdx; + } + + @Override + public byte readByte(final long pos) throws IOException { + final long absolutePos = pos + offset; + final int blockIdx = (int) (absolutePos >> COMPRESSION_BLOCK_SHIFT); + if (blockIdx != currentBlockIdx) { + initBlock(blockIdx); + } + return postBuffer.get(postBufferBaseline + (int) (absolutePos & COMPRESSION_BLOCK_MASK_LOW)); + } + + @Override + public short readShort(final long pos) throws IOException { + final long absolutePos = pos + offset; + final int blockIdx = (int) (absolutePos >> COMPRESSION_BLOCK_SHIFT); + if (blockIdx != currentBlockIdx) { + initBlock(blockIdx); + } + final int limit = postBuffer.limit(); + final int localPos = postBufferBaseline + (int) (absolutePos & COMPRESSION_BLOCK_MASK_LOW); + final int remaining = limit - localPos; + if (remaining < Short.BYTES) { + return slowReadShort(remaining, localPos); + } else { + return postBuffer.getShort(localPos); + } + } + + @Override + public int readInt(final long pos) throws IOException { + final long absolutePos = pos + offset; + final int blockIdx = (int) (absolutePos >> COMPRESSION_BLOCK_SHIFT); + if (blockIdx != currentBlockIdx) { + initBlock(blockIdx); + } + final int limit = postBuffer.limit(); + final int localPos = postBufferBaseline + (int) (absolutePos & COMPRESSION_BLOCK_MASK_LOW); + final int remaining = limit - localPos; + if (remaining < Integer.BYTES) { + return slowReadInt(remaining, localPos); + } else { + return postBuffer.getInt(localPos); + } + } + + @Override + public long readLong(final long pos) throws IOException { + final long absolutePos = pos + offset; + final int blockIdx = (int) (absolutePos >> COMPRESSION_BLOCK_SHIFT); + if (blockIdx != currentBlockIdx) { + initBlock(blockIdx); + } + final int limit = postBuffer.limit(); + final int localPos = postBufferBaseline + (int) (absolutePos & COMPRESSION_BLOCK_MASK_LOW); + final int remaining = limit - localPos; + if (remaining < Long.BYTES) { + return slowReadLong(remaining, localPos); + } else { + return postBuffer.getLong(localPos); + } + } + + @Override + public byte readByte() throws IOException { + final long pos = seekPos; + if (pos != -1) 
{ + seekPos = -1; + actualSeek(pos); + } + return _readByte(postBuffer.hasRemaining() ? 1 : 0); + } + + private byte _readByte(final int remaining) throws IOException { + if (remaining == 0) { + refill(); + } + filePointer++; + + return postBuffer.get(); + } + + @Override + public void readBytes(byte[] dst, int offset, int len) throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + final int left = postBuffer.remaining(); + filePointer += len; + if (left < len) { + slowReadBytes(dst, offset, len, left); + } else { + postBuffer.get(dst, offset, len); + } + } + + private void slowReadBytes(final byte[] dst, int offset, int toRead, int left) + throws IOException { + do { + postBuffer.get(dst, offset, left); + toRead -= left; + offset += left; + refill(); + left = postBuffer.remaining(); + } while (left < toRead); + postBuffer.get(dst, offset, toRead); + } + + @Override + public short readShort() throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + final int remaining = postBuffer.remaining(); + if (remaining < Short.BYTES) { + return slowReadShort(remaining); + } else { + filePointer += Short.BYTES; + return postBuffer.getShort(); + } + } + + private short slowReadShort(final int remaining) throws IOException { + final byte b1 = _readByte(remaining); + final byte b2 = _readByte(remaining - 1); + return (short) (((b2 & 0xFF) << 8) | (b1 & 0xFF)); + } + + private short slowReadShort(final int remaining, final int pos) throws IOException { + assert remaining == 1; + final byte b1 = postBuffer.get(pos); + refill(); + final byte b2 = postBuffer.get(postBufferBaseline); + return (short) (((b2 & 0xFF) << 8) | (b1 & 0xFF)); + } + + @Override + public int readInt() throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + final int remaining = postBuffer.remaining(); + if (remaining < Integer.BYTES) { + return slowReadInt(remaining); + } else { + filePointer += Integer.BYTES; + return postBuffer.getInt(); + } + } + + private int _readInt(final int remaining) throws IOException { + if (remaining < Integer.BYTES) { + return slowReadInt(remaining); + } else { + filePointer += Integer.BYTES; + return postBuffer.getInt(); + } + } + + private int slowReadInt(int remaining) throws IOException { + final byte b1 = _readByte(remaining); + final byte b2 = _readByte(--remaining); + final byte b3 = _readByte(--remaining); + final byte b4 = _readByte(--remaining); + return ((b4 & 0xFF) << 24) | ((b3 & 0xFF) << 16) | ((b2 & 0xFF) << 8) | (b1 & 0xFF); + } + + private int _readInt(final int remaining, final int pos) throws IOException { + if (remaining < Integer.BYTES) { + return slowReadInt(remaining, pos); + } else { + return postBuffer.getInt(pos); + } + } + + private int slowReadInt(int remaining, int pos) throws IOException { + assert remaining > 0; + final byte b1 = postBuffer.get(pos++); + if (--remaining == 0) { + refill(); + pos = postBufferBaseline; + } + final byte b2 = postBuffer.get(pos++); + if (--remaining == 0) { + refill(); + pos = postBufferBaseline; + } + final byte b3 = postBuffer.get(pos++); + if (--remaining == 0) { + refill(); + pos = postBufferBaseline; + } + final byte b4 = postBuffer.get(pos); + return ((b4 & 0xFF) << 24) | ((b3 & 0xFF) << 16) | ((b2 & 0xFF) << 8) | (b1 & 0xFF); + } + + @Override + public long readLong() throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + final 
int remaining = postBuffer.remaining(); + if (remaining < Long.BYTES) { + return (_readInt(remaining) & 0xFFFFFFFFL) + | (((long) _readInt(postBuffer.remaining())) << 32); + } else { + filePointer += Long.BYTES; + return postBuffer.getLong(); + } + } + + public long _readLong(final int remaining) throws IOException { + if (remaining < Long.BYTES) { + return (_readInt(remaining) & 0xFFFFFFFFL) + | (((long) _readInt(postBuffer.remaining())) << 32); + } else { + filePointer += Long.BYTES; + return postBuffer.getLong(); + } + } + + private long slowReadLong(final int remaining, final int pos) throws IOException { + final long l1 = _readInt(remaining, pos); + final long l2; + if (remaining < Integer.BYTES) { + // the first _readInt will have refilled the buffer, so we adjust here + l2 = _readInt(Integer.BYTES, postBufferBaseline + (Integer.BYTES - remaining)); + } else if (remaining == Integer.BYTES) { + // aligned, so we can refill directly + refill(); + l2 = postBuffer.getInt(postBufferBaseline); + } else { + // the first _readInt will _not_ have refilled the buffer, so proceed normally + l2 = _readInt(remaining - Integer.BYTES, pos + Integer.BYTES); + } + return (l1 & 0xFFFFFFFFL) | (l2 << 32); + } + + @Override + public int readVInt() throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + return _readVInt(postBuffer.remaining()); + } + + public int _readVInt(int remaining) throws IOException { + byte b; + if (remaining <= Integer.BYTES) { + b = _readByte(remaining); + if (b >= 0) return b; + int i = b & 0x7F; + b = _readByte(--remaining); + i |= (b & 0x7F) << 7; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7F) << 14; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7F) << 21; + if (b >= 0) return i; + b = _readByte(--remaining); + // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors: + i |= (b & 0x0F) << 28; + if ((b & 0xF0) == 0) return i; + } else { + b = postBuffer.get(); + filePointer++; + if (b >= 0) return b; + int i = b & 0x7F; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7F) << 7; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7F) << 14; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7F) << 21; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + // Warning: the next ands use 0x0F / 0xF0 - beware copy/paste errors: + i |= (b & 0x0F) << 28; + if ((b & 0xF0) == 0) return i; + } + throw new IOException("Invalid vInt detected (too many bits)"); + } + + @Override + public long readVLong() throws IOException { + return readVLong(false); + } + + @Override + public long readZLong() throws IOException { + return BitUtil.zigZagDecode(readVLong(true)); + } + + private long readVLong(final boolean allowNegative) throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + int remaining = postBuffer.remaining(); + byte b; + long i; + if (remaining <= (allowNegative ? 
Long.BYTES + 1 : Long.BYTES)) { + b = _readByte(remaining); + if (b >= 0) return b; + i = b & 0x7FL; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 7; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 14; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 21; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 28; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 35; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 42; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 49; + if (b >= 0) return i; + b = _readByte(--remaining); + i |= (b & 0x7FL) << 56; + if (b >= 0) return i; + if (!allowNegative) { + throw new IOException("Invalid vLong detected (negative values disallowed)"); + } + b = _readByte(--remaining); + } else { + b = postBuffer.get(); + filePointer++; + if (b >= 0) return b; + i = b & 0x7FL; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 7; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 14; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 21; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 28; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 35; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 42; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 49; + if (b >= 0) return i; + b = postBuffer.get(); + filePointer++; + i |= (b & 0x7FL) << 56; + if (b >= 0) return i; + if (!allowNegative) { + throw new IOException("Invalid vLong detected (negative values disallowed)"); + } + b = postBuffer.get(); + filePointer++; + } + i |= (b & 0x7FL) << 63; + if (b == 0 || b == 1) return i; + throw new IOException("Invalid vLong detected (more than 64 bits)"); + } + + @Override + public void readLongs(final long[] dst, final int offset, final int length) throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + if (longViews == null) { + longViews = initLongViews(); + } + final int remaining = postBuffer.remaining(); + final long bytesRequested = (long) length << 3; + if (remaining < bytesRequested) { + dst[offset] = _readLong(remaining); + for (int i = 1; i < length; ++i) { + dst[offset + i] = _readLong(postBuffer.remaining()); + } + } else { + final int position = postBuffer.position(); + longViews[position & 0x07].position(position >>> 3).get(dst, offset, length); + filePointer += bytesRequested; + postBuffer.position(position + (int) bytesRequested); + } + } + + @Override + public void readInts(final int[] dst, final int offset, final int length) throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + if (intViews == null) { + intViews = initIntViews(); + } + final int remaining = postBuffer.remaining(); + final long bytesRequested = (long) length << 2; + if (remaining < bytesRequested) { + dst[offset] = _readInt(remaining); + for (int i = 1; i < length; ++i) { + dst[offset + i] = _readInt(postBuffer.remaining()); + } + } else { + final int position = postBuffer.position(); + intViews[position & 0x03].position(position >>> 2).get(dst, offset, length); + filePointer += bytesRequested; + postBuffer.position(position + (int) bytesRequested); + } + } + + @Override + public void readFloats(final 
float[] dst, final int offset, final int length) + throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + if (floatViews == null) { + floatViews = initFloatViews(); + } + final int remaining = postBuffer.remaining(); + final long bytesRequested = (long) length << 2; + if (remaining < bytesRequested) { + dst[offset] = Float.intBitsToFloat(_readInt(remaining)); + for (int i = 1; i < length; ++i) { + dst[offset + i] = Float.intBitsToFloat(_readInt(postBuffer.remaining())); + } + } else { + final int position = postBuffer.position(); + floatViews[position & 0x03].position(position >>> 2).get(dst, offset, length); + filePointer += bytesRequested; + postBuffer.position(position + (int) bytesRequested); + } + } + + private LongBuffer[] initLongViews() { + if (postBuffer.isDirect()) { + LongBuffer[] template = + multiLongViews[currentBlockIdx >> COMPRESSION_TO_MAP_TRANSLATE_SHIFT]; + LongBuffer[] ret = new LongBuffer[Long.BYTES]; + for (int i = Long.BYTES - 1; i >= 0; i--) { + ret[i] = template[i].duplicate(); + } + return ret; + } else { + final LongBuffer[] ret = new LongBuffer[Long.BYTES]; + final ByteBuffer template = postBuffer.duplicate().order(ByteOrder.LITTLE_ENDIAN); + final int lim = postBuffer.limit(); + for (int i = Long.BYTES - 1; i >= 0; i--) { + if (i < lim) { + ret[i] = template.position(i).asLongBuffer(); + } else { + ret[i] = EMPTY_LONGBUFFER; + } + } + return ret; + } + } + + private IntBuffer[] initIntViews() { + if (postBuffer.isDirect()) { + IntBuffer[] template = multiIntViews[currentBlockIdx >> COMPRESSION_TO_MAP_TRANSLATE_SHIFT]; + IntBuffer[] ret = new IntBuffer[Integer.BYTES]; + for (int i = Integer.BYTES - 1; i >= 0; i--) { + ret[i] = template[i].duplicate(); + } + return ret; + } else { + final IntBuffer[] ret = new IntBuffer[Integer.BYTES]; + final ByteBuffer template = postBuffer.duplicate().order(ByteOrder.LITTLE_ENDIAN); + final int lim = postBuffer.limit(); + for (int i = Integer.BYTES - 1; i >= 0; i--) { + if (i < lim) { + ret[i] = template.position(i).asIntBuffer(); + } else { + ret[i] = EMPTY_INTBUFFER; + } + } + return ret; + } + } + + private FloatBuffer[] initFloatViews() { + if (postBuffer.isDirect()) { + FloatBuffer[] template = + multiFloatViews[currentBlockIdx >> COMPRESSION_TO_MAP_TRANSLATE_SHIFT]; + FloatBuffer[] ret = new FloatBuffer[Float.BYTES]; + for (int i = Float.BYTES - 1; i >= 0; i--) { + ret[i] = template[i].duplicate(); + } + return ret; + } else { + final FloatBuffer[] ret = new FloatBuffer[Float.BYTES]; + final ByteBuffer template = postBuffer.duplicate().order(ByteOrder.LITTLE_ENDIAN); + final int lim = postBuffer.limit(); + for (int i = Integer.BYTES - 1; i >= 0; i--) { + if (i < lim) { + ret[i] = template.position(i).asFloatBuffer(); + } else { + ret[i] = EMPTY_FLOATBUFFER; + } + } + return ret; + } + } + + @Override + public String readString() throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + return _readString(); + } + + public String _readString() throws IOException { + final int length = _readVInt(postBuffer.remaining()); + final byte[] bytes = new byte[length]; + final int left = postBuffer.remaining(); + filePointer += length; + if (left < length) { + slowReadBytes(bytes, 0, length, left); + } else { + postBuffer.get(bytes, 0, length); + } + return new String(bytes, 0, length, StandardCharsets.UTF_8); + } + + @Override + public Map readMapOfStrings() throws IOException { + final long pos = seekPos; + if (pos != -1) { + 
seekPos = -1; + actualSeek(pos); + } + final int count = _readVInt(postBuffer.remaining()); + switch (count) { + case 0: + return Collections.emptyMap(); + case 1: + return Collections.singletonMap(_readString(), _readString()); + default: + final Map map = + count > 10 ? CollectionUtil.newHashMap(count) : new TreeMap<>(); + for (int i = count; i > 0; i--) { + map.put(_readString(), _readString()); + } + return Collections.unmodifiableMap(map); + } + } + + @Override + public Set readSetOfStrings() throws IOException { + final long pos = seekPos; + if (pos != -1) { + seekPos = -1; + actualSeek(pos); + } + final int count = _readVInt(postBuffer.remaining()); + switch (count) { + case 0: + return Collections.emptySet(); + case 1: + return Collections.singleton(_readString()); + default: + final Set set = count > 10 ? CollectionUtil.newHashSet(count) : new TreeSet<>(); + for (int i = count; i > 0; i--) { + set.add(_readString()); + } + return Collections.unmodifiableSet(set); + } + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try { + if (mapped == null) return; + + // make local copy, then un-set early + final ByteBuffer[] bufs = mapped; + final ByteBuffer[] accessBufs = accessMapped; + unsetBuffers(); + + if (isClone) return; + + // tell the guard to invalidate and later unmap the bytebuffers (if supported): + try (priorityLoad; + IndexInput fullyLoaded = loaded.get()) { + compressedGuard.invalidateAndUnmap(bufs); + } finally { + guard.invalidateAndUnmap(accessBufs); + } + } finally { + unsetBuffers(); + } + } + + private void unsetBuffers() { + accessMapped = null; + mapped = null; + currentBlockIdx = -1; + postBuffer = null; + floatViews = null; + intViews = null; + longViews = null; + } + + @Override + public long getFilePointer() { + return (seekPos == -1 ? 
filePointer : seekPos) - offset; + } + + @Override + public void seek(final long pos) throws IOException { + seekPos = offset + pos; // defer the actual seek + } + + @Override + public long length() { + return sliceLength; + } + + @Override + public IndexInput clone() { + IndexInput ret; + try { + IndexInput fullyLoaded = loaded.get(); + if (fullyLoaded != null) { + loadedCt.increment(); + ret = fullyLoaded.slice("clone", offset, sliceLength); + } else if (populated(offset, sliceLength)) { + populatedCt.increment(); + ret = accessPopulated.slice("clone", offset, sliceLength); + } else { + lazyCt.increment(); + ret = new LazyLoadInput("clone", this, 0, sliceLength); + } + ret.seek(getFilePointer()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return ret; + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + long absoluteOffset = this.offset + offset; + IndexInput fullyLoaded = loaded.get(); + if (fullyLoaded != null) { + loadedCt.increment(); + return fullyLoaded.slice("slice", absoluteOffset, length); + } else if (populated(absoluteOffset, length)) { + populatedCt.increment(); + return accessPopulated.slice("slice", absoluteOffset, length); + } else { + lazyCt.increment(); + return new LazyLoadInput("slice", this, offset, length); + } + } + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/AsyncDirectWriteHelper.java b/solr/core/src/java/org/apache/solr/storage/AsyncDirectWriteHelper.java new file mode 100644 index 00000000000..e0a1ac2b229 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/AsyncDirectWriteHelper.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.storage; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.apache.solr.util.IOFunction; + +public class AsyncDirectWriteHelper implements Closeable { + + private int populatingBuffer = 0; + private int consumingBuffer = 1; + private final int blockSize; + private final boolean useDirectIO; + private final Struct[] buffers = new Struct[2]; + private final AtomicReference> future = new AtomicReference<>(); + + private enum Status { + SYNC, + ASYNC, + FINISHED, + FLUSH_ASYNC + } + + private volatile Status status = Status.SYNC; + private volatile int flushBufferIdx = -1; + private static final Future CLOSED = new CompletableFuture<>(); + private final long[] writePos = new long[] {-1}; + private final Path path; + private final FileChannel[] channel = new FileChannel[1]; + + public AsyncDirectWriteHelper(int blockSize, int bufferSize, Path path, boolean useDirectIO) { + this.blockSize = blockSize; + this.path = path; + this.useDirectIO = useDirectIO; + Function> writeFunctionSupplier = + (buffer) -> { + return (writePos) -> { + writePos[0] += channel[0].write(buffer, writePos[0]); + return writePos; + }; + }; + for (int i = 1; i >= 0; i--) { + buffers[i] = new Struct(blockSize, bufferSize, writeFunctionSupplier); + } + } + + public ByteBuffer init(long pos) throws IOException { + writePos[0] = pos; + assert populatingBuffer == 0; + return buffers[populatingBuffer].buffer; + } + + private ByteBuffer syncSwap(ByteBuffer populated) throws IOException { + Struct sync = buffers[populatingBuffer]; + assert Objects.equals(sync.buffer, populated); + sync.writeFunction.apply(writePos); + return populated.clear(); + } + + private IOFunction swapConsume() { + Struct releasing = buffers[consumingBuffer]; + Struct acquiring = buffers[consumingBuffer ^= 1]; + // mark previous buffer as ready to be written to + releasing.write.arrive(); + // block on reaching the read phase for the new buffer + acquiring.read.arriveAndAwaitAdvance(); + return acquiring.writeFunction; + } + + public ByteBuffer write(ByteBuffer populated) throws IOException { + switch (status) { + case FINISHED: + case FLUSH_ASYNC: + throw new IllegalStateException(); + case SYNC: + return syncSwap(populated); + case ASYNC: + break; // proceed + } + Struct releasing = buffers[populatingBuffer]; + Struct acquiring = buffers[populatingBuffer ^= 1]; + assert Objects.equals(releasing.buffer, populated); + // mark previous buffer as ready to be read from + releasing.read.arrive(); + // block on reaching the write phase for the new buffer + acquiring.write.arriveAndAwaitAdvance(); + return acquiring.buffer.clear(); + } + + private Future startWrite(ExecutorService exec) { + return exec.submit( + () -> { + initChannel(); + IOFunction ioFunction = swapConsume(); + while (status == Status.ASYNC) { + ioFunction.apply(writePos); + ioFunction = swapConsume(); + } + int adjust; + if (consumingBuffer != flushBufferIdx) { + // we've broken out of the loop. 
If we (consumer) were blocking waiting for input on the + // final `flushBufferIdx`, then we do nothing. But if we (consumingBuffer idx) is _not_ + // the final flush buffer, we need to write our output and place our entry back in write + // phase to signal the flush thread that it may proceed. + ioFunction.apply(writePos); + switch (status) { + case FINISHED: + buffers[consumingBuffer].write.arrive(); + return null; + case FLUSH_ASYNC: + status = Status.FINISHED; + ioFunction = swapConsume(); + adjust = adjustFinalBuffer(buffers[consumingBuffer].buffer); + break; + default: + throw new IllegalStateException(); + } + } else if (status == Status.FLUSH_ASYNC) { + status = Status.FINISHED; + adjust = adjustFinalBuffer(buffers[consumingBuffer].buffer); + } else { + return null; + } + if (adjust != -1) { + ioFunction.apply(writePos); + if (adjust != 0) { + channel[0].truncate(writePos[0] - adjust); + } + } + return null; + }); + } + + private void initChannel() throws IOException { + if (useDirectIO) { + channel[0] = + FileChannel.open( + path, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW, + CompressingDirectory.getDirectOpenOption()); + } else { + channel[0] = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); + } + } + + public void startSync() throws IOException { + initChannel(); + } + + public void start(ExecutorService exec) { + status = Status.ASYNC; + Future f = startWrite(exec); + if (!future.compareAndSet(null, f)) { + f.cancel(true); + throw new IllegalStateException("started multiple times"); + } + } + + public void flush(ByteBuffer populated, boolean synchronous) throws IOException { + switch (status) { + case FINISHED: + case FLUSH_ASYNC: + throw new IllegalStateException("flushed multiple times"); + case SYNC: + status = Status.FINISHED; + int adjust = adjustFinalBuffer(populated); + if (adjust != -1) { + syncSwap(populated); + if (adjust != 0) { + channel[0].truncate(writePos[0] - adjust); + } + } + return; + case ASYNC: + flushBufferIdx = populatingBuffer; + status = synchronous ? Status.FINISHED : Status.FLUSH_ASYNC; + break; // proceed + } + Struct last = buffers[populatingBuffer]; + assert Objects.equals(last.buffer, populated); + // first mark status as finished so that write thread may exit + // mark the final buffer has ready to have its content read. The only practical reason + // we must do this here is to unblock the write thread if it's waiting on this condition + // (the write thread will exit though, as we write the last buffer synchronously) + last.read.arrive(); + // wait for the other buffer to be writable. We will not actually populate it (as we + // would in the case of `swapPopulate()`, but blocking on this condition indicates that + // any data in this buffer has been flushed, and we may proceed to flush the last buffer. + if (synchronous) { + int adjust = adjustFinalBuffer(populated); + buffers[populatingBuffer ^ 1].write.arriveAndAwaitAdvance(); + // finally, write the last buffer synchronously. 
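+ // adjustFinalBuffer() padded the final partial block out to a full block boundary (direct I/O writes must be block-aligned); `adjust` is the number of padding bytes, or -1 if the buffer was empty. After the write, the file is truncated back by that amount (when nonzero) so the on-disk length matches the data actually written.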
+ if (adjust != -1) { + last.writeFunction.apply(writePos); + if (adjust != 0) { + channel[0].truncate(writePos[0] - adjust); + } + } + } + } + + private int adjustFinalBuffer(ByteBuffer populated) { + int remainingInBuffer = populated.position(); + int adjust; + if (remainingInBuffer == 0) { + populated.limit(0); + adjust = -1; + } else { + // we need to rewind, as we have to write full blocks (we truncate file later): + populated.rewind(); + int flushLimit = (((remainingInBuffer - 1) / blockSize) + 1) * blockSize; + populated.limit(flushLimit); + adjust = flushLimit - remainingInBuffer; + } + return adjust; + } + + public int write(ByteBuffer src, long position) throws IOException { + return channel[0].write(src, position); + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try { + Future f = future.getAndSet(CLOSED); + if (f != null) { + if (f == CLOSED) { + throw new IllegalStateException("closed multiple times"); + } + if (status == Status.ASYNC) { + f.cancel(true); + } + try { + f.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else { + throw new RuntimeException(e); + } + } + } + } finally { + try (FileChannel ignored = channel[0]) { + // ensure that FileChannel is closed. + } + } + } + + private static final class Struct { + private final ByteBuffer buffer; + private final Phaser read = new Phaser(2); + private final Phaser write = new Phaser(2); + private final IOFunction writeFunction; + + private Struct( + int blockSize, + int bufferSize, + Function> writeFunctionSupplier) { + this.buffer = ByteBuffer.allocateDirect(bufferSize + blockSize - 1).alignedSlice(blockSize); + this.writeFunction = writeFunctionSupplier.apply(this.buffer); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java new file mode 100644 index 00000000000..70ca3bfc654 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/CompressingDirectory.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.storage; + +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.OpenOption; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FSLockFactory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.OutputStreamDataOutput; +import org.apache.lucene.util.compress.LZ4; + +public class CompressingDirectory extends FSDirectory { + + /** + * Reference to {@code com.sun.nio.file.ExtendedOpenOption.DIRECT} by reflective class and enum + * lookup. There are two reasons for using this instead of directly referencing + * ExtendedOpenOption.DIRECT: + * + *
    + *
  1. ExtendedOpenOption.DIRECT is OpenJDK's internal proprietary API. This API causes + * un-suppressible(?) warning to be emitted when compiling with --release flag and value N, + * where N is smaller than the version of javac used for compilation. For details, + * please refer to https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8259039. + *
  2. It is possible that Lucene is run using a JDK that does not support + * ExtendedOpenOption.DIRECT. In such a case, dynamic lookup allows us to bail out with + * UnsupportedOperationException with a meaningful error message. + *
+ * + *

This reference is {@code null}, if the JDK does not support direct I/O. + */ + static final OpenOption ExtendedOpenOption_DIRECT; // visible for test + + static { + OpenOption option; + try { + final Class clazz = + Class.forName("com.sun.nio.file.ExtendedOpenOption").asSubclass(OpenOption.class); + option = + Arrays.stream(clazz.getEnumConstants()) + .filter(e -> e.toString().equalsIgnoreCase("DIRECT")) + .findFirst() + .orElse(null); + } catch ( + @SuppressWarnings("unused") + Exception e) { + option = null; + } + ExtendedOpenOption_DIRECT = option; + } + + static final boolean DEFAULT_USE_DIRECT_IO = false; + + static OpenOption getDirectOpenOption() { + if (ExtendedOpenOption_DIRECT == null) { + throw new UnsupportedOperationException( + "com.sun.nio.file.ExtendedOpenOption.DIRECT is not available in the current JDK version."); + } + return ExtendedOpenOption_DIRECT; + } + + private final int blockSize; + private final ExecutorService ioExec; + private final Path directoryPath; + private final boolean useAsyncIO; + private final boolean useDirectIO; + + public CompressingDirectory( + Path path, ExecutorService ioExec, boolean useAsyncIO, boolean useDirectIO) + throws IOException { + super(path, FSLockFactory.getDefault()); + this.blockSize = (int) (Files.getFileStore(path).getBlockSize()); + this.ioExec = ioExec; + directoryPath = path; + this.useAsyncIO = useAsyncIO; + this.useDirectIO = useDirectIO; + } + + @Override + public long fileLength(String name) throws IOException { + Path path = directoryPath.resolve(name); + ensureOpen(); + if (getPendingDeletions().contains(name)) { + throw new NoSuchFileException("file \"" + name + "\" is pending delete"); + } + if (Files.size(path) < Long.BYTES) { + return 0; + } else { + try (FileInputStream in = new FileInputStream(path.toFile())) { + byte[] bytes = in.readNBytes(Long.BYTES); + return ByteBuffer.wrap(bytes).getLong(0); + } + } + } + + /** From DirectIODirectory */ + public static final int DEFAULT_MERGE_BUFFER_SIZE = 256 * 1024; + + /** + * We will have 2 alternating read buffers of this size. Each buffer is quite large (8M), but this + * is off-heap memory, and recall that we are hereby entirely skipping the page cache, through + * which all of this data would otherwise be churned. + * + *
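<p>(With {@code DEFAULT_MERGE_BUFFER_SIZE} at 256 KiB, the {@code << 5} below works out to 256 KiB * 32 = 8 MiB per buffer.) + *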

The main reason for the large size is because iowait is quite bursty; the larger the reads, + * the more diffusely the iowait is spread out, reducing the risk that an individual hiccup will + * block the processing (i.e. decompressing) thread. + */ + public static final int DEFAULT_DISK_READ_BUFFER_SIZE = + DEFAULT_MERGE_BUFFER_SIZE << 5; // 8M buffer (large) seems optimal + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + return new DirectIOIndexOutput( + directoryPath.resolve(name), + name, + blockSize, + DEFAULT_MERGE_BUFFER_SIZE, + ioExec, + Integer.MAX_VALUE, + useAsyncIO, + useDirectIO); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) + throws IOException { + throw new UnsupportedOperationException("CompressingDirectory does not create temp outputs"); + } + + private static class AccessibleBAOS extends ByteArrayOutputStream { + public int transferTo(DataOutput out) throws IOException { + out.writeBytes(this.buf, this.count); + return this.count; + } + } + + private static class BytesOut extends OutputStreamDataOutput { + private final AccessibleBAOS baos; + + public BytesOut() { + this(new AccessibleBAOS()); + } + + private BytesOut(AccessibleBAOS baos) { + super(baos); + this.baos = baos; + } + + public int transferTo(DataOutput out) throws IOException { + return baos.transferTo(out); + } + } + + static final class DirectIOIndexOutput extends IndexOutput { + private final byte[] compressBuffer = new byte[COMPRESSION_BLOCK_SIZE]; + private final LZ4.FastCompressionHashTable ht = new LZ4.FastCompressionHashTable(); + private final ByteBuffer preBuffer; + private final AsyncDirectWriteHelper writeHelper; + private ByteBuffer buffer; + private final BytesOut blockDeltas = new BytesOut(); + private int prevBlockSize = BLOCK_SIZE_ESTIMATE; // estimate 50% compression + private boolean wroteBlock = false; + private final ByteBuffer initialBlock; + + // { + // [(long) LOGICAL_LENGTH], + // [(int) BLOCK_MAP_FOOTER_LENGTH], + // [(byte) COMPRESSION_TYPE_ID], + // [(byte) COMPRESSION_BLOCK_TYPE_ID] + // [(2 bytes) unspecified] -- for potential future use + // } + static final int HEADER_SIZE = 16; // 16 bytes + + private long filePos; + private boolean isOpen; + + /** + * Creates a new instance of DirectIOIndexOutput for writing index output with direct IO + * bypassing OS buffer + * + * @throws UnsupportedOperationException if the JDK does not support Direct I/O + * @throws IOException if the operating system or filesystem does not support support Direct I/O + * or a sufficient equivalent. 
+ */ + public DirectIOIndexOutput( + Path path, + String name, + int blockSize, + int bufferSize, + ExecutorService ioExec, + int expectLength, + boolean useAsyncIO, + boolean useDirectIO) + throws IOException { + super("DirectIOIndexOutput(path=\"" + path.toString() + "\")", name); + + // stored only to lazily compute the pathHash + writeHelper = new AsyncDirectWriteHelper(blockSize, bufferSize, path, useDirectIO); + buffer = writeHelper.init(0); + preBuffer = ByteBuffer.wrap(compressBuffer); + initialBlock = ByteBuffer.allocateDirect(blockSize + blockSize - 1).alignedSlice(blockSize); + + // allocate space for the header + buffer.position(buffer.position() + HEADER_SIZE); + + if (expectLength > bufferSize && useAsyncIO) { + writeHelper.start(ioExec); + } else { + writeHelper.startSync(); + } + isOpen = true; + } + + @Override + public void writeByte(byte b) throws IOException { + preBuffer.put(b); + if (!preBuffer.hasRemaining()) { + dump(); + } + } + + @Override + public void writeBytes(byte[] src, int offset, int len) throws IOException { + int toWrite = len; + while (true) { + final int left = preBuffer.remaining(); + if (left <= toWrite) { + preBuffer.put(src, offset, left); + toWrite -= left; + offset += left; + dump(); + } else { + preBuffer.put(src, offset, toWrite); + break; + } + } + } + + private void dump() throws IOException { + assert preBuffer.position() == COMPRESSION_BLOCK_SIZE; + + preBuffer.rewind(); + + LZ4.compressWithDictionary(compressBuffer, 0, 0, COMPRESSION_BLOCK_SIZE, out, ht); + int nextBlockSize = out.resetSize(); + blockDeltas.writeZInt(nextBlockSize - prevBlockSize); + prevBlockSize = nextBlockSize; + filePos += COMPRESSION_BLOCK_SIZE; + + preBuffer.clear(); + } + + private void flush() throws IOException { + preBuffer.flip(); + int preBufferRemaining = preBuffer.remaining(); + if (preBufferRemaining > 0) { + filePos += preBufferRemaining; + LZ4.compressWithDictionary(compressBuffer, 0, 0, preBufferRemaining, out, ht); + } + int blockMapFooterSize = blockDeltas.transferTo(out); + if (wroteBlock) { + writeHelper.flush(buffer, true); + initialBlock.putLong(0, filePos); + initialBlock.putInt(Long.BYTES, blockMapFooterSize); + initialBlock.put(HEADER_SIZE - Integer.BYTES, (byte) COMPRESSION_BLOCK_TYPE.id); + initialBlock.put(HEADER_SIZE - Integer.BYTES + 1, (byte) COMPRESSION_TYPE.id); + writeHelper.write(initialBlock, 0); + } else { + if (filePos > 0) { + buffer.putLong(0, filePos); + buffer.putInt(Long.BYTES, blockMapFooterSize); + buffer.put(HEADER_SIZE - Integer.BYTES, (byte) COMPRESSION_BLOCK_TYPE.id); + buffer.put(HEADER_SIZE - Integer.BYTES + 1, (byte) COMPRESSION_TYPE.id); + } else { + assert filePos == 0 && buffer.position() == HEADER_SIZE; + buffer.rewind(); + buffer.limit(0); + } + writeHelper.flush(buffer, true); + } + } + + private final SizeTrackingDataOutput out = new SizeTrackingDataOutput(); + + private void writeBlock() throws IOException { + if (!wroteBlock) { + wroteBlock = true; + buffer.rewind(); + int restoreLimit = buffer.limit(); + buffer.limit(initialBlock.limit()); + initialBlock.put(buffer); + initialBlock.rewind(); + buffer.limit(restoreLimit); + } + // we need to rewind, as we have to write full blocks (we truncate file later): + buffer.rewind(); + buffer = writeHelper.write(buffer); + } + + @Override + public long getFilePointer() { + return filePos + preBuffer.position(); + } + + @Override + public long getChecksum() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + if 
(isOpen) { + isOpen = false; + try (writeHelper) { + // flush and close channel + flush(); + } + } + } + + private class SizeTrackingDataOutput extends DataOutput { + private int size = 0; + + private int resetSize() { + int ret = size; + size = 0; + return ret; + } + + @Override + public void writeByte(byte b) throws IOException { + size++; + shouldWriteBytes(1); + buffer.put(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + size += length; + do { + int shouldWriteBytes = shouldWriteBytes(length); + buffer.put(b, offset, shouldWriteBytes); + offset += shouldWriteBytes; + length -= shouldWriteBytes; + } while (length > 0); + } + + private int shouldWriteBytes(int wantWriteBytes) throws IOException { + int remaining = buffer.remaining(); + if (remaining >= wantWriteBytes) { + return wantWriteBytes; + } else if (remaining > 0) { + return remaining; + } + writeBlock(); + return Math.min(wantWriteBytes, buffer.remaining()); + } + } + } + + enum CompressionType { + LZ4(0); + final int id; + + CompressionType(int id) { + this.id = id; + } + } + + enum CompressionBlockType { + SIZE_256K(0, 18); + + CompressionBlockType(int id, int blockShift) { + this.id = id; + this.blockShift = blockShift; + this.blockSize = 1 << blockShift; + this.blockMaskLow = this.blockSize - 1; + this.blockSizeEstimate = this.blockSize >> 1; // estimate 50% compression + } + + final int id; + final int blockShift; + final int blockSize; + final int blockMaskLow; + final int blockSizeEstimate; + } + + static final CompressionType COMPRESSION_TYPE = CompressionType.LZ4; + static final CompressionBlockType COMPRESSION_BLOCK_TYPE = CompressionBlockType.SIZE_256K; + public static final int COMPRESSION_BLOCK_SHIFT = COMPRESSION_BLOCK_TYPE.blockShift; + public static final int COMPRESSION_BLOCK_SIZE = COMPRESSION_BLOCK_TYPE.blockSize; + public static final int COMPRESSION_BLOCK_MASK_LOW = COMPRESSION_BLOCK_TYPE.blockMaskLow; + public static final int BLOCK_SIZE_ESTIMATE = COMPRESSION_BLOCK_TYPE.blockSizeEstimate; + + private static final int MIN_MATCH = 4; + + /** + * Copied from {@link LZ4#decompress(DataInput, int, byte[], int)} because it's faster + * decompressing from byte[] than from {@link DataInput}. 
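+ * <p>Layout reminder (standard LZ4 block format, as the loop below assumes): each sequence begins with a token byte whose high nibble is the literal length and whose low nibble is the match length; a nibble of 0x0F is extended by 0xFF continuation bytes, the literals follow, then a two-byte little-endian match offset, and {@code MIN_MATCH} (4) is added to the decoded match length.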
+ */ + public static int decompress( + final byte[] compressed, int srcPos, final int decompressedLen, final byte[] dest, int dOff) + throws IOException { + final int destEnd = dOff + decompressedLen; + + do { + // literals + final int token = compressed[srcPos++] & 0xFF; + int literalLen = token >>> 4; + + if (literalLen != 0) { + if (literalLen == 0x0F) { + byte len; + while ((len = compressed[srcPos++]) == (byte) 0xFF) { + literalLen += 0xFF; + } + literalLen += len & 0xFF; + } + System.arraycopy(compressed, srcPos, dest, dOff, literalLen); + srcPos += literalLen; + dOff += literalLen; + } + + if (dOff >= destEnd) { + break; + } + + // matchs + final int matchDec = ((compressed[srcPos++] & 0xFF) | (compressed[srcPos++] << 8)) & 0xFFFF; + assert matchDec > 0; + + int matchLen = token & 0x0F; + if (matchLen == 0x0F) { + int len; + while ((len = compressed[srcPos++]) == (byte) 0xFF) { + matchLen += 0xFF; + } + matchLen += len & 0xFF; + } + matchLen += MIN_MATCH; + + // copying a multiple of 8 bytes can make decompression from 5% to 10% faster + final int fastLen = (matchLen + 7) & 0xFFFFFFF8; + if (matchDec < matchLen || dOff + fastLen > destEnd) { + // overlap -> naive incremental copy + for (int ref = dOff - matchDec, end = dOff + matchLen; dOff < end; ++ref, ++dOff) { + dest[dOff] = dest[ref]; + } + } else { + // no overlap -> arraycopy + System.arraycopy(dest, dOff - matchDec, dest, dOff, fastLen); + dOff += matchLen; + } + } while (dOff < destEnd); + + return srcPos; + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/CompressingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/storage/CompressingDirectoryFactory.java new file mode 100644 index 00000000000..e5de5a89f9f --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/CompressingDirectoryFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.storage; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Path; +import java.util.concurrent.ExecutorService; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.StandardDirectoryFactory; + +public class CompressingDirectoryFactory extends StandardDirectoryFactory { + + private final ExecutorService ioExec = ExecutorUtil.newMDCAwareCachedThreadPool("ioExec"); + private boolean compress; + private boolean useAsyncIO; + private boolean useDirectIO; + + @Override + public void init(NamedList args) { + super.init(args); + SolrParams params = args.toSolrParams(); + compress = params.getBool("compress", true); + useDirectIO = params.getBool("useDirectIO", CompressingDirectory.DEFAULT_USE_DIRECT_IO); + useAsyncIO = params.getBool("useAsyncIO", useDirectIO); + } + + @Override + protected Directory create(String path, LockFactory lockFactory, DirContext dirContext) + throws IOException { + Directory backing; + Path p = Path.of(path); + if (compress) { + backing = new CompressingDirectory(p, ioExec, useAsyncIO, useDirectIO); + } else { + backing = FSDirectory.open(p, lockFactory); + } + return new SizeAwareDirectory(backing, 0); + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try (Closeable c = () -> ExecutorUtil.shutdownAndAwaitTermination(ioExec)) { + super.close(); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java b/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java new file mode 100644 index 00000000000..ae95e95782b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/SizeAwareDirectory.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.storage; + +import com.carrotsearch.hppc.procedures.LongObjectProcedure; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.LongAdder; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.solr.core.DirectoryFactory; +import org.apache.solr.handler.admin.CoreAdminHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SizeAwareDirectory extends FilterDirectory + implements DirectoryFactory.SizeAware, Accountable { + @SuppressWarnings("rawtypes") + private static final long BASE_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(SizeAwareDirectory.class) + + RamUsageEstimator.shallowSizeOfInstance(LongAdder.class) + + RamUsageEstimator.shallowSizeOf(new Future[1]); + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final long reconcileTTLNanos; + private boolean initialized = false; + private volatile long reconciledTimeNanos; + private volatile LongAdder size = new LongAdder(); + private volatile LongObjectProcedure sizeWriter = (size, name) -> this.size.add(size); + + private final ConcurrentHashMap fileSizeMap = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap liveOutputs = + new ConcurrentHashMap<>(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + private final Future[] computingSize = new Future[1]; + + public SizeAwareDirectory(Directory in, long reconcileTTLNanos) { + super(in); + this.reconcileTTLNanos = reconcileTTLNanos; + if (reconcileTTLNanos == Long.MAX_VALUE) { + this.reconciledTimeNanos = 0; + } else { + this.reconciledTimeNanos = System.nanoTime() - reconcileTTLNanos; // ensure initialization + } + } + + @Override + public long ramBytesUsed() { + return BASE_RAM_BYTES_USED + + RamUsageEstimator.sizeOfMap(fileSizeMap) + + RamUsageEstimator.sizeOfMap(liveOutputs); + } + + @Override + public long size() throws IOException { + Integer reconcileThreshold = CoreAdminHandler.getReconcileThreshold(); + if (initialized + && (reconcileThreshold == null + || System.nanoTime() - reconciledTimeNanos < reconcileTTLNanos)) { + return size.sum(); + } + CompletableFuture weCompute; + Future theyCompute; + synchronized (computingSize) { + theyCompute = computingSize[0]; + if (theyCompute == null) { + weCompute = new CompletableFuture<>(); + } else { + weCompute = null; + } + } + if (weCompute == null) { + try { + return theyCompute.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + try { + final String[] files = in.listAll(); + + LongAdder recomputeSize = new LongAdder(); + Set recomputed = ConcurrentHashMap.newKeySet(); + LongObjectProcedure dualSizeWriter = + (fileSize, name) -> { + size.add(fileSize); + if (fileSize >= 0 || recomputed.remove(name)) { + // if it's a removal, we only want to adjust if we've already + // incorporated this file in our count! 
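+ // fileSize is negative for deletions (see deleteFile) and positive for live writes; both must flow into recomputeSize so the in-progress recount stays consistent with concurrent activity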
+ recomputeSize.add(fileSize); + } + }; + sizeWriter = dualSizeWriter; + for (final String file : files) { + long fileSize; + recomputed.add(file); + SizeAccountingIndexOutput liveOutput = liveOutputs.get(file); + if (liveOutput != null) { + // get fileSize already written at this moment + fileSize = liveOutput.setSizeWriter(dualSizeWriter); + } else { + fileSize = DirectoryFactory.sizeOf(in, file); + if (fileSize > 0) { + // whether the file exists or not, we don't care about it if it has zero size. + // more often though, 0 size means the file isn't there. + fileSizeMap.put(file, fileSize); + if (DirectoryFactory.sizeOf(in, file) == 0) { + // during reconciliation, we have to check for file presence _after_ adding + // to the map, to prevent a race condition that could leak entries into `fileSizeMap` + fileSizeMap.remove(file); + } + } + } + recomputeSize.add(fileSize); + // TODO: do we really need to check for overflow here? + // if (recomputeSize < 0) { + // break; + // } + } + + long ret = recomputeSize.sum(); + long extant = size.sum(); + long diff = extant - ret; + boolean initializing = !initialized; + if (!initializing && Math.abs(diff) < reconcileThreshold) { + double ratio = (double) extant / ret; + if (log.isInfoEnabled()) { + log.info( + "no need to reconcile (diff {}; ratio {}; overhead {}; sizes {}/{}/{})", + humanReadableByteDiff(diff), + ratio, + RamUsageEstimator.humanReadableUnits(ramBytesUsed()), + liveOutputs.size(), + fileSizeMap.size(), + files.length); + } + ret = extant; + } else { + // swap the new objects into place + LongObjectProcedure replaceSizeWriter = (size, name) -> recomputeSize.add(size); + sizeWriter = replaceSizeWriter; + size = recomputeSize; + for (SizeAccountingIndexOutput liveOutput : liveOutputs.values()) { + liveOutput.setSizeWriter(replaceSizeWriter); + } + reconciledTimeNanos = System.nanoTime(); + if (initializing) { + initialized = true; + if (log.isInfoEnabled()) { + log.info( + "initialized heap-tracked size {} (overhead: {})", + RamUsageEstimator.humanReadableUnits(ret), + RamUsageEstimator.humanReadableUnits(ramBytesUsed())); + } + } else { + double ratio = (double) extant / ret; + log.warn( + "reconcile size {} => {} (diff {}; ratio {}; overhead {})", + extant, + ret, + humanReadableByteDiff(diff), + ratio, + RamUsageEstimator.humanReadableUnits(ramBytesUsed())); + } + } + + weCompute.complete(ret); + + return ret; + } finally { + synchronized (computingSize) { + computingSize[0] = null; + } + } + } + + private static String humanReadableByteDiff(long diff) { + if (diff >= 0) { + return RamUsageEstimator.humanReadableUnits(diff); + } else { + return "-".concat(RamUsageEstimator.humanReadableUnits(-diff)); + } + } + + @Override + public void deleteFile(String name) throws IOException { + try { + in.deleteFile(name); + } finally { + Long fileSize = fileSizeMap.remove(name); + if (fileSize != null) { + sizeWriter.apply(-fileSize, name); + } + } + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + SizeAccountingIndexOutput ret = + new SizeAccountingIndexOutput( + name, in.createOutput(name, context), fileSizeMap, liveOutputs, sizeWriter); + liveOutputs.put(name, ret); + return ret; + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) + throws IOException { + IndexOutput backing = in.createTempOutput(prefix, suffix, context); + String name = backing.getName(); + SizeAccountingIndexOutput ret = + new SizeAccountingIndexOutput(name, 
backing, fileSizeMap, liveOutputs, sizeWriter); + liveOutputs.put(name, ret); + return ret; + } + + private static final class SizeAccountingIndexOutput extends IndexOutput implements Accountable { + + private static final long BASE_RAM_BYTES_USED = + RamUsageEstimator.shallowSizeOfInstance(SizeAccountingIndexOutput.class); + + private final String name; + + private final IndexOutput backing; + + private final ConcurrentHashMap fileSizeMap; + + private final ConcurrentHashMap liveOutputs; + + private volatile LongObjectProcedure sizeWriter; + + private SizeAccountingIndexOutput( + String name, + IndexOutput backing, + ConcurrentHashMap fileSizeMap, + ConcurrentHashMap liveOutputs, + LongObjectProcedure sizeWriter) { + super("byteSize(" + name + ")", name); + this.name = name; + this.backing = backing; + this.liveOutputs = liveOutputs; + this.sizeWriter = sizeWriter; + this.fileSizeMap = fileSizeMap; + } + + public long setSizeWriter(LongObjectProcedure sizeWriter) { + if (this.sizeWriter == sizeWriter) { + return 0; + } else { + // NOTE: there's an unavoidable race condition between these two + // lines, and this ordering may occasionally overestimate directory size. + this.sizeWriter = sizeWriter; + return backing.getFilePointer(); + } + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try (backing) { + fileSizeMap.put(name, backing.getFilePointer()); + } finally { + liveOutputs.remove(name); + } + } + + @Override + public long getFilePointer() { + return backing.getFilePointer(); + } + + @Override + public long getChecksum() throws IOException { + return backing.getChecksum(); + } + + @Override + public void writeByte(byte b) throws IOException { + backing.writeByte(b); + sizeWriter.apply(1, name); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + backing.writeBytes(b, offset, length); + sizeWriter.apply(length, name); + } + + @Override + public long ramBytesUsed() { + // all fields have to exist regardless; we're only interested in the overhead we add + return BASE_RAM_BYTES_USED; + } + } + + @Override + public void rename(String source, String dest) throws IOException { + in.rename(source, dest); + Long extant = fileSizeMap.put(dest, fileSizeMap.remove(source)); + assert extant == null; // it's illegal for dest to already exist + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java new file mode 100644 index 00000000000..57950e689cc --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectory.java @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.storage; + +import static org.apache.solr.storage.AccessDirectory.lazyTmpFileSuffixStartIdx; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.commons.io.file.PathUtils; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.BaseDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.MMapDirectory; +import org.apache.lucene.util.IOUtils; +import org.apache.solr.common.util.CollectionUtil; +import org.apache.solr.util.IOFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TeeDirectory extends BaseDirectory { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private volatile Directory access; + private final ExecutorService ioExec; + private final AutoCloseable closeLocal; + private final IOFunction> accessFunction; + private final IOFunction>> persistentFunction; + private volatile Directory persistent; + private final BlockingQueue + persistentLengthVerificationQueue; + + /** + * This ctor (with default inline config) exists to be invoked during testing, via + * MockDirectoryFactory. 
+ */ + public TeeDirectory(Path path, LockFactory lockFactory) throws IOException { + super(TEE_LOCK_FACTORY); + TeeDirectoryFactory.NodeLevelTeeDirectoryState ownState = + new TeeDirectoryFactory.NodeLevelTeeDirectoryState(64); + this.ioExec = ownState.ioExec; + this.persistentLengthVerificationQueue = ownState.persistentLengthVerificationQueue; + Directory naive = new MMapDirectory(path, lockFactory, MMapDirectory.DEFAULT_MAX_CHUNK_SIZE); + this.access = naive; + Path compressedPath = path; + String accessDir = System.getProperty("java.io.tmpdir"); + String pathS = path.toString(); + String scope = TeeDirectoryFactory.getScopeName(accessDir, pathS); + String accessPath = scope + "-" + Long.toUnsignedString(System.nanoTime(), 16); + this.closeLocal = + () -> { + try (ownState) { + PathUtils.delete(Path.of(accessPath)); + } + }; + accessFunction = + unused -> { + Directory dir = + new AccessDirectory(Path.of(accessPath), lockFactory, compressedPath, ownState); + return new AbstractMap.SimpleImmutableEntry<>(accessPath, dir); + }; + persistentFunction = + content -> { + assert content == naive; + content.close(); + content = new CompressingDirectory(compressedPath, ownState.ioExec, true, true); + return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList()); + }; + } + + public TeeDirectory( + Directory naive, + IOFunction> accessFunction, + IOFunction>> persistentFunction, + TeeDirectoryFactory.NodeLevelTeeDirectoryState nodeLevelState) { + super(TEE_LOCK_FACTORY); + this.accessFunction = accessFunction; + this.persistentFunction = persistentFunction; + this.access = naive; + this.ioExec = nodeLevelState.ioExec; + this.closeLocal = null; + this.persistentLengthVerificationQueue = nodeLevelState.persistentLengthVerificationQueue; + } + + private List associatedPaths; + + private void init() throws IOException { + synchronized (persistentFunction) { + if (this.persistent == null) { + List buildAssociatedPaths = new ArrayList<>(3); + Map.Entry> persistentEntry = persistentFunction.apply(access); + this.persistent = persistentEntry.getKey(); + Path persistentFSPath = ((FSDirectory) persistent).getDirectory(); + buildAssociatedPaths.addAll(persistentEntry.getValue()); + Map.Entry accessEntry = accessFunction.apply(null); + this.access = accessEntry.getValue(); + buildAssociatedPaths.add(accessEntry.getKey()); + associatedPaths = buildAssociatedPaths; + } + } + } + + private static final LockFactory TEE_LOCK_FACTORY = + new LockFactory() { + @Override + public Lock obtainLock(Directory dir, String lockName) throws IOException { + if (!(dir instanceof TeeDirectory)) { + throw new IllegalArgumentException(); + } + TeeDirectory teeDir = (TeeDirectory) dir; + if (IndexWriter.WRITE_LOCK_NAME.equals(lockName)) { + teeDir.init(); + } + Lock primary = teeDir.access.obtainLock(lockName); + if (teeDir.persistent == null) { + return primary; + } else { + Lock secondary; + try { + secondary = teeDir.persistent.obtainLock(lockName); + } catch (Exception e) { + primary.close(); + throw e; + } + return new TeeLock(primary, secondary); + } + } + }; + + public Path getAccessDir() { + if (access == null) { + return null; + } + Directory unwrapped = FilterDirectory.unwrap(access); + if (unwrapped instanceof FSDirectory) { + return ((FSDirectory) unwrapped).getDirectory(); + } else { + throw new UnsupportedOperationException("no directory path for " + access); + } + } + + private static final class TeeLock extends Lock { + + private final Lock primary; + private final Lock secondary; + + private 
TeeLock(Lock primary, Lock secondary) { + this.primary = primary; + this.secondary = secondary; + } + + @Override + public void close() throws IOException { + try (primary) { + secondary.close(); + } + } + + @Override + public void ensureValid() throws IOException { + Throwable th = null; + try { + secondary.ensureValid(); + } catch (Throwable t) { + th = t; + } finally { + if (th == null) { + try { + primary.ensureValid(); + } catch (Throwable t) { + th = t; + } + } + } + if (th != null) { + throw IOUtils.rethrowAlways(th); + } + } + } + + public void removeAssociated() throws IOException { + synchronized (persistentFunction) { + if (associatedPaths != null) { + IOUtils.rm( + associatedPaths.stream() + .map(Path::of) + .filter(p -> p.toFile().exists()) + .toArray(Path[]::new)); + } + } + } + + @Override + public String[] listAll() throws IOException { + String[] accessFiles = access.listAll(); + if (persistent == null) { + return accessFiles; + } else { + // in the common case, the access directory will contain all the files. Notably, + // temp files will _only_ be present in the access dir. But during initial startup, + // there may be files present in `persistent` that are not present in `access` + return sortAndMergeArrays(accessFiles, persistent.listAll()); + } + } + + /** + * Merges filenames (deduping) from access and persistent copies, skipping any lazy tmp files that + * exist in the access copy. + */ + static String[] sortAndMergeArrays(String[] accessFiles, String[] persistentFiles) { + final int accessLen = accessFiles.length; + if (accessLen == 0) { + return persistentFiles; + } + final int persistentLen = persistentFiles.length; + if (persistentLen == 0) { + int prunedIdx = 0; + for (int i = 0; i < accessLen; i++) { + String name = accessFiles[i]; + if (lazyTmpFileSuffixStartIdx(name) != -1) { + continue; + } + if (prunedIdx != i) { + accessFiles[prunedIdx] = name; + } + prunedIdx++; + } + if (prunedIdx == accessLen) { + return accessFiles; + } else { + String[] ret = new String[prunedIdx]; + System.arraycopy(accessFiles, 0, ret, 0, prunedIdx); + return ret; + } + } + Arrays.sort(accessFiles); + Arrays.sort(persistentFiles); + String[] tailFiles = null; + String otherFile = persistentFiles[0]; + int persistentIdx = 0; + int idx = 0; + int headUpTo = 0; + for (int i = 0; i < accessLen; i++) { + String file = accessFiles[i]; + if (lazyTmpFileSuffixStartIdx(file) != -1) { + // skip lazy temp files + if (tailFiles == null) { + tailFiles = new String[accessLen - i + persistentLen - persistentIdx]; + headUpTo = i; + } + continue; + } + while (otherFile != null) { + int cmp = otherFile.compareTo(file); + if (cmp < 0) { + if (tailFiles == null) { + tailFiles = new String[accessLen - i + persistentLen - persistentIdx]; + headUpTo = i; + } + tailFiles[idx++] = otherFile; + } else if (cmp > 0) { + break; + } + otherFile = ++persistentIdx < persistentLen ? 
persistentFiles[persistentIdx] : null; + } + if (tailFiles != null) { + tailFiles[idx++] = file; + } + } + if (otherFile != null) { + int persistentRemaining = persistentLen - persistentIdx; + if (tailFiles == null) { + tailFiles = new String[persistentRemaining]; + headUpTo = accessLen; + } + System.arraycopy(persistentFiles, persistentIdx, tailFiles, idx, persistentRemaining); + idx += persistentRemaining; + } + if (tailFiles == null) { + return accessFiles; + } else { + String[] ret = new String[headUpTo + idx]; + System.arraycopy(accessFiles, 0, ret, 0, headUpTo); + System.arraycopy(tailFiles, 0, ret, headUpTo, idx); + return ret; + } + } + + @Override + public void deleteFile(String name) throws IOException { + Throwable th = null; + try { + if (persistent != null && !name.endsWith(".tmp")) { + // persistent directory should never have tmp files; skip files with this reserved + // extension. + persistent.deleteFile(name); + } + } catch (Throwable t) { + th = t; + } finally { + try { + access.deleteFile(name); + if (th instanceof NoSuchFileException) { + // if we successfully delete the access copy, but threw `NoSuchFileException` for + // the persistent copy, then swallow the original exception. + // we expect this to happen only in the case of recovering after a disorderly shutdown + // (or similar situation?). It is expected in such cases that a file may have been + // flushed to disk for the access copy, and not for the persistent copy. In the case + // of `pending_segments_*` files, these files are ~explicitly partial, but must be + // deleted. It's not ideal that we have to be lenient here, but we kind of have to do, + // because we know there are legit situations where files can exist in access and not + // in persistent, and we must support the ability to delete such files. + log.info("swallow exception deleting missing persistent file: {}", name); + th = null; + } + } catch (NoSuchFileException ex) { + // when `persistent != null`, `access` is a special case. Since access may be on ephemeral + // storage, we should be ok with files being already absent if we're asked to delete them. 
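+ // with no persistent copy, the access directory is the sole source of truth, so a missing file here is a real error and the exception is kept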
+ if (persistent == null) { + th = IOUtils.useOrSuppress(th, ex); + } + } catch (Throwable t) { + th = IOUtils.useOrSuppress(th, t); + } + } + if (th != null) { + throw IOUtils.rethrowAlways(th); + } + } + + @Override + public long fileLength(String name) throws IOException { + return access.fileLength(name); + } + + @Override + @SuppressWarnings("try") + public IndexOutput createOutput(String name, IOContext context) throws IOException { + if (name.startsWith("pending_segments_")) { + init(); + } + if (persistent == null) { + return access.createOutput(name, context); + } + IndexOutput a = null; + IndexOutput b = null; + Throwable th = null; + try { + b = persistent.createOutput(name, context); + } catch (Throwable t) { + th = t; + } finally { + if (b != null) { + try { + a = access.createOutput(name, context); + } catch (Throwable t) { + try (IndexOutput closeB = b) { + th = IOUtils.useOrSuppress(th, t); + } catch (Throwable t1) { + th = IOUtils.useOrSuppress(th, t1); + } finally { + persistent.deleteFile(name); + } + } + } + } + if (th != null) { + throw IOUtils.rethrowAlways(th); + } + assert a != null; + return new TeeIndexOutput(a, b); + } + + private static final class TeeIndexOutput extends IndexOutput { + private final IndexOutput primary; + private final IndexOutput secondary; + + private TeeIndexOutput(IndexOutput primary, IndexOutput secondary) { + super("Tee(" + primary.toString() + ", " + secondary.toString() + ")", primary.getName()); + assert primary.getName().equals(secondary.getName()); + this.primary = primary; + this.secondary = secondary; + } + + @Override + public void writeByte(byte b) throws IOException { + secondary.writeByte(b); + primary.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + secondary.writeBytes(b, offset, length); + primary.writeBytes(b, offset, length); + } + + @Override + public void close() throws IOException { + try (primary) { + secondary.close(); + } + } + + @Override + public long getFilePointer() { + long ret = primary.getFilePointer(); + assert ret == secondary.getFilePointer(); + return ret; + } + + @Override + public long getChecksum() throws IOException { + return primary.getChecksum(); + } + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) + throws IOException { + return access.createTempOutput(prefix, suffix, context); + } + + @Override + public void sync(Collection names) throws IOException { + Future persistentFuture; + if (persistent == null) { + persistentFuture = null; + } else { + persistentFuture = + ioExec.submit( + () -> { + persistent.sync(names); + return null; + }); + } + Throwable th = null; + try { + access.sync(names); + } catch (Throwable t) { + th = t; + } finally { + if (persistentFuture != null) { + if (th == null || !persistentFuture.cancel(true)) { + try { + persistentFuture.get(); + } catch (InterruptedException e) { + // we don't throw InterruptedException, so at least we should reset the + // current thread's interrupt status + Thread.currentThread().interrupt(); + if (th == null) { + // make sure this completes exceptionally, but don't add it as + // a cause, because we've re-interrupted the thread + th = new RuntimeException("interrupted"); + } + th.addSuppressed(e); + } catch (CancellationException e) { + assert th != null; + // we are the only ones who could have cancelled this + } catch (ExecutionException e) { + th = IOUtils.useOrSuppress(th, e.getCause()); + } catch (Throwable t) { + th = 
IOUtils.useOrSuppress(th, t); + } + } + } + } + if (th != null) { + throw IOUtils.rethrowAlways(th); + } + } + + @Override + public void syncMetaData() throws IOException { + Throwable th = null; + try { + if (persistent != null) { + persistent.syncMetaData(); + } + } catch (Throwable t) { + th = t; + } finally { + try { + access.syncMetaData(); + } catch (Throwable t) { + th = IOUtils.useOrSuppress(th, t); + } + } + if (th != null) { + throw IOUtils.rethrowAlways(th); + } + } + + @Override + public void rename(String source, String dest) throws IOException { + Throwable th = null; + try { + if (persistent != null) { + persistent.rename(source, dest); + // NOTE: we need to incorporate `persistent.syncMetaData()` within `rename()` here, + // because `persistent` is our source of truth, and we must ensure that the changes + // here are persisted to disk _before_ we proceed to `access.rename()`. + // The pathological (though extremely unlikely) case that we protect against here is: + // both renames succeed, and `access` rename happens to be arbitrarily flushed to disk, + // but `persistent` rename is not. Then, before the explicit `syncMetaData()` in segment + // commit, there is a hard shutdown (e.g., `kill -9` or similar), potentially leaving + // the `segments_N` file intact on startup for `access`, but _not_ `persistent`. This + // is exactly what we most want to avoid: running the index off of ephemeral storage + // that is not backed by a persistent copy. + persistent.syncMetaData(); + // NOTE also: in the event of a partial rename, we should be safe based on how + // `IndexFileDeleter` cleans up partial state on startup. Worst-case scenario: + // rename pending_segments_N => segments_N succeeds on `persistent` first (because this + // is the source of truth so we run the rename there first), then we get `kill -9`'d so + // we have both pending_segments_N (in access dir) and segments_N (in persistent dir). + // The logic in `IndexFileDeleter` runs multiple passes in segment discovery, incRef'ing + // files that are referenced by known `segments_*` files, then deletion happens for + // unreferenced files only (which would in include all `pending_segments_*`). Each + // `pending_segments_N` file will be consulted on startup to determine the max segment + // gen (to prevent double-writing the same segment number), but should not conflict + // with analogous `segments_N` files, if present. 
+ } + } catch (Throwable t) { + th = t; + } finally { + if (th == null) { + try { + access.rename(source, dest); + } catch (Throwable t) { + th = t; + if (persistent != null) { + try { + // best-effort to put it back, so the operation is atomic across both dirs + persistent.rename(dest, source); + } catch (Throwable t1) { + th = IOUtils.useOrSuppress(th, t1); + } + } + } + } + } + if (th != null) { + throw IOUtils.rethrowAlways(th); + } + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + IndexInput ret = access.openInput(name, context); + if (!name.endsWith(".tmp")) { + // we do not expect tmp files to be present in persistent directory + persistentLengthVerificationQueue.offer( + new TeeDirectoryFactory.PersistentLengthVerification( + access, persistent, name, ret.length())); + } + return ret; + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try (closeLocal; + Closeable a = access) { + if (persistent != null) { + persistent.close(); + } + } catch (Exception e) { + throw IOUtils.rethrowAlways(e); + } + } + + @Override + public Set<String> getPendingDeletions() throws IOException { + Set<String> a = access.getPendingDeletions(); + if (persistent == null) { + return a; + } + Set<String> p = persistent.getPendingDeletions(); + if (p.isEmpty()) { + return a; + } else if (a.isEmpty()) { + return p; + } + Set<String> ret = CollectionUtil.newHashSet(a.size() + p.size()); + ret.addAll(p); + for (String f : a) { + int suffixStartIdx = lazyTmpFileSuffixStartIdx(f); + if (suffixStartIdx == -1) { + ret.add(f); + } else { + // don't externally expose actual lazy filenames; + // instead, map them to the corresponding base filename + ret.add(f.substring(0, suffixStartIdx)); + } + } + return ret; + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java b/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java new file mode 100644 index 00000000000..2f91d82be13 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/TeeDirectoryFactory.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.storage; + +import com.codahale.metrics.Meter; +import java.io.Closeable; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.ref.WeakReference; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.BiConsumer; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.util.RamUsageEstimator; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.MMapDirectoryFactory; +import org.apache.solr.core.SolrInfoBean; +import org.apache.solr.metrics.MetricsMap; +import org.apache.solr.metrics.SolrMetricProducer; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.util.IOFunction; +import org.apache.solr.util.stats.MetricUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TeeDirectoryFactory extends MMapDirectoryFactory { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private NodeLevelTeeDirectoryState nodeLevelState; + private NodeLevelTeeDirectoryState ownNodeLevelState; + private WeakReference cc; + + private String accessDir; + private boolean useAsyncIO; + private boolean useDirectIO; + + @Override + public void initCoreContainer(CoreContainer cc) { + super.initCoreContainer(cc); + this.cc = new WeakReference<>(cc); + } + + static class NodeLevelTeeDirectoryState implements SolrMetricProducer { + final ExecutorService ioExec = ExecutorUtil.newMDCAwareCachedThreadPool("teeIOExec"); + private final Future lengthVerificationTask; + final BlockingQueue persistentLengthVerificationQueue; + private final Future activationTask; + final LinkedBlockingQueue activationQueue = + new LinkedBlockingQueue<>(); + final ConcurrentHashMap priorityActivate = + new ConcurrentHashMap<>(); + + private SolrMetricsContext solrMetricsContext; + final LongAdder rawCt = new LongAdder(); + final LongAdder loadedCt = new LongAdder(); + final LongAdder populatedCt = new LongAdder(); + final LongAdder lazyCt = new LongAdder(); + final LongAdder lazyMapSize = new LongAdder(); + final LongAdder lazyMapDiskUsage = new LongAdder(); + final LongAdder lazyLoadedBlockBytes = new LongAdder(); + final Meter priorityActivateMeter = new Meter(); + final Meter activateMeter = new Meter(); + + NodeLevelTeeDirectoryState(int lengthVerificationQueueSize) { + persistentLengthVerificationQueue = new ArrayBlockingQueue<>(lengthVerificationQueueSize); + activationTask = + ioExec.submit( + () -> { + Thread t = Thread.currentThread(); + int idleCount = 0; // allow a longer poll interval when nothing's happening + AccessDirectory.LazyEntry lazyEntry = null; + while 
(!t.isInterrupted()) { + try { + if (!priorityActivate.isEmpty()) { + idleCount = 0; + Iterator iter = + priorityActivate.keySet().iterator(); + while (iter.hasNext()) { + priorityActivateMeter.mark(iter.next().call()); + iter.remove(); + } + } else { + if (lazyEntry == null) { + lazyEntry = activationQueue.poll(idleCount * 200L, TimeUnit.MILLISECONDS); + } + if (lazyEntry != null) { + // we load background activation in multiple passes, in order to + // periodically give + // `priorityActivate` a crack at running. Otherwise, a single monolithic + // large file + // could block IO for a long time, depriving us of the ability to benefit + // from + // signals about specific file areas that should be loaded earlier. + int blocksLoadedCount = lazyEntry.load(); + if (blocksLoadedCount < 0) { + blocksLoadedCount = ~blocksLoadedCount; + lazyEntry = null; + } + activateMeter.mark(blocksLoadedCount); + idleCount = 0; + } else if (idleCount < 5) { + idleCount++; + } + } + } catch (InterruptedException ex) { + t.interrupt(); + return null; + } catch (IOException ex) { + lazyEntry = null; + String logMsg = ex.toString(); + log.warn("swallowed exception while activating input: {}", logMsg); + } catch (Throwable ex) { + lazyEntry = null; + log.warn("swallowed unexpected exception while activating input", ex); + } + } + return null; + }); + lengthVerificationTask = + ioExec.submit( + () -> { + Thread t = Thread.currentThread(); + while (!t.isInterrupted()) { + PersistentLengthVerification a = null; + try { + a = persistentLengthVerificationQueue.take(); + a.verify(); + } catch (InterruptedException e) { + t.interrupt(); + break; + } catch (Throwable th) { + log.error("error verifying persistent length {}", a); + } + } + }); + } + + @Override + public void initializeMetrics(SolrMetricsContext parentContext, String scope) { + solrMetricsContext = parentContext.getChildContext(this); + MetricsMap mm = + new MetricsMap( + (writer) -> { + writer.put("rawCt", rawCt.sum()); + writer.put("loadedCt", loadedCt.sum()); + writer.put("populatedCt", populatedCt.sum()); + writer.put("lazyCt", lazyCt.sum()); + writer.put("cumulativeLazyMapSize", lazyMapSize.sum()); + final long diskUsage = lazyMapDiskUsage.sum(); + writer.put("lazyDiskUsage", RamUsageEstimator.humanReadableUnits(diskUsage)); + writer.put("lazyDiskBytesUsed", diskUsage); + final long blockBytesLoaded = lazyLoadedBlockBytes.sum(); + writer.put( + "lazyLoadedBlockUsage", RamUsageEstimator.humanReadableUnits(blockBytesLoaded)); + writer.put("lazyLoadedBlockBytes", blockBytesLoaded); + BiConsumer c = writer.getBiConsumer(); + MetricUtils.convertMetric( + "priorityActivate", + priorityActivateMeter, + MetricUtils.ALL_PROPERTIES, + false, + false, + false, + false, + ":", + c::accept); + MetricUtils.convertMetric( + "activate", + activateMeter, + MetricUtils.ALL_PROPERTIES, + false, + false, + false, + false, + ":", + c::accept); + }); + solrMetricsContext.gauge(mm, true, scope, SolrInfoBean.Category.DIRECTORY.toString()); + } + + @Override + public SolrMetricsContext getSolrMetricsContext() { + return solrMetricsContext; + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try (Closeable c1 = SolrMetricProducer.super::close; + Closeable c2 = () -> ExecutorUtil.shutdownAndAwaitTermination(ioExec)) { + try { + lengthVerificationTask.cancel(true); + } finally { + activationTask.cancel(true); + } + } + } + } + + static final class PersistentLengthVerification { + private final Directory accessDir; + private final Directory 
persistentDir; + private final String name; + private final long accessLength; + + PersistentLengthVerification( + Directory accessDir, Directory persistentDir, String name, long accessLength) { + this.accessDir = accessDir; + this.persistentDir = persistentDir; + this.name = name; + this.accessLength = accessLength; + } + + private void verify() { + try { + long l = persistentDir.fileLength(name); + if (l != accessLength) { + log.error("file length mismatch {} != {}", l, this); + } + } catch (AlreadyClosedException th) { + // swallow this; we have to defer lookup, but we know that in doing so we run the risk + // that the directory will already have been closed by the time we look up the length + } catch (NoSuchFileException e) { + try { + accessDir.fileLength(name); + log.error("file absent in persistent, present in access: {}", this); + } catch (NoSuchFileException e1) { + // this is what we expect, so just swallow it + } catch (Throwable t) { + log.warn("unable to re-verify access length {}", this, t); + } + } catch (Throwable t) { + log.warn("unable to verify persistent length {}", this, t); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{name=").append(name).append(", length=").append(accessLength).append(", access="); + if (accessDir instanceof FSDirectory) { + sb.append(((FSDirectory) accessDir).getDirectory()); + } else { + sb.append(accessDir); + } + sb.append(", persistent="); + if (persistentDir instanceof FSDirectory) { + sb.append(((FSDirectory) persistentDir).getDirectory()); + } else { + sb.append(persistentDir); + } + sb.append("}"); + return sb.toString(); + } + } + + @Override + public void init(NamedList<?> args) { + if (this.cc != null) { + CoreContainer cc = this.cc.get(); + this.cc = null; + assert cc != null; + nodeLevelState = + cc.getObjectCache() + .computeIfAbsent( + "nodeLevelTeeDirectoryState", + NodeLevelTeeDirectoryState.class, + (k) -> { + NodeLevelTeeDirectoryState ret = new NodeLevelTeeDirectoryState(4096); + ret.initializeMetrics( + cc.getMetricsHandler().getSolrMetricsContext(), "teeDirectory"); + return ret; + }); + } else { + nodeLevelState = new NodeLevelTeeDirectoryState(64); + ownNodeLevelState = nodeLevelState; + } + super.init(args); + SolrParams params = args.toSolrParams(); + accessDir = + params.get( + "accessDir", + System.getProperty( + "solr.teeDirectory.accessDir", System.getProperty("java.io.tmpdir"))); + if (!Path.of(accessDir).isAbsolute()) { + throw new IllegalArgumentException("accessDir should be absolute; found " + accessDir); + } + useDirectIO = params.getBool("useDirectIO", CompressingDirectory.DEFAULT_USE_DIRECT_IO); + useAsyncIO = params.getBool("useAsyncIO", useDirectIO); + } + + private static final boolean TEST_CONTEXT = System.getProperty("tests.seed") != null; + + static String getScopeName(String accessDir, String path) { + int lastPathDelimIdx = path.lastIndexOf('/'); + if (lastPathDelimIdx == -1) { + throw new IllegalArgumentException("unexpected path: " + path); + } + String dirName = path.substring(path.lastIndexOf('/')); + int end = path.lastIndexOf('/', lastPathDelimIdx - 1); + int start = path.lastIndexOf('/', end - 1); + String ret; + if ("/index".equals(dirName)) { + ret = path.substring(start, end); + } else if (dirName.startsWith("/index.")) { + // append the suffix identifier; this is a snapshot or temp index dir + ret = path.substring(start, end).concat(dirName.substring("/index".length())); + } else if (TEST_CONTEXT) { + ret = 
path.substring(path.lastIndexOf('/')); + } else { + throw new IllegalArgumentException("unexpected path: " + path); + } + if (TEST_CONTEXT) { + ret += "-" + Long.toUnsignedString(System.nanoTime(), 16); + Path p = Path.of(path); + if (p.startsWith(accessDir)) { + Path a = Path.of(accessDir); + Path relative = a.relativize(p); + if (relative.getNameCount() > 0) { + accessDir = + a.resolve(relative.getName(0)).toString().concat("/TeeDirectoryFactory-access"); + } + } + } + return accessDir.concat(ret); + } + + @Override + public Directory create(String path, LockFactory lockFactory, DirContext dirContext) + throws IOException { + Directory naive = super.create(path, lockFactory, dirContext); + Path compressedPath = Path.of(path); + IOFunction<Void, Map.Entry<String, Directory>> accessFunction = + unused -> { + String accessPath = getScopeName(accessDir, path); + Directory dir = + new AccessDirectory(Path.of(accessPath), lockFactory, compressedPath, nodeLevelState); + return new AbstractMap.SimpleImmutableEntry<>(accessPath, dir); + }; + IOFunction<Directory, Map.Entry<Directory, List<String>>> persistentFunction = + content -> { + assert content == naive; + content.close(); + content = + new CompressingDirectory( + compressedPath, nodeLevelState.ioExec, useAsyncIO, useDirectIO); + return new AbstractMap.SimpleImmutableEntry<>(content, Collections.emptyList()); + }; + return new SizeAwareDirectory( + new TeeDirectory(naive, accessFunction, persistentFunction, nodeLevelState), 0); + } + + @Override + @SuppressWarnings("try") + protected synchronized void removeDirectory(CacheValue cacheValue) throws IOException { + try (Closeable c = () -> super.removeDirectory(cacheValue)) { + Directory d = FilterDirectory.unwrap(cacheValue.directory); + if (d instanceof TeeDirectory) { + ((TeeDirectory) d).removeAssociated(); + } + } catch (NoSuchFileException ex) { + // swallow this. Depending on the order of Directory removal, a parent directory + // may have removed us first. In any event, the file's not there, which is what + // we wanted anyway. + } + } + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + try (NodeLevelTeeDirectoryState close = ownNodeLevelState) { + super.close(); + } + } +} diff --git a/solr/core/src/java/org/apache/solr/storage/package-info.java b/solr/core/src/java/org/apache/solr/storage/package-info.java new file mode 100644 index 00000000000..7fb0179ad4b --- /dev/null +++ b/solr/core/src/java/org/apache/solr/storage/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Directory implementations and supporting classes for tee-style Solr index storage. 
*/ +package org.apache.solr.storage; diff --git a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java index 83f91365294..081f03e6560 100644 --- a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java +++ b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java @@ -39,10 +39,17 @@ public class TestCrossCoreJoin extends SolrTestCaseJ4 { private static SolrCore fromCore; + private static String restoreDirectoryFactory; + @BeforeClass public static void beforeTests() throws Exception { System.setProperty("enable.update.log", "false"); // schema12 doesn't support _version_ System.setProperty("solr.filterCache.async", "true"); + + // for some reason path access in creating `fromCore` is a problem for some DirectoryFactories + restoreDirectoryFactory = System.getProperty("solr.directoryFactory"); + System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockDirectoryFactory"); + // initCore("solrconfig.xml","schema12.xml"); // File testHome = createTempDir().toFile(); @@ -247,5 +254,6 @@ public String query(SolrCore core, SolrQueryRequest req) throws Exception { @AfterClass public static void nukeAll() { fromCore = null; + System.setProperty("solr.directoryFactory", restoreDirectoryFactory); } } diff --git a/solr/core/src/test/org/apache/solr/search/TestIndexSearcher.java b/solr/core/src/test/org/apache/solr/search/TestIndexSearcher.java index 25975ce80e8..b511dd7075d 100644 --- a/solr/core/src/test/org/apache/solr/search/TestIndexSearcher.java +++ b/solr/core/src/test/org/apache/solr/search/TestIndexSearcher.java @@ -18,6 +18,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; +import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Array; import java.util.Date; @@ -223,6 +224,7 @@ public void testSearcherListeners() throws Exception { createCoreAndValidateListeners(1, 1, 2, 1); } + @SuppressWarnings("try") private void createCoreAndValidateListeners( int numTimesCalled, int numTimesCalledFirstSearcher, @@ -236,7 +238,9 @@ private void createCoreAndValidateListeners( MockSearcherListener.numberOfTimesCalled = new AtomicInteger(); MockSearcherListener.numberOfTimesCalledFirstSearcher = new AtomicInteger(); - try { + String restoreDirFactory = System.getProperty("solr.directoryFactory"); + System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockDirectoryFactory"); + try (Closeable c = () -> System.setProperty("solr.directoryFactory", restoreDirFactory)) { // Create a new core, this should call all the firstSearcherListeners newCore = cores.create( @@ -284,6 +288,7 @@ private void doQuery(SolrCore core) throws Exception { connection.request("/select", params, null).contains("0")); } + @SuppressWarnings("try") public void testDontUseColdSearcher() throws Exception { MockSearchComponent.registerFirstSearcherListener = false; MockSearchComponent.registerNewSearcherListener = false; @@ -296,7 +301,9 @@ public void testDontUseColdSearcher() throws Exception { CoreDescriptor cd = h.getCore().getCoreDescriptor(); final SolrCore newCore; boolean coreCreated = false; - try { + String restoreDirFactory = System.getProperty("solr.directoryFactory"); + System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockDirectoryFactory"); + try (Closeable c = () -> System.setProperty("solr.directoryFactory", restoreDirFactory)) { // Create a new core, this should call all the firstSearcherListeners newCore = cores.create( @@ -356,6 +363,7 @@ public void run() { } } + 
@SuppressWarnings("try") public void testUseColdSearcher() throws Exception { MockSearchComponent.registerFirstSearcherListener = false; MockSearchComponent.registerNewSearcherListener = false; @@ -368,7 +376,9 @@ public void testUseColdSearcher() throws Exception { CoreDescriptor cd = h.getCore().getCoreDescriptor(); final SolrCore newCore; boolean coreCreated = false; - try { + String restoreDirFactory = System.getProperty("solr.directoryFactory"); + System.setProperty("solr.directoryFactory", "org.apache.solr.core.MockDirectoryFactory"); + try (Closeable c = () -> System.setProperty("solr.directoryFactory", restoreDirFactory)) { System.setProperty("tests.solr.useColdSearcher", "true"); // Create a new core, this should call all the firstSearcherListeners newCore = diff --git a/solr/core/src/test/org/apache/solr/storage/TeeDirectoryTest.java b/solr/core/src/test/org/apache/solr/storage/TeeDirectoryTest.java new file mode 100644 index 00000000000..f1aeaddf253 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/storage/TeeDirectoryTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.storage; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import org.apache.solr.SolrTestCase; +import org.junit.Test; + +public class TeeDirectoryTest extends SolrTestCase { + + @Test + public void testIsLazyTmpFile() { + String[] lazyTmp = + new String[] { + "_lazy_123.tmp", "_lazy_1.tmp", "_lazy_123abcdefghijklmnopqrstuvwxyz0456789.tmp" + }; + String[] notLazyTmp = + new String[] {"", "a", "lazy_123.tmp", "_lazy_.tmp", "_lazy_Q.tmp", "_lazy_123.tmpp"}; + String prefix = "prefix"; + int prefixLen = prefix.length(); + for (String s : lazyTmp) { + assertEquals(0, AccessDirectory.lazyTmpFileSuffixStartIdx(s)); + assertEquals(prefixLen, AccessDirectory.lazyTmpFileSuffixStartIdx(prefix.concat(s))); + } + for (String s : notLazyTmp) { + assertEquals(-1, AccessDirectory.lazyTmpFileSuffixStartIdx(s)); + assertEquals(-1, AccessDirectory.lazyTmpFileSuffixStartIdx("prefix".concat(s))); + } + } + + @Test + public void testSortAndMergeArrays() { + Random r = random(); + Set<String> a = new HashSet<>(); + Set<String> b = new HashSet<>(); + SortedSet<String> sorted = new TreeSet<>(); + for (int i = 0; i < 1000; i++) { + a.clear(); + b.clear(); + sorted.clear(); + for (int j = r.nextInt(14); j > 0; j--) { + String s = Character.toString('a' + r.nextInt(26)); + if (r.nextInt(10) == 0) { + s = s.concat("_lazy_1.tmp"); + } else { + sorted.add(s); + } + a.add(s); + } + for (int j = r.nextInt(14); j > 0; j--) { + String s = Character.toString('a' + r.nextInt(26)); + sorted.add(s); + b.add(s); + } + String[] aArr = a.toArray(new String[0]); + String[] bArr = b.toArray(new String[0]); + Collections.shuffle(Arrays.asList(aArr), r); + Collections.shuffle(Arrays.asList(bArr), r); + String[] result = TeeDirectory.sortAndMergeArrays(aArr, bArr); + Arrays.sort(result); + assertArrayEquals(sorted.toArray(new String[0]), result); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java b/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java index be9ec69d926..407241e14d2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ObjectCache.java @@ -17,21 +17,27 @@ package org.apache.solr.common.util; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.solr.common.SolrCloseable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Simple object cache with a type-safe accessor. 
*/ public class ObjectCache extends MapBackedCache<String, Object> implements SolrCloseable { - private volatile boolean isClosed; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final AtomicBoolean isClosed = new AtomicBoolean(false); public ObjectCache() { super(new ConcurrentHashMap<>()); } private void ensureNotClosed() { - if (isClosed) { + if (isClosed.get()) { throw new RuntimeException("This ObjectCache is already closed."); } } @@ -78,12 +84,26 @@ public <T> T computeIfAbsent( @Override public boolean isClosed() { - return isClosed; + return isClosed.get(); } @Override public void close() throws IOException { - isClosed = true; - map.clear(); + if (isClosed.compareAndSet(false, true)) { + // Close any Closeable object which may have been stored in this cache. + // This makes it possible to tie objects to the lifecycle of the object which + // owns this ObjectCache, which is useful for plugins that register objects + // which should be closed before being garbage-collected. + for (Object value : map.values()) { + if (value instanceof AutoCloseable) { + try { + ((AutoCloseable) value).close(); + } catch (Exception e) { + log.warn("exception closing resource {}", value, e); + } + } + } + map.clear(); + } } } diff --git a/solr/solrj/src/test/org/apache/solr/common/util/ObjectCacheTest.java b/solr/solrj/src/test/org/apache/solr/common/util/ObjectCacheTest.java new file mode 100644 index 00000000000..a7bd478491c --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/common/util/ObjectCacheTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.common.util; + +import java.io.Closeable; +import org.apache.solr.SolrTestCase; +import org.junit.Test; + +/** Tests for {@link ObjectCache}. 
*/ +public class ObjectCacheTest extends SolrTestCase { + + private ObjectCache objectCache; + + @Override + public void setUp() throws Exception { + super.setUp(); + objectCache = new ObjectCache(); + } + + @Override + public void tearDown() throws Exception { + objectCache.close(); + super.tearDown(); + } + + @Test + public void testGetPutRemove() { + assertNull(objectCache.get("key")); + + objectCache.put("key", "value"); + assertEquals("value", objectCache.get("key")); + + objectCache.remove("key"); + assertNull(objectCache.get("key")); + } + + @Test + public void testClear() { + objectCache.put("key1", "value1"); + objectCache.put("key2", "value2"); + + objectCache.clear(); + assertNull(objectCache.get("key1")); + assertNull(objectCache.get("key2")); + } + + @Test + public void testGetTypeSafe() { + objectCache.put("string", "a string"); + objectCache.put("integer", 42); + + assertEquals("a string", objectCache.get("string", String.class)); + assertEquals((Integer) 42, objectCache.get("integer", Integer.class)); + } + + @Test + public void testComputeIfAbsentTypeSafe() { + String returnValue = objectCache.computeIfAbsent("string", String.class, k -> "a string"); + assertEquals("a string", returnValue); + assertEquals("a string", objectCache.get("string")); + + returnValue = objectCache.computeIfAbsent("string", String.class, k -> "another string"); + assertEquals("a string", returnValue); + assertEquals("a string", objectCache.get("string")); + } + + @Test + public void testClose() throws Exception { + assertFalse(objectCache.isClosed()); + objectCache.close(); + assertTrue(objectCache.isClosed()); + } + + @Test + public void testCloseCloseableValues() throws Exception { + MyCloseable object1 = new MyCloseable(); + MyCloseable object2 = new MyCloseable(); + objectCache.put("object1", object1); + objectCache.put("object2", object2); + objectCache.put("string", "a string"); + objectCache.close(); + + assertTrue(object1.closed); + assertTrue(object2.closed); + } + + private static class MyCloseable implements Closeable { + private boolean closed = false; + + @Override + public void close() { + closed = true; + } + } +}
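The ObjectCache behavior exercised above is what lets TeeDirectoryFactory share a single NodeLevelTeeDirectoryState across cores: the factory registers the state in the CoreContainer's ObjectCache via computeIfAbsent, and because the state is AutoCloseable, the cache now closes it when the cache itself is closed rather than silently dropping it. The sketch below shows that registration pattern in isolation; it is not part of the patch, and the class name, cache key, and resource type (NodeLevelResourceExample, "sharedIoState", SharedIoState) are illustrative placeholders standing in for a plugin's own node-level state.

import java.io.Closeable;
import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.core.CoreContainer;

public class NodeLevelResourceExample {

  /** Hypothetical shared, node-level resource (stands in for NodeLevelTeeDirectoryState). */
  static final class SharedIoState implements Closeable {
    @Override
    public void close() {
      // release executors, queues, file handles, etc.
    }
  }

  static SharedIoState getOrCreate(CoreContainer cc) {
    ObjectCache cache = cc.getObjectCache();
    // Every caller gets the same instance for the key; because SharedIoState is
    // AutoCloseable, ObjectCache.close() will now close it along with the cache.
    return cache.computeIfAbsent("sharedIoState", SharedIoState.class, k -> new SharedIoState());
  }
}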