dedicated storedFields executor experiment #225

Open · wants to merge 2 commits into base: fs/branch_9_3
solr/core/src/java/org/apache/solr/core/CoreContainer.java (+16, -0)

@@ -50,6 +50,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -177,6 +178,20 @@ public class CoreContainer {
 
   final SolrCores solrCores;
 
+  private final ExecutorService storedFieldsExecutor =
+      ExecutorUtil.newMDCAwareCachedThreadPool("storedFieldsExecutor");
+
+  public void storedFieldsExecute(Callable<Void> callable) throws IOException {
+    Future<Void> future = storedFieldsExecutor.submit(callable);
+    try {
+      future.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      throw org.apache.lucene.util.IOUtils.rethrowAlways(e.getCause());
+    }
+  }
+
   public static class CoreLoadFailure {
 
     public final CoreDescriptor cd;
@@ -1240,6 +1255,7 @@ public void shutdown() {
     }
 
     ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
+    ExecutorUtil.shutdownAndAwaitTermination(storedFieldsExecutor);
 
     // First wake up the closer thread, it'll terminate almost immediately since it checks
     // isShutDown.
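For context on the change above: storedFieldsExecute is a blocking handoff. The request thread submits the Callable to the dedicated pool, waits on the Future, and rethrows the task's failure as its own, so callers keep their ordinary control flow and only the executing thread changes. Below is a minimal standalone sketch of that pattern, assuming a plain JDK cached pool in place of ExecutorUtil.newMDCAwareCachedThreadPool and a hand-rolled unwrap in place of Lucene's IOUtils.rethrowAlways; the class and method names are illustrative, not Solr's.

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for CoreContainer#storedFieldsExecute; a plain cached
// pool replaces ExecutorUtil.newMDCAwareCachedThreadPool (so no MDC propagation).
public class StoredFieldsOffloadSketch {
  private final ExecutorService storedFieldsExecutor = Executors.newCachedThreadPool();

  public void storedFieldsExecute(Callable<Void> callable) throws IOException {
    Future<Void> future = storedFieldsExecutor.submit(callable);
    try {
      future.get(); // block: the caller keeps request semantics, only the thread changes
    } catch (InterruptedException e) {
      // restore the flag; note the method then returns without a result
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      // rough equivalent of Lucene's IOUtils.rethrowAlways(e.getCause())
      Throwable cause = e.getCause();
      if (cause instanceof IOException) throw (IOException) cause;
      if (cause instanceof RuntimeException) throw (RuntimeException) cause;
      if (cause instanceof Error) throw (Error) cause;
      throw new RuntimeException(cause);
    }
  }

  public static void main(String[] args) throws IOException {
    StoredFieldsOffloadSketch sketch = new StoredFieldsOffloadSketch();
    sketch.storedFieldsExecute(
        () -> {
          System.out.println("ran on " + Thread.currentThread().getName());
          return null;
        });
    sketch.storedFieldsExecutor.shutdown();
  }
}

One behavior the sketch copies from the diff: on InterruptedException the method restores the interrupt flag and returns normally, so the caller continues without knowing whether the task completed. That is likely fine for an experiment, but worth flagging before this graduates.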
solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java

@@ -224,8 +224,6 @@ public void process(ResponseBuilder rb) throws IOException {
 
     SearcherInfo searcherInfo = new SearcherInfo(core);
 
-    // this is initialized & set on the context *after* any searcher (re-)opening
-    ResultContext resultContext = null;
     final DocTransformer transformer = rsp.getReturnFields().getTransformer();
 
     // true in any situation where we have to use a realtime searcher rather then returning docs
@@ -237,139 +235,173 @@ public void process(ResponseBuilder rb) throws IOException {
 
     try {
 
-      boolean opennedRealtimeSearcher = false;
-      BytesRefBuilder idBytes = new BytesRefBuilder();
-      Map<String, SolrDocumentFetcher.DVIterEntry> reuseDvIters = new HashMap<>();
-      for (String idStr : reqIds.allIds) {
-        fieldType.readableToIndexed(idStr, idBytes);
-        // if _route_ is passed, id is a child doc. TODO remove in SOLR-15064
-        if (!opennedRealtimeSearcher && !params.get(ShardParams._ROUTE_, idStr).equals(idStr)) {
-          searcherInfo.clear();
-          resultContext = null;
-          ulog.openRealtimeSearcher(); // force open a new realtime searcher
-          opennedRealtimeSearcher = true;
-        } else if (ulog != null) {
-          Object o = ulog.lookup(idBytes.get());
-          if (o != null) {
-            // should currently be a List<Oper,Ver,Doc/Id>
-            List<?> entry = (List<?>) o;
-            assert entry.size() >= 3;
-            int oper = (Integer) entry.get(UpdateLog.FLAGS_IDX) & UpdateLog.OPERATION_MASK;
-            switch (oper) {
-              case UpdateLog.UPDATE_INPLACE: // fall through to ADD
-              case UpdateLog.ADD:
-                if (mustUseRealtimeSearcher) {
-                  // close handles to current searchers & result context
-                  if (!opennedRealtimeSearcher) {
-                    searcherInfo.clear();
-                    resultContext = null;
-                    ulog.openRealtimeSearcher(); // force open a new realtime searcher
-                    opennedRealtimeSearcher = true;
-                  }
-                  // pretend we never found this record and fall through to use the searcher
-                  o = null;
-                  break;
-                }
-
-                SolrDocument doc;
-                if (oper == UpdateLog.ADD) {
-                  doc =
-                      toSolrDoc(
-                          (SolrInputDocument) entry.get(entry.size() - 1), core.getLatestSchema());
-                  // toSolrDoc filtered copy-field targets already
-                  if (transformer != null) {
-                    transformer.transform(doc, -1); // unknown docID
-                  }
-                } else if (oper == UpdateLog.UPDATE_INPLACE) {
-                  assert entry.size() == 5;
-                  // For in-place update case, we have obtained the partial document till now. We
-                  // need to resolve it to a full document to be returned to the user.
-                  // resolveFullDocument applies the transformer, if present.
-                  doc =
-                      resolveFullDocument(
-                          core,
-                          idBytes.get(),
-                          rsp.getReturnFields(),
-                          (SolrInputDocument) entry.get(entry.size() - 1),
-                          entry);
-                  if (doc == null) {
-                    break; // document has been deleted as the resolve was going on
-                  }
-                  doc.visitSelfAndNestedDocs(
-                      (label, d) -> removeCopyFieldTargets(d, req.getSchema()));
-                } else {
-                  throw new SolrException(
-                      ErrorCode.INVALID_STATE, "Expected ADD or UPDATE_INPLACE. Got: " + oper);
-                }
-
-                docList.add(doc);
-                break;
-              case UpdateLog.DELETE:
-                break;
-              default:
-                throw new SolrException(ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
-            }
-            if (o != null) continue;
-          }
-        }
-
-        // didn't find it in the update log, so it should be in the newest searcher opened
-        searcherInfo.init();
-        // don't bother with ResultContext yet, we won't need it if doc doesn't match filters
-
-        int docid = -1;
-        long segAndId = searcherInfo.getSearcher().lookupId(idBytes.get());
-        if (segAndId >= 0) {
-          int segid = (int) segAndId;
-          LeafReaderContext ctx =
-              searcherInfo.getSearcher().getTopReaderContext().leaves().get((int) (segAndId >> 32));
-          docid = segid + ctx.docBase;
-
-          if (rb.getFilters() != null) {
-            for (Query raw : rb.getFilters()) {
-              raw = makeQueryable(raw);
-              Query q = raw.rewrite(searcherInfo.getSearcher().getIndexReader());
-              Scorer scorer =
-                  searcherInfo
-                      .getSearcher()
-                      .createWeight(q, ScoreMode.COMPLETE_NO_SCORES, 1f)
-                      .scorer(ctx);
-              if (scorer == null || segid != scorer.iterator().advance(segid)) {
-                // filter doesn't match.
-                docid = -1;
-                break;
-              }
-            }
-          }
-        }
-
-        if (docid < 0) continue;
-
-        Document luceneDocument =
-            searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames());
-        SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
-        SolrDocumentFetcher docFetcher = searcherInfo.getSearcher().getDocFetcher();
-        docFetcher.decorateDocValueFields(
-            doc, docid, docFetcher.getNonStoredDVs(true), reuseDvIters);
-        if (null != transformer) {
-          if (null == resultContext) {
-            // either first pass, or we've re-opened searcher - either way now we setContext
-            resultContext =
-                new RTGResultContext(rsp.getReturnFields(), searcherInfo.getSearcher(), req);
-            transformer.setContext(
-                resultContext); // we avoid calling setContext unless searcher is new/changed
-          }
-          transformer.transform(doc, docid);
-        }
-        docList.add(doc);
-      } // loop on ids
-
+      req.getCoreContainer()
+          .storedFieldsExecute(
+              () -> {
+                extracted(
+                    rb,
+                    reqIds,
+                    fieldType,
+                    params,
+                    searcherInfo,
+                    ulog,
+                    mustUseRealtimeSearcher,
+                    core,
+                    transformer,
+                    rsp,
+                    req,
+                    docList);
+                return null;
+              });
     } finally {
       searcherInfo.clear();
     }
 
     addDocListToResponse(rb, docList);
   }
 
+  private static void extracted(
+      ResponseBuilder rb,
+      IdsRequested reqIds,
+      FieldType fieldType,
+      SolrParams params,
+      SearcherInfo searcherInfo,
+      UpdateLog ulog,
+      boolean mustUseRealtimeSearcher,
+      SolrCore core,
+      DocTransformer transformer,
+      SolrQueryResponse rsp,
+      SolrQueryRequest req,
+      SolrDocumentList docList)
+      throws IOException {
+    // this is initialized & set on the context *after* any searcher (re-)opening
+    ResultContext resultContext = null;
+    boolean opennedRealtimeSearcher = false;
+    BytesRefBuilder idBytes = new BytesRefBuilder();
+    Map<String, SolrDocumentFetcher.DVIterEntry> reuseDvIters = new HashMap<>();
+    for (String idStr : reqIds.allIds) {
+      fieldType.readableToIndexed(idStr, idBytes);
+      // if _route_ is passed, id is a child doc. TODO remove in SOLR-15064
+      if (!opennedRealtimeSearcher && !params.get(ShardParams._ROUTE_, idStr).equals(idStr)) {
+        searcherInfo.clear();
+        resultContext = null;
+        ulog.openRealtimeSearcher(); // force open a new realtime searcher
+        opennedRealtimeSearcher = true;
+      } else if (ulog != null) {
+        Object o = ulog.lookup(idBytes.get());
+        if (o != null) {
+          // should currently be a List<Oper,Ver,Doc/Id>
+          List<?> entry = (List<?>) o;
+          assert entry.size() >= 3;
+          int oper = (Integer) entry.get(UpdateLog.FLAGS_IDX) & UpdateLog.OPERATION_MASK;
+          switch (oper) {
+            case UpdateLog.UPDATE_INPLACE: // fall through to ADD
+            case UpdateLog.ADD:
+              if (mustUseRealtimeSearcher) {
+                // close handles to current searchers & result context
+                if (!opennedRealtimeSearcher) {
+                  searcherInfo.clear();
+                  resultContext = null;
+                  ulog.openRealtimeSearcher(); // force open a new realtime searcher
+                  opennedRealtimeSearcher = true;
+                }
+                // pretend we never found this record and fall through to use the searcher
+                o = null;
+                break;
+              }
+
+              SolrDocument doc;
+              if (oper == UpdateLog.ADD) {
+                doc =
+                    toSolrDoc(
+                        (SolrInputDocument) entry.get(entry.size() - 1), core.getLatestSchema());
+                // toSolrDoc filtered copy-field targets already
+                if (transformer != null) {
+                  transformer.transform(doc, -1); // unknown docID
+                }
+              } else if (oper == UpdateLog.UPDATE_INPLACE) {
+                assert entry.size() == 5;
+                // For in-place update case, we have obtained the partial document till now. We
+                // need to resolve it to a full document to be returned to the user.
+                // resolveFullDocument applies the transformer, if present.
+                doc =
+                    resolveFullDocument(
+                        core,
+                        idBytes.get(),
+                        rsp.getReturnFields(),
+                        (SolrInputDocument) entry.get(entry.size() - 1),
+                        entry);
+                if (doc == null) {
+                  break; // document has been deleted as the resolve was going on
+                }
+                doc.visitSelfAndNestedDocs(
+                    (label, d) -> removeCopyFieldTargets(d, req.getSchema()));
+              } else {
+                throw new SolrException(
+                    ErrorCode.INVALID_STATE, "Expected ADD or UPDATE_INPLACE. Got: " + oper);
+              }
+
+              docList.add(doc);
+              break;
+            case UpdateLog.DELETE:
+              break;
+            default:
+              throw new SolrException(ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+          }
+          if (o != null) continue;
+        }
+      }
+
+      // didn't find it in the update log, so it should be in the newest searcher opened
+      searcherInfo.init();
+      // don't bother with ResultContext yet, we won't need it if doc doesn't match filters
+
+      int docid = -1;
+      long segAndId = searcherInfo.getSearcher().lookupId(idBytes.get());
+      if (segAndId >= 0) {
+        int segid = (int) segAndId;
+        LeafReaderContext ctx =
+            searcherInfo.getSearcher().getTopReaderContext().leaves().get((int) (segAndId >> 32));
+        docid = segid + ctx.docBase;
+
+        if (rb.getFilters() != null) {
+          for (Query raw : rb.getFilters()) {
+            raw = makeQueryable(raw);
+            Query q = raw.rewrite(searcherInfo.getSearcher().getIndexReader());
+            Scorer scorer =
+                searcherInfo
+                    .getSearcher()
+                    .createWeight(q, ScoreMode.COMPLETE_NO_SCORES, 1f)
+                    .scorer(ctx);
+            if (scorer == null || segid != scorer.iterator().advance(segid)) {
+              // filter doesn't match.
+              docid = -1;
+              break;
+            }
+          }
+        }
+      }
+
+      if (docid < 0) continue;
+
+      Document luceneDocument =
+          searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames());
+      SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema());
+      SolrDocumentFetcher docFetcher = searcherInfo.getSearcher().getDocFetcher();
+      docFetcher.decorateDocValueFields(doc, docid, docFetcher.getNonStoredDVs(true), reuseDvIters);
+      if (null != transformer) {
+        if (null == resultContext) {
+          // either first pass, or we've re-opened searcher - either way now we setContext
+          resultContext =
+              new RTGResultContext(rsp.getReturnFields(), searcherInfo.getSearcher(), req);
+          transformer.setContext(
+              resultContext); // we avoid calling setContext unless searcher is new/changed
+        }
+        transformer.transform(doc, docid);
+      }
+      docList.add(doc);
+    } // loop on ids
+  }
+
   /**
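The refactor above is a mechanical extract-method (the helper even keeps the IDE's default name, extracted) so that the entire per-id fetch loop can run on the storedFields pool while process() blocks. No synchronization is added to docList or searcherInfo, and for visibility none is needed: Future.get() inside storedFieldsExecute establishes a happens-before edge, so writes the task makes on the pool thread are visible to the request thread once the call returns. A toy sketch of that property, with hypothetical names rather than Solr classes:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrates why running extracted(...) on another thread is safe here: the
// submitting thread blocks on Future.get(), and get() establishes a
// happens-before edge, so mutations made by the task (filling the list) are
// visible to the caller afterwards without locks.
public class HandoffVisibilitySketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newCachedThreadPool();
    List<String> docList = new ArrayList<>(); // plain, unsynchronized list
    pool.submit(
            () -> {
              docList.add("doc-1"); // mutated on the pool thread
              docList.add("doc-2");
              return null;
            })
        .get(); // caller waits, as storedFieldsExecute does
    // Safe read: no other thread touches docList once get() has returned.
    System.out.println("fetched " + docList.size() + " docs");
    pool.shutdown();
  }
}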
solr/core/src/java/org/apache/solr/servlet/QueryResponseWriterUtil.java

@@ -49,20 +49,28 @@ public static void writeQueryResponse(
       String contentType)
       throws IOException {
 
-    if (responseWriter instanceof JacksonJsonWriter) {
-      JacksonJsonWriter binWriter = (JacksonJsonWriter) responseWriter;
-      BufferedOutputStream bos = new BufferedOutputStream(new NonFlushingStream(outputStream));
-      binWriter.write(bos, solrRequest, solrResponse);
-      bos.flush();
-    } else if (responseWriter instanceof BinaryQueryResponseWriter) {
-      BinaryQueryResponseWriter binWriter = (BinaryQueryResponseWriter) responseWriter;
-      binWriter.write(outputStream, solrRequest, solrResponse);
-    } else {
-      OutputStream out = new NonFlushingStream(outputStream);
-      Writer writer = buildWriter(out, ContentStreamBase.getCharsetFromContentType(contentType));
-      responseWriter.write(writer, solrRequest, solrResponse);
-      writer.flush();
-    }
+    solrRequest
+        .getCoreContainer()
+        .storedFieldsExecute(
+            () -> {
+              if (responseWriter instanceof JacksonJsonWriter) {
+                JacksonJsonWriter binWriter = (JacksonJsonWriter) responseWriter;
+                BufferedOutputStream bos =
+                    new BufferedOutputStream(new NonFlushingStream(outputStream));
+                binWriter.write(bos, solrRequest, solrResponse);
+                bos.flush();
+              } else if (responseWriter instanceof BinaryQueryResponseWriter) {
+                BinaryQueryResponseWriter binWriter = (BinaryQueryResponseWriter) responseWriter;
+                binWriter.write(outputStream, solrRequest, solrResponse);
+              } else {
+                OutputStream out = new NonFlushingStream(outputStream);
+                Writer writer =
+                    buildWriter(out, ContentStreamBase.getCharsetFromContentType(contentType));
+                responseWriter.write(writer, solrRequest, solrResponse);
+                writer.flush();
+              }
+              return null;
+            });
   }
 
   private static Writer buildWriter(OutputStream outputStream, String charset)
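The last change routes response serialization through the same pool, presumably because response writers pull stored fields lazily while streaming the result, so much of the stored-field I/O happens during the write itself. Since the caller blocks until the task finishes, the underlying output stream is still touched by only one thread at a time. A sketch of the call shape, where Renderer is a made-up stand-in for QueryResponseWriter and the error handling mirrors storedFieldsExecute:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Mirrors the writeQueryResponse change: serialization runs as one task on the
// dedicated pool while the caller blocks, so the output stream is never
// written from two threads at once.
public class ResponseWriteOffloadSketch {
  interface Renderer {
    void render(OutputStream out) throws IOException;
  }

  static void writeResponse(ExecutorService pool, OutputStream out, Renderer renderer)
      throws IOException {
    try {
      pool.submit(
              () -> {
                renderer.render(out); // all stored-field pulls happen on the pool thread
                return null;
              })
          .get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) throw (IOException) e.getCause();
      throw new RuntimeException(e.getCause());
    }
  }

  public static void main(String[] args) throws IOException {
    ExecutorService pool = Executors.newCachedThreadPool();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeResponse(pool, out, o -> o.write("{\"ok\":true}".getBytes(StandardCharsets.UTF_8)));
    System.out.println(out.toString(StandardCharsets.UTF_8));
    pool.shutdown();
  }
}

Worth noting for the experiment: a cached pool is unbounded, so under load its thread count grows with concurrent requests rather than capping them.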