From c9dcfb0b8b02eea8f45784c30e5d06861fa56b4c Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Fri, 4 Oct 2024 16:50:08 -0400 Subject: [PATCH 1/2] dedicated storedFields executor experiment --- .../org/apache/solr/core/CoreContainer.java | 15 ++ .../component/RealTimeGetComponent.java | 253 +++++++++--------- .../response/QueryResponseWriterUtil.java | 31 ++- .../org/apache/solr/util/SolrPluginUtils.java | 10 +- 4 files changed, 169 insertions(+), 140 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 2e550e0ed77..f7827a5ef7e 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -50,6 +50,7 @@ import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -177,6 +178,19 @@ public class CoreContainer { final SolrCores solrCores; + private final ExecutorService storedFieldsExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("storedFieldsExecutor"); + + public void storedFieldsExecute(Callable callable) throws IOException { + Future future = storedFieldsExecutor.submit(callable); + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw org.apache.lucene.util.IOUtils.rethrowAlways(e.getCause()); + } + } + public static class CoreLoadFailure { public final CoreDescriptor cd; @@ -1240,6 +1254,7 @@ public void shutdown() { } ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor); + ExecutorUtil.shutdownAndAwaitTermination(storedFieldsExecutor); // First wake up the closer thread, it'll terminate almost immediately since it checks // isShutDown. diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index cee432be945..9e2457ed3d4 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -224,8 +224,6 @@ public void process(ResponseBuilder rb) throws IOException { SearcherInfo searcherInfo = new SearcherInfo(core); - // this is initialized & set on the context *after* any searcher (re-)opening - ResultContext resultContext = null; final DocTransformer transformer = rsp.getReturnFields().getTransformer(); // true in any situation where we have to use a realtime searcher rather then returning docs @@ -237,139 +235,148 @@ public void process(ResponseBuilder rb) throws IOException { try { - boolean opennedRealtimeSearcher = false; - BytesRefBuilder idBytes = new BytesRefBuilder(); - Map reuseDvIters = new HashMap<>(); - for (String idStr : reqIds.allIds) { - fieldType.readableToIndexed(idStr, idBytes); - // if _route_ is passed, id is a child doc. TODO remove in SOLR-15064 - if (!opennedRealtimeSearcher && !params.get(ShardParams._ROUTE_, idStr).equals(idStr)) { - searcherInfo.clear(); - resultContext = null; - ulog.openRealtimeSearcher(); // force open a new realtime searcher - opennedRealtimeSearcher = true; - } else if (ulog != null) { - Object o = ulog.lookup(idBytes.get()); - if (o != null) { - // should currently be a List - List entry = (List) o; - assert entry.size() >= 3; - int oper = (Integer) entry.get(UpdateLog.FLAGS_IDX) & UpdateLog.OPERATION_MASK; - switch (oper) { - case UpdateLog.UPDATE_INPLACE: // fall through to ADD - case UpdateLog.ADD: - if (mustUseRealtimeSearcher) { - // close handles to current searchers & result context - if (!opennedRealtimeSearcher) { - searcherInfo.clear(); - resultContext = null; - ulog.openRealtimeSearcher(); // force open a new realtime searcher - opennedRealtimeSearcher = true; - } - // pretend we never found this record and fall through to use the searcher - o = null; - break; - } + req.getCoreContainer().storedFieldsExecute(() -> { + extracted(rb, reqIds, fieldType, params, searcherInfo, ulog, mustUseRealtimeSearcher, core, transformer, rsp, req, docList); + return null; + }); - SolrDocument doc; - if (oper == UpdateLog.ADD) { - doc = - toSolrDoc( - (SolrInputDocument) entry.get(entry.size() - 1), core.getLatestSchema()); - // toSolrDoc filtered copy-field targets already - if (transformer != null) { - transformer.transform(doc, -1); // unknown docID - } - } else if (oper == UpdateLog.UPDATE_INPLACE) { - assert entry.size() == 5; - // For in-place update case, we have obtained the partial document till now. We - // need to resolve it to a full document to be returned to the user. - // resolveFullDocument applies the transformer, if present. - doc = - resolveFullDocument( - core, - idBytes.get(), - rsp.getReturnFields(), - (SolrInputDocument) entry.get(entry.size() - 1), - entry); - if (doc == null) { - break; // document has been deleted as the resolve was going on - } - doc.visitSelfAndNestedDocs( - (label, d) -> removeCopyFieldTargets(d, req.getSchema())); - } else { - throw new SolrException( - ErrorCode.INVALID_STATE, "Expected ADD or UPDATE_INPLACE. Got: " + oper); - } + } finally { + searcherInfo.clear(); + } - docList.add(doc); - break; - case UpdateLog.DELETE: + addDocListToResponse(rb, docList); + } + + private static void extracted(ResponseBuilder rb, IdsRequested reqIds, FieldType fieldType, SolrParams params, SearcherInfo searcherInfo, UpdateLog ulog, boolean mustUseRealtimeSearcher, SolrCore core, DocTransformer transformer, SolrQueryResponse rsp, SolrQueryRequest req, SolrDocumentList docList) throws IOException { + // this is initialized & set on the context *after* any searcher (re-)opening + ResultContext resultContext = null; + boolean opennedRealtimeSearcher = false; + BytesRefBuilder idBytes = new BytesRefBuilder(); + Map reuseDvIters = new HashMap<>(); + for (String idStr : reqIds.allIds) { + fieldType.readableToIndexed(idStr, idBytes); + // if _route_ is passed, id is a child doc. TODO remove in SOLR-15064 + if (!opennedRealtimeSearcher && !params.get(ShardParams._ROUTE_, idStr).equals(idStr)) { + searcherInfo.clear(); + resultContext = null; + ulog.openRealtimeSearcher(); // force open a new realtime searcher + opennedRealtimeSearcher = true; + } else if (ulog != null) { + Object o = ulog.lookup(idBytes.get()); + if (o != null) { + // should currently be a List + List entry = (List) o; + assert entry.size() >= 3; + int oper = (Integer) entry.get(UpdateLog.FLAGS_IDX) & UpdateLog.OPERATION_MASK; + switch (oper) { + case UpdateLog.UPDATE_INPLACE: // fall through to ADD + case UpdateLog.ADD: + if (mustUseRealtimeSearcher) { + // close handles to current searchers & result context + if (!opennedRealtimeSearcher) { + searcherInfo.clear(); + resultContext = null; + ulog.openRealtimeSearcher(); // force open a new realtime searcher + opennedRealtimeSearcher = true; + } + // pretend we never found this record and fall through to use the searcher + o = null; break; - default: + } + + SolrDocument doc; + if (oper == UpdateLog.ADD) { + doc = + toSolrDoc( + (SolrInputDocument) entry.get(entry.size() - 1), core.getLatestSchema()); + // toSolrDoc filtered copy-field targets already + if (transformer != null) { + transformer.transform(doc, -1); // unknown docID + } + } else if (oper == UpdateLog.UPDATE_INPLACE) { + assert entry.size() == 5; + // For in-place update case, we have obtained the partial document till now. We + // need to resolve it to a full document to be returned to the user. + // resolveFullDocument applies the transformer, if present. + doc = + resolveFullDocument( + core, + idBytes.get(), + rsp.getReturnFields(), + (SolrInputDocument) entry.get(entry.size() - 1), + entry); + if (doc == null) { + break; // document has been deleted as the resolve was going on + } + doc.visitSelfAndNestedDocs( + (label, d) -> removeCopyFieldTargets(d, req.getSchema())); + } else { throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); - } - if (o != null) continue; + ErrorCode.INVALID_STATE, "Expected ADD or UPDATE_INPLACE. Got: " + oper); + } + + docList.add(doc); + break; + case UpdateLog.DELETE: + break; + default: + throw new SolrException( + ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); } + if (o != null) continue; } + } - // didn't find it in the update log, so it should be in the newest searcher opened - searcherInfo.init(); - // don't bother with ResultContext yet, we won't need it if doc doesn't match filters - - int docid = -1; - long segAndId = searcherInfo.getSearcher().lookupId(idBytes.get()); - if (segAndId >= 0) { - int segid = (int) segAndId; - LeafReaderContext ctx = - searcherInfo.getSearcher().getTopReaderContext().leaves().get((int) (segAndId >> 32)); - docid = segid + ctx.docBase; - - if (rb.getFilters() != null) { - for (Query raw : rb.getFilters()) { - raw = makeQueryable(raw); - Query q = raw.rewrite(searcherInfo.getSearcher().getIndexReader()); - Scorer scorer = - searcherInfo - .getSearcher() - .createWeight(q, ScoreMode.COMPLETE_NO_SCORES, 1f) - .scorer(ctx); - if (scorer == null || segid != scorer.iterator().advance(segid)) { - // filter doesn't match. - docid = -1; - break; - } + // didn't find it in the update log, so it should be in the newest searcher opened + searcherInfo.init(); + // don't bother with ResultContext yet, we won't need it if doc doesn't match filters + + int docid = -1; + long segAndId = searcherInfo.getSearcher().lookupId(idBytes.get()); + if (segAndId >= 0) { + int segid = (int) segAndId; + LeafReaderContext ctx = + searcherInfo.getSearcher().getTopReaderContext().leaves().get((int) (segAndId >> 32)); + docid = segid + ctx.docBase; + + if (rb.getFilters() != null) { + for (Query raw : rb.getFilters()) { + raw = makeQueryable(raw); + Query q = raw.rewrite(searcherInfo.getSearcher().getIndexReader()); + Scorer scorer = + searcherInfo + .getSearcher() + .createWeight(q, ScoreMode.COMPLETE_NO_SCORES, 1f) + .scorer(ctx); + if (scorer == null || segid != scorer.iterator().advance(segid)) { + // filter doesn't match. + docid = -1; + break; } } } + } - if (docid < 0) continue; - - Document luceneDocument = - searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames()); - SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema()); - SolrDocumentFetcher docFetcher = searcherInfo.getSearcher().getDocFetcher(); - docFetcher.decorateDocValueFields( - doc, docid, docFetcher.getNonStoredDVs(true), reuseDvIters); - if (null != transformer) { - if (null == resultContext) { - // either first pass, or we've re-opened searcher - either way now we setContext - resultContext = - new RTGResultContext(rsp.getReturnFields(), searcherInfo.getSearcher(), req); - transformer.setContext( - resultContext); // we avoid calling setContext unless searcher is new/changed - } - transformer.transform(doc, docid); + if (docid < 0) continue; + + Document luceneDocument = + searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames()); + SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema()); + SolrDocumentFetcher docFetcher = searcherInfo.getSearcher().getDocFetcher(); + docFetcher.decorateDocValueFields( + doc, docid, docFetcher.getNonStoredDVs(true), reuseDvIters); + if (null != transformer) { + if (null == resultContext) { + // either first pass, or we've re-opened searcher - either way now we setContext + resultContext = + new RTGResultContext(rsp.getReturnFields(), searcherInfo.getSearcher(), req); + transformer.setContext( + resultContext); // we avoid calling setContext unless searcher is new/changed } - docList.add(doc); - } // loop on ids - - } finally { - searcherInfo.clear(); - } - - addDocListToResponse(rb, docList); + transformer.transform(doc, docid); + } + docList.add(doc); + } // loop on ids } /** diff --git a/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java b/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java index 05f79bf9113..acf88229373 100644 --- a/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java +++ b/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java @@ -49,20 +49,23 @@ public static void writeQueryResponse( String contentType) throws IOException { - if (responseWriter instanceof JacksonJsonWriter) { - JacksonJsonWriter binWriter = (JacksonJsonWriter) responseWriter; - BufferedOutputStream bos = new BufferedOutputStream(new NonFlushingStream(outputStream)); - binWriter.write(bos, solrRequest, solrResponse); - bos.flush(); - } else if (responseWriter instanceof BinaryQueryResponseWriter) { - BinaryQueryResponseWriter binWriter = (BinaryQueryResponseWriter) responseWriter; - binWriter.write(outputStream, solrRequest, solrResponse); - } else { - OutputStream out = new NonFlushingStream(outputStream); - Writer writer = buildWriter(out, ContentStreamBase.getCharsetFromContentType(contentType)); - responseWriter.write(writer, solrRequest, solrResponse); - writer.flush(); - } + solrRequest.getCoreContainer().storedFieldsExecute(() -> { + if (responseWriter instanceof JacksonJsonWriter) { + JacksonJsonWriter binWriter = (JacksonJsonWriter) responseWriter; + BufferedOutputStream bos = new BufferedOutputStream(new NonFlushingStream(outputStream)); + binWriter.write(bos, solrRequest, solrResponse); + bos.flush(); + } else if (responseWriter instanceof BinaryQueryResponseWriter) { + BinaryQueryResponseWriter binWriter = (BinaryQueryResponseWriter) responseWriter; + binWriter.write(outputStream, solrRequest, solrResponse); + } else { + OutputStream out = new NonFlushingStream(outputStream); + Writer writer = buildWriter(out, ContentStreamBase.getCharsetFromContentType(contentType)); + responseWriter.write(writer, solrRequest, solrResponse); + writer.flush(); + } + return null; + }); } private static Writer buildWriter(OutputStream outputStream, String charset) diff --git a/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java b/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java index 75364fb7730..ac3272eeac4 100644 --- a/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java +++ b/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java @@ -251,9 +251,13 @@ public static void optimizePreFetchDocs( // get documents DocIterator iter = docs.iterator(); - for (int i = 0; i < docs.size(); i++) { - searcher.doc(iter.nextDoc(), fieldFilter); - } + Set fieldFilterF = fieldFilter; + req.getCoreContainer().storedFieldsExecute(() -> { + for (int i = 0; i < docs.size(); i++) { + searcher.doc(iter.nextDoc(), fieldFilterF); + } + return null; + }); } } From 088bda02d961042c2e0526a6f72eacd4bbf858cf Mon Sep 17 00:00:00 2001 From: Michael Gibney Date: Fri, 4 Oct 2024 16:58:34 -0400 Subject: [PATCH 2/2] tidy --- .../org/apache/solr/core/CoreContainer.java | 3 +- .../component/RealTimeGetComponent.java | 43 +++++++++++++++---- .../response/QueryResponseWriterUtil.java | 39 +++++++++-------- .../org/apache/solr/util/SolrPluginUtils.java | 14 +++--- 4 files changed, 66 insertions(+), 33 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index f7827a5ef7e..4d9cdbdeb63 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -178,7 +178,8 @@ public class CoreContainer { final SolrCores solrCores; - private final ExecutorService storedFieldsExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("storedFieldsExecutor"); + private final ExecutorService storedFieldsExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool("storedFieldsExecutor"); public void storedFieldsExecute(Callable callable) throws IOException { Future future = storedFieldsExecutor.submit(callable); diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index 9e2457ed3d4..49760b40fa4 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -235,10 +235,24 @@ public void process(ResponseBuilder rb) throws IOException { try { - req.getCoreContainer().storedFieldsExecute(() -> { - extracted(rb, reqIds, fieldType, params, searcherInfo, ulog, mustUseRealtimeSearcher, core, transformer, rsp, req, docList); - return null; - }); + req.getCoreContainer() + .storedFieldsExecute( + () -> { + extracted( + rb, + reqIds, + fieldType, + params, + searcherInfo, + ulog, + mustUseRealtimeSearcher, + core, + transformer, + rsp, + req, + docList); + return null; + }); } finally { searcherInfo.clear(); @@ -247,7 +261,20 @@ public void process(ResponseBuilder rb) throws IOException { addDocListToResponse(rb, docList); } - private static void extracted(ResponseBuilder rb, IdsRequested reqIds, FieldType fieldType, SolrParams params, SearcherInfo searcherInfo, UpdateLog ulog, boolean mustUseRealtimeSearcher, SolrCore core, DocTransformer transformer, SolrQueryResponse rsp, SolrQueryRequest req, SolrDocumentList docList) throws IOException { + private static void extracted( + ResponseBuilder rb, + IdsRequested reqIds, + FieldType fieldType, + SolrParams params, + SearcherInfo searcherInfo, + UpdateLog ulog, + boolean mustUseRealtimeSearcher, + SolrCore core, + DocTransformer transformer, + SolrQueryResponse rsp, + SolrQueryRequest req, + SolrDocumentList docList) + throws IOException { // this is initialized & set on the context *after* any searcher (re-)opening ResultContext resultContext = null; boolean opennedRealtimeSearcher = false; @@ -320,8 +347,7 @@ private static void extracted(ResponseBuilder rb, IdsRequested reqIds, FieldType case UpdateLog.DELETE: break; default: - throw new SolrException( - ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); + throw new SolrException(ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); } if (o != null) continue; } @@ -363,8 +389,7 @@ private static void extracted(ResponseBuilder rb, IdsRequested reqIds, FieldType searcherInfo.getSearcher().doc(docid, rsp.getReturnFields().getLuceneFieldNames()); SolrDocument doc = toSolrDoc(luceneDocument, core.getLatestSchema()); SolrDocumentFetcher docFetcher = searcherInfo.getSearcher().getDocFetcher(); - docFetcher.decorateDocValueFields( - doc, docid, docFetcher.getNonStoredDVs(true), reuseDvIters); + docFetcher.decorateDocValueFields(doc, docid, docFetcher.getNonStoredDVs(true), reuseDvIters); if (null != transformer) { if (null == resultContext) { // either first pass, or we've re-opened searcher - either way now we setContext diff --git a/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java b/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java index acf88229373..cad0925b4ae 100644 --- a/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java +++ b/solr/core/src/java/org/apache/solr/response/QueryResponseWriterUtil.java @@ -49,23 +49,28 @@ public static void writeQueryResponse( String contentType) throws IOException { - solrRequest.getCoreContainer().storedFieldsExecute(() -> { - if (responseWriter instanceof JacksonJsonWriter) { - JacksonJsonWriter binWriter = (JacksonJsonWriter) responseWriter; - BufferedOutputStream bos = new BufferedOutputStream(new NonFlushingStream(outputStream)); - binWriter.write(bos, solrRequest, solrResponse); - bos.flush(); - } else if (responseWriter instanceof BinaryQueryResponseWriter) { - BinaryQueryResponseWriter binWriter = (BinaryQueryResponseWriter) responseWriter; - binWriter.write(outputStream, solrRequest, solrResponse); - } else { - OutputStream out = new NonFlushingStream(outputStream); - Writer writer = buildWriter(out, ContentStreamBase.getCharsetFromContentType(contentType)); - responseWriter.write(writer, solrRequest, solrResponse); - writer.flush(); - } - return null; - }); + solrRequest + .getCoreContainer() + .storedFieldsExecute( + () -> { + if (responseWriter instanceof JacksonJsonWriter) { + JacksonJsonWriter binWriter = (JacksonJsonWriter) responseWriter; + BufferedOutputStream bos = + new BufferedOutputStream(new NonFlushingStream(outputStream)); + binWriter.write(bos, solrRequest, solrResponse); + bos.flush(); + } else if (responseWriter instanceof BinaryQueryResponseWriter) { + BinaryQueryResponseWriter binWriter = (BinaryQueryResponseWriter) responseWriter; + binWriter.write(outputStream, solrRequest, solrResponse); + } else { + OutputStream out = new NonFlushingStream(outputStream); + Writer writer = + buildWriter(out, ContentStreamBase.getCharsetFromContentType(contentType)); + responseWriter.write(writer, solrRequest, solrResponse); + writer.flush(); + } + return null; + }); } private static Writer buildWriter(OutputStream outputStream, String charset) diff --git a/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java b/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java index ac3272eeac4..f448201aaec 100644 --- a/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java +++ b/solr/core/src/java/org/apache/solr/util/SolrPluginUtils.java @@ -252,12 +252,14 @@ public static void optimizePreFetchDocs( // get documents DocIterator iter = docs.iterator(); Set fieldFilterF = fieldFilter; - req.getCoreContainer().storedFieldsExecute(() -> { - for (int i = 0; i < docs.size(); i++) { - searcher.doc(iter.nextDoc(), fieldFilterF); - } - return null; - }); + req.getCoreContainer() + .storedFieldsExecute( + () -> { + for (int i = 0; i < docs.size(); i++) { + searcher.doc(iter.nextDoc(), fieldFilterF); + } + return null; + }); } }