Skip to content

Commit

Permalink
Experimental QA core as proxy in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
patsonluk committed Feb 24, 2024
1 parent 7141ef4 commit 7a12a63
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public ZkConfigSetService(SolrZkClient zkClient) {
this.zkClient = zkClient;
}

@Override
protected SolrResourceLoader createConfigSetResourceLoader(CoreDescriptor cd, String configSetName) {
return new ZkSolrResourceLoader(
cd.getInstanceDir(), configSetName, parentLoader.getClassLoader(), zkController);
}

@Override
public SolrResourceLoader createCoreResourceLoader(CoreDescriptor cd) {
final String colName = cd.getCollectionName();
Expand Down
15 changes: 12 additions & 3 deletions solr/core/src/java/org/apache/solr/core/SolrCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import org.apache.solr.search.facet.FacetParserFactory;
import org.apache.solr.search.stats.LocalStatsCache;
import org.apache.solr.search.stats.StatsCache;
import org.apache.solr.servlet.CoordinatorHttpSolrCall;
import org.apache.solr.update.DefaultSolrCoreState;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.IndexFingerprint;
Expand Down Expand Up @@ -1052,7 +1053,9 @@ public CoreContainer getCoreContainer() {
return coreContainer;
}

SolrCore(CoreContainer coreContainer, CoreDescriptor cd, ConfigSet configSet) {

//TODO was protected. a proper way will likely be creating a SolrCoreProxy class
public SolrCore(CoreContainer coreContainer, CoreDescriptor cd, ConfigSet configSet) {
this(coreContainer, cd, configSet, null, null, null, null, false);
}

Expand Down Expand Up @@ -1175,7 +1178,7 @@ private SolrCore(
initSearcher(prev);

// Initialize the RestManager
restManager = initRestManager();
restManager = isSynthetic() ? new RestManager() : initRestManager();

// Finally tell anyone who wants to know
resourceLoader.inform(resourceLoader);
Expand All @@ -1202,7 +1205,9 @@ private SolrCore(
// searcher!
seedVersionBuckets();

bufferUpdatesIfConstructing(coreDescriptor);
if (!isSynthetic()) {
bufferUpdatesIfConstructing(coreDescriptor);
}

this.ruleExpiryLock = new ReentrantLock();
this.snapshotDelLock = new ReentrantLock();
Expand Down Expand Up @@ -3095,6 +3100,10 @@ public void fetchLatestSchema() {
setLatestSchema(schema);
}

public final boolean isSynthetic() {
return coreDescriptor.getCollectionName() != null && coreDescriptor.getCollectionName().startsWith(CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX);
}

public interface RawWriter {
default String getContentType() {
return BinaryResponseParser.BINARY_CONTENT_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public static SolrCore getCore(
ZkStateReader zkStateReader = solrCall.cores.getZkController().getZkStateReader();
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollectionOrNull(collectionName, true);
SolrCore core = null;
if (coll != null) {

synchronized (CoordinatorHttpSolrCall.class) {
String confName = coll.getConfigName();
String syntheticCollectionName = getSyntheticCollectionName(confName);

Expand All @@ -113,118 +113,9 @@ public static SolrCore getCore(

//after this point the sync core should be available in the container. Double check
if (coreContainer.getCore(syntheticCore.getName()) != null) {
factory.collectionVsCoreNameMapping.put(collectionName, core.getName());
factory.collectionVsCoreNameMapping.put(collectionName, syntheticCore.getName());
return syntheticCore;
}


DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName);
synchronized (CoordinatorHttpSolrCall.class) {
if (syntheticColl == null) {
// no synthetic collection for this config, let's create one
if (log.isInfoEnabled()) {
log.info(
"synthetic collection: {} does not exist, creating.. ", syntheticCollectionName);
}

SolrException createException = null;
try {
createColl(syntheticCollectionName, solrCall.cores, confName);
} catch (SolrException exception) {
// concurrent requests could have created the collection hence causing collection
// exists
// exception
createException = exception;
} finally {
syntheticColl =
zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
}

// then indeed the collection was not created properly, either by this or other
// concurrent
// requests
if (syntheticColl == null) {
if (createException != null) {
throw createException; // rethrow the exception since such collection was not
// created
} else {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Could not locate synthetic collection ["
+ syntheticCollectionName
+ "] after creation!");
}
}
}

// get docCollection again to ensure we get the fresh state
syntheticColl =
zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
List<Replica> nodeNameSyntheticReplicas =
syntheticColl.getReplicas(solrCall.cores.getZkController().getNodeName());
if (nodeNameSyntheticReplicas == null || nodeNameSyntheticReplicas.isEmpty()) {
// this node does not have a replica. add one
if (log.isInfoEnabled()) {
log.info(
"this node does not have a replica of the synthetic collection: {} , adding replica ",
syntheticCollectionName);
}

addReplica(syntheticCollectionName, solrCall.cores);
}

// still have to ensure that it's active, otherwise super.getCoreByCollection
// will return null and then CoordinatorHttpSolrCall will call getCore again
// hence creating a calling loop
try {
zkStateReader.waitForState(
syntheticCollectionName,
10,
TimeUnit.SECONDS,
docCollection -> {
for (Replica nodeNameSyntheticReplica :
docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) {
if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) {
return true;
}
}
return false;
});
} catch (Exception e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Failed to wait for active replica for synthetic collection ["
+ syntheticCollectionName
+ "]",
e);
}
}

core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader);
if (core != null) {
factory.collectionVsCoreNameMapping.put(collectionName, core.getName());
// for the watcher, only remove on collection deletion (ie collection == null), since
// watch from coordinator is collection specific
solrCall
.cores
.getZkController()
.getZkStateReader()
.registerDocCollectionWatcher(
collectionName,
collection -> {
if (collection == null) {
factory.collectionVsCoreNameMapping.remove(collectionName);
return true;
} else {
return false;
}
});
if (log.isDebugEnabled()) {
log.debug("coordinator node, returns synthetic core: {}", core.getName());
}
}
setMDCLoggingContext(collectionName);
return core;
}
return null;
}
Expand Down Expand Up @@ -308,7 +199,9 @@ protected void init() throws Exception {
public static SolrQueryRequest wrappedReq(
SolrQueryRequest delegate, String collectionName, HttpSolrCall httpSolrCall) {
Properties p = new Properties();
p.put(CoreDescriptor.CORE_COLLECTION, collectionName);
if (collectionName != null) {
p.put(CoreDescriptor.CORE_COLLECTION, collectionName);
}
p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString());
p.put(CoreDescriptor.CORE_SHARD, "_");

Expand Down

0 comments on commit 7a12a63

Please sign in to comment.