diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4273409f1b645..acd9c32d15fa5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
 - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
 - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
+- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
 
 ### Dependencies
 - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java
index 7cc1a47a5d2a4..ef6b67ea44299 100644
--- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java
+++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java
@@ -38,7 +38,6 @@
 import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.tasks.Task;
 import org.opensearch.transport.NettyAllocator;
-import org.opensearch.transport.netty4.ssl.TrustAllManager;
 
 import java.io.Closeable;
 import java.net.SocketAddress;
@@ -90,6 +89,7 @@
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.util.AttributeKey;
 
 import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
@@ -270,7 +270,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
                 final SslHandler sslHandler = new SslHandler(
                     SslContextBuilder.forClient()
                         .clientAuth(ClientAuth.NONE)
-                        .trustManager(TrustAllManager.INSTANCE)
+                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                         .build()
                         .newEngine(ch.alloc())
                 );
diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java
index e79a066ad8f63..f80ad901ce765 100644
--- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java
+++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/ssl/SecureNetty4HttpServerTransportTests.java
@@ -41,7 +41,6 @@
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.NettyAllocator;
 import org.opensearch.transport.SharedGroupFactory;
-import org.opensearch.transport.netty4.ssl.TrustAllManager;
 import org.junit.After;
 import org.junit.Before;
 
@@ -84,6 +83,7 @@
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
 import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
 import static org.opensearch.core.rest.RestStatus.OK;
@@ -131,7 +131,7 @@ public Optional<SSLEngine> buildSecureHttpServerEngine(Settings settings, HttpSe
                     keyManagerFactory.init(keyStore, "password".toCharArray());
 
                     SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory)
-                        .trustManager(TrustAllManager.INSTANCE)
+                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                         .build()
                         .newEngine(NettyAllocator.getAllocator());
                     return Optional.of(engine);
diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java
index df3b005f40903..e0600aebd90e5 100644
--- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java
+++ b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/SimpleSecureNetty4TransportTests.java
@@ -55,6 +55,7 @@
 
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -87,7 +88,7 @@ public Optional<SSLEngine> buildSecureServerTransportEngine(Settings settings, T
 
                     SSLEngine engine = SslContextBuilder.forServer(keyManagerFactory)
                         .clientAuth(ClientAuth.NONE)
-                        .trustManager(TrustAllManager.INSTANCE)
+                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                         .build()
                         .newEngine(NettyAllocator.getAllocator());
                     return Optional.of(engine);
@@ -103,7 +104,7 @@ public Optional<SSLEngine> buildSecureClientTransportEngine(Settings settings, S
                 return Optional.of(
                     SslContextBuilder.forClient()
                         .clientAuth(ClientAuth.NONE)
-                        .trustManager(TrustAllManager.INSTANCE)
+                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                         .build()
                         .newEngine(NettyAllocator.getAllocator())
                 );
diff --git a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java b/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java
deleted file mode 100644
index a38c542b5780e..0000000000000
--- a/modules/transport-netty4/src/test/java/org/opensearch/transport/netty4/ssl/TrustAllManager.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.transport.netty4.ssl;
-
-import javax.net.ssl.X509TrustManager;
-
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-public class TrustAllManager implements X509TrustManager {
-    public static final X509TrustManager INSTANCE = new TrustAllManager();
-
-    private TrustAllManager() {}
-
-    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
-
-    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
-
-    public X509Certificate[] getAcceptedIssuers() {
-        return new X509Certificate[0];
-    }
-}
diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java
index d4a5a9ad83af6..bd1646d753016 100644
--- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java
+++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.http.reactor.netty4;
 
+import org.opensearch.common.Nullable;
 import org.opensearch.common.network.NetworkService;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Setting;
@@ -23,21 +24,33 @@
 import org.opensearch.http.HttpChannel;
 import org.opensearch.http.HttpReadTimeoutException;
 import org.opensearch.http.HttpServerChannel;
+import org.opensearch.http.reactor.netty4.ssl.SslUtils;
+import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
 import org.opensearch.telemetry.tracing.Tracer;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.reactor.SharedGroupFactory;
 import org.opensearch.transport.reactor.netty4.Netty4Utils;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSessionContext;
+
 import java.net.InetSocketAddress;
 import java.net.SocketOption;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.nio.NioChannelOption;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.ssl.ApplicationProtocolNegotiator;
+import io.netty.handler.ssl.SslContext;
 import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.util.ReferenceCountUtil;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
@@ -116,6 +129,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor
     private final ByteSizeValue maxInitialLineLength;
     private final ByteSizeValue maxHeaderSize;
     private final ByteSizeValue maxChunkSize;
+    private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
     private volatile SharedGroupFactory.SharedGroup sharedGroup;
     private volatile DisposableServer disposableServer;
     private volatile Scheduler scheduler;
@@ -142,6 +156,45 @@ public ReactorNetty4HttpServerTransport(
         ClusterSettings clusterSettings,
         SharedGroupFactory sharedGroupFactory,
         Tracer tracer
+    ) {
+        this(
+            settings,
+            networkService,
+            bigArrays,
+            threadPool,
+            xContentRegistry,
+            dispatcher,
+            clusterSettings,
+            sharedGroupFactory,
+            null,
+            tracer
+        );
+    }
+
+    /**
+     * Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
+     * @param settings settings
+     * @param networkService network service
+     * @param bigArrays big array allocator
+     * @param threadPool thread pool instance
+     * @param xContentRegistry XContent registry instance
+     * @param dispatcher dispatcher instance
+     * @param clusterSettings cluster settings
+     * @param sharedGroupFactory shared group factory
+     * @param secureHttpTransportSettingsProvider secure HTTP transport settings provider
+     * @param tracer tracer instance
+     */
+    public ReactorNetty4HttpServerTransport(
+        Settings settings,
+        NetworkService networkService,
+        BigArrays bigArrays,
+        ThreadPool threadPool,
+        NamedXContentRegistry xContentRegistry,
+        Dispatcher dispatcher,
+        ClusterSettings clusterSettings,
+        SharedGroupFactory sharedGroupFactory,
+        @Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
+        Tracer tracer
     ) {
         super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
         Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
@@ -152,6 +205,7 @@ public ReactorNetty4HttpServerTransport(
         this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
         this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
         this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
+        this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
     }
 
     /**
@@ -160,7 +214,7 @@ public ReactorNetty4HttpServerTransport(
      */
     @Override
     protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
-        final HttpServer server = configureChannelOptions(
+        final HttpServer server = configure(
             HttpServer.create()
                 .httpFormDecoder(builder -> builder.scheduler(scheduler))
                 .idleTimeout(Duration.ofMillis(connectTimeoutMillis))
@@ -173,7 +227,6 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
                         .maxHeaderSize(maxHeaderSize.bytesAsInt())
                         .maxInitialLineLength(maxInitialLineLength.bytesAsInt())
                 )
-                .protocol(HttpProtocol.HTTP11, HttpProtocol.H2C)
                 .handle((req, res) -> incomingRequest(req, res))
         );
 
@@ -181,8 +234,8 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
         return new ReactorNetty4HttpServerChannel(disposableServer.channel());
     }
 
-    private HttpServer configureChannelOptions(final HttpServer server1) {
-        HttpServer configured = server1.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings))
+    private HttpServer configure(final HttpServer server) throws Exception {
+        HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings))
             .childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
 
         if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
@@ -229,6 +282,65 @@ private HttpServer configureChannelOptions(final HttpServer server1) {
         configured = configured.option(ChannelOption.SO_REUSEADDR, reuseAddress);
         configured = configured.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
 
+        // Configure SSL context if available
+        if (secureHttpTransportSettingsProvider != null) {
+            final SSLEngine engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, this)
+                .orElseGet(SslUtils::createDefaultServerSSLEngine);
+
+            try {
+                final List<String> cipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
+                final List<String> applicationProtocols = Arrays.asList(engine.getSSLParameters().getApplicationProtocols());
+
+                configured = configured.secure(spec -> spec.sslContext(new SslContext() {
+                    @Override
+                    public SSLSessionContext sessionContext() {
+                        throw new UnsupportedOperationException(); /* server only, should never be called */
+                    }
+
+                    @Override
+                    public SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) {
+                        throw new UnsupportedOperationException(); /* server only, should never be called */
+                    }
+
+                    @Override
+                    public SSLEngine newEngine(ByteBufAllocator alloc) {
+                        try {
+                            return secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(
+                                settings,
+                                ReactorNetty4HttpServerTransport.this
+                            ).orElseGet(SslUtils::createDefaultServerSSLEngine);
+                        } catch (final SSLException ex) {
+                            throw new UnsupportedOperationException("Unable to create SSLEngine", ex);
+                        }
+                    }
+
+                    @Override
+                    public boolean isClient() {
+                        return false; /* server only */
+                    }
+
+                    @Override
+                    public List<String> cipherSuites() {
+                        return cipherSuites;
+                    }
+
+                    @Override
+                    public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
+                        return new ApplicationProtocolNegotiator() {
+                            @Override
+                            public List<String> protocols() {
+                                return applicationProtocols;
+                            }
+                        };
+                    }
+                }).build()).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
+            } finally {
+                ReferenceCountUtil.release(engine);
+            }
+        } else {
+            configured = configured.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);
+        }
+
         return configured;
     }
 
@@ -302,6 +414,11 @@ protected void doStart() {
         }
     }
 
+    /**
+     * Exception handler
+     * @param channel HTTP channel
+     * @param cause exception occurred
+     */
     @Override
     public void onException(HttpChannel channel, Exception cause) {
         if (cause instanceof ReadTimeoutException) {
diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java
new file mode 100644
index 0000000000000..a0b4319c7c2d1
--- /dev/null
+++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+package org.opensearch.http.reactor.netty4.ssl;
+
+import org.opensearch.OpenSearchSecurityException;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Helper class for creating default SSL engines
+ */
+public class SslUtils {
+    private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };
+
+    private SslUtils() {}
+
+    /**
+     * Creates default server {@link SSLEngine} instance
+     * @return default server {@link SSLEngine} instance
+     */
+    public static SSLEngine createDefaultServerSSLEngine() {
+        try {
+            final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
+            engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
+            engine.setUseClientMode(false);
+            return engine;
+        } catch (final NoSuchAlgorithmException ex) {
+            throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex);
+        }
+    }
+}
diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java
new file mode 100644
index 0000000000000..95dbd2d6bd9ca
--- /dev/null
+++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * SSL supporting utility classes
+ */
+package org.opensearch.http.reactor.netty4.ssl;
diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java
index dc310c3793109..6e5b0215b58a4 100644
--- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java
+++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java
@@ -17,9 +17,11 @@
 import org.opensearch.core.indices.breaker.CircuitBreakerService;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.http.HttpServerTransport;
+import org.opensearch.http.HttpServerTransport.Dispatcher;
 import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport;
 import org.opensearch.plugins.NetworkPlugin;
 import org.opensearch.plugins.Plugin;
+import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
 import org.opensearch.telemetry.tracing.Tracer;
 import org.opensearch.threadpool.ThreadPool;
 
@@ -38,6 +40,11 @@ public class ReactorNetty4Plugin extends Plugin implements NetworkPlugin {
      */
     public static final String REACTOR_NETTY_HTTP_TRANSPORT_NAME = "reactor-netty4";
 
+    /**
+     * The name of new experimental secure HTTP transport implementations based on Reactor Netty.
+     */
+    public static final String REACTOR_NETTY_SECURE_HTTP_TRANSPORT_NAME = "reactor-netty4-secure";
+
     private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();
 
     /**
@@ -91,6 +98,53 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
                 dispatcher,
                 clusterSettings,
                 getSharedGroupFactory(settings),
+                null, /* no security settings provider */
+                tracer
+            )
+        );
+    }
+
+    /**
+     * Returns a map of {@link HttpServerTransport} suppliers.
+     * See {@link org.opensearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
+     * @param settings settings
+     * @param networkService network service
+     * @param bigArrays big array allocator
+     * @param pageCacheRecycler page cache recycler instance
+     * @param circuitBreakerService circuit breaker service instance
+     * @param threadPool thread pool instance
+     * @param xContentRegistry XContent registry instance
+     * @param dispatcher dispatcher instance
+     * @param clusterSettings cluster settings
+     * @param secureHttpTransportSettingsProvider secure HTTP transport settings provider
+     * @param tracer tracer instance
+     */
+    @Override
+    public Map<String, Supplier<HttpServerTransport>> getSecureHttpTransports(
+        Settings settings,
+        ThreadPool threadPool,
+        BigArrays bigArrays,
+        PageCacheRecycler pageCacheRecycler,
+        CircuitBreakerService circuitBreakerService,
+        NamedXContentRegistry xContentRegistry,
+        NetworkService networkService,
+        Dispatcher dispatcher,
+        ClusterSettings clusterSettings,
+        SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
+        Tracer tracer
+    ) {
+        return Collections.singletonMap(
+            REACTOR_NETTY_SECURE_HTTP_TRANSPORT_NAME,
+            () -> new ReactorNetty4HttpServerTransport(
+                settings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry,
+                dispatcher,
+                clusterSettings,
+                getSharedGroupFactory(settings),
+                secureHttpTransportSettingsProvider,
                 tracer
             )
         );
diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java
index 443ecd0f40ead..920c895205023 100644
--- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java
+++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorHttpClient.java
@@ -15,6 +15,7 @@
 
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.tasks.Task;
+import org.opensearch.test.OpenSearchTestCase;
 
 import java.io.Closeable;
 import java.net.InetSocketAddress;
@@ -38,10 +39,14 @@
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http2.HttpConversionUtil;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.resolver.DefaultAddressResolverGroup;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.ParallelFlux;
+import reactor.netty.http.Http11SslContextSpec;
+import reactor.netty.http.Http2SslContextSpec;
 import reactor.netty.http.client.HttpClient;
 
 import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
@@ -50,8 +55,9 @@
 /**
  * Tiny helper to send http requests over netty.
  */
-class ReactorHttpClient implements Closeable {
+public class ReactorHttpClient implements Closeable {
     private final boolean compression;
+    private final boolean secure;
 
     static Collection<String> returnHttpResponseBodies(Collection<FullHttpResponse> responses) {
         List<String> list = new ArrayList<>(responses.size());
@@ -69,16 +75,21 @@ static Collection<String> returnOpaqueIds(Collection<FullHttpResponse> responses
         return list;
     }
 
-    ReactorHttpClient(boolean compression) {
+    public ReactorHttpClient(boolean compression, boolean secure) {
         this.compression = compression;
+        this.secure = secure;
     }
 
-    static ReactorHttpClient create() {
+    public static ReactorHttpClient create() {
         return create(true);
     }
 
-    static ReactorHttpClient create(boolean compression) {
-        return new ReactorHttpClient(compression);
+    public static ReactorHttpClient create(boolean compression) {
+        return new ReactorHttpClient(compression, false);
+    }
+
+    public static ReactorHttpClient https() {
+        return new ReactorHttpClient(true, true);
     }
 
     public List<FullHttpResponse> get(InetSocketAddress remoteAddress, String... uris) throws InterruptedException {
@@ -92,7 +103,7 @@ public List<FullHttpResponse> get(InetSocketAddress remoteAddress, boolean order
             final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
             httpRequest.headers().add(HOST, "localhost");
             httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
-            httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http");
+            httpRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http");
             requests.add(httpRequest);
         }
 
@@ -148,7 +159,7 @@ private List<FullHttpResponse> processRequestsWithBody(
             request.headers().add(HttpHeaderNames.HOST, "localhost");
             request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
             request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json");
-            request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http");
+            request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), secure ? "https" : "http");
             request.headers().add("X-Opaque-ID", String.valueOf(i));
             requests.add(request);
         }
@@ -162,12 +173,7 @@ private List<FullHttpResponse> sendRequests(
     ) {
         final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
         try {
-            final HttpClient client = HttpClient.newConnection()
-                .resolver(DefaultAddressResolverGroup.INSTANCE)
-                .runOn(eventLoopGroup)
-                .host(remoteAddress.getHostString())
-                .port(remoteAddress.getPort())
-                .compress(compression);
+            final HttpClient client = createClient(remoteAddress, eventLoopGroup);
 
             @SuppressWarnings("unchecked")
             final Mono<FullHttpResponse>[] monos = requests.stream()
@@ -201,6 +207,29 @@ private List<FullHttpResponse> sendRequests(
         }
     }
 
+    private HttpClient createClient(final InetSocketAddress remoteAddress, final NioEventLoopGroup eventLoopGroup) {
+        final HttpClient client = HttpClient.newConnection()
+            .resolver(DefaultAddressResolverGroup.INSTANCE)
+            .runOn(eventLoopGroup)
+            .host(remoteAddress.getHostString())
+            .port(remoteAddress.getPort())
+            .compress(compression);
+
+        if (secure) {
+            return client.secure(
+                spec -> spec.sslContext(
+                    OpenSearchTestCase.randomBoolean()
+                        /* switch between HTTP 1.1/HTTP 2 randomly, both are supported */ ? Http11SslContextSpec.forClient()
+                            .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
+                        : Http2SslContextSpec.forClient()
+                            .configure(s -> s.clientAuth(ClientAuth.NONE).trustManager(InsecureTrustManagerFactory.INSTANCE))
+                )
+            );
+        }
+
+        return client;
+    }
+
     @Override
     public void close() {
 
diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java
new file mode 100644
index 0000000000000..ac7687d551766
--- /dev/null
+++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java
@@ -0,0 +1,595 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.http.reactor.netty4.ssl;
+
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.common.network.NetworkAddress;
+import org.opensearch.common.network.NetworkService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.MockBigArrays;
+import org.opensearch.common.util.MockPageCacheRecycler;
+import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.core.common.bytes.BytesArray;
+import org.opensearch.core.common.transport.TransportAddress;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
+import org.opensearch.http.BindHttpException;
+import org.opensearch.http.CorsHandler;
+import org.opensearch.http.HttpServerTransport;
+import org.opensearch.http.HttpTransportSettings;
+import org.opensearch.http.NullDispatcher;
+import org.opensearch.http.reactor.netty4.ReactorHttpClient;
+import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport;
+import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
+import org.opensearch.plugins.TransportExceptionHandler;
+import org.opensearch.rest.BytesRestResponse;
+import org.opensearch.rest.RestChannel;
+import org.opensearch.rest.RestRequest;
+import org.opensearch.telemetry.tracing.noop.NoopTracer;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.rest.FakeRestRequest;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.NettyAllocator;
+import org.opensearch.transport.reactor.SharedGroupFactory;
+import org.junit.After;
+import org.junit.Before;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.PoolArenaMetric;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+
+import static org.opensearch.core.rest.RestStatus.OK;
+import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN;
+import static org.opensearch.http.HttpTransportSettings.SETTING_CORS_ENABLED;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for the secure {@link ReactorNetty4HttpServerTransport} class.
+ */
+public class SecureReactorNetty4HttpServerTransportTests extends OpenSearchTestCase {
+
+    private NetworkService networkService;
+    private ThreadPool threadPool;
+    private MockBigArrays bigArrays;
+    private ClusterSettings clusterSettings;
+    private SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
+
+    @Before
+    public void setup() throws Exception {
+        networkService = new NetworkService(Collections.emptyList());
+        threadPool = new TestThreadPool("test");
+        bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+        clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+
+        secureHttpTransportSettingsProvider = new SecureHttpTransportSettingsProvider() {
+            @Override
+            public Optional<TransportExceptionHandler> buildHttpServerExceptionHandler(Settings settings, HttpServerTransport transport) {
+                return Optional.empty();
+            }
+
+            @Override
+            public Optional<SSLEngine> buildSecureHttpServerEngine(Settings settings, HttpServerTransport transport) throws SSLException {
+                try {
+                    SSLEngine engine = SslContextBuilder.forServer(
+                        SecureReactorNetty4HttpServerTransportTests.class.getResourceAsStream("/certificate.crt"),
+                        SecureReactorNetty4HttpServerTransportTests.class.getResourceAsStream("/certificate.key")
+                    ).trustManager(InsecureTrustManagerFactory.INSTANCE).build().newEngine(NettyAllocator.getAllocator());
+                    return Optional.of(engine);
+                } catch (final IOException ex) {
+                    throw new SSLException(ex);
+                }
+            }
+        };
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        if (threadPool != null) {
+            threadPool.shutdownNow();
+        }
+        threadPool = null;
+        networkService = null;
+        bigArrays = null;
+        clusterSettings = null;
+    }
+
+    /**
+     * Test that {@link ReactorNetty4HttpServerTransport} supports the "Expect: 100-continue" HTTP header
+     * @throws InterruptedException if the client communication with the server is interrupted
+     */
+    public void testExpectContinueHeader() throws InterruptedException {
+        final Settings settings = createSettings();
+        final int contentLength = randomIntBetween(1, HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).bytesAsInt());
+        runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.CONTINUE);
+    }
+
+    /**
+     * Test that {@link ReactorNetty4HttpServerTransport} responds to a
+     * 100-continue expectation with too large a content-length
+     * with a 413 status.
+     * @throws InterruptedException if the client communication with the server is interrupted
+     */
+    public void testExpectContinueHeaderContentLengthTooLong() throws InterruptedException {
+        final String key = HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey();
+        final int maxContentLength = randomIntBetween(1, 104857600);
+        final Settings settings = createBuilderWithPort().put(key, maxContentLength + "b").build();
+        final int contentLength = randomIntBetween(maxContentLength + 1, Integer.MAX_VALUE);
+        runExpectHeaderTest(settings, HttpHeaderValues.CONTINUE.toString(), contentLength, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);
+    }
+
+    /**
+     * Test that {@link ReactorNetty4HttpServerTransport} responds to an unsupported expectation with a 417 status.
+     * @throws InterruptedException if the client communication with the server is interrupted
+     */
+    public void testExpectUnsupportedExpectation() throws InterruptedException {
+        Settings settings = createSettings();
+        runExpectHeaderTest(settings, "chocolate=yummy", 0, HttpResponseStatus.EXPECTATION_FAILED);
+    }
+
+    private void runExpectHeaderTest(
+        final Settings settings,
+        final String expectation,
+        final int contentLength,
+        final HttpResponseStatus expectedStatus
+    ) throws InterruptedException {
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+            @Override
+            public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
+                channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")));
+            }
+
+            @Override
+            public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
+                logger.error(
+                    new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+                    cause
+                );
+                throw new AssertionError();
+            }
+        };
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                settings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                dispatcher,
+                clusterSettings,
+                new SharedGroupFactory(settings),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+            try (ReactorHttpClient client = ReactorHttpClient.https()) {
+                final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/");
+                request.headers().set(HttpHeaderNames.EXPECT, expectation);
+                HttpUtil.setContentLength(request, contentLength);
+
+                // Reactor Netty 4 does not expose 100 CONTINUE response but instead just asks for content
+                final HttpContent continuationRequest = new DefaultHttpContent(Unpooled.EMPTY_BUFFER);
+                final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), request, continuationRequest);
+                try {
+                    assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
+                    assertThat(new String(ByteBufUtil.getBytes(continuationResponse.content()), StandardCharsets.UTF_8), is("done"));
+                } finally {
+                    continuationResponse.release();
+                }
+            }
+        }
+    }
+
+    public void testBindUnavailableAddress() {
+        Settings initialSettings = createSettings();
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                initialSettings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                new NullDispatcher(),
+                clusterSettings,
+                new SharedGroupFactory(Settings.EMPTY),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+            Settings settings = Settings.builder()
+                .put("http.port", remoteAddress.getPort())
+                .put("network.host", remoteAddress.getAddress())
+                .build();
+            try (
+                ReactorNetty4HttpServerTransport otherTransport = new ReactorNetty4HttpServerTransport(
+                    settings,
+                    networkService,
+                    bigArrays,
+                    threadPool,
+                    xContentRegistry(),
+                    new NullDispatcher(),
+                    clusterSettings,
+                    new SharedGroupFactory(settings),
+                    secureHttpTransportSettingsProvider,
+                    NoopTracer.INSTANCE
+                )
+            ) {
+                BindHttpException bindHttpException = expectThrows(BindHttpException.class, otherTransport::start);
+                assertEquals("Failed to bind to " + NetworkAddress.format(remoteAddress.address()), bindHttpException.getMessage());
+            }
+        }
+    }
+
+    public void testBadRequest() throws InterruptedException {
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
+                throw new AssertionError();
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error("--> Unexpected bad request request");
+                throw new AssertionError(cause);
+            }
+        };
+
+        final Settings settings;
+        final int maxInitialLineLength;
+        final Setting<ByteSizeValue> httpMaxInitialLineLengthSetting = HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
+        if (randomBoolean()) {
+            maxInitialLineLength = httpMaxInitialLineLengthSetting.getDefault(Settings.EMPTY).bytesAsInt();
+            settings = createSettings();
+        } else {
+            maxInitialLineLength = randomIntBetween(1, 8192);
+            settings = createBuilderWithPort().put(httpMaxInitialLineLengthSetting.getKey(), maxInitialLineLength + "b").build();
+        }
+
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                settings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                dispatcher,
+                clusterSettings,
+                new SharedGroupFactory(settings),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            try (ReactorHttpClient client = ReactorHttpClient.https()) {
+                final String url = "/" + randomAlphaOfLength(maxInitialLineLength);
+                final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(response.status(), equalTo(HttpResponseStatus.REQUEST_URI_TOO_LONG));
+                    assertThat(response.content().array().length, equalTo(0));
+                } finally {
+                    response.release();
+                }
+            }
+        }
+    }
+
+    public void testDispatchFailed() throws InterruptedException {
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                throw new RuntimeException("Bad things happen");
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error("--> Unexpected bad request request");
+                throw new AssertionError(cause);
+            }
+        };
+
+        final Settings settings = createSettings();
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                settings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                dispatcher,
+                clusterSettings,
+                new SharedGroupFactory(settings),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            try (ReactorHttpClient client = ReactorHttpClient.https()) {
+                final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
+
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(response.status(), equalTo(HttpResponseStatus.INTERNAL_SERVER_ERROR));
+                    assertThat(response.content().array().length, equalTo(0));
+                } finally {
+                    response.release();
+                }
+            }
+        }
+    }
+
+    public void testLargeCompressedResponse() throws InterruptedException {
+        final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
+        final String url = "/thing/";
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                if (url.equals(request.uri())) {
+                    channel.sendResponse(new BytesRestResponse(OK, responseString));
+                } else {
+                    logger.error("--> Unexpected successful uri [{}]", request.uri());
+                    throw new AssertionError();
+                }
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error(
+                    new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+                    cause
+                );
+                throw new AssertionError();
+            }
+
+        };
+
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                Settings.EMPTY,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                dispatcher,
+                clusterSettings,
+                new SharedGroupFactory(Settings.EMPTY),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            try (ReactorHttpClient client = ReactorHttpClient.https()) {
+                DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
+                request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
+                long numOfHugeAllocations = getHugeAllocationCount();
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(getHugeAllocationCount(), equalTo(numOfHugeAllocations));
+                    assertThat(response.status(), equalTo(HttpResponseStatus.OK));
+                    byte[] bytes = new byte[response.content().readableBytes()];
+                    response.content().readBytes(bytes);
+                    assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString));
+                } finally {
+                    response.release();
+                }
+            }
+        }
+    }
+
+    private long getHugeAllocationCount() {
+        long numOfHugAllocations = 0;
+        ByteBufAllocator allocator = NettyAllocator.getAllocator();
+        assert allocator instanceof NettyAllocator.NoDirectBuffers;
+        ByteBufAllocator delegate = ((NettyAllocator.NoDirectBuffers) allocator).getDelegate();
+        if (delegate instanceof PooledByteBufAllocator) {
+            PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) delegate).metric();
+            numOfHugAllocations = metric.heapArenas().stream().mapToLong(PoolArenaMetric::numHugeAllocations).sum();
+        }
+        return numOfHugAllocations;
+    }
+
+    public void testCorsRequest() throws InterruptedException {
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
+                throw new AssertionError();
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error(
+                    new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+                    cause
+                );
+                throw new AssertionError();
+            }
+
+        };
+
+        final Settings settings = createBuilderWithPort().put(SETTING_CORS_ENABLED.getKey(), true)
+            .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), "test-cors.org")
+            .build();
+
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                settings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                dispatcher,
+                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                new SharedGroupFactory(settings),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            // Test pre-flight request
+            try (ReactorHttpClient client = ReactorHttpClient.https()) {
+                final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/");
+                request.headers().add(CorsHandler.ORIGIN, "test-cors.org");
+                request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST");
+
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(response.status(), equalTo(HttpResponseStatus.OK));
+                    assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("test-cors.org"));
+                    assertThat(response.headers().get(CorsHandler.VARY), equalTo(CorsHandler.ORIGIN));
+                    assertTrue(response.headers().contains(CorsHandler.DATE));
+                } finally {
+                    response.release();
+                }
+            }
+
+            // Test short-circuited request
+            try (ReactorHttpClient client = ReactorHttpClient.https()) {
+                final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
+                request.headers().add(CorsHandler.ORIGIN, "google.com");
+
+                final FullHttpResponse response = client.send(remoteAddress.address(), request);
+                try {
+                    assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN));
+                } finally {
+                    response.release();
+                }
+            }
+        }
+    }
+
+    public void testConnectTimeout() throws Exception {
+        final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
+
+            @Override
+            public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
+                logger.error("--> Unexpected successful request [{}]", FakeRestRequest.requestToString(request));
+                throw new AssertionError("Should not have received a dispatched request");
+            }
+
+            @Override
+            public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
+                logger.error(
+                    new ParameterizedMessage("--> Unexpected bad request [{}]", FakeRestRequest.requestToString(channel.request())),
+                    cause
+                );
+                throw new AssertionError("Should not have received a dispatched request");
+            }
+
+        };
+
+        Settings settings = createBuilderWithPort().put(
+            HttpTransportSettings.SETTING_HTTP_CONNECT_TIMEOUT.getKey(),
+            new TimeValue(randomIntBetween(100, 300))
+        ).build();
+
+        NioEventLoopGroup group = new NioEventLoopGroup();
+        try (
+            ReactorNetty4HttpServerTransport transport = new ReactorNetty4HttpServerTransport(
+                settings,
+                networkService,
+                bigArrays,
+                threadPool,
+                xContentRegistry(),
+                dispatcher,
+                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                new SharedGroupFactory(settings),
+                secureHttpTransportSettingsProvider,
+                NoopTracer.INSTANCE
+            )
+        ) {
+            transport.start();
+            final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
+
+            final CountDownLatch channelClosedLatch = new CountDownLatch(1);
+
+            final Bootstrap clientBootstrap = new Bootstrap().option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
+                .channel(NioSocketChannel.class)
+                .handler(new ChannelInitializer<SocketChannel>() {
+
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ch.pipeline().addLast(new ChannelHandlerAdapter() {
+                        });
+
+                    }
+                })
+                .group(group);
+            ChannelFuture connect = clientBootstrap.connect(remoteAddress.address());
+            connect.channel().closeFuture().addListener(future -> channelClosedLatch.countDown());
+
+            assertTrue("Channel should be closed due to read timeout", channelClosedLatch.await(1, TimeUnit.MINUTES));
+
+        } finally {
+            group.shutdownGracefully().await();
+        }
+    }
+
+    private Settings createSettings() {
+        return createBuilderWithPort().build();
+    }
+
+    private Settings.Builder createBuilderWithPort() {
+        return Settings.builder().put(HttpTransportSettings.SETTING_HTTP_PORT.getKey(), getPortRange());
+    }
+}
diff --git a/plugins/transport-reactor-netty4/src/test/resources/README.txt b/plugins/transport-reactor-netty4/src/test/resources/README.txt
new file mode 100644
index 0000000000000..a4353cee45a97
--- /dev/null
+++ b/plugins/transport-reactor-netty4/src/test/resources/README.txt
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+#
+# This is README describes how the certificates in this directory were created.
+# This file can also be executed as a script
+#
+
+# 1. Create certificate key
+
+openssl req  -x509  -sha256  -newkey rsa:2048  -keyout certificate.key  -out certificate.crt  -days 1024  -nodes
+
+# 2. Export the certificate in pkcs12 format
+
+openssl pkcs12  -export  -in certificate.crt  -inkey certificate.key  -out server.p12  -name netty4-secure -password pass:password
+
diff --git a/plugins/transport-reactor-netty4/src/test/resources/certificate.crt b/plugins/transport-reactor-netty4/src/test/resources/certificate.crt
new file mode 100644
index 0000000000000..54c78fdbcf6de
--- /dev/null
+++ b/plugins/transport-reactor-netty4/src/test/resources/certificate.crt
@@ -0,0 +1,22 @@
+-----BEGIN CERTIFICATE-----
+MIIDkzCCAnugAwIBAgIUddAawr5zygcd+Dcn9WVDpO4BJ7YwDQYJKoZIhvcNAQEL
+BQAwWTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MB4X
+DTI0MDMxNDE5NDQzOVoXDTI3MDEwMjE5NDQzOVowWTELMAkGA1UEBhMCQVUxEzAR
+BgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5
+IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
+MIIBCgKCAQEAzjOKkg6Iba5zfZ8b/RYw+PGmGEfbdGuuF10Wz4Jmx/Nk4VfDLxdh
+TW8VllUL2JD7uPkjABj7pW3awAbvIJ+VGbKqfBr1Nsz0mPPzhT8cfuMH/FDZgQs3
+4HuqDKr0LfC1Kw5E3WF0GVMBDNu0U+nKoeqySeYjGdxDnd3W4cqK5AnUxL0RnIny
+Bw7ZuhcU55XndH/Xauro/2EpvJduDsWMdqt7ZfIf1TOmaiQHK+82yb/drVaJbczK
+uTpn1Kv2bnzkQEckgq+z1dLNOOyvP2xf+nsziw5ilJe92e5GJOUJYFAlEgUAGpfD
+dv6j/gTRYvdJCJItOQEQtektNCAZsoc0wwIDAQABo1MwUTAdBgNVHQ4EFgQUzHts
+wIt+zhB/R4U4Do2P6rr0YhkwHwYDVR0jBBgwFoAUzHtswIt+zhB/R4U4Do2P6rr0
+YhkwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAveh870jJX7vt
+oLCrdugsyo79pR4f7Nr1kUy3jJrfoaoUmrjiiiHWgT22fGwp7j1GZF2mVfo8YVaK
+63YNn5gB2NNZhguPOFC4AdvHRYOKRBOaOvWK8oq7BcJ//18JYI/pPnpgkYvJjqv4
+gFKaZX9qWtujHpAmKiVGs7pwYGNXfixPHRNV4owcfHMIH5dhbbqT49j94xVpjbXs
+OymKtFl4kpCE/0LzKFrFcuu55Am1VLBHx2cPpHLOipgUcF5BHFlQ8AXiCMOwfPAw
+d22mLB6Gt1oVEpyvQHYd3e04FetEXQ9E8T+NKWZx/8Ucf+IWBYmZBRxch6O83xgk
+bAbGzqkbzQ==
+-----END CERTIFICATE-----
diff --git a/plugins/transport-reactor-netty4/src/test/resources/certificate.key b/plugins/transport-reactor-netty4/src/test/resources/certificate.key
new file mode 100644
index 0000000000000..228350180935d
--- /dev/null
+++ b/plugins/transport-reactor-netty4/src/test/resources/certificate.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDOM4qSDohtrnN9
+nxv9FjD48aYYR9t0a64XXRbPgmbH82ThV8MvF2FNbxWWVQvYkPu4+SMAGPulbdrA
+Bu8gn5UZsqp8GvU2zPSY8/OFPxx+4wf8UNmBCzfge6oMqvQt8LUrDkTdYXQZUwEM
+27RT6cqh6rJJ5iMZ3EOd3dbhyorkCdTEvRGcifIHDtm6FxTnled0f9dq6uj/YSm8
+l24OxYx2q3tl8h/VM6ZqJAcr7zbJv92tVoltzMq5OmfUq/ZufORARySCr7PV0s04
+7K8/bF/6ezOLDmKUl73Z7kYk5QlgUCUSBQAal8N2/qP+BNFi90kIki05ARC16S00
+IBmyhzTDAgMBAAECggEAVOdiElvLjyX6xeoC00YU6hxOIMdNtHU2HMamwtDV01UD
+38mMQ9KjrQelYt4n34drLrHe2IZw75/5J4JzagJrmUY47psHBwaDXItuZRokeJaw
+zhLYTEs7OcKRtV+a5WOspUrdzi33aQoFb67zZG3qkpsZyFXrdBV+/fy/Iv+MCvLH
+xR0jQ5mzE3cw20R7S4nddChBA/y8oKGOo6QRf2SznC1jL/+yolHvJPEn1v8AUxYm
+BMPHxj1O0c4M4IxnJQ3Y5Jy9OaFMyMsFlF1hVhc/3LDDxDyOuBsVsFDicojyrRea
+GKngIke0yezy7Wo4NUcp8YQhafonpWVsSJJdOUotcQKBgQD0rihFBXVtcG1d/Vy7
+FvLHrmccD56JNV744LSn2CDM7W1IulNbDUZINdCFqL91u5LpxozeE1FPY1nhwncJ
+N7V7XYCaSLCuV1YJzRmUCjnzk2RyopGpzWog3f9uUFGgrk1HGbNAv99k/REya6Iu
+IRSkuQhaJOj3bRXzonh0K4GjewKBgQDXvamtCioOUMSP8vq919YMkBw7F+z/fr0p
+pamO8HL9eewAUg6N92JQ9kobSo/GptdmdHIjs8LqnS5C3H13GX5Qlf5GskOlCpla
+V55ElaSp0gvKwWE168U7gQH4etPQAXXJrOGFaGbPj9W81hTUud7HVE88KYdfWTBo
+I7TuE25tWQKBgBRjcr2Vn9xXsvVTCGgamG5lLPhcoNREGz7X0pXt34XT/vhBdnKu
+331i5pZMom+YCrzqK5DRwUPBPpseTjb5amj2OKIijn5ojqXQbmI0m/GdBZC71TF2
+CXLlrMQvcy3VeGEFVjd+BYpvwAAYkfIQFZ1IQdbpHnSHpX2guzLK8UmDAoGBANUy
+PIcf0EetUVHfkCIjNQfdMcjD8BTcLhsF9vWmcDxFTA9VB8ULf0D64mjt2f85yQsa
+b+EQN8KZ6alxMxuLOeRxFYLPj0F9o+Y/R8wHBV48kCKhz2r1v0b6SfQ/jSm1B61x
+BrxLW64qOdIOzS8bLyhUDKkrcPesr8V548aRtUKhAoGBAKlNJFd8BCGKD9Td+3dE
+oP1iHTX5XZ+cQIqL0e+GMQlK4HnQP566DFZU5/GHNNAfmyxd5iSRwhTqPMHRAmOb
+pqQwsyufx0dFeIBxeSO3Z6jW5h2sl4nBipZpw9bzv6EBL1xRr0SfMNZzdnf4JFzc
+0htGo/VO93Z2pv8w7uGUz1nN
+-----END PRIVATE KEY-----