Skip to content

Commit

Permalink
E2E poc for matchall + termquery
Browse files Browse the repository at this point in the history
  • Loading branch information
zshuyi committed Nov 21, 2024
1 parent 853b206 commit 5efb182
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 458 deletions.
104 changes: 100 additions & 4 deletions server/src/main/java/org/opensearch/action/search/SearchRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.internal.SearchContext;
import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.*;

import static org.opensearch.action.ValidateActions.addValidationError;

Expand Down Expand Up @@ -162,6 +160,104 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
this.source = source;
}

public SearchRequest(opensearch.proto.SearchRequest proto) {
this();
source = new SearchSourceBuilder(proto);
if (proto.hasPreFilterShardSize()) {
batchedReduceSize = proto.getBatchedReduceSize();
}
if (proto.hasMaxConcurrentShardRequests()) {
maxConcurrentShardRequests = proto.getMaxConcurrentShardRequests();
}
if(proto.hasAllowPartialSearchResults()) {
allowPartialSearchResults = proto.getAllowPartialSearchResults();
} else {
allowPartialSearchResults = false;
}
if (proto.hasPhaseTook()) {
phaseTook = proto.getPhaseTook();
} else {
phaseTook = true;
}
switch (proto.getSearchType()) {
case SEARCH_TYPE_QUERY_THEN_FETCH:
searchType = SearchType.QUERY_THEN_FETCH;
break;
case SEARCH_TYPE_DFS_QUERY_THEN_FETCH:
searchType = SearchType.DFS_QUERY_THEN_FETCH;
break;
default:
searchType = SearchType.DEFAULT;
}
if (proto.hasRequestCache()) {
requestCache = proto.getRequestCache();
}
preference = proto.getPreference();
// ========= NOT WORKING ==============
// routing = String.join(",", proto.getRoutingList());
// EnumSet<IndicesOptions.Option> indicesoOptions = EnumSet.noneOf(IndicesOptions.Option.class);
// EnumSet<IndicesOptions.WildcardStates> wildcardStates = EnumSet.noneOf(IndicesOptions.WildcardStates.class);
//
// if (!proto.hasAllowNoIndices()) { // add option by default
// indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES);
// } else if (proto.getAllowNoIndices()) {
// indicesoOptions.add(IndicesOptions.Option.ALLOW_NO_INDICES);
// }
//
//// if (proto.getIgnoreUnavailable()) {
// indicesoOptions.add(IndicesOptions.Option.IGNORE_UNAVAILABLE);
//// }
//
// if (!proto.hasIgnoreThrottled()) { // add option by default
// indicesoOptions.add(IndicesOptions.Option.IGNORE_THROTTLED);
// } else if (proto.getIgnoreThrottled()) {
// indicesoOptions.add(IndicesOptions.Option.IGNORE_THROTTLED);
// }
//
// for (opensearch.proto.SearchRequest.ExpandWildcard wc : proto.getExpandWildcardsList()) {
// switch (wc) {
// case EXPAND_WILDCARD_OPEN:
// wildcardStates.add(IndicesOptions.WildcardStates.OPEN);
// break;
// case EXPAND_WILDCARD_CLOSED:
// wildcardStates.add(IndicesOptions.WildcardStates.CLOSED);
// break;
// case EXPAND_WILDCARD_HIDDEN:
// wildcardStates.add(IndicesOptions.WildcardStates.HIDDEN);
// break;
// case EXPAND_WILDCARD_NONE:
// wildcardStates.clear();
// break;
// case EXPAND_WILDCARD_ALL:
// wildcardStates.addAll(EnumSet.allOf(IndicesOptions.WildcardStates.class));
// break;
// }
// }
// indicesOptions = new IndicesOptions(indicesoOptions, wildcardStates);

if (proto.hasSearchPipeline()) {
pipeline = proto.getSearchPipeline();
}
allowPartialSearchResults = proto.getAllowPartialSearchResults();
if (proto.hasCancelAfterTimeInterval()) {
TimeValue cancelAfter = new TimeValue(Long.parseLong(proto.getCancelAfterTimeInterval()));
cancelAfterTimeInterval = cancelAfter;
}
ccsMinimizeRoundtrips = proto.getCcsMinimizeRoundtrips();
Integer trackTotalHitsUpTo = source().trackTotalHitsUpTo();
if (trackTotalHitsUpTo == null) {
source().trackTotalHits(true);
} else if (trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_ACCURATE && trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
throw new IllegalArgumentException(
"["
+ TOTAL_HITS_AS_INT_PARAM
+ "] cannot be used "
+ "if the tracking of total hits is not accurate, got "
+ trackTotalHitsUpTo
);
}
}

/**
* Deep clone a SearchRequest
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.opensearch.grpc.services.search;

import io.grpc.stub.StreamObserver;
import opensearch.proto.*;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.core.action.ActionListener;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;


public class GprcSearchAction {

public static opensearch.proto.SearchResponse searchResponseToProto(org.opensearch.action.search.SearchResponse transportResponse) {
ResponseBody.Builder responseBodyBuilder = ResponseBody.newBuilder();

// [required] Milliseconds it took Elasticsearch to execute the request.
// optional int64 took = 1;
responseBodyBuilder.setTook(transportResponse.getTook().getMillis());
//
// // [required] If true, the request timed out before completion; returned results may be partial or empty.
// optional bool timed_out = 2;
responseBodyBuilder.setTimedOut(transportResponse.isTimedOut());
//
// // [required] Contains a count of shards used for the request.
// ShardStatistics shards = 3;
responseBodyBuilder.setShards(ShardStatistics.newBuilder()
.setFailed(transportResponse.getFailedShards())
.setSuccessful(transportResponse.getSuccessfulShards())
.setTotal(transportResponse.getTotalShards())
.addAllFailures(getShardFails(transportResponse))
.build());
//
// // [optional] Phase-level took time values in the response.
// optional PhaseTook phase_took = 4;
//
// // [required] Contains returned documents and metadata.
// HitsMetadata hits = 5;
TotalHits.TotalHitsRelation totalHitsRelation = TotalHits.TotalHitsRelation.TOTAL_HITS_RELATION_INVALID;
switch (transportResponse.getHits().getTotalHits().relation.name()) {
case "EQUAL_TO":
totalHitsRelation = TotalHits.TotalHitsRelation.TOTAL_HITS_RELATION_EQ;
break;
case "GREATER_THAN_OR_EQUAL_TO":
totalHitsRelation = TotalHits.TotalHitsRelation.TOTAL_HITS_RELATION_GTE;
break;
}
// transportResponse.getHits().getHits();
responseBodyBuilder.setMaxScore(transportResponse.getHits().getMaxScore()).setHits(HitsMetadata.newBuilder()
.setTotal(opensearch.proto.HitsMetadata.Total.newBuilder()
.setTotalHits(TotalHits.newBuilder()
.setValue(transportResponse.getHits().getTotalHits().value)
.setRelation(totalHitsRelation)
.build())
.build())
.build())
.build();

//
// // [optional] When you search one or more remote clusters, a `_clusters` section is included to provide information about the search on each cluster.
// optional ClusterStatistics clusters = 6;
//
// // [optional] Retrieved specific fields in the search response
// optional .google.protobuf.Struct fields = 7;
//
// // [optional] Highest returned document _score.
// optional float max_score = 8;
//
// // [optional] The number of times that the coordinating node aggregates results from batches of shard responses
// optional int32 num_reduce_phases = 9;
//
// // [optional] Contains profiling information.
// Profile profile = 10;
//
// // [optional] The PIT ID.
// optional string pit_id = 11;
//
// // [optional] Identifier for the search and its search context.
// optional string scroll_id = 12;
//
// // [optional] If the query was terminated early, the terminated_early flag will be set to true in the response
// optional bool terminated_early = 13;

opensearch.proto.SearchResponse response = opensearch.proto.SearchResponse.newBuilder()
.setResponseBody(responseBodyBuilder.build())
.build();
return response;
}

public static List<ShardFailure> getShardFails(org.opensearch.action.search.SearchResponse transportResponse) {
ShardSearchFailure[] failures = transportResponse.getShardFailures();
return Arrays.stream(failures)
.map(failure -> ShardFailure.newBuilder()
.setIndex(failure.index())
.setShard(failure.shardId())
.setStatus(failure.status().name())
.build())
.collect(Collectors.toList());

}

static class SearchRequestActionListener implements ActionListener<SearchResponse> {
private StreamObserver<opensearch.proto.SearchResponse> respObserver = null;

public SearchRequestActionListener(StreamObserver<opensearch.proto.SearchResponse> responseObserver){
super();
respObserver = responseObserver;
}

@Override
public void onResponse(org.opensearch.action.search.SearchResponse response) {
try {
opensearch.proto.SearchResponse protoResponse = searchResponseToProto(response);
respObserver.onNext(protoResponse);
respObserver.onCompleted();
} catch (Exception e) {
respObserver.onError(
new RuntimeException("Failed to process SearchResponse", e)
);
}
}

@Override
public void onFailure(Exception e) {
respObserver.onError(
new RuntimeException("SearchRequest task failed", e)
);
}
};

}

This file was deleted.

Loading

0 comments on commit 5efb182

Please sign in to comment.