From 051e1d6c3cc04f325703ca846dc84f58a0ece00e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 27 Jan 2022 15:22:49 +0100 Subject: [PATCH] Introduce flag for testing CCS compatibility (#81809) CCS works with a subset of APIs and features depending on the versions of the clusters being communicated with. Currently we limit this CCS compliance to one minor version backward and one minor forward. This change adds a setting useful for testing in clients like Kibana that can be turned on to check if a search request sent to one of the endpoints that are supporting CCS is compatible with a cluster that is on one minor version back. We do this by trying to serialize the request to a stream with the earlier version. Features and components that are not supported in that version should throw errors upon atempted serialization to indicate they are not compatible. In addition we need components extending NamedWriteable (e.g. new queries) to also error when they are written to a stream that has a version before the version they were released. --- .../mustache/MultiSearchTemplateIT.java | 42 ++++++- .../script/mustache/SearchTemplateIT.java | 35 +++++- .../query/plugin/CustomQueryParserIT.java | 2 + .../search/msearch/MultiSearchIT.java | 40 +++++++ .../indices/resolve/ResolveIndexAction.java | 8 ++ .../TransportFieldCapabilitiesAction.java | 8 ++ .../action/search/TransportSearchAction.java | 6 + .../action/search/TransportSearchHelper.java | 51 ++++++++- .../stream/VersionCheckingStreamOutput.java | 81 ++++++++++++++ .../common/settings/ClusterSettings.java | 1 + .../index/query/QueryBuilder.java | 10 +- .../search/SearchExtBuilder.java | 12 +- .../elasticsearch/search/SearchService.java | 6 + .../aggregations/AggregationBuilder.java | 10 +- .../PipelineAggregationBuilder.java | 10 +- .../search/rescore/QueryRescorerBuilder.java | 6 + .../search/rescore/RescorerBuilder.java | 4 +- .../search/sort/FieldSortBuilder.java | 5 + .../search/sort/GeoDistanceSortBuilder.java | 5 + .../search/sort/ScoreSortBuilder.java | 6 + .../search/sort/ScriptSortBuilder.java | 5 + .../search/sort/SortBuilder.java | 8 +- .../search/suggest/SuggestionBuilder.java | 4 +- .../CompletionSuggestionBuilder.java | 6 + .../phrase/PhraseSuggestionBuilder.java | 6 + .../suggest/term/TermSuggestionBuilder.java | 6 + .../TransportResolveIndexActionTests.java | 94 ++++++++++++++++ ...TransportFieldCapabilitiesActionTests.java | 103 ++++++++++++++++++ .../search/TransportSearchActionTests.java | 81 ++++++++++++++ .../search/TransportSearchHelperTests.java | 20 ++++ .../VersionCheckingStreamOutputTests.java | 54 +++++++++ .../query/SpanMultiTermQueryBuilderTests.java | 6 + .../search/RestMultiSearchActionTests.java | 6 - .../search/SearchModuleTests.java | 11 ++ .../search}/DummyQueryBuilder.java | 4 +- .../search}/DummyQueryParserPlugin.java | 13 ++- .../FailBeforeCurrentVersionQueryBuilder.java | 54 +++++++++ .../aggregations/AggregatorTestCase.java | 5 + .../xpack/search/AsyncSearchActionIT.java | 26 +++++ .../action/TransportTermsEnumAction.java | 6 + .../TransportTermsEnumActionTests.java | 42 +++++++ 41 files changed, 878 insertions(+), 30 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutput.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java create mode 100644 server/src/test/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutputTests.java rename {server/src/test/java/org/elasticsearch/index/query/plugin => test/framework/src/main/java/org/elasticsearch/search}/DummyQueryBuilder.java (93%) rename {server/src/test/java/org/elasticsearch/index/query/plugin => test/framework/src/main/java/org/elasticsearch/search}/DummyQueryParserPlugin.java (80%) create mode 100644 test/framework/src/main/java/org/elasticsearch/search/FailBeforeCurrentVersionQueryBuilder.java diff --git a/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/MultiSearchTemplateIT.java b/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/MultiSearchTemplateIT.java index 35177c743f400..302016a19dfde 100644 --- a/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/MultiSearchTemplateIT.java +++ b/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/MultiSearchTemplateIT.java @@ -8,23 +8,30 @@ package org.elasticsearch.script.mustache; +import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.script.mustache.MultiSearchTemplateResponse.Item; +import org.elasticsearch.search.DummyQueryParserPlugin; +import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.json.JsonXContent; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; @@ -34,7 +41,15 @@ public class MultiSearchTemplateIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return Collections.singleton(MustachePlugin.class); + return List.of(MustachePlugin.class, DummyQueryParserPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true") + .build(); } public void testBasic() throws Exception { @@ -164,4 +179,27 @@ public void testBasic() throws Exception { assertThat(searchTemplateResponse5.hasResponse(), is(false)); assertThat(searchTemplateResponse5.getSource().utf8ToString(), equalTo("{\"query\":{\"terms\":{\"group\":[1,2,3,]}}}")); } + + /** + * Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works + */ + public void testCCSCheckCompatibility() throws Exception { + String templateString = """ + { + "source": "{ \\"query\\":{\\"fail_before_current_version\\":{}} }" + }"""; + SearchTemplateRequest searchTemplateRequest = SearchTemplateRequest.fromXContent( + createParser(JsonXContent.jsonXContent, templateString) + ); + searchTemplateRequest.setRequest(new SearchRequest()); + MultiSearchTemplateRequest request = new MultiSearchTemplateRequest(); + request.add(searchTemplateRequest); + MultiSearchTemplateResponse multiSearchTemplateResponse = client().execute(MultiSearchTemplateAction.INSTANCE, request).get(); + Item response = multiSearchTemplateResponse.getResponses()[0]; + assertTrue(response.isFailure()); + Exception ex = response.getFailure(); + assertThat(ex.getMessage(), containsString("[class org.elasticsearch.action.search.SearchRequest] is not compatible with version")); + assertThat(ex.getMessage(), containsString("'search.check_ccs_compatibility' setting is enabled.")); + assertEquals("This query isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getMessage()); + } } diff --git a/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/SearchTemplateIT.java b/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/SearchTemplateIT.java index 751f933f5d775..7d9684334e702 100644 --- a/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/SearchTemplateIT.java +++ b/modules/lang-mustache/src/internalClusterTest/java/org/elasticsearch/script/mustache/SearchTemplateIT.java @@ -8,12 +8,16 @@ package org.elasticsearch.script.mustache; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.DummyQueryParserPlugin; +import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; @@ -23,7 +27,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -38,7 +44,12 @@ public class SearchTemplateIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return Collections.singleton(MustachePlugin.class); + return List.of(MustachePlugin.class, DummyQueryParserPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true").build(); } @Before @@ -346,4 +357,26 @@ public void testIndexedTemplateWithArray() throws Exception { assertHitCount(searchResponse.getResponse(), 5); } + /** + * Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works + */ + public void testCCSCheckCompatibility() throws Exception { + String templateString = """ + { + "source": "{ \\"query\\":{\\"fail_before_current_version\\":{}} }" + }"""; + SearchTemplateRequest request = SearchTemplateRequest.fromXContent(createParser(JsonXContent.jsonXContent, templateString)); + request.setRequest(new SearchRequest()); + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> client().execute(SearchTemplateAction.INSTANCE, request).get() + ); + assertThat( + ex.getCause().getMessage(), + containsString("[class org.elasticsearch.action.search.SearchRequest] is not compatible with version") + ); + assertThat(ex.getCause().getMessage(), containsString("'search.check_ccs_compatibility' setting is enabled.")); + assertEquals("This query isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getCause().getMessage()); + } + } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/query/plugin/CustomQueryParserIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/query/plugin/CustomQueryParserIT.java index 182effc3626c1..2aee517aa4531 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/query/plugin/CustomQueryParserIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/query/plugin/CustomQueryParserIT.java @@ -10,6 +10,8 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.DummyQueryBuilder; +import org.elasticsearch.search.DummyQueryParserPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.junit.Before; diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/msearch/MultiSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/msearch/MultiSearchIT.java index 4de2f53423061..ecca33f08d9da 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/msearch/MultiSearchIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/msearch/MultiSearchIT.java @@ -8,9 +8,13 @@ package org.elasticsearch.search.msearch; +import org.elasticsearch.Version; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.DummyQueryBuilder; +import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentType; @@ -22,6 +26,14 @@ public class MultiSearchIT extends ESIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true") + .build(); + } + public void testSimpleMultiSearch() { createIndex("test"); ensureGreen(); @@ -70,4 +82,32 @@ public void testSimpleMultiSearchMoreRequests() { } } + /** + * Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works + */ + public void testCCSCheckCompatibility() throws Exception { + createIndex("test"); + ensureGreen(); + client().prepareIndex("test").setId("1").setSource("field", "xxx").get(); + client().prepareIndex("test").setId("2").setSource("field", "yyy").get(); + refresh(); + MultiSearchResponse response = client().prepareMultiSearch() + .add(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "xxx"))) + .add(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "yyy"))) + .add(client().prepareSearch("test").setQuery(new DummyQueryBuilder() { + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + })) + .get(); + + assertThat(response.getResponses().length, equalTo(3)); + assertHitCount(response.getResponses()[0].getResponse(), 1L); + assertHitCount(response.getResponses()[1].getResponse(), 1L); + assertTrue(response.getResponses()[2].isFailure()); + assertTrue( + response.getResponses()[2].getFailure().getMessage().contains("the 'search.check_ccs_compatibility' setting is enabled") + ); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java index 1f4ef2b7a7c87..358ba10f2a74e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; +import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -54,6 +55,8 @@ import java.util.SortedMap; import java.util.TreeMap; +import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; + public class ResolveIndexAction extends ActionType { public static final ResolveIndexAction INSTANCE = new ResolveIndexAction(); @@ -436,6 +439,7 @@ public static class TransportAction extends HandledTransportAction listener) { + if (ccsCheckCompatibility) { + checkCCSVersionCompatibility(request); + } final ClusterState clusterState = clusterService.state(); final Map remoteClusterIndices = remoteClusterService.groupIndices( request.indicesOptions(), diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 7429ec5e8b50a..71e4e19c4de1f 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.SearchService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -45,6 +46,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; + public class TransportFieldCapabilitiesAction extends HandledTransportAction { public static final String ACTION_NODE_NAME = FieldCapabilitiesAction.NAME + "[n]"; @@ -55,6 +58,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction metadataFieldPred; + private final boolean ccsCheckCompatibility; @Inject public TransportFieldCapabilitiesAction( @@ -79,10 +83,14 @@ public TransportFieldCapabilitiesAction( FieldCapabilitiesNodeRequest::new, new NodeTransportHandler() ); + this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings()); } @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { + if (ccsCheckCompatibility) { + checkCCSVersionCompatibility(request); + } // retrieve the initial timestamp in case the action is a cross cluster search long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis(); final ClusterState clusterState = clusterService.state(); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 549506bc56d8b..11072b273ab91 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -95,6 +95,7 @@ import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; +import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; import static org.elasticsearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort; import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_CRITICAL_READ; import static org.elasticsearch.threadpool.ThreadPool.Names.SYSTEM_READ; @@ -132,6 +133,7 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexOriginalIndices( @@ -371,6 +374,9 @@ private void executeRequest( ActionListener rewriteListener = ActionListener.wrap(rewritten -> { final SearchContextId searchContext; final Map remoteClusterIndices; + if (ccsCheckCompatibility) { + checkCCSVersionCompatibility(rewritten); + } if (rewritten.pointInTimeBuilder() != null) { searchContext = rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry); remoteClusterIndices = getIndicesFromSearchContexts(searchContext, rewritten.indicesOptions()); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java index cd40ddef723d5..a58585ffb8305 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java @@ -8,10 +8,14 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.VersionCheckingStreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -20,8 +24,9 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Base64; +import java.util.List; -final class TransportSearchHelper { +public final class TransportSearchHelper { private static final String INCLUDE_CONTEXT_UUID = "include_context_uuid"; @@ -91,6 +96,50 @@ static ParsedScrollId parseScrollId(String scrollId) { } } + private static final List ALL_VERSIONS = Version.getDeclaredVersions(Version.class); + private static final Version CCS_CHECK_VERSION = getPreviousMinorSeries(Version.CURRENT); + + /** + * Using the 'search.check_ccs_compatibility' setting, clients can ask for an early + * check that inspects the incoming request and tries to verify that it can be handled by + * a CCS compliant earlier version, e.g. currently a N-1 version where N is the current minor. + * + * Checking the compatibility involved serializing the request to a stream output that acts like + * it was on the previous minor version. This should e.g. trigger errors for {@link Writeable} parts of + * the requests that were not available in those versions. + */ + public static void checkCCSVersionCompatibility(Writeable writeableRequest) { + try { + writeableRequest.writeTo(new VersionCheckingStreamOutput(CCS_CHECK_VERSION)); + } catch (Exception e) { + // if we cannot serialize, raise this as an error to indicate to the caller that CCS has problems with this request + throw new IllegalArgumentException( + "[" + + writeableRequest.getClass() + + "] is not compatible with version " + + CCS_CHECK_VERSION + + " and the '" + + SearchService.CCS_VERSION_CHECK_SETTING.getKey() + + "' setting is enabled.", + e + ); + } + } + + /** + * Returns the first minor version previous to the minor version passed in. + * I.e 8.2.1 will return 8.1.0 + */ + static Version getPreviousMinorSeries(Version current) { + for (int i = ALL_VERSIONS.size() - 1; i >= 0; i--) { + Version v = ALL_VERSIONS.get(i); + if (v.before(current) && (v.minor < current.minor || v.major < current.major)) { + return Version.fromId(v.major * 1000000 + v.minor * 10000 + 99); + } + } + throw new IllegalArgumentException("couldn't find any released versions of the minor before [" + current + "]"); + } + private TransportSearchHelper() { } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutput.java new file mode 100644 index 0000000000000..bd9ace99eb276 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutput.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; +import org.elasticsearch.core.Nullable; + +import java.io.IOException; + +/** + * This {@link StreamOutput} writes nowhere. It can be used to check if serialization would + * be successful writing to a specific version. + */ +public class VersionCheckingStreamOutput extends StreamOutput { + + public VersionCheckingStreamOutput(Version version) { + setVersion(version); + } + + @Override + public void writeByte(byte b) throws IOException { + // no-op + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + // no-op + } + + @Override + public void flush() throws IOException { + // no-op + + } + + @Override + public void close() throws IOException { + // no-op + + } + + @Override + public void reset() throws IOException { + // no-op + } + + @Override + public void writeNamedWriteable(NamedWriteable namedWriteable) throws IOException { + if (namedWriteable instanceof VersionedNamedWriteable vnw) { + checkVersionCompatibility(vnw); + } + super.writeNamedWriteable(namedWriteable); + } + + @Override + public void writeOptionalNamedWriteable(@Nullable NamedWriteable namedWriteable) throws IOException { + if (namedWriteable != null && namedWriteable instanceof VersionedNamedWriteable vnw) { + checkVersionCompatibility(vnw); + } + super.writeOptionalNamedWriteable(namedWriteable); + } + + private void checkVersionCompatibility(VersionedNamedWriteable namedWriteable) { + if (namedWriteable.getMinimalSupportedVersion().after(getVersion())) { + throw new IllegalArgumentException( + "[" + + namedWriteable.getWriteableName() + + "] was released first in version " + + namedWriteable.getMinimalSupportedVersion() + + ", failed compatibility check trying to send it to node with version " + + getVersion() + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 618c128a76fd2..f46de609296dc 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -407,6 +407,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.MAX_KEEPALIVE_SETTING, SearchService.ALLOW_EXPENSIVE_QUERIES, + SearchService.CCS_VERSION_CHECK_SETTING, MultiBucketConsumerService.MAX_BUCKET_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, diff --git a/server/src/main/java/org/elasticsearch/index/query/QueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/QueryBuilder.java index 1480a70a48c3f..fb2da07ca96d6 100644 --- a/server/src/main/java/org/elasticsearch/index/query/QueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/QueryBuilder.java @@ -9,12 +9,13 @@ package org.elasticsearch.index.query; import org.apache.lucene.search.Query; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.xcontent.ToXContentObject; import java.io.IOException; -public interface QueryBuilder extends NamedWriteable, ToXContentObject, Rewriteable { +public interface QueryBuilder extends VersionedNamedWriteable, ToXContentObject, Rewriteable { /** * Converts this QueryBuilder to a lucene {@link Query}. @@ -66,4 +67,9 @@ public interface QueryBuilder extends NamedWriteable, ToXContentObject, Rewritea default QueryBuilder rewrite(QueryRewriteContext queryRewriteContext) throws IOException { return this; } + + @Override + default Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchExtBuilder.java b/server/src/main/java/org/elasticsearch/search/SearchExtBuilder.java index e49d4ee78ca2e..26a5f630461cd 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchExtBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/SearchExtBuilder.java @@ -8,9 +8,10 @@ package org.elasticsearch.search; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.plugins.SearchPlugin; @@ -32,9 +33,16 @@ * * @see SearchExtSpec */ -public abstract class SearchExtBuilder implements NamedWriteable, ToXContentFragment { +public abstract class SearchExtBuilder implements VersionedNamedWriteable, ToXContentFragment { + @Override public abstract int hashCode(); + @Override public abstract boolean equals(Object obj); + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 416e4e5a7a317..91877f8b730a1 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -170,6 +170,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.Dynamic ); + public static final Setting CCS_VERSION_CHECK_SETTING = Setting.boolSetting( + "search.check_ccs_compatibility", + false, + Property.NodeScope + ); + /** * Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react * to the cancellation request faster. It will produce more cancellation checks but benchmarking has shown these did not diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index a89f51854320c..9c78ff3864f0e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -7,8 +7,9 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.Version; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -29,7 +30,7 @@ */ public abstract class AggregationBuilder implements - NamedWriteable, + VersionedNamedWriteable, ToXContentFragment, BaseAggregationBuilder, Rewriteable { @@ -190,6 +191,11 @@ public String toString() { return Strings.toString(this); } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + /** * Return true if any of the child aggregations is a time-series aggregation that requires an in-order execution */ diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java index e6a5842409a6b..85b6ce2c3893d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java @@ -7,10 +7,11 @@ */ package org.elasticsearch.search.aggregations; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ValidateActions; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; @@ -31,7 +32,7 @@ */ public abstract class PipelineAggregationBuilder implements - NamedWriteable, + VersionedNamedWriteable, BaseAggregationBuilder, ToXContentFragment, Rewriteable { @@ -258,4 +259,9 @@ public String toString() { public PipelineAggregationBuilder rewrite(QueryRewriteContext context) throws IOException { return this; } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } diff --git a/server/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java b/server/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java index ced8b487dbe5d..96872ae342e94 100644 --- a/server/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/rescore/QueryRescorerBuilder.java @@ -8,6 +8,7 @@ package org.elasticsearch.search.rescore; +import org.elasticsearch.Version; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -91,6 +92,11 @@ public String getWriteableName() { return NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + /** * @return the query used for this rescore query */ diff --git a/server/src/main/java/org/elasticsearch/search/rescore/RescorerBuilder.java b/server/src/main/java/org/elasticsearch/search/rescore/RescorerBuilder.java index bfe7c6bfd3200..897c14409b5fd 100644 --- a/server/src/main/java/org/elasticsearch/search/rescore/RescorerBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/rescore/RescorerBuilder.java @@ -10,9 +10,9 @@ import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.xcontent.ParseField; @@ -28,7 +28,7 @@ */ public abstract class RescorerBuilder> implements - NamedWriteable, + VersionedNamedWriteable, ToXContentObject, Rewriteable> { public static final int DEFAULT_WINDOW_SIZE = 10; diff --git a/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java index 2efae46d05627..46c88b8915782 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/sort/FieldSortBuilder.java @@ -709,6 +709,11 @@ public String getWriteableName() { return NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + /** * Creates a new {@link FieldSortBuilder} from the query held by the {@link XContentParser} in * {@link org.elasticsearch.xcontent.XContent} format. diff --git a/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java index aa1b0f0833454..0b3b6ac8b92d2 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java @@ -351,6 +351,11 @@ public String getWriteableName() { return NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + @Override public boolean equals(Object object) { if (this == object) { diff --git a/server/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java index 1da72092625da..915788af6373b 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/sort/ScoreSortBuilder.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.SortField; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; @@ -157,6 +158,11 @@ public String getWriteableName() { return NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + @Override public ScoreSortBuilder rewrite(QueryRewriteContext ctx) throws IOException { return this; diff --git a/server/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java index 9eed79459e5f2..e1c0b94291c8c 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/sort/ScriptSortBuilder.java @@ -385,6 +385,11 @@ public String getWriteableName() { return NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + public enum ScriptSortType implements Writeable { /** script sort for a string value **/ STRING, diff --git a/server/src/main/java/org/elasticsearch/search/sort/SortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/SortBuilder.java index 7b45f19943274..bc4ef6a2d108b 100644 --- a/server/src/main/java/org/elasticsearch/search/sort/SortBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/sort/SortBuilder.java @@ -14,7 +14,7 @@ import org.apache.lucene.search.join.ToChildBlockJoinQuery; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.core.RestApiVersion; @@ -39,7 +39,11 @@ import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.elasticsearch.search.sort.NestedSortBuilder.FILTER_FIELD; -public abstract class SortBuilder> implements NamedWriteable, ToXContentObject, Rewriteable> { +public abstract class SortBuilder> + implements + VersionedNamedWriteable, + ToXContentObject, + Rewriteable> { protected SortOrder order = SortOrder.ASC; diff --git a/server/src/main/java/org/elasticsearch/search/suggest/SuggestionBuilder.java b/server/src/main/java/org/elasticsearch/search/suggest/SuggestionBuilder.java index 785e7d555d62f..de4279d79919b 100644 --- a/server/src/main/java/org/elasticsearch/search/suggest/SuggestionBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/suggest/SuggestionBuilder.java @@ -11,9 +11,9 @@ import org.apache.lucene.analysis.Analyzer; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.ParsingException; -import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.SearchExecutionContext; @@ -29,7 +29,7 @@ /** * Base class for the different suggestion implementations. */ -public abstract class SuggestionBuilder> implements NamedWriteable, ToXContentFragment { +public abstract class SuggestionBuilder> implements VersionedNamedWriteable, ToXContentFragment { protected final String field; protected String text; diff --git a/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionBuilder.java b/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionBuilder.java index e66c5c06c1030..d10b10c09d66a 100644 --- a/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionBuilder.java @@ -8,6 +8,7 @@ package org.elasticsearch.search.suggest.completion; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -325,6 +326,11 @@ public String getWriteableName() { return SUGGESTION_NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + @Override protected boolean doEquals(CompletionSuggestionBuilder other) { return skipDuplicates == other.skipDuplicates diff --git a/server/src/main/java/org/elasticsearch/search/suggest/phrase/PhraseSuggestionBuilder.java b/server/src/main/java/org/elasticsearch/search/suggest/phrase/PhraseSuggestionBuilder.java index 65e17cc88890d..f7a415c3cbb07 100644 --- a/server/src/main/java/org/elasticsearch/search/suggest/phrase/PhraseSuggestionBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/suggest/phrase/PhraseSuggestionBuilder.java @@ -9,6 +9,7 @@ import org.apache.lucene.analysis.Analyzer; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -704,6 +705,11 @@ public String getWriteableName() { return SUGGESTION_NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + @Override protected boolean doEquals(PhraseSuggestionBuilder other) { return Objects.equals(maxErrors, other.maxErrors) diff --git a/server/src/main/java/org/elasticsearch/search/suggest/term/TermSuggestionBuilder.java b/server/src/main/java/org/elasticsearch/search/suggest/term/TermSuggestionBuilder.java index 9764d256f0b79..bf9f26864bfbe 100644 --- a/server/src/main/java/org/elasticsearch/search/suggest/term/TermSuggestionBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/suggest/term/TermSuggestionBuilder.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.spell.NGramDistance; import org.apache.lucene.search.spell.StringDistance; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -456,6 +457,11 @@ public String getWriteableName() { return SUGGESTION_NAME; } + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } + @Override protected boolean doEquals(TermSuggestionBuilder other) { return Objects.equals(suggestMode, other.suggestMode) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java new file mode 100644 index 0000000000000..23facdf2fa0f4 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveIndexActionTests.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.resolve; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportResolveIndexActionTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testCCSCompatibilityCheck() throws Exception { + Settings settings = Settings.builder() + .put("node.name", TransportResolveIndexActionTests.class.getSimpleName()) + .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true") + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + try { + TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool); + + ResolveIndexAction.Request request = new ResolveIndexAction.Request(new String[] { "test" }) { + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().before(Version.CURRENT)) { + throw new IllegalArgumentException("This request isn't serializable to nodes before " + Version.CURRENT); + } + } + }; + + ClusterService clusterService = new ClusterService( + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + ResolveIndexAction.TransportAction action = new ResolveIndexAction.TransportAction( + transportService, + clusterService, + threadPool, + actionFilters, + null + ); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> action.doExecute(null, request, new ActionListener() { + @Override + public void onResponse(ResolveIndexAction.Response response) {} + + @Override + public void onFailure(Exception e) {} + }) + ); + + assertThat(ex.getMessage(), containsString("not compatible with version")); + assertThat(ex.getMessage(), containsString("and the 'search.check_ccs_compatibility' setting is enabled.")); + assertEquals("This request isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getMessage()); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java new file mode 100644 index 0000000000000..77e1c990ff690 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesActionTests.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.DummyQueryBuilder; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportFieldCapabilitiesActionTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testCCSCompatibilityCheck() throws Exception { + Settings settings = Settings.builder() + .put("node.name", TransportFieldCapabilitiesActionTests.class.getSimpleName()) + .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true") + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + try { + TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool); + + FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest(); + fieldCapsRequest.indexFilter(new DummyQueryBuilder() { + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + if (out.getVersion().before(Version.CURRENT)) { + throw new IllegalArgumentException("This query isn't serializable to nodes before " + Version.CURRENT); + } + } + }); + + IndicesService indicesService = mock(IndicesService.class); + when(indicesService.getAllMetadataFields()).thenReturn(Collections.singleton("_index")); + ClusterService clusterService = new ClusterService( + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + TransportFieldCapabilitiesAction action = new TransportFieldCapabilitiesAction( + transportService, + clusterService, + threadPool, + actionFilters, + indicesService, + null + ); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> action.doExecute(null, fieldCapsRequest, new ActionListener() { + @Override + public void onResponse(FieldCapabilitiesResponse response) {} + + @Override + public void onFailure(Exception e) {} + }) + ); + + assertThat( + ex.getMessage(), + containsString("[class org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest] is not compatible with version") + ); + assertThat(ex.getMessage(), containsString("and the 'search.check_ccs_compatibility' setting is enabled.")); + assertEquals("This query isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getMessage()); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 68f9162d615c6..db749b02ad8c0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -17,8 +17,11 @@ import org.elasticsearch.action.OriginalIndicesTests; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -30,8 +33,11 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.TimeValue; @@ -41,12 +47,16 @@ import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.DummyQueryBuilder; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.AggregationReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -74,6 +84,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -102,6 +113,9 @@ import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TransportSearchActionTests extends ESTestCase { @@ -1356,4 +1370,71 @@ public void testLocalShardIteratorFromPointInTime() { assertTrue(anotherShardIterator.isPresent()); assertThat(anotherShardIterator.get().getTargetNodeIds(), hasSize(1)); } + + public void testCCSCompatibilityCheck() throws Exception { + Settings settings = Settings.builder() + .put("node.name", TransportSearchAction.class.getSimpleName()) + .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true") + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().query(new DummyQueryBuilder() { + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + throw new IllegalArgumentException("This query isn't serializable to nodes before " + Version.CURRENT); + } + })); + NodeClient client = new NodeClient(settings, threadPool); + + SearchService searchService = mock(SearchService.class); + when(searchService.getRewriteContext(any())).thenReturn(new QueryRewriteContext(null, null, null, null)); + ClusterService clusterService = new ClusterService( + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + TransportSearchAction action = new TransportSearchAction( + threadPool, + new NoneCircuitBreakerService(), + transportService, + searchService, + new SearchTransportService(transportService, client, null), + null, + clusterService, + actionFilters, + null, + null, + null + ); + + CountDownLatch latch = new CountDownLatch(1); + action.doExecute(null, searchRequest, new ActionListener<>() { + + @Override + public void onResponse(SearchResponse response) { + latch.countDown(); + fail("should not be called"); + } + + @Override + public void onFailure(Exception ex) { + assertThat( + ex.getMessage(), + containsString("[class org.elasticsearch.action.search.SearchRequest] is not compatible with version") + ); + assertThat(ex.getMessage(), containsString("and the 'search.check_ccs_compatibility' setting is enabled.")); + assertEquals("This query isn't serializable to nodes before " + Version.CURRENT, ex.getCause().getMessage()); + latch.countDown(); + } + }); + latch.await(); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java index cc05269cae66e..ed3751eaecf6e 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java @@ -16,6 +16,8 @@ import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.test.ESTestCase; +import java.util.List; + import static org.hamcrest.Matchers.equalTo; public class TransportSearchHelperTests extends ESTestCase { @@ -66,4 +68,22 @@ public void testParseScrollId() { assertEquals(42, parseScrollId.getContext()[2].getSearchContextId().getId()); assertThat(parseScrollId.getContext()[2].getSearchContextId().getSessionId(), equalTo("c")); } + + public void testGetPreviousMinorSeries() throws Exception { + final List declaredVersions = Version.getDeclaredVersions(Version.class); + Version randomVersion = randomValueOtherThanMany(v -> v.before(Version.V_7_1_0), () -> randomFrom(declaredVersions)); + Version previousFirstMinor = TransportSearchHelper.getPreviousMinorSeries(randomVersion); + assertTrue(previousFirstMinor.before(randomVersion)); + assertTrue(previousFirstMinor.revision == 0); + for (int i = declaredVersions.indexOf(previousFirstMinor); i < declaredVersions.indexOf(randomVersion); i++) { + Version version = declaredVersions.get(i); + assertTrue(version.before(randomVersion)); + if (randomVersion.major == previousFirstMinor.major) { + assertTrue(previousFirstMinor.minor == randomVersion.minor - 1); + } else { + assertTrue((randomVersion.major - 1) == previousFirstMinor.major); + assertTrue(randomVersion.minor == 0); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutputTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutputTests.java new file mode 100644 index 0000000000000..43766005be5f4 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/io/stream/VersionCheckingStreamOutputTests.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.Version; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import java.io.IOException; + +public class VersionCheckingStreamOutputTests extends ESTestCase { + + private static class DummyNamedWriteable implements VersionedNamedWriteable { + + @Override + public String getWriteableName() { + return "test_writable"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + } + + public void testCheckVersionCompatibility() throws IOException { + Version streamVersion = VersionUtils.randomPreviousCompatibleVersion(random(), Version.CURRENT); + try (VersionCheckingStreamOutput out = new VersionCheckingStreamOutput(streamVersion)) { + out.writeNamedWriteable(QueryBuilders.matchAllQuery()); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> out.writeNamedWriteable(new DummyNamedWriteable()) + ); + assertEquals( + "[test_writable] was released first in version " + + Version.CURRENT + + ", failed compatibility check trying to send it to node with version " + + streamVersion, + e.getMessage() + ); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/query/SpanMultiTermQueryBuilderTests.java b/server/src/test/java/org/elasticsearch/index/query/SpanMultiTermQueryBuilderTests.java index dbbe8e9f87b25..47e79f1e06be4 100644 --- a/server/src/test/java/org/elasticsearch/index/query/SpanMultiTermQueryBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/index/query/SpanMultiTermQueryBuilderTests.java @@ -27,6 +27,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopTermsRewrite; import org.apache.lucene.store.Directory; +import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamOutput; @@ -163,6 +164,11 @@ public void writeTo(StreamOutput out) throws IOException { public String fieldName() { return "foo"; } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } /** diff --git a/server/src/test/java/org/elasticsearch/rest/action/search/RestMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/search/RestMultiSearchActionTests.java index 429871c2049c4..55ba0eb0decba 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/search/RestMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/search/RestMultiSearchActionTests.java @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,9 +61,4 @@ public void testTypeInBody() { assertCriticalWarnings(RestMultiSearchAction.TYPES_DEPRECATION_MESSAGE); } - private Map> headersWith(String accept, List value) { - Map> headers = new HashMap<>(); - headers.put(accept, value); - return headers; - } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java b/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java index 7de59898a7c79..dd612332ce803 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchModuleTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.util.CharsRefBuilder; +import org.elasticsearch.Version; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -625,6 +626,11 @@ protected void doXContent(XContentBuilder builder, Params params) throws IOExcep public RescoreContext innerBuildContext(int windowSize, SearchExecutionContext context) throws IOException { return null; } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } private static class TestSuggester extends Suggester { @@ -688,6 +694,11 @@ protected int doHashCode() { public String getWriteableName() { return "test"; } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } @SuppressWarnings("rawtypes") diff --git a/server/src/test/java/org/elasticsearch/index/query/plugin/DummyQueryBuilder.java b/test/framework/src/main/java/org/elasticsearch/search/DummyQueryBuilder.java similarity index 93% rename from server/src/test/java/org/elasticsearch/index/query/plugin/DummyQueryBuilder.java rename to test/framework/src/main/java/org/elasticsearch/search/DummyQueryBuilder.java index ac9e3a40fad9f..9797c2865ab37 100644 --- a/server/src/test/java/org/elasticsearch/index/query/plugin/DummyQueryBuilder.java +++ b/test/framework/src/main/java/org/elasticsearch/search/DummyQueryBuilder.java @@ -6,14 +6,14 @@ * Side Public License, v 1. */ -package org.elasticsearch.index.query.plugin; +package org.elasticsearch.search; import org.apache.lucene.search.Query; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; -import org.elasticsearch.index.query.plugin.DummyQueryParserPlugin.DummyQuery; +import org.elasticsearch.search.DummyQueryParserPlugin.DummyQuery; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; diff --git a/server/src/test/java/org/elasticsearch/index/query/plugin/DummyQueryParserPlugin.java b/test/framework/src/main/java/org/elasticsearch/search/DummyQueryParserPlugin.java similarity index 80% rename from server/src/test/java/org/elasticsearch/index/query/plugin/DummyQueryParserPlugin.java rename to test/framework/src/main/java/org/elasticsearch/search/DummyQueryParserPlugin.java index 7d017ff996420..1a4b7b9f0bdbc 100644 --- a/server/src/test/java/org/elasticsearch/index/query/plugin/DummyQueryParserPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/search/DummyQueryParserPlugin.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.index.query.plugin; +package org.elasticsearch.search; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; @@ -20,13 +20,18 @@ import java.io.IOException; import java.util.List; -import static java.util.Collections.singletonList; - public class DummyQueryParserPlugin extends Plugin implements SearchPlugin { @Override public List> getQueries() { - return singletonList(new QuerySpec<>(DummyQueryBuilder.NAME, DummyQueryBuilder::new, DummyQueryBuilder::fromXContent)); + return List.of( + new QuerySpec<>(DummyQueryBuilder.NAME, DummyQueryBuilder::new, DummyQueryBuilder::fromXContent), + new QuerySpec<>( + FailBeforeCurrentVersionQueryBuilder.NAME, + FailBeforeCurrentVersionQueryBuilder::new, + FailBeforeCurrentVersionQueryBuilder::fromXContent + ) + ); } public static class DummyQuery extends Query { diff --git a/test/framework/src/main/java/org/elasticsearch/search/FailBeforeCurrentVersionQueryBuilder.java b/test/framework/src/main/java/org/elasticsearch/search/FailBeforeCurrentVersionQueryBuilder.java new file mode 100644 index 0000000000000..1106774e15c0e --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/search/FailBeforeCurrentVersionQueryBuilder.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryRewriteContext; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +/** + * Query simulating serialization error on versions earlier than CURRENT + */ +public class FailBeforeCurrentVersionQueryBuilder extends DummyQueryBuilder { + + public static final String NAME = "fail_before_current_version"; + + public FailBeforeCurrentVersionQueryBuilder(StreamInput in) throws IOException { + super(in); + } + + public FailBeforeCurrentVersionQueryBuilder() {} + + @Override + protected void doWriteTo(StreamOutput out) { + if (out.getVersion().before(Version.CURRENT)) { + throw new IllegalArgumentException("This query isn't serializable to nodes before " + Version.CURRENT); + } + } + + public static DummyQueryBuilder fromXContent(XContentParser parser) throws IOException { + DummyQueryBuilder.fromXContent(parser); + return new FailBeforeCurrentVersionQueryBuilder(); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + protected QueryBuilder doRewrite(QueryRewriteContext queryRewriteContext) throws IOException { + return this; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 52a3263c4baec..082e4e3a0d7b4 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -1320,6 +1320,11 @@ protected void doWriteTo(StreamOutput out) throws IOException { throw new UnsupportedOperationException(); } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_EMPTY; + } } public static class InternalAggCardinalityUpperBound extends InternalAggregation { diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index 812937a642f55..c5b7b96e50628 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -9,12 +9,15 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.DummyQueryBuilder; +import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; @@ -89,6 +92,14 @@ public void setupSuiteScopeCluster() throws InterruptedException { indexRandom(true, true, reqs); } + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true") + .build(); + } + public void testMaxMinAggregation() throws Exception { int step = numShards > 2 ? randomIntBetween(2, numShards) : 2; int numFailures = randomBoolean() ? randomIntBetween(0, numShards) : 0; @@ -507,4 +518,19 @@ public void testMaxResponseSize() { updateSettingsRequest.persistentSettings(Settings.builder().put("search.max_async_search_response_size", (String) null)); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } + + public void testCCSCheckCompatibility() throws Exception { + SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(new SearchSourceBuilder().query(new DummyQueryBuilder() { + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + }), indexName); + + AsyncSearchResponse response = submitAsyncSearch(request); + assertFalse(response.isRunning()); + Exception failure = response.getFailure(); + assertThat(failure.getMessage(), containsString("error while executing search")); + assertThat(failure.getCause().getMessage(), containsString("the 'search.check_ccs_compatibility' setting is enabled")); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java index f86c3344eb54e..87fcfefb5cd26 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java @@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.stream.Collectors; +import static org.elasticsearch.action.search.TransportSearchHelper.checkCCSVersionCompatibility; import static org.elasticsearch.xpack.core.security.SecurityField.DOCUMENT_LEVEL_SECURITY_FEATURE; public class TransportTermsEnumAction extends HandledTransportAction { @@ -98,6 +99,7 @@ public class TransportTermsEnumAction extends HandledTransportAction listener) { + if (ccsCheckCompatibility) { + checkCCSVersionCompatibility(request); + } new AsyncBroadcastAction(task, request, listener).start(); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java index 27b87b66316d1..20e88cb69d1c2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java @@ -6,18 +6,39 @@ */ package org.elasticsearch.xpack.core.termsenum; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.DummyQueryBuilder; +import org.elasticsearch.search.SearchService; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest; import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class TransportTermsEnumActionTests extends ESSingleNodeTestCase { + @Override + protected Settings nodeSettings() { + return Settings.builder().put(SearchService.CCS_VERSION_CHECK_SETTING.getKey(), "true").build(); + } + + @Override + protected Collection> getPlugins() { + return Arrays.asList(LocalStateCompositeXPackPlugin.class); + } + /* * Copy of test that tripped up similarly broadcast ValidateQuery */ @@ -42,4 +63,25 @@ public void onFailure(final Exception e) { assertThat(invoked.get(), equalTo(true)); // ensure that onFailure was invoked } + /** + * Test that triggering the CCS compatibility check with a query that shouldn't go to the minor before Version.CURRENT works + */ + public void testCCSCheckCompatibility() throws Exception { + TermsEnumRequest request = new TermsEnumRequest().field("field").timeout(TimeValue.timeValueSeconds(5)); + request.indexFilter(new DummyQueryBuilder() { + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT; + } + }); + ExecutionException ex = expectThrows(ExecutionException.class, () -> client().execute(TermsEnumAction.INSTANCE, request).get()); + assertThat(ex.getCause().getMessage(), containsString("not compatible with version")); + assertThat(ex.getCause().getMessage(), containsString("the 'search.check_ccs_compatibility' setting is enabled.")); + assertThat( + ex.getCause().getCause().getMessage(), + containsString( + "was released first in version " + Version.CURRENT + ", failed compatibility check trying to send it to node with version" + ) + ); + } }