From 853b206554f1a124ee557596be9f160be0adab54 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Wed, 20 Nov 2024 15:53:17 -0800 Subject: [PATCH] Implement request side of match all query Signed-off-by: Finn Carroll --- .../search/SearchRequestProtoHelper.java | 72 ++++++++++++++----- .../services/search/SearchServiceImpl.java | 20 +----- 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java b/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java index 26904d20af110..21615812855fb 100644 --- a/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java +++ b/server/src/main/java/org/opensearch/grpc/services/search/SearchRequestProtoHelper.java @@ -8,6 +8,8 @@ package org.opensearch.grpc.services.search; +import opensearch.proto.SearchResponse; +import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchType; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.unit.TimeValue; @@ -15,14 +17,16 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.SearchContext; +import java.util.EnumSet; + import static org.opensearch.action.search.SearchRequest.DEFAULT_INDICES_OPTIONS; import static org.opensearch.common.unit.TimeValue.parseTimeValue; import static org.opensearch.grpc.services.search.SearchRequestBodyProtoHelper.searchSourceBuilderFromProto; public class SearchRequestProtoHelper { - public static org.opensearch.action.search.SearchRequest searchRequestFromProto(opensearch.proto.SearchRequest proto) { - org.opensearch.action.search.SearchRequest searchReq = new org.opensearch.action.search.SearchRequest(); + public static SearchRequest searchRequestFromProto(opensearch.proto.SearchRequest proto) { + SearchRequest searchReq = new SearchRequest(); if (searchReq.source() == null) { searchReq.source(new SearchSourceBuilder()); } @@ -49,22 +53,55 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( [optional] Whether to ignore wildcards that don't match any indexes. Default is true. optional bool allow_no_indices = 4; - [optional] Specifies the type of index that wildcard expressions can match. Supports list of values. Default is open. - repeated ExpandWildcard expand_wildcards = 14; - [optional] Specifies whether to include missing or closed indexes in the response and ignores unavailable shards during the search request. Default is false. optional bool ignore_unavailable = 18; [optional] Whether to ignore concrete, expanded, or indexes with aliases if indexes are frozen. Default is true. optional bool ignore_throttled = 17; + + [optional] Specifies the type of index that wildcard expressions can match. Supports list of values. Default is open. + repeated ExpandWildcard expand_wildcards = 14; */ - IndicesOptions indicesOptions = IndicesOptions.fromParameters( - proto.getExpandWildcardsList(), - proto.getIgnoreUnavailable(), - proto.getAllowNoIndices(), - proto.getIgnoreThrottled(), - DEFAULT_INDICES_OPTIONS); - searchReq.indicesOptions(indicesOptions); + EnumSet indicesoOptions = EnumSet.noneOf(IndicesOptions.Option.class); + EnumSet wildcardStates = EnumSet.noneOf(IndicesOptions.WildcardStates.class); + + if (!proto.hasAllowNoIndices()) { // add option by default + indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES); + } else if (proto.getAllowNoIndices()) { + indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES); + } + + if (proto.getIgnoreUnavailable()) { + indicesoOptions.add(IndicesOptions.Option.IGNORE_UNAVAILABLE); + } + + if (!proto.hasIgnoreThrottled()) { // add option by default + indicesoOptions.add(IndicesOptions.Option.IGNORE_THROTTLED); + } else if (proto.getIgnoreThrottled()) { + indicesoOptions.add(IndicesOptions.Option.IGNORE_THROTTLED); + } + + for (opensearch.proto.SearchRequest.ExpandWildcard wc : proto.getExpandWildcardsList()) { + switch (wc) { + case EXPAND_WILDCARD_OPEN: + wildcardStates.add(IndicesOptions.WildcardStates.OPEN); + break; + case EXPAND_WILDCARD_CLOSED: + wildcardStates.add(IndicesOptions.WildcardStates.CLOSED); + break; + case EXPAND_WILDCARD_HIDDEN: + wildcardStates.add(IndicesOptions.WildcardStates.HIDDEN); + break; + case EXPAND_WILDCARD_NONE: + wildcardStates.clear(); + break; + case EXPAND_WILDCARD_ALL: + wildcardStates.addAll(EnumSet.allOf(IndicesOptions.WildcardStates.class)); + break; + } + } + + searchReq.indicesOptions(new IndicesOptions(indicesoOptions, wildcardStates)); //[optional] Whether to return partial results if the request runs into an error or times out. Default is true. //optional bool allow_partial_search_results = 5; @@ -97,7 +134,10 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( //[optional] Whether to minimize round-trips between a node and remote clusters. Default is true. //optional bool ccs_minimize_roundtrips = 10; - searchReq.setCcsMinimizeRoundtrips(proto.getCcsMinimizeRoundtrips()); + searchReq.setCcsMinimizeRoundtrips(true); + if (proto.hasCcsMinimizeRoundtrips()) { + searchReq.setCcsMinimizeRoundtrips(proto.getCcsMinimizeRoundtrips()); + } //[optional] Indicates whether the default operator for a string query should be AND or OR. Default is OR. //optional Operator default_operator = 11; @@ -304,12 +344,12 @@ public static org.opensearch.action.search.SearchRequest searchRequestFromProto( return searchReq; } - public static opensearch.proto.SearchResponse searchResponseToProto(org.opensearch.action.search.SearchResponse response) { - return opensearch.proto.SearchResponse.newBuilder().build(); + public static SearchResponse searchResponseToProto(org.opensearch.action.search.SearchResponse response) { + return SearchResponse.newBuilder().build(); } // TODO: Refactor RestSearchAction::checkRestTotalHits to this - private static void checkRestTotalHits(boolean totalHitsAsInt, org.opensearch.action.search.SearchRequest searchRequest) { + private static void checkRestTotalHits(boolean totalHitsAsInt, SearchRequest searchRequest) { if (totalHitsAsInt == false) { return; } diff --git a/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java b/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java index bca848beecd61..a17ac897666fb 100644 --- a/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java +++ b/server/src/main/java/org/opensearch/grpc/services/search/SearchServiceImpl.java @@ -51,6 +51,9 @@ public void onResponse(org.opensearch.action.search.SearchResponse response) { @Override public void onFailure(Exception e) { + // DEBUG PRINT + System.out.println(e.getMessage()); + respObserver.onError( new RuntimeException("SearchRequest task failed", e) ); @@ -61,23 +64,6 @@ public void onFailure(Exception e) { public void search(opensearch.proto.SearchRequest searchRequestProto, StreamObserver responseObserver) { org.opensearch.action.search.SearchRequest searchReq = searchRequestFromProto(searchRequestProto); SearchRequestActionListener listener = new SearchRequestActionListener(responseObserver); - - /* - When we execute a TransportSearchAction a SearchRequestOperationsListener is registered for the action to track resource usage for the request QueryGroup. - The QueryGroupTask.QUERY_GROUP_ID_HEADER uniquely identifies each QueryGroup - Typically the - */ - -// WorkloadManagementTransportInterceptor -// if (isSearchWorkloadRequest(task)) { -// ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); -// final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId(); -// queryGroupService.rejectIfNeeded(queryGroupId); -// } - - // -// client.threadPool().getThreadContext().putHeader(name, String.join(",", distinctHeaderValues)); - client.execute(SearchAction.INSTANCE, searchReq, listener); } }