Skip to content

Commit

Permalink
Handled default search pipeline for multiple indices
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Apr 18, 2024
1 parent 84679de commit 908c67e
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ private void executeRequest(
PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver);
listener = searchRequest.transformResponseListener(updatedListener);
} catch (Exception e) {
updatedListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
Expand All @@ -35,6 +36,7 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.service.ReportingService;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand Down Expand Up @@ -360,7 +362,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat
return newState.build();
}

public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) {
Pipeline pipeline = Pipeline.NO_OP_PIPELINE;

if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) {
Expand Down Expand Up @@ -390,13 +392,22 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
if (searchRequest.pipeline() != null) {
// Named pipeline specified for the request
pipelineId = searchRequest.pipeline();
} else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) {
} else if (state != null && searchRequest.indices() != null) {
// Check for index default pipeline
IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]);
if (indexMetadata != null) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest);
for (Index index : concreteIndices) {
IndexMetadata indexMetadata = state.metadata().index(index);
if (indexMetadata != null) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) {
String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings);
if (NOOP_PIPELINE_ID.equals(pipelineId)) {
pipelineId = currentPipelineId;
} else if (pipelineId.equals(currentPipelineId) == false) {
pipelineId = NOOP_PIPELINE_ID;
break;
}
}
}
}
}
Expand Down
Loading

0 comments on commit 908c67e

Please sign in to comment.