Skip to content

Commit

Permalink
[Streaming Indexing] Ensure support of the new transport by security …
Browse files Browse the repository at this point in the history
…plugin

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Apr 12, 2024
1 parent c168e1c commit 1921646
Show file tree
Hide file tree
Showing 9 changed files with 925 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.http.reactor.netty4;

import org.opensearch.common.Nullable;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -23,21 +24,33 @@
import org.opensearch.http.HttpChannel;
import org.opensearch.http.HttpReadTimeoutException;
import org.opensearch.http.HttpServerChannel;
import org.opensearch.http.reactor.netty4.ssl.SslUtils;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.reactor.SharedGroupFactory;
import org.opensearch.transport.reactor.netty4.Netty4Utils;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSessionContext;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.ssl.ApplicationProtocolNegotiator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
Expand Down Expand Up @@ -116,6 +129,7 @@ public class ReactorNetty4HttpServerTransport extends AbstractHttpServerTranspor
private final ByteSizeValue maxInitialLineLength;
private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize;
private final SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider;
private volatile SharedGroupFactory.SharedGroup sharedGroup;
private volatile DisposableServer disposableServer;
private volatile Scheduler scheduler;
Expand All @@ -142,6 +156,45 @@ public ReactorNetty4HttpServerTransport(
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
this(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher,
clusterSettings,
sharedGroupFactory,
null,
tracer
);
}

/**
* Creates new HTTP transport implementations based on Reactor Netty (see please {@link HttpServer}).
* @param settings settings
* @param networkService network service
* @param bigArrays big array allocator
* @param threadPool thread pool instance
* @param xContentRegistry XContent registry instance
* @param dispatcher dispatcher instance
* @param clusterSettings cluster settings
* @param sharedGroupFactory shared group factory
* @param secureHttpTransportSettingsProvider secure HTTP transport settings provider
* @param tracer tracer instance
*/
public ReactorNetty4HttpServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
@Nullable SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
Tracer tracer
) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
Expand All @@ -152,6 +205,7 @@ public ReactorNetty4HttpServerTransport(
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.secureHttpTransportSettingsProvider = secureHttpTransportSettingsProvider;
}

/**
Expand All @@ -160,7 +214,7 @@ public ReactorNetty4HttpServerTransport(
*/
@Override
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
final HttpServer server = configureChannelOptions(
final HttpServer server = configure(
HttpServer.create()
.httpFormDecoder(builder -> builder.scheduler(scheduler))
.idleTimeout(Duration.ofMillis(connectTimeoutMillis))
Expand All @@ -173,16 +227,15 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti
.maxHeaderSize(maxHeaderSize.bytesAsInt())
.maxInitialLineLength(maxInitialLineLength.bytesAsInt())
)
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C)
.handle((req, res) -> incomingRequest(req, res))
);

disposableServer = server.bindNow();
return new ReactorNetty4HttpServerChannel(disposableServer.channel());
}

private HttpServer configureChannelOptions(final HttpServer server1) {
HttpServer configured = server1.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings))
private HttpServer configure(final HttpServer server) throws Exception {
HttpServer configured = server.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings))
.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

if (SETTING_HTTP_TCP_KEEP_ALIVE.get(settings)) {
Expand Down Expand Up @@ -229,6 +282,65 @@ private HttpServer configureChannelOptions(final HttpServer server1) {
configured = configured.option(ChannelOption.SO_REUSEADDR, reuseAddress);
configured = configured.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

// Configure SSL context if available
if (secureHttpTransportSettingsProvider != null) {
final SSLEngine engine = secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(settings, this)
.orElseGet(SslUtils::createDefaultServerSSLEngine);

try {
final List<String> cipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
final List<String> applicationProtocols = Arrays.asList(engine.getSSLParameters().getApplicationProtocols());

configured = configured.secure(spec -> spec.sslContext(new SslContext() {
@Override
public SSLSessionContext sessionContext() {
throw new UnsupportedOperationException(); /* server only, should never be called */

Check warning on line 297 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L297

Added line #L297 was not covered by tests
}

@Override
public SSLEngine newEngine(ByteBufAllocator alloc, String peerHost, int peerPort) {
throw new UnsupportedOperationException(); /* server only, should never be called */

Check warning on line 302 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L302

Added line #L302 was not covered by tests
}

@Override
public SSLEngine newEngine(ByteBufAllocator alloc) {
try {
return secureHttpTransportSettingsProvider.buildSecureHttpServerEngine(
settings,
ReactorNetty4HttpServerTransport.this
).orElseGet(SslUtils::createDefaultServerSSLEngine);
} catch (final SSLException ex) {
throw new UnsupportedOperationException("Unable to create SSLEngine", ex);

Check warning on line 313 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L312-L313

Added lines #L312 - L313 were not covered by tests
}
}

@Override
public boolean isClient() {
return false; /* server only */

Check warning on line 319 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L319

Added line #L319 was not covered by tests
}

@Override
public List<String> cipherSuites() {
return cipherSuites;

Check warning on line 324 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L324

Added line #L324 was not covered by tests
}

@Override
public ApplicationProtocolNegotiator applicationProtocolNegotiator() {
return new ApplicationProtocolNegotiator() {

Check warning on line 329 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L329

Added line #L329 was not covered by tests
@Override
public List<String> protocols() {
return applicationProtocols;

Check warning on line 332 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java#L332

Added line #L332 was not covered by tests
}
};
}
}).build()).protocol(HttpProtocol.HTTP11, HttpProtocol.H2);
} finally {
ReferenceCountUtil.release(engine);
}
} else {
configured = configured.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C);
}

return configured;
}

Expand Down Expand Up @@ -302,6 +414,11 @@ protected void doStart() {
}
}

/**
* Exception handler
* @param channel HTTP channel
* @param cause exception occurred
*/
@Override
public void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.http.reactor.netty4.ssl;

import org.opensearch.OpenSearchSecurityException;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import java.security.NoSuchAlgorithmException;

/**
* Helper class for creating default SSL engines
*/
public class SslUtils {
private static final String[] DEFAULT_SSL_PROTOCOLS = { "TLSv1.3", "TLSv1.2", "TLSv1.1" };

Check warning on line 24 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java#L24

Added line #L24 was not covered by tests

private SslUtils() {}

/**
* Creates default server {@link SSLEngine} instance
* @return default server {@link SSLEngine} instance
*/
public static SSLEngine createDefaultServerSSLEngine() {
try {
final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
engine.setEnabledProtocols(DEFAULT_SSL_PROTOCOLS);
engine.setUseClientMode(false);
return engine;
} catch (final NoSuchAlgorithmException ex) {
throw new OpenSearchSecurityException("Unable to initialize default server SSL engine", ex);

Check warning on line 39 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ssl/SslUtils.java#L34-L39

Added lines #L34 - L39 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* SSL supporting utility classes
*/
package org.opensearch.http.reactor.netty4.ssl;
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.http.HttpServerTransport.Dispatcher;
import org.opensearch.http.reactor.netty4.ReactorNetty4HttpServerTransport;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SecureHttpTransportSettingsProvider;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -38,6 +40,11 @@ public class ReactorNetty4Plugin extends Plugin implements NetworkPlugin {
*/
public static final String REACTOR_NETTY_HTTP_TRANSPORT_NAME = "reactor-netty4";

/**
* The name of new experimental secure HTTP transport implementations based on Reactor Netty.
*/
public static final String REACTOR_NETTY_SECURE_HTTP_TRANSPORT_NAME = "reactor-netty4-secure";

private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();

/**
Expand Down Expand Up @@ -91,6 +98,53 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
dispatcher,
clusterSettings,
getSharedGroupFactory(settings),
null, /* no security settings provider */
tracer
)
);
}

/**
* Returns a map of {@link HttpServerTransport} suppliers.
* See {@link org.opensearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
* @param settings settings
* @param networkService network service
* @param bigArrays big array allocator
* @param pageCacheRecycler page cache recycler instance
* @param circuitBreakerService circuit breaker service instance
* @param threadPool thread pool instance
* @param xContentRegistry XContent registry instance
* @param dispatcher dispatcher instance
* @param clusterSettings cluster settings
* @param secureHttpTransportSettingsProvider secure HTTP transport settings provider
* @param tracer tracer instance
*/
@Override
public Map<String, Supplier<HttpServerTransport>> getSecureHttpTransports(
Settings settings,
ThreadPool threadPool,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SecureHttpTransportSettingsProvider secureHttpTransportSettingsProvider,
Tracer tracer
) {
return Collections.singletonMap(

Check warning on line 136 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java#L136

Added line #L136 was not covered by tests
REACTOR_NETTY_SECURE_HTTP_TRANSPORT_NAME,
() -> new ReactorNetty4HttpServerTransport(

Check warning on line 138 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java#L138

Added line #L138 was not covered by tests
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher,
clusterSettings,
getSharedGroupFactory(settings),

Check warning on line 146 in plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/transport-reactor-netty4/src/main/java/org/opensearch/transport/reactor/ReactorNetty4Plugin.java#L146

Added line #L146 was not covered by tests
secureHttpTransportSettingsProvider,
tracer
)
);
Expand Down
Loading

0 comments on commit 1921646

Please sign in to comment.