diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java index 801eac24e40..8979b52f3ae 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java @@ -168,29 +168,29 @@ public abstract class HTTPConduit public static final String NO_IO_EXCEPTIONS = "org.apache.cxf.transport.no_io_exceptions"; public static final String FORCE_HTTP_VERSION = "org.apache.cxf.transport.http.forceVersion"; - /** - * The HTTP status codes as contextual property (comma-separated integers as String) - * on the outgoing {@link Message} which lead to setting {@code org.apache.cxf.transport.service_not_available} - * for all responses with those status codes. This is used e.g. by the + /** + * The HTTP status codes as contextual property (comma-separated integers as String) + * on the outgoing {@link Message} which lead to setting {@code org.apache.cxf.transport.service_not_available} + * for all responses with those status codes. This is used e.g. by the * {@code org.apache.cxf.clustering.FailoverTargetSelector} to determine if it should do the fail-over. * Default: {@code 404,429,503} as per {@code DEFAULT_SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES} */ - public static final String SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES = + public static final String SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES = "org.apache.cxf.transport.service_not_available_on_http_status_codes"; - - + + /** * The Logger for this class. */ protected static final Logger LOG = LogUtils.getL7dLogger(HTTPConduit.class); - + protected static final Set KNOWN_HTTP_VERBS_WITH_NO_CONTENT = new HashSet<>(Arrays.asList(new String[]{"GET", "HEAD", "OPTIONS", "TRACE"})); protected static final String HTTP_VERSION = SystemPropertyAction.getPropertyOrNull(FORCE_HTTP_VERSION); - private static final Collection DEFAULT_SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES = + private static final Collection DEFAULT_SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES = Arrays.asList(404, 429, 503); private static boolean hasLoggedAsyncWarning; @@ -1657,7 +1657,7 @@ protected int doProcessResponseCode() throws IOException { if (exchange != null) { exchange.put(Message.RESPONSE_CODE, rc); final Collection serviceNotAvailableOnHttpStatusCodes = MessageUtils - .getContextualIntegers(outMessage, SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES, + .getContextualIntegers(outMessage, SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES, DEFAULT_SERVICE_NOT_AVAILABLE_ON_HTTP_STATUS_CODES); if (serviceNotAvailableOnHttpStatusCodes.contains(rc)) { exchange.put("org.apache.cxf.transport.service_not_available", true); @@ -1697,7 +1697,7 @@ protected void handleResponseInternal() throws IOException { if ((!doProcessResponse(outMessage, responseCode) || HttpURLConnection.HTTP_ACCEPTED == responseCode) - && MessageUtils.getContextualBoolean(outMessage, + && MessageUtils.getContextualBoolean(outMessage, Message.PROCESS_202_RESPONSE_ONEWAY_OR_PARTIAL, true)) { in = getPartialResponse(); if (in == null @@ -1721,7 +1721,7 @@ protected void handleResponseInternal() throws IOException { exchange.put("IN_CHAIN_COMPLETE", Boolean.TRUE); exchange.setInMessage(inMessage); - if (MessageUtils.getContextualBoolean(outMessage, + if (MessageUtils.getContextualBoolean(outMessage, Message.PROPAGATE_202_RESPONSE_ONEWAY_OR_PARTIAL, false)) { incomingObserver.onMessage(inMessage); } diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index 0c7be9e1904..d40582d576e 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -106,7 +106,7 @@ public class HttpClientHTTPConduit extends URLConnectionHTTPConduit { volatile int lastTlsHash = -1; volatile URI sslURL; private final ReentrantLock initializationLock = new ReentrantLock(); - + private static final class RefCount { private final AtomicLong count = new AtomicLong(); private final TLSClientParameters clientParameters; @@ -120,12 +120,12 @@ private static final class RefCount { this.clientParameters = clientParameters; this.finalizer = finalizer; } - + RefCount acquire() { count.incrementAndGet(); return this; } - + void release() { if (count.decrementAndGet() == 0) { finalizer.run(); @@ -157,7 +157,7 @@ void release() { ((AutoCloseable)client).close(); } catch (Exception e) { //ignore - } + } } else if (client != null) { tryToShutdownSelector(client); } @@ -176,7 +176,7 @@ public TLSClientParameters clientParameters() { return clientParameters; } } - + private static final class HttpClientCache { private static final int MAX_SIZE = 100; // Keeping at most 100 clients @@ -184,9 +184,9 @@ private static final class HttpClientCache { private final ClientPolicyCalculator cpc = new ClientPolicyCalculator(); private final ReentrantLock lock = new ReentrantLock(); - RefCount computeIfAbsent(final boolean shareHttpClient, final HTTPClientPolicy policy, + RefCount computeIfAbsent(final boolean shareHttpClient, final HTTPClientPolicy policy, final TLSClientParameters clientParameters, final Supplier supplier) { - + // Do not share if it is not allowed for the conduit or cache capacity is exceeded if (!shareHttpClient || clients.size() >= MAX_SIZE) { return new RefCount(supplier.get(), policy, clientParameters, () -> { }).acquire(); @@ -231,7 +231,7 @@ void remove(final HTTPClientPolicy policy, final TLSClientParameters clientParam public HttpClientHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t) throws IOException { super(b, ei, t); } - + private static Set getRestrictedHeaders() { Set headers = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); headers.addAll(Set.of("Connection", "Content-Length", "Expect", "Host", "Upgrade")); @@ -243,13 +243,22 @@ private boolean isSslTargetDifferent(URI lastURL, URI url) { || !lastURL.getHost().equals(url.getHost()) || lastURL.getPort() != url.getPort(); } - + @Override public void close(Message msg) throws IOException { + try { + OutputStream os = msg.getContent(OutputStream.class); + // Java 21 may hang on close, we flush stream to help close them out. + if (os != null && AutoCloseable.class.isAssignableFrom(HttpClient.class)) { + os.flush(); + } + } catch (IOException ioException) { + // ignore + } super.close(msg); msg.remove(HttpClient.class); } - + /** * Close the conduit */ @@ -264,17 +273,17 @@ public void close() { private static void tryToShutdownSelector(HttpClient client) { synchronized (client) { String n = client.toString(); - + // it can take three seconds (or more) for the JVM to determine the client // is unreferenced and then shutdown the selector thread, we'll try and speed that - // up. This is somewhat of a complete hack. + // up. This is somewhat of a complete hack. int idx = n.lastIndexOf('('); if (idx > 0) { n = n.substring(idx + 1); n = n.substring(0, n.length() - 1); n = "HttpClient-" + n + "-SelectorManager"; } - try { + try { ThreadGroup rootGroup = Thread.currentThread().getThreadGroup(); Thread[] threads = new Thread[rootGroup.activeCount()]; int cnt = rootGroup.enumerate(threads); @@ -293,7 +302,7 @@ private static void tryToShutdownSelector(HttpClient client) { } } } - + @Override protected void setupConnection(Message message, Address address, HTTPClientPolicy csPolicy) throws IOException { URI uri = address.getURI(); @@ -319,15 +328,15 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic o = Boolean.TRUE; } if (clientParameters.isDisableCNCheck()) { - if (clientParameters.getSslContext() != null) { + if (clientParameters.getSslContext() != null) { // If they specify their own SSLContext, we cannot handle the // HostnameVerifier so we'll need to use the URLConnection o = Boolean.TRUE; } - if (clientParameters.getTrustManagers() != null + if (clientParameters.getTrustManagers() != null && JavaUtils.getJavaMajorVersion() < 14) { // trustmanagers hacks don't work on Java11 - o = Boolean.TRUE; + o = Boolean.TRUE; } } } @@ -336,7 +345,7 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic super.setupConnection(message, address, csPolicy); return; } - + if (sslURL != null && isSslTargetDifferent(sslURL, uri)) { sslURL = null; if (clientRef != null) { @@ -360,17 +369,17 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic HttpClient.Builder cb = HttpClient.newBuilder() .proxy(ps) .followRedirects(Redirect.NEVER); - + if (ctimeout > 0) { cb.connectTimeout(Duration.ofMillis(ctimeout)); } - + if ("https".equals(uri.getScheme())) { sslURL = uri; try { SSLContext sslContext = clientParameters.getSslContext(); if (sslContext == null) { - sslContext = SSLUtils.getSSLContext(clientParameters, true); + sslContext = SSLUtils.getSSLContext(clientParameters, true); cb.sslContext(sslContext); } if (sslContext != null) { @@ -382,13 +391,13 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic sslContext.getSocketFactory().getDefaultCipherSuites(), supportedCiphers, LOG); - + if (clientParameters.getSecureSocketProtocol() != null) { String protocol = clientParameters.getSecureSocketProtocol(); SSLParameters params = new SSLParameters(cipherSuites, new String[] {protocol}); cb.sslParameters(params); } else { - final SSLParameters params = new SSLParameters(cipherSuites, + final SSLParameters params = new SSLParameters(cipherSuites, TLSClientParameters.getPreferredClientProtocols()); cb.sslParameters(params); } @@ -402,7 +411,7 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic verc = csPolicy.getVersion(); } if ("1.1".equals(HTTP_VERSION) || "1.1".equals(verc)) { - cb.version(Version.HTTP_1_1); + cb.version(Version.HTTP_1_1); } // make sure the conduit is not yet initialized @@ -413,8 +422,8 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic final boolean shareHttpClient = MessageUtils.getContextualBoolean(message, SHARE_HTTPCLIENT_CONDUIT, true); cl = CLIENTS_CACHE.computeIfAbsent(shareHttpClient, csPolicy, clientParameters, () -> cb.build()); - - if (!"https".equals(uri.getScheme()) + + if (!"https".equals(uri.getScheme()) && !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) && cl.client().version() == Version.HTTP_2 && ("2".equals(verc) || ("auto".equals(verc) && "2".equals(HTTP_VERSION)))) { @@ -422,7 +431,7 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic // We specifically want HTTP2, but we're using a request // that won't trigger an upgrade to HTTP/2 so we'll // call OPTIONS on the URI which may trigger HTTP/2 upgrade. - // Not needed for methods that don't have a body (GET/HEAD/etc...) + // Not needed for methods that don't have a body (GET/HEAD/etc...) // or for https (negotiated at the TLS level) HttpRequest.Builder rb = HttpRequest.newBuilder() .uri(uri) @@ -431,7 +440,7 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic } catch (IOException | InterruptedException e) { // } - } + } clientRef = cl; } @@ -440,7 +449,7 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic } } message.put(HttpClient.class, cl.client()); - + message.put(KEY_HTTP_CONNECTION_ADDRESS, address); } @@ -448,7 +457,7 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking, int chunkThreshold) throws IOException { - + Object o = message.get("USING_URLCONNECTION"); if (Boolean.TRUE == o) { return super.createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold); @@ -508,7 +517,7 @@ static class HttpClientPipedOutputStream extends PipedOutputStream { HttpClientWrappedOutputStream stream; HTTPClientPolicy csPolicy; HttpClientBodyPublisher publisher; - HttpClientPipedOutputStream(HttpClientWrappedOutputStream s, + HttpClientPipedOutputStream(HttpClientWrappedOutputStream s, PipedInputStream pin, HTTPClientPolicy cp, HttpClientBodyPublisher bp) throws IOException { @@ -520,7 +529,7 @@ static class HttpClientPipedOutputStream extends PipedOutputStream { public void close() throws IOException { super.close(); csPolicy = null; - stream = null; + stream = null; if (publisher != null) { publisher.close(); publisher = null; @@ -587,7 +596,7 @@ private static final class InputStreamSupplier implements Supplier InputStreamSupplier(InputStream i) { in = i; } - + public InputStream get() { return in; } @@ -607,7 +616,7 @@ synchronized void close() { stream = null; } } - + @Override public synchronized void subscribe(Subscriber subscriber) { if (stream != null) { @@ -615,7 +624,7 @@ public synchronized void subscribe(Subscriber subscriber) { contentLen = stream.contentLen; if (stream.pout != null) { synchronized (stream.pout) { - stream.pout.notifyAll(); + stream.pout.notifyAll(); } if (stream != null) { contentLen = stream.contentLen; @@ -637,7 +646,7 @@ public long contentLength() { return contentLen; } } - class HttpClientWrappedOutputStream extends WrappedOutputStream { + class HttpClientWrappedOutputStream extends WrappedOutputStream { List> subscribers = new LinkedList<>(); CompletableFuture> future; @@ -648,8 +657,8 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream { PipedOutputStream pout; HttpClientBodyPublisher publisher; HttpRequest request; - - + + HttpClientWrappedOutputStream(Message message, boolean needToCacheRequest, boolean isChunking, int chunkThreshold, String conduitName) { @@ -669,7 +678,7 @@ public void close() throws IOException { publisher = null; } request = null; - subscribers = null; + subscribers = null; } void addSubscriber(Flow.Subscriber subscriber) { subscribers.add(subscriber); @@ -679,7 +688,7 @@ void addSubscriber(Flow.Subscriber subscriber) { protected void setFixedLengthStreamingMode(int i) { contentLen = i; } - + @Override protected void handleNoOutput() throws IOException { contentLen = 0; @@ -738,9 +747,9 @@ public void setProtocolHeadersInBuilder(HttpRequest.Builder rb) throws IOExcepti if (!dropContentType) { rb.header(HttpHeaderHelper.CONTENT_TYPE, h.determineContentType()); } - } + } } - + private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOutputStream out) throws IOException { if (!connectionComplete) { @@ -775,8 +784,8 @@ private boolean isConnectionAttemptCompleted(HTTPClientPolicy csPolicy, PipedOut } return true; } - - + + @Override protected void setProtocolHeaders() throws IOException { HttpClient cl = outMessage.get(HttpClient.class); @@ -785,7 +794,7 @@ protected void setProtocolHeaders() throws IOException { String httpRequestMethod = (String)outMessage.get(Message.HTTP_REQUEST_METHOD); - + if (KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod) || PropertyUtils.isTrue(outMessage.get(Headers.EMPTY_REQUEST_PROPERTY))) { contentLen = 0; @@ -793,21 +802,21 @@ protected void setProtocolHeaders() throws IOException { final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 ? 4096 : csPolicy.getChunkLength()); - + this.publisher = new HttpClientBodyPublisher(this, pin); if (contentLen != 0) { pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher); } HttpRequest.Builder rb = HttpRequest.newBuilder() - .method(httpRequestMethod, publisher); + .method(httpRequestMethod, publisher); String verc = (String)outMessage.getContextualProperty(FORCE_HTTP_VERSION); if (verc == null) { verc = csPolicy.getVersion(); } if ("1.1".equals(HTTP_VERSION) || "1.1".equals(verc)) { - rb.version(Version.HTTP_1_1); - } + rb.version(Version.HTTP_1_1); + } try { rb.uri(address.getURI()); } catch (IllegalArgumentException iae) { @@ -815,17 +824,17 @@ protected void setProtocolHeaders() throws IOException { mex.initCause(iae); throw mex; } - + rtimeout = determineReceiveTimeout(outMessage, csPolicy); if (rtimeout > 0) { rb.timeout(Duration.ofMillis(rtimeout)); } setProtocolHeadersInBuilder(rb); - + request = rb.build(); - - + + final BodyHandler handler = BodyHandlers.ofInputStream(); if (System.getSecurityManager() != null) { try { @@ -891,7 +900,7 @@ HttpResponse getResponse() throws IOException { uhe.initCause(cause); throw uhe; } - + } if (t instanceof IOException) { IOException iot = (IOException)t; @@ -903,9 +912,9 @@ HttpResponse getResponse() throws IOException { } catch (TimeoutException e) { throw (IOException)(new HttpTimeoutException("Timeout").initCause(e)); } - + } - + @Override protected int getResponseCode() throws IOException { return getResponse().statusCode(); @@ -917,13 +926,13 @@ protected void updateResponseHeaders(Message inMessage) throws IOException { h.readFromConnection(rsp.headers().map()); if (rsp.headers().map().containsKey(Message.CONTENT_TYPE)) { List s = rsp.headers().allValues(Message.CONTENT_TYPE); - inMessage.put(Message.CONTENT_TYPE, String.join(",", s)); + inMessage.put(Message.CONTENT_TYPE, String.join(",", s)); } else { inMessage.put(Message.CONTENT_TYPE, null); } cookies.readFromHeaders(h); } - + @Override protected InputStream getInputStream() throws IOException { HttpResponse resp = getResponse(); @@ -1014,18 +1023,18 @@ protected String getResponseMessage() throws IOException { } return null; } - + @Override protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException { Address addrss = (Address)outMessage.get(KEY_HTTP_CONNECTION_ADDRESS); URI uri = addrss.getURI(); - + if ("http".equals(uri.getScheme())) { return null; } String method = (String)outMessage.get(Message.HTTP_REQUEST_METHOD); HttpClient cl = outMessage.get(HttpClient.class); - + while (!connectionComplete || !cl.sslContext().getClientSessionContext().getIds().hasMoreElements()) { Thread.yield(); } @@ -1036,15 +1045,15 @@ protected HttpsURLConnectionInfo getHttpsURLConnectionInfo() throws IOException Principal principal = session.getLocalPrincipal(); Certificate[] serverCerts = session.getPeerCertificates(); Principal peer = session.getPeerPrincipal(); - + HttpsURLConnectionInfo info = new HttpsURLConnectionInfo(uri, method, cipherSuite, localCerts, principal, serverCerts, peer); - + return info; } - + @Override protected boolean usingProxy() { @@ -1070,7 +1079,7 @@ protected InputStream getPartialResponse() throws IOException { } catch (IOException ioe) { // ignore } - } + } // Don't need to do anything return null; } @@ -1078,7 +1087,7 @@ protected InputStream getPartialResponse() throws IOException { @Override protected void setupNewConnection(String newURL) throws IOException { connectionComplete = false; - + HTTPClientPolicy cp = getClient(outMessage); Address address; try {