Skip to content

Commit

Permalink
Add self-contained package with TeeDirectory implementation (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
magibney authored Mar 12, 2024
1 parent b146583 commit ba9a6b2
Show file tree
Hide file tree
Showing 19 changed files with 4,310 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/solr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ jobs:
- name: Initialize gradle settings
run: ./gradlew localSettings
- name: Test Solr
run: ./gradlew test
run: ./gradlew test -Ptests.directory=org.apache.solr.storage.TeeDirectory
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void call(ClusterState state, ZkNodeProps message, NamedList<Object> resu
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
params.set(CoreAdminParams.DELETE_DATA_DIR, true);

Expand Down
6 changes: 5 additions & 1 deletion solr/core/src/java/org/apache/solr/core/CoreContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,11 @@ public void shutdown() {
// Now clear all the cores that are being operated upon.
solrCores.close();

objectCache.clear();
try {
objectCache.close();
} catch (IOException ex) {
log.warn("error closing ObjectCache", ex);
}

// It's still possible that one of the pending dynamic load operation is waiting, so wake it
// up if so. Since all the pending operations queues have been drained, there should be
Expand Down
15 changes: 14 additions & 1 deletion solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.apache.solr.handler.ReplicationHandler.SIZE;
import static org.apache.solr.handler.ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO;

import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -79,6 +80,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -96,6 +98,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
Expand Down Expand Up @@ -434,6 +437,7 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication)
* @return true on success, false if follower is already in sync
* @throws IOException if an exception occurs
*/
@SuppressWarnings("try")
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload)
throws IOException, InterruptedException {

Expand Down Expand Up @@ -624,7 +628,15 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreRel
.getDirectoryFactory()
.get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);

try {
Lock lock = tmpIndexDir.obtainLock(IndexWriter.WRITE_LOCK_NAME);
AtomicBoolean releasedLock = new AtomicBoolean();

try (Closeable releaseLock =
() -> {
if (releasedLock.compareAndSet(false, true)) {
lock.close();
}
}) {

// We will compare all the index files from the leader vs the index files on disk to see if
// there is a mismatch in the metadata. If there is a mismatch for the same index file then
Expand Down Expand Up @@ -770,6 +782,7 @@ IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreRel
}
}
if (isFullCopyNeeded) {
releaseLock.close();
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
}

Expand Down
70 changes: 38 additions & 32 deletions solr/core/src/java/org/apache/solr/handler/RestoreCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.Lock;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.SolrCore;
Expand Down Expand Up @@ -82,6 +84,7 @@ public Boolean call() throws Exception {
return doRestore();
}

@SuppressWarnings("try")
public boolean doRestore() throws Exception {
SimpleDateFormat dateFormat = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT);
String restoreIndexName = "restore." + dateFormat.format(new Date());
Expand All @@ -99,44 +102,47 @@ public boolean doRestore() throws Exception {
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);

// Prefer local copy.
indexDir =
core.getDirectoryFactory()
.get(
indexDirPath,
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
// Move all files from backupDir to restoreIndexDir
for (String filename : repository.listAllFiles()) {
checkInterrupted();
try {
if (indexDirFiles.contains(filename)) {
Checksum cs = repository.checksum(filename);
IndexFetcher.CompareResult compareResult;
if (cs == null) {
compareResult = new IndexFetcher.CompareResult();
compareResult.equal = false;
try (Lock lock = restoreIndexDir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
// Prefer local copy.
indexDir =
core.getDirectoryFactory()
.get(
indexDirPath,
DirectoryFactory.DirContext.DEFAULT,
core.getSolrConfig().indexConfig.lockType);
Set<String> indexDirFiles = new HashSet<>(Arrays.asList(indexDir.listAll()));
// Move all files from backupDir to restoreIndexDir
for (String filename : repository.listAllFiles()) {
checkInterrupted();
try {
if (indexDirFiles.contains(filename)) {
Checksum cs = repository.checksum(filename);
IndexFetcher.CompareResult compareResult;
if (cs == null) {
compareResult = new IndexFetcher.CompareResult();
compareResult.equal = false;
} else {
compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
}
if (!compareResult.equal
|| (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
filename, cs.size, compareResult))) {
repository.repoCopy(filename, restoreIndexDir);
} else {
// prefer local copy
repository.localCopy(indexDir, filename, restoreIndexDir);
}
} else {
compareResult = IndexFetcher.compareFile(indexDir, filename, cs.size, cs.checksum);
}
if (!compareResult.equal
|| (IndexFetcher.filesToAlwaysDownloadIfNoChecksums(
filename, cs.size, compareResult))) {
repository.repoCopy(filename, restoreIndexDir);
} else {
// prefer local copy
repository.localCopy(indexDir, filename, restoreIndexDir);
}
} else {
repository.repoCopy(filename, restoreIndexDir);
} catch (Exception e) {
log.warn("Exception while restoring the backup index ", e);
throw new SolrException(
SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
}
} catch (Exception e) {
log.warn("Exception while restoring the backup index ", e);
throw new SolrException(
SolrException.ErrorCode.UNKNOWN, "Exception while restoring the backup index", e);
}
}

log.debug("Switching directories");
core.modifyIndexProps(restoreIndexName);

Expand Down
Loading

0 comments on commit ba9a6b2

Please sign in to comment.