
Start adding tests to IngestService that exercise plugin pipelines updates
carlosdelest committed Dec 14, 2023
1 parent fbdbbfa commit 819adeb
Showing 3 changed files with 123 additions and 62 deletions.
server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -500,7 +500,7 @@ public Pipeline getPipeline(String id) {
}
}

private List<Pipeline> getPluginsPipelines(String id) {
List<Pipeline> getPluginsPipelines(String id) {
if (id == null) {
return null;
}
@@ -1138,7 +1138,7 @@ public void applyClusterState(final ClusterChangedEvent event) {
updatePluginPipelines(event);
}

private synchronized void updatePluginPipelines(ClusterChangedEvent event) {
synchronized void updatePluginPipelines(ClusterChangedEvent event) {

Map<String, List<Pipeline>> updatedPluginPipelines = new HashMap<>(pluginPipelines);

@@ -1172,6 +1172,10 @@ private synchronized void updatePluginPipelines(ClusterChangedEvent event) {
pluginPipelines = Map.copyOf(updatedPluginPipelines);
}

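// Package-private getter, used by tests to inspect the currently registered plugin pipelines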
Map<String, List<Pipeline>> getPluginsPipelines() {
return pluginPipelines;
}

synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
Map<String, PipelineHolder> existingPipelines = this.pipelines;

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
@@ -20,7 +20,6 @@
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -31,7 +30,6 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@@ -42,9 +40,7 @@
import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.indices.SystemIndexDescriptorUtils;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.IngestServiceTests;
import org.elasticsearch.plugins.internal.DocumentParsingObserver;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.index.IndexVersionUtils;
@@ -66,9 +62,6 @@
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportBulkActionTests extends ESTestCase {

170 changes: 117 additions & 53 deletions server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -82,6 +82,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -132,26 +133,11 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
}
};

private ThreadPool threadPool;
private IngestService ingestService;

@Before
public void setup() {
threadPool = mock(ThreadPool.class);
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
Client client = mock(Client.class);
ingestService = new IngestService(
mock(ClusterService.class),
threadPool,
null,
null,
null,
List.of(DUMMY_PLUGIN),
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
);
ingestService = createIngestService();
}

public void testIngestPlugin() {
@@ -164,17 +150,7 @@ public void testIngestPluginDuplicate() {
Client client = mock(Client.class);
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new IngestService(
mock(ClusterService.class),
threadPool,
null,
null,
null,
List.of(DUMMY_PLUGIN, DUMMY_PLUGIN),
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
)
() -> createIngestService(List.of(DUMMY_PLUGIN, DUMMY_PLUGIN))
);
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}
@@ -1962,17 +1938,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters

// Create ingest service:
Client client = mock(Client.class);
IngestService ingestService = new IngestService(
mock(ClusterService.class),
threadPool,
null,
null,
null,
List.of(testPlugin),
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
);
IngestService ingestService = createIngestService(List.of(testPlugin));
ingestService.addIngestClusterStateListener(ingestClusterStateListener);

// Create pipeline and apply the resulting cluster state, which should update the counter in the right order:
@@ -2305,17 +2271,6 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
Client client = mock(Client.class);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(clusterState);
IngestService ingestService = new IngestService(
clusterService,
threadPool,
null,
null,
null,
List.of(DUMMY_PLUGIN),
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));

CountDownLatch latch = new CountDownLatch(1);
@@ -2548,6 +2503,118 @@ public void testResolvePipelinesWithNonePipeline() {
}
}

public void testPluginPipelinesUpdated() {
Processor processor = new TestProcessor("tag", "type", "description", (ingestDocument) -> {});
Pipeline ingestPluginPipeline = new Pipeline("test-pipeline-1", null, null, null, new CompoundProcessor(processor));
Pipeline aliasedIndexPipeline = new Pipeline("test-pipeline-2", null, null, null, new CompoundProcessor(processor));

// Ingest plugin that always returns a pipeline
IngestPlugin ingestPluginWithPipeline = new IngestPlugin() {
public Optional<Pipeline> getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) {
return Optional.of(ingestPluginPipeline);
}
};

// Ingest plugin that returns a pipeline only when the index has aliases
IngestPlugin ingestPluginForAliasedIndex = new IngestPlugin() {
public Optional<Pipeline> getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) {
if (indexMetadata.getAliases().isEmpty() == false) {
return Optional.of(aliasedIndexPipeline);
}
return Optional.empty();
}
};

String indexName = "idx";
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(indexName)
.settings(settings(IndexVersion.current()))
.numberOfShards(1)
.numberOfReplicas(0);

ClusterState clusterStateWithIndex = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(indexMetadata))
.build();
ClusterState clusterStateWithoutIndex = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().build()).build();

IngestService ingestService = createIngestService(List.of(ingestPluginWithPipeline, ingestPluginForAliasedIndex, DUMMY_PLUGIN));

// Initial state: no plugin pipelines before any cluster state change has been applied
assertThat(ingestService.getPluginsPipelines().size(), equalTo(0));

// Index is added - create plugin pipeline
ingestService.applyClusterState(new ClusterChangedEvent("", clusterStateWithIndex, clusterStateWithoutIndex));

assertThat(ingestService.getPluginsPipelines(indexName), equalTo(List.of(ingestPluginPipeline)));
assertThat(ingestService.getPluginsPipelines().size(), equalTo(1));

// Index is updated with an alias - update plugin pipeline
IndexMetadata.Builder aliasedIndexMetadata = IndexMetadata.builder(indexName)
.settings(settings(IndexVersion.current()))
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetadata.builder("alias"));

ClusterState clusterStateWithAliasedIndex = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(aliasedIndexMetadata).build())
.build();
ingestService.applyClusterState(new ClusterChangedEvent("", clusterStateWithAliasedIndex, clusterStateWithIndex));

assertThat(ingestService.getPluginsPipelines(indexName), equalTo(List.of(ingestPluginPipeline, aliasedIndexPipeline)));
assertThat(ingestService.getPluginsPipelines().size(), equalTo(1));

// Index is removed - remove plugin pipeline
ingestService.applyClusterState(new ClusterChangedEvent("", clusterStateWithoutIndex, clusterStateWithAliasedIndex));

assertThat(ingestService.getPluginsPipelines().size(), equalTo(0));
}

public void testIngestPluginsContributePipelinesToIndices() {

Processor processor = new TestProcessor("tag", "type", "description", (ingestDocument) -> {});
Pipeline pipeline = new Pipeline("test-pipeline", null, null, null, new CompoundProcessor(processor));

IngestPlugin ingestPlugin = new IngestPlugin() {
public Optional<Pipeline> getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) {
return Optional.of(pipeline);
}
};

ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
ClusterState previousClusterState = clusterState;
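// The service built in setup() only registers DUMMY_PLUGIN; applying an empty cluster state should leave it with no pipelines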
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
assertThat(ingestService.pipelines().size(), is(0));

}

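// Builds an IngestService backed by a mocked ClusterService and Client and a direct-executor thread pool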
public static IngestService createIngestService() {
return createIngestService(List.of(DUMMY_PLUGIN));
}

private static IngestService createIngestService(List<IngestPlugin> ingestPlugins) {
Client client = mock(Client.class);

return new IngestService(
mock(ClusterService.class),
createThreadPool(),
null,
null,
null,
ingestPlugins,
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
);
}

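// Mocked ThreadPool whose generic and named executors run tasks directly on the calling thread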
private static ThreadPool createThreadPool() {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);

return threadPool;
}

private static Tuple<String, Object> randomMapEntry() {
return tuple(randomAlphaOfLength(5), randomObject());
}
Expand Down Expand Up @@ -2596,12 +2663,9 @@ private static IngestService createWithProcessors(
Supplier<DocumentParsingObserver> documentParsingObserverSupplier
) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
IngestService ingestService = new IngestService(
mock(ClusterService.class),
threadPool,
createThreadPool(),
null,
null,
null,
