From 7d1d117c1e00a4431f88a976c269aef166568a03 Mon Sep 17 00:00:00 2001 From: Vacha Shah Date: Wed, 13 Mar 2024 06:03:35 +0000 Subject: [PATCH] Addressing comments and fixing some proto messages Signed-off-by: Vacha Shah --- ...obufWriteable.java => BytesWriteable.java} | 2 +- .../core/transport/TransportMessage.java | 4 +- .../node/tasks/CancellableTasksIT.java | 7 - .../action/ActionListenerResponseHandler.java | 7 - ...ProtobufActionListenerResponseHandler.java | 8 +- ...TransportFieldCapabilitiesIndexAction.java | 7 - .../TransportResyncReplicationAction.java | 7 - .../opensearch/action/search/PitService.java | 7 - .../search/QueryPhaseResultConsumer.java | 11 +- .../action/search/SearchTransportService.java | 6 +- .../broadcast/TransportBroadcastAction.java | 7 - .../node/TransportBroadcastByNodeAction.java | 8 - .../support/nodes/TransportNodesAction.java | 7 - .../TransportReplicationAction.java | 7 - ...ransportInstanceSingleOperationAction.java | 7 - .../shard/TransportSingleShardAction.java | 13 -- .../support/tasks/TransportTasksAction.java | 9 - .../coordination/FollowersChecker.java | 7 - .../cluster/coordination/JoinHelper.java | 13 -- .../cluster/coordination/LeaderChecker.java | 7 - .../coordination/PreVoteCollector.java | 7 - .../PublicationTransportHandler.java | 14 -- .../decommission/DecommissionController.java | 7 - .../common/document/DocumentField.java | 116 ++++++++++-- .../org/opensearch/discovery/PeerFinder.java | 7 - .../extensions/ExtensionsManager.java | 7 - .../UpdateSettingsResponseHandler.java | 7 - .../ExtensionTransportActionsHandler.java | 13 -- .../rest/RestSendToExtensionAction.java | 7 - .../RetentionLeaseBackgroundSyncAction.java | 7 - .../index/seqno/RetentionLeaseSyncAction.java | 7 - .../recovery/PeerRecoveryTargetService.java | 7 - .../checkpoint/PublishCheckpointAction.java | 7 - .../indices/store/IndicesStore.java | 7 - .../java/org/opensearch/search/SearchHit.java | 10 ++ .../org/opensearch/search/SearchHits.java | 18 +- .../search/fetch/FetchSearchResult.java | 2 +- .../search/internal/ShardSearchRequest.java | 3 + .../search/query/QuerySearchResult.java | 75 ++++++-- .../snapshots/SnapshotShardsService.java | 7 - .../TraceableTransportResponseHandler.java | 7 - .../EmptyTransportResponseHandler.java | 10 -- .../opensearch/transport/InboundPipeline.java | 3 +- .../transport/NodeToNodeMessage.java | 43 ++++- .../opensearch/transport/OutboundHandler.java | 48 +++-- .../transport/PlainTransportFuture.java | 7 - .../transport/ProtobufMessageHandler.java | 14 ++ .../transport/RemoteClusterConnection.java | 7 - .../transport/SniffConnectionStrategy.java | 7 - .../transport/TransportActionProxy.java | 7 - .../transport/TransportHandshaker.java | 7 - .../transport/TransportResponseHandler.java | 14 +- .../proto/server/NodeToNodeMessageProto.proto | 2 + .../search/FetchSearchResultProto.proto | 18 +- .../search/QuerySearchResultProto.proto | 1 + .../search/ShardSearchRequestProto.proto | 6 +- ...tAddVotingConfigExclusionsActionTests.java | 7 - ...learVotingConfigExclusionsActionTests.java | 7 - ...ReplicationAllPermitsAcquisitionTests.java | 8 - .../coordination/FollowersCheckerTests.java | 26 --- .../coordination/LeaderCheckerTests.java | 8 - .../coordination/PreVoteCollectorTests.java | 7 - .../opensearch/discovery/PeerFinderTests.java | 7 - .../SegmentReplicationSourceServiceTests.java | 19 -- .../transport/InboundHandlerTests.java | 33 +--- .../transport/OutboundHandlerTests.java | 3 +- .../transport/TransportActionProxyTests.java | 13 -- ...ortServiceDeserializationFailureTests.java | 15 -- .../AbstractSimpleTransportTestCase.java | 170 ------------------ .../DisruptableMockTransportTests.java | 19 -- .../test/disruption/NetworkDisruptionIT.java | 7 - 71 files changed, 343 insertions(+), 693 deletions(-) rename libs/core/src/main/java/org/opensearch/core/common/io/stream/{ProtobufWriteable.java => BytesWriteable.java} (97%) diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/ProtobufWriteable.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BytesWriteable.java similarity index 97% rename from libs/core/src/main/java/org/opensearch/core/common/io/stream/ProtobufWriteable.java rename to libs/core/src/main/java/org/opensearch/core/common/io/stream/BytesWriteable.java index 7338c025921c3..001af4e672b1d 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/io/stream/ProtobufWriteable.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/BytesWriteable.java @@ -26,7 +26,7 @@ * @opensearch.api */ @ExperimentalApi -public interface ProtobufWriteable { +public interface BytesWriteable { /** * Write this into the {@linkplain OutputStream}. diff --git a/libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java b/libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java index 219326b44216c..a50fc51e8e964 100644 --- a/libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java +++ b/libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java @@ -32,7 +32,7 @@ package org.opensearch.core.transport; -import org.opensearch.core.common.io.stream.ProtobufWriteable; +import org.opensearch.core.common.io.stream.BytesWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.transport.TransportAddress; @@ -44,7 +44,7 @@ * * @opensearch.internal */ -public abstract class TransportMessage implements Writeable, ProtobufWriteable { +public abstract class TransportMessage implements Writeable, BytesWriteable { private TransportAddress remoteAddress; diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/CancellableTasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/CancellableTasksIT.java index de98f144aa99d..bdb36b62ada21 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/CancellableTasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/CancellableTasksIT.java @@ -76,7 +76,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -561,12 +560,6 @@ public String executor() { public TestResponse read(StreamInput in) throws IOException { return new TestResponse(in); } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/action/ActionListenerResponseHandler.java b/server/src/main/java/org/opensearch/action/ActionListenerResponseHandler.java index b58d67f826b34..86cbb8a6307be 100644 --- a/server/src/main/java/org/opensearch/action/ActionListenerResponseHandler.java +++ b/server/src/main/java/org/opensearch/action/ActionListenerResponseHandler.java @@ -41,7 +41,6 @@ import org.opensearch.transport.TransportResponseHandler; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; /** @@ -90,10 +89,4 @@ public Response read(StreamInput in) throws IOException { public String toString() { return super.toString() + "/" + listener; } - - @Override - public Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } diff --git a/server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java b/server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java index 4dc67273f3eed..7e69bb9dc6cbd 100644 --- a/server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java +++ b/server/src/main/java/org/opensearch/action/ProtobufActionListenerResponseHandler.java @@ -9,7 +9,7 @@ package org.opensearch.action; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.ProtobufWriteable; +import org.opensearch.core.common.io.stream.BytesWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; import org.opensearch.threadpool.ThreadPool; @@ -29,12 +29,12 @@ public class ProtobufActionListenerResponseHandler implements TransportResponseHandler { private final ActionListener listener; - private final ProtobufWriteable.Reader reader; + private final BytesWriteable.Reader reader; private final String executor; public ProtobufActionListenerResponseHandler( ActionListener listener, - ProtobufWriteable.Reader reader, + BytesWriteable.Reader reader, String executor ) { this.listener = Objects.requireNonNull(listener); @@ -42,7 +42,7 @@ public ProtobufActionListenerResponseHandler( this.executor = Objects.requireNonNull(executor); } - public ProtobufActionListenerResponseHandler(ActionListener listener, ProtobufWriteable.Reader reader) { + public ProtobufActionListenerResponseHandler(ActionListener listener, BytesWriteable.Reader reader) { this(listener, reader, ThreadPool.Names.SAME); } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index ad42f20fc3dfa..10bf4975311d6 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -76,7 +76,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -340,12 +339,6 @@ public void handleResponse(final FieldCapabilitiesIndexResponse response) { public void handleException(TransportException exp) { onFailure(shardRouting, exp); } - - @Override - public FieldCapabilitiesIndexResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java index 95b13616ab048..9d60706d1f100 100644 --- a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java @@ -62,7 +62,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.function.Function; import java.util.stream.Stream; @@ -254,12 +253,6 @@ public void handleResponse(ResyncReplicationResponse response) { public void handleException(TransportException exp) { listener.onFailure(exp); } - - @Override - public ResyncReplicationResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index c1bf5ffbd4a45..b6480ce63f827 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -27,7 +27,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -203,12 +202,6 @@ public String executor() { public GetAllPitNodesResponse read(StreamInput in) throws IOException { return new GetAllPitNodesResponse(in); } - - @Override - public GetAllPitNodesResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java index f1b06378bd579..3081e95dc5e26 100644 --- a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java @@ -38,6 +38,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.CircuitBreakingException; @@ -114,7 +115,11 @@ public QueryPhaseResultConsumer( SearchSourceBuilder source = request.source(); this.hasTopDocs = source == null || source.size() != 0; - this.hasAggs = source != null && source.aggregations() != null; + if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) { + this.hasAggs = false; + } else { + this.hasAggs = source != null && source.aggregations() != null; + } int batchReduceSize = (hasAggs || hasTopDocs) ? Math.min(request.getBatchedReduceSize(), expectedResultSize) : expectedResultSize; this.pendingMerges = new PendingMerges(batchReduceSize, request.resolveTrackTotalHitsUpTo()); } @@ -320,7 +325,7 @@ synchronized long addEstimateAndMaybeBreak(long estimatedSize) { * provided {@link QuerySearchResult}. */ long ramBytesUsedQueryResult(QuerySearchResult result) { - if (hasAggs == false) { + if (hasAggs == false || FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) { return 0; } return result.aggregations().asSerialized(InternalAggregations::readFrom, namedWriteableRegistry).ramBytesUsed(); @@ -489,7 +494,7 @@ public synchronized List consumeTopDocs() { } public synchronized List consumeAggs() { - if (hasAggs == false) { + if (hasAggs == false || FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) { return Collections.emptyList(); } List aggsList = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index a71d8c2a64f4e..ce244883d457c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -43,7 +43,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.io.stream.ProtobufWriteable; +import org.opensearch.core.common.io.stream.BytesWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -248,7 +248,7 @@ public void sendExecuteQuery( final ActionListener handler = responseWrapper.apply(connection, listener); TransportResponseHandler transportResponseHandler; if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) { - ProtobufWriteable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; + BytesWriteable.Reader reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new; transportResponseHandler = new ProtobufConnectionCountingHandler<>( handler, reader, @@ -797,7 +797,7 @@ final class ProtobufConnectionCountingHandler listener, - final ProtobufWriteable.Reader responseReader, + final BytesWriteable.Reader responseReader, final Map clientConnections, final String nodeId ) { diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index b41384854c122..8bf8555194976 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -61,7 +61,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -230,12 +229,6 @@ public void handleResponse(ShardResponse response) { public void handleException(TransportException e) { onOperation(shard, shardIt, shardIndex, e); } - - @Override - public ShardResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 89e18773e1079..c08cfb7af0e3d 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -69,7 +69,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -393,13 +392,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportBroadcastByNodeAction.NodeResponse read(InputStream in) - throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java index bb7b848e2b69d..9a1a28dd70636 100644 --- a/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/opensearch/action/support/nodes/TransportNodesAction.java @@ -55,7 +55,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -287,12 +286,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public NodeResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index ac58ccf54f0fc..95f998e2d89c2 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -94,7 +94,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -1170,12 +1169,6 @@ public void handleException(TransportException exp) { finishWithUnexpectedFailure(e); } } - - @Override - public Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }); } diff --git a/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index ee642f96aedd4..21d4ba726e86f 100644 --- a/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -66,7 +66,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; @@ -246,12 +245,6 @@ public void handleException(TransportException exp) { listener.onFailure(exp); } } - - @Override - public Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }); } diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index 0a5fe78c5dbf9..df91559a2f8cb 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -65,7 +65,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import static org.opensearch.action.support.TransportActions.isShardNotAvailableException; @@ -226,12 +225,6 @@ public void handleResponse(final Response response) { public void handleException(TransportException exp) { listener.onFailure(exp); } - - @Override - public Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } else { @@ -309,12 +302,6 @@ public void handleResponse(final Response response) { public void handleException(TransportException exp) { onFailure(finalShardRouting, exp); } - - @Override - public Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/opensearch/action/support/tasks/TransportTasksAction.java index 338750506c735..f33d7161660a3 100644 --- a/server/src/main/java/org/opensearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/opensearch/action/support/tasks/TransportTasksAction.java @@ -62,7 +62,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -317,14 +316,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public - TransportTasksAction.NodeTasksResponse - read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 5118173ea301b..70bb0515bb022 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -61,7 +61,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -409,12 +408,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 3d675335ae867..9bf6bac07da53 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -74,7 +74,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -422,12 +421,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } else { @@ -458,12 +451,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java index 4aad1fa3fc4a9..8d4373b865f62 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LeaderChecker.java @@ -63,7 +63,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -350,12 +349,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PreVoteCollector.java b/server/src/main/java/org/opensearch/cluster/coordination/PreVoteCollector.java index 2b2f14db31ed9..cc4d1ac156c53 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PreVoteCollector.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PreVoteCollector.java @@ -50,7 +50,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; @@ -218,12 +217,6 @@ public String executor() { public String toString() { return "TransportResponseHandler{" + PreVoteCollector.this + ", node=" + n + '}'; } - - @Override - public PreVoteResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ) ); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 50db5d1ba8342..1fdaeead0d28d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -47,7 +47,6 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; -import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.BytesTransportRequest; import org.opensearch.transport.TransportChannel; @@ -57,7 +56,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -382,12 +380,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } @@ -459,12 +451,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public PublishWithJoinResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; transportService.sendRequest(destination, PUBLISH_STATE_ACTION_NAME, request, stateRequestOptions, responseHandler); } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 722a836c2dc21..fec313b4b0b73 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -36,7 +36,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; @@ -258,12 +257,6 @@ public String executor() { public NodesStatsResponse read(StreamInput in) throws IOException { return new NodesStatsResponse(in); } - - @Override - public NodesStatsResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/common/document/DocumentField.java b/server/src/main/java/org/opensearch/common/document/DocumentField.java index a3153c296d0c4..71b0c4488dd82 100644 --- a/server/src/main/java/org/opensearch/common/document/DocumentField.java +++ b/server/src/main/java/org/opensearch/common/document/DocumentField.java @@ -33,27 +33,33 @@ package org.opensearch.common.document; import com.google.protobuf.ByteString; -import org.apache.lucene.util.SuppressForbidden; import org.opensearch.OpenSearchException; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.text.Text; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.get.GetResult; import org.opensearch.search.SearchHit; import org.opensearch.server.proto.FetchSearchResultProto; +import org.opensearch.server.proto.FetchSearchResultProto.DocumentFieldValue; +import org.opensearch.server.proto.FetchSearchResultProto.DocumentFieldValue.Builder; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; @@ -84,19 +90,107 @@ public DocumentField(String name, List values) { this.values = Objects.requireNonNull(values, "values must not be null"); } - @SuppressForbidden(reason = "We need to read from a byte array") public DocumentField(byte[] in) throws IOException { assert FeatureFlags.isEnabled(FeatureFlags.PROTOBUF) : "protobuf feature flag is not enabled"; documentField = FetchSearchResultProto.SearchHit.DocumentField.parseFrom(in); name = documentField.getName(); values = new ArrayList<>(); - for (ByteString value : documentField.getValuesList()) { - InputStream is = new ByteArrayInputStream(value.toByteArray()); - try (ObjectInputStream ois = new ObjectInputStream(is)) { - values.add(ois.readObject()); - } catch (ClassNotFoundException e) { - throw new OpenSearchException(e); + for (FetchSearchResultProto.DocumentFieldValue value : documentField.getValuesList()) { + values.add(readDocumentFieldValueFromProtobuf(value)); + } + } + + public static FetchSearchResultProto.SearchHit.DocumentField convertDocumentFieldToProto(DocumentField documentField) { + FetchSearchResultProto.SearchHit.DocumentField.Builder builder = FetchSearchResultProto.SearchHit.DocumentField.newBuilder(); + builder.setName(documentField.getName()); + for (Object value : documentField.getValues()) { + FetchSearchResultProto.DocumentFieldValue.Builder valueBuilder = FetchSearchResultProto.DocumentFieldValue.newBuilder(); + builder.addValues(convertDocumentFieldValueToProto(value, valueBuilder)); + } + return builder.build(); + } + + private static DocumentFieldValue.Builder convertDocumentFieldValueToProto(Object value, Builder valueBuilder) { + if (value == null) { + // null is not allowed in protobuf, so we use a special string to represent null + return valueBuilder.setValueString("null"); + } + Class type = value.getClass(); + if (type == String.class) { + valueBuilder.setValueString((String) value); + } else if (type == Integer.class) { + valueBuilder.setValueInt((Integer) value); + } else if (type == Long.class) { + valueBuilder.setValueLong((Long) value); + } else if (type == Float.class) { + valueBuilder.setValueFloat((Float) value); + } else if (type == Double.class) { + valueBuilder.setValueDouble((Double) value); + } else if (type == Boolean.class) { + valueBuilder.setValueBool((Boolean) value); + } else if (type == byte[].class) { + valueBuilder.addValueByteArray(ByteString.copyFrom((byte[]) value)); + } else if (type == List.class) { + List list = (List) value; + for (Object listValue : list) { + valueBuilder.addValueArrayList(convertDocumentFieldValueToProto(listValue, valueBuilder)); + } + } else if (type == Map.class || type == HashMap.class || type == LinkedHashMap.class) { + Map map = (Map) value; + for (Map.Entry entry : map.entrySet()) { + valueBuilder.putValueMap(entry.getKey(), convertDocumentFieldValueToProto(entry.getValue(), valueBuilder).build()); + } + } else if (type == Date.class) { + valueBuilder.setValueDate(((Date) value).getTime()); + } else if (type == ZonedDateTime.class) { + valueBuilder.setValueZonedDate(((ZonedDateTime) value).getZone().getId()); + valueBuilder.setValueZonedTime(((ZonedDateTime) value).toInstant().toEpochMilli()); + } else if (type == Text.class) { + valueBuilder.setValueText(((Text) value).string()); + } else { + throw new OpenSearchException("Can't convert generic value of type [" + type + "] to protobuf"); + } + return valueBuilder; + } + + private Object readDocumentFieldValueFromProtobuf(FetchSearchResultProto.DocumentFieldValue documentFieldValue) throws IOException { + if (documentFieldValue.hasValueString()) { + return documentFieldValue.getValueString(); + } else if (documentFieldValue.hasValueInt()) { + return documentFieldValue.getValueInt(); + } else if (documentFieldValue.hasValueLong()) { + return documentFieldValue.getValueLong(); + } else if (documentFieldValue.hasValueFloat()) { + return documentFieldValue.getValueFloat(); + } else if (documentFieldValue.hasValueDouble()) { + return documentFieldValue.getValueDouble(); + } else if (documentFieldValue.hasValueBool()) { + return documentFieldValue.getValueBool(); + } else if (documentFieldValue.getValueByteArrayList().size() > 0) { + return documentFieldValue.getValueByteArrayList().toArray(); + } else if (documentFieldValue.getValueArrayListList().size() > 0) { + List list = new ArrayList<>(); + for (FetchSearchResultProto.DocumentFieldValue value : documentFieldValue.getValueArrayListList()) { + list.add(readDocumentFieldValueFromProtobuf(value)); + } + return list; + } else if (documentFieldValue.getValueMapMap().size() > 0) { + Map map = Map.of(); + for (Map.Entry entrySet : documentFieldValue.getValueMapMap().entrySet()) { + map.put(entrySet.getKey(), readDocumentFieldValueFromProtobuf(entrySet.getValue())); } + return map; + } else if (documentFieldValue.hasValueDate()) { + return new Date(documentFieldValue.getValueDate()); + } else if (documentFieldValue.hasValueZonedDate() && documentFieldValue.hasValueZonedTime()) { + return ZonedDateTime.ofInstant( + Instant.ofEpochMilli(documentFieldValue.getValueZonedTime()), + ZoneId.of(documentFieldValue.getValueZonedDate()) + ); + } else if (documentFieldValue.hasValueText()) { + return new Text(documentFieldValue.getValueText()); + } else { + throw new IOException("Can't read generic value of type [" + documentFieldValue + "]"); } } diff --git a/server/src/main/java/org/opensearch/discovery/PeerFinder.java b/server/src/main/java/org/opensearch/discovery/PeerFinder.java index a233409232c42..1d997c8cbabe8 100644 --- a/server/src/main/java/org/opensearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/opensearch/discovery/PeerFinder.java @@ -54,7 +54,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -505,12 +504,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.GENERIC; } - - @Override - public PeersResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; transportService.sendRequest( discoveryNode, diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 1ac274524be28..b531abcb845d7 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -49,7 +49,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.net.InetAddress; import java.util.HashMap; import java.util.HashSet; @@ -385,12 +384,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public InitializeExtensionResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; logger.info("Sending extension request type: " + REQUEST_EXTENSION_ACTION_NAME); diff --git a/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java b/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java index d824b35845cad..8c7c3c4cb9bd9 100644 --- a/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java +++ b/server/src/main/java/org/opensearch/extensions/UpdateSettingsResponseHandler.java @@ -17,7 +17,6 @@ import org.opensearch.transport.TransportResponseHandler; import java.io.IOException; -import java.io.InputStream; /** * Response handler for {@link UpdateSettingsRequest} @@ -49,10 +48,4 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public AcknowledgedResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java index 699b99d442b2c..ac60df1b73764 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -27,7 +27,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -220,12 +219,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public ExtensionActionResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; try { transportService.sendRequest( @@ -290,12 +283,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public RemoteExtensionActionResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; try { transportService.sendRequest( diff --git a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java index 63db8ad04eea4..f4503ce55e6bc 100644 --- a/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/rest/RestSendToExtensionAction.java @@ -35,7 +35,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashSet; @@ -241,12 +240,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public RestExecuteOnExtensionResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; try { diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 1e6732407d98c..5fa0a1a6459e7 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -65,7 +65,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.Objects; @@ -168,12 +167,6 @@ public void handleException(TransportException e) { } getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); } - - @Override - public ReplicationResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index 1d89e5bc47604..ca3c7e1d49700 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -69,7 +69,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.Objects; @@ -179,12 +178,6 @@ public void handleException(TransportException e) { taskManager.unregister(task); listener.onFailure(e); } - - @Override - public ReplicationResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 61648e725d190..227496f72f83d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -80,7 +80,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -763,11 +762,5 @@ public String executor() { public RecoveryResponse read(StreamInput in) throws IOException { return new RecoveryResponse(in); } - - @Override - public RecoveryResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 4f593b24aae97..821ae42e31881 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -41,7 +41,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; /** @@ -173,12 +172,6 @@ public void handleException(TransportException e) { e ); } - - @Override - public ReplicationResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); logger.trace( diff --git a/server/src/main/java/org/opensearch/indices/store/IndicesStore.java b/server/src/main/java/org/opensearch/indices/store/IndicesStore.java index 33e825cda34d1..1efaca09204da 100644 --- a/server/src/main/java/org/opensearch/indices/store/IndicesStore.java +++ b/server/src/main/java/org/opensearch/indices/store/IndicesStore.java @@ -75,7 +75,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashSet; @@ -353,12 +352,6 @@ private void allNodesResponded() { ); } - @Override - public ShardActiveResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } - } private class ShardActiveRequestHandler implements TransportRequestHandler { diff --git a/server/src/main/java/org/opensearch/search/SearchHit.java b/server/src/main/java/org/opensearch/search/SearchHit.java index c0c21be704ec3..1e43e60611eb9 100644 --- a/server/src/main/java/org/opensearch/search/SearchHit.java +++ b/server/src/main/java/org/opensearch/search/SearchHit.java @@ -171,6 +171,9 @@ public SearchHit( this.nestedIdentity = nestedIdentity; this.documentFields = documentFields == null ? emptyMap() : documentFields; this.metaFields = metaFields == null ? emptyMap() : metaFields; + if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) { + this.searchHitProto = convertHitToProto(this); + } } public SearchHit(StreamInput in) throws IOException { @@ -341,9 +344,13 @@ public static FetchSearchResultProto.SearchHit convertHitToProto(SearchHit hit) searchHitBuilder.setSeqNo(hit.getSeqNo()); searchHitBuilder.setPrimaryTerm(hit.getPrimaryTerm()); searchHitBuilder.setVersion(hit.getVersion()); + searchHitBuilder.setDocId(hit.docId); if (hit.getSourceRef() != null) { searchHitBuilder.setSource(ByteString.copyFrom(hit.getSourceRef().toBytesRef().bytes)); } + for (Map.Entry entry : hit.getFields().entrySet()) { + searchHitBuilder.putDocumentFields(entry.getKey(), DocumentField.convertDocumentFieldToProto(entry.getValue())); + } return searchHitBuilder.build(); } @@ -563,6 +570,9 @@ public void setDocumentField(String fieldName, DocumentField field) { if (fieldName == null || field == null) return; if (documentFields.isEmpty()) this.documentFields = new HashMap<>(); this.documentFields.put(fieldName, field); + if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF)) { + this.searchHitProto = convertHitToProto(this); + } } public DocumentField removeDocumentField(String fieldName) { diff --git a/server/src/main/java/org/opensearch/search/SearchHits.java b/server/src/main/java/org/opensearch/search/SearchHits.java index 417e85399e3f1..630a7c90f8b3d 100644 --- a/server/src/main/java/org/opensearch/search/SearchHits.java +++ b/server/src/main/java/org/opensearch/search/SearchHits.java @@ -122,12 +122,14 @@ public static FetchSearchResultProto.SearchHits convertHitsToProto(SearchHits hi searchHitList.add(SearchHit.convertHitToProto(hit)); } QuerySearchResultProto.TotalHits.Builder totalHitsBuilder = QuerySearchResultProto.TotalHits.newBuilder(); - totalHitsBuilder.setValue(hits.getTotalHits().value); - totalHitsBuilder.setRelation( - hits.getTotalHits().relation == Relation.EQUAL_TO - ? QuerySearchResultProto.TotalHits.Relation.EQUAL_TO - : QuerySearchResultProto.TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO - ); + if (hits.getTotalHits() != null) { + totalHitsBuilder.setValue(hits.getTotalHits().value); + totalHitsBuilder.setRelation( + hits.getTotalHits().relation == Relation.EQUAL_TO + ? QuerySearchResultProto.TotalHits.Relation.EQUAL_TO + : QuerySearchResultProto.TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO + ); + } FetchSearchResultProto.SearchHits.Builder searchHitsBuilder = FetchSearchResultProto.SearchHits.newBuilder(); searchHitsBuilder.setMaxScore(hits.getMaxScore()); searchHitsBuilder.addAllHits(searchHitList); @@ -135,7 +137,9 @@ public static FetchSearchResultProto.SearchHits convertHitsToProto(SearchHits hi if (hits.getSortFields() != null && hits.getSortFields().length > 0) { for (SortField sortField : hits.getSortFields()) { FetchSearchResultProto.SortField.Builder sortFieldBuilder = FetchSearchResultProto.SortField.newBuilder(); - sortFieldBuilder.setField(sortField.getField()); + if (sortField.getField() != null) { + sortFieldBuilder.setField(sortField.getField()); + } sortFieldBuilder.setType(FetchSearchResultProto.SortField.Type.valueOf(sortField.getType().name())); searchHitsBuilder.addSortFields(sortFieldBuilder.build()); } diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java index f966f0a0fdccc..a9c70d336225d 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchSearchResult.java @@ -118,7 +118,7 @@ private boolean assertNoSearchTarget(SearchHits hits) { } public SearchHits hits() { - if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) { + if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING) && this.fetchSearchResultProto != null) { SearchHits hits; try { hits = new SearchHits(this.fetchSearchResultProto.getHits().toByteArray()); diff --git a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java index 9ab84dda119f2..dde9f7130afa5 100644 --- a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java @@ -315,6 +315,9 @@ public ShardSearchRequest(byte[] in) throws IOException { this.keepAlive = searchRequestProto.hasTimeValue() ? TimeValue.parseTimeValue(searchRequestProto.getTimeValue(), "keepAlive") : null; + this.aliasFilter = searchRequestProto.hasAliasFilter() + ? new AliasFilter(null, searchRequestProto.getAliasFilter().getAliasesList().toArray(Strings.EMPTY_ARRAY)) + : AliasFilter.EMPTY; } public ShardSearchRequest(ShardSearchRequest clone) { diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index 822628e960c3c..7ae21e6667caf 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -58,6 +58,9 @@ import org.opensearch.search.suggest.Suggest; import org.opensearch.server.proto.QuerySearchResultProto; import org.opensearch.server.proto.ShardSearchRequestProto; +import org.opensearch.server.proto.ShardSearchRequestProto.AliasFilter; +import org.opensearch.server.proto.ShardSearchRequestProto.ShardSearchRequest.SearchType; +import org.opensearch.transport.BaseInboundMessage; import java.io.IOException; import java.io.InputStream; @@ -123,13 +126,20 @@ public QuerySearchResult(InputStream in) throws IOException { super(in); assert FeatureFlags.isEnabled(FeatureFlags.PROTOBUF) : "protobuf feature flag is not enabled"; this.querySearchResultProto = QuerySearchResultProto.QuerySearchResult.parseFrom(in); - isNull = false; - ShardSearchRequest shardSearchRequest; - try { - shardSearchRequest = new ShardSearchRequest(this.querySearchResultProto.getSearchShardRequest().toByteArray()); - setShardSearchRequest(shardSearchRequest); - } catch (IOException e) { - logger.error("Error while setting shard search request", e); + isNull = this.querySearchResultProto.getIsNull(); + if (!isNull) { + this.contextId = new ShardSearchContextId( + this.querySearchResultProto.getContextId().getSessionId(), + this.querySearchResultProto.getContextId().getId() + ); + ShardSearchRequest shardSearchRequest; + hasAggs = false; + try { + shardSearchRequest = new ShardSearchRequest(this.querySearchResultProto.getSearchShardRequest().toByteArray()); + setShardSearchRequest(shardSearchRequest); + } catch (IOException e) { + logger.error("Error while setting shard search request", e); + } } } @@ -155,13 +165,40 @@ public QuerySearchResult(ShardSearchContextId contextId, SearchShardTarget shard ShardSearchRequestProto.ShardSearchRequest.Builder shardSearchRequestProto = ShardSearchRequestProto.ShardSearchRequest .newBuilder(); if (shardSearchRequest != null) { + ShardSearchRequestProto.OriginalIndices.Builder originalIndices = ShardSearchRequestProto.OriginalIndices.newBuilder(); + if (shardSearchRequest.indices() != null) { + for (String index : shardSearchRequest.indices()) { + originalIndices.addIndices(index); + } + originalIndices.setIndicesOptions( + ShardSearchRequestProto.OriginalIndices.IndicesOptions.newBuilder() + .setIgnoreUnavailable(shardSearchRequest.indicesOptions().ignoreUnavailable()) + .setAllowNoIndices(shardSearchRequest.indicesOptions().allowNoIndices()) + .setExpandWildcardsOpen(shardSearchRequest.indicesOptions().expandWildcardsOpen()) + .setExpandWildcardsClosed(shardSearchRequest.indicesOptions().expandWildcardsClosed()) + .setExpandWildcardsHidden(shardSearchRequest.indicesOptions().expandWildcardsHidden()) + .setAllowAliasesToMultipleIndices(shardSearchRequest.indicesOptions().allowAliasesToMultipleIndices()) + .setForbidClosedIndices(shardSearchRequest.indicesOptions().forbidClosedIndices()) + .setIgnoreAliases(shardSearchRequest.indicesOptions().ignoreAliases()) + .setIgnoreThrottled(shardSearchRequest.indicesOptions().ignoreThrottled()) + .build() + ); + } + AliasFilter.Builder aliasFilter = AliasFilter.newBuilder(); + if (shardSearchRequest.getAliasFilter() != null) { + for (int i = 0; i < shardSearchRequest.getAliasFilter().getAliases().length; i++) { + aliasFilter.addAliases(shardSearchRequest.getAliasFilter().getAliases()[i]); + } + } shardSearchRequestProto.setInboundNetworkTime(shardSearchRequest.getInboundNetworkTime()) .setOutboundNetworkTime(shardSearchRequest.getOutboundNetworkTime()) .setShardId(shardIdProto) .setAllowPartialSearchResults(shardSearchRequest.allowPartialSearchResults()) .setNumberOfShards(shardSearchRequest.numberOfShards()) - .setReaderId(shardSearchContextId); - + .setReaderId(shardSearchContextId) + .setOriginalIndices(originalIndices) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setAliasFilter(aliasFilter); if (shardSearchRequest.keepAlive() != null) { shardSearchRequestProto.setTimeValue(shardSearchRequest.keepAlive().getStringRep()); } @@ -175,6 +212,8 @@ public QuerySearchResult(ShardSearchContextId contextId, SearchShardTarget shard .setContextId(shardSearchContextId) .setSearchShardTarget(searchShardTarget.build()) .setSearchShardRequest(shardSearchRequestProto.build()) + .setHasAggs(false) + .setIsNull(isNull) .build(); } @@ -520,7 +559,9 @@ public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(OutputStream out) throws IOException { - out.write(this.querySearchResultProto.toByteArray()); + if (!isNull) { + out.write(this.querySearchResultProto.toByteArray()); + } } public void writeToNoId(StreamOutput out) throws IOException { @@ -570,7 +611,11 @@ public QuerySearchResultProto.QuerySearchResult response() { public QuerySearchResult(QuerySearchResultProto.QuerySearchResult querySearchResult) { this.querySearchResultProto = querySearchResult; - this.isNull = false; + this.isNull = this.querySearchResultProto.getIsNull(); + this.contextId = new ShardSearchContextId( + this.querySearchResultProto.getContextId().getSessionId(), + this.querySearchResultProto.getContextId().getId() + ); ShardSearchRequest shardSearchRequest; try { shardSearchRequest = new ShardSearchRequest(this.querySearchResultProto.getSearchShardRequest().toByteArray()); @@ -579,4 +624,12 @@ public QuerySearchResult(QuerySearchResultProto.QuerySearchResult querySearchRes logger.error("Error while setting shard search request", e); } } + + @Override + public String getProtocol() { + if (FeatureFlags.isEnabled(FeatureFlags.PROTOBUF_SETTING)) { + return BaseInboundMessage.PROTOBUF_PROTOCOL; + } + return BaseInboundMessage.NATIVE_PROTOCOL; + } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java index b6650a763cf01..89f1ea142336e 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardsService.java @@ -74,7 +74,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.HashMap; import java.util.Iterator; @@ -624,12 +623,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public UpdateIndexShardSnapshotStatusResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ) ); diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java index e09a7213903c0..eb9d53d2df51b 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java @@ -17,7 +17,6 @@ import org.opensearch.transport.TransportResponseHandler; import java.io.IOException; -import java.io.InputStream; import java.util.Objects; /** @@ -105,10 +104,4 @@ public void handleRejection(Exception exp) { span.endSpan(); } } - - @Override - public T read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } diff --git a/server/src/main/java/org/opensearch/transport/EmptyTransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/EmptyTransportResponseHandler.java index 62c9ab738be21..1691b427ffca1 100644 --- a/server/src/main/java/org/opensearch/transport/EmptyTransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/EmptyTransportResponseHandler.java @@ -34,12 +34,8 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.transport.TransportResponse; -import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; -import java.io.InputStream; - /** * Handler for empty transport response * @@ -70,10 +66,4 @@ public void handleException(TransportException exp) {} public String executor() { return executor; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } diff --git a/server/src/main/java/org/opensearch/transport/InboundPipeline.java b/server/src/main/java/org/opensearch/transport/InboundPipeline.java index 0f84d32d7e9c6..af161c70b8e22 100644 --- a/server/src/main/java/org/opensearch/transport/InboundPipeline.java +++ b/server/src/main/java/org/opensearch/transport/InboundPipeline.java @@ -41,6 +41,7 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.bytes.CompositeBytesReference; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -133,7 +134,7 @@ public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference if (incomingMessageProtocol.equals(BaseInboundMessage.PROTOBUF_PROTOCOL) && this.version.onOrAfter(Version.V_3_0_0)) { // removing the first byte we added for protobuf message byte[] incomingBytes = BytesReference.toBytes(reference.slice(3, reference.length() - 3)); - NodeToNodeMessage protobufMessage = new NodeToNodeMessage(incomingBytes); + NodeToNodeMessage protobufMessage = new NodeToNodeMessage(new ByteArrayInputStream(incomingBytes)); protobufMessage.setProtocol(); messageHandler.accept(channel, protobufMessage); } else { diff --git a/server/src/main/java/org/opensearch/transport/NodeToNodeMessage.java b/server/src/main/java/org/opensearch/transport/NodeToNodeMessage.java index d1fe9775033dd..943007f7913c6 100644 --- a/server/src/main/java/org/opensearch/transport/NodeToNodeMessage.java +++ b/server/src/main/java/org/opensearch/transport/NodeToNodeMessage.java @@ -9,7 +9,6 @@ package org.opensearch.transport; import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; import org.opensearch.Version; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.util.concurrent.ThreadContext; @@ -18,8 +17,10 @@ import org.opensearch.server.proto.NodeToNodeMessageProto.NodeToNodeMessage.Header; import org.opensearch.server.proto.NodeToNodeMessageProto.NodeToNodeMessage.ResponseHandlersList; import org.opensearch.server.proto.QueryFetchSearchResultProto.QueryFetchSearchResult; +import org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.HashMap; @@ -74,15 +75,51 @@ public NodeToNodeMessage( .setAction(action) .addAllFeatures(features) .build(); + } + public NodeToNodeMessage( + long requestId, + byte[] status, + Version version, + ThreadContext threadContext, + QuerySearchResult querySearchResult, + Set features, + String action + ) { + Header header = Header.newBuilder() + .addAllPrefix(Arrays.asList(ByteString.copyFrom(PREFIX))) + .setRequestId(requestId) + .setStatus(ByteString.copyFrom(status)) + .setVersionId(version.id) + .build(); + Map requestHeaders = threadContext.getHeaders(); + Map> responseHeaders = threadContext.getResponseHeaders(); + Map responseHandlers = new HashMap<>(); + for (Map.Entry> entry : responseHeaders.entrySet()) { + String key = entry.getKey(); + List value = entry.getValue(); + ResponseHandlersList responseHandlersList = ResponseHandlersList.newBuilder().addAllSetOfResponseHandlers(value).build(); + responseHandlers.put(key, responseHandlersList); + } + this.message = NodeToNodeMessageProto.NodeToNodeMessage.newBuilder() + .setHeader(header) + .putAllRequestHeaders(requestHeaders) + .putAllResponseHandlers(responseHandlers) + .setVersion(version.toString()) + .setStatus(ByteString.copyFrom(status)) + .setRequestId(requestId) + .setQuerySearchResult(querySearchResult) + .setAction(action) + .addAllFeatures(features) + .build(); } - public NodeToNodeMessage(byte[] in) throws InvalidProtocolBufferException { + public NodeToNodeMessage(InputStream in) throws IOException { this.message = NodeToNodeMessageProto.NodeToNodeMessage.parseFrom(in); } public void writeTo(OutputStream out) throws IOException { - out.write(this.message.toByteArray()); + this.message.writeTo(out); } BytesReference serialize(BytesStreamOutput bytesStream) throws IOException { diff --git a/server/src/main/java/org/opensearch/transport/OutboundHandler.java b/server/src/main/java/org/opensearch/transport/OutboundHandler.java index a45a7e65116ba..ec3a0efd3c834 100644 --- a/server/src/main/java/org/opensearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/OutboundHandler.java @@ -52,6 +52,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; import org.opensearch.search.fetch.QueryFetchSearchResult; +import org.opensearch.search.query.QuerySearchResult; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; @@ -149,21 +150,40 @@ void sendResponse( Version version = Version.min(this.version, nodeVersion); ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response)); if ((response.getProtocol()).equals(BaseInboundMessage.PROTOBUF_PROTOCOL) && version.onOrAfter(Version.V_3_0_0)) { - QueryFetchSearchResult queryFetchSearchResult = (QueryFetchSearchResult) response; - if (queryFetchSearchResult.response() != null) { - byte[] bytes = new byte[1]; - bytes[0] = 1; - NodeToNodeMessage protobufMessage = new NodeToNodeMessage( - requestId, - bytes, - Version.CURRENT, - threadPool.getThreadContext(), - queryFetchSearchResult.response(), - features, - action - ); - sendProtobufMessage(channel, protobufMessage, listener); + if (response instanceof QueryFetchSearchResult) { + QueryFetchSearchResult queryFetchSearchResult = (QueryFetchSearchResult) response; + if (queryFetchSearchResult.response() != null) { + byte[] bytes = new byte[1]; + bytes[0] = 1; + NodeToNodeMessage protobufMessage = new NodeToNodeMessage( + requestId, + bytes, + Version.CURRENT, + threadPool.getThreadContext(), + queryFetchSearchResult.response(), + features, + action + ); + sendProtobufMessage(channel, protobufMessage, listener); + } + } else if (response instanceof QuerySearchResult) { + QuerySearchResult querySearchResult = (QuerySearchResult) response; + if (querySearchResult.response() != null) { + byte[] bytes = new byte[1]; + bytes[0] = 1; + NodeToNodeMessage protobufMessage = new NodeToNodeMessage( + requestId, + bytes, + Version.CURRENT, + threadPool.getThreadContext(), + querySearchResult.response(), + features, + action + ); + sendProtobufMessage(channel, protobufMessage, listener); + } } + } else { OutboundMessage.Response message = new OutboundMessage.Response( threadPool.getThreadContext(), diff --git a/server/src/main/java/org/opensearch/transport/PlainTransportFuture.java b/server/src/main/java/org/opensearch/transport/PlainTransportFuture.java index 73d8fbd4d528d..ff9ca8b189904 100644 --- a/server/src/main/java/org/opensearch/transport/PlainTransportFuture.java +++ b/server/src/main/java/org/opensearch/transport/PlainTransportFuture.java @@ -39,7 +39,6 @@ import org.opensearch.core.transport.TransportResponse; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -126,10 +125,4 @@ public void handleException(TransportException exp) { public String toString() { return "future(" + handler.toString() + ")"; } - - @Override - public V read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } diff --git a/server/src/main/java/org/opensearch/transport/ProtobufMessageHandler.java b/server/src/main/java/org/opensearch/transport/ProtobufMessageHandler.java index b46ff2c86c7b8..d3262f4d3e5c4 100644 --- a/server/src/main/java/org/opensearch/transport/ProtobufMessageHandler.java +++ b/server/src/main/java/org/opensearch/transport/ProtobufMessageHandler.java @@ -21,6 +21,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.search.query.QuerySearchResult; import org.opensearch.server.proto.QueryFetchSearchResultProto.QueryFetchSearchResult; import org.opensearch.threadpool.ThreadPool; @@ -110,6 +111,19 @@ private void handleProtobufResponse( final T response = (T) queryFetchSearchResult2; response.remoteAddress(new TransportAddress(remoteAddress)); + final String executor = handler.executor(); + if (ThreadPool.Names.SAME.equals(executor)) { + doHandleResponse(handler, response); + } else { + threadPool.executor(executor).execute(() -> doHandleResponse(handler, response)); + } + } else if (receivedMessage.hasQuerySearchResult()) { + final org.opensearch.server.proto.QuerySearchResultProto.QuerySearchResult querySearchResult = receivedMessage + .getQuerySearchResult(); + QuerySearchResult querySearchResult2 = new QuerySearchResult(querySearchResult); + final T response = (T) querySearchResult2; + response.remoteAddress(new TransportAddress(remoteAddress)); + final String executor = handler.executor(); if (ThreadPool.Names.SAME.equals(executor)) { doHandleResponse(handler, response); diff --git a/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java index 4b2f8c2014206..8a5f6dfffb036 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/opensearch/transport/RemoteClusterConnection.java @@ -47,7 +47,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.function.Function; /** @@ -171,12 +170,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public ClusterStateResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java index 3313ab7103db6..07ba96b135189 100644 --- a/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java @@ -58,7 +58,6 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; @@ -472,12 +471,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.MANAGEMENT; } - - @Override - public ClusterStateResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } private Predicate getRemoteClusterNamePredicate() { diff --git a/server/src/main/java/org/opensearch/transport/TransportActionProxy.java b/server/src/main/java/org/opensearch/transport/TransportActionProxy.java index 830c320679cb6..a61aec8a34e20 100644 --- a/server/src/main/java/org/opensearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/opensearch/transport/TransportActionProxy.java @@ -40,7 +40,6 @@ import org.opensearch.threadpool.ThreadPool; import java.io.IOException; -import java.io.InputStream; import java.io.UncheckedIOException; import java.util.function.Function; @@ -131,12 +130,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public T read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } /** diff --git a/server/src/main/java/org/opensearch/transport/TransportHandshaker.java b/server/src/main/java/org/opensearch/transport/TransportHandshaker.java index a6dddc430086a..d0b00ec9c59db 100644 --- a/server/src/main/java/org/opensearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/opensearch/transport/TransportHandshaker.java @@ -45,7 +45,6 @@ import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -188,12 +187,6 @@ void handleLocalException(TransportException e) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public HandshakeResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } static final class HandshakeRequest extends TransportRequest { diff --git a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java index b11892b3d6985..42048258bb5c9 100644 --- a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java @@ -33,7 +33,7 @@ package org.opensearch.transport; import org.opensearch.common.annotation.PublicApi; -import org.opensearch.core.common.io.stream.ProtobufWriteable; +import org.opensearch.core.common.io.stream.BytesWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.transport.TransportResponse; @@ -48,7 +48,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public interface TransportResponseHandler extends Writeable.Reader { +public interface TransportResponseHandler extends Writeable.Reader, BytesWriteable.Reader { void handleResponse(T response); @@ -56,6 +56,16 @@ public interface TransportResponseHandler extends W String executor(); + /** + * Read {@code V}-type value from a byte array. + * + * @param in byte array to read the value from + */ + default T read(final InputStream in) throws IOException { + // TODO Auto-generated method stub + throw new UnsupportedOperationException("Unimplemented method 'read'"); + } + /** * This method should be handling the rejection/failure scenarios where connection to the node is rejected or failed. * It should be used to clear up the resources held by the {@link TransportResponseHandler}. diff --git a/server/src/main/proto/server/NodeToNodeMessageProto.proto b/server/src/main/proto/server/NodeToNodeMessageProto.proto index 05332016b4181..e0d46bac94216 100644 --- a/server/src/main/proto/server/NodeToNodeMessageProto.proto +++ b/server/src/main/proto/server/NodeToNodeMessageProto.proto @@ -13,6 +13,7 @@ syntax = "proto3"; package org.opensearch.server.proto; import "server/search/QueryFetchSearchResultProto.proto"; +import "server/search/QuerySearchResultProto.proto"; option java_outer_classname = "NodeToNodeMessageProto"; @@ -27,6 +28,7 @@ message NodeToNodeMessage { int64 requestId = 8; oneof message { QueryFetchSearchResult queryFetchSearchResult = 9; + QuerySearchResult querySearchResult = 10; } message Header { diff --git a/server/src/main/proto/server/search/FetchSearchResultProto.proto b/server/src/main/proto/server/search/FetchSearchResultProto.proto index c0ba7b4399c2b..b126ca439e9f2 100644 --- a/server/src/main/proto/server/search/FetchSearchResultProto.proto +++ b/server/src/main/proto/server/search/FetchSearchResultProto.proto @@ -62,7 +62,7 @@ message SearchHit { message DocumentField { string name = 1; - repeated bytes values = 2; + repeated DocumentFieldValue values = 2; } message HighlightField { @@ -115,3 +115,19 @@ message CollapseValue { optional bool collapseBool = 7; } +message DocumentFieldValue { + optional string valueString = 1; + optional int32 valueInt = 2; + optional int64 valueLong = 3; + optional float valueFloat = 4; + optional double valueDouble = 5; + optional bool valueBool = 6; + repeated bytes valueByteArray = 7; + repeated DocumentFieldValue valueArrayList = 8; + map valueMap = 9; + optional int64 valueDate = 10; + optional string valueZonedDate = 11; + optional int64 valueZonedTime = 12; + optional string valueText = 13; +} + diff --git a/server/src/main/proto/server/search/QuerySearchResultProto.proto b/server/src/main/proto/server/search/QuerySearchResultProto.proto index 2adc887826007..32ed57d4d9063 100644 --- a/server/src/main/proto/server/search/QuerySearchResultProto.proto +++ b/server/src/main/proto/server/search/QuerySearchResultProto.proto @@ -34,6 +34,7 @@ message QuerySearchResult { optional int32 nodeQueueSize = 15; SearchShardTarget searchShardTarget = 17; ShardSearchRequest searchShardRequest = 18; + bool isNull = 19; message TopDocsAndMaxScore { TopDocs topDocs = 1; diff --git a/server/src/main/proto/server/search/ShardSearchRequestProto.proto b/server/src/main/proto/server/search/ShardSearchRequestProto.proto index dad6ce044d69d..0975a96d77167 100644 --- a/server/src/main/proto/server/search/ShardSearchRequestProto.proto +++ b/server/src/main/proto/server/search/ShardSearchRequestProto.proto @@ -21,7 +21,7 @@ message ShardSearchRequest { SearchType searchType = 4; bytes source = 5; bool requestCache = 6; - bytes aliasFilter = 7; + AliasFilter aliasFilter = 7; float indexBoost = 8; bool allowPartialSearchResults = 9; repeated string indexRoutings = 10; @@ -72,4 +72,8 @@ message OriginalIndices { bool ignoreAliases = 8; bool ignoreThrottled = 9; } +} + +message AliasFilter { + repeated string aliases = 1; } \ No newline at end of file diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java index 0e9298639a145..a015e671f4872 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsActionTests.java @@ -69,7 +69,6 @@ import org.junit.BeforeClass; import java.io.IOException; -import java.io.InputStream; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -686,12 +685,6 @@ public String executor() { public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException { return new AddVotingConfigExclusionsResponse(in); } - - @Override - public AddVotingConfigExclusionsResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java index 78975e19c4d2b..10e4ab6388be4 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsActionTests.java @@ -63,7 +63,6 @@ import org.junit.BeforeClass; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -253,12 +252,6 @@ public String executor() { public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException { return new ClearVotingConfigExclusionsResponse(in); } - - @Override - public ClearVotingConfigExclusionsResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 91caed7a8c898..cce8758ef1014 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -34,7 +34,6 @@ import org.opensearch.Version; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; -import org.opensearch.action.support.replication.TransportReplicationAction.ReplicaResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.block.ClusterBlock; @@ -78,7 +77,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -225,12 +223,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public ReplicaResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java index fb495de053b4d..a106706c00732 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/FollowersCheckerTests.java @@ -60,8 +60,6 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -534,12 +532,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); deterministicTaskQueue.runAllTasks(); @@ -628,12 +620,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); deterministicTaskQueue.runAllTasks(); @@ -706,12 +692,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); deterministicTaskQueue.runAllTasks(); @@ -809,11 +789,5 @@ public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } - } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java index 956b0887d1faa..fe65058333116 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LeaderCheckerTests.java @@ -57,8 +57,6 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -551,12 +549,6 @@ public String executor() { public TransportResponse.Empty read(StreamInput in) { return TransportResponse.Empty.INSTANCE; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } public void testLeaderCheckRequestEqualsHashcodeSerialization() { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index b4cf0c77270b3..5ddf614db3334 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -53,7 +53,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -359,12 +358,6 @@ public void handleException(TransportException exp) { public String executor() { return SAME; } - - @Override - public PreVoteResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); diff --git a/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java b/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java index e64696b00311d..f861ab90896db 100644 --- a/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java @@ -59,7 +59,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -548,12 +547,6 @@ public void handleException(TransportException exp) { public String executor() { return Names.SAME; } - - @Override - public PeersResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 19acb8ef938d5..8f84053f2618e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -40,7 +40,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -221,12 +220,6 @@ public String executor() { public CheckpointInfoResponse read(StreamInput in) throws IOException { return new CheckpointInfoResponse(in); } - - @Override - public CheckpointInfoResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } @@ -256,12 +249,6 @@ public String executor() { public GetSegmentFilesResponse read(StreamInput in) throws IOException { return new GetSegmentFilesResponse(in); } - - @Override - public GetSegmentFilesResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } @@ -302,12 +289,6 @@ public String executor() { public CheckpointInfoResponse read(StreamInput in) throws IOException { return new CheckpointInfoResponse(in); } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } diff --git a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java index cfe009d21d944..892909b094eeb 100644 --- a/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/InboundHandlerTests.java @@ -64,6 +64,7 @@ import org.junit.After; import org.junit.Before; +import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -184,12 +185,6 @@ public String executor() { public TestResponse read(StreamInput in) throws IOException { return new TestResponse(in); } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }, null, action)); RequestHandlerRegistry registry = new RequestHandlerRegistry<>( action, @@ -333,7 +328,7 @@ public QueryFetchSearchResult read(InputStream in) throws IOException { BytesReference fullResponseBytes = channel.getMessageCaptor().get(); byte[] incomingBytes = BytesReference.toBytes(fullResponseBytes.slice(3, fullResponseBytes.length() - 3)); - NodeToNodeMessage nodeToNodeMessage = new NodeToNodeMessage(incomingBytes); + NodeToNodeMessage nodeToNodeMessage = new NodeToNodeMessage(new ByteArrayInputStream(incomingBytes)); handler.inboundMessage(channel, nodeToNodeMessage); QueryFetchSearchResult result = responseCaptor.get(); assertNotNull(result); @@ -465,12 +460,6 @@ public String executor() { public TestResponse read(StreamInput in) throws IOException { return new TestResponse(in); } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }, null, action)); RequestHandlerRegistry registry = new RequestHandlerRegistry<>( @@ -543,12 +532,6 @@ public String executor() { public TestResponse read(StreamInput in) throws IOException { return new TestResponse(in); } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }, null, action)); RequestHandlerRegistry registry = new RequestHandlerRegistry<>( @@ -622,12 +605,6 @@ public String executor() { public TestResponse read(StreamInput in) throws IOException { return new TestResponse(in); } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }, null, action)); RequestHandlerRegistry registry = new RequestHandlerRegistry<>( action, @@ -716,12 +693,6 @@ public String executor() { public TestResponse read(StreamInput in) throws IOException { return new TestResponse(in); } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }, null, action)); RequestHandlerRegistry registry = new RequestHandlerRegistry<>( action, diff --git a/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java index e03d550fb0266..ec40e95fe45c1 100644 --- a/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/opensearch/transport/OutboundHandlerTests.java @@ -71,6 +71,7 @@ import org.junit.After; import org.junit.Before; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -330,7 +331,7 @@ public void onResponseSent(long requestId, String action, TransportResponse resp inboundPipeline.handleBytes(channel, new ReleasableBytesReference(reference, () -> {})); final BytesReference responseBytes = protobufMessage.get(); - final NodeToNodeMessage message = new NodeToNodeMessage(responseBytes.toBytesRef().bytes); + final NodeToNodeMessage message = new NodeToNodeMessage(new ByteArrayInputStream(responseBytes.toBytesRef().bytes)); assertEquals(version.toString(), message.getMessage().getVersion()); assertEquals(requestId, message.getHeader().getRequestId()); assertNotNull(message.getRequestHeaders()); diff --git a/server/src/test/java/org/opensearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/opensearch/transport/TransportActionProxyTests.java index 77c6022bfbfbe..dd2aefd2318f7 100644 --- a/server/src/test/java/org/opensearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportActionProxyTests.java @@ -48,7 +48,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.util.concurrent.CountDownLatch; public class TransportActionProxyTests extends OpenSearchTestCase { @@ -150,12 +149,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public SimpleTestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); latch.await(); @@ -216,12 +209,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public SimpleTestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); latch.await(); diff --git a/server/src/test/java/org/opensearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/opensearch/transport/TransportServiceDeserializationFailureTests.java index 443178a9610d0..d10b4f26100cc 100644 --- a/server/src/test/java/org/opensearch/transport/TransportServiceDeserializationFailureTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportServiceDeserializationFailureTests.java @@ -41,7 +41,6 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.tasks.TaskId; import org.opensearch.core.transport.TransportResponse; -import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskAwareRequest; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -49,8 +48,6 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.ThreadPool; -import java.io.IOException; -import java.io.InputStream; import java.util.Collections; import java.util.List; @@ -137,12 +134,6 @@ public TransportResponse.Empty read(StreamInput in) { public String toString() { return "test handler without parent"; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -199,12 +190,6 @@ public TransportResponse.Empty read(StreamInput in) { public String toString() { return "test handler with parent"; } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); diff --git a/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java index 93500d3c300cd..e43b0756e2f2b 100644 --- a/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/opensearch/transport/AbstractSimpleTransportTestCase.java @@ -65,7 +65,6 @@ import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; -import org.opensearch.core.transport.TransportResponse.Empty; import org.opensearch.node.Node; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -81,7 +80,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.io.UncheckedIOException; import java.net.Inet4Address; import java.net.Inet6Address; @@ -329,12 +327,6 @@ public void handleException(TransportException exp) { logger.error("Unexpected failure", exp); fail("got exception instead of a response: " + exp.getMessage()); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -371,12 +363,6 @@ public void handleException(TransportException exp) { logger.error("Unexpected failure", exp); fail("got exception instead of a response: " + exp.getMessage()); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -434,12 +420,6 @@ public void handleException(TransportException exp) { logger.error("Unexpected failure", exp); fail("got exception instead of a response: " + exp.getMessage()); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; StringMessageRequest ping = new StringMessageRequest("ping"); threadPool.getThreadContext().putHeader("test.ping.user", "ping_user"); @@ -499,12 +479,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.GENERIC; } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); responseLatch.await(); @@ -688,12 +662,6 @@ public void handleException(TransportException exp) { logger.error("Unexpected failure", exp); fail("got exception instead of a response: " + exp.getMessage()); } - - @Override - public Empty read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -756,12 +724,6 @@ public void handleException(TransportException exp) { logger.error("Unexpected failure", exp); fail("got exception instead of a response: " + exp.getMessage()); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -809,12 +771,6 @@ public void handleResponse(StringMessageResponse response) { public void handleException(TransportException exp) { assertThat("runtime_exception: bad message !!!", equalTo(exp.getCause().getMessage())); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1064,12 +1020,6 @@ public void handleResponse(StringMessageResponse response) { public void handleException(TransportException exp) { assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1144,12 +1094,6 @@ public void handleException(TransportException exp) { latch.countDown(); assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1191,12 +1135,6 @@ public void handleException(TransportException exp) { logger.error("Unexpected failure", exp); fail("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage()); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1241,12 +1179,6 @@ public void handleException(TransportException exp) {} public String executor() { return ThreadPool.Names.SAME; } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, StringMessageRequest::new, handler); @@ -1550,12 +1482,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public Version0Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ).txGet(); @@ -1607,12 +1533,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public Version1Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ).txGet(); @@ -1658,12 +1578,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public Version1Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ).txGet(); @@ -1706,12 +1620,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public Version0Response read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ).txGet(); @@ -1757,12 +1665,6 @@ public void handleException(TransportException exp) { assertThat(cause, instanceOf(ConnectTransportException.class)); assertThat(((ConnectTransportException) cause).node(), equalTo(nodeA)); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1827,12 +1729,6 @@ public void handleResponse(StringMessageResponse response) { public void handleException(TransportException exp) { assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); } - - @Override - public StringMessageResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1884,12 +1780,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }); if (!latch.await(10, TimeUnit.SECONDS)) { @@ -1938,12 +1828,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -1979,12 +1863,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); @@ -2143,12 +2021,6 @@ public void handleException(TransportException exp) { public String executor() { return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); } else { @@ -2211,12 +2083,6 @@ public void handleException(TransportException exp) { public String executor() { return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; } - - @Override - public TestResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } for (int i = 0; i < iters; i++) { @@ -2552,12 +2418,6 @@ public void handleException(TransportException exp) { public String executor() { return randomFrom(executors); } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; serviceB.sendRequest(nodeA, "internal:action", new TestRequest(randomFrom("fail", "pass")), transportResponseHandler); @@ -2608,12 +2468,6 @@ public void handleException(TransportException exp) { public String executor() { return randomFrom(executors); } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); builder.addConnections( @@ -2684,12 +2538,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); @@ -2764,12 +2612,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet @@ -2891,12 +2733,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; TransportStats stats = serviceC.transport.getStats(); // nothing transmitted / read yet @@ -3284,12 +3120,6 @@ public String executor() { public TransportResponse read(final StreamInput in) { return TransportResponse.Empty.INSTANCE; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); assertThat(te.get(), not(nullValue())); diff --git a/test/framework/src/test/java/org/opensearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/opensearch/test/disruption/DisruptableMockTransportTests.java index d4583b6c29419..6b64270ca68e1 100644 --- a/test/framework/src/test/java/org/opensearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/opensearch/test/disruption/DisruptableMockTransportTests.java @@ -57,7 +57,6 @@ import org.junit.Before; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -234,12 +233,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; } @@ -264,12 +257,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; } @@ -294,12 +281,6 @@ public void handleException(TransportException exp) { public String executor() { return ThreadPool.Names.SAME; } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } }; } diff --git a/test/framework/src/test/java/org/opensearch/test/disruption/NetworkDisruptionIT.java b/test/framework/src/test/java/org/opensearch/test/disruption/NetworkDisruptionIT.java index f36b7bde14083..362ecd692360d 100644 --- a/test/framework/src/test/java/org/opensearch/test/disruption/NetworkDisruptionIT.java +++ b/test/framework/src/test/java/org/opensearch/test/disruption/NetworkDisruptionIT.java @@ -51,7 +51,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -215,12 +214,6 @@ public String executor() { public TransportResponse read(StreamInput in) throws IOException { return ClusterHealthResponse.readResponseFrom(in); } - - @Override - public TransportResponse read(InputStream in) throws IOException { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'read'"); - } } ); }