From 5ea2a28675b67766556a68df148cdaaa27b90096 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Wed, 20 Nov 2024 11:51:44 -0800 Subject: [PATCH] Don't set optional proto fields Signed-off-by: Finn Carroll --- .../search/SearchRequestProtoHelper.java | 50 +++++++++++++------ .../services/search/SearchServiceImpl.java | 19 +++++++ server/src/main/proto/spec/pub | 1 + 3 files changed, 55 insertions(+), 15 deletions(-) create mode 160000 server/src/main/proto/spec/pub diff --git a/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java b/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java index 6553ee3831f3e..26904d20af110 100644 --- a/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java +++ b/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java @@ -8,6 +8,7 @@ package org.opensearch.grpc.services.search; +import org.opensearch.action.search.SearchType; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.Scroll; @@ -83,12 +84,16 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] How many shard results to reduce on a node. Default is 512. //optional int32 batched_reduce_size = 8; - searchReq.setBatchedReduceSize(proto.getBatchedReduceSize()); + if (proto.hasBatchedReduceSize()) { + searchReq.setBatchedReduceSize(proto.getBatchedReduceSize()); + } //[optional] The time after which the search request will be canceled. Request-level parameter takes precedence over cancel_after_time_interval cluster setting. Default is -1. //optional string cancel_after_time_interval = 9; - TimeValue cancelAfter = new TimeValue(Long.parseLong(proto.getCancelAfterTimeInterval())); - searchReq.setCancelAfterTimeInterval(cancelAfter); + if (proto.hasCancelAfterTimeInterval()) { + TimeValue cancelAfter = new TimeValue(Long.parseLong(proto.getCancelAfterTimeInterval())); + searchReq.setCancelAfterTimeInterval(cancelAfter); + } //[optional] Whether to minimize round-trips between a node and remote clusters. Default is true. //optional bool ccs_minimize_roundtrips = 10; @@ -138,7 +143,9 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] Numbers of concurrent shard requests this request should execute on each node. Default is 5. //optional int32 max_concurrent_shard_requests = 21; - searchReq.setMaxConcurrentShardRequests(proto.getMaxConcurrentShardRequests()); + if (proto.hasMaxConcurrentShardRequests()) { + searchReq.setMaxConcurrentShardRequests(proto.getMaxConcurrentShardRequests()); + } //[optional] Whether to return phase-level took time values in the response. Default is false. //optional bool phase_took = 22; @@ -146,7 +153,9 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] A prefilter size threshold that triggers a prefilter operation if the request exceeds the threshold. Default is 128 shards. //optional int32 pre_filter_shard_size = 23; - searchReq.setPreFilterShardSize(proto.getPreFilterShardSize()); + if (proto.hasPreFilterShardSize()) { + searchReq.setPreFilterShardSize(proto.getPreFilterShardSize()); + } //[optional] Specifies the shards or nodes on which OpenSearch should perform the search. //optional string preference = 24; @@ -160,7 +169,9 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] Specifies whether OpenSearch should use the request cache. Default is whether it's enabled in the index's settings. //optional bool request_cache = 26; - searchReq.requestCache(proto.getRequestCache()); + if (proto.hasRequestCache()) { + searchReq.requestCache(proto.getRequestCache()); + } //[optional] Indicates whether to return hits.total as an integer. Returns an object otherwise. Default is false. //optional bool rest_total_hits_as_int = 27; @@ -172,20 +183,29 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] Period to keep the search context open. //optional string scroll = 29; - Scroll scroll = new Scroll(parseTimeValue(proto.getScroll(), null, "scroll")); - searchReq.scroll(scroll); + if (proto.hasScroll()) { + Scroll scroll = new Scroll(parseTimeValue(proto.getScroll(), null, "scroll")); + searchReq.scroll(scroll); + } //[optional] Customizable sequence of processing stages applied to search queries. //optional string search_pipeline = 30; - searchReq.pipeline(proto.getSearchPipeline()); + if (proto.hasSearchPipeline()) { + searchReq.pipeline(proto.getSearchPipeline()); + } //[optional] Whether OpenSearch should use global term and document frequencies when calculating relevance scores. Default is SEARCH_TYPE_QUERY_THEN_FETCH. //optional SearchType search_type = 31; - String searchType = proto.getSearchType().name(); - if ("query_and_fetch".equals(searchType) || "dfs_query_and_fetch".equals(searchType)) { - throw new IllegalArgumentException("Unsupported search type [" + searchType + "]"); + switch (proto.getSearchType()) { + case SEARCH_TYPE_QUERY_THEN_FETCH: + searchReq.searchType(SearchType.QUERY_THEN_FETCH); + break; + case SEARCH_TYPE_DFS_QUERY_THEN_FETCH: + searchReq.searchType(SearchType.DFS_QUERY_THEN_FETCH); + break; + default: + searchReq.searchType(SearchType.DEFAULT); } - searchReq.searchType(searchType); //[optional] Whether to return sequence number and primary term of the last operation of each document hit. //optional bool seq_no_primary_term = 32; @@ -195,8 +215,8 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] Number of results to include in the response. //optional int32 size = 33; - if (proto.hasSize()) { // TODO - throw new UnsupportedOperationException("opensearch.proto.SearchRequest not supported"); + if (proto.hasSize()) { + searchReq.source().size(proto.getSize()); } //[optional] A list of : pairs to sort by. diff --git a/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java b/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java index 432818f630a57..bca848beecd61 100644 --- a/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java +++ b/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java @@ -15,6 +15,8 @@ import org.opensearch.core.action.ActionListener; import opensearch.proto.services.SearchServiceGrpc; +import org.opensearch.wlm.QueryGroupTask; +import org.opensearch.wlm.WorkloadManagementTransportInterceptor; import static org.opensearch.grpc.services.search.SearchRequestProtoHelper.searchRequestFromProto; import static org.opensearch.grpc.services.search.SearchRequestProtoHelper.searchResponseToProto; @@ -59,6 +61,23 @@ public void onFailure(Exception e) { public void search(opensearch.proto.SearchRequest searchRequestProto, StreamObserver responseObserver) { org.opensearch.action.search.SearchRequest searchReq = searchRequestFromProto(searchRequestProto); SearchRequestActionListener listener = new SearchRequestActionListener(responseObserver); + + /* + When we execute a TransportSearchAction a SearchRequestOperationsListener is registered for the action to track resource usage for the request QueryGroup. + The QueryGroupTask.QUERY_GROUP_ID_HEADER uniquely identifies each QueryGroup + Typically the + */ + +// WorkloadManagementTransportInterceptor +// if (isSearchWorkloadRequest(task)) { +// ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); +// final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId(); +// queryGroupService.rejectIfNeeded(queryGroupId); +// } + + // +// client.threadPool().getThreadContext().putHeader(name, String.join(",", distinctHeaderValues)); + client.execute(SearchAction.INSTANCE, searchReq, listener); } } diff --git a/server/src/main/proto/spec/pub b/server/src/main/proto/spec/pub new file mode 160000 index 0000000000000..ad83bfea210c7 --- /dev/null +++ b/server/src/main/proto/spec/pub @@ -0,0 +1 @@ +Subproject commit ad83bfea210c76886c35bdeddc2cd52537cb134d