From 97bc2919ffb5d9e90f809a18233c49a52cf58faa Mon Sep 17 00:00:00 2001 From: Matteo Piergiovanni <134913285+piergm@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:29:35 +0100 Subject: [PATCH] Prevent data nodes from sending stack traces to coordinator when `error_trace=false` (#118266) * first iterations * added tests * Update docs/changelog/118266.yaml * constant for error_trace and typos * centralized putHeader * moved threadContext to parent class * uses NodeClient.threadpool * updated async tests to retrieve final result * moved test to avoid starting up a node * added transport version to avoid sending useless bytes * more async tests --- docs/changelog/118266.yaml | 5 + .../bucket/SearchCancellationIT.java | 2 + .../http/SearchErrorTraceIT.java | 175 ++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../action/search/SearchTransportService.java | 6 +- .../common/util/concurrent/ThreadContext.java | 21 ++ .../elasticsearch/rest/BaseRestHandler.java | 1 - .../elasticsearch/rest/RestController.java | 3 +- .../org/elasticsearch/rest/RestResponse.java | 3 +- .../action/search/RestMultiSearchAction.java | 3 + .../rest/action/search/RestSearchAction.java | 4 +- .../elasticsearch/search/SearchService.java | 56 ++++- .../elasticsearch/rest/RestResponseTests.java | 3 +- .../search/SearchServiceSingleNodeTests.java | 3 +- .../search/SearchServiceTests.java | 34 +++ .../xpack/search/AsyncSearchErrorTraceIT.java | 222 ++++++++++++++++++ .../search/RestSubmitAsyncSearchAction.java | 3 + .../ServerSentEventsRestActionListener.java | 3 +- 18 files changed, 535 insertions(+), 13 deletions(-) create mode 100644 docs/changelog/118266.yaml create mode 100644 qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java create mode 100644 x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java diff --git a/docs/changelog/118266.yaml b/docs/changelog/118266.yaml new file mode 100644 index 0000000000000..1b14b12b973c5 --- /dev/null +++ b/docs/changelog/118266.yaml @@ -0,0 +1,5 @@ +pr: 118266 +summary: Prevent data nodes from sending stack traces to coordinator when `error_trace=false` +area: Search +type: enhancement +issues: [] diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java index 5249077bdfdbb..7adf6a09e9a19 100644 --- a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java +++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java @@ -96,6 +96,8 @@ public void testCancellationDuringTimeSeriesAggregation() throws Exception { } logger.info("Executing search"); + // we have to explicitly set error_trace=true for the later exception check for `TimeSeriesIndexSearcher` + client().threadPool().getThreadContext().putHeader("error_trace", "true"); TimeSeriesAggregationBuilder timeSeriesAggregationBuilder = new TimeSeriesAggregationBuilder("test_agg"); ActionFuture searchResponse = prepareSearch("test").setQuery(matchAllQuery()) .addAggregation( diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java new file mode 100644 index 0000000000000..6f9ab8ccdfdec --- /dev/null +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/SearchErrorTraceIT.java @@ -0,0 +1,175 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http; + +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NByteArrayEntity; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Request; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; +import org.junit.Before; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.index.query.QueryBuilders.simpleQueryStringQuery; + +public class SearchErrorTraceIT extends HttpSmokeTestCase { + private AtomicBoolean hasStackTrace; + + @Before + private void setupMessageListener() { + internalCluster().getDataNodeInstances(TransportService.class).forEach(ts -> { + ts.addMessageListener(new TransportMessageListener() { + @Override + public void onResponseSent(long requestId, String action, Exception error) { + TransportMessageListener.super.onResponseSent(requestId, action, error); + if (action.startsWith("indices:data/read/search")) { + Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( + error, + t -> t.getStackTrace().length > 0 + ); + hasStackTrace.set(throwable.isPresent()); + } + } + }); + }); + } + + private void setupIndexWithDocs() { + createIndex("test1", "test2"); + indexRandom( + true, + prepareIndex("test1").setId("1").setSource("field", "foo"), + prepareIndex("test2").setId("10").setSource("field", 5) + ); + refresh(); + } + + public void testSearchFailingQueryErrorTraceDefault() throws IOException { + hasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + getRestClient().performRequest(searchRequest); + assertFalse(hasStackTrace.get()); + } + + public void testSearchFailingQueryErrorTraceTrue() throws IOException { + hasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("error_trace", "true"); + getRestClient().performRequest(searchRequest); + assertTrue(hasStackTrace.get()); + } + + public void testSearchFailingQueryErrorTraceFalse() throws IOException { + hasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("error_trace", "false"); + getRestClient().performRequest(searchRequest); + assertFalse(hasStackTrace.get()); + } + + public void testMultiSearchFailingQueryErrorTraceDefault() throws IOException { + hasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + XContentType contentType = XContentType.JSON; + MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add( + new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field"))) + ); + Request searchRequest = new Request("POST", "/_msearch"); + byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); + searchRequest.setEntity( + new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null)) + ); + getRestClient().performRequest(searchRequest); + assertFalse(hasStackTrace.get()); + } + + public void testMultiSearchFailingQueryErrorTraceTrue() throws IOException { + hasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + XContentType contentType = XContentType.JSON; + MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add( + new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field"))) + ); + Request searchRequest = new Request("POST", "/_msearch"); + byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); + searchRequest.setEntity( + new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null)) + ); + searchRequest.addParameter("error_trace", "true"); + getRestClient().performRequest(searchRequest); + assertTrue(hasStackTrace.get()); + } + + public void testMultiSearchFailingQueryErrorTraceFalse() throws IOException { + hasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + XContentType contentType = XContentType.JSON; + MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add( + new SearchRequest("test*").source(new SearchSourceBuilder().query(simpleQueryStringQuery("foo").field("field"))) + ); + Request searchRequest = new Request("POST", "/_msearch"); + byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent()); + searchRequest.setEntity( + new NByteArrayEntity(requestBody, ContentType.create(contentType.mediaTypeWithoutParameters(), (Charset) null)) + ); + searchRequest.addParameter("error_trace", "false"); + getRestClient().performRequest(searchRequest); + + assertFalse(hasStackTrace.get()); + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index d3e235f1cd82a..bda66d6a2c8cd 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -140,6 +140,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_QUERY_BUILDER_IN_SEARCH_FUNCTIONS = def(8_808_00_0); public static final TransportVersion EQL_ALLOW_PARTIAL_SEARCH_RESULTS = def(8_809_00_0); public static final TransportVersion NODE_VERSION_INFORMATION_WITH_MIN_READ_ONLY_INDEX_VERSION = def(8_810_00_0); + public static final TransportVersion ERROR_TRACE_IN_TRANSPORT_HEADER = def(8_811_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index cfc2e1bcdaf2b..2041754bc2bcc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -456,7 +456,8 @@ public static void registerRequestHandler(TransportService transportService, Sea (request, channel, task) -> searchService.executeQueryPhase( request, (SearchShardTask) task, - new ChannelActionListener<>(channel) + new ChannelActionListener<>(channel), + channel.getVersion() ) ); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new); @@ -468,7 +469,8 @@ public static void registerRequestHandler(TransportService transportService, Sea (request, channel, task) -> searchService.executeQueryPhase( request, (SearchShardTask) task, - new ChannelActionListener<>(channel) + new ChannelActionListener<>(channel), + channel.getVersion() ) ); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index a9e13b86a5159..6841cb5bead0a 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -24,6 +24,8 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.http.HttpTransportSettings; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.telemetry.tracing.TraceContext; @@ -530,6 +532,17 @@ public String getHeader(String key) { return value; } + /** + * Returns the header for the given key or defaultValue if not present + */ + public String getHeaderOrDefault(String key, String defaultValue) { + String value = getHeader(key); + if (value == null) { + return defaultValue; + } + return value; + } + /** * Returns all of the request headers from the thread's context.
* Be advised, headers might contain credentials. @@ -589,6 +602,14 @@ public void putHeader(Map header) { threadLocal.set(threadLocal.get().putHeaders(header)); } + public void setErrorTraceTransportHeader(RestRequest r) { + // set whether data nodes should send back stack trace based on the `error_trace` query parameter + if (r.paramAsBoolean("error_trace", RestController.ERROR_TRACE_DEFAULT)) { + // We only set it if error_trace is true (defaults to false) to avoid sending useless bytes + putHeader("error_trace", "true"); + } + } + /** * Puts a transient header object into this context */ diff --git a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java index 4564a37dacf4a..509086b982319 100644 --- a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java @@ -269,5 +269,4 @@ protected Set responseParams() { protected Set responseParams(RestApiVersion restApiVersion) { return responseParams(); } - } diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 49fe794bbe615..49801499ea991 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -93,6 +93,7 @@ public class RestController implements HttpServerTransport.Dispatcher { public static final String STATUS_CODE_KEY = "es_rest_status_code"; public static final String HANDLER_NAME_KEY = "es_rest_handler_name"; public static final String REQUEST_METHOD_KEY = "es_rest_request_method"; + public static final boolean ERROR_TRACE_DEFAULT = false; static { try (InputStream stream = RestController.class.getResourceAsStream("/config/favicon.ico")) { @@ -638,7 +639,7 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel private static void validateErrorTrace(RestRequest request, RestChannel channel) { // error_trace cannot be used when we disable detailed errors // we consume the error_trace parameter first to ensure that it is always consumed - if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) { + if (request.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && channel.detailedErrorsEnabled() == false) { throw new IllegalArgumentException("error traces in responses are disabled."); } } diff --git a/server/src/main/java/org/elasticsearch/rest/RestResponse.java b/server/src/main/java/org/elasticsearch/rest/RestResponse.java index d043974055667..0c359e0a4a053 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestResponse.java +++ b/server/src/main/java/org/elasticsearch/rest/RestResponse.java @@ -37,6 +37,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; import static org.elasticsearch.rest.RestController.ELASTIC_PRODUCT_HTTP_HEADER; +import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT; public final class RestResponse implements Releasable { @@ -143,7 +144,7 @@ public RestResponse(RestChannel channel, RestStatus status, Exception e) throws // switched in the xcontent rendering parameters. // For authorization problems (RestStatus.UNAUTHORIZED) we don't want to do this since this could // leak information to the caller who is unauthorized to make this call - if (params.paramAsBoolean("error_trace", false) && status != RestStatus.UNAUTHORIZED) { + if (params.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && status != RestStatus.UNAUTHORIZED) { params = new ToXContent.DelegatingMapParams(singletonMap(REST_EXCEPTION_SKIP_STACK_TRACE, "false"), params); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index 24fab92ced392..87b1a6b9c2fa8 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -72,6 +72,9 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + if (client.threadPool() != null && client.threadPool().getThreadContext() != null) { + client.threadPool().getThreadContext().setErrorTraceTransportHeader(request); + } final MultiSearchRequest multiSearchRequest = parseRequest(request, allowExplicitIndex, searchUsageHolder, clusterSupportsFeature); return channel -> { final RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel()); diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index a9c2ff7576b05..99c11bb60b8f0 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -95,7 +95,9 @@ public Set supportedCapabilities() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - + if (client.threadPool() != null && client.threadPool().getThreadContext() != null) { + client.threadPool().getThreadContext().setErrorTraceTransportHeader(request); + } SearchRequest searchRequest = new SearchRequest(); // access the BwC param, but just drop it // this might be set by old clients diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index b9bd398500c71..4557ccb3d2220 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -17,6 +17,8 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ResolvedIndices; @@ -152,6 +154,7 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; +import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER; import static org.elasticsearch.core.TimeValue.timeValueHours; import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.elasticsearch.core.TimeValue.timeValueMinutes; @@ -272,6 +275,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; + private static final StackTraceElement[] EMPTY_STACK_TRACE_ARRAY = new StackTraceElement[0]; private final ThreadPool threadPool; @@ -506,7 +510,41 @@ protected void doClose() { keepAliveReaper.cancel(); } + /** + * Wraps the listener to avoid sending StackTraces back to the coordinating + * node if the `error_trace` header is set to {@code false}. Upon reading we + * default to {@code true} to maintain the same behavior as before the change, + * due to older nodes not being able to specify whether it needs stack traces. + * + * @param the type of the response + * @param listener the action listener to be wrapped + * @param version channel version of the request + * @param threadPool with context where to write the new header + * @return the wrapped action listener + */ + static ActionListener maybeWrapListenerForStackTrace( + ActionListener listener, + TransportVersion version, + ThreadPool threadPool + ) { + boolean header = true; + if (version.onOrAfter(ERROR_TRACE_IN_TRANSPORT_HEADER) && threadPool.getThreadContext() != null) { + header = Boolean.parseBoolean(threadPool.getThreadContext().getHeaderOrDefault("error_trace", "false")); + } + if (header == false) { + return listener.delegateResponse((l, e) -> { + ExceptionsHelper.unwrapCausesAndSuppressed(e, err -> { + err.setStackTrace(EMPTY_STACK_TRACE_ARRAY); + return false; + }); + l.onFailure(e); + }); + } + return listener; + } + public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { + listener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool); final IndexShard shard = getShard(request); rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, rewritten) -> { // fork the execution in the search thread pool @@ -544,10 +582,11 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ActionListener listener) { + ActionListener finalListener = maybeWrapListenerForStackTrace(listener, request.getChannelVersion(), threadPool); assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1 : "empty responses require more than one shard"; final IndexShard shard = getShard(request); - rewriteAndFetchShardRequest(shard, request, listener.delegateFailure((l, orig) -> { + rewriteAndFetchShardRequest(shard, request, finalListener.delegateFailure((l, orig) -> { // check if we can shortcut the query phase entirely. if (orig.canReturnNullResponseIfMatchNoDocs()) { assert orig.scroll() == null; @@ -561,7 +600,7 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task, ); CanMatchShardResponse canMatchResp = canMatch(canMatchContext, false); if (canMatchResp.canMatch() == false) { - listener.onResponse(QuerySearchResult.nullInstance()); + finalListener.onResponse(QuerySearchResult.nullInstance()); return; } } @@ -736,6 +775,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh } public void executeRankFeaturePhase(RankFeatureShardRequest request, SearchShardTask task, ActionListener listener) { + listener = maybeWrapListenerForStackTrace(listener, request.getShardSearchRequest().getChannelVersion(), threadPool); final ReaderContext readerContext = findReaderContext(request.contextId(), request); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); @@ -779,8 +819,10 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon public void executeQueryPhase( InternalScrollSearchRequest request, SearchShardTask task, - ActionListener listener + ActionListener listener, + TransportVersion version ) { + listener = maybeWrapListenerForStackTrace(listener, version, threadPool); final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); final Releasable markAsUsed; try { @@ -816,7 +858,13 @@ public void executeQueryPhase( * It is the responsibility of the caller to ensure that the ref count is correctly decremented * when the object is no longer needed. */ - public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { + public void executeQueryPhase( + QuerySearchRequest request, + SearchShardTask task, + ActionListener listener, + TransportVersion version + ) { + listener = maybeWrapListenerForStackTrace(listener, version, threadPool); final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest()); final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest)); diff --git a/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java b/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java index b85ad31288c8c..bd810cea216fc 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java @@ -51,6 +51,7 @@ import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; import static org.elasticsearch.ElasticsearchExceptionTests.assertDeepEquals; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -180,7 +181,7 @@ public void testStackTrace() throws IOException { } else { assertThat(response.status(), is(RestStatus.BAD_REQUEST)); } - boolean traceExists = request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled(); + boolean traceExists = request.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && channel.detailedErrorsEnabled(); if (traceExists) { assertThat(response.content().utf8ToString(), containsString(ElasticsearchException.STACK_TRACE)); } else { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java index 02593e41f5d84..0fc1694d39926 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceSingleNodeTests.java @@ -2684,7 +2684,8 @@ public void testDfsQueryPhaseRewrite() { service.executeQueryPhase( new QuerySearchRequest(null, context.id(), request, new AggregatedDfs(Map.of(), Map.of(), 10)), new SearchShardTask(42L, "", "", "", null, emptyMap()), - plainActionFuture + plainActionFuture, + TransportVersion.current() ); plainActionFuture.actionGet(); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 31bcab31ca8a7..d041121b8a96b 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -13,6 +13,8 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.SortField; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -53,9 +55,14 @@ import java.io.IOException; import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Predicate; +import static org.elasticsearch.search.SearchService.maybeWrapListenerForStackTrace; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.not; + public class SearchServiceTests extends IndexShardTestCase { public void testCanMatchMatchAll() throws IOException { @@ -117,6 +124,33 @@ public Type getType() { doTestCanMatch(searchRequest, sortField, true, null, false); } + public void testMaybeWrapListenerForStackTrace() { + // Tests that the same listener has stack trace if is not wrapped or does not have stack trace if it is wrapped. + AtomicBoolean isWrapped = new AtomicBoolean(false); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(SearchPhaseResult searchPhaseResult) { + // noop - we only care about failure scenarios + } + + @Override + public void onFailure(Exception e) { + if (isWrapped.get()) { + assertThat(e.getStackTrace().length, is(0)); + } else { + assertThat(e.getStackTrace().length, is(not(0))); + } + } + }; + Exception e = new Exception(); + e.fillInStackTrace(); + assertThat(e.getStackTrace().length, is(not(0))); + listener.onFailure(e); + listener = maybeWrapListenerForStackTrace(listener, TransportVersion.current(), threadPool); + isWrapped.set(true); + listener.onFailure(e); + } + private void doTestCanMatch( SearchRequest searchRequest, SortField sortField, diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java new file mode 100644 index 0000000000000..39a6fa1e4b34f --- /dev/null +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchErrorTraceIT.java @@ -0,0 +1,222 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.search; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.TransportMessageListener; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncSearchErrorTraceIT extends ESIntegTestCase { + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + @Override + protected Collection> nodePlugins() { + return List.of(AsyncSearch.class); + } + + private AtomicBoolean transportMessageHasStackTrace; + + @Before + private void setupMessageListener() { + internalCluster().getDataNodeInstances(TransportService.class).forEach(ts -> { + ts.addMessageListener(new TransportMessageListener() { + @Override + public void onResponseSent(long requestId, String action, Exception error) { + TransportMessageListener.super.onResponseSent(requestId, action, error); + if (action.startsWith("indices:data/read/search")) { + Optional throwable = ExceptionsHelper.unwrapCausesAndSuppressed( + error, + t -> t.getStackTrace().length > 0 + ); + transportMessageHasStackTrace.set(throwable.isPresent()); + } + } + }); + }); + } + + private void setupIndexWithDocs() { + createIndex("test1", "test2"); + indexRandom( + true, + prepareIndex("test1").setId("1").setSource("field", "foo"), + prepareIndex("test2").setId("10").setSource("field", 5) + ); + refresh(); + } + + public void testAsyncSearchFailingQueryErrorTraceDefault() throws IOException, InterruptedException { + transportMessageHasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_async_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("keep_on_completion", "true"); + searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); + String asyncExecutionId = (String) responseEntity.get("id"); + Request request = new Request("GET", "/_async_search/" + asyncExecutionId); + while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { + responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + } + // check that the stack trace was not sent from the data node to the coordinating node + assertFalse(transportMessageHasStackTrace.get()); + } + + public void testAsyncSearchFailingQueryErrorTraceTrue() throws IOException, InterruptedException { + transportMessageHasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_async_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("error_trace", "true"); + searchRequest.addParameter("keep_on_completion", "true"); + searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); + String asyncExecutionId = (String) responseEntity.get("id"); + Request request = new Request("GET", "/_async_search/" + asyncExecutionId); + request.addParameter("error_trace", "true"); + while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { + responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + } + // check that the stack trace was sent from the data node to the coordinating node + assertTrue(transportMessageHasStackTrace.get()); + } + + public void testAsyncSearchFailingQueryErrorTraceFalse() throws IOException, InterruptedException { + transportMessageHasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_async_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("error_trace", "false"); + searchRequest.addParameter("keep_on_completion", "true"); + searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); + String asyncExecutionId = (String) responseEntity.get("id"); + Request request = new Request("GET", "/_async_search/" + asyncExecutionId); + request.addParameter("error_trace", "false"); + while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { + responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + } + // check that the stack trace was not sent from the data node to the coordinating node + assertFalse(transportMessageHasStackTrace.get()); + } + + public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() throws IOException, InterruptedException { + transportMessageHasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_async_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("error_trace", "false"); + searchRequest.addParameter("keep_on_completion", "true"); + searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); + String asyncExecutionId = (String) responseEntity.get("id"); + Request request = new Request("GET", "/_async_search/" + asyncExecutionId); + request.addParameter("error_trace", "true"); + while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { + responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + } + // check that the stack trace was not sent from the data node to the coordinating node + assertFalse(transportMessageHasStackTrace.get()); + } + + public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() throws IOException, InterruptedException { + transportMessageHasStackTrace = new AtomicBoolean(); + setupIndexWithDocs(); + + Request searchRequest = new Request("POST", "/_async_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "simple_query_string" : { + "query": "foo", + "fields": ["field"] + } + } + } + """); + searchRequest.addParameter("error_trace", "true"); + searchRequest.addParameter("keep_on_completion", "true"); + searchRequest.addParameter("wait_for_completion_timeout", "0ms"); + Map responseEntity = performRequestAndGetResponseEntityAfterDelay(searchRequest, TimeValue.ZERO); + String asyncExecutionId = (String) responseEntity.get("id"); + Request request = new Request("GET", "/_async_search/" + asyncExecutionId); + request.addParameter("error_trace", "false"); + while (responseEntity.get("is_running") instanceof Boolean isRunning && isRunning) { + responseEntity = performRequestAndGetResponseEntityAfterDelay(request, TimeValue.timeValueSeconds(1L)); + } + // check that the stack trace was sent from the data node to the coordinating node + assertTrue(transportMessageHasStackTrace.get()); + } + + private Map performRequestAndGetResponseEntityAfterDelay(Request r, TimeValue sleep) throws IOException, + InterruptedException { + Thread.sleep(sleep.millis()); + Response response = getRestClient().performRequest(r); + XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); + return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false); + } +} diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java index bd09d8f7740a1..952febd46c34c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestSubmitAsyncSearchAction.java @@ -55,6 +55,9 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + if (client.threadPool() != null && client.threadPool().getThreadContext() != null) { + client.threadPool().getThreadContext().setErrorTraceTransportHeader(request); + } SubmitAsyncSearchRequest submit = new SubmitAsyncSearchRequest(); IntConsumer setSize = size -> submit.getSearchRequest().source().size(size); // for simplicity, we share parsing with ordinary search. That means a couple of unsupported parameters, like scroll diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java index 3177474ea8ca6..bf94f072b6e04 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/ServerSentEventsRestActionListener.java @@ -43,6 +43,7 @@ import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_CAUSE; import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE; +import static org.elasticsearch.rest.RestController.ERROR_TRACE_DEFAULT; /** * A version of {@link org.elasticsearch.rest.action.RestChunkedToXContentListener} that reads from a {@link Flow.Publisher} and encodes @@ -161,7 +162,7 @@ private ChunkedToXContent errorChunk(Throwable t) { } var errorParams = p; - if (errorParams.paramAsBoolean("error_trace", false) && status != RestStatus.UNAUTHORIZED) { + if (errorParams.paramAsBoolean("error_trace", ERROR_TRACE_DEFAULT) && status != RestStatus.UNAUTHORIZED) { errorParams = new ToXContent.DelegatingMapParams( Map.of(REST_EXCEPTION_SKIP_STACK_TRACE, "false", REST_EXCEPTION_SKIP_CAUSE, "true"), params