Skip to content

Commit

Permalink
Speed up verification by using multiple threads.
Browse files Browse the repository at this point in the history
  • Loading branch information
broneill committed Nov 15, 2024
1 parent 99b0e72 commit 64e2ef9
Show file tree
Hide file tree
Showing 53 changed files with 410 additions and 200 deletions.
69 changes: 36 additions & 33 deletions src/main/java/org/cojen/tupl/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public static Database rebuild(DatabaseConfig oldConfig, DatabaseConfig newConfi
*
* @return shared Index instance
*/
public abstract Index openIndex(byte[] name) throws IOException;
public Index openIndex(byte[] name) throws IOException;

/**
* Returns the given named index, creating it if necessary. Name is UTF-8
Expand All @@ -161,7 +161,7 @@ public default Index openIndex(String name) throws IOException {
*
* @return shared Index instance; null if not found
*/
public abstract Index findIndex(byte[] name) throws IOException;
public Index findIndex(byte[] name) throws IOException;

/**
* Returns the given named index, returning null if not found. Name is UTF-8
Expand All @@ -179,7 +179,7 @@ public default Index findIndex(String name) throws IOException {
* @return shared Index instance
* @throws IllegalArgumentException if id is reserved
*/
public abstract Index indexById(long id) throws IOException;
public Index indexById(long id) throws IOException;

/**
* Returns an index by its identifier, returning null if not found.
Expand Down Expand Up @@ -239,7 +239,7 @@ public default <J> Table<J> openJoinTable(Class<J> joinType, String spec) throws
* @throws IllegalStateException if name is already in use by another index
* @throws IllegalStateException if index belongs to another database instance
*/
public abstract void renameIndex(Index index, byte[] newName) throws IOException;
public void renameIndex(Index index, byte[] newName) throws IOException;

/**
* Renames the given index to the one given. Name is UTF-8 encoded.
Expand Down Expand Up @@ -274,29 +274,29 @@ public default void renameIndex(Index index, String newName) throws IOException
* @see EventListener
* @see Index#drop Index.drop
*/
public abstract Runnable deleteIndex(Index index) throws IOException;
public Runnable deleteIndex(Index index) throws IOException;

/**
* Creates a new unnamed temporary index. Temporary indexes never get written to the redo
* log, and they are deleted when the database is re-opened. Temporary indexes should be
* explicitly {@linkplain #deleteIndex deleted} when no longer needed, rather than waiting
* until the database is re-opened.
*/
public abstract Index newTemporaryIndex() throws IOException;
public Index newTemporaryIndex() throws IOException;

/**
* Returns an {@linkplain UnmodifiableViewException unmodifiable} View which maps all
* available index names to identifiers. Identifiers are long integers, {@linkplain
* org.cojen.tupl.io.Utils#decodeLongBE big-endian} encoded.
*/
public abstract View indexRegistryByName() throws IOException;
public View indexRegistryByName() throws IOException;

/**
* Returns an {@linkplain UnmodifiableViewException unmodifiable} View which maps all
* available index identifiers to names. Identifiers are long integers, {@linkplain
* org.cojen.tupl.io.Utils#decodeLongBE big-endian} encoded.
*/
public abstract View indexRegistryById() throws IOException;
public View indexRegistryById() throws IOException;

/**
* Returns a new Transaction with the {@linkplain DatabaseConfig#durabilityMode default}
Expand All @@ -310,7 +310,7 @@ public default Transaction newTransaction() {
* Returns a new Transaction with the given durability mode. If null, the
* {@linkplain DatabaseConfig#durabilityMode default} is used.
*/
public abstract Transaction newTransaction(DurabilityMode durabilityMode);
public Transaction newTransaction(DurabilityMode durabilityMode);

/**
* Returns a handler instance suitable for writing custom redo and undo operations. A
Expand All @@ -320,7 +320,7 @@ public default Transaction newTransaction() {
* @return new writer instance
* @throws IllegalStateException if no recovery instance by the given name is installed
*/
public abstract CustomHandler customWriter(String name) throws IOException;
public CustomHandler customWriter(String name) throws IOException;

/**
* Returns a handler instance suitable for preparing transactions. A corresponding recovery
Expand All @@ -330,22 +330,22 @@ public default Transaction newTransaction() {
* @return new writer instance
* @throws IllegalStateException if no recovery instance by the given name is installed
*/
public abstract PrepareHandler prepareWriter(String name) throws IOException;
public PrepareHandler prepareWriter(String name) throws IOException;

/**
* Returns a new Sorter instance. The standard algorithm is a parallel external mergesort,
* which attempts to use all available processors. All external storage is maintained in
* the database itself, in the form of temporary indexes.
*/
public abstract Sorter newSorter();
public Sorter newSorter();

/**
* Preallocates pages for immediate use. The actual amount allocated
* varies, depending on the amount of free pages already available.
*
* @return actual amount allocated
*/
public abstract long preallocate(long bytes) throws IOException;
public long preallocate(long bytes) throws IOException;

/**
* Set a soft capacity limit for the database, to prevent filling up the storage
Expand Down Expand Up @@ -404,7 +404,7 @@ public default void capacityLimitOverride(long bytes) {
*
* @return a snapshot control object, which must be closed when no longer needed
*/
public abstract Snapshot beginSnapshot() throws IOException;
public Snapshot beginSnapshot() throws IOException;

/**
* Restore from a {@linkplain #beginSnapshot snapshot}, into the data files defined by the
Expand All @@ -426,15 +426,15 @@ public static Database restoreFromSnapshot(DatabaseConfig config, InputStream in
* @param out cache priming destination; buffering is recommended; not auto-closed
* @see DatabaseConfig#cachePriming
*/
public abstract void createCachePrimer(OutputStream out) throws IOException;
public void createCachePrimer(OutputStream out) throws IOException;

/**
* Prime the cache, from a set encoded {@linkplain #createCachePrimer earlier}.
*
* @param in caching priming source; buffering is recommended; auto-closed
* @see DatabaseConfig#cachePriming
*/
public abstract void applyCachePrimer(InputStream in) throws IOException;
public void applyCachePrimer(InputStream in) throws IOException;

/**
* Returns an object for enabling remote access into this database. As long as the server
Expand All @@ -445,12 +445,12 @@ public static Database restoreFromSnapshot(DatabaseConfig config, InputStream in
*
* @see #connect connect
*/
public abstract Server newServer() throws IOException;
public Server newServer() throws IOException;

/**
* Returns a collection of database statistics.
*/
public abstract DatabaseStats stats();
public DatabaseStats stats();

/**
* Flushes all committed transactions, but not durably. Transactions committed with
Expand All @@ -461,22 +461,22 @@ public static Database restoreFromSnapshot(DatabaseConfig config, InputStream in
* Calling this method on a replicated database has no effect.
*/
@Override
public abstract void flush() throws IOException;
public void flush() throws IOException;

/**
* Durably flushes all committed transactions. Transactions committed with {@linkplain
* DurabilityMode#NO_FLUSH no-flush} and {@linkplain DurabilityMode#NO_SYNC no-sync}
* effectively become {@linkplain DurabilityMode#SYNC sync} durable.
*/
public abstract void sync() throws IOException;
public void sync() throws IOException;

/**
* Durably sync and checkpoint all changes to the database. In addition to ensuring that
* all committed transactions are durable, checkpointing ensures that non-transactional
* modifications are durable. Checkpoints are performed automatically by a background
* thread, at a {@linkplain DatabaseConfig#checkpointRate configurable} rate.
*/
public abstract void checkpoint() throws IOException;
public void checkpoint() throws IOException;

/**
* Temporarily suspend automatic checkpoints and wait for any in-progress checkpoint to
Expand All @@ -485,15 +485,15 @@ public static Database restoreFromSnapshot(DatabaseConfig config, InputStream in
*
* @throws IllegalStateException if suspended more than 2<sup>31</sup> times
*/
public abstract void suspendCheckpoints();
public void suspendCheckpoints();

/**
* Resume automatic checkpoints after having been temporarily {@link #suspendCheckpoints
* suspended}.
*
* @throws IllegalStateException if resumed more than suspended
*/
public abstract void resumeCheckpoints();
public void resumeCheckpoints();

/**
* Returns the checkpoint commit lock, which can be held to prevent checkpoints from
Expand All @@ -505,7 +505,7 @@ public static Database restoreFromSnapshot(DatabaseConfig config, InputStream in
* modifications, if a checkpoint is trying to start. In addition, a thread holding the
* commit lock must not attempt to issue a checkpoint, because deadlock is possible.
*/
public abstract Lock commitLock();
public Lock commitLock();

/**
* Compacts the database by shrinking the database file. The compaction target is the
Expand All @@ -530,21 +530,24 @@ public static Database restoreFromSnapshot(DatabaseConfig config, InputStream in
* @throws IllegalArgumentException if compaction target is out of bounds
* @throws IllegalStateException if compaction is already in progress
*/
public abstract boolean compactFile(CompactionObserver observer, double target)
public boolean compactFile(CompactionObserver observer, double target)
throws IOException;

/**
* Verifies the integrity of the database and all indexes.
* Verifies the integrity of the database and all indexes. Using multiple threads speeds up
* verification, even though some nodes might be visited multiple times.
*
* @param observer optional observer; pass null for default
* @param numThreads pass 0 for default, or if negative, the actual number will be {@code
* (-numThreads * availableProcessors)}.
* @return true if verification passed
*/
public abstract boolean verify(VerificationObserver observer) throws IOException;
public boolean verify(VerificationObserver observer, int numThreads) throws IOException;

/**
* Returns true if the database instance is currently the leader.
*/
public abstract boolean isLeader();
public boolean isLeader();

/**
* Registers the given task to start in a separate thread when the database instance has
Expand All @@ -556,15 +559,15 @@ public abstract boolean compactFile(CompactionObserver observer, double target)
* @param acquired called when leadership is acquired (can be null)
* @param lost called when leadership is lost (can be null)
*/
public abstract void uponLeader(Runnable acquired, Runnable lost);
public void uponLeader(Runnable acquired, Runnable lost);

/**
* If the database instance is currently acting as a leader, attempt to give up leadership
* and become a replica. If the database is a replica, or if failover is successful, true
* is returned. When false is returned, the database is likely still the leader, either
* because the database isn't replicated, or because no replicas exist to failover to.
*/
public abstract boolean failover() throws IOException;
public boolean failover() throws IOException;

/**
* Closes the database, ensuring durability of committed transactions. No
Expand All @@ -587,17 +590,17 @@ public default void close() throws IOException {
* @see #shutdown
*/
@Override
public abstract void close(Throwable cause) throws IOException;
public void close(Throwable cause) throws IOException;

/**
* Returns true if database was explicitly closed, or if it was closed due to a panic.
*/
public abstract boolean isClosed();
public boolean isClosed();

/**
* Cleanly closes the database, ensuring durability of all modifications. A checkpoint is
* issued first, and so a quick recovery is performed when the database is re-opened. As a
* side effect of shutting down, all extraneous files are deleted.
*/
public abstract void shutdown() throws IOException;
public void shutdown() throws IOException;
}
7 changes: 5 additions & 2 deletions src/main/java/org/cojen/tupl/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ public long evict(Transaction txn, byte[] lowKey, byte[] highKey,
public IndexStats analyze(byte[] lowKey, byte[] highKey) throws IOException;

/**
* Verifies the integrity of the index.
* Verifies the integrity of the index. Using multiple threads speeds up verification,
* even though some nodes might be visited multiple times.
*
* @param observer optional observer; pass null for default
* @param numThreads pass 0 for default, or if negative, the actual number will be {@code
* (-numThreads * availableProcessors)}.
* @return true if verification passed
*/
public boolean verify(VerificationObserver observer) throws IOException;
public boolean verify(VerificationObserver observer, int numThreads) throws IOException;

/**
* Closes this index reference. The underlying index is still valid and can be re-opened,
Expand Down
33 changes: 24 additions & 9 deletions src/main/java/org/cojen/tupl/core/BTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.cojen.tupl.diag.EventType;
import org.cojen.tupl.diag.IndexStats;

import org.cojen.tupl.util.Runner;

import org.cojen.tupl.views.BoundedView;
import org.cojen.tupl.views.UnmodifiableView;

Expand Down Expand Up @@ -847,15 +849,28 @@ final boolean compactTree(Index view, long highestNodeId, CompactionObserver obs
}

@Override
final boolean verifyTree(Index view, VerifyObserver observer) throws IOException {
BTreeCursor cursor = newCursor(Transaction.BOGUS);
try {
cursor.mKeyOnly = true;
cursor.first(); // must start with loaded key
int height = cursor.height();
return observer.indexBegin(view, height) && cursor.verify(height, observer);
} finally {
cursor.reset();
final boolean verifyTree(Index view, VerifyObserver observer, int numThreads)
throws IOException
{
if (numThreads <= 0) {
int procs = Runtime.getRuntime().availableProcessors();
numThreads = numThreads == 0 ? procs : procs * -numThreads;
}

if (numThreads <= 1) {
BTreeCursor cursor = newCursor(Transaction.BOGUS);
try {
cursor.mKeyOnly = true;
cursor.first(); // must start with loaded key
int height = cursor.height();
return observer.indexBegin(view, height) && cursor.verify(height, observer);
} finally {
cursor.reset();
}
} else {
var verifier = new BTreeVerifier(observer, this, Runner.current(), numThreads);
verifier.start();
return !verifier.await();
}
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/cojen/tupl/core/BTreeCopier.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public BTree result() throws IOException {
return mMerged;
}
if (mCondition.await(mLatch) < 0) {
stop();
throw new InterruptedIOException();
}
}
Expand Down
Loading

0 comments on commit 64e2ef9

Please sign in to comment.