From bbbc58f37a073880ec7923f31ad599efd94f64bc Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 8 Dec 2023 15:34:07 -0500 Subject: [PATCH 001/152] Fix NPE & empty result handling in CountOnlyQueryPhaseResultConsumer (#103203) Query results can be "null" in that they are a null instance, containing no information from the shard. Additionally, if we see no results, its an empty reduce phase. This code was introduced into 8.12, which has yet to be released, but flagging as a bug for clarity. --- docs/changelog/103203.yaml | 5 + .../CountOnlyQueryPhaseResultConsumer.java | 9 +- .../search/query/QuerySearchResult.java | 6 + ...ountOnlyQueryPhaseResultConsumerTests.java | 133 ++++++++++++++++++ 4 files changed, 151 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/103203.yaml create mode 100644 server/src/test/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumerTests.java diff --git a/docs/changelog/103203.yaml b/docs/changelog/103203.yaml new file mode 100644 index 0000000000000..d2aa3e9961c6a --- /dev/null +++ b/docs/changelog/103203.yaml @@ -0,0 +1,5 @@ +pr: 103203 +summary: Fix NPE & empty result handling in `CountOnlyQueryPhaseResultConsumer` +area: Search +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumer.java index 1e67522f6a671..13972ea2bf64a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumer.java @@ -49,12 +49,17 @@ Stream getSuccessfulResults() { public void consumeResult(SearchPhaseResult result, Runnable next) { assert results.contains(result.getShardIndex()) == false : "shardIndex: " + result.getShardIndex() + " is already set"; results.add(result.getShardIndex()); + progressListener.notifyQueryResult(result.getShardIndex(), result.queryResult()); + // We have an empty result, track that we saw it for this shard and continue; + if (result.queryResult().isNull()) { + next.run(); + return; + } // set the relation to the first non-equal relation relationAtomicReference.compareAndSet(TotalHits.Relation.EQUAL_TO, result.queryResult().getTotalHits().relation); totalHits.add(result.queryResult().getTotalHits().value); terminatedEarly.compareAndSet(false, (result.queryResult().terminatedEarly() != null && result.queryResult().terminatedEarly())); timedOut.compareAndSet(false, result.queryResult().searchTimedOut()); - progressListener.notifyQueryResult(result.getShardIndex(), result.queryResult()); next.run(); } @@ -80,7 +85,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { 1, 0, 0, - false + results.isEmpty() ); if (progressListener != SearchProgressListener.NOOP) { progressListener.notifyFinalReduce( diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 40d4e37045016..7bcafe7005047 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -149,6 +150,7 @@ public void terminatedEarly(boolean terminatedEarly) { this.terminatedEarly = terminatedEarly; } + @Nullable public Boolean terminatedEarly() { return this.terminatedEarly; } @@ -204,10 +206,12 @@ public void setRankShardResult(RankShardResult rankShardResult) { this.rankShardResult = rankShardResult; } + @Nullable public RankShardResult getRankShardResult() { return rankShardResult; } + @Nullable public DocValueFormat[] sortValueFormats() { return sortValueFormats; } @@ -252,6 +256,7 @@ public void aggregations(InternalAggregations aggregations) { hasAggs = aggregations != null; } + @Nullable public DelayableWriteable aggregations() { return aggregations; } @@ -455,6 +460,7 @@ public void writeToNoId(StreamOutput out) throws IOException { } } + @Nullable public TotalHits getTotalHits() { return totalHits; } diff --git a/server/src/test/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumerTests.java b/server/src/test/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumerTests.java new file mode 100644 index 0000000000000..33e6096bab763 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/search/CountOnlyQueryPhaseResultConsumerTests.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.search; + +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class CountOnlyQueryPhaseResultConsumerTests extends ESTestCase { + + public void testProgressListenerExceptionsAreCaught() throws Exception { + ThrowingSearchProgressListener searchProgressListener = new ThrowingSearchProgressListener(); + + List searchShards = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + searchShards.add(new SearchShard(null, new ShardId("index", "uuid", i))); + } + long timestamp = randomLongBetween(1000, Long.MAX_VALUE - 1000); + TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + timestamp, + timestamp, + () -> timestamp + 1000 + ); + searchProgressListener.notifyListShards(searchShards, Collections.emptyList(), SearchResponse.Clusters.EMPTY, false, timeProvider); + + CountOnlyQueryPhaseResultConsumer queryPhaseResultConsumer = new CountOnlyQueryPhaseResultConsumer(searchProgressListener, 10); + try { + AtomicInteger nextCounter = new AtomicInteger(0); + for (int i = 0; i < 10; i++) { + SearchShardTarget searchShardTarget = new SearchShardTarget("node", new ShardId("index", "uuid", i), null); + QuerySearchResult querySearchResult = new QuerySearchResult(); + TopDocs topDocs = new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]); + querySearchResult.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), new DocValueFormat[0]); + querySearchResult.setSearchShardTarget(searchShardTarget); + querySearchResult.setShardIndex(i); + queryPhaseResultConsumer.consumeResult(querySearchResult, nextCounter::incrementAndGet); + } + + assertEquals(10, searchProgressListener.onQueryResult.get()); + queryPhaseResultConsumer.reduce(); + assertEquals(1, searchProgressListener.onFinalReduce.get()); + assertEquals(10, nextCounter.get()); + } finally { + queryPhaseResultConsumer.decRef(); + } + } + + public void testNullShardResultHandling() throws Exception { + CountOnlyQueryPhaseResultConsumer queryPhaseResultConsumer = new CountOnlyQueryPhaseResultConsumer(SearchProgressListener.NOOP, 10); + try { + AtomicInteger nextCounter = new AtomicInteger(0); + for (int i = 0; i < 10; i++) { + SearchShardTarget searchShardTarget = new SearchShardTarget("node", new ShardId("index", "uuid", i), null); + QuerySearchResult querySearchResult = QuerySearchResult.nullInstance(); + querySearchResult.setSearchShardTarget(searchShardTarget); + querySearchResult.setShardIndex(i); + queryPhaseResultConsumer.consumeResult(querySearchResult, nextCounter::incrementAndGet); + } + var reducePhase = queryPhaseResultConsumer.reduce(); + assertEquals(0, reducePhase.totalHits().value); + assertEquals(TotalHits.Relation.EQUAL_TO, reducePhase.totalHits().relation); + assertFalse(reducePhase.isEmptyResult()); + assertEquals(10, nextCounter.get()); + } finally { + queryPhaseResultConsumer.decRef(); + } + } + + public void testEmptyResults() throws Exception { + CountOnlyQueryPhaseResultConsumer queryPhaseResultConsumer = new CountOnlyQueryPhaseResultConsumer(SearchProgressListener.NOOP, 10); + try { + var reducePhase = queryPhaseResultConsumer.reduce(); + assertEquals(0, reducePhase.totalHits().value); + assertEquals(TotalHits.Relation.EQUAL_TO, reducePhase.totalHits().relation); + assertTrue(reducePhase.isEmptyResult()); + } finally { + queryPhaseResultConsumer.decRef(); + } + } + + private static class ThrowingSearchProgressListener extends SearchProgressListener { + private final AtomicInteger onQueryResult = new AtomicInteger(0); + private final AtomicInteger onPartialReduce = new AtomicInteger(0); + private final AtomicInteger onFinalReduce = new AtomicInteger(0); + + @Override + protected void onListShards( + List shards, + List skippedShards, + SearchResponse.Clusters clusters, + boolean fetchPhase, + TransportSearchAction.SearchTimeProvider timeProvider + ) { + throw new UnsupportedOperationException(); + } + + @Override + protected void onQueryResult(int shardIndex, QuerySearchResult queryResult) { + onQueryResult.incrementAndGet(); + throw new UnsupportedOperationException(); + } + + @Override + protected void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) { + onPartialReduce.incrementAndGet(); + throw new UnsupportedOperationException(); + } + + @Override + protected void onFinalReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) { + onFinalReduce.incrementAndGet(); + throw new UnsupportedOperationException(); + } + } +} From f74d18d39acb3bc17b2015104fc26ec5c44fb36b Mon Sep 17 00:00:00 2001 From: sabi0 <2sabio@gmail.com> Date: Fri, 8 Dec 2023 22:21:57 +0100 Subject: [PATCH 002/152] Fix typos in CoordinationDiagnosticsService (#103184) --- .../coordination/CoordinationDiagnosticsService.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java index fb5c9f2fea7de..229c34ecc1a14 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java @@ -68,7 +68,7 @@ * this will report GREEN. * If we have had a master within the last 30 seconds, but that master has changed more than 3 times in the last 30 minutes (and that is * confirmed by checking with the last-known master), then this will report YELLOW. - * If we have not had a master within the last 30 seconds, then this will will report RED with one exception. That exception is when: + * If we have not had a master within the last 30 seconds, then this will report RED with one exception. That exception is when: * (1) no node is elected master, (2) this node is not master eligible, (3) some node is master eligible, (4) we ask a master-eligible node * to run this service, and (5) it comes back with a result that is not RED. * Since this service needs to be able to run when there is no master at all, it does not depend on the dedicated health node (which @@ -99,7 +99,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener { /* * This is a Map of tasks that are periodically reaching out to other master eligible nodes to get their ClusterFormationStates for - * diagnosis. The key is the DisoveryNode for the master eligible node being polled, and the value is a Cancellable. + * diagnosis. The key is the DiscoveryNode for the master eligible node being polled, and the value is a Cancellable. * The field is accessed (reads/writes) from multiple threads, but the reference itself is only ever changed on the cluster change * event thread. */ @@ -121,7 +121,7 @@ public class CoordinationDiagnosticsService implements ClusterStateListener { volatile AtomicReference remoteCoordinationDiagnosisTask = null; /* * This field holds the result of the task in the remoteCoordinationDiagnosisTask field above. The field is accessed - * (reads/writes) from multiple threads, but is only ever reassigned on a the initialization thread and the cluster change event thread. + * (reads/writes) from multiple threads, but is only ever reassigned on the initialization thread and the cluster change event thread. */ volatile AtomicReference remoteCoordinationDiagnosisResult = null; @@ -294,7 +294,7 @@ private static CoordinationDiagnosticsDetails getDetails( /** * Returns the health result when we have detected locally that the master has changed to null repeatedly (by default more than 3 times - * in the last 30 minutes). This method attemtps to use the master history from a remote node to confirm what we are seeing locally. + * in the last 30 minutes). This method attempts to use the master history from a remote node to confirm what we are seeing locally. * If the information from the remote node confirms that the master history has been unstable, a YELLOW status is returned. If the * information from the remote node shows that the master history has been stable, then we assume that the problem is with this node * and a GREEN status is returned (the problems with this node will be covered in a separate health indicator). If there had been @@ -1133,7 +1133,7 @@ private Scheduler.Cancellable sendTransportRequest ); responseConsumer.accept(responseTransformationFunction.apply(response, null)); }, e -> { - logger.warn("Exception in remote request to master" + masterEligibleNode, e); + logger.warn("Exception in remote request to master " + masterEligibleNode, e); responseConsumer.accept(responseTransformationFunction.apply(null, e)); })); @@ -1143,7 +1143,7 @@ public void run() { if (masterEligibleNode == null) { /* * This node's PeerFinder hasn't yet discovered the master-eligible nodes. By notifying the responseConsumer with a null - * value we effectively do nothing, and allow this request to be recheduled. + * value we effectively do nothing, and allow this request to be rescheduled. */ responseConsumer.accept(null); } else { From 6fac62eb1433be2ec4e54d4afa66c4f79bdf26d0 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Fri, 8 Dec 2023 17:00:38 -0800 Subject: [PATCH 003/152] Update Lucene to remove terms enum optimzation (#103229) This commit updates Lucene to a patched version of 9.9.0 without https://github.com/apache/lucene/pull/12699. See https://github.com/apache/lucene/issues/12895. --- build-tools-internal/version.properties | 2 +- gradle/verification-metadata.xml | 120 ++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 1 deletion(-) diff --git a/build-tools-internal/version.properties b/build-tools-internal/version.properties index adf33dd070a22..c34bdc95046b3 100644 --- a/build-tools-internal/version.properties +++ b/build-tools-internal/version.properties @@ -1,5 +1,5 @@ elasticsearch = 8.13.0 -lucene = 9.9.0 +lucene = 9.9.0-snapshot-bb4fec631e6 bundled_jdk_vendor = openjdk bundled_jdk = 21.0.1+12@415e3f918a1f4062a0074a2794853d0d diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml index 7f672ece21f66..263602c9841a8 100644 --- a/gradle/verification-metadata.xml +++ b/gradle/verification-metadata.xml @@ -2664,121 +2664,241 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 3a1b0dc6c5732a7c2675f0a50837c8ca41cf19de Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Fri, 8 Dec 2023 22:03:28 -0800 Subject: [PATCH 004/152] Wrap painless explain error (#103151) In https://github.com/elastic/elasticsearch/pull/100872 Painless errors were wrapped so as to avoid throwing Errors outside scripting. However, one case was missed: PainlessExplainError which is used by Debug.explain. This commit adds the explain error to those that painless wraps. closes #103018 --- docs/changelog/103151.yaml | 6 ++++++ .../painless/ErrorCauseWrapper.java | 1 + .../org/elasticsearch/painless/DebugTests.java | 16 ++++++++++------ 3 files changed, 17 insertions(+), 6 deletions(-) create mode 100644 docs/changelog/103151.yaml diff --git a/docs/changelog/103151.yaml b/docs/changelog/103151.yaml new file mode 100644 index 0000000000000..bd9eea97cac6d --- /dev/null +++ b/docs/changelog/103151.yaml @@ -0,0 +1,6 @@ +pr: 103151 +summary: Wrap painless explain error +area: Infra/Scripting +type: bug +issues: + - 103018 diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/ErrorCauseWrapper.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/ErrorCauseWrapper.java index aeaf44bfd014c..308d6223c666e 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/ErrorCauseWrapper.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/ErrorCauseWrapper.java @@ -23,6 +23,7 @@ class ErrorCauseWrapper extends ElasticsearchException { private static final List> wrappedErrors = List.of( PainlessError.class, + PainlessExplainError.class, OutOfMemoryError.class, StackOverflowError.class, LinkageError.class diff --git a/modules/lang-painless/src/test/java/org/elasticsearch/painless/DebugTests.java b/modules/lang-painless/src/test/java/org/elasticsearch/painless/DebugTests.java index 87b199cd1b43f..48da785e801d3 100644 --- a/modules/lang-painless/src/test/java/org/elasticsearch/painless/DebugTests.java +++ b/modules/lang-painless/src/test/java/org/elasticsearch/painless/DebugTests.java @@ -20,6 +20,7 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; @@ -30,29 +31,32 @@ public class DebugTests extends ScriptTestCase { public void testExplain() { // Debug.explain can explain an object Object dummy = new Object(); - PainlessExplainError e = expectScriptThrows( - PainlessExplainError.class, - () -> exec("Debug.explain(params.a)", singletonMap("a", dummy), true) - ); + var wrapper = expectScriptThrows(ErrorCauseWrapper.class, () -> exec("Debug.explain(params.a)", singletonMap("a", dummy), true)); + assertThat(wrapper.realCause.getClass(), equalTo(PainlessExplainError.class)); + var e = (PainlessExplainError) wrapper.realCause; assertSame(dummy, e.getObjectToExplain()); assertThat(e.getHeaders(painlessLookup), hasEntry("es.to_string", singletonList(dummy.toString()))); assertThat(e.getHeaders(painlessLookup), hasEntry("es.java_class", singletonList("java.lang.Object"))); assertThat(e.getHeaders(painlessLookup), hasEntry("es.painless_class", singletonList("java.lang.Object"))); // Null should be ok - e = expectScriptThrows(PainlessExplainError.class, () -> exec("Debug.explain(null)")); + wrapper = expectScriptThrows(ErrorCauseWrapper.class, () -> exec("Debug.explain(null)")); + assertThat(wrapper.realCause.getClass(), equalTo(PainlessExplainError.class)); + e = (PainlessExplainError) wrapper.realCause; assertNull(e.getObjectToExplain()); assertThat(e.getHeaders(painlessLookup), hasEntry("es.to_string", singletonList("null"))); assertThat(e.getHeaders(painlessLookup), not(hasKey("es.java_class"))); assertThat(e.getHeaders(painlessLookup), not(hasKey("es.painless_class"))); // You can't catch the explain exception - e = expectScriptThrows(PainlessExplainError.class, () -> exec(""" + wrapper = expectScriptThrows(ErrorCauseWrapper.class, () -> exec(""" try { Debug.explain(params.a) } catch (Exception e) { return 1 }""", singletonMap("a", dummy), true)); + assertThat(wrapper.realCause.getClass(), equalTo(PainlessExplainError.class)); + e = (PainlessExplainError) wrapper.realCause; assertSame(dummy, e.getObjectToExplain()); } From 80b222c3957942c9e11fd1043bf98ef773d862d0 Mon Sep 17 00:00:00 2001 From: John Verwolf Date: Sat, 9 Dec 2023 15:09:02 -0800 Subject: [PATCH 005/152] Update elasticsearch.modules.parent-join.internalClusterTest (#102189) Part of the broader work covered in https://github.com/elastic/elasticsearch/issues/102030 Updates tests in: - ChildQuerySearchIT - TokenCountFieldMapperIntegrationIT --- .../join/query/ChildQuerySearchIT.java | 27 ++++----- .../AggregationsIntegrationIT.java | 43 ++++++-------- .../elasticsearch/test/ESIntegTestCase.java | 2 +- .../hamcrest/ElasticsearchAssertions.java | 58 ++++++++++++++++++- 4 files changed, 83 insertions(+), 47 deletions(-) diff --git a/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/ChildQuerySearchIT.java b/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/ChildQuerySearchIT.java index ae1adf4160c2a..02776eb277020 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/ChildQuerySearchIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/ChildQuerySearchIT.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.common.lucene.search.function.CombineFunction; @@ -69,6 +68,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; @@ -1403,22 +1403,15 @@ public void testParentChildQueriesViaScrollApi() throws Exception { boolQuery().must(matchAllQuery()).filter(hasParentQuery("parent", matchAllQuery(), false)) }; for (QueryBuilder query : queries) { - SearchResponse scrollResponse = prepareSearch("test").setScroll(TimeValue.timeValueSeconds(30)) - .setSize(1) - .addStoredField("_id") - .setQuery(query) - .get(); - - assertNoFailures(scrollResponse); - assertThat(scrollResponse.getHits().getTotalHits().value, equalTo(10L)); - int scannedDocs = 0; - do { - assertThat(scrollResponse.getHits().getTotalHits().value, equalTo(10L)); - scannedDocs += scrollResponse.getHits().getHits().length; - scrollResponse = client().prepareSearchScroll(scrollResponse.getScrollId()).setScroll(TimeValue.timeValueSeconds(30)).get(); - } while (scrollResponse.getHits().getHits().length > 0); - clearScroll(scrollResponse.getScrollId()); - assertThat(scannedDocs, equalTo(10)); + assertScrollResponsesAndHitCount( + TimeValue.timeValueSeconds(60), + prepareSearch("test").setScroll(TimeValue.timeValueSeconds(30)).setSize(1).addStoredField("_id").setQuery(query), + 10, + (respNum, response) -> { + assertNoFailures(response); + assertThat(response.getHits().getTotalHits().value, equalTo(10L)); + } + ); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/AggregationsIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/AggregationsIntegrationIT.java index df8f3825a5ea6..a856ee36aadc2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/AggregationsIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/AggregationsIntegrationIT.java @@ -18,7 +18,8 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertScrollResponsesAndHitCount; @ESIntegTestCase.SuiteScopeTestCase public class AggregationsIntegrationIT extends ESIntegTestCase { @@ -38,32 +39,22 @@ public void setupSuiteScopeCluster() throws Exception { public void testScroll() { final int size = randomIntBetween(1, 4); - final String[] scroll = new String[1]; - final int[] total = new int[1]; - assertNoFailuresAndResponse( - prepareSearch("index").setSize(size).setScroll(TimeValue.timeValueMinutes(1)).addAggregation(terms("f").field("f")), - response -> { - Aggregations aggregations = response.getAggregations(); - assertNotNull(aggregations); - Terms terms = aggregations.get("f"); - assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount()); - scroll[0] = response.getScrollId(); - total[0] = response.getHits().getHits().length; + assertScrollResponsesAndHitCount( + TimeValue.timeValueSeconds(60), + prepareSearch("index").setSize(size).addAggregation(terms("f").field("f")), + numDocs, + (respNum, response) -> { + assertNoFailures(response); + + if (respNum == 1) { // initial response. + Aggregations aggregations = response.getAggregations(); + assertNotNull(aggregations); + Terms terms = aggregations.get("f"); + assertEquals(Math.min(numDocs, 3L), terms.getBucketByKey("0").getDocCount()); + } else { + assertNull(response.getAggregations()); + } } ); - int currentTotal = 0; - while (total[0] - currentTotal > 0) { - currentTotal = total[0]; - assertNoFailuresAndResponse( - client().prepareSearchScroll(scroll[0]).setScroll(TimeValue.timeValueMinutes(1)), - scrollResponse -> { - assertNull(scrollResponse.getAggregations()); - total[0] += scrollResponse.getHits().getHits().length; - scroll[0] = scrollResponse.getScrollId(); - } - ); - } - clearScroll(scroll[0]); - assertEquals(numDocs, total[0]); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index e0083d5570baa..779d846f4eac2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1881,7 +1881,7 @@ protected void addError(Exception e) { /** * Clears the given scroll Ids */ - public void clearScroll(String... scrollIds) { + public static void clearScroll(String... scrollIds) { ClearScrollResponse clearResponse = client().prepareClearScroll().setScrollIds(Arrays.asList(scrollIds)).get(); assertThat(clearResponse.isSucceeded(), equalTo(true)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index bba7a6e19deea..0d7ab26faecf9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse; @@ -51,6 +52,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -60,10 +62,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import static org.apache.lucene.tests.util.LuceneTestCase.expectThrows; import static org.apache.lucene.tests.util.LuceneTestCase.expectThrowsAnyOf; +import static org.elasticsearch.test.ESIntegTestCase.clearScroll; +import static org.elasticsearch.test.ESIntegTestCase.client; import static org.elasticsearch.test.LambdaMatchers.transformedArrayItemsMatch; import static org.elasticsearch.test.LambdaMatchers.transformedItemsMatch; import static org.elasticsearch.test.LambdaMatchers.transformedMatch; @@ -73,6 +78,7 @@ import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.greaterThan; @@ -369,6 +375,48 @@ public static void assertRes } } + /** + * A helper to enable the testing of scroll requests with ref-counting. + * + * @param keepAlive The TTL for the scroll context. + * @param searchRequestBuilder The initial search request. + * @param expectedTotalHitCount The number of hits that are expected to be retrieved. + * @param responseConsumer (respNum, response) -> {your assertions here}. + * respNum starts at 1, which contains the resp from the initial request. + */ + public static void assertScrollResponsesAndHitCount( + TimeValue keepAlive, + SearchRequestBuilder searchRequestBuilder, + int expectedTotalHitCount, + BiConsumer responseConsumer + ) { + searchRequestBuilder.setScroll(keepAlive); + List responses = new ArrayList<>(); + var scrollResponse = searchRequestBuilder.get(); + responses.add(scrollResponse); + int retrievedDocsCount = 0; + try { + assertThat(scrollResponse.getHits().getTotalHits().value, equalTo((long) expectedTotalHitCount)); + retrievedDocsCount += scrollResponse.getHits().getHits().length; + responseConsumer.accept(responses.size(), scrollResponse); + while (scrollResponse.getHits().getHits().length > 0) { + scrollResponse = prepareScrollSearch(scrollResponse.getScrollId(), keepAlive).get(); + responses.add(scrollResponse); + assertThat(scrollResponse.getHits().getTotalHits().value, equalTo((long) expectedTotalHitCount)); + retrievedDocsCount += scrollResponse.getHits().getHits().length; + responseConsumer.accept(responses.size(), scrollResponse); + } + } finally { + clearScroll(scrollResponse.getScrollId()); + responses.forEach(SearchResponse::decRef); + } + assertThat(retrievedDocsCount, equalTo(expectedTotalHitCount)); + } + + public static SearchScrollRequestBuilder prepareScrollSearch(String scrollId, TimeValue timeout) { + return client().prepareSearchScroll(scrollId).setScroll(timeout); + } + public static void assertResponse(ActionFuture responseFuture, Consumer consumer) throws ExecutionException, InterruptedException { var res = responseFuture.get(); @@ -442,6 +490,10 @@ public static void assertFailures(SearchRequestBuilder searchRequestBuilder, Res } } + public static void assertFailures(SearchRequestBuilder searchRequestBuilder, RestStatus restStatus) { + assertFailures(searchRequestBuilder, restStatus, containsString("")); + } + public static void assertNoFailures(BaseBroadcastResponse response) { if (response.getFailedShards() != 0) { final AssertionError assertionError = new AssertionError("[" + response.getFailedShards() + "] shard failures"); @@ -791,9 +843,9 @@ public static void assertToXContentEquivalent(BytesReference expected, BytesRefe * Often latches are called as assertTrue(latch.await(1, TimeUnit.SECONDS)); * In case of a failure this will just throw an assertion error without any further message * - * @param latch The latch to wait for - * @param timeout The value of the timeout - * @param unit The unit of the timeout + * @param latch The latch to wait for + * @param timeout The value of the timeout + * @param unit The unit of the timeout * @throws InterruptedException An exception if the waiting is interrupted */ public static void awaitLatch(CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException { From 64818df6e8bee30944e34cde9aaa14a69a719ce2 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 10 Dec 2023 20:40:39 +0100 Subject: [PATCH 006/152] Fix SearchResponse leaks in searchable snapshot test and production code (#103118) It's in the title. Fix all the tests like we have everywhere else. Fix the production use of SearchResponse in BlobStoreCacheMaintenanceService by properly ref-counting the response it holds on to. --- .../search/SearchResponseUtils.java | 9 +- .../BaseSearchableSnapshotsIntegTestCase.java | 9 +- .../RetrySearchIntegTests.java | 42 +-- ...pshotsCanMatchOnCoordinatorIntegTests.java | 83 +++--- .../SearchableSnapshotsIntegTests.java | 58 ++-- ...napshotsRecoverFromSnapshotIntegTests.java | 10 +- ...archableSnapshotsRepositoryIntegTests.java | 18 +- ...ableSnapshotsBlobStoreCacheIntegTests.java | 9 +- ...tsBlobStoreCacheMaintenanceIntegTests.java | 17 +- .../shared/NodesCachesStatsIntegTests.java | 2 +- .../BlobStoreCacheMaintenanceService.java | 278 ++++++++++-------- 11 files changed, 270 insertions(+), 265 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java b/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java index e61b89fcff42c..589bc76c55a3d 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java @@ -7,17 +7,22 @@ */ package org.elasticsearch.search; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchRequestBuilder; public enum SearchResponseUtils { ; - public static long getTotalHitsValue(SearchRequestBuilder request) { + public static TotalHits getTotalHits(SearchRequestBuilder request) { var resp = request.get(); try { - return resp.getHits().getTotalHits().value; + return resp.getHits().getTotalHits(); } finally { resp.decRef(); } } + + public static long getTotalHitsValue(SearchRequestBuilder request) { + return getTotalHits(request).value; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index d3bb435dc03ab..b7dc212fe12ad 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -64,6 +64,7 @@ import static org.elasticsearch.license.LicenseSettings.SELF_GENERATED_LICENSE_TYPE; import static org.elasticsearch.test.NodeRoles.addRoles; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xpack.searchablesnapshots.cache.common.TestUtils.pageAligned; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -289,10 +290,10 @@ protected void assertTotalHits(String indexName, TotalHits originalAllHits, Tota } catch (InterruptedException e) { throw new RuntimeException(e); } - allHits.set(t, prepareSearch(indexName).setTrackTotalHits(true).get().getHits().getTotalHits()); - barHits.set( - t, - prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits() + assertResponse(prepareSearch(indexName).setTrackTotalHits(true), resp -> allHits.set(t, resp.getHits().getTotalHits())); + assertResponse( + prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")), + resp -> barHits.set(t, resp.getHits().getTotalHits()) ); }); threads[i].start(); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index 0551ac3007f10..c80cf3c3d62e3 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.ClosePointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; @@ -32,7 +31,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; import static org.hamcrest.Matchers.equalTo; public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -144,30 +143,31 @@ public void testRetryPointInTime() throws Exception { ).keepAlive(TimeValue.timeValueMinutes(2)); final String pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId(); try { - SearchResponse resp = prepareSearch().setIndices(indexName) - .setPreference(null) - .setPointInTime(new PointInTimeBuilder(pitId)) - .get(); - assertNoFailures(resp); - assertThat(resp.pointInTimeId(), equalTo(pitId)); - assertHitCount(resp, docCount); - + assertNoFailuresAndResponse( + prepareSearch().setIndices(indexName).setPreference(null).setPointInTime(new PointInTimeBuilder(pitId)), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, docCount); + } + ); final Set allocatedNodes = internalCluster().nodesInclude(indexName); for (String allocatedNode : allocatedNodes) { internalCluster().restartNode(allocatedNode); } ensureGreen(indexName); - resp = prepareSearch().setIndices(indexName) - .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) - .setSearchType(SearchType.QUERY_THEN_FETCH) - .setPreference(null) - .setPreFilterShardSize(between(1, 10)) - .setAllowPartialSearchResults(true) - .setPointInTime(new PointInTimeBuilder(pitId)) - .get(); - assertNoFailures(resp); - assertThat(resp.pointInTimeId(), equalTo(pitId)); - assertHitCount(resp, docCount); + assertNoFailuresAndResponse( + prepareSearch().setIndices(indexName) + .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreference(null) + .setPreFilterShardSize(between(1, 10)) + .setAllowPartialSearchResults(true) + .setPointInTime(new PointInTimeBuilder(pitId)), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, docCount); + } + ); } finally { client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java index 844e6099460b2..a7a3b8e461604 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java @@ -52,6 +52,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -182,14 +183,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying .source(new SearchSourceBuilder().query(rangeQuery)); if (includeIndexCoveringSearchRangeInSearchRequest) { - SearchResponse searchResponse = client().search(request).actionGet(); - - // All the regular index searches succeeded - assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount)); - // All the searchable snapshots shard search failed - assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); - assertThat(searchResponse.getSkippedShards(), equalTo(0)); - assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + assertResponse(client().search(request), searchResponse -> { + // All the regular index searches succeeded + assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount)); + // All the searchable snapshots shard search failed + assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(searchResponse.getSkippedShards(), equalTo(0)); + assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + }); } else { // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp // is not available yet @@ -271,13 +272,13 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex()); if (includeIndexCoveringSearchRangeInSearchRequest) { - SearchResponse newSearchResponse = client().search(request).actionGet(); - - assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount)); - assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getFailedShards(), equalTo(0)); - assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange)); + assertResponse(client().search(request), newSearchResponse -> { + assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getFailedShards(), equalTo(0)); + assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange)); + }); // test with SearchShardsAPI { @@ -338,13 +339,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying } } } else { - SearchResponse newSearchResponse = client().search(request).actionGet(); - // When all shards are skipped, at least one of them should be queried in order to - // provide a proper search response. - assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); - assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); - assertThat(newSearchResponse.getFailedShards(), equalTo(1)); - assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertResponse(client().search(request), newSearchResponse -> { + // When all shards are skipped, at least one of them should be queried in order to + // provide a proper search response. + assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); + assertThat(newSearchResponse.getFailedShards(), equalTo(1)); + assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount)); + }); // test with SearchShardsAPI { @@ -449,14 +451,15 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped() // test with Search API { - SearchResponse searchResponse = client().search(request).actionGet(); - // All the regular index searches succeeded - assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount)); - // All the searchable snapshots shard search failed - assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); - assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount)); - assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); + assertResponse(client().search(request), searchResponse -> { + // All the regular index searches succeeded + assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount)); + // All the searchable snapshots shard search failed + assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount)); + assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); + }); } // test with SearchShards API @@ -513,16 +516,16 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped() // busy assert since computing the time stamp field from the cluster state happens off of the CS applier thread and thus can be // slightly delayed assertBusy(() -> { - SearchResponse newSearchResponse = client().search(request).actionGet(); - - // All the regular index searches succeeded - assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getFailedShards(), equalTo(0)); - // We have to query at least one node to construct a valid response, and we pick - // a shard that's available in order to construct the search response - assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1)); - assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + assertResponse(client().search(request), newSearchResponse -> { + // All the regular index searches succeeded + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getFailedShards(), equalTo(0)); + // We have to query at least one node to construct a valid response, and we pick + // a shard that's available in order to construct the search response + assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1)); + assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + }); }); // test with SearchShards API diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index c3f5e44ae32a0..876ff9ebdb86f 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; @@ -117,19 +118,12 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { populateIndex(indexName, 10_000); - final TotalHits originalAllHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); - final TotalHits originalBarHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .setQuery(matchQuery("foo", "bar")) - .get() - .getHits() - .getTotalHits(); + final TotalHits originalAllHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); + final TotalHits originalBarHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")) + ); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); expectThrows( @@ -765,19 +759,12 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) ); - final TotalHits originalAllHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); - final TotalHits originalBarHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .setQuery(matchQuery("foo", "bar")) - .get() - .getHits() - .getTotalHits(); + final TotalHits originalAllHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); + final TotalHits originalBarHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")) + ); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); // The repository that contains the actual data @@ -936,19 +923,12 @@ public void testSnapshotOfSearchableSnapshotCanBeRestoredBeforeRepositoryRegiste Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) ); - final TotalHits originalAllHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); - final TotalHits originalBarHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .setQuery(matchQuery("foo", "bar")) - .get() - .getHits() - .getTotalHits(); + final TotalHits originalAllHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); + final TotalHits originalBarHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")) + ); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); // Take snapshot containing the actual data to one repository diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java index 6f71f7c33bf06..894d3af8d75b8 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; @@ -43,12 +44,9 @@ public void testSearchableSnapshotRelocationDoNotUseSnapshotBasedRecoveries() th .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) ); - final TotalHits totalHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); final var snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createSnapshot(repositoryName, snapshotName, List.of(indexName)); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java index cb6cf45b641c6..f97151f9ae330 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoryConflictException; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotRestoreException; import java.util.Arrays; @@ -49,12 +50,9 @@ public void testRepositoryUsedBySearchableSnapshotCanBeUpdatedButNotUnregistered Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) ); - final TotalHits totalHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createSnapshot(repositoryName, snapshotName, List.of(indexName)); @@ -164,7 +162,9 @@ public void testMountIndexWithDifferentDeletionOfSnapshot() throws Exception { final String index = "index"; createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); - final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(index).setTrackTotalHits(true) + ); final String snapshot = "snapshot"; createSnapshot(repository, snapshot, List.of(index)); @@ -220,7 +220,9 @@ public void testDeletionOfSnapshotSettingCannotBeUpdated() throws Exception { final String index = "index"; createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); - final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(index).setTrackTotalHits(true) + ); final String snapshot = "snapshot"; createSnapshot(repository, snapshot, List.of(index)); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java index b5ebf1104a195..37b3ecfd36959 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xcontent.XContentBuilder; @@ -207,11 +208,9 @@ public void testBlobStoreCache() throws Exception { refreshSystemIndex(); - final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .get() - .getHits() - .getTotalHits().value; + final long numberOfCachedBlobs = SearchResponseUtils.getTotalHitsValue( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + ); IndexingStats indexingStats = systemClient().admin() .indices() diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index 04233e47b7bcc..981ffe2832e66 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -318,16 +319,12 @@ private Client systemClient() { } private long numberOfEntriesInCache() { - var res = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .setTrackTotalHits(true) - .setSize(0) - .get(); - try { - return res.getHits().getTotalHits().value; - } finally { - res.decRef(); - } + return SearchResponseUtils.getTotalHitsValue( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setTrackTotalHits(true) + .setSize(0) + ); } private void refreshSystemIndex(boolean failIfNotExist) { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java index 3858b087f4d3a..42ac63579b6c6 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java @@ -112,7 +112,7 @@ public void testNodesCachesStats() throws Exception { randomBoolean() ? QueryBuilders.rangeQuery("id").gte(randomIntBetween(0, 1000)) : QueryBuilders.termQuery("test", "value" + randomIntBetween(0, 1000)) - ).setSize(randomIntBetween(0, 1000)).get(); + ).setSize(randomIntBetween(0, 1000)).get().decRef(); } assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 64508e1d49959..89cab65765bf9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -417,7 +417,7 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable { private volatile Map> existingSnapshots; private volatile Set existingRepositories; - private volatile SearchResponse searchResponse; + private final AtomicReference searchResponse = new AtomicReference<>(); private volatile Instant expirationTime; private volatile String pointIntTimeId; private volatile Object[] searchAfter; @@ -458,146 +458,155 @@ public void onFailure(Exception e) { final String pitId = pointIntTimeId; assert Strings.hasLength(pitId); - if (searchResponse == null) { - final SearchSourceBuilder searchSource = new SearchSourceBuilder(); - searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis")); - searchSource.fetchSource(false); - searchSource.trackScores(false); - searchSource.sort(ShardDocSortField.NAME); - searchSource.size(batchSize); - if (searchAfter != null) { - searchSource.searchAfter(searchAfter); - searchSource.trackTotalHits(false); - } else { - searchSource.trackTotalHits(true); + SearchResponse searchResponseRef; + do { + searchResponseRef = searchResponse.get(); + if (searchResponseRef == null) { + handleMissingSearchResponse(pitId); + return; } - final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); - pointInTime.setKeepAlive(keepAlive); - searchSource.pointInTimeBuilder(pointInTime); - final SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(searchSource); - clientWithOrigin.execute(TransportSearchAction.TYPE, searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse response) { - if (searchAfter == null) { - assert PeriodicMaintenanceTask.this.total.get() == 0L; - PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value); - } - PeriodicMaintenanceTask.this.searchResponse = response; - PeriodicMaintenanceTask.this.searchAfter = null; - executeNext(PeriodicMaintenanceTask.this); - } - - @Override - public void onFailure(Exception e) { - complete(e); - } - }); - return; + } while (searchResponseRef.tryIncRef() == false); + try { + var searchHits = searchResponseRef.getHits().getHits(); + if (searchHits != null && searchHits.length > 0) { + updateWithSearchHits(searchHits); + return; + } + } finally { + searchResponseRef.decRef(); } + // we're done, complete the task + complete(null); + } catch (Exception e) { + complete(e); + } + } - final SearchHit[] searchHits = searchResponse.getHits().getHits(); - if (searchHits != null && searchHits.length > 0) { - if (expirationTime == null) { - final TimeValue retention = periodicTaskRetention; - expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) - .minus(retention.duration(), retention.timeUnit().toChronoUnit()); - - final ClusterState state = clusterService.state(); - // compute the list of existing searchable snapshots and repositories once - existingSnapshots = listSearchableSnapshots(state); - existingRepositories = RepositoriesMetadata.get(state) - .repositories() - .stream() - .map(RepositoryMetadata::name) - .collect(Collectors.toSet()); + private void handleMissingSearchResponse(String pitId) { + final SearchSourceBuilder searchSource = new SearchSourceBuilder(); + searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis")); + searchSource.fetchSource(false); + searchSource.trackScores(false); + searchSource.sort(ShardDocSortField.NAME); + searchSource.size(batchSize); + if (searchAfter != null) { + searchSource.searchAfter(searchAfter); + searchSource.trackTotalHits(false); + } else { + searchSource.trackTotalHits(true); + } + final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); + pointInTime.setKeepAlive(keepAlive); + searchSource.pointInTimeBuilder(pointInTime); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(searchSource); + clientWithOrigin.execute(TransportSearchAction.TYPE, searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (searchAfter == null) { + assert PeriodicMaintenanceTask.this.total.get() == 0L; + PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value); } + PeriodicMaintenanceTask.this.setCurrentResponse(response); + PeriodicMaintenanceTask.this.searchAfter = null; + executeNext(PeriodicMaintenanceTask.this); + } - final BulkRequest bulkRequest = new BulkRequest(); - final Map> knownSnapshots = existingSnapshots; - assert knownSnapshots != null; - final Set knownRepositories = existingRepositories; - assert knownRepositories != null; - final Instant expirationTimeCopy = this.expirationTime; - assert expirationTimeCopy != null; - - Object[] lastSortValues = null; - for (SearchHit searchHit : searchHits) { - lastSortValues = searchHit.getSortValues(); - assert searchHit.getId() != null; - try { - boolean delete = false; - - // See {@link BlobStoreCacheService#generateId} - // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} - final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); - assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); - - final String repositoryName = parts[0]; - if (knownRepositories.contains(repositoryName) == false) { - logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId()); - delete = true; - } else { - final Set knownIndexIds = knownSnapshots.get(parts[1]); - if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) { - logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId()); - delete = true; - } - } - if (delete) { - final Instant creationTime = getCreationTime(searchHit); - if (creationTime.isAfter(expirationTimeCopy)) { - logger.trace( - "blob store cache entry with id [{}] was created recently, skipping deletion", - searchHit.getId() - ); - continue; - } - bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId())); - } - } catch (Exception e) { - logger.warn( - () -> format("exception when parsing blob store cache entry with id [%s], skipping", searchHit.getId()), - e - ); + @Override + public void onFailure(Exception e) { + complete(e); + } + }); + } + + private void updateWithSearchHits(SearchHit[] searchHits) { + if (expirationTime == null) { + final TimeValue retention = periodicTaskRetention; + expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) + .minus(retention.duration(), retention.timeUnit().toChronoUnit()); + + final ClusterState state = clusterService.state(); + // compute the list of existing searchable snapshots and repositories once + existingSnapshots = listSearchableSnapshots(state); + existingRepositories = RepositoriesMetadata.get(state) + .repositories() + .stream() + .map(RepositoryMetadata::name) + .collect(Collectors.toSet()); + } + + final BulkRequest bulkRequest = new BulkRequest(); + final Map> knownSnapshots = existingSnapshots; + assert knownSnapshots != null; + final Set knownRepositories = existingRepositories; + assert knownRepositories != null; + final Instant expirationTimeCopy = this.expirationTime; + assert expirationTimeCopy != null; + + Object[] lastSortValues = null; + for (SearchHit searchHit : searchHits) { + lastSortValues = searchHit.getSortValues(); + assert searchHit.getId() != null; + try { + boolean delete = false; + + // See {@link BlobStoreCacheService#generateId} + // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} + final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); + assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); + + final String repositoryName = parts[0]; + if (knownRepositories.contains(repositoryName) == false) { + logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId()); + delete = true; + } else { + final Set knownIndexIds = knownSnapshots.get(parts[1]); + if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) { + logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId()); + delete = true; } } - - assert lastSortValues != null; - if (bulkRequest.numberOfActions() == 0) { - this.searchResponse = null; - this.searchAfter = lastSortValues; - executeNext(this); - return; + if (delete) { + final Instant creationTime = getCreationTime(searchHit); + if (creationTime.isAfter(expirationTimeCopy)) { + logger.trace("blob store cache entry with id [{}] was created recently, skipping deletion", searchHit.getId()); + continue; + } + bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId())); } + } catch (Exception e) { + logger.warn(() -> format("exception when parsing blob store cache entry with id [%s], skipping", searchHit.getId()), e); + } + } - final Object[] finalSearchAfter = lastSortValues; - clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { - @Override - public void onResponse(BulkResponse response) { - for (BulkItemResponse itemResponse : response.getItems()) { - if (itemResponse.isFailed() == false) { - assert itemResponse.getResponse() instanceof DeleteResponse; - PeriodicMaintenanceTask.this.deletes.incrementAndGet(); - } - } - PeriodicMaintenanceTask.this.searchResponse = null; - PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter; - executeNext(PeriodicMaintenanceTask.this); - } + assert lastSortValues != null; + if (bulkRequest.numberOfActions() == 0) { + setCurrentResponse(null); + this.searchAfter = lastSortValues; + executeNext(this); + return; + } - @Override - public void onFailure(Exception e) { - complete(e); + final Object[] finalSearchAfter = lastSortValues; + clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + for (BulkItemResponse itemResponse : response.getItems()) { + if (itemResponse.isFailed() == false) { + assert itemResponse.getResponse() instanceof DeleteResponse; + deletes.incrementAndGet(); } - }); - return; + } + PeriodicMaintenanceTask.this.setCurrentResponse(null); + PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter; + executeNext(PeriodicMaintenanceTask.this); } - // we're done, complete the task - complete(null); - } catch (Exception e) { - complete(e); - } + + @Override + public void onFailure(Exception e) { + complete(e); + } + }); } public boolean isClosed() { @@ -614,6 +623,7 @@ private void ensureOpen() { @Override public void close() { if (closed.compareAndSet(false, true)) { + setCurrentResponse(null); final Exception e = error.get(); if (e != null) { logger.warn( @@ -679,6 +689,16 @@ public void onFailure(Exception e) { } } } + + private void setCurrentResponse(SearchResponse response) { + if (response != null) { + response.mustIncRef(); + } + var previous = searchResponse.getAndSet(response); + if (previous != null) { + previous.decRef(); + } + } } private void executeNext(PeriodicMaintenanceTask maintenanceTask) { From 50d541869a95f6cef74f0f8ce47528a696abe17a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 10 Dec 2023 22:27:58 +0100 Subject: [PATCH 007/152] Remove redundant try-catch from some listeners in TransportStartDatafeedAction (#103246) Found this during some listener logic cleanup: We use ActionListener.wrap here already so any exception will make it to onFailure. The way this is set up throwing in onFailure will lead to another call to onFailure, which seems like it could cause or at least hide bugs. --- .../action/TransportStartDatafeedAction.java | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index b02f6339e49c0..65ef493f664f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -269,28 +269,20 @@ public void onFailure(Exception e) { }; ActionListener jobListener = ActionListener.wrap(jobBuilder -> { - try { - Job job = jobBuilder.build(); - validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry); - auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry); - createDataExtractor.accept(job); - } catch (Exception e) { - listener.onFailure(e); - } + Job job = jobBuilder.build(); + validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry); + auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry); + createDataExtractor.accept(job); }, listener::onFailure); ActionListener datafeedListener = ActionListener.wrap(datafeedBuilder -> { - try { - DatafeedConfig datafeedConfig = datafeedBuilder.build(); - params.setDatafeedIndices(datafeedConfig.getIndices()); - params.setJobId(datafeedConfig.getJobId()); - params.setIndicesOptions(datafeedConfig.getIndicesOptions()); - datafeedConfigHolder.set(datafeedConfig); - - jobConfigProvider.getJob(datafeedConfig.getJobId(), null, jobListener); - } catch (Exception e) { - listener.onFailure(e); - } + DatafeedConfig datafeedConfig = datafeedBuilder.build(); + params.setDatafeedIndices(datafeedConfig.getIndices()); + params.setJobId(datafeedConfig.getJobId()); + params.setIndicesOptions(datafeedConfig.getIndicesOptions()); + datafeedConfigHolder.set(datafeedConfig); + + jobConfigProvider.getJob(datafeedConfig.getJobId(), null, jobListener); }, listener::onFailure); datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), null, datafeedListener); From 86933ad2f3255002365616897f476808b68d1d97 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 10 Dec 2023 22:31:34 +0100 Subject: [PATCH 008/152] Remove deprecated + unused ActionListener#wrap variant (#103243) This isn't used anymore. --- .../java/org/elasticsearch/action/ActionListener.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index 5017f0af0007c..ae5835686425d 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -145,15 +145,6 @@ public String toString() { }; } - /** - * @deprecated in favour of {@link #running(Runnable)} because this implementation doesn't "wrap" exceptions from {@link #onResponse} - * into {@link #onFailure}. - */ - @Deprecated(forRemoval = true) - static ActionListener wrap(Runnable runnable) { - return running(runnable); - } - /** * Creates a listener that executes the appropriate consumer when the response (or failure) is received. This listener is "wrapped" in * the sense that an exception from the {@code onResponse} consumer is passed into the {@code onFailure} consumer. From 394444cd27f2a355959c9c302fedbda53626315b Mon Sep 17 00:00:00 2001 From: Tim Vernum Date: Mon, 11 Dec 2023 16:01:29 +1100 Subject: [PATCH 009/152] [Docs] Custom S3 CA must be reinstalled on upgrade (#103168) This commit updates the docs to call out that custom certificate authorities for S3 repositories will need to be reinstalled every time ES is upgraded, is the node is using the bundled JDK --- docs/reference/snapshot-restore/repository-s3.asciidoc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/reference/snapshot-restore/repository-s3.asciidoc b/docs/reference/snapshot-restore/repository-s3.asciidoc index 032d4f47bf678..e204061c28458 100644 --- a/docs/reference/snapshot-restore/repository-s3.asciidoc +++ b/docs/reference/snapshot-restore/repository-s3.asciidoc @@ -123,7 +123,9 @@ settings belong in the `elasticsearch.yml` file. `https`. Defaults to `https`. When using HTTPS, this repository type validates the repository's certificate chain using the JVM-wide truststore. Ensure that the root certificate authority is in this truststore using the JVM's - `keytool` tool. + `keytool` tool. If you have a custom certificate authority for your S3 repository + and you use the {es} <>, then you will need to reinstall your + CA certificate every time you upgrade {es}. `proxy.host`:: From da231d9623bab65dd110f5a477081a7edd34a09d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lorenzo=20Dematt=C3=A9?= Date: Mon, 11 Dec 2023 09:28:52 +0100 Subject: [PATCH 010/152] Fix: Watcher API watcher_get_settings clears product header (#103003) Current code do stashing and un-stashing of the context to ignore warning headers about accessing a system index. However, this ignores all header, omitting the X-elastic-product header from the response. This PR removes the stashing/un-stashing and instead calls the concreteIndexNames variant that allows system index access (which does not generate the warning), concreteIndexNamesWithSystemIndexAccess --- docs/changelog/103003.yaml | 6 +++ .../TransportGetWatcherSettingsAction.java | 19 +++---- .../TransportUpdateWatcherSettingsAction.java | 52 +++++++++++-------- 3 files changed, 43 insertions(+), 34 deletions(-) create mode 100644 docs/changelog/103003.yaml diff --git a/docs/changelog/103003.yaml b/docs/changelog/103003.yaml new file mode 100644 index 0000000000000..accacc2b62416 --- /dev/null +++ b/docs/changelog/103003.yaml @@ -0,0 +1,6 @@ +pr: 103003 +summary: "Fix: Watcher REST API `GET /_watcher/settings` now includes product header" +area: "Watcher" +type: bug +issues: + - 102928 diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java index eecbc21ad0475..0c52057100860 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportGetWatcherSettingsAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -20,7 +19,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -28,6 +26,7 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.put.UpdateWatcherSettingsAction; import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME; +import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST; public class TransportGetWatcherSettingsAction extends TransportMasterNodeAction< GetWatcherSettingsAction.Request, @@ -61,15 +60,11 @@ protected void masterOperation( ClusterState state, ActionListener listener ) { - final ThreadContext threadContext = threadPool.getThreadContext(); - // Stashing and un-stashing the context allows warning headers about accessing a system index to be ignored. - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME); - if (metadata == null) { - listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY)); - } else { - listener.onResponse(new GetWatcherSettingsAction.Response(filterSettableSettings(metadata.getSettings()))); - } + IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME); + if (metadata == null) { + listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY)); + } else { + listener.onResponse(new GetWatcherSettingsAction.Response(filterSettableSettings(metadata.getSettings()))); } } @@ -95,7 +90,7 @@ protected ClusterBlockException checkBlock(GetWatcherSettingsAction.Request requ return state.blocks() .indicesBlockedException( ClusterBlockLevel.METADATA_READ, - indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.LENIENT_EXPAND_OPEN, WATCHER_INDEX_NAME) + indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, WATCHER_INDEX_REQUEST) ); } } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportUpdateWatcherSettingsAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportUpdateWatcherSettingsAction.java index 124cb6de5fd40..b9895b2319599 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportUpdateWatcherSettingsAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/TransportUpdateWatcherSettingsAction.java @@ -9,6 +9,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; @@ -24,7 +25,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -37,7 +37,19 @@ public class TransportUpdateWatcherSettingsAction extends TransportMasterNodeAct UpdateWatcherSettingsAction.Request, AcknowledgedResponse> { - public static final String WATCHER_INDEX_NAME = ".watches"; + static final String WATCHER_INDEX_NAME = ".watches"; + + static final IndicesRequest WATCHER_INDEX_REQUEST = new IndicesRequest() { + @Override + public String[] indices() { + return new String[] { WATCHER_INDEX_NAME }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN; + } + }; private static final Logger logger = LogManager.getLogger(TransportUpdateWatcherSettingsAction.class); private final MetadataUpdateSettingsService updateSettingsService; @@ -83,27 +95,23 @@ protected void masterOperation( new Index[] { watcherIndexMd.getIndex() } ).settings(newSettings).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()); - final ThreadContext threadContext = threadPool.getThreadContext(); - // Stashing and un-stashing the context allows warning headers about accessing a system index to be ignored. - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - if (acknowledgedResponse.isAcknowledged()) { - logger.info("successfully updated Watcher service settings to {}", request.settings()); - } else { - logger.warn("updating Watcher service settings to {} was not acknowledged", request.settings()); - } - listener.onResponse(acknowledgedResponse); + updateSettingsService.updateSettings(clusterStateUpdateRequest, new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + if (acknowledgedResponse.isAcknowledged()) { + logger.info("successfully updated Watcher service settings to {}", request.settings()); + } else { + logger.warn("updating Watcher service settings to {} was not acknowledged", request.settings()); } + listener.onResponse(acknowledgedResponse); + } - @Override - public void onFailure(Exception e) { - logger.debug(() -> "failed to update settings for Watcher service", e); - listener.onFailure(e); - } - }); - } + @Override + public void onFailure(Exception e) { + logger.debug(() -> "failed to update settings for Watcher service", e); + listener.onFailure(e); + } + }); } @Override @@ -115,7 +123,7 @@ protected ClusterBlockException checkBlock(UpdateWatcherSettingsAction.Request r return state.blocks() .indicesBlockedException( ClusterBlockLevel.METADATA_WRITE, - indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.LENIENT_EXPAND_OPEN, WATCHER_INDEX_NAME) + indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, WATCHER_INDEX_REQUEST) ); } } From cd2e2946fa7da88ba5095e418be82d5f100b9f18 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 11 Dec 2023 08:37:40 +0000 Subject: [PATCH 011/152] Fix format string in OldLuceneVersions (#103185) The message introduced in #92986 used `{}` instead of `%s` for the placeholders, which doesn't work for `Strings.format`. This commit fixes the problem. --- docs/changelog/103185.yaml | 5 +++++ .../elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/103185.yaml diff --git a/docs/changelog/103185.yaml b/docs/changelog/103185.yaml new file mode 100644 index 0000000000000..3a1a4960ba98c --- /dev/null +++ b/docs/changelog/103185.yaml @@ -0,0 +1,5 @@ +pr: 103185 +summary: Fix format string in `OldLuceneVersions` +area: Search +type: bug +issues: [] diff --git a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java index 406ea50315de0..42fe09691d249 100644 --- a/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java +++ b/x-pack/plugin/old-lucene-versions/src/main/java/org/elasticsearch/xpack/lucene/bwc/OldLuceneVersions.java @@ -170,8 +170,8 @@ private static void convertToNewFormat(IndexShard indexShard) { throw new UncheckedIOException( Strings.format( """ - Elasticsearch version [{}] has limited support for indices created with version [{}] but this index could not be \ - read. It may be using an unsupported feature, or it may be damaged or corrupt. See {} for further information.""", + Elasticsearch version [%s] has limited support for indices created with version [%s] but this index could not be \ + read. It may be using an unsupported feature, or it may be damaged or corrupt. See %s for further information.""", Build.current().version(), IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(indexShard.indexSettings().getSettings()), ReferenceDocs.ARCHIVE_INDICES From dd0d9eca48504a082507f31fb3350752f456998a Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 11 Dec 2023 10:16:15 +0100 Subject: [PATCH 012/152] [Connectors API] Check connector sync job stats after they were updated using another get request Assert that the connector sync job stats were updated in integration tests --- .../460_connector_sync_job_update_stats.yml | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/460_connector_sync_job_update_stats.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/460_connector_sync_job_update_stats.yml index 0e69866ce8b6c..0c7300bd2b436 100644 --- a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/460_connector_sync_job_update_stats.yml +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/460_connector_sync_job_update_stats.yml @@ -2,6 +2,7 @@ setup: - skip: version: " - 8.11.99" reason: Introduced in 8.12.0 + - do: connector.put: connector_id: test-connector @@ -20,7 +21,9 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -31,6 +34,13 @@ setup: - match: { acknowledged: true } + - do: + connector_sync_job.get: + connector_sync_job_id: $id + + - match: { deleted_document_count: 10 } + - match: { indexed_document_count: 20 } + - match: { indexed_document_volume: 1000 } --- "Update the ingestion stats for a connector sync job - negative deleted document count error": @@ -40,7 +50,9 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -50,7 +62,6 @@ setup: indexed_document_volume: 1000 catch: bad_request - --- "Update the ingestion stats for a connector sync job - negative indexed document count error": - do: @@ -59,7 +70,9 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -69,7 +82,6 @@ setup: indexed_document_volume: 1000 catch: bad_request - --- "Update the ingestion stats for a connector sync job - negative indexed document volume error": - do: @@ -78,7 +90,9 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -96,7 +110,9 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -115,7 +131,9 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -127,6 +145,14 @@ setup: - match: { acknowledged: true } + - do: + connector_sync_job.get: + connector_sync_job_id: $id + + - match: { deleted_document_count: 10 } + - match: { indexed_document_count: 20 } + - match: { indexed_document_volume: 1000 } + - match: { total_document_count: 20 } --- "Update the ingestion stats for a connector sync job - with optional last_seen": @@ -137,6 +163,7 @@ setup: job_type: full trigger_method: on_demand - set: { id: id } + - do: connector_sync_job.update_stats: connector_sync_job_id: $id @@ -148,6 +175,15 @@ setup: - match: { acknowledged: true } + - do: + connector_sync_job.get: + connector_sync_job_id: $id + + - match: { deleted_document_count: 10 } + - match: { indexed_document_count: 20 } + - match: { indexed_document_volume: 1000 } + - match: { last_seen: 2023-12-04T08:45:50.567149Z } + --- "Update the ingestion stats for a Connector Sync Job - Connector Sync Job does not exist": - do: From ce9e91937e256598d2b861b63953800c2a723d15 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 11 Dec 2023 10:19:00 +0100 Subject: [PATCH 013/152] [Connectors API] Verify that last_seen was updated in the check-in integration tests (#103194) Extend the check-in integration tests to verify that last_seen was updated correctly after a check-in. --- .../entsearch/420_connector_sync_job_check_in.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/420_connector_sync_job_check_in.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/420_connector_sync_job_check_in.yml index 9ef37f4a9fe60..d08f7f6a51c91 100644 --- a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/420_connector_sync_job_check_in.yml +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/420_connector_sync_job_check_in.yml @@ -2,6 +2,7 @@ setup: - skip: version: " - 8.11.99" reason: Introduced in 8.12.0 + features: is_after - do: connector.put: connector_id: test-connector @@ -20,13 +21,26 @@ setup: id: test-connector job_type: full trigger_method: on_demand + - set: { id: sync-job-id-to-check-in } + + - do: + connector_sync_job.get: + connector_sync_job_id: $sync-job-id-to-check-in + + - set: { last_seen: last_seen_before_check_in } + - do: connector_sync_job.check_in: connector_sync_job_id: $sync-job-id-to-check-in - match: { acknowledged: true } + - do: + connector_sync_job.get: + connector_sync_job_id: $sync-job-id-to-check-in + + - is_after: { last_seen: $last_seen_before_check_in } --- "Check in a Connector Sync Job - Connector Sync Job does not exist": From 784c49b2b809d33bde6aac922f218dd98173594a Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Mon, 11 Dec 2023 11:18:59 +0100 Subject: [PATCH 014/152] [Connectors API] Fix: Handle nullable fields correctly in the ConnectorSyncJob parser (#103183) Declare nullable fields correctly in the ConnectorSyncJob parser. --- docs/changelog/103183.yaml | 6 ++ .../connector/syncjob/ConnectorSyncJob.java | 31 +++++-- .../syncjob/ConnectorSyncJobTests.java | 92 ++++++++++++++++++- 3 files changed, 122 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/103183.yaml diff --git a/docs/changelog/103183.yaml b/docs/changelog/103183.yaml new file mode 100644 index 0000000000000..cb28033cff6a7 --- /dev/null +++ b/docs/changelog/103183.yaml @@ -0,0 +1,6 @@ +pr: 103183 +summary: "[Connectors API] Handle nullable fields correctly in the `ConnectorSyncJob`\ + \ parser" +area: Application +type: bug +issues: [] diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java index f14d0fa52b1c7..c63cb1921adc6 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJob.java @@ -263,12 +263,22 @@ public ConnectorSyncJob(StreamInput in) throws IOException { static { PARSER.declareField( optionalConstructorArg(), - (p, c) -> Instant.parse(p.text()), + (p, c) -> parseNullableInstant(p), CANCELATION_REQUESTED_AT_FIELD, ObjectParser.ValueType.STRING_OR_NULL ); - PARSER.declareField(optionalConstructorArg(), (p, c) -> Instant.parse(p.text()), CANCELED_AT_FIELD, ObjectParser.ValueType.STRING); - PARSER.declareField(optionalConstructorArg(), (p, c) -> Instant.parse(p.text()), COMPLETED_AT_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> parseNullableInstant(p), + CANCELED_AT_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> parseNullableInstant(p), + COMPLETED_AT_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); PARSER.declareField( constructorArg(), (p, c) -> ConnectorSyncJob.syncJobConnectorFromXContent(p), @@ -287,9 +297,14 @@ public ConnectorSyncJob(StreamInput in) throws IOException { JOB_TYPE_FIELD, ObjectParser.ValueType.STRING ); - PARSER.declareField(constructorArg(), (p, c) -> Instant.parse(p.text()), LAST_SEEN_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField(constructorArg(), (p, c) -> parseNullableInstant(p), LAST_SEEN_FIELD, ObjectParser.ValueType.STRING_OR_NULL); PARSER.declareField(constructorArg(), (p, c) -> p.map(), METADATA_FIELD, ObjectParser.ValueType.OBJECT); - PARSER.declareField(optionalConstructorArg(), (p, c) -> Instant.parse(p.text()), STARTED_AT_FIELD, ObjectParser.ValueType.STRING); + PARSER.declareField( + optionalConstructorArg(), + (p, c) -> parseNullableInstant(p), + STARTED_AT_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); PARSER.declareField( constructorArg(), (p, c) -> ConnectorSyncStatus.fromString(p.text()), @@ -303,7 +318,11 @@ public ConnectorSyncJob(StreamInput in) throws IOException { TRIGGER_METHOD_FIELD, ObjectParser.ValueType.STRING ); - PARSER.declareString(optionalConstructorArg(), WORKER_HOSTNAME_FIELD); + PARSER.declareStringOrNull(optionalConstructorArg(), WORKER_HOSTNAME_FIELD); + } + + private static Instant parseNullableInstant(XContentParser p) throws IOException { + return p.currentToken() == XContentParser.Token.VALUE_NULL ? null : Instant.parse(p.text()); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java index 49a3f0c4ad043..04629b6ee9751 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTests.java @@ -158,7 +158,7 @@ public void testFromXContent_WithAllFields_AllSet() throws IOException { assertThat(syncJob.getWorkerHostname(), equalTo("worker-hostname")); } - public void testFromXContent_WithAllNonOptionalFieldsSet_DoesNotThrow() throws IOException { + public void testFromXContent_WithOnlyNonNullableFieldsSet_DoesNotThrow() throws IOException { String content = XContentHelper.stripWhitespace(""" { "connector": { @@ -242,6 +242,96 @@ public void testFromXContent_WithAllNonOptionalFieldsSet_DoesNotThrow() throws I ConnectorSyncJob.fromXContentBytes(new BytesArray(content), XContentType.JSON); } + public void testFromXContent_WithAllNullableFieldsSetToNull_DoesNotThrow() throws IOException { + String content = XContentHelper.stripWhitespace(""" + { + "cancelation_requested_at": null, + "canceled_at": null, + "completed_at": null, + "connector": { + "id": "connector-id", + "filtering": [ + { + "active": { + "advanced_snippet": { + "created_at": "2023-12-01T14:18:37.397819Z", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": {} + }, + "rules": [ + { + "created_at": "2023-12-01T14:18:37.397819Z", + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + }, + "domain": "DEFAULT", + "draft": { + "advanced_snippet": { + "created_at": "2023-12-01T14:18:37.397819Z", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": {} + }, + "rules": [ + { + "created_at": "2023-12-01T14:18:37.397819Z", + "field": "_", + "id": "DEFAULT", + "order": 0, + "policy": "include", + "rule": "regex", + "updated_at": "2023-12-01T14:18:37.397819Z", + "value": ".*" + } + ], + "validation": { + "errors": [], + "state": "valid" + } + } + } + ], + "index_name": "search-connector", + "language": "english", + "pipeline": { + "extract_binary_content": true, + "name": "ent-search-generic-ingestion", + "reduce_whitespace": true, + "run_ml_inference": false + }, + "service_type": "service type", + "configuration": {} + }, + "created_at": "2023-12-01T14:18:43.07693Z", + "deleted_document_count": 10, + "error": null, + "id": "HIC-JYwB9RqKhB7x_hIE", + "indexed_document_count": 10, + "indexed_document_volume": 10, + "job_type": "full", + "last_seen": null, + "metadata": {}, + "started_at": null, + "status": "canceling", + "total_document_count": 0, + "trigger_method": "scheduled", + "worker_hostname": null + } + """); + + ConnectorSyncJob.fromXContentBytes(new BytesArray(content), XContentType.JSON); + } + private void assertTransportSerialization(ConnectorSyncJob testInstance) throws IOException { ConnectorSyncJob deserializedInstance = copyInstance(testInstance); assertNotSame(testInstance, deserializedInstance); From 7ddf450d1961296445315c185c84b2dffb6873a7 Mon Sep 17 00:00:00 2001 From: Abdon Pijpelink Date: Mon, 11 Dec 2023 11:23:27 +0100 Subject: [PATCH 015/152] [DOCS] Empty keys for ES|QL DISSECT (#102632) * [DOCS] Empty keys for ES|QL DISSECT * Update test --- ...sql-process-data-with-dissect-grok.asciidoc | 18 ++++++------------ .../src/main/resources/docs.csv-spec | 1 - 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/docs/reference/esql/esql-process-data-with-dissect-grok.asciidoc b/docs/reference/esql/esql-process-data-with-dissect-grok.asciidoc index a13633a9f8d92..87748fee4f202 100644 --- a/docs/reference/esql/esql-process-data-with-dissect-grok.asciidoc +++ b/docs/reference/esql/esql-process-data-with-dissect-grok.asciidoc @@ -62,9 +62,8 @@ clientip:keyword | @timestamp:keyword | status:keyword include::../ingest/processors/dissect.asciidoc[tag=intro-example-explanation] -A <> can be used to match values, but -exclude the value from the output. -// TODO: Change back to original text when https://github.com/elastic/elasticsearch/pull/102580 is merged +An empty key (`%{}`) or <> can be used to +match values, but exclude the value from the output. All matched values are output as keyword string data types. Use the <> to convert to another data type. @@ -137,8 +136,6 @@ include::{esql-specs}/docs.csv-spec[tag=dissectRightPaddingModifier] include::{esql-specs}/docs.csv-spec[tag=dissectRightPaddingModifier-result] |=== -//// -// TODO: Re-enable when https://github.com/elastic/elasticsearch/pull/102580 is merged include::../ingest/processors/dissect.asciidoc[tag=dissect-modifier-empty-right-padding] For example: @@ -150,7 +147,6 @@ include::{esql-specs}/docs.csv-spec[tag=dissectEmptyRightPaddingModifier] |=== include::{esql-specs}/docs.csv-spec[tag=dissectEmptyRightPaddingModifier-result] |=== -//// [[esql-append-modifier]] ====== Append modifier (`+`) @@ -180,11 +176,9 @@ include::{esql-specs}/docs.csv-spec[tag=dissectAppendWithOrderModifier-result] [[esql-named-skip-key]] ====== Named skip key (`?`) -// include::../ingest/processors/dissect.asciidoc[tag=named-skip-key] -// TODO: Re-enable when https://github.com/elastic/elasticsearch/pull/102580 is merged - -Dissect supports ignoring matches in the final result. This can be done with a -named skip key using the `{?name}` syntax: +include::../ingest/processors/dissect.asciidoc[tag=named-skip-key] +This can be done with a named skip key using the `{?name}` syntax. In the +following query, `ident` and `auth` are not added to the output table: [source.merge.styled,esql] ---- @@ -199,7 +193,7 @@ include::{esql-specs}/docs.csv-spec[tag=dissectNamedSkipKey-result] ===== Limitations // tag::dissect-limitations[] -The `DISSECT` command does not support reference keys and empty keys. +The `DISSECT` command does not support reference keys. // end::dissect-limitations[] [[esql-process-data-with-grok]] diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/docs.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/docs.csv-spec index a754194739992..ef3e43aa6d8ab 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/docs.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/docs.csv-spec @@ -574,7 +574,6 @@ dissectEmptyRightPaddingModifier ROW message="[1998-08-10T17:15:42] [WARN]" | DISSECT message "[%{ts}]%{->}[%{level}]" // end::dissectEmptyRightPaddingModifier[] -| KEEP message, ts, level ; // tag::dissectEmptyRightPaddingModifier-result[] From df450348f7e02782415fba33551c867bada9521a Mon Sep 17 00:00:00 2001 From: Simon Cooper Date: Mon, 11 Dec 2023 10:41:59 +0000 Subject: [PATCH 016/152] Close rather than stop HttpServerTransport on shutdown (#102759) Also add a clearer exception message when a lifecycle object is starting when the ES process is shutting down --- docs/changelog/102759.yaml | 6 +++++ .../bootstrap/Elasticsearch.java | 2 ++ .../bootstrap/ElasticsearchProcess.java | 24 +++++++++++++++++++ .../common/component/Lifecycle.java | 8 ++++++- .../java/org/elasticsearch/node/Node.java | 5 +--- .../org/elasticsearch/node/NodeTests.java | 7 ++++++ 6 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 docs/changelog/102759.yaml create mode 100644 server/src/main/java/org/elasticsearch/bootstrap/ElasticsearchProcess.java diff --git a/docs/changelog/102759.yaml b/docs/changelog/102759.yaml new file mode 100644 index 0000000000000..1c002ef2b678e --- /dev/null +++ b/docs/changelog/102759.yaml @@ -0,0 +1,6 @@ +pr: 102759 +summary: Close rather than stop `HttpServerTransport` on shutdown +area: Infra/Node Lifecycle +type: bug +issues: + - 102501 diff --git a/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java b/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java index eb2c2b7f6738e..dd60a2085acc1 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/Elasticsearch.java @@ -460,6 +460,8 @@ private void start() throws NodeValidationException { } private static void shutdown() { + ElasticsearchProcess.markStopping(); + if (INSTANCE == null) { return; // never got far enough } diff --git a/server/src/main/java/org/elasticsearch/bootstrap/ElasticsearchProcess.java b/server/src/main/java/org/elasticsearch/bootstrap/ElasticsearchProcess.java new file mode 100644 index 0000000000000..7397bb98322f5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/bootstrap/ElasticsearchProcess.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.bootstrap; + +/** + * Helper class to determine if the ES process is shutting down + */ +public class ElasticsearchProcess { + private static volatile boolean stopping; + + static void markStopping() { + stopping = true; + } + + public static boolean isStopping() { + return stopping; + } +} diff --git a/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java b/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java index 793975048f846..1488963ab2644 100644 --- a/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java +++ b/server/src/main/java/org/elasticsearch/common/component/Lifecycle.java @@ -8,6 +8,8 @@ package org.elasticsearch.common.component; +import org.elasticsearch.bootstrap.ElasticsearchProcess; + /** * Lifecycle state. Allows the following transitions: *