Skip to content

Commit

Permalink
Add header verifier and decompressor support of secure NIO transport …
Browse files Browse the repository at this point in the history
…variant

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Nov 4, 2024
1 parent 7abf006 commit 420635a
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
* @see <a href="https://github.com/opensearch-project/security/blob/d526c9f6c2a438c14db8b413148204510b9fe2e2/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java">SecuritySSLNettyHttpServerTransport</a>
*/
public class SecureNetty4HttpServerTransport extends Netty4HttpServerTransport {
public static final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";
public static final String REQUEST_DECOMPRESSOR = "RequestDecompressor";
public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER;
public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR;

private static final Logger logger = LogManager.getLogger(SecureNetty4HttpServerTransport.class);
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.function.LongSupplier;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
Expand Down Expand Up @@ -82,7 +83,16 @@ public HttpReadWriteHandler(
TaskScheduler taskScheduler,
LongSupplier nanoClock
) {
this(nioHttpChannel, transport, settings, taskScheduler, nanoClock, null /* no SSL/TLS */);
this(
nioHttpChannel,
transport,
settings,
taskScheduler,
nanoClock,
new ChannelInboundHandlerAdapter(),
new HttpContentDecompressor(),
null /* no SSL/TLS */
);
}

HttpReadWriteHandler(
Expand All @@ -91,6 +101,8 @@ public HttpReadWriteHandler(
HttpHandlingSettings settings,
TaskScheduler taskScheduler,
LongSupplier nanoClock,
@Nullable ChannelHandler headerVerifier,
ChannelHandler decompressor,
@Nullable SSLEngine sslEngine
) {
this.nioHttpChannel = nioHttpChannel;
Expand All @@ -113,7 +125,10 @@ public HttpReadWriteHandler(
);
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
handlers.add(decoder);
handlers.add(new HttpContentDecompressor());
if (headerVerifier != null) {
handlers.add(headerVerifier);
}
handlers.add(decompressor);
handlers.add(new HttpResponseEncoder());
handlers.add(new HttpObjectAggregator(settings.getMaxContentLength()));
if (settings.isCompression()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.http.AbstractHttpServerTransport;
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.nio.ssl.SslUtils;
import org.opensearch.nio.BytesChannelContext;
import org.opensearch.nio.ChannelFactory;
Expand All @@ -61,6 +62,7 @@
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportAdapterProvider;
import org.opensearch.transport.nio.NioGroupFactory;
import org.opensearch.transport.nio.PageAllocator;

Expand All @@ -71,7 +73,14 @@
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContentDecompressor;

import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
Expand All @@ -89,6 +98,9 @@
public class NioHttpServerTransport extends AbstractHttpServerTransport {
private static final Logger logger = LogManager.getLogger(NioHttpServerTransport.class);

public static final String REQUEST_HEADER_VERIFIER = SecureHttpTransportSettingsProvider.REQUEST_HEADER_VERIFIER;
public static final String REQUEST_DECOMPRESSOR = SecureHttpTransportSettingsProvider.REQUEST_DECOMPRESSOR;

protected final PageAllocator pageAllocator;
private final NioGroupFactory nioGroupFactory;

Expand Down Expand Up @@ -224,6 +236,8 @@ protected void acceptChannel(NioSocketChannel socketChannel) {

private class HttpChannelFactory extends ChannelFactory<NioHttpServerChannel, NioHttpChannel> {
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
private final ChannelInboundHandlerAdapter headerVerifier;
private final TransportAdapterProvider<HttpServerTransport> decompressorProvider;

private HttpChannelFactory(@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider) {
super(
Expand All @@ -237,6 +251,63 @@ private HttpChannelFactory(@Nullable SecureHttpTransportSettingsProvider secureH
tcpReceiveBufferSize
);
this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;

final List<ChannelInboundHandlerAdapter> headerVerifiers = getHeaderVerifiers(secureHttpTransportSettingsProvider);
final Optional<TransportAdapterProvider<HttpServerTransport>> decompressorProviderOpt = getDecompressorProvider(
secureHttpTransportSettingsProvider
);

// There could be multiple request decompressor providers configured, using the first one
decompressorProviderOpt.ifPresent(p -> logger.debug("Using request decompressor provider: {}", p));

if (headerVerifiers.size() > 1) {
throw new IllegalArgumentException(
"Cannot have more than one header verifier configured, supplied " + headerVerifiers.size()
);
}

this.headerVerifier = headerVerifiers.isEmpty() ? null : headerVerifiers.get(0);
this.decompressorProvider = decompressorProviderOpt.orElseGet(() -> new TransportAdapterProvider<HttpServerTransport>() {
@Override
public String name() {
return REQUEST_DECOMPRESSOR;
}

@Override
public <C> Optional<C> create(Settings settings, HttpServerTransport transport, Class<C> adapterClass) {
return Optional.empty();
}
});

}

private List<ChannelInboundHandlerAdapter> getHeaderVerifiers(
@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider
) {
if (secureHttpTransportSettingsProvider == null) {
return Collections.emptyList();
}

return secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(settings)
.stream()
.filter(p -> REQUEST_HEADER_VERIFIER.equalsIgnoreCase(p.name()))
.map(p -> p.create(settings, NioHttpServerTransport.this, ChannelInboundHandlerAdapter.class))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

private Optional<TransportAdapterProvider<HttpServerTransport>> getDecompressorProvider(
@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider
) {
if (secureHttpTransportSettingsProvider == null) {
return Optional.empty();
}

return secureHttpTransportSettingsProvider.getHttpTransportAdapterProviders(settings)
.stream()
.filter(p -> REQUEST_DECOMPRESSOR.equalsIgnoreCase(p.name()))
.findFirst();
}

@Override
Expand All @@ -254,6 +325,9 @@ public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel,
handlingSettings,
selector.getTaskScheduler(),
threadPool::relativeTimeInMillis,
headerVerifier,
decompressorProvider.create(settings, NioHttpServerTransport.this, ChannelInboundHandlerAdapter.class)
.orElseGet(HttpContentDecompressor::new),
engine
);
Consumer<Exception> exceptionHandler = (e) -> onException(httpChannel, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
*/
@ExperimentalApi
public interface SecureHttpTransportSettingsProvider {
/**
* The well-known name of header verifier {@link TransportAdapterProvider} provider instance
*/
final String REQUEST_HEADER_VERIFIER = "HeaderVerifier";

/**
* The well-known name of request decompressor {@link TransportAdapterProvider} provider instance
*/
final String REQUEST_DECOMPRESSOR = "RequestDecompressor";

/**
* Collection of additional {@link TransportAdapterProvider}s that are specific to particular HTTP transport
* @param settings settings
Expand Down

0 comments on commit 420635a

Please sign in to comment.