Skip to content

Commit

Permalink
Ensure support of the transport-nio by security plugin (HTTP) (#16474)
Browse files Browse the repository at this point in the history
* Ensure support of the transport-nio by security plugin (HTTP)

Signed-off-by: Andriy Redko <[email protected]>

* Add header verifier and decompressor support of secure NIO transport variant

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit b25e10a)
  • Loading branch information
reta committed Nov 5, 2024
1 parent 2fa8370 commit dad027f
Show file tree
Hide file tree
Showing 19 changed files with 966 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
* @see <a href="https://github.com/opensearch-project/security/blob/d526c9f6c2a438c14db8b413148204510b9fe2e2/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java">SecuritySSLNettyHttpServerTransport</a>
*/
public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport {
public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";
public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor";
public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER;
public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR;

private static final Logger logger = LogManager.getLogger(SecureNetty4HttpServerTransport.class);
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
Expand Down
5 changes: 1 addition & 4 deletions plugins/transport-nio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
api "io.netty:netty-handler:${versions.netty}"
api "io.netty:netty-resolver:${versions.netty}"
api "io.netty:netty-transport:${versions.netty}"
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
}

tasks.named("dependencyLicenses").configure {
Expand Down Expand Up @@ -151,10 +152,6 @@ thirdPartyAudit {
'io.netty.internal.tcnative.SessionTicketKey',
'io.netty.internal.tcnative.SniHostNameMatcher',

// from io.netty.channel.unix (netty)
'io.netty.channel.unix.FileDescriptor',
'io.netty.channel.unix.UnixChannel',

'reactor.blockhound.BlockHound$Builder',
'reactor.blockhound.integration.BlockHoundIntegration'
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
d1171bb99411f282068f49d780cedf8c9adeabfd
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public void testThatNioHttpServerSupportsPipelining() throws Exception {
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

try (NioHttpClient nettyHttpClient = new NioHttpClient()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
try (NioHttpClient client = NioHttpClient.http()) {
Collection<FullHttpResponse> responses = client.get(transportAddress.address(), requests);
assertThat(responses, hasSize(5));

Collection<String> opaqueIds = NioHttpClient.returnOpaqueIds(responses);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.http.nio;

import org.opensearch.common.Nullable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpPipelinedRequest;
Expand All @@ -44,6 +45,8 @@
import org.opensearch.nio.TaskScheduler;
import org.opensearch.nio.WriteOperation;

import javax.net.ssl.SSLEngine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -58,6 +61,7 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslHandler;

public class HttpReadWriteHandler implements NioChannelHandler {

Expand All @@ -77,6 +81,28 @@ public HttpReadWriteHandler(
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock
) {
this(
nioHttpChannel,
transport,
settings,
taskScheduler,
nanoClock,
null, /* no header verifier */
new HttpContentDecompressor(),
null /* no SSL/TLS */
);
}

HttpReadWriteHandler(
NioHttpChannel nioHttpChannel,
NioHttpServerTransport transport,
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock,
@Nullable ChannelHandler headerVerifier,
ChannelHandler decompressor,
@Nullable SSLEngine sslEngine
) {
this.nioHttpChannel = nioHttpChannel;
this.transport = transport;
Expand All @@ -85,14 +111,23 @@ public HttpReadWriteHandler(
this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());

List<ChannelHandler> handlers = new ArrayList<>(8);

SslHandler sslHandler = null;
if (sslEngine != null) {
sslHandler = new SslHandler(sslEngine);
}

HttpRequestDecoder decoder = new HttpRequestDecoder(
settings.getMaxInitialLineLength(),
settings.getMaxHeaderSize(),
settings.getMaxChunkSize()
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
handlers.add(decoder);
handlers.add(new HttpContentDecompressor());
if (headerVerifier != null) {
handlers.add(headerVerifier);

Check warning on line 128 in plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-nio/src/main/java/org/opensearch/http/nio/HttpReadWriteHandler.java#L128

Added line #L128 was not covered by tests
}
handlers.add(decompressor);
handlers.add(new HttpResponseEncoder());
handlers.add(new HttpObjectAggregator(settings.getMaxContentLength()));
if (settings.isCompression()) {
Expand All @@ -102,7 +137,7 @@ public HttpReadWriteHandler(
handlers.add(new NioHttpResponseCreator());
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));

adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
adaptor = new NettyAdaptor(sslHandler, handlers.toArray(new ChannelHandler[0]));
adaptor.addCloseListener((v, e) -> nioHttpChannel.close());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.http.nio;

import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.nio.FlushOperation;
import org.opensearch.nio.Page;
import org.opensearch.nio.WriteOperation;
Expand All @@ -49,16 +50,21 @@
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.ssl.SslHandler;

class NettyAdaptor {

private final EmbeddedChannel nettyChannel;
private final LinkedList<FlushOperation> flushOperations = new LinkedList<>();

NettyAdaptor(ChannelHandler... handlers) {
nettyChannel = new EmbeddedChannel();
nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
this(null, handlers);
}

NettyAdaptor(@Nullable SslHandler sslHandler, ChannelHandler... handlers) {
this.nettyChannel = new EmbeddedChannel();

nettyChannel.pipeline().addLast("write_captor", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// This is a little tricky. The embedded channel will complete the promise once it writes the message
Expand All @@ -75,12 +81,22 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}
});
if (sslHandler != null) {
nettyChannel.pipeline().addAfter("write_captor", "ssl_handler", sslHandler);
}
nettyChannel.pipeline().addLast(handlers);
}

public void close() throws Exception {
assert flushOperations.isEmpty() : "Should close outbound operations before calling close";

final SslHandler sslHandler = (SslHandler) nettyChannel.pipeline().get("ssl_handler");
if (sslHandler != null) {
// The nettyChannel.close() or sslHandler.closeOutbound() futures will block indefinitely,
// removing the handler instead from the channel.
nettyChannel.pipeline().remove(sslHandler);
}

ChannelFuture closeFuture = nettyChannel.close();
// This should be safe as we are not a real network channel
closeFuture.await();
Expand Down
Loading

0 comments on commit dad027f

Please sign in to comment.