Skip to content

Commit

Permalink
Delete all ingest processor pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Nov 11, 2024
1 parent ce27690 commit 1abe6a3
Showing 1 changed file with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

public class MlRestTestStateCleaner {

Expand All @@ -30,35 +31,34 @@ public MlRestTestStateCleaner(Logger logger, RestClient adminClient) {
}

public void resetFeatures() throws IOException {
deletePipelinesWithInferenceProcessors();
// This resets all features, not just ML, but they should have been getting reset between tests anyway so it shouldn't matter
adminClient.performRequest(new Request("POST", "/_features/_reset"));
}

@SuppressWarnings("unchecked")
private void deleteAllTrainedModelIngestPipelines() throws IOException {
final Request getAllTrainedModelStats = new Request("GET", "/_ml/trained_models/_stats");
getAllTrainedModelStats.addParameter("size", "10000");
final Response trainedModelsStatsResponse = adminClient.performRequest(getAllTrainedModelStats);
private void deletePipelinesWithInferenceProcessors() throws IOException {
final Response pipelinesResponse = adminClient.performRequest(new Request("GET", "/_ingest/pipeline"));
final Map<String, Object> pipelines = ESRestTestCase.entityAsMap(pipelinesResponse);

var pipelinesWithInferenceProcessors = new HashSet<String>();
for (var entry : pipelines.entrySet()) {
var pipelineDef = (Map<String, Object>) entry.getValue(); // each top level object is a separate pipeline
var processors = (List<Map<String, Object>>) pipelineDef.get("processors");
for (var processor : processors) {
assertThat(processor.entrySet(), hasSize(1));
if ("inference".equals(processor.keySet().iterator().next())) {
pipelinesWithInferenceProcessors.add(entry.getKey());
}
}
}

final List<Map<String, Object>> pipelines = (List<Map<String, Object>>) XContentMapValues.extractValue(
"trained_model_stats.ingest.pipelines",
ESRestTestCase.entityAsMap(trainedModelsStatsResponse)
);
Set<String> pipelineIds = pipelines.stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet());
for (String pipelineId : pipelineIds) {
for (String pipelineId : pipelinesWithInferenceProcessors) {
try {
adminClient.performRequest(new Request("DELETE", "/_ingest/pipeline/" + pipelineId));
} catch (Exception ex) {
logger.warn(() -> "failed to delete pipeline [" + pipelineId + "]", ex);
}
}
}

private void waitForMlStatsIndexToInitialize() throws IOException {
ESRestTestCase.ensureHealth(adminClient, ".ml-stats-*", (request) -> {
request.addParameter("wait_for_no_initializing_shards", "true");
request.addParameter("level", "shards");
request.addParameter("timeout", "30s");
});
}
}

0 comments on commit 1abe6a3

Please sign in to comment.