diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 312c227110e..b4c7f498832 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -89,6 +89,8 @@ Optimizations * SOLR-16845: BinaryResponseWriter should not attempt cast to Utf8CharSequence (Alex Deparvu via Houston Putman) +* SOLR-16265: reduce memory usage of ContentWriter based requests in Http2SolrClient (Alex Deparvu, Kevin Risden, David Smiley) + Bug Fixes --------------------- diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java index 5bbcadbb1e8..648504a816a 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java @@ -16,8 +16,6 @@ */ package org.apache.solr.client.solrj.impl; -import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException; -import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException; import static org.apache.solr.common.util.Utils.getObjectByPath; import java.io.ByteArrayOutputStream; @@ -30,7 +28,6 @@ import java.net.CookieStore; import java.net.MalformedURLException; import java.net.URL; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -57,6 +54,8 @@ import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.V2RequestSupport; import org.apache.solr.client.solrj.embedded.SSLConfig; +import org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException; +import org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException; import org.apache.solr.client.solrj.impl.HttpListenerFactory.RequestResponseListener; import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -83,13 +82,12 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; -import org.eclipse.jetty.client.util.ByteBufferContentProvider; -import org.eclipse.jetty.client.util.FormContentProvider; -import org.eclipse.jetty.client.util.InputStreamContentProvider; +import org.eclipse.jetty.client.util.FormRequestContent; +import org.eclipse.jetty.client.util.InputStreamRequestContent; import org.eclipse.jetty.client.util.InputStreamResponseListener; -import org.eclipse.jetty.client.util.MultiPartContentProvider; -import org.eclipse.jetty.client.util.OutputStreamContentProvider; -import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.client.util.MultiPartRequestContent; +import org.eclipse.jetty.client.util.OutputStreamRequestContent; +import org.eclipse.jetty.client.util.StringRequestContent; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; @@ -328,19 +326,19 @@ public long getIdleTimeout() { public static class OutStream implements Closeable { private final String origCollection; private final ModifiableSolrParams origParams; - private final OutputStreamContentProvider outProvider; + private final OutputStreamRequestContent content; private final InputStreamResponseListener responseListener; private final boolean isXml; public OutStream( String origCollection, ModifiableSolrParams origParams, - OutputStreamContentProvider outProvider, + OutputStreamRequestContent content, InputStreamResponseListener responseListener, boolean isXml) { this.origCollection = origCollection; this.origParams = origParams; - this.outProvider = outProvider; + this.content = content; this.responseListener = responseListener; this.isXml = isXml; } @@ -352,11 +350,11 @@ boolean belongToThisStream(SolrRequest solrRequest, String collection) { } public void write(byte b[]) throws IOException { - this.outProvider.getOutputStream().write(b); + this.content.getOutputStream().write(b); } public void flush() throws IOException { - this.outProvider.getOutputStream().flush(); + this.content.getOutputStream().flush(); } @Override @@ -364,7 +362,7 @@ public void close() throws IOException { if (isXml) { write("".getBytes(FALLBACK_CHARSET)); } - this.outProvider.getOutputStream().close(); + this.content.getOutputStream().close(); } // TODO this class should be hidden @@ -388,19 +386,18 @@ public OutStream initOutStream(String baseUrl, UpdateRequest updateRequest, Stri if (collection != null) basePath += "/" + collection; if (!basePath.endsWith("/")) basePath += "/"; - OutputStreamContentProvider provider = new OutputStreamContentProvider(); + OutputStreamRequestContent content = new OutputStreamRequestContent(contentType); Request postRequest = httpClient .newRequest(basePath + "update" + requestParams.toQueryString()) .method(HttpMethod.POST) - .header(HttpHeader.CONTENT_TYPE, contentType) - .content(provider); - decorateRequest(postRequest, updateRequest); + .body(content); + decorateRequest(postRequest, updateRequest, false); InputStreamResponseListener responseListener = new InputStreamResponseListener(); postRequest.send(responseListener); boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType()); - OutStream outStream = new OutStream(collection, origParams, provider, responseListener, isXml); + OutStream outStream = new OutStream(collection, origParams, content, responseListener, isXml); if (isXml) { outStream.write("".getBytes(FALLBACK_CHARSET)); } @@ -409,7 +406,7 @@ public OutStream initOutStream(String baseUrl, UpdateRequest updateRequest, Stri public void send(OutStream outStream, SolrRequest req, String collection) throws IOException { assert outStream.belongToThisStream(req, collection); - this.requestWriter.write(req, outStream.outProvider.getOutputStream()); + this.requestWriter.write(req, outStream.content.getOutputStream()); if (outStream.isXml) { // check for commit or optimize SolrParams params = req.getParams(); @@ -438,81 +435,77 @@ public void send(OutStream outStream, SolrRequest req, String collection) thr private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {}; public Cancellable asyncRequest( - SolrRequest solrRequest, - String collection, - AsyncListener> asyncListener) { + SolrRequest solrReq, String collection, AsyncListener> asyncListener) { + MDCCopyHelper mdcCopyHelper = new MDCCopyHelper(); + SolrRequest solrRequest = unwrapV2Request(solrReq); + Request req; try { - req = makeRequest(solrRequest, collection); - } catch (SolrServerException | IOException e) { - asyncListener.onFailure(e); - return FAILED_MAKING_REQUEST_CANCELLABLE; - } - final ResponseParser parser = - solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser(); - MDCCopyHelper mdcCopyHelper = new MDCCopyHelper(); - req.onRequestQueued(asyncTracker.queuedListener) - .onComplete(asyncTracker.completeListener) - .send( - new InputStreamResponseListener() { - @Override - public void onHeaders(Response response) { - super.onHeaders(response); - InputStreamResponseListener listener = this; - executor.execute( - () -> { - InputStream is = listener.getInputStream(); - assert ObjectReleaseTracker.track(is); - try { - NamedList body = - processErrorsAndResponse( - solrRequest, parser, response, is, req.getURI().toString()); + String url = getRequestPath(solrRequest, collection); + InputStreamResponseListener listener = + new InputStreamResponseListener() { + @Override + public void onHeaders(Response response) { + super.onHeaders(response); + executor.execute( + () -> { + InputStream is = getInputStream(); + assert ObjectReleaseTracker.track(is); + try { + NamedList body = + processErrorsAndResponse(solrRequest, response, is, url); + mdcCopyHelper.onBegin(null); + log.debug("response processing success"); + asyncListener.onSuccess(body); + } catch (RemoteSolrException e) { + if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) { mdcCopyHelper.onBegin(null); - log.debug("response processing success"); - asyncListener.onSuccess(body); - } catch (RemoteSolrException e) { - if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) { - mdcCopyHelper.onBegin(null); - log.debug("response processing failed"); - asyncListener.onFailure(e); - } - } catch (SolrServerException e) { - mdcCopyHelper.onBegin(null); - log.debug("response processing failed"); + log.debug("response processing failed", e); asyncListener.onFailure(e); - } finally { - log.debug("response processing completed"); - mdcCopyHelper.onComplete(null); } - }); - } + } catch (SolrServerException e) { + mdcCopyHelper.onBegin(null); + log.debug("response processing failed", e); + asyncListener.onFailure(e); + } finally { + log.debug("response processing completed"); + mdcCopyHelper.onComplete(null); + } + }); + } - @Override - public void onFailure(Response response, Throwable failure) { - super.onFailure(response, failure); - if (failure != CANCELLED_EXCEPTION) { - asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure)); - } + @Override + public void onFailure(Response response, Throwable failure) { + super.onFailure(response, failure); + if (failure != CANCELLED_EXCEPTION) { + asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure)); } - }); + } + }; + + req = makeRequestAndSend(solrRequest, url, listener, true); + } catch (SolrServerException | IOException e) { + asyncListener.onFailure(e); + return FAILED_MAKING_REQUEST_CANCELLABLE; + } return () -> req.abort(CANCELLED_EXCEPTION); } @Override public NamedList request(SolrRequest solrRequest, String collection) throws SolrServerException, IOException { - Request req = makeRequest(solrRequest, collection); - final ResponseParser parser = - solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser(); + solrRequest = unwrapV2Request(solrRequest); + String url = getRequestPath(solrRequest, collection); Throwable abortCause = null; + Request req = null; try { InputStreamResponseListener listener = new InputStreamResponseListener(); - req.send(listener); + req = makeRequestAndSend(solrRequest, url, listener, false); Response response = listener.get(idleTimeoutMillis, TimeUnit.MILLISECONDS); InputStream is = listener.getInputStream(); assert ObjectReleaseTracker.track(is); - return processErrorsAndResponse(solrRequest, parser, response, is, req.getURI().toString()); + return processErrorsAndResponse(solrRequest, response, is, req.getURI().toString()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); abortCause = e; @@ -544,12 +537,10 @@ public NamedList request(SolrRequest solrRequest, String collection) } private NamedList processErrorsAndResponse( - SolrRequest solrRequest, - ResponseParser parser, - Response response, - InputStream is, - String urlExceptionMessage) + SolrRequest solrRequest, Response response, InputStream is, String urlExceptionMessage) throws SolrServerException { + ResponseParser parser = + solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser(); String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE); String mimeType = null; String encoding = null; @@ -566,9 +557,9 @@ private void setBasicAuthHeader(SolrRequest solrRequest, Request req) { String encoded = basicAuthCredentialsToAuthorizationString( solrRequest.getBasicAuthUser(), solrRequest.getBasicAuthPassword()); - req.header("Authorization", encoded); + req.headers(headers -> headers.put("Authorization", encoded)); } else if (basicAuthAuthorizationStr != null) { - req.header("Authorization", basicAuthAuthorizationStr); + req.headers(headers -> headers.put("Authorization", basicAuthAuthorizationStr)); } } @@ -577,15 +568,9 @@ static String basicAuthCredentialsToAuthorizationString(String user, String pass return "Basic " + Base64.getEncoder().encodeToString(userPass.getBytes(FALLBACK_CHARSET)); } - private Request makeRequest(SolrRequest solrRequest, String collection) - throws SolrServerException, IOException { - Request req = createRequest(solrRequest, collection); - decorateRequest(req, solrRequest); - return req; - } + private void decorateRequest(Request req, SolrRequest solrRequest, boolean isAsync) { + req.headers(headers -> headers.remove(HttpHeader.ACCEPT_ENCODING)); - private void decorateRequest(Request req, SolrRequest solrRequest) { - req.header(HttpHeader.ACCEPT_ENCODING, null); if (requestTimeoutMillis > 0) { req.timeout(requestTimeoutMillis, TimeUnit.MILLISECONDS); } else { @@ -603,11 +588,17 @@ private void decorateRequest(Request req, SolrRequest solrRequest) { req.onComplete(listener); } + if (isAsync) { + req.onRequestQueued(asyncTracker.queuedListener); + req.onComplete(asyncTracker.completeListener); + } + Map headers = solrRequest.getHeaders(); if (headers != null) { - for (Map.Entry entry : headers.entrySet()) { - req.header(entry.getKey(), entry.getValue()); - } + req.headers( + h -> + headers.entrySet().stream() + .forEach(entry -> h.add(entry.getKey(), entry.getValue()))); } } @@ -617,38 +608,19 @@ private String changeV2RequestEndpoint(String basePath) throws MalformedURLExcep return new URL(oldURL.getProtocol(), oldURL.getHost(), oldURL.getPort(), newPath).toString(); } - private Request createRequest(SolrRequest solrRequest, String collection) - throws IOException, SolrServerException { + private SolrRequest unwrapV2Request(SolrRequest solrRequest) { if (solrRequest.getBasePath() == null && serverBaseUrl == null) throw new IllegalArgumentException("Destination node is not provided!"); if (solrRequest instanceof V2RequestSupport) { - solrRequest = ((V2RequestSupport) solrRequest).getV2Request(); - } - SolrParams params = solrRequest.getParams(); - RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest); - Collection streams = - contentWriter == null ? requestWriter.getContentStreams(solrRequest) : null; - String path = requestWriter.getPath(solrRequest); - if (path == null || !path.startsWith("/")) { - path = DEFAULT_PATH; - } - - ResponseParser parser = solrRequest.getResponseParser(); - if (parser == null) { - parser = this.parser; - } - - // The parser 'wt=' and 'version=' params are used instead of the original - // params - ModifiableSolrParams wparams = new ModifiableSolrParams(params); - if (parser != null) { - wparams.set(CommonParams.WT, parser.getWriterType()); - wparams.set(CommonParams.VERSION, parser.getVersion()); + return ((V2RequestSupport) solrRequest).getV2Request(); + } else { + return solrRequest; } + } - // TODO add invariantParams support - + private String getRequestPath(SolrRequest solrRequest, String collection) + throws MalformedURLException { String basePath = solrRequest.getBasePath() == null ? serverBaseUrl : solrRequest.getBasePath(); if (collection != null) basePath += "/" + collection; @@ -659,62 +631,94 @@ private Request createRequest(SolrRequest solrRequest, String collection) basePath = serverBaseUrl + "/____v2"; } } + String path = requestWriter.getPath(solrRequest); + if (path == null || !path.startsWith("/")) { + path = DEFAULT_PATH; + } + + return basePath + path; + } + + private Request makeRequestAndSend( + SolrRequest solrRequest, String url, InputStreamResponseListener listener, boolean isAsync) + throws IOException, SolrServerException { + + // TODO add invariantParams support + ResponseParser parser = + solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser(); + + // The parser 'wt=' and 'version=' params are used instead of the original + // params + ModifiableSolrParams wparams = new ModifiableSolrParams(solrRequest.getParams()); + wparams.set(CommonParams.WT, parser.getWriterType()); + wparams.set(CommonParams.VERSION, parser.getVersion()); if (SolrRequest.METHOD.GET == solrRequest.getMethod()) { - if (streams != null || contentWriter != null) { + RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest); + Collection streams = + contentWriter == null ? requestWriter.getContentStreams(solrRequest) : null; + if (contentWriter != null || streams != null) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!"); } - - return httpClient - .newRequest(basePath + path + wparams.toQueryString()) - .method(HttpMethod.GET); + var r = httpClient.newRequest(url + wparams.toQueryString()).method(HttpMethod.GET); + decorateRequest(r, solrRequest, isAsync); + r.send(listener); + return r; } if (SolrRequest.METHOD.DELETE == solrRequest.getMethod()) { - return httpClient - .newRequest(basePath + path + wparams.toQueryString()) - .method(HttpMethod.DELETE); + var r = httpClient.newRequest(url + wparams.toQueryString()).method(HttpMethod.DELETE); + decorateRequest(r, solrRequest, isAsync); + r.send(listener); + return r; } if (SolrRequest.METHOD.POST == solrRequest.getMethod() || SolrRequest.METHOD.PUT == solrRequest.getMethod()) { + RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest); + Collection streams = + contentWriter == null ? requestWriter.getContentStreams(solrRequest) : null; - String url = basePath + path; - boolean hasNullStreamName = false; + boolean isMultipart = false; if (streams != null) { + boolean hasNullStreamName = false; hasNullStreamName = streams.stream().anyMatch(cs -> cs.getName() == null); + isMultipart = !hasNullStreamName && streams.size() > 1; } - boolean isMultipart = streams != null && streams.size() > 1 && !hasNullStreamName; - HttpMethod method = SolrRequest.METHOD.POST == solrRequest.getMethod() ? HttpMethod.POST : HttpMethod.PUT; if (contentWriter != null) { - Request req = httpClient.newRequest(url + wparams.toQueryString()).method(method); - Utils.BAOS baos = new Utils.BAOS(); - contentWriter.write(baos); - - // SOLR-16265: TODO reduce memory usage - return req.content( - // We're throwing this BAOS away, so no need to copy the byte[], just use the raw buf - new ByteBufferContentProvider( - contentWriter.getContentType(), ByteBuffer.wrap(baos.getbuf(), 0, baos.size()))); + var content = new OutputStreamRequestContent(contentWriter.getContentType()); + var r = httpClient.newRequest(url + wparams.toQueryString()).method(method).body(content); + decorateRequest(r, solrRequest, isAsync); + r.send(listener); + try (var output = content.getOutputStream()) { + contentWriter.write(output); + } + return r; + } else if (streams == null || isMultipart) { // send server list and request list as query string params ModifiableSolrParams queryParams = calculateQueryParams(this.urlParamNames, wparams); queryParams.add(calculateQueryParams(solrRequest.getQueryParams(), wparams)); Request req = httpClient.newRequest(url + queryParams.toQueryString()).method(method); - return fillContentStream(req, streams, wparams, isMultipart); + var r = fillContentStream(req, streams, wparams, isMultipart); + decorateRequest(r, solrRequest, isAsync); + r.send(listener); + return r; + } else { // It is has one stream, it is the post body, put the params in the URL ContentStream contentStream = streams.iterator().next(); - return httpClient - .newRequest(url + wparams.toQueryString()) - .method(method) - .content( - new InputStreamContentProvider(contentStream.getStream()), - contentStream.getContentType()); + var content = + new InputStreamRequestContent( + contentStream.getContentType(), contentStream.getStream()); + var r = httpClient.newRequest(url + wparams.toQueryString()).method(method).body(content); + decorateRequest(r, solrRequest, isAsync); + r.send(listener); + return r; } } @@ -729,37 +733,38 @@ private Request fillContentStream( throws IOException { if (isMultipart) { // multipart/form-data - MultiPartContentProvider content = new MultiPartContentProvider(); - Iterator iter = wparams.getParameterNamesIterator(); - while (iter.hasNext()) { - String key = iter.next(); - String[] vals = wparams.getParams(key); - if (vals != null) { - for (String val : vals) { - content.addFieldPart(key, new StringContentProvider(val), null); + try (MultiPartRequestContent content = new MultiPartRequestContent()) { + Iterator iter = wparams.getParameterNamesIterator(); + while (iter.hasNext()) { + String key = iter.next(); + String[] vals = wparams.getParams(key); + if (vals != null) { + for (String val : vals) { + content.addFieldPart(key, new StringRequestContent(val), null); + } } } - } - if (streams != null) { - for (ContentStream contentStream : streams) { - String contentType = contentStream.getContentType(); - if (contentType == null) { - contentType = "multipart/form-data"; // default - } - String name = contentStream.getName(); - if (name == null) { - name = ""; + if (streams != null) { + for (ContentStream contentStream : streams) { + String contentType = contentStream.getContentType(); + if (contentType == null) { + contentType = "multipart/form-data"; // default + } + String name = contentStream.getName(); + if (name == null) { + name = ""; + } + HttpFields.Mutable fields = HttpFields.build(1); + fields.add(HttpHeader.CONTENT_TYPE, contentType); + content.addFilePart( + name, + contentStream.getName(), + new InputStreamRequestContent(contentStream.getStream()), + fields); } - HttpFields.Mutable fields = HttpFields.build(1); - fields.add(HttpHeader.CONTENT_TYPE, contentType); - content.addFilePart( - name, - contentStream.getName(), - new InputStreamContentProvider(contentStream.getStream()), - fields); } + req.body(content); } - req.content(content); } else { // application/x-www-form-urlencoded Fields fields = new Fields(); @@ -773,7 +778,7 @@ private Request fillContentStream( } } } - req.content(new FormContentProvider(fields, FALLBACK_CHARSET)); + req.body(new FormRequestContent(fields, FALLBACK_CHARSET)); } return req;