From 4d660f152477a76e41a9669472495be8cdc86c10 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 16 Apr 2024 10:44:42 -0400 Subject: [PATCH] [Streaming Indexing] Ensure support of the new transport by security plugin (#13174) (#13229) Signed-off-by: Andriy Redko (cherry picked from commit 6afbcd40e72ec0a0ff6d3e28d5a3e0773a7fb741) --- CHANGELOG.md | 1 + .../http/netty4/Netty4HttpClient.java | 4 +- .../SecureNetty4HttpServerTransportTests.java | 4 +- .../ssl/SimpleSecureNetty4TransportTests.java | 5 +- .../transport/netty4/ssl/TrustAllManager.java | 28 - .../ReactorNetty4HttpServerTransport.java | 125 +++- .../http/reactor/netty4/ssl/SslUtils.java | 42 ++ .../http/reactor/netty4/ssl/package-info.java | 12 + .../reactor/ReactorNetty4Plugin.java | 54 ++ .../reactor/netty4/ReactorHttpClient.java | 55 +- ...ReactorNetty4HttpServerTransportTests.java | 595 ++++++++++++++++++ .../src/test/resources/README.txt | 14 + .../src/test/resources/certificate.crt | 22 + .../src/test/resources/certificate.key | 28 + 14 files changed, 938 insertions(+), 51 deletions(-) delete mode 100644 modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java create mode 100644 plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java create mode 100644 plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java create mode 100644 plugins/transport-reactor-netty4/src/test/resources/README.txt create mode 100644 plugins/transport-reactor-netty4/src/test/resources/certificate.crt create mode 100644 plugins/transport-reactor-netty4/src/test/resources/certificate.key diff --git a/CHANGELOG.md b/CHANGELOG.md index ca177106b68e7..e4f074fa096c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) - Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) +- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java index f33a1b8ba989f..46003d8360fb6 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java @@ -37,7 +37,6 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.tasks.Task; import org.opensearch.transport.NettyAllocator; -import org.opensearch.transport.netty4.ssl.TrustAllManager; import java.io.Closeable; import java.net.SocketAddress; @@ -75,6 +74,7 @@ import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -222,7 +222,7 @@ protected void initChannel(SocketChannel ch) throws Exception { final SslHandler sslHandler = new SslHandler( SslContextBuilder.forClient() .clientAuth(ClientAuth.NONE) - .trustManager(TrustAllManager.INSTANCE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) .build() .newEngine(ch.alloc()) ); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java index e79a066ad8f63..f80ad901ce765 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java @@ -41,7 +41,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.NettyAllocator; import org.opensearch.transport.SharedGroupFactory; -import org.opensearch.transport.netty4.ssl.TrustAllManager; import org.junit.After; import org.junit.Before; @@ -84,6 +83,7 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; import static org.opensearch.core.rest.RestStatus.OK; @@ -131,7 +131,7 @@ public Optional buildSecureHttpServerEngine(Settings settings, HttpSe keyManagerFactory.init(keyStore, "password".toCharArray()); SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) - .trustManager(TrustAllManager.INSTANCE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) .build() .newEngine(NettyAllocator.getAllocator()); return Optional.of(engine); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java index df3b005f40903..e0600aebd90e5 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java @@ -55,6 +55,7 @@ import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -87,7 +88,7 @@ public Optional buildSecureServerTransportEngine(Settings settings, T SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory) .clientAuth(ClientAuth.NONE) - .trustManager(TrustAllManager.INSTANCE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) .build() .newEngine(NettyAllocator.getAllocator()); return Optional.of(engine); @@ -103,7 +104,7 @@ public Optional buildSecureClientTransportEngine(Settings settings, S return Optional.of( SslContextBuilder.forClient() .clientAuth(ClientAuth.NONE) - .trustManager(TrustAllManager.INSTANCE) + .trustManager(InsecureTrustManagerFactory.INSTANCE) .build() .newEngine(NettyAllocator.getAllocator()) ); diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java deleted file mode 100644 index a38c542b5780e..0000000000000 --- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.transport.netty4.ssl; - -import javax.net.ssl.X509TrustManager; - -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; - -public class TrustAllManager implements X509TrustManager { - public static final X509TrustManager INSTANCE = new TrustAllManager(); - - private TrustAllManager() {} - - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} - - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} - - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } -} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index d4a5a9ad83af6..bd1646d753016 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -8,6 +8,7 @@ package org.opensearch.http.reactor.netty4; +import org.opensearch.common.Nullable; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -23,21 +24,33 @@ import org.opensearch.http.HttpChannel; import org.opensearch.http.HttpReadTimeoutException; import org.opensearch.http.HttpServerChannel; +import org.opensearch.http.reactor.netty4.ssl.SslUtils; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.reactor.SharedGroupFactory; import org.opensearch.transport.reactor.netty4.Netty4Utils; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSessionContext; + import java.net.InetSocketAddress; import java.net.SocketOption; import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelOption; import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.ssl.ApplicationProtocolNegotiator; +import io.netty.handler.ssl.SslContext; import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -116,6 +129,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor private final ByteSizeValue maxInitialLineLength; private final ByteSizeValue maxHeaderSize; private final ByteSizeValue maxChunkSize; + private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; private volatile SharedGroupFactory.SharedGroup sharedGroup; private volatile DisposableServer disposableServer; private volatile Scheduler scheduler; @@ -142,6 +156,45 @@ public ReactorNetty4HttpServerTransport( ClusterSettings clusterSettings, SharedGroupFactory sharedGroupFactory, Tracer tracer + ) { + this( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + sharedGroupFactory, + null, + tracer + ); + } + + /** + * Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}). + * @param settings settings + * @param networkService network service + * @param bigArrays big array allocator + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param sharedGroupFactory shared group factory + * @param secureHttpTransportSettingsProvider secure HTTP transport settings provider + * @param tracer tracer instance + */ + public ReactorNetty4HttpServerTransport( + Settings settings, + NetworkService networkService, + BigArrays bigArrays, + ThreadPool threadPool, + NamedXContentRegistry xContentRegistry, + Dispatcher dispatcher, + ClusterSettings clusterSettings, + SharedGroupFactory sharedGroupFactory, + @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, + Tracer tracer ) { super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer); Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings)); @@ -152,6 +205,7 @@ public ReactorNetty4HttpServerTransport( this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings); + this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider; } /** @@ -160,7 +214,7 @@ public ReactorNetty4HttpServerTransport( */ @Override protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception { - final HttpServer server = configureChannelOptions( + final HttpServer server = configure( HttpServer.create() .httpFormDecoder(builder -> builder.scheduler(scheduler)) .idleTimeout(Duration.ofMillis(connectTimeoutMillis)) @@ -173,7 +227,6 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti .maxHeaderSize(maxHeaderSize.bytesAsInt()) .maxInitialLineLength(maxInitialLineLength.bytesAsInt()) ) - .protocol(HttpProtocol.HTTP11, HttpProtocol.H2C) .handle((req, res) -> incomingRequest(req, res)) ); @@ -181,8 +234,8 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti return new ReactorNetty4HttpServerChannel(disposableServer.channel()); } - private HttpServer configureChannelOptions(final HttpServer server1) { - HttpServer configured = server1.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)) + private HttpServer configure(final HttpServer server) throws Exception { + HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings)) .childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)); if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) { @@ -229,6 +282,65 @@ private HttpServer configureChannelOptions(final HttpServer server1) { configured = configured.option(ChannelOption.SO_REUSEADDR, reuseAddress); configured = configured.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); + // Configure SSL context if available + if (secureHttpTransportSettingsProvider != null) { + final SSLEngine engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, this) + .orElseGet(SslUtils::createDefaultServerSSLEngine); + + try { + final List cipherSuites = Arrays.asList(engine.getEnabledCipherSuites()); + final List applicationProtocols = Arrays.asList(engine.getSSLParameters().getApplicationProtocols()); + + configured = configured.secure(spec -> spec.sslContext(new SslContext() { + @Override + public SSLSessionContext sessionContext() { + throw new UnsupportedOperationException(); /* server only, should never be called */ + } + + @Override + public SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) { + throw new UnsupportedOperationException(); /* server only, should never be called */ + } + + @Override + public SSLEngine newEngine(ByteBufAllocator alloc) { + try { + return secureHttpTransportSettingsProvider.buildSecureHttpServerEngine( + settings, + ReactorNetty4HttpServerTransport.this + ).orElseGet(SslUtils::createDefaultServerSSLEngine); + } catch (final SSLException ex) { + throw new UnsupportedOperationException("Unable to create SSLEngine", ex); + } + } + + @Override + public boolean isClient() { + return false; /* server only */ + } + + @Override + public List cipherSuites() { + return cipherSuites; + } + + @Override + public ApplicationProtocolNegotiator applicationProtocolNegotiator() { + return new ApplicationProtocolNegotiator() { + @Override + public List protocols() { + return applicationProtocols; + } + }; + } + }).build()).protocol(HttpProtocol.HTTP11, HttpProtocol.H2); + } finally { + ReferenceCountUtil.release(engine); + } + } else { + configured = configured.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C); + } + return configured; } @@ -302,6 +414,11 @@ protected void doStart() { } } + /** + * Exception handler + * @param channel HTTP channel + * @param cause exception occurred + */ @Override public void onException(HttpChannel channel, Exception cause) { if (cause instanceof ReadTimeoutException) { diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java new file mode 100644 index 0000000000000..a0b4319c7c2d1 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ +package org.opensearch.http.reactor.netty4.ssl; + +import org.opensearch.OpenSearchSecurityException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import java.security.NoSuchAlgorithmException; + +/** + * Helper class for creating default SSL engines + */ +public class SslUtils { + private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" }; + + private SslUtils() {} + + /** + * Creates default server {@link SSLEngine} instance + * @return default server {@link SSLEngine} instance + */ + public static SSLEngine createDefaultServerSSLEngine() { + try { + final SSLEngine engine = SSLContext.getDefault().createSSLEngine(); + engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS); + engine.setUseClientMode(false); + return engine; + } catch (final NoSuchAlgorithmException ex) { + throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex); + } + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java new file mode 100644 index 0000000000000..95dbd2d6bd9ca --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * SSL supporting utility classes + */ +package org.opensearch.http.reactor.netty4.ssl; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java index dc310c3793109..6e5b0215b58a4 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java @@ -17,9 +17,11 @@ import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpServerTransport.Dispatcher; import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; @@ -38,6 +40,11 @@ public class ReactorNetty4Plugin extends Plugin implements NetworkPlugin { */ public static final String REACTOR_NETTY_HTTP_TRANSPORT_NAME = "reactor-netty4"; + /** + * The name of new experimental secure HTTP transport implementations based on Reactor Netty. + */ + public static final String REACTOR_NETTY_SECURE_HTTP_TRANSPORT_NAME = "reactor-netty4-secure"; + private final SetOnce groupFactory = new SetOnce<>(); /** @@ -91,6 +98,53 @@ public Map> getHttpTransports( dispatcher, clusterSettings, getSharedGroupFactory(settings), + null, /* no security settings provider */ + tracer + ) + ); + } + + /** + * Returns a map of {@link HttpServerTransport} suppliers. + * See {@link org.opensearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation. + * @param settings settings + * @param networkService network service + * @param bigArrays big array allocator + * @param pageCacheRecycler page cache recycler instance + * @param circuitBreakerService circuit breaker service instance + * @param threadPool thread pool instance + * @param xContentRegistry XContent registry instance + * @param dispatcher dispatcher instance + * @param clusterSettings cluster settings + * @param secureHttpTransportSettingsProvider secure HTTP transport settings provider + * @param tracer tracer instance + */ + @Override + public Map> getSecureHttpTransports( + Settings settings, + ThreadPool threadPool, + BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, + CircuitBreakerService circuitBreakerService, + NamedXContentRegistry xContentRegistry, + NetworkService networkService, + Dispatcher dispatcher, + ClusterSettings clusterSettings, + SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider, + Tracer tracer + ) { + return Collections.singletonMap( + REACTOR_NETTY_SECURE_HTTP_TRANSPORT_NAME, + () -> new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry, + dispatcher, + clusterSettings, + getSharedGroupFactory(settings), + secureHttpTransportSettingsProvider, tracer ) ); diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java index 443ecd0f40ead..920c895205023 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java @@ -15,6 +15,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; import java.io.Closeable; import java.net.InetSocketAddress; @@ -38,10 +39,14 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.resolver.DefaultAddressResolverGroup; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ParallelFlux; +import reactor.netty.http.Http11SslContextSpec; +import reactor.netty.http.Http2SslContextSpec; import reactor.netty.http.client.HttpClient; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; @@ -50,8 +55,9 @@ /** * Tiny helper to send http requests over netty. */ -class ReactorHttpClient implements Closeable { +public class ReactorHttpClient implements Closeable { private final boolean compression; + private final boolean secure; static Collection returnHttpResponseBodies(Collection responses) { List list = new ArrayList<>(responses.size()); @@ -69,16 +75,21 @@ static Collection returnOpaqueIds(Collection responses return list; } - ReactorHttpClient(boolean compression) { + public ReactorHttpClient(boolean compression, boolean secure) { this.compression = compression; + this.secure = secure; } - static ReactorHttpClient create() { + public static ReactorHttpClient create() { return create(true); } - static ReactorHttpClient create(boolean compression) { - return new ReactorHttpClient(compression); + public static ReactorHttpClient create(boolean compression) { + return new ReactorHttpClient(compression, false); + } + + public static ReactorHttpClient https() { + return new ReactorHttpClient(true, true); } public List get(InetSocketAddress remoteAddress, String... uris) throws InterruptedException { @@ -92,7 +103,7 @@ public List get(InetSocketAddress remoteAddress, boolean order final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); httpRequest.headers().add(HOST, "localhost"); httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); - httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); + httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); requests.add(httpRequest); } @@ -148,7 +159,7 @@ private List processRequestsWithBody( request.headers().add(HttpHeaderNames.HOST, "localhost"); request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json"); - request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); + request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http"); request.headers().add("X-Opaque-ID", String.valueOf(i)); requests.add(request); } @@ -162,12 +173,7 @@ private List sendRequests( ) { final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); try { - final HttpClient client = HttpClient.newConnection() - .resolver(DefaultAddressResolverGroup.INSTANCE) - .runOn(eventLoopGroup) - .host(remoteAddress.getHostString()) - .port(remoteAddress.getPort()) - .compress(compression); + final HttpClient client = createClient(remoteAddress, eventLoopGroup); @SuppressWarnings("unchecked") final Mono[] monos = requests.stream() @@ -201,6 +207,29 @@ private List sendRequests( } } + private HttpClient createClient(final InetSocketAddress remoteAddress, final NioEventLoopGroup eventLoopGroup) { + final HttpClient client = HttpClient.newConnection() + .resolver(DefaultAddressResolverGroup.INSTANCE) + .runOn(eventLoopGroup) + .host(remoteAddress.getHostString()) + .port(remoteAddress.getPort()) + .compress(compression); + + if (secure) { + return client.secure( + spec -> spec.sslContext( + OpenSearchTestCase.randomBoolean() + /* switch between HTTP 1.1/HTTP 2 randomly, both are supported */ ? Http11SslContextSpec.forClient() + .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE)) + : Http2SslContextSpec.forClient() + .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE)) + ) + ); + } + + return client; + } + @Override public void close() { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java new file mode 100644 index 0000000000000..ac7687d551766 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java @@ -0,0 +1,595 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4.ssl; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.network.NetworkAddress; +import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.MockBigArrays; +import org.opensearch.common.util.MockPageCacheRecycler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.http.BindHttpException; +import org.opensearch.http.CorsHandler; +import org.opensearch.http.HttpServerTransport; +import org.opensearch.http.HttpTransportSettings; +import org.opensearch.http.NullDispatcher; +import org.opensearch.http.reactor.netty4.ReactorHttpClient; +import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport; +import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.plugins.TransportExceptionHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.NettyAllocator; +import org.opensearch.transport.reactor.SharedGroupFactory; +import org.junit.After; +import org.junit.Before; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.PoolArenaMetric; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocatorMetric; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN; +import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ENABLED; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Tests for the secure {@link ReactorNetty4HttpServerTransport} class. + */ +public class SecureReactorNetty4HttpServerTransportTests extends OpenSearchTestCase { + + private NetworkService networkService; + private ThreadPool threadPool; + private MockBigArrays bigArrays; + private ClusterSettings clusterSettings; + private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(Collections.emptyList()); + threadPool = new TestThreadPool("test"); + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() { + @Override + public Optional buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) { + return Optional.empty(); + } + + @Override + public Optional buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException { + try { + SSLEngine engine = SslContextBuilder.forServer( + SecureReactorNetty4HttpServerTransportTests.class.getResourceAsStream("/certificate.crt"), + SecureReactorNetty4HttpServerTransportTests.class.getResourceAsStream("/certificate.key") + ).trustManager(InsecureTrustManagerFactory.INSTANCE).build().newEngine(NettyAllocator.getAllocator()); + return Optional.of(engine); + } catch (final IOException ex) { + throw new SSLException(ex); + } + } + }; + } + + @After + public void shutdown() throws Exception { + if (threadPool != null) { + threadPool.shutdownNow(); + } + threadPool = null; + networkService = null; + bigArrays = null; + clusterSettings = null; + } + + /** + * Test that {@link ReactorNetty4HttpServerTransport} supports the "Expect: 100-continue" HTTP header + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeader() throws InterruptedException { + final Settings settings = createSettings(); + final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt()); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE); + } + + /** + * Test that {@link ReactorNetty4HttpServerTransport} responds to a + * 100-continue expectation with too large a content-length + * with a 413 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException { + final String key = HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(); + final int maxContentLength = randomIntBetween(1, 104857600); + final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build(); + final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE); + runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + } + + /** + * Test that {@link ReactorNetty4HttpServerTransport} responds to an unsupported expectation with a 417 status. + * @throws InterruptedException if the client communication with the server is interrupted + */ + public void testExpectUnsupportedExpectation() throws InterruptedException { + Settings settings = createSettings(); + runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED); + } + + private void runExpectHeaderTest( + final Settings settings, + final String expectation, + final int contentLength, + final HttpResponseStatus expectedStatus + ) throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))); + } + + @Override + public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + }; + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + try (ReactorHttpClient client = ReactorHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); + request.headers().set(HttpHeaderNames.EXPECT, expectation); + HttpUtil.setContentLength(request, contentLength); + + // Reactor Netty 4 does not expose 100 CONTINUE response but instead just asks for content + final HttpContent continuationRequest = new DefaultHttpContent(Unpooled.EMPTY_BUFFER); + final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), request, continuationRequest); + try { + assertThat(continuationResponse.status(), is(HttpResponseStatus.OK)); + assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done")); + } finally { + continuationResponse.release(); + } + } + } + } + + public void testBindUnavailableAddress() { + Settings initialSettings = createSettings(); + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + initialSettings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + new NullDispatcher(), + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + Settings settings = Settings.builder() + .put("http.port", remoteAddress.getPort()) + .put("network.host", remoteAddress.getAddress()) + .build(); + try ( + ReactorNetty4HttpServerTransport otherTransport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + new NullDispatcher(), + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start); + assertEquals("Failed to bind to " + NetworkAddress.format(remoteAddress.address()), bindHttpException.getMessage()); + } + } + } + + public void testBadRequest() throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error("--> Unexpected bad request request"); + throw new AssertionError(cause); + } + }; + + final Settings settings; + final int maxInitialLineLength; + final Setting httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; + if (randomBoolean()) { + maxInitialLineLength = httpMaxInitialLineLengthSetting.getDefault(Settings.EMPTY).bytesAsInt(); + settings = createSettings(); + } else { + maxInitialLineLength = randomIntBetween(1, 8192); + settings = createBuilderWithPort().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build(); + } + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (ReactorHttpClient client = ReactorHttpClient.https()) { + final String url = "/" + randomAlphaOfLength(maxInitialLineLength); + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.REQUEST_URI_TOO_LONG)); + assertThat(response.content().array().length, equalTo(0)); + } finally { + response.release(); + } + } + } + } + + public void testDispatchFailed() throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + throw new RuntimeException("Bad things happen"); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error("--> Unexpected bad request request"); + throw new AssertionError(cause); + } + }; + + final Settings settings = createSettings(); + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (ReactorHttpClient client = ReactorHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + assertThat(response.content().array().length, equalTo(0)); + } finally { + response.release(); + } + } + } + } + + public void testLargeCompressedResponse() throws InterruptedException { + final String responseString = randomAlphaOfLength(4 * 1024 * 1024); + final String url = "/thing/"; + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + if (url.equals(request.uri())) { + channel.sendResponse(new BytesRestResponse(OK, responseString)); + } else { + logger.error("--> Unexpected successful uri [{}]", request.uri()); + throw new AssertionError(); + } + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + Settings.EMPTY, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + clusterSettings, + new SharedGroupFactory(Settings.EMPTY), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + try (ReactorHttpClient client = ReactorHttpClient.https()) { + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); + request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip")); + long numOfHugeAllocations = getHugeAllocationCount(); + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(getHugeAllocationCount(), equalTo(numOfHugeAllocations)); + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + byte[] bytes = new byte[response.content().readableBytes()]; + response.content().readBytes(bytes); + assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString)); + } finally { + response.release(); + } + } + } + } + + private long getHugeAllocationCount() { + long numOfHugAllocations = 0; + ByteBufAllocator allocator = NettyAllocator.getAllocator(); + assert allocator instanceof NettyAllocator.NoDirectBuffers; + ByteBufAllocator delegate = ((NettyAllocator.NoDirectBuffers) allocator).getDelegate(); + if (delegate instanceof PooledByteBufAllocator) { + PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) delegate).metric(); + numOfHugAllocations = metric.heapArenas().stream().mapToLong(PoolArenaMetric::numHugeAllocations).sum(); + } + return numOfHugAllocations; + } + + public void testCorsRequest() throws InterruptedException { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError(); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError(); + } + + }; + + final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true) + .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org") + .build(); + + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + // Test pre-flight request + try (ReactorHttpClient client = ReactorHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/"); + request.headers().add(CorsHandler.ORIGIN, "test-cors.org"); + request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.OK)); + assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("test-cors.org")); + assertThat(response.headers().get(CorsHandler.VARY), equalTo(CorsHandler.ORIGIN)); + assertTrue(response.headers().contains(CorsHandler.DATE)); + } finally { + response.release(); + } + } + + // Test short-circuited request + try (ReactorHttpClient client = ReactorHttpClient.https()) { + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + request.headers().add(CorsHandler.ORIGIN, "google.com"); + + final FullHttpResponse response = client.send(remoteAddress.address(), request); + try { + assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN)); + } finally { + response.release(); + } + } + } + } + + public void testConnectTimeout() throws Exception { + final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() { + + @Override + public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { + logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request)); + throw new AssertionError("Should not have received a dispatched request"); + } + + @Override + public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) { + logger.error( + new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())), + cause + ); + throw new AssertionError("Should not have received a dispatched request"); + } + + }; + + Settings settings = createBuilderWithPort().put( + HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT.getKey(), + new TimeValue(randomIntBetween(100, 300)) + ).build(); + + NioEventLoopGroup group = new NioEventLoopGroup(); + try ( + ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport( + settings, + networkService, + bigArrays, + threadPool, + xContentRegistry(), + dispatcher, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + new SharedGroupFactory(settings), + secureHttpTransportSettingsProvider, + NoopTracer.INSTANCE + ) + ) { + transport.start(); + final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); + + final CountDownLatch channelClosedLatch = new CountDownLatch(1); + + final Bootstrap clientBootstrap = new Bootstrap().option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new ChannelHandlerAdapter() { + }); + + } + }) + .group(group); + ChannelFuture connect = clientBootstrap.connect(remoteAddress.address()); + connect.channel().closeFuture().addListener(future -> channelClosedLatch.countDown()); + + assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES)); + + } finally { + group.shutdownGracefully().await(); + } + } + + private Settings createSettings() { + return createBuilderWithPort().build(); + } + + private Settings.Builder createBuilderWithPort() { + return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange()); + } +} diff --git a/plugins/transport-reactor-netty4/src/test/resources/README.txt b/plugins/transport-reactor-netty4/src/test/resources/README.txt new file mode 100644 index 0000000000000..a4353cee45a97 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/resources/README.txt @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# +# This is README describes how the certificates in this directory were created. +# This file can also be executed as a script +# + +# 1. Create certificate key + +openssl req -x509 -sha256 -newkey rsa:2048 -keyout certificate.key -out certificate.crt -days 1024 -nodes + +# 2. Export the certificate in pkcs12 format + +openssl pkcs12 -export -in certificate.crt -inkey certificate.key -out server.p12 -name netty4-secure -password pass:password + diff --git a/plugins/transport-reactor-netty4/src/test/resources/certificate.crt b/plugins/transport-reactor-netty4/src/test/resources/certificate.crt new file mode 100644 index 0000000000000..54c78fdbcf6de --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/resources/certificate.crt @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDkzCCAnugAwIBAgIUddAawr5zygcd+Dcn9WVDpO4BJ7YwDQYJKoZIhvcNAQEL +BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X +DTI0MDMxNDE5NDQzOVoXDTI3MDEwMjE5NDQzOVowWTELMAkGA1UEBhMCQVUxEzAR +BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5 +IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAzjOKkg6Iba5zfZ8b/RYw+PGmGEfbdGuuF10Wz4Jmx/Nk4VfDLxdh +TW8VllUL2JD7uPkjABj7pW3awAbvIJ+VGbKqfBr1Nsz0mPPzhT8cfuMH/FDZgQs3 +4HuqDKr0LfC1Kw5E3WF0GVMBDNu0U+nKoeqySeYjGdxDnd3W4cqK5AnUxL0RnIny +Bw7ZuhcU55XndH/Xauro/2EpvJduDsWMdqt7ZfIf1TOmaiQHK+82yb/drVaJbczK +uTpn1Kv2bnzkQEckgq+z1dLNOOyvP2xf+nsziw5ilJe92e5GJOUJYFAlEgUAGpfD +dv6j/gTRYvdJCJItOQEQtektNCAZsoc0wwIDAQABo1MwUTAdBgNVHQ4EFgQUzHts +wIt+zhB/R4U4Do2P6rr0YhkwHwYDVR0jBBgwFoAUzHtswIt+zhB/R4U4Do2P6rr0 +YhkwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAveh870jJX7vt +oLCrdugsyo79pR4f7Nr1kUy3jJrfoaoUmrjiiiHWgT22fGwp7j1GZF2mVfo8YVaK +63YNn5gB2NNZhguPOFC4AdvHRYOKRBOaOvWK8oq7BcJ//18JYI/pPnpgkYvJjqv4 +gFKaZX9qWtujHpAmKiVGs7pwYGNXfixPHRNV4owcfHMIH5dhbbqT49j94xVpjbXs +OymKtFl4kpCE/0LzKFrFcuu55Am1VLBHx2cPpHLOipgUcF5BHFlQ8AXiCMOwfPAw +d22mLB6Gt1oVEpyvQHYd3e04FetEXQ9E8T+NKWZx/8Ucf+IWBYmZBRxch6O83xgk +bAbGzqkbzQ== +-----END CERTIFICATE----- diff --git a/plugins/transport-reactor-netty4/src/test/resources/certificate.key b/plugins/transport-reactor-netty4/src/test/resources/certificate.key new file mode 100644 index 0000000000000..228350180935d --- /dev/null +++ b/plugins/transport-reactor-netty4/src/test/resources/certificate.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDOM4qSDohtrnN9 +nxv9FjD48aYYR9t0a64XXRbPgmbH82ThV8MvF2FNbxWWVQvYkPu4+SMAGPulbdrA +Bu8gn5UZsqp8GvU2zPSY8/OFPxx+4wf8UNmBCzfge6oMqvQt8LUrDkTdYXQZUwEM +27RT6cqh6rJJ5iMZ3EOd3dbhyorkCdTEvRGcifIHDtm6FxTnled0f9dq6uj/YSm8 +l24OxYx2q3tl8h/VM6ZqJAcr7zbJv92tVoltzMq5OmfUq/ZufORARySCr7PV0s04 +7K8/bF/6ezOLDmKUl73Z7kYk5QlgUCUSBQAal8N2/qP+BNFi90kIki05ARC16S00 +IBmyhzTDAgMBAAECggEAVOdiElvLjyX6xeoC00YU6hxOIMdNtHU2HMamwtDV01UD +38mMQ9KjrQelYt4n34drLrHe2IZw75/5J4JzagJrmUY47psHBwaDXItuZRokeJaw +zhLYTEs7OcKRtV+a5WOspUrdzi33aQoFb67zZG3qkpsZyFXrdBV+/fy/Iv+MCvLH +xR0jQ5mzE3cw20R7S4nddChBA/y8oKGOo6QRf2SznC1jL/+yolHvJPEn1v8AUxYm +BMPHxj1O0c4M4IxnJQ3Y5Jy9OaFMyMsFlF1hVhc/3LDDxDyOuBsVsFDicojyrRea +GKngIke0yezy7Wo4NUcp8YQhafonpWVsSJJdOUotcQKBgQD0rihFBXVtcG1d/Vy7 +FvLHrmccD56JNV744LSn2CDM7W1IulNbDUZINdCFqL91u5LpxozeE1FPY1nhwncJ +N7V7XYCaSLCuV1YJzRmUCjnzk2RyopGpzWog3f9uUFGgrk1HGbNAv99k/REya6Iu +IRSkuQhaJOj3bRXzonh0K4GjewKBgQDXvamtCioOUMSP8vq919YMkBw7F+z/fr0p +pamO8HL9eewAUg6N92JQ9kobSo/GptdmdHIjs8LqnS5C3H13GX5Qlf5GskOlCpla +V55ElaSp0gvKwWE168U7gQH4etPQAXXJrOGFaGbPj9W81hTUud7HVE88KYdfWTBo +I7TuE25tWQKBgBRjcr2Vn9xXsvVTCGgamG5lLPhcoNREGz7X0pXt34XT/vhBdnKu +331i5pZMom+YCrzqK5DRwUPBPpseTjb5amj2OKIijn5ojqXQbmI0m/GdBZC71TF2 +CXLlrMQvcy3VeGEFVjd+BYpvwAAYkfIQFZ1IQdbpHnSHpX2guzLK8UmDAoGBANUy +PIcf0EetUVHfkCIjNQfdMcjD8BTcLhsF9vWmcDxFTA9VB8ULf0D64mjt2f85yQsa +b+EQN8KZ6alxMxuLOeRxFYLPj0F9o+Y/R8wHBV48kCKhz2r1v0b6SfQ/jSm1B61x +BrxLW64qOdIOzS8bLyhUDKkrcPesr8V548aRtUKhAoGBAKlNJFd8BCGKD9Td+3dE +oP1iHTX5XZ+cQIqL0e+GMQlK4HnQP566DFZU5/GHNNAfmyxd5iSRwhTqPMHRAmOb +pqQwsyufx0dFeIBxeSO3Z6jW5h2sl4nBipZpw9bzv6EBL1xRr0SfMNZzdnf4JFzc +0htGo/VO93Z2pv8w7uGUz1nN +-----END PRIVATE KEY-----