-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding protos for search classes #13178
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.document.serializer; | ||
|
||
import org.opensearch.common.document.DocumentField; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* Deserializer for {@link DocumentField} which can be implemented for different types of serde mechanisms. | ||
*/ | ||
public interface DocumentFieldDeserializer<T> { | ||
|
||
DocumentField createDocumentField(T inputStream) throws IOException; | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** Serializer package for documents. */ | ||
package org.opensearch.common.document.serializer; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.document.serializer.protobuf; | ||
|
||
import com.google.protobuf.ByteString; | ||
import org.opensearch.OpenSearchException; | ||
import org.opensearch.common.document.DocumentField; | ||
import org.opensearch.common.document.serializer.DocumentFieldDeserializer; | ||
import org.opensearch.core.common.text.Text; | ||
import org.opensearch.server.proto.FetchSearchResultProto; | ||
import org.opensearch.server.proto.FetchSearchResultProto.DocumentFieldValue; | ||
import org.opensearch.server.proto.FetchSearchResultProto.DocumentFieldValue.Builder; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.time.Instant; | ||
import java.time.ZoneId; | ||
import java.time.ZonedDateTime; | ||
import java.util.ArrayList; | ||
import java.util.Date; | ||
import java.util.HashMap; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* Deserializer for {@link DocumentField} to/from protobuf. | ||
*/ | ||
public class DocumentFieldProtobufDeserializer implements DocumentFieldDeserializer<InputStream> { | ||
Check warning on line 35 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L35
|
||
|
||
private FetchSearchResultProto.SearchHit.DocumentField documentField; | ||
|
||
@Override | ||
public DocumentField createDocumentField(InputStream inputStream) throws IOException { | ||
documentField = FetchSearchResultProto.SearchHit.DocumentField.parseFrom(inputStream); | ||
String name = documentField.getName(); | ||
List<Object> values = new ArrayList<>(); | ||
Check warning on line 43 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L41-L43
|
||
for (FetchSearchResultProto.DocumentFieldValue value : documentField.getValuesList()) { | ||
values.add(readDocumentFieldValueFromProtobuf(value)); | ||
} | ||
return new DocumentField(name, values); | ||
Check warning on line 47 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L45-L47
|
||
} | ||
|
||
private Object readDocumentFieldValueFromProtobuf(FetchSearchResultProto.DocumentFieldValue documentFieldValue) throws IOException { | ||
if (documentFieldValue.hasValueString()) { | ||
return documentFieldValue.getValueString(); | ||
Check warning on line 52 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L52
|
||
} else if (documentFieldValue.hasValueInt()) { | ||
return documentFieldValue.getValueInt(); | ||
Check warning on line 54 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L54
|
||
} else if (documentFieldValue.hasValueLong()) { | ||
return documentFieldValue.getValueLong(); | ||
Check warning on line 56 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L56
|
||
} else if (documentFieldValue.hasValueFloat()) { | ||
return documentFieldValue.getValueFloat(); | ||
Check warning on line 58 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L58
|
||
} else if (documentFieldValue.hasValueDouble()) { | ||
return documentFieldValue.getValueDouble(); | ||
Check warning on line 60 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L60
|
||
} else if (documentFieldValue.hasValueBool()) { | ||
return documentFieldValue.getValueBool(); | ||
Check warning on line 62 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L62
|
||
} else if (documentFieldValue.getValueByteArrayList().size() > 0) { | ||
return documentFieldValue.getValueByteArrayList().toArray(); | ||
Check warning on line 64 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L64
|
||
} else if (documentFieldValue.getValueArrayListList().size() > 0) { | ||
List<Object> list = new ArrayList<>(); | ||
Check warning on line 66 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L66
|
||
for (FetchSearchResultProto.DocumentFieldValue value : documentFieldValue.getValueArrayListList()) { | ||
list.add(readDocumentFieldValueFromProtobuf(value)); | ||
} | ||
return list; | ||
Check warning on line 70 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L68-L70
|
||
} else if (documentFieldValue.getValueMapMap().size() > 0) { | ||
Map<String, Object> map = Map.of(); | ||
Check warning on line 72 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L72
|
||
for (Map.Entry<String, FetchSearchResultProto.DocumentFieldValue> entrySet : documentFieldValue.getValueMapMap().entrySet()) { | ||
map.put(entrySet.getKey(), readDocumentFieldValueFromProtobuf(entrySet.getValue())); | ||
} | ||
return map; | ||
Check warning on line 76 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L74-L76
|
||
} else if (documentFieldValue.hasValueDate()) { | ||
return new Date(documentFieldValue.getValueDate()); | ||
Check warning on line 78 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L78
|
||
} else if (documentFieldValue.hasValueZonedDate() && documentFieldValue.hasValueZonedTime()) { | ||
return ZonedDateTime.ofInstant( | ||
Instant.ofEpochMilli(documentFieldValue.getValueZonedTime()), | ||
ZoneId.of(documentFieldValue.getValueZonedDate()) | ||
Check warning on line 82 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L80-L82
|
||
); | ||
} else if (documentFieldValue.hasValueText()) { | ||
return new Text(documentFieldValue.getValueText()); | ||
Check warning on line 85 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L85
|
||
} else { | ||
throw new IOException("Can't read generic value of type [" + documentFieldValue + "]"); | ||
Check warning on line 87 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L87
|
||
} | ||
} | ||
|
||
public static DocumentFieldValue.Builder convertDocumentFieldValueToProto(Object value, Builder valueBuilder) { | ||
if (value == null) { | ||
// null is not allowed in protobuf, so we use a special string to represent null | ||
return valueBuilder.setValueString("null"); | ||
Check warning on line 94 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L94
|
||
} | ||
Class type = value.getClass(); | ||
Check warning on line 96 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L96
|
||
if (type == String.class) { | ||
valueBuilder.setValueString((String) value); | ||
Check warning on line 98 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L98
|
||
} else if (type == Integer.class) { | ||
valueBuilder.setValueInt((Integer) value); | ||
Check warning on line 100 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L100
|
||
} else if (type == Long.class) { | ||
valueBuilder.setValueLong((Long) value); | ||
Check warning on line 102 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L102
|
||
} else if (type == Float.class) { | ||
valueBuilder.setValueFloat((Float) value); | ||
Check warning on line 104 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L104
|
||
} else if (type == Double.class) { | ||
valueBuilder.setValueDouble((Double) value); | ||
Check warning on line 106 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L106
|
||
} else if (type == Boolean.class) { | ||
valueBuilder.setValueBool((Boolean) value); | ||
Check warning on line 108 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L108
|
||
} else if (type == byte[].class) { | ||
valueBuilder.addValueByteArray(ByteString.copyFrom((byte[]) value)); | ||
Check warning on line 110 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L110
|
||
} else if (type == List.class) { | ||
List<Object> list = (List<Object>) value; | ||
Check warning on line 112 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L112
|
||
for (Object listValue : list) { | ||
valueBuilder.addValueArrayList(convertDocumentFieldValueToProto(listValue, valueBuilder)); | ||
} | ||
Check warning on line 115 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L114-L115
|
||
} else if (type == Map.class || type == HashMap.class || type == LinkedHashMap.class) { | ||
Map<String, Object> map = (Map<String, Object>) value; | ||
Check warning on line 117 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L117
|
||
for (Map.Entry<String, Object> entry : map.entrySet()) { | ||
valueBuilder.putValueMap(entry.getKey(), convertDocumentFieldValueToProto(entry.getValue(), valueBuilder).build()); | ||
} | ||
Check warning on line 120 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L119-L120
|
||
} else if (type == Date.class) { | ||
valueBuilder.setValueDate(((Date) value).getTime()); | ||
Check warning on line 122 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L122
|
||
} else if (type == ZonedDateTime.class) { | ||
valueBuilder.setValueZonedDate(((ZonedDateTime) value).getZone().getId()); | ||
valueBuilder.setValueZonedTime(((ZonedDateTime) value).toInstant().toEpochMilli()); | ||
Check warning on line 125 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L124-L125
|
||
} else if (type == Text.class) { | ||
valueBuilder.setValueText(((Text) value).string()); | ||
Check warning on line 127 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L127
|
||
} else { | ||
throw new OpenSearchException("Can't convert generic value of type [" + type + "] to protobuf"); | ||
Check warning on line 129 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L129
|
||
} | ||
return valueBuilder; | ||
Check warning on line 131 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L131
|
||
} | ||
|
||
public static FetchSearchResultProto.SearchHit.DocumentField convertDocumentFieldToProto(DocumentField documentField) { | ||
FetchSearchResultProto.SearchHit.DocumentField.Builder builder = FetchSearchResultProto.SearchHit.DocumentField.newBuilder(); | ||
builder.setName(documentField.getName()); | ||
Check warning on line 136 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L135-L136
|
||
for (Object value : documentField.getValues()) { | ||
FetchSearchResultProto.DocumentFieldValue.Builder valueBuilder = FetchSearchResultProto.DocumentFieldValue.newBuilder(); | ||
builder.addValues(convertDocumentFieldValueToProto(value, valueBuilder)); | ||
} | ||
return builder.build(); | ||
Check warning on line 141 in server/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java Codecov / codecov/patchserver/src/main/java/org/opensearch/common/document/serializer/protobuf/DocumentFieldProtobufDeserializer.java#L138-L141
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** Protobuf Serializer package for documents. */ | ||
package org.opensearch.common.document.serializer.protobuf; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,11 @@ public class FeatureFlags { | |
*/ | ||
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled"; | ||
|
||
/** | ||
* Gates the functionality of integrating protobuf within search API and node-to-node communication. | ||
*/ | ||
public static final String PROTOBUF = "opensearch.experimental.feature.search_with_protobuf.enabled"; | ||
|
||
public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( | ||
REMOTE_STORE_MIGRATION_EXPERIMENTAL, | ||
false, | ||
|
@@ -93,14 +98,17 @@ public class FeatureFlags { | |
|
||
public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope); | ||
|
||
public static final Setting<Boolean> PROTOBUF_SETTING = Setting.boolSetting(PROTOBUF, false, Property.NodeScope, Property.Dynamic); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PROTOBUF_TRANSPORT_SETTING There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The feature/effort is not just transport specific though, protobuf is at the transport level and also at the API request response level. WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this variable and the name of the setting should match our intent. Won't we be rolling out the search API with protobuf support, then another API, etc.? What's the setting that will live the longest? Either way this variable and the setting string should match I think. The most generic one:
Search specific.
I'm good with either, but I prefer (1) because I don't think we'll ever want to have 2 APIs with protobuf support enabled/disabled separately. I could totally be wrong, too. |
||
|
||
private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of( | ||
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, | ||
EXTENSIONS_SETTING, | ||
IDENTITY_SETTING, | ||
TELEMETRY_SETTING, | ||
DATETIME_FORMATTER_CACHING_SETTING, | ||
WRITEABLE_REMOTE_INDEX_SETTING, | ||
PLUGGABLE_CACHE_SETTING | ||
PLUGGABLE_CACHE_SETTING, | ||
PROTOBUF_SETTING | ||
); | ||
/** | ||
* Should store the settings from opensearch.yml. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ | |
package org.opensearch.search.fetch; | ||
|
||
import org.opensearch.common.annotation.PublicApi; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
import org.opensearch.core.common.io.stream.StreamOutput; | ||
import org.opensearch.search.SearchHit; | ||
|
@@ -41,8 +42,13 @@ | |
import org.opensearch.search.SearchShardTarget; | ||
import org.opensearch.search.internal.ShardSearchContextId; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
import org.opensearch.search.serializer.protobuf.SearchHitsProtobufDeserializer; | ||
import org.opensearch.server.proto.FetchSearchResultProto; | ||
import org.opensearch.server.proto.ShardSearchRequestProto; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
/** | ||
* Result from a fetch | ||
|
@@ -56,6 +62,8 @@ | |
// client side counter | ||
private transient int counter; | ||
|
||
private FetchSearchResultProto.FetchSearchResult fetchSearchResultProto; | ||
|
||
public FetchSearchResult() {} | ||
|
||
public FetchSearchResult(StreamInput in) throws IOException { | ||
|
@@ -64,9 +72,24 @@ | |
hits = new SearchHits(in); | ||
} | ||
|
||
public FetchSearchResult(InputStream in) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a code smell. We have This needs to be disambiguated, likely by implementing a base class and separate implementations for one vs. the other stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both the constructors come from its base class implementing Writeable and BytesWriteable (this is not part of this PR). Let me see if I can make this a little more disambiguated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to do the same things as with |
||
this.fetchSearchResultProto = FetchSearchResultProto.FetchSearchResult.parseFrom(in); | ||
contextId = new ShardSearchContextId( | ||
this.fetchSearchResultProto.getContextId().getSessionId(), | ||
this.fetchSearchResultProto.getContextId().getId() | ||
); | ||
SearchHitsProtobufDeserializer protobufSerializer = new SearchHitsProtobufDeserializer(); | ||
hits = protobufSerializer.createSearchHits(new ByteArrayInputStream(this.fetchSearchResultProto.getHits().toByteArray())); | ||
} | ||
|
||
public FetchSearchResult(ShardSearchContextId id, SearchShardTarget shardTarget) { | ||
this.contextId = id; | ||
setSearchShardTarget(shardTarget); | ||
this.fetchSearchResultProto = FetchSearchResultProto.FetchSearchResult.newBuilder() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Transport-specific implementation needs to live in its own namespaces/classes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a transport specific implementation though. This is the response/request specific serialization and deserialization. Transport specific protobuf implementation will be separate which is sent over the wire of which this message is a part. Transport specific protobuf will be a part of the next PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way it's written the request/response can serialize its way two ways, however the serialization primitives are all over the class. I think there are a few options (and possibly more).
Maybe @reta has better ideas? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @dblock definitively +1 (https://github.com/opensearch-project/OpenSearch/pull/13178/files#r1566328274) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doing something similar to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the hint, we probably need to align the that between transport and protocol, I will look into it shortly, thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @VachaShah OK, I think we won't be able to move on with this change before we solve the protocol handling by outbound communication (we sort of addressed the inbound only, but not outbound, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm with this. I think the "right" way is to first PR splitting out the native serialization and deserialization out of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense! I will look into this. For abstracting the outbound side of transport and adding support for multiple protocols on that side of transport, I have created #13293. Next, I am looking into decoupling |
||
.setContextId( | ||
ShardSearchRequestProto.ShardSearchContextId.newBuilder().setSessionId(id.getSessionId()).setId(id.getId()).build() | ||
) | ||
.build(); | ||
} | ||
|
||
@Override | ||
|
@@ -82,6 +105,11 @@ | |
public void hits(SearchHits hits) { | ||
assert assertNoSearchTarget(hits); | ||
this.hits = hits; | ||
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING) && this.fetchSearchResultProto != null) { | ||
this.fetchSearchResultProto = this.fetchSearchResultProto.toBuilder() | ||
.setHits(SearchHitsProtobufDeserializer.convertHitsToProto(hits)) | ||
.build(); | ||
} | ||
} | ||
|
||
private boolean assertNoSearchTarget(SearchHits hits) { | ||
|
@@ -92,6 +120,16 @@ | |
} | ||
|
||
public SearchHits hits() { | ||
if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING) && this.fetchSearchResultProto != null) { | ||
SearchHits hits; | ||
try { | ||
SearchHitsProtobufDeserializer protobufSerializer = new SearchHitsProtobufDeserializer(); | ||
hits = protobufSerializer.createSearchHits(new ByteArrayInputStream(this.fetchSearchResultProto.getHits().toByteArray())); | ||
return hits; | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
return hits; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we call this something more generic since we may be adding more than search and gate all of protobuf transport behind it? What do you think about
"opensearch.experimental.feature.transport.protobuf.enabled"
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, only search related transport will be gated behind this since when we add the transport specific node to node protobuf structure (in the next PR), it will consider only search related classes for now. We can keep adding more classes once we expand the use case. That message would be https://github.com/opensearch-project/OpenSearch/pull/11910/files#diff-3b6cef54e769860b6ff96d7052208f6fe2ef8a86ea04aec232044984ba52e5db.