Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryFetchSearchResult as a proto message and node-to-node communication with protobuf #11910

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
82db23c
Adding proto message structure for QuerySearchResult
VachaShah Jan 4, 2024
fe1fd90
Adding ProtobufWriteable interface and integrating with transport req…
VachaShah Jan 4, 2024
931e1d9
Adding experimental feature flag for protobuf integration
VachaShah Jan 5, 2024
7bbf23a
Removing ShardSearchRequest from proto and modifying QuerySearchResult
VachaShah Jan 9, 2024
044b07f
Making protobuf feature flag dynamic
VachaShah Jan 12, 2024
b198451
Merging latest changes from main
VachaShah Jan 14, 2024
4f569c0
Adding feature flag to cluster settings
VachaShah Jan 14, 2024
891a085
Converting FetchSearchResult to proto message
VachaShah Jan 14, 2024
e00c1a8
Converting QueryFetchSearchResult to a proto message
VachaShah Jan 16, 2024
6274355
Adding ActionListenerResponseHandler for protobuf
VachaShah Jan 16, 2024
540846e
Protobuf support for node-to-node communication
VachaShah Jan 17, 2024
e7024db
Fixing precommit
VachaShah Jan 17, 2024
3d49a4b
Fixing tests
VachaShah Jan 18, 2024
a2cf25a
Addressing comments
VachaShah Jan 22, 2024
0cf2fcb
Adding abstraction for InboundMessage - comment
VachaShah Jan 23, 2024
f7a9d47
Removing unused code
VachaShah Jan 23, 2024
58a8af8
Fixing empty hits
VachaShah Jan 26, 2024
1ae993b
Improving the logic for inboundpipeline
VachaShah Jan 28, 2024
bd83a3f
Added tests
VachaShah Feb 2, 2024
fb24d35
Adding protos for the remaining structures
VachaShah Feb 8, 2024
67a3262
Incorporating changes from main
VachaShah Feb 8, 2024
40df9ac
Improving protocol detection for node to node message with protobuf
VachaShah Feb 14, 2024
d6431ed
Fixing reading for collapse values object
VachaShah Feb 23, 2024
69831a8
Converting from byte[] to InputStream for ProtobufWriteable
VachaShah Feb 26, 2024
0cb9018
Adding shardSearchRequest to QuerySearchResult proto
VachaShah Feb 27, 2024
4b801ea
Addressing minor comments and cleaning up code
VachaShah Feb 28, 2024
5ae83ab
Extract native and protobuf message handler from InboundHandler
VachaShah Feb 29, 2024
243f039
Changing protocol enum to string
VachaShah Feb 29, 2024
64b79dc
Fixing tests for message listener in inbound handler
VachaShah Mar 1, 2024
ff965e6
Fixing mixed cluster tests
VachaShah Mar 5, 2024
de88e46
Addressing comments and fixing some proto messages
VachaShah Mar 13, 2024
b7ee789
Extracting protobuf serialization from model classes
VachaShah Mar 25, 2024
117572c
Fixing precommit
VachaShah Mar 26, 2024
46eeaf6
Addressing comments
VachaShah Mar 27, 2024
ed00f89
Merging latest changes from main
VachaShah Apr 9, 2024
324c58f
Fixing after merging to main
VachaShah Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,7 @@ ${path.logs}
# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting.
#
#opensearch.experimental.feature.pluggable.caching.enabled: false
#
# Gates the functionality of enabling Opensearch to use protobuf with basic searches and for node-to-node communication.
#
#opensearch.experimental.feature.search_with_protobuf.enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,15 @@ private boolean inspectable(ExecutableElement executable) {
*/
private boolean inspectable(Element element) {
final PackageElement pckg = processingEnv.getElementUtils().getPackageOf(element);
return pckg.getQualifiedName().toString().startsWith(OPENSEARCH_PACKAGE);
return pckg.getQualifiedName().toString().startsWith(OPENSEARCH_PACKAGE)
&& !element.getEnclosingElement()
.getAnnotationMirrors()
.stream()
.anyMatch(
m -> m.getAnnotationType()
.toString() /* ClassSymbol.toString() returns class name */
.equalsIgnoreCase("javax.annotation.Generated")
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.common.io.stream;

import org.opensearch.common.annotation.ExperimentalApi;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Implementers can be written to a {@linkplain OutputStream} and read from a byte array. This allows them to be "thrown
* across the wire" using OpenSearch's internal protocol with protobuf bytes.
*
* @opensearch.api
*/
@ExperimentalApi
public interface BytesWriteable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we generalize of the source (stream) here instead of introducing a parallel hierarchy?

public interface StreamWriteable {
    interface Writer<O, V> {
        void write(final O out, V value) throws IOException;
    }

    interface Reader<I, V> {
        V read(final I in) throws IOException;
    }
}

In this case we could make the current Writeable and new one the part of the same hierarchy?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had tried generalizing the hierarchy, the problem with that is: TransportMessage cannot implement both Writeable and BytesWriteable as it complains that it is implementing the same thing twice from the same hierarchy. We want the message to implement both since for now both ways exist side by side. I tried multiple ways to do it but ended up with the same fix of keeping it separate for now.

Copy link
Collaborator

@reta reta Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm ... we should be able to implement both: the classes have functions with different input arguments, in any case - we don't have to implement both on the same class, the intermediary layers could do that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I tried a little differently than mentioned here, let me add this change to a newer commit as well.


/**
* Write this into the {@linkplain OutputStream}.
*/
void writeTo(OutputStream out) throws IOException;

/**
* Reference to a method that can write some object to a {@link OutputStream}.
*
* @opensearch.experimental
*/
@FunctionalInterface
@ExperimentalApi
interface Writer<V> {

/**
* Write {@code V}-type {@code value} to the {@code out}put stream.
*
* @param out Output to write the {@code value} too
* @param value The value to add
*/
void write(final OutputStream out, V value) throws IOException;
}

/**
* Reference to a method that can read some object from a byte array.
*
* @opensearch.experimental
*/
@FunctionalInterface
@ExperimentalApi
interface Reader<V> {

/**
* Read {@code V}-type value from a byte array.
*
* @param in byte array to read the value from
*/
V read(final InputStream in) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,24 @@

package org.opensearch.core.transport;

import org.opensearch.core.common.io.stream.BytesWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.transport.TransportAddress;

import java.io.InputStream;

/**
* Message over the transport interface
*
* @opensearch.internal
*/
public abstract class TransportMessage implements Writeable {
public abstract class TransportMessage implements Writeable, BytesWriteable {

private TransportAddress remoteAddress;

private String protocol;

public void remoteAddress(TransportAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
Expand All @@ -53,6 +58,13 @@
return remoteAddress;
}

public String getProtocol() {
if (protocol != null) {
return protocol;

Check warning on line 63 in libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java#L63

Added line #L63 was not covered by tests
}
return "native";
}

/**
* Constructs a new empty transport message
*/
Expand All @@ -63,4 +75,10 @@
* currently a no-op
*/
public TransportMessage(StreamInput in) {}

/**
* Constructs a new transport message with the data from the byte array. This is
* currently a no-op
*/
public TransportMessage(InputStream in) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Response over the transport interface
Expand All @@ -60,6 +62,24 @@
super(in);
}

/**
* Constructs a new transport response with the data from the byte array. This is
* currently a no-op. However, this exists to allow extenders to call <code>super(in)</code>
* so that reading can mirror writing where we often call <code>super.writeTo(out)</code>.
*/
public TransportResponse(InputStream in) throws IOException {
super(in);
}

/**
* Writes this response to the {@linkplain OutputStream}. This is added here so that classes
* don't have to implement since it is an experimental feature and only being added for
* search apis incrementally.
*/
public void writeTo(OutputStream out) throws IOException {
// no-op
}

Check warning on line 81 in libs/core/src/main/java/org/opensearch/core/transport/TransportResponse.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/transport/TransportResponse.java#L81

Added line #L81 was not covered by tests

/**
* Empty transport response
*
Expand All @@ -75,5 +95,8 @@

@Override
public void writeTo(StreamOutput out) throws IOException {}

@Override
public void writeTo(OutputStream out) throws IOException {}

Check warning on line 100 in libs/core/src/main/java/org/opensearch/core/transport/TransportResponse.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/transport/TransportResponse.java#L100

Added line #L100 was not covered by tests
}
}
27 changes: 27 additions & 0 deletions server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,32 @@ tasks.named("dependencyLicenses").configure {
}
}

tasks.named("missingJavadoc").configure {
/*
* annotate_code in L210 does not add the Generated annotation to nested code generated using protobuf.
* TODO: Add support to missingJavadoc task to ignore all such nested classes.
* https://github.com/opensearch-project/OpenSearch/issues/11913
*/
dependsOn("generateProto")
javadocMissingIgnore = [
"org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult.RescoreDocIds.setIntegerOrBuilder",
"org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult.RescoreDocIdsOrBuilder",
"org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult.TopDocs.ScoreDocOrBuilder",
"org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult.TopDocsOrBuilder",
"org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult.TopDocsAndMaxScoreOrBuilder",
"org.opensearch.server.proto.FetchSearchResultProto.SearchHit.SearchSortValuesOrBuilder",
"org.opensearch.server.proto.FetchSearchResultProto.SearchHit.HighlightFieldOrBuilder",
"org.opensearch.server.proto.FetchSearchResultProto.SearchHit.DocumentFieldOrBuilder",
"org.opensearch.server.proto.FetchSearchResultProto.SearchHit.NestedIdentityOrBuilder",
"org.opensearch.server.proto.NodeToNodeMessageProto.NodeToNodeMessage.MessageCase",
"org.opensearch.server.proto.NodeToNodeMessageProto.NodeToNodeMessage.ResponseHandlersListOrBuilder",
"org.opensearch.server.proto.NodeToNodeMessageProto.NodeToNodeMessage.HeaderOrBuilder",
"org.opensearch.server.proto.FetchSearchResultProto.SearchHit.Explanation.ExplanationValueCase",
"org.opensearch.server.proto.FetchSearchResultProto.SearchHit.ExplanationOrBuilder",
"org.opensearch.server.proto.ShardSearchRequestProto.OriginalIndices.IndicesOptionsOrBuilder",
]
}

tasks.named("filepermissions").configure {
mustRunAfter("generateProto")
}
Expand All @@ -364,6 +390,7 @@ tasks.named("licenseHeaders").configure {
excludes << 'org/opensearch/client/documentation/placeholder.txt'
// Ignore for protobuf generated code
excludes << 'org/opensearch/extensions/proto/*'
excludes << 'org/opensearch/server/proto/*'
}

tasks.test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.BytesWriteable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
* very common on response handlers).
*
* @opensearch.api
*/
public class ProtobufActionListenerResponseHandler<Response extends TransportResponse> implements TransportResponseHandler<Response> {
VachaShah marked this conversation as resolved.
Show resolved Hide resolved

private final ActionListener<? super Response> listener;
private final BytesWriteable.Reader<Response> reader;
private final String executor;

public ProtobufActionListenerResponseHandler(
ActionListener<? super Response> listener,
BytesWriteable.Reader<Response> reader,
String executor
) {
this.listener = Objects.requireNonNull(listener);
this.reader = Objects.requireNonNull(reader);
this.executor = Objects.requireNonNull(executor);
}

Check warning on line 43 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L39-L43

Added lines #L39 - L43 were not covered by tests

public ProtobufActionListenerResponseHandler(ActionListener<? super Response> listener, BytesWriteable.Reader<Response> reader) {
this(listener, reader, ThreadPool.Names.SAME);
}

Check warning on line 47 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L46-L47

Added lines #L46 - L47 were not covered by tests

@Override
public void handleResponse(Response response) {
listener.onResponse(response);
}

Check warning on line 52 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L51-L52

Added lines #L51 - L52 were not covered by tests

@Override
public void handleException(TransportException e) {
listener.onFailure(e);
}

Check warning on line 57 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L56-L57

Added lines #L56 - L57 were not covered by tests

@Override
public String executor() {
return executor;

Check warning on line 61 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L61

Added line #L61 was not covered by tests
}

@Override
public String toString() {
return super.toString() + "/" + listener;

Check warning on line 66 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L66

Added line #L66 was not covered by tests
}

@Override
public Response read(StreamInput in) throws IOException {
throw new UnsupportedOperationException("Unimplemented method 'read'");

Check warning on line 71 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L71

Added line #L71 was not covered by tests
}

@Override
public Response read(InputStream in) throws IOException {
return reader.read(in);

Check warning on line 76 in server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java#L76

Added line #L76 was not covered by tests
}
}
Loading
Loading