Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest committed Dec 19, 2023
1 parent 243c578 commit 7d9c6bf
Showing 3 changed files with 45 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -112,6 +112,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, PipelineHolder> pipelines = Map.of();
// Plugin contributed pipelines, that are executed after the default and final pipelines.
private volatile Map<String, List<Pipeline>> pluginPipelines = Map.of();

private final ThreadPool threadPool;
@@ -1137,6 +1138,12 @@ public void applyClusterState(final ClusterChangedEvent event) {
updatePluginPipelines(event);
}

/**
* Updates plugin pipelines based on the current cluster state. It will ask plugins for pipelines
* for each index in the cluster state and update the pluginPipelines map with new / removed pipelines.
*
* @param event cluster changed event
*/
synchronized void updatePluginPipelines(ClusterChangedEvent event) {

Map<String, List<Pipeline>> updatedPluginPipelines = new HashMap<>(pluginPipelines);
@@ -1439,7 +1446,8 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexReque
defaultPipeline = Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME);
finalPipeline = Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME);

// TODO We can't resolve yet the plugins pipeline as we don't have the IndexMetadata. Check if we can do it.
// TODO We're not adding plugin pipelines here yet - we could have a separate method that contains the ComposableIndextemplate
// so plugins can decide to add pipelines based on template information
return Optional.of(new Pipelines(defaultPipeline, finalPipeline, NOOP_PIPELINE_NAME));
}

Original file line number Diff line number Diff line change
@@ -2109,7 +2109,7 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetadata.builder("alias").writeIndex(true).build());
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));

// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("idx");
@@ -2129,7 +2129,7 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() {
IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder("name1")
.patterns(List.of("id*"))
.settings(settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"));
metadata = Metadata.builder().put(templateBuilder).build();
setIndexMetadataForIngestService(Metadata.builder().put(templateBuilder));
indexRequest = new IndexRequest("idx");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2143,7 +2143,7 @@ public void testResolveFinalPipeline() {
.numberOfShards(1)
.numberOfReplicas(0)
.putAlias(AliasMetadata.builder("alias").writeIndex(true).build());
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));

// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("idx");
@@ -2165,7 +2165,7 @@ public void testResolveFinalPipeline() {
IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder("name1")
.patterns(List.of("id*"))
.settings(settings(IndexVersion.current()).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"));
metadata = Metadata.builder().put(templateBuilder).build();
setIndexMetadataForIngestService(Metadata.builder().put(templateBuilder));
indexRequest = new IndexRequest("idx");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2181,7 +2181,7 @@ public void testResolveFinalPipelineWithDateMathExpression() {
.settings(settings(IndexVersion.current()).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));

// index name matches with IDM:
IndexRequest indexRequest = new IndexRequest("<idx-{now/d}>");
@@ -2195,7 +2195,8 @@ public void testResolveFinalPipelineWithDateMathExpression() {
public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
// no pipeline:
{
Metadata metadata = Metadata.builder().build();
setIndexMetadataForIngestService(Metadata.builder());

IndexRequest indexRequest = new IndexRequest("idx");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertFalse(IngestService.hasPipeline(indexRequest));
@@ -2206,7 +2207,8 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {

// request pipeline:
{
Metadata metadata = Metadata.builder().build();
setIndexMetadataForIngestService(Metadata.builder());

IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2221,7 +2223,8 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
.settings(settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));

IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2236,7 +2239,8 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
.settings(settings(IndexVersion.current()).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));

IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2247,6 +2251,12 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
}
}

private void setIndexMetadataForIngestService(Metadata.Builder builder) {
Metadata metadata = builder.build();
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build();
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
}

public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception {
var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry);

@@ -2455,7 +2465,7 @@ public void testResolvePipelinesWithPluginsPipeline() {
.settings(settings(IndexVersion.current()).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2470,7 +2480,7 @@ public void testResolvePipelinesWithPluginsPipeline() {
.settings(settings(IndexVersion.current()).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2544,7 +2554,7 @@ public void testResolvePipelinesWithNonePipeline() {
.settings(settings(IndexVersion.current()).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));
IndexRequest indexRequest = new IndexRequest("idx").setPipeline(NOOP_PIPELINE_NAME);
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
@@ -2560,7 +2570,7 @@ public void testResolvePipelinesWithNonePipeline() {
.settings(settings(IndexVersion.current()).put(IndexSettings.FINAL_PIPELINE.getKey(), NOOP_PIPELINE_NAME))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
setIndexMetadataForIngestService(Metadata.builder().put(builder));
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("pipeline1");
ingestService.resolvePipelinesAndUpdateIndexRequest(indexRequest, indexRequest);
assertTrue(IngestService.hasPipeline(indexRequest));
Original file line number Diff line number Diff line change
@@ -1921,23 +1921,25 @@ protected void assertSnapshotOrGenericThread() {
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
final IndexingPressure indexingMemoryLimits = new IndexingPressure(settings);
mappingUpdatedAction.setClient(client);
IngestService ingestService = new IngestService(
clusterService,
threadPool,
environment,
scriptService,
new AnalysisModule(environment, Collections.emptyList(), new StablePluginsRegistry()).getAnalysisRegistry(),
Collections.emptyList(),
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
);
ingestService.applyClusterState(new ClusterChangedEvent("test", ClusterState.EMPTY_STATE, ClusterState.EMPTY_STATE));
actions.put(
BulkAction.INSTANCE,
new TransportBulkAction(
threadPool,
transportService,
clusterService,
new IngestService(
clusterService,
threadPool,
environment,
scriptService,
new AnalysisModule(environment, Collections.emptyList(), new StablePluginsRegistry()).getAnalysisRegistry(),
Collections.emptyList(),
client,
null,
() -> DocumentParsingObserver.EMPTY_INSTANCE
),
ingestService,
client,
actionFilters,
indexNameExpressionResolver,

0 comments on commit 7d9c6bf

Please sign in to comment.