Skip to content

Commit

Permalink
Back out DelegatingRestHandler changes to simplify this PR and follow…
Browse files Browse the repository at this point in the history
… with a PR to introduce DelegatingRestHandler

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Sep 29, 2023
1 parent f4eb416 commit 226299a
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Netty4ModulePlugin;
import org.opensearch.transport.SharedGroupFactory;

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;

public class Netty4BlockingPlugin extends Netty4ModulePlugin {

public Netty4BlockingPlugin() {
super();
}

public class Netty4BlockingHttpServerTransport extends Netty4HttpServerTransport {

public Netty4BlockingHttpServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
ThreadPool threadPool,
NamedXContentRegistry xContentRegistry,
Dispatcher dispatcher,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher,
clusterSettings,
sharedGroupFactory,
tracer
);
}

@Override
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
return new ExampleBlockingNetty4HeaderVerifier();
}
}

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
Settings settings,
ThreadPool threadPool,
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4BlockingHttpServerTransport(
settings,
networkService,
bigArrays,
threadPool,
xContentRegistry,
dispatcher,
clusterSettings,
getSharedGroupFactory(settings),
tracer
)
);
}

/** POC for how an external header verifier would be implemented */
public class ExampleBlockingNetty4HeaderVerifier extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof HttpRequest)) {
ctx.fireChannelRead(msg);
}

HttpRequest request = (HttpRequest) msg;
if (!isAuthenticated(request)) {
final FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
ReferenceCountUtil.release(msg);
} else {
// Lets the request pass to the next channel handler
ctx.fireChannelRead(msg);
}
}

private boolean isAuthenticated(HttpRequest request) {
final boolean shouldBlock = request.headers().contains("blockme");

return !shouldBlock;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4;

import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCounted;

import static org.hamcrest.CoreMatchers.equalTo;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;

@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4HeaderVerifierIT extends OpenSearchNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(Netty4BlockingPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10260")
public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception {
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
blockedRequest.headers().add("blockme", "Not Allowed");
blockedRequest.headers().add(HOST, "localhost");
blockedRequest.headers().add("scheme", "http");

final List<FullHttpResponse> responses = new ArrayList<>();
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) {
try {
FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest);
responses.add(blockedResponse);
assertThat(blockedResponse.status().code(), equalTo(401));
} finally {
responses.forEach(ReferenceCounted::release);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
);
}

private SharedGroupFactory getSharedGroupFactory(Settings settings) {
protected SharedGroupFactory getSharedGroupFactory(Settings settings) {
SharedGroupFactory groupFactory = this.groupFactory.get();
if (groupFactory != null) {
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.netty4.example;

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public RestController(
this.headersToCopy = headersToCopy;
this.usageService = usageService;
if (handlerWrapper == null) {
handlerWrapper = (delegate) -> new DelegatingRestHandler(delegate);
handlerWrapper = (h) -> h;
}
this.handlerWrapper = handlerWrapper;
this.client = client;
Expand Down
64 changes: 62 additions & 2 deletions server/src/main/java/org/opensearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
/**
* Handler for REST requests
*
* If new methods are added to this interface they must also be added to {@link DelegatingRestHandler}
*
* @opensearch.api
*/
@FunctionalInterface
Expand Down Expand Up @@ -117,6 +115,68 @@ default boolean allowSystemIndexAccessByDefault() {
return false;
}

static RestHandler wrapper(RestHandler delegate) {
return new Wrapper(delegate);
}

/**
* Wrapper for a handler.
*
* @opensearch.internal
*/
class Wrapper implements RestHandler {
private final RestHandler delegate;

public Wrapper(RestHandler delegate) {
this.delegate = Objects.requireNonNull(delegate, "RestHandler delegate can not be null");
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
delegate.handleRequest(request, channel, client);
}

@Override
public boolean canTripCircuitBreaker() {
return delegate.canTripCircuitBreaker();
}

@Override
public boolean supportsContentStream() {
return delegate.supportsContentStream();
}

@Override
public boolean allowsUnsafeBuffers() {
return delegate.allowsUnsafeBuffers();
}

@Override
public List<Route> routes() {
return delegate.routes();
}

@Override
public List<DeprecatedRoute> deprecatedRoutes() {
return delegate.deprecatedRoutes();
}

@Override
public List<ReplacedRoute> replacedRoutes() {
return delegate.replacedRoutes();
}

@Override
public boolean allowSystemIndexAccessByDefault() {
return delegate.allowSystemIndexAccessByDefault();
}
}

/**
* Route for the request.
*
Expand Down
Loading

0 comments on commit 226299a

Please sign in to comment.