From f7d21daae93aa923cca54d4721b2833693af5e2f Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Mon, 15 Apr 2024 12:32:49 -0700 Subject: [PATCH 1/3] Handled default search pipeline for multiple indices Signed-off-by: Owais Kazi --- CHANGELOG.md | 1 + .../action/search/TransportSearchAction.java | 2 +- .../pipeline/SearchPipelineService.java | 25 +- .../pipeline/SearchPipelineServiceTests.java | 225 +++++++++++++++--- 4 files changed, 213 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc2f2c951a853..f04f14613baee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746)) - [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225)) - [Remote Store] Upload remote paths during remote enabled index creation ([#13386](https://github.com/opensearch-project/OpenSearch/pull/13386)) +- [Search Pipeline] Handle default pipeline for multiple indices ([#13276](https://github.com/opensearch-project/OpenSearch/pull/13276)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 65cfd35489033..143b01af3f62f 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -457,7 +457,7 @@ private void executeRequest( PipelinedRequest searchRequest; ActionListener listener; try { - searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); + searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest, indexNameExpressionResolver); listener = searchRequest.transformResponseListener(updatedListener); } catch (Exception e) { updatedListener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 2175b5d135394..2bae68b7a25ba 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -23,6 +23,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterManagerTaskKeys; @@ -35,6 +36,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.index.Index; import org.opensearch.core.service.ReportingService; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; @@ -360,7 +362,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat return newState.build(); } - public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { + public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameExpressionResolver indexNameExpressionResolver) { Pipeline pipeline = Pipeline.NO_OP_PIPELINE; if (searchRequest.source() != null && searchRequest.source().searchPipelineSource() != null) { @@ -390,13 +392,22 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) { if (searchRequest.pipeline() != null) { // Named pipeline specified for the request pipelineId = searchRequest.pipeline(); - } else if (state != null && searchRequest.indices() != null && searchRequest.indices().length == 1) { + } else if (state != null && searchRequest.indices() != null) { // Check for index default pipeline - IndexMetadata indexMetadata = state.metadata().index(searchRequest.indices()[0]); - if (indexMetadata != null) { - Settings indexSettings = indexMetadata.getSettings(); - if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { - pipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest); + for (Index index : concreteIndices) { + IndexMetadata indexMetadata = state.metadata().index(index); + if (indexMetadata != null) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { + String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + if (NOOP_PIPELINE_ID.equals(pipelineId)) { + pipelineId = currentPipelineId; + } else if (pipelineId.equals(currentPipelineId) == false) { + pipelineId = NOOP_PIPELINE_ID; + break; + } + } } } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index f5851e669a2da..4b9daadeda02f 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -32,7 +32,9 @@ import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; @@ -41,6 +43,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.breaker.CircuitBreaker; @@ -96,14 +99,31 @@ public Map> getSearchPhas private ThreadPool threadPool; + private IndexNameExpressionResolver indexNameExpressionResolver; + @Before public void setup() { threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); + indexNameExpressionResolver = new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); } + private static IndexMetadata.Builder indexBuilder(String index) { + return indexBuilder(index, Settings.EMPTY); + } + + private static IndexMetadata.Builder indexBuilder(String index, Settings additionalSettings) { + return IndexMetadata.builder(index).settings(settings(additionalSettings)); + } + + private static Settings.Builder settings(Settings additionalSettings) { + return settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(additionalSettings); + } + public void testSearchPipelinePlugin() { Client client = mock(Client.class); SearchPipelineService searchPipelineService = new SearchPipelineService( @@ -162,7 +182,7 @@ public void testResolveSearchPipelineDoesNotExist() { final SearchRequest searchRequest = new SearchRequest("_index").pipeline("bar"); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> searchPipelineService.resolvePipeline(searchRequest) + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) ); assertTrue(e.getMessage(), e.getMessage().contains(" not defined")); } @@ -197,13 +217,13 @@ public void testResolveIndexDefaultPipeline() throws Exception { service.applyClusterState(cce); SearchRequest searchRequest = new SearchRequest("my_index").source(SearchSourceBuilder.searchSource().size(5)); - PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); assertEquals("p1", pipelinedRequest.getPipeline().getId()); assertEquals(10, pipelinedRequest.source().size()); // Bypass the default pipeline searchRequest.pipeline("_none"); - pipelinedRequest = service.resolvePipeline(searchRequest); + pipelinedRequest = service.resolvePipeline(searchRequest, indexNameExpressionResolver); assertEquals("_none", pipelinedRequest.getPipeline().getId()); assertEquals(5, pipelinedRequest.source().size()); } @@ -591,23 +611,29 @@ public void testTransformRequest() throws Exception { ); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousState = clusterState; - clusterState = ClusterState.builder(clusterState) - .metadata(Metadata.builder().putCustom(SearchPipelineMetadata.TYPE, metadata)) - .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .putCustom(SearchPipelineMetadata.TYPE, metadata) + .put(indexBuilder("my-index").putAlias(AliasMetadata.builder("barbaz"))); + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder).build(); + searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); int size = 10; SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(new TermQueryBuilder("foo", "bar")).size(size); - SearchRequest request = new SearchRequest("_index").source(sourceBuilder).pipeline("p1"); + SearchRequest request = new SearchRequest("my-index").source(sourceBuilder).pipeline("p1"); - PipelinedRequest pipelinedRequest = syncTransformRequest(searchPipelineService.resolvePipeline(request)); + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(request, indexNameExpressionResolver) + ); assertEquals(2 * size, pipelinedRequest.source().size()); assertEquals(size, request.source().size()); // This request doesn't specify a pipeline, it doesn't get transformed. - request = new SearchRequest("_index").source(sourceBuilder); - pipelinedRequest = searchPipelineService.resolvePipeline(request); + request = new SearchRequest("my-index").source(sourceBuilder); + pipelinedRequest = searchPipelineService.resolvePipeline(request, indexNameExpressionResolver); assertEquals(size, pipelinedRequest.source().size()); } @@ -643,13 +669,13 @@ public void testTransformResponse() throws Exception { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse notTransformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertSame(searchResponse, notTransformedResponse); // Now apply a pipeline searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse transformedResponse = syncTransformResponse(pipelinedRequest, searchResponse); assertEquals(size, transformedResponse.getHits().getHits().length); for (int i = 0; i < size; i++) { @@ -736,7 +762,7 @@ public void testTransformSearchPhase() { // First try without specifying a pipeline, which should be a no-op. SearchRequest searchRequest = new SearchRequest(); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResults = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -748,7 +774,7 @@ public void testTransformSearchPhase() { // Now set the pipeline as p1 searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -766,7 +792,7 @@ public void testTransformSearchPhase() { // Check Processor doesn't run for between other phases searchRequest = new SearchRequest().pipeline("p1"); - pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); AtomicArray notTransformedSearchPhaseResult = searchPhaseResults.getAtomicArray(); pipelinedRequest.transformSearchPhaseResults( searchPhaseResults, @@ -916,7 +942,9 @@ public void testInlinePipeline() throws Exception { SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); // Verify pipeline - PipelinedRequest pipelinedRequest = syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)); + PipelinedRequest pipelinedRequest = syncTransformRequest( + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); Pipeline pipeline = pipelinedRequest.getPipeline(); assertEquals(SearchPipelineService.AD_HOC_PIPELINE_ID, pipeline.getId()); assertEquals(1, pipeline.getSearchRequestProcessors().size()); @@ -961,7 +989,10 @@ public void testExceptionOnPipelineCreation() { SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); // Exception thrown when creating the pipeline - expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); } @@ -989,7 +1020,7 @@ public void testExceptionOnRequestProcessing() { // Exception thrown when processing the request expectThrows( SearchPipelineProcessingException.class, - () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)) + () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)) ); } @@ -1014,7 +1045,7 @@ public void testExceptionOnResponseProcessing() throws Exception { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); // Exception thrown when processing response @@ -1052,7 +1083,7 @@ public void testCatchExceptionOnRequestProcessing() throws Exception { "The exception from request processor [throwing_request] in the search pipeline [_ad_hoc_pipeline] was ignored" ) ); - syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)); + syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)); mockAppender.assertAllExpectationsMatched(); } } @@ -1078,7 +1109,7 @@ public void testCatchExceptionOnResponseProcessing() throws Exception { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); @@ -1122,15 +1153,27 @@ public void testStats() throws Exception { SearchRequest request = new SearchRequest(); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), + response + ); expectThrows( SearchPipelineProcessingException.class, - () -> syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")), response) + () -> syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline"), indexNameExpressionResolver), + response + ) + ); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline"), indexNameExpressionResolver), + response ); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")), response); expectThrows( SearchPipelineProcessingException.class, - () -> syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")), response) + () -> syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline"), indexNameExpressionResolver), + response + ) ); SearchPipelineStats stats = searchPipelineService.stats(); @@ -1208,12 +1251,24 @@ public void testStatsEnabledIgnoreFailure() throws Exception { SearchRequest request = new SearchRequest(); SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_request_pipeline"), indexNameExpressionResolver), + response + ); // Caught Exception here - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline")), response); - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_request_pipeline"), indexNameExpressionResolver), + response + ); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("good_response_pipeline"), indexNameExpressionResolver), + response + ); // Caught Exception here - syncExecutePipeline(searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline")), response); + syncExecutePipeline( + searchPipelineService.resolvePipeline(request.pipeline("bad_response_pipeline"), indexNameExpressionResolver), + response + ); // when ignoreFailure enabled, the search pipelines will all succeed. SearchPipelineStats stats = searchPipelineService.stats(); @@ -1355,7 +1410,10 @@ public void testAdHocRejectingProcessor() { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); - expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + expectThrows( + SearchPipelineProcessingException.class, + () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver) + ); } public void testExtraParameterInProcessorConfig() { @@ -1369,7 +1427,7 @@ public void testExtraParameterInProcessorConfig() { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); try { - searchPipelineService.resolvePipeline(searchRequest); + searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver); fail("Exception should have been thrown"); } catch (SearchPipelineProcessingException e) { assertTrue( @@ -1462,10 +1520,113 @@ public void testStatefulProcessors() throws Exception { .build(); searchPipelineService.applyClusterState(new ClusterChangedEvent("", clusterState, previousState)); - PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1")); + PipelinedRequest request = searchPipelineService.resolvePipeline(new SearchRequest().pipeline("p1"), indexNameExpressionResolver); assertNull(contextHolder.get()); syncExecutePipeline(request, new SearchResponse(null, null, 0, 0, 0, 0, null, null)); assertNotNull(contextHolder.get()); assertEquals("b", contextHolder.get()); } + + public void testDefaultPipelineForMultipleIndices() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(indexBuilder("foo", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foobar", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo-closed", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo", defaultPipelineSetting).putAlias(AliasMetadata.builder("bar"))); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder.putCustom(SearchPipelineMetadata.TYPE, metadata)).build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("bar").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("p1", pipelinedRequest.getPipeline().getId()); + assertEquals(10, pipelinedRequest.source().size()); + + // Bypass the default pipeline + searchRequest.pipeline("_none"); + pipelinedRequest = service.resolvePipeline(searchRequest, indexNameExpressionResolver); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + + public void testDifferentDefaultPipelineForMultipleIndices() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ), + + "p2", + new PipelineConfiguration( + "p2", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 1 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + + Settings defaultPipelineSetting1 = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + + Settings defaultPipelineSetting2 = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p2") + .build(); + + Metadata.Builder mdBuilder = Metadata.builder() + .put(indexBuilder("foo", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foobar", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo-closed", defaultPipelineSetting1).putAlias(AliasMetadata.builder("bar"))) + .put(indexBuilder("foofoo", defaultPipelineSetting2).putAlias(AliasMetadata.builder("bar"))); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + + clusterState = ClusterState.builder(clusterState).metadata(mdBuilder.putCustom(SearchPipelineMetadata.TYPE, metadata)).build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("bar").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + } From efbe3fd213061dc11a6bfecf0b36be28310642d1 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 18 Apr 2024 10:50:42 -0700 Subject: [PATCH 2/3] Addressed PR comments Signed-off-by: Owais Kazi --- .../pipeline/SearchPipelineService.java | 2 +- .../pipeline/SearchPipelineServiceTests.java | 39 ++++++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 2bae68b7a25ba..9e4341347d8a8 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -392,7 +392,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx if (searchRequest.pipeline() != null) { // Named pipeline specified for the request pipelineId = searchRequest.pipeline(); - } else if (state != null && searchRequest.indices() != null) { + } else if (state != null && searchRequest.indices() != null && searchRequest.indices().length != 0) { // Check for index default pipeline Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest); for (Index index : concreteIndices) { diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 4b9daadeda02f..0788b2b483a96 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -115,10 +115,10 @@ private static IndexMetadata.Builder indexBuilder(String index) { } private static IndexMetadata.Builder indexBuilder(String index, Settings additionalSettings) { - return IndexMetadata.builder(index).settings(settings(additionalSettings)); + return IndexMetadata.builder(index).settings(addAdditionalSettings(additionalSettings)); } - private static Settings.Builder settings(Settings additionalSettings) { + private static Settings.Builder addAdditionalSettings(Settings additionalSettings) { return settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(additionalSettings); @@ -1629,4 +1629,39 @@ public void testDifferentDefaultPipelineForMultipleIndices() throws Exception { assertEquals(5, pipelinedRequest.source().size()); } + public void testNoIndexResolveIndexDefaultPipeline() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + IndexMetadata indexMetadata = new IndexMetadata.Builder("my_index").settings(defaultPipelineSetting).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().put(indexMetadata, false).putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest().source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + } From 6f36ec534dedec0196c913a298d6d3d2506ede0b Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 18 Apr 2024 13:59:37 -0700 Subject: [PATCH 3/3] Handled exception Signed-off-by: Owais Kazi --- .../pipeline/SearchPipelineService.java | 33 ++++++++++------- .../pipeline/SearchPipelineServiceTests.java | 35 +++++++++++++++++++ 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index 9e4341347d8a8..012d6695c042b 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.gateway.GatewayService; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.ingest.ConfigurationUtils; @@ -64,6 +65,8 @@ /** * The main entry point for search pipelines. Handles CRUD operations and exposes the API to execute search pipelines * against requests and responses. + * + * @opensearch.internal */ public class SearchPipelineService implements ClusterStateApplier, ReportingService { @@ -393,22 +396,26 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx // Named pipeline specified for the request pipelineId = searchRequest.pipeline(); } else if (state != null && searchRequest.indices() != null && searchRequest.indices().length != 0) { - // Check for index default pipeline - Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest); - for (Index index : concreteIndices) { - IndexMetadata indexMetadata = state.metadata().index(index); - if (indexMetadata != null) { - Settings indexSettings = indexMetadata.getSettings(); - if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { - String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); - if (NOOP_PIPELINE_ID.equals(pipelineId)) { - pipelineId = currentPipelineId; - } else if (pipelineId.equals(currentPipelineId) == false) { - pipelineId = NOOP_PIPELINE_ID; - break; + try { + // Check for index default pipeline + Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, searchRequest); + for (Index index : concreteIndices) { + IndexMetadata indexMetadata = state.metadata().index(index); + if (indexMetadata != null) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexSettings.DEFAULT_SEARCH_PIPELINE.exists(indexSettings)) { + String currentPipelineId = IndexSettings.DEFAULT_SEARCH_PIPELINE.get(indexSettings); + if (NOOP_PIPELINE_ID.equals(pipelineId)) { + pipelineId = currentPipelineId; + } else if (!pipelineId.equals(currentPipelineId)) { + pipelineId = NOOP_PIPELINE_ID; + break; + } } } } + } catch (IndexNotFoundException e) { + logger.debug("Default pipeline not applied for {}", (Object) searchRequest.indices()); } } if (NOOP_PIPELINE_ID.equals(pipelineId) == false) { diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 0788b2b483a96..f5857922fdff2 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -1664,4 +1664,39 @@ public void testNoIndexResolveIndexDefaultPipeline() throws Exception { assertEquals(5, pipelinedRequest.source().size()); } + public void testInvalidIndexResolveIndexDefaultPipeline() throws Exception { + SearchPipelineService service = createWithProcessors(); + + SearchPipelineMetadata metadata = new SearchPipelineMetadata( + Map.of( + "p1", + new PipelineConfiguration( + "p1", + new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ] }"), + MediaTypeRegistry.JSON + ) + ) + ); + Settings defaultPipelineSetting = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexSettings.DEFAULT_SEARCH_PIPELINE.getKey(), "p1") + .build(); + IndexMetadata indexMetadata = new IndexMetadata.Builder("my_index").settings(defaultPipelineSetting).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousState = clusterState; + clusterState = ClusterState.builder(clusterState) + .metadata(Metadata.builder().put(indexMetadata, false).putCustom(SearchPipelineMetadata.TYPE, metadata)) + .build(); + + ClusterChangedEvent cce = new ClusterChangedEvent("", clusterState, previousState); + service.applyClusterState(cce); + + SearchRequest searchRequest = new SearchRequest("xyz").source(SearchSourceBuilder.searchSource().size(5)); + PipelinedRequest pipelinedRequest = syncTransformRequest(service.resolvePipeline(searchRequest, indexNameExpressionResolver)); + assertEquals("_none", pipelinedRequest.getPipeline().getId()); + assertEquals(5, pipelinedRequest.source().size()); + } + }