From 67c608754522e0fda59c109ce9b1e844dd6d52e2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 29 Apr 2024 19:57:35 +0000 Subject: [PATCH] Handle default search pipeline for multiple indices (#13276) Signed-off-by: Owais Kazi (cherry picked from commit defbd60e4edf24ef8faf8e746f6b62eaf95b99c3) Signed-off-by: github-actions[bot] --- CHANGELOG.md | 1 + .../action/search/TransportSearchAction.java | 2 +- .../pipeline/SearchPipelineService.java | 34 +- .../pipeline/SearchPipelineServiceTests.java | 295 ++++++++++++++++-- 4 files changed, 291 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2568cc52087d2..5fe36968f1b7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746)) - [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225)) - [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386)) +- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 65cfd35489033..143b01af3f62f 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -457,7 +457,7 @@ private void executeRequest( PipelinedRequest searchRequest; ActionListener listener; try { - searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); + searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver); listener = searchRequest.transformResponseListener(updatedListener); } catch (Exception e) { updatedListener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 2175b5d135394..012d6695c042b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -23,6 +23,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterManagerTaskKeys; @@ -35,10 +36,12 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.index.Index; import org.opensearch.core.service.ReportingService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.gateway.GatewayService; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.ingest.ConfigurationUtils; @@ -62,6 +65,8 @@ /** * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines * against requests and responses. + * + * @opensearch.internal */ public class SearchPipelineService implements ClusterStateApplier, ReportingService { @@ -360,7 +365,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat return newState.build(); } - public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { + public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) { Pipeline pipeline = Pipeline.NO_OP_PIPELINE; if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) { @@ -390,14 +395,27 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { if (searchRequest.pipeline() != null) { // Named pipeline specified for the request pipelineId = searchRequest.pipeline(); - } else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) { - // Check for index default pipeline - IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]); - if (indexMetadata != null) { - Settings indexSettings = indexMetadata.getSettings(); - if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { - pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + } else if (state != null && searchRequest.indices() != null && searchRequest.indices().length != 0) { + try { + // Check for index default pipeline + Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest); + for (Index index : concreteIndices) { + IndexMetadata indexMetadata = state.metadata().index(index); + if (indexMetadata != null) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { + String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + if (NOOP_PIPELINE_ID.equals(pipelineId)) { + pipelineId = currentPipelineId; + } else if (!pipelineId.equals(currentPipelineId)) { + pipelineId = NOOP_PIPELINE_ID; + break; + } + } + } } + } catch (IndexNotFoundException e) { + logger.debug("Default pipeline not applied for {}", (Object) searchRequest.indices()); } } if (NOOP_PIPELINE_ID.equals(pipelineId) == false) { diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index f5851e669a2da..f5857922fdff2 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -32,7 +32,9 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; @@ -41,6 +43,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.breaker.CircuitBreaker; @@ -96,14 +99,31 @@ public Map> getSearchPhas private ThreadPool threadPool; + private IndexNameExpressionResolver indexNameExpressionResolver; + @Before public void setup() { threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); } + private static IndexMetadata.Builder indexBuilder(String index) { + return indexBuilder(index, Settings.EMPTY); + } + + private static IndexMetadata.Builder indexBuilder(String index, Settings additionalSettings) { + return IndexMetadata.builder(index).settings(addAdditionalSettings(additionalSettings)); + } + + private static Settings.Builder addAdditionalSettings(Settings additionalSettings) { + return settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(additionalSettings); + } + public void testSearchPipelinePlugin() { Client client = mock(Client.class); SearchPipelineService searchPipelineService = new SearchPipelineService( @@ -162,7 +182,7 @@ public void testResolveSearchPipelineDoesNotExist() { final SearchRequest searchRequest = new SearchRequest("_index").pipeline("bar"); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> searchPipelineService.resolvePipeline(searchRequest) + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) ); assertTrue(e.getMessage(), e.getMessage().contains(" not defined")); } @@ -197,13 +217,13 @@ public void testResolveIndexDefaultPipeline() throws Exception { service.applyClusterState(cce); SearchRequest searchRequest = new SearchRequest("my_index").source(SearchSourceBuilder.searchSource().size(5)); - PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); assertEquals("p1", pipelinedRequest.getPipeline().getId()); assertEquals(10, pipelinedRequest.source().size()); // Bypass the default pipeline searchRequest.pipeline("_none"); - pipelinedRequest = service.resolvePipeline(searchRequest); + pipelinedRequest = service.resolvePipeline(searchRequest, indexNameExpressionResolver); assertEquals("_none", pipelinedRequest.getPipeline().getId()); assertEquals(5, pipelinedRequest.source().size()); } @@ -591,23 +611,29 @@ public void testTransformRequest() throws Exception { ); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousState = clusterState; - clusterState = ClusterState.builder(clusterState) - .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) - .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .putCustom(SearchPipelineMetadata.TYPE, metadata) + .put(indexBuilder("my-index").putAlias(AliasMetadata.builder("barbaz"))); + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder).build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); int size = 10; SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar")).size(size); - SearchRequest request = new SearchRequest("_index").source(sourceBuilder).pipeline("p1"); + SearchRequest request = new SearchRequest("my-index").source(sourceBuilder).pipeline("p1"); - PipelinedRequest pipelinedRequest = syncTransformRequest(searchPipelineService.resolvePipeline(request)); + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(request, indexNameExpressionResolver) + ); assertEquals(2 * size, pipelinedRequest.source().size()); assertEquals(size, request.source().size()); // This request doesn't specify a pipeline, it doesn't get transformed. - request = new SearchRequest("_index").source(sourceBuilder); - pipelinedRequest = searchPipelineService.resolvePipeline(request); + request = new SearchRequest("my-index").source(sourceBuilder); + pipelinedRequest = searchPipelineService.resolvePipeline(request, indexNameExpressionResolver); assertEquals(size, pipelinedRequest.source().size()); } @@ -643,13 +669,13 @@ public void testTransformResponse() throws Exception { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse notTransformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertSame(searchResponse, notTransformedResponse); // Now apply a pipeline searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertEquals(size, transformedResponse.getHits().getHits().length); for (int i = 0; i < size; i++) { @@ -736,7 +762,7 @@ public void testTransformSearchPhase() { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResults = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -748,7 +774,7 @@ public void testTransformSearchPhase() { // Now set the pipeline as p1 searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -766,7 +792,7 @@ public void testTransformSearchPhase() { // Check Processor doesn't run for between other phases searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResult = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -916,7 +942,9 @@ public void testInlinePipeline() throws Exception { SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); // Verify pipeline - PipelinedRequest pipelinedRequest = syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)); + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); Pipeline pipeline = pipelinedRequest.getPipeline(); assertEquals(SearchPipelineService.AD_HOC_PIPELINE_ID, pipeline.getId()); assertEquals(1, pipeline.getSearchRequestProcessors().size()); @@ -961,7 +989,10 @@ public void testExceptionOnPipelineCreation() { SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); // Exception thrown when creating the pipeline - expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); } @@ -989,7 +1020,7 @@ public void testExceptionOnRequestProcessing() { // Exception thrown when processing the request expectThrows( SearchPipelineProcessingException.class, - () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)) + () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)) ); } @@ -1014,7 +1045,7 @@ public void testExceptionOnResponseProcessing() throws Exception { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); // Exception thrown when processing response @@ -1052,7 +1083,7 @@ public void testCatchExceptionOnRequestProcessing() throws Exception { "The exception from request processor [throwing_request] in the search pipeline [_ad_hoc_pipeline] was ignored" ) ); - syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)); + syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)); mockAppender.assertAllExpectationsMatched(); } } @@ -1078,7 +1109,7 @@ public void testCatchExceptionOnResponseProcessing() throws Exception { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); @@ -1122,15 +1153,27 @@ public void testStats() throws Exception { SearchRequest request = new SearchRequest(); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), + response + ); expectThrows( SearchPipelineProcessingException.class, - () -> syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")), response) + () -> syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline"), indexNameExpressionResolver), + response + ) + ); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline"), indexNameExpressionResolver), + response ); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")), response); expectThrows( SearchPipelineProcessingException.class, - () -> syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")), response) + () -> syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline"), indexNameExpressionResolver), + response + ) ); SearchPipelineStats stats = searchPipelineService.stats(); @@ -1208,12 +1251,24 @@ public void testStatsEnabledIgnoreFailure() throws Exception { SearchRequest request = new SearchRequest(); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), + response + ); // Caught Exception here - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")), response); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline"), indexNameExpressionResolver), + response + ); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline"), indexNameExpressionResolver), + response + ); // Caught Exception here - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline"), indexNameExpressionResolver), + response + ); // when ignoreFailure enabled, the search pipelines will all succeed. SearchPipelineStats stats = searchPipelineService.stats(); @@ -1355,7 +1410,10 @@ public void testAdHocRejectingProcessor() { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); } public void testExtraParameterInProcessorConfig() { @@ -1369,7 +1427,7 @@ public void testExtraParameterInProcessorConfig() { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); try { - searchPipelineService.resolvePipeline(searchRequest); + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); fail("Exception should have been thrown"); } catch (SearchPipelineProcessingException e) { assertTrue( @@ -1462,10 +1520,183 @@ public void testStatefulProcessors() throws Exception { .build(); searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); - PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1")); + PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1"), indexNameExpressionResolver); assertNull(contextHolder.get()); syncExecutePipeline(request, new SearchResponse(null, null, 0, 0, 0, 0, null, null)); assertNotNull(contextHolder.get()); assertEquals("b", contextHolder.get()); } + + public void testDefaultPipelineForMultipleIndices() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(indexBuilder("foo", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foobar", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo-closed", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder.putCustom(SearchPipelineMetadata.TYPE, metadata)).build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("bar").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("p1", pipelinedRequest.getPipeline().getId()); + assertEquals(10, pipelinedRequest.source().size()); + + // Bypass the default pipeline + searchRequest.pipeline("_none"); + pipelinedRequest = service.resolvePipeline(searchRequest, indexNameExpressionResolver); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + + public void testDifferentDefaultPipelineForMultipleIndices() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ), + + "p2", + new PipelineConfiguration( + "p2", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 1 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + + Settings defaultPipelineSetting1 = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + + Settings defaultPipelineSetting2 = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p2") + .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(indexBuilder("foo", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foobar", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo-closed", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo", defaultPipelineSetting2).putAlias(AliasMetadata.builder("bar"))); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder.putCustom(SearchPipelineMetadata.TYPE, metadata)).build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("bar").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + + public void testNoIndexResolveIndexDefaultPipeline() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + IndexMetadata indexMetadata = new IndexMetadata.Builder("my_index").settings(defaultPipelineSetting).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().put(indexMetadata, false).putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + + public void testInvalidIndexResolveIndexDefaultPipeline() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + IndexMetadata indexMetadata = new IndexMetadata.Builder("my_index").settings(defaultPipelineSetting).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().put(indexMetadata, false).putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("xyz").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + }