Skip to content

Commit

Permalink
Added support for search pipeline name in multi search API
Browse files Browse the repository at this point in the history
Signed-off-by: Owais <[email protected]>
  • Loading branch information
owaiskazi19 committed Sep 20, 2024
1 parent b369611 commit 0b95678
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public static HighlightBuilder highlight() {

private Map<String, Object> searchPipelineSource = null;

private String searchPipeline;

/**
* Constructs a new search source builder.
*/
Expand Down Expand Up @@ -273,6 +275,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
seqNoAndPrimaryTerm = in.readOptionalBoolean();
extBuilders = in.readNamedWriteableList(SearchExtBuilder.class);
profile = in.readBoolean();
searchPipeline = in.readOptionalString();
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
sliceBuilder = in.readOptionalWriteable(SliceBuilder::new);
collapse = in.readOptionalWriteable(CollapseBuilder::new);
Expand Down Expand Up @@ -347,6 +350,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(seqNoAndPrimaryTerm);
out.writeNamedWriteableList(extBuilders);
out.writeBoolean(profile);
out.writeOptionalString(searchPipeline);
out.writeOptionalWriteable(searchAfterBuilder);
out.writeOptionalWriteable(sliceBuilder);
out.writeOptionalWriteable(collapse);
Expand Down Expand Up @@ -1111,6 +1115,13 @@ public Map<String, Object> searchPipelineSource() {
return searchPipelineSource;
}

/**
* @return a search pipeline name defined within the search source (see {@link org.opensearch.search.pipeline.SearchPipelineService})
*/
public String pipeline() {
return searchPipeline;
}

/**
* Define a search pipeline to process this search request and/or its response. See {@link org.opensearch.search.pipeline.SearchPipelineService}.
*/
Expand All @@ -1119,6 +1130,14 @@ public SearchSourceBuilder searchPipelineSource(Map<String, Object> searchPipeli
return this;
}

/**
* Define a search pipeline name to process this search request and/or its response. See {@link org.opensearch.search.pipeline.SearchPipelineService}.
*/
public SearchSourceBuilder pipeline(String searchPipeline) {
this.searchPipeline = searchPipeline;
return this;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
Expand Down Expand Up @@ -1283,6 +1302,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
sort(parser.text());
} else if (PROFILE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
profile = parser.booleanValue();
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
searchPipeline = parser.text();
} else {
throw new ParsingException(
parser.getTokenLocation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx
if (searchRequest.pipeline() != null) {
// Named pipeline specified for the request
pipelineId = searchRequest.pipeline();
} else if (searchRequest.source() != null && searchRequest.source().pipeline() != null) {
// Inline pipeline specified for the request
pipelineId = searchRequest.source().pipeline();
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length != 0) {
try {
// Check for index default pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,67 @@ public void testInlinePipeline() throws Exception {
}
}

/**
* Tests a pipeline name defined in the search request source.
*/
public void testInlineDefinedPipeline() throws Exception {
SearchPipelineService searchPipelineService = createWithProcessors();

SearchPipelineMetadata metadata = new SearchPipelineMetadata(
Map.of(
"p1",
new PipelineConfiguration(
"p1",
new BytesArray(
"{"
+ "\"request_processors\": [{ \"scale_request_size\": { \"scale\" : 2 } }],"
+ "\"response_processors\": [{ \"fixed_score\": { \"score\" : 2 } }]"
+ "}"
),
MediaTypeRegistry.JSON
)

)

);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
ClusterState previousState = clusterState;
clusterState = ClusterState.builder(clusterState)
.metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata))
.build();
searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState));

SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).pipeline("p1");
SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);

// Verify pipeline
PipelinedRequest pipelinedRequest = syncTransformRequest(
searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)
);
Pipeline pipeline = pipelinedRequest.getPipeline();
assertEquals("p1", pipeline.getId());
assertEquals(1, pipeline.getSearchRequestProcessors().size());
assertEquals(1, pipeline.getSearchResponseProcessors().size());

// Verify that pipeline transforms request
assertEquals(200, pipelinedRequest.source().size());

int size = 10;
SearchHit[] hits = new SearchHit[size];
for (int i = 0; i < size; i++) {
hits[i] = new SearchHit(i, "doc" + i, Collections.emptyMap(), Collections.emptyMap());
hits[i].score(i);
}
SearchHits searchHits = new SearchHits(hits, new TotalHits(size * 2, TotalHits.Relation.EQUAL_TO), size);
SearchResponseSections searchResponseSections = new SearchResponseSections(searchHits, null, null, false, false, null, 0);
SearchResponse searchResponse = new SearchResponse(searchResponseSections, null, 1, 1, 0, 10, null, null);

SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse);
for (int i = 0; i < size; i++) {
assertEquals(2.0, transformedResponse.getHits().getHits()[i].getScore(), 0.0001);
}
}

public void testInfo() {
SearchPipelineService searchPipelineService = createWithProcessors();
SearchPipelineInfo info = searchPipelineService.info();
Expand Down

0 comments on commit 0b95678

Please sign in to comment.