Skip to content

Commit

Permalink
Adding ActionListenerResponseHandler for protobuf
Browse files Browse the repository at this point in the history
Signed-off-by: Vacha Shah <[email protected]>
  • Loading branch information
VachaShah committed Jan 16, 2024
1 parent 9f00058 commit e8bfd09
Show file tree
Hide file tree
Showing 59 changed files with 818 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,12 @@ public String executor() {
public TestResponse read(StreamInput in) throws IOException {
return new TestResponse(in);
}

@Override
public TestResponse read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,10 @@ public Response read(StreamInput in) throws IOException {
public String toString() {
return super.toString() + "/" + listener;
}

@Override
public Response read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.util.Objects;

/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
* very common on response handlers).
*
* @opensearch.api
*/
public class ProtobufActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {

private final ActionListener<? super Response> listener;
private final ProtobufWriteable.Reader<Response> reader;
private final String executor;

public ProtobufActionListenerResponseHandler(
ActionListener<? super Response> listener,
ProtobufWriteable.Reader<Response> reader,
String executor
) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}

public ProtobufActionListenerResponseHandler(ActionListener<? super Response> listener, ProtobufWriteable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}

@Override
public String executor() {
return executor;
}

@Override
public String toString() {
return super.toString() + "/" + listener;
}

@Override
public Response read(StreamInput in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}

@Override
public Response read(byte[] in) throws IOException {
return reader.read(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ public void handleResponse(final FieldCapabilitiesIndexResponse response) {
public void handleException(TransportException exp) {
onFailure(shardRouting, exp);
}

@Override
public FieldCapabilitiesIndexResponse read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ public void handleResponse(ResyncReplicationResponse response) {
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public ResyncReplicationResponse read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ public String executor() {
public GetAllPitNodesResponse read(StreamInput in) throws IOException {
return new GetAllPitNodesResponse(in);
}

@Override
public GetAllPitNodesResponse read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.ProtobufActionListenerResponseHandler;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.ProtobufWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -241,16 +244,31 @@ public void sendExecuteQuery(
// we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
// this used to be the QUERY_AND_FETCH which doesn't exist anymore.
final boolean fetchDocuments = request.numberOfShards() == 1;
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
// System.setProperty("opensearch.experimental.feature.search_with_protobuf.enabled", "true");
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) {
// System.out.println("Feature flag enabled");
ProtobufWriteable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(
connection,
QUERY_ACTION_NAME,
request,
task,
new ProtobufConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
);
} else {
Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(
connection,
QUERY_ACTION_NAME,
request,
task,
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
);
final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(
connection,
QUERY_ACTION_NAME,
request,
task,
new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId())
);
}
}

public void sendExecuteQuery(
Expand Down Expand Up @@ -775,4 +793,57 @@ private boolean assertNodePresent() {
return true;
}
}

/**
* A handler that counts connections for protobuf
*
* @opensearch.internal
*/
final class ProtobufConnectionCountingHandler<Response extends TransportResponse> extends ProtobufActionListenerResponseHandler<
Response> {
private final Map<String, Long> clientConnections;
private final String nodeId;

ProtobufConnectionCountingHandler(
final ActionListener<? super Response> listener,
final ProtobufWriteable.Reader<Response> responseReader,
final Map<String, Long> clientConnections,
final String nodeId
) {
super(listener, responseReader);
this.clientConnections = clientConnections;
this.nodeId = nodeId;
// Increment the number of connections for this node by one
clientConnections.compute(nodeId, (id, conns) -> conns == null ? 1 : conns + 1);
}

@Override
public void handleResponse(Response response) {
super.handleResponse(response);
// Decrement the number of connections or remove it entirely if there are no more connections
// We need to remove the entry here so we don't leak when nodes go away forever
assert assertNodePresent();
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

@Override
public void handleException(TransportException e) {
super.handleException(e);
// Decrement the number of connections or remove it entirely if there are no more connections
// We need to remove the entry here so we don't leak when nodes go away forever
assert assertNodePresent();
clientConnections.computeIfPresent(nodeId, (id, conns) -> conns.longValue() == 1 ? null : conns - 1);
}

private boolean assertNodePresent() {
clientConnections.compute(nodeId, (id, conns) -> {
assert conns != null : "number of connections for " + id + " is null, but should be an integer";
assert conns >= 1 : "number of connections for " + id + " should be >= 1 but was " + conns;
return conns;
});
// Always return true, there is additional asserting here, the boolean is just so this
// can be skipped when assertions are not enabled
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ public void handleResponse(ShardResponse response) {
public void handleException(TransportException e) {
onOperation(shard, shardIt, shardIndex, e);
}

@Override
public ShardResponse read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,13 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public TransportBroadcastByNodeAction<Request, Response, ShardOperationResult>.NodeResponse read(byte[] in)
throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public NodeResponse read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,12 @@ public void handleException(TransportException exp) {
finishWithUnexpectedFailure(e);
}
}

@Override
public Response read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ public void handleException(TransportException exp) {
listener.onFailure(exp);
}
}

@Override
public Response read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ public void handleResponse(final Response response) {
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public Response read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
} else {
Expand Down Expand Up @@ -302,6 +308,12 @@ public void handleResponse(final Response response) {
public void handleException(TransportException exp) {
onFailure(finalShardRouting, exp);
}

@Override
public Response read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ public void handleException(TransportException exp) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public
TransportTasksAction<OperationTask, TasksRequest, TasksResponse, TaskResponse>.NodeTasksResponse
read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,12 @@ public void handleException(TransportException exp) {
public String executor() {
return Names.SAME;
}

@Override
public Empty read(byte[] in) throws IOException {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'read'");
}
}
);
}
Expand Down
Loading

0 comments on commit e8bfd09

Please sign in to comment.