From e9d925e12728943b14f3d73e9db2ec41d1d161e5 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Fri, 6 Dec 2024 11:15:49 +0100 Subject: [PATCH 1/3] ES|QL: make ignoreOrder parsing more strict in CSV tests (#118136) --- muted-tests.yml | 2 -- .../java/org/elasticsearch/xpack/esql/CsvSpecReader.java | 7 ++++++- .../testFixtures/src/main/resources/lookup-join.csv-spec | 8 ++++---- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index e5199b0a1e238..e4cd94b221536 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -242,8 +242,6 @@ tests: - class: org.elasticsearch.packaging.test.ArchiveTests method: test40AutoconfigurationNotTriggeredWhenNodeIsMeantToJoinExistingCluster issue: https://github.com/elastic/elasticsearch/issues/118029 -- class: org.elasticsearch.xpack.esql.qa.multi_node.EsqlSpecIT - issue: https://github.com/elastic/elasticsearch/issues/117981 - class: org.elasticsearch.packaging.test.ConfigurationTests method: test30SymlinkedDataPath issue: https://github.com/elastic/elasticsearch/issues/118111 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvSpecReader.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvSpecReader.java index 84e06e0c1b674..ba0d11059a69b 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvSpecReader.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvSpecReader.java @@ -80,7 +80,12 @@ public Object parse(String line) { testCase.expectedWarningsRegexString.add(regex); testCase.expectedWarningsRegex.add(warningRegexToPattern(regex)); } else if (lower.startsWith("ignoreorder:")) { - testCase.ignoreOrder = Boolean.parseBoolean(line.substring("ignoreOrder:".length()).trim()); + String value = lower.substring("ignoreOrder:".length()).trim(); + if ("true".equals(value)) { + testCase.ignoreOrder = true; + } else if ("false".equals(value) == false) { + throw new IllegalArgumentException("Invalid value for ignoreOrder: [" + value + "], it can only be true or false"); + } } else if (line.startsWith(";")) { testCase.expectedResults = data.toString(); // clean-up and emit diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 584cde55080ef..2d4c105cfff20 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -140,7 +140,7 @@ FROM sample_data | EVAL client_ip = client_ip::keyword | LOOKUP JOIN clientips_lookup ON client_ip ; -ignoreOrder:true; +ignoreOrder:true @timestamp:date | event_duration:long | message:keyword | client_ip:keyword | env:keyword 2023-10-23T13:55:01.543Z | 1756467 | Connected to 10.1.0.1 | 172.21.3.15 | Production @@ -160,7 +160,7 @@ FROM sample_data | LOOKUP JOIN clientips_lookup ON client_ip | KEEP @timestamp, client_ip, event_duration, message, env ; -ignoreOrder:true; +ignoreOrder:true @timestamp:date | client_ip:keyword | event_duration:long | message:keyword | env:keyword 2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Production @@ -245,7 +245,7 @@ required_capability: join_lookup_v4 FROM sample_data | LOOKUP JOIN message_types_lookup ON message ; -ignoreOrder:true; +ignoreOrder:true @timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword 2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Success @@ -264,7 +264,7 @@ FROM sample_data | LOOKUP JOIN message_types_lookup ON message | KEEP @timestamp, client_ip, event_duration, message, type ; -ignoreOrder:true; +ignoreOrder:true @timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword 2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Success From 34ea8f365f16b70a31693e6ce8dda3a8dfd13d92 Mon Sep 17 00:00:00 2001 From: Dimitris Rempapis Date: Fri, 6 Dec 2024 12:40:58 +0200 Subject: [PATCH 2/3] Search Queries in parallel assertHitcount - part 1 (#117467) Update tests applying an optimization in assertions --- .../elasticsearch/cluster/NoMasterNodeIT.java | 9 +-- .../indices/IndicesOptionsIntegrationIT.java | 7 ++- .../recovery/IndexPrimaryRelocationIT.java | 6 +- .../template/SimpleIndexTemplateIT.java | 7 +-- .../search/nested/SimpleNestedIT.java | 24 ++++---- .../search/scroll/SearchScrollIT.java | 32 +++++++--- .../search/simple/SimpleSearchIT.java | 61 +++++++++++-------- .../snapshots/RestoreSnapshotIT.java | 21 +++++-- 8 files changed, 99 insertions(+), 68 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java index 13515d34ec65f..545b38f30ba94 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -261,10 +261,11 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception { GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "1").get(); assertExists(getResponse); - assertHitCount(clientToMasterlessNode.prepareSearch("test1").setAllowPartialSearchResults(true).setSize(0), 1L); - - logger.info("--> here 3"); - assertHitCount(clientToMasterlessNode.prepareSearch("test1").setAllowPartialSearchResults(true), 1L); + assertHitCount( + 1L, + clientToMasterlessNode.prepareSearch("test1").setAllowPartialSearchResults(true).setSize(0), + clientToMasterlessNode.prepareSearch("test1").setAllowPartialSearchResults(true) + ); assertResponse(clientToMasterlessNode.prepareSearch("test2").setAllowPartialSearchResults(true).setSize(0), countResponse -> { assertThat(countResponse.getTotalShards(), equalTo(3)); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java index f41277c5b80ca..545ed83bb79c8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndicesOptionsIntegrationIT.java @@ -398,8 +398,11 @@ public void testWildcardBehaviourSnapshotRestore() throws Exception { public void testAllMissingLenient() throws Exception { createIndex("test1"); prepareIndex("test1").setId("1").setSource("k", "v").setRefreshPolicy(IMMEDIATE).get(); - assertHitCount(prepareSearch("test2").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(matchAllQuery()), 0L); - assertHitCount(prepareSearch("test2", "test3").setQuery(matchAllQuery()).setIndicesOptions(IndicesOptions.lenientExpandOpen()), 0L); + assertHitCount( + 0L, + prepareSearch("test2").setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(matchAllQuery()), + prepareSearch("test2", "test3").setQuery(matchAllQuery()).setIndicesOptions(IndicesOptions.lenientExpandOpen()) + ); // you should still be able to run empty searches without things blowing up assertHitCount(prepareSearch().setIndicesOptions(IndicesOptions.lenientExpandOpen()).setQuery(matchAllQuery()), 1L); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java index 581145d949cf9..debcf5c06a7d6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -98,11 +98,11 @@ public void run() { finished.set(true); indexingThread.join(); refresh("test"); - ElasticsearchAssertions.assertHitCount(prepareSearch("test").setTrackTotalHits(true), numAutoGenDocs.get()); ElasticsearchAssertions.assertHitCount( + numAutoGenDocs.get(), + prepareSearch("test").setTrackTotalHits(true), prepareSearch("test").setTrackTotalHits(true)// extra paranoia ;) - .setQuery(QueryBuilders.termQuery("auto", true)), - numAutoGenDocs.get() + .setQuery(QueryBuilders.termQuery("auto", true)) ); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java index de9e3f28a2109..8496180e85d4e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/template/SimpleIndexTemplateIT.java @@ -500,9 +500,7 @@ public void testIndexTemplateWithAliases() throws Exception { refresh(); - assertHitCount(prepareSearch("test_index"), 5L); - assertHitCount(prepareSearch("simple_alias"), 5L); - assertHitCount(prepareSearch("templated_alias-test_index"), 5L); + assertHitCount(5L, prepareSearch("test_index"), prepareSearch("simple_alias"), prepareSearch("templated_alias-test_index")); assertResponse(prepareSearch("filtered_alias"), response -> { assertHitCount(response, 1L); @@ -584,8 +582,7 @@ public void testIndexTemplateWithAliasesSource() { prepareIndex("test_index").setId("2").setSource("field", "value2").get(); refresh(); - assertHitCount(prepareSearch("test_index"), 2L); - assertHitCount(prepareSearch("alias1"), 2L); + assertHitCount(2L, prepareSearch("test_index"), prepareSearch("alias1")); assertResponse(prepareSearch("alias2"), response -> { assertHitCount(response, 1L); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/nested/SimpleNestedIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/nested/SimpleNestedIT.java index 8225386ed02d2..acfc55a740f1e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/nested/SimpleNestedIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/nested/SimpleNestedIT.java @@ -53,6 +53,7 @@ import static org.hamcrest.Matchers.startsWith; public class SimpleNestedIT extends ESIntegTestCase { + public void testSimpleNested() throws Exception { assertAcked(prepareCreate("test").setMapping("nested1", "type=nested")); ensureGreen(); @@ -87,21 +88,20 @@ public void testSimpleNested() throws Exception { // check the numDocs assertDocumentCount("test", 3); - assertHitCount(prepareSearch("test").setQuery(termQuery("n_field1", "n_value1_1")), 0L); - - // search for something that matches the nested doc, and see that we don't find the nested doc - assertHitCount(prepareSearch("test"), 1L); - assertHitCount(prepareSearch("test").setQuery(termQuery("n_field1", "n_value1_1")), 0L); + assertHitCount( + 0L, + prepareSearch("test").setQuery(termQuery("n_field1", "n_value1_1")), + prepareSearch("test").setQuery(termQuery("n_field1", "n_value1_1")) + ); - // now, do a nested query - assertHitCountAndNoFailures( + assertHitCount( + 1L, + // search for something that matches the nested doc, and see that we don't find the nested doc + prepareSearch("test"), + // now, do a nested query prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"), ScoreMode.Avg)), - 1L - ); - assertHitCountAndNoFailures( prepareSearch("test").setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1_1"), ScoreMode.Avg)) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH), - 1L + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) ); // add another doc, one that would match if it was not nested... diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java index 7ac24b77a4b6d..a54e19b839ad3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java @@ -206,11 +206,17 @@ public void testScrollAndUpdateIndex() throws Exception { indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch().setSize(0).setQuery(matchAllQuery()), 500); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "test")), 500); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "test")), 500); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "update")), 0); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "update")), 0); + assertHitCount( + 500, + prepareSearch().setSize(0).setQuery(matchAllQuery()), + prepareSearch().setSize(0).setQuery(termQuery("message", "test")), + prepareSearch().setSize(0).setQuery(termQuery("message", "test")) + ); + assertHitCount( + 0, + prepareSearch().setSize(0).setQuery(termQuery("message", "update")), + prepareSearch().setSize(0).setQuery(termQuery("message", "update")) + ); SearchResponse searchResponse = prepareSearch().setQuery(queryStringQuery("user:kimchy")) .setSize(35) @@ -229,11 +235,17 @@ public void testScrollAndUpdateIndex() throws Exception { } while (searchResponse.getHits().getHits().length > 0); indicesAdmin().prepareRefresh().get(); - assertHitCount(prepareSearch().setSize(0).setQuery(matchAllQuery()), 500); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "test")), 0); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "test")), 0); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "update")), 500); - assertHitCount(prepareSearch().setSize(0).setQuery(termQuery("message", "update")), 500); + assertHitCount( + 500, + prepareSearch().setSize(0).setQuery(matchAllQuery()), + prepareSearch().setSize(0).setQuery(termQuery("message", "update")), + prepareSearch().setSize(0).setQuery(termQuery("message", "update")) + ); + assertHitCount( + 0, + prepareSearch().setSize(0).setQuery(termQuery("message", "test")), + prepareSearch().setSize(0).setQuery(termQuery("message", "test")) + ); } finally { clearScroll(searchResponse.getScrollId()); searchResponse.decRef(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/simple/SimpleSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/simple/SimpleSearchIT.java index e87c4790aa665..5a9be73d92268 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/simple/SimpleSearchIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/simple/SimpleSearchIT.java @@ -147,16 +147,22 @@ public void testIpCidr() throws Exception { prepareIndex("test").setId("5").setSource("ip", "2001:db8::ff00:42:8329").get(); refresh(); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.0.1"))), 1L); - assertHitCount(prepareSearch().setQuery(queryStringQuery("ip: 192.168.0.1")), 1L); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.0.1/32"))), 1L); + assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.1.5/32"))), 0L); + assertHitCount( + 1L, + prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.0.1"))), + prepareSearch().setQuery(queryStringQuery("ip: 192.168.0.1")), + prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.0.1/32"))), + prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "2001:db8::ff00:42:8329/128"))), + prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "2001:db8::/64"))) + ); assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.0.0/24"))), 3L); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.0.0.0/8"))), 4L); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "0.0.0.0/0"))), 4L); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "2001:db8::ff00:42:8329/128"))), 1L); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "2001:db8::/64"))), 1L); + assertHitCount( + 4L, + prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.0.0.0/8"))), + prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "0.0.0.0/0"))) + ); assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "::/0"))), 5L); - assertHitCount(prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "192.168.1.5/32"))), 0L); assertFailures( prepareSearch().setQuery(boolQuery().must(QueryBuilders.termQuery("ip", "0/0/0/0/0"))), @@ -170,8 +176,11 @@ public void testSimpleId() { prepareIndex("test").setId("XXX1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); // id is not indexed, but lets see that we automatically convert to - assertHitCount(prepareSearch().setQuery(QueryBuilders.termQuery("_id", "XXX1")), 1L); - assertHitCount(prepareSearch().setQuery(QueryBuilders.queryStringQuery("_id:XXX1")), 1L); + assertHitCount( + 1L, + prepareSearch().setQuery(QueryBuilders.termQuery("_id", "XXX1")), + prepareSearch().setQuery(QueryBuilders.queryStringQuery("_id:XXX1")) + ); } public void testSimpleDateRange() throws Exception { @@ -324,12 +333,12 @@ public void testLargeFromAndSizeSucceeds() throws Exception { createIndex("idx"); indexRandom(true, prepareIndex("idx").setSource("{}", XContentType.JSON)); - assertHitCount(prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) - 10), 1); - assertHitCount(prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), 1); assertHitCount( + 1, + prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) - 10), + prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) / 2) - .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) / 2 - 1), - 1 + .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) / 2 - 1) ); } @@ -340,12 +349,12 @@ public void testTooLargeFromAndSizeOkBySetting() throws Exception { ).get(); indexRandom(true, prepareIndex("idx").setSource("{}", XContentType.JSON)); - assertHitCount(prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), 1); - assertHitCount(prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) + 1), 1); assertHitCount( + 1, + prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), + prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) + 1), prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)) - .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), - 1 + .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)) ); } @@ -358,12 +367,12 @@ public void testTooLargeFromAndSizeOkByDynamicSetting() throws Exception { ); indexRandom(true, prepareIndex("idx").setSource("{}", XContentType.JSON)); - assertHitCount(prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), 1); - assertHitCount(prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) + 1), 1); assertHitCount( + 1, + prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), + prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) + 1), prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)) - .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)), - 1 + .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY)) ); } @@ -371,12 +380,12 @@ public void testTooLargeFromAndSizeBackwardsCompatibilityRecommendation() throws prepareCreate("idx").setSettings(Settings.builder().put(IndexSettings.MAX_RESULT_WINDOW_SETTING.getKey(), Integer.MAX_VALUE)).get(); indexRandom(true, prepareIndex("idx").setSource("{}", XContentType.JSON)); - assertHitCount(prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10), 1); - assertHitCount(prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10), 1); assertHitCount( + 1, + prepareSearch("idx").setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10), + prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10), prepareSearch("idx").setSize(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10) - .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10), - 1 + .setFrom(IndexSettings.MAX_RESULT_WINDOW_SETTING.get(Settings.EMPTY) * 10) ); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java index fe83073eeb780..b490c7efd52cd 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RestoreSnapshotIT.java @@ -678,9 +678,12 @@ public void testChangeSettingsOnRestore() throws Exception { indexRandom(true, builders); flushAndRefresh(); - assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "foo")), numdocs); + assertHitCount( + numdocs, + client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "foo")), + client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")) + ); assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")), 0); - assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")), numdocs); createSnapshot("test-repo", "test-snap", Collections.singletonList("test-idx")); @@ -736,8 +739,11 @@ public void testChangeSettingsOnRestore() throws Exception { assertThat(getSettingsResponse.getSetting("test-idx", SETTING_NUMBER_OF_SHARDS), equalTo("" + numberOfShards)); assertThat(getSettingsResponse.getSetting("test-idx", "index.analysis.analyzer.my_analyzer.type"), equalTo("standard")); - assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")), numdocs); - assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")), numdocs); + assertHitCount( + numdocs, + client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")), + client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")) + ); logger.info("--> delete the index and recreate it while deleting all index settings"); cluster().wipeIndices("test-idx"); @@ -758,8 +764,11 @@ public void testChangeSettingsOnRestore() throws Exception { // Make sure that number of shards didn't change assertThat(getSettingsResponse.getSetting("test-idx", SETTING_NUMBER_OF_SHARDS), equalTo("" + numberOfShards)); - assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")), numdocs); - assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")), numdocs); + assertHitCount( + numdocs, + client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")), + client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")) + ); } public void testRestoreChangeIndexMode() { From a0ac839189431e016ea810ac1e48b6932b6daa0b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 6 Dec 2024 11:02:17 +0000 Subject: [PATCH 3/3] [ML] Wait for the worker service to shutdown before closing task processor (#117920) --- docs/changelog/117920.yaml | 6 ++++ .../deployment/DeploymentManager.java | 33 +++++++------------ 2 files changed, 18 insertions(+), 21 deletions(-) create mode 100644 docs/changelog/117920.yaml diff --git a/docs/changelog/117920.yaml b/docs/changelog/117920.yaml new file mode 100644 index 0000000000000..1bfddabd4462d --- /dev/null +++ b/docs/changelog/117920.yaml @@ -0,0 +1,6 @@ +pr: 117920 +summary: Wait for the worker service to shutdown before closing task processor +area: Machine Learning +type: bug +issues: + - 117563 diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java index 9187969fc25a4..c6f1ebcc10780 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java @@ -631,12 +631,15 @@ synchronized void forcefullyStopProcess() { logger.debug(() -> format("[%s] Forcefully stopping process", task.getDeploymentId())); prepareInternalStateForShutdown(); - if (priorityProcessWorker.isShutdown()) { - // most likely there was a crash or exception that caused the - // thread to stop. Notify any waiting requests in the work queue - handleAlreadyShuttingDownWorker(); - } else { - priorityProcessWorker.shutdown(); + priorityProcessWorker.shutdownNow(); + try { + // wait for any currently executing work to finish + if (priorityProcessWorker.awaitTermination(10L, TimeUnit.SECONDS)) { + priorityProcessWorker.notifyQueueRunnables(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.info(Strings.format("[%s] Interrupted waiting for process worker after shutdownNow", PROCESS_NAME)); } killProcessIfPresent(); @@ -649,12 +652,6 @@ private void prepareInternalStateForShutdown() { stateStreamer.cancel(); } - private void handleAlreadyShuttingDownWorker() { - logger.debug(() -> format("[%s] Process worker was already marked for shutdown", task.getDeploymentId())); - - priorityProcessWorker.notifyQueueRunnables(); - } - private void killProcessIfPresent() { try { if (process.get() == null) { @@ -675,15 +672,7 @@ private void closeNlpTaskProcessor() { private synchronized void stopProcessAfterCompletingPendingWork() { logger.debug(() -> format("[%s] Stopping process after completing its pending work", task.getDeploymentId())); prepareInternalStateForShutdown(); - - if (priorityProcessWorker.isShutdown()) { - // most likely there was a crash or exception that caused the - // thread to stop. Notify any waiting requests in the work queue - handleAlreadyShuttingDownWorker(); - } else { - signalAndWaitForWorkerTermination(); - } - + signalAndWaitForWorkerTermination(); stopProcessGracefully(); closeNlpTaskProcessor(); } @@ -707,6 +696,8 @@ private void awaitTerminationAfterCompletingWork() throws TimeoutException { throw new TimeoutException( Strings.format("Timed out waiting for process worker to complete for process %s", PROCESS_NAME) ); + } else { + priorityProcessWorker.notifyQueueRunnables(); } } catch (InterruptedException e) { Thread.currentThread().interrupt();