diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index a1ecbe5162e1e..63ee50b19aeb1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -500,7 +500,7 @@ public Pipeline getPipeline(String id) { } } - private List getPluginsPipelines(String id) { + List getPluginsPipelines(String id) { if (id == null) { return null; } @@ -1138,7 +1138,7 @@ public void applyClusterState(final ClusterChangedEvent event) { updatePluginPipelines(event); } - private synchronized void updatePluginPipelines(ClusterChangedEvent event) { + synchronized void updatePluginPipelines(ClusterChangedEvent event) { Map> updatedPluginPipelines = new HashMap<>(pluginPipelines); @@ -1172,6 +1172,10 @@ private synchronized void updatePluginPipelines(ClusterChangedEvent event) { pluginPipelines = Map.copyOf(updatedPluginPipelines); } + Map> getPluginsPipelines() { + return pluginPipelines; + } + synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) { Map existingPipelines = this.pipelines; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 96008de93febd..5e6ead1ba6d4e 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -31,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -42,9 +40,7 @@ import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.indices.SystemIndices; -import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestServiceTests; -import org.elasticsearch.plugins.internal.DocumentParsingObserver; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; @@ -66,9 +62,6 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TransportBulkActionTests extends ESTestCase { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 66eeb09881ee9..7294343cd8030 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -82,6 +82,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -132,26 +133,11 @@ public Map getProcessors(Processor.Parameters paramet } }; - private ThreadPool threadPool; private IngestService ingestService; @Before public void setup() { - threadPool = mock(ThreadPool.class); - when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); - when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); - Client client = mock(Client.class); - ingestService = new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - List.of(DUMMY_PLUGIN), - client, - null, - () -> DocumentParsingObserver.EMPTY_INSTANCE - ); + ingestService = createIngestService(); } public void testIngestPlugin() { @@ -164,17 +150,7 @@ public void testIngestPluginDuplicate() { Client client = mock(Client.class); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - List.of(DUMMY_PLUGIN, DUMMY_PLUGIN), - client, - null, - () -> DocumentParsingObserver.EMPTY_INSTANCE - ) + () -> createIngestService(List.of(DUMMY_PLUGIN, DUMMY_PLUGIN)) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } @@ -1962,17 +1938,7 @@ public Map getProcessors(Processor.Parameters paramet // Create ingest service: Client client = mock(Client.class); - IngestService ingestService = new IngestService( - mock(ClusterService.class), - threadPool, - null, - null, - null, - List.of(testPlugin), - client, - null, - () -> DocumentParsingObserver.EMPTY_INSTANCE - ); + IngestService ingestService = createIngestService(List.of(testPlugin)); ingestService.addIngestClusterStateListener(ingestClusterStateListener); // Create pipeline and apply the resulting cluster state, which should update the counter in the right order: @@ -2305,17 +2271,6 @@ private void testUpdatingPipeline(String pipelineString) throws Exception { Client client = mock(Client.class); ClusterService clusterService = mock(ClusterService.class); when(clusterService.state()).thenReturn(clusterState); - IngestService ingestService = new IngestService( - clusterService, - threadPool, - null, - null, - null, - List.of(DUMMY_PLUGIN), - client, - null, - () -> DocumentParsingObserver.EMPTY_INSTANCE - ); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); CountDownLatch latch = new CountDownLatch(1); @@ -2548,6 +2503,118 @@ public void testResolvePipelinesWithNonePipeline() { } } + public void testPluginPipelinesUpdated() { + Processor processor = new TestProcessor("tag", "type", "description", (ingestDocument) -> {}); + Pipeline ingestPluginPipeline = new Pipeline("test-pipeline-1", null, null, null, new CompoundProcessor(processor)); + Pipeline aliasedIndexPipeline = new Pipeline("test-pipeline-2", null, null, null, new CompoundProcessor(processor)); + + // Ingest plugin that always return a pipeline + IngestPlugin ingestPluginWithPipeline = new IngestPlugin() { + public Optional getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) { + return Optional.of(ingestPluginPipeline); + } + }; + + // Ingest plugin that returns a pipeline only when index has aliases + IngestPlugin ingestPluginForAliasedIndex = new IngestPlugin() { + public Optional getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) { + if (indexMetadata.getAliases().isEmpty() == false) { + return Optional.of(aliasedIndexPipeline); + } + return Optional.empty(); + } + }; + + String indexName = "idx"; + IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName) + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0); + + ClusterState clusterStateWithIndex = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().put(indexMetadata)) + .build(); + ClusterState clusterStateWithoutIndex = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().build()).build(); + ; + + IngestService ingestService = createIngestService(List.of(ingestPluginWithPipeline, ingestPluginForAliasedIndex, DUMMY_PLUGIN)); + + // Initial state, no plugin pipelines without cluster state changes + assertThat(ingestService.getPluginsPipelines().size(), equalTo(0)); + + // Index is added - create plugin pipeline + ingestService.applyClusterState(new ClusterChangedEvent("", clusterStateWithIndex, clusterStateWithoutIndex)); + + assertThat(ingestService.getPluginsPipelines(indexName), equalTo(List.of(ingestPluginPipeline))); + assertThat(ingestService.getPluginsPipelines().size(), equalTo(1)); + + // Index is updated with an alias - update plugin pipeline + IndexMetadata.Builder aliasedIndexMetadata = IndexMetadata.builder(indexName) + .settings(settings(IndexVersion.current())) + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetadata.builder("alias")); + + ClusterState clusterStateWithAliasedIndex = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().put(aliasedIndexMetadata).build()) + .build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterStateWithAliasedIndex, clusterStateWithIndex)); + + assertThat(ingestService.getPluginsPipelines(indexName), equalTo(List.of(ingestPluginPipeline, aliasedIndexPipeline))); + assertThat(ingestService.getPluginsPipelines().size(), equalTo(1)); + + // Index is removed - remove plugin pipeline + ingestService.applyClusterState(new ClusterChangedEvent("", clusterStateWithoutIndex, clusterStateWithAliasedIndex)); + + assertThat(ingestService.getPluginsPipelines().size(), equalTo(0)); + } + + public void testIngestPluginsContributePipelinesToIndices() { + + Processor processor = new TestProcessor("tag", "type", "description", (ingestDocument) -> {}); + Pipeline pipeline = new Pipeline("test-pipeline", null, null, null, new CompoundProcessor(processor)); + + IngestPlugin ingestPlugin = new IngestPlugin() { + public Optional getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) { + return Optional.of(pipeline); + } + }; + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.pipelines().size(), is(0)); + + } + + public static IngestService createIngestService() { + return createIngestService(List.of(DUMMY_PLUGIN)); + } + + private static IngestService createIngestService(List ingestPlugins) { + Client client = mock(Client.class); + + return new IngestService( + mock(ClusterService.class), + createThreadPool(), + null, + null, + null, + ingestPlugins, + client, + null, + () -> DocumentParsingObserver.EMPTY_INSTANCE + ); + } + + private static ThreadPool createThreadPool() { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + + return threadPool; + } + private static Tuple randomMapEntry() { return tuple(randomAlphaOfLength(5), randomObject()); } @@ -2596,12 +2663,9 @@ private static IngestService createWithProcessors( Supplier documentParsingObserverSupplier ) { Client client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); - when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); IngestService ingestService = new IngestService( mock(ClusterService.class), - threadPool, + createThreadPool(), null, null, null,