diff --git a/TESTING.asciidoc b/TESTING.asciidoc
index 305e0f6371833..d33121a15dcf7 100644
--- a/TESTING.asciidoc
+++ b/TESTING.asciidoc
@@ -453,6 +453,11 @@ You can run a group of YAML test by using wildcards:
   --tests "org.elasticsearch.test.rest.ClientYamlTestSuiteIT.test {yaml=index/*/*}"
 ---------------------------------------------------------------------------
 
+Note that if a test selected via the `--tests` filter is not valid, i.e. the YAML test
+runner cannot parse and load it, you may get an error message stating that the test
+was not found. In that case, running the whole suite without the `--tests` filter can
+surface more specific error messages about why a certain test cannot be parsed or loaded.
+
 The YAML REST tests support all the options provided by the randomized runner, plus the following:
 
 * `tests.rest.blacklist`: comma separated globs that identify tests that are
diff --git a/docs/changelog/98518.yaml b/docs/changelog/98518.yaml
new file mode 100644
index 0000000000000..2f961fc11ce69
--- /dev/null
+++ b/docs/changelog/98518.yaml
@@ -0,0 +1,6 @@
+pr: 98518
+summary: Add `index.look_back_time` setting for tsdb data streams
+area: TSDB
+type: enhancement
+issues:
+ - 98463
diff --git a/docs/changelog/98915.yaml b/docs/changelog/98915.yaml
new file mode 100644
index 0000000000000..c23ddcc55d98e
--- /dev/null
+++ b/docs/changelog/98915.yaml
@@ -0,0 +1,5 @@
+pr: 98915
+summary: Avoid risk of OOM in datafeeds when memory is constrained
+area: Machine Learning
+type: bug
+issues: [89769]
diff --git a/docs/reference/data-streams/set-up-tsds.asciidoc b/docs/reference/data-streams/set-up-tsds.asciidoc
index 3c15011871f89..a98e3c7302424 100644
--- a/docs/reference/data-streams/set-up-tsds.asciidoc
+++ b/docs/reference/data-streams/set-up-tsds.asciidoc
@@ -177,6 +177,7 @@ Optionally, the index settings component template for a TSDS can include:
 
 * Your lifecycle policy in the `index.lifecycle.name` index setting.
 * The <> index setting.
+* The <<index-look-back-time,`index.look_back_time`>> index setting.
 * Other index settings, such as <>, for your TSDS's backing indices.
diff --git a/docs/reference/data-streams/tsds-index-settings.asciidoc b/docs/reference/data-streams/tsds-index-settings.asciidoc
index fa5c9b8cd821f..8091163ffe883 100644
--- a/docs/reference/data-streams/tsds-index-settings.asciidoc
+++ b/docs/reference/data-streams/tsds-index-settings.asciidoc
@@ -33,6 +33,15 @@ days). Only indices with an `index.mode` of `time_series` support this setting.
 For more information, refer to <>. Additionally this setting can not be less
 than `time_series.poll_interval` cluster setting.
 
+[[index-look-back-time]]
+`index.look_back_time`::
+(<<_static_index_settings,Static>>, <<time-units,time unit>>)
+Interval used to calculate the `index.time_series.start_time` for a TSDS's first
+backing index when the data stream is created. Defaults to `2h` (2 hours).
+Accepts `1m` (one minute) to `7d` (seven days). Only indices with an `index.mode`
+of `time_series` support this setting. For more information, refer to
+<<tsds-look-back-time>>.
+
 [[index-routing-path]]
 `index.routing_path`::
 (<<_static_index_settings,Static>>, string or array of strings) Plain `keyword`
 fields used to route documents in a TSDS to index shards.
 Supports wildcards
diff --git a/docs/reference/data-streams/tsds.asciidoc b/docs/reference/data-streams/tsds.asciidoc
index 3f49a7ab8c700..d6e9ea08f0892 100644
--- a/docs/reference/data-streams/tsds.asciidoc
+++ b/docs/reference/data-streams/tsds.asciidoc
@@ -253,6 +253,22 @@ value borders the `index.time_series.start_time` for the new write index. This
 ensures the `@timestamp` ranges for neighboring backing indices always border
 but never overlap.
 
+[discrete]
+[[tsds-look-back-time]]
+==== Look-back time
+
+Use the <<index-look-back-time,`index.look_back_time`>> index setting to
+configure how far in the past you can add documents to an index. When you
+create a data stream for a TSDS, {es} calculates the index's
+`index.time_series.start_time` value as:
+
+`now - index.look_back_time`
+
+This setting is only applied when the data stream is created and controls the
+`index.time_series.start_time` index setting of the first backing index.
+Configuring it is useful if you need to accept documents whose `@timestamp`
+values are more than 2 hours (the `index.look_back_time` default) in the past.
+
 [discrete]
 [[tsds-accepted-time-range]]
 ==== Accepted time range for adding data
diff --git a/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc b/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc
index 4d578b3df489d..a50d4e3311937 100644
--- a/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc
+++ b/docs/reference/snapshot-restore/apis/put-repo-api.asciidoc
@@ -17,6 +17,8 @@ PUT /_snapshot/my_repository
 }
 ----
 
+IMPORTANT: If you're migrating {ref}/searchable-snapshots.html[searchable snapshots], the repository's name must be identical in the source and destination clusters.
+
 [[put-snapshot-repo-api-request]]
 ==== {api-request-title}
diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java
index 1f27c43f2f2f4..2a4b6f0c5a5ee 100644
--- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java
+++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java
@@ -10,7 +10,9 @@
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.common.time.DateFormatters;
 import org.elasticsearch.common.time.FormatNames;
+import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.test.rest.ObjectPath;
 import org.junit.Before;
 
@@ -618,6 +620,69 @@ public void testUpdateComponentTemplateDoesNotFailIndexTemplateValidation() thro
         client().performRequest(request);
     }
 
+    public void testLookBackTime() throws IOException {
+        // Create a template that uses the index.look_back_time index setting:
+        String template = """
+            {
+                "index_patterns": ["test*"],
+                "template": {
+                    "settings":{
+                        "index": {
+                            "look_back_time": "24h",
+                            "number_of_replicas": 0,
+                            "mode": "time_series"
+                        }
+                    },
+                    "mappings":{
+                        "properties": {
+                            "@timestamp" : {
+                                "type": "date"
+                            },
+                            "field": {
+                                "type": "keyword",
+                                "time_series_dimension": true
+                            }
+                        }
+                    }
+                },
+                "data_stream": {}
+            }""";
+        var putIndexTemplateRequest = new Request("PUT", "/_index_template/2");
+        putIndexTemplateRequest.setJsonEntity(template);
+        assertOK(client().performRequest(putIndexTemplateRequest));
+
+        // Create data stream:
+        var createDataStreamRequest = new Request("PUT", "/_data_stream/test123");
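+        // The template above sets index.look_back_time to 24h, so the first backing index of this
+        // data stream should get an index.time_series.start_time of roughly 24 hours before creation
+        // time, which the assertions below verify after fetching the backing index settings.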
assertOK(client().performRequest(createDataStreamRequest)); + + // Check data stream has been created: + var getDataStreamsRequest = new Request("GET", "/_data_stream"); + var response = client().performRequest(getDataStreamsRequest); + assertOK(response); + var dataStreams = entityAsMap(response); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("test123")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1)); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("2")); + assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1)); + String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name"); + assertThat(firstBackingIndex, backingIndexEqualTo("test123", 1)); + + // Check the backing index: + // 2023-08-15T04:35:50.000Z + var indices = getIndex(firstBackingIndex); + var escapedBackingIndex = firstBackingIndex.replace(".", "\\."); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("test123")); + assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series")); + String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); + assertThat(startTimeFirstBackingIndex, notNullValue()); + Instant now = Instant.now(); + Instant startTime = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(startTimeFirstBackingIndex)).toInstant(); + assertTrue(now.minus(24, ChronoUnit.HOURS).isAfter(startTime)); + String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); + assertThat(endTimeFirstBackingIndex, notNullValue()); + } + private static Map getIndex(String indexName) throws IOException { var getIndexRequest = new Request("GET", "/" + indexName + "?human"); var response = client().performRequest(getIndexRequest); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java index f4e660c2b18f4..064030ed2b6d5 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java @@ -84,10 +84,11 @@ public Settings getAdditionalIndexSettings( if (indexMode == IndexMode.TIME_SERIES) { Settings.Builder builder = Settings.builder(); TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings); + TimeValue lookBackTime = DataStreamsPlugin.LOOK_BACK_TIME.get(allSettings); final Instant start; final Instant end; if (dataStream == null || migrating) { - start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookAheadTime.getMillis())); + start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookBackTime.getMillis())); end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis())); } else { IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex()); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index 
3f8b7e40eeb43..cd221ada7a4dc 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -105,7 +105,14 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin { Setting.Property.Dynamic ); public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle"; - + public static final Setting LOOK_BACK_TIME = Setting.timeSetting( + "index.look_back_time", + TimeValue.timeValueHours(2), + TimeValue.timeValueMinutes(1), + TimeValue.timeValueDays(7), + Setting.Property.IndexScope, + Setting.Property.Dynamic + ); // The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this: private final SetOnce updateTimeSeriesRangeService = new SetOnce<>(); private final SetOnce errorStoreInitialisationService = new SetOnce<>(); @@ -141,6 +148,7 @@ public List> getSettings() { List> pluginSettings = new ArrayList<>(); pluginSettings.add(TIME_SERIES_POLL_INTERVAL); pluginSettings.add(LOOK_AHEAD_TIME); + pluginSettings.add(LOOK_BACK_TIME); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING); pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java index 27fe65ba309d3..23a86b657b82d 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProviderTests.java @@ -38,6 +38,7 @@ public class DataStreamIndexSettingsProviderTests extends ESTestCase { + private static final TimeValue DEFAULT_LOOK_BACK_TIME = TimeValue.timeValueHours(2); // default private static final TimeValue DEFAULT_LOOK_AHEAD_TIME = TimeValue.timeValueHours(2); // default DataStreamIndexSettingsProvider provider; @@ -83,7 +84,7 @@ public void testGetAdditionalIndexSettings() throws Exception { List.of(new CompressedXContent(mapping)) ); assertThat(result.size(), equalTo(3)); - assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("field3")); } @@ -235,10 +236,31 @@ public void testGetAdditionalIndexSettingsLookAheadTime() throws Exception { List.of(new CompressedXContent("{}")) ); assertThat(result.size(), equalTo(2)); - assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis()))); } + public void testGetAdditionalIndexSettingsLookBackTime() throws Exception { + Metadata metadata = 
Metadata.EMPTY_METADATA; + String dataStreamName = "logs-app1"; + + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); + TimeValue lookBackTime = TimeValue.timeValueHours(12); + Settings settings = builder().put("index.mode", "time_series").put("index.look_back_time", lookBackTime.getStringRep()).build(); + Settings result = provider.getAdditionalIndexSettings( + DataStream.getDefaultBackingIndexName(dataStreamName, 1), + dataStreamName, + true, + metadata, + now, + settings, + List.of(new CompressedXContent("{}")) + ); + assertThat(result.size(), equalTo(2)); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookBackTime.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); + } + public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() throws Exception { String dataStreamName = "logs-app1"; TimeValue lookAheadTime = TimeValue.timeValueHours(2); @@ -358,7 +380,7 @@ public void testGetAdditionalIndexSettingsMigrateToTsdb() { List.of() ); assertThat(result.size(), equalTo(2)); - assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); + assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis()))); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java index a0ed1a83d0de1..a612587262463 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.junit.After; -import java.io.IOException; import java.util.Collection; import java.util.List; @@ -52,7 +51,7 @@ public void testTimeSeriesPollIntervalSettingToHigh() { assertThat(e.getMessage(), equalTo("failed to parse value [11m] for setting [time_series.poll_interval], must be <= [10m]")); } - public void testLookAheadTimeSetting() throws IOException { + public void testLookAheadTimeSetting() { var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "10m").build(); updateIndexSettings(settings); } @@ -69,6 +68,18 @@ public void testLookAheadTimeSettingToHigh() { assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_ahead_time], must be <= [7d]")); } + public void testLookBackTimeSettingToLow() { + var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "1s").build(); + var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings)); + assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [index.look_back_time], must be >= [1m]")); + } + + public void testLookBackTimeSettingToHigh() { + var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "8d").build(); + var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings)); + assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_back_time], must be <= [7d]")); + } + public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() { { var 
settings = Settings.builder() @@ -99,7 +110,7 @@ public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() { } } - public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() throws IOException { + public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() { var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m").build(); updateClusterSettings(clusterSettings); var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "100m").build(); @@ -110,7 +121,7 @@ private void updateClusterSettings(Settings settings) { clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(settings)).actionGet(); } - private void updateIndexSettings(Settings settings) throws IOException { + private void updateIndexSettings(Settings settings) { try { createIndex("test"); } catch (ResourceAlreadyExistsException e) { diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/90_sparse_vector.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/90_sparse_vector.yml index 2ddb95d6fc139..6a03e99e97319 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/90_sparse_vector.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/90_sparse_vector.yml @@ -122,8 +122,10 @@ --- "Sparse vector in 8.x": - skip: - version: " - 7.99.99, 8.11.0 - " - reason: "sparse_vector field type not supported in 8.x until 8.11.0" + # version: " - 7.99.99, 8.11.0 - " + # reason: "sparse_vector field type not supported in 8.x until 8.11.0" + version: "all" + reason: "https://github.com/elastic/elasticsearch/issues/99318" - do: catch: /The \[sparse_vector\] field type .* supported/ indices.create: diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java index 98c19fa5174dc..a4c66f6e0b541 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java +++ b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java @@ -14,6 +14,7 @@ import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -35,7 +36,7 @@ public static ActionListener wrapWithTimeout( ThreadPool threadPool, ActionListener listener, TimeValue timeout, - String executor, + Executor executor, String listenerName ) { return wrapWithTimeout(threadPool, timeout, executor, listener, (ignore) -> { @@ -55,6 +56,22 @@ public static ActionListener wrapWithTimeout( * @param onTimeout consumer will be called and the resulting wrapper will be passed to it as a parameter * @return the wrapped listener that will timeout */ + public static ActionListener wrapWithTimeout( + ThreadPool threadPool, + TimeValue timeout, + Executor executor, + ActionListener listener, + Consumer> onTimeout + ) { + TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, onTimeout); + wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor); + return wrappedListener; + } + + /** + * @deprecated Use {@link #wrapWithTimeout(ThreadPool, TimeValue, Executor, ActionListener, Consumer)} instead. 
+ */ + @Deprecated(forRemoval = true) public static ActionListener wrapWithTimeout( ThreadPool threadPool, TimeValue timeout, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index 031ff232351a8..add11b03c5f1c 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -14,6 +14,7 @@ import java.io.Closeable; import java.util.Objects; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.core.Strings.format; @@ -25,6 +26,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { private final Logger logger; private final ThreadPool threadPool; + private final Executor executor; private final AtomicBoolean closed = new AtomicBoolean(false); private final boolean autoReschedule; private volatile Scheduler.Cancellable cancellable; @@ -32,9 +34,10 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { private volatile Exception lastThrownException; private volatile TimeValue interval; - protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) { + protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, Executor executor, TimeValue interval, boolean autoReschedule) { this.logger = logger; this.threadPool = threadPool; + this.executor = executor; this.interval = interval; this.autoReschedule = autoReschedule; } @@ -81,7 +84,7 @@ public synchronized void rescheduleIfNecessary() { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - cancellable = threadPool.schedule(this, interval, getThreadPool()); + cancellable = threadPool.schedule(this, interval, executor); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); @@ -167,12 +170,4 @@ private static boolean sameException(Exception left, Exception right) { } protected abstract void runInternal(); - - /** - * Use the same threadpool by default. - * Derived classes can change this if required. 
- */ - protected String getThreadPool() { - return ThreadPool.Names.SAME; - } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index ded7e459403eb..05c6fd63c3fcb 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -98,6 +98,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; @@ -1062,8 +1063,8 @@ abstract static class BaseAsyncTask extends AbstractAsyncTask { protected final IndexService indexService; - BaseAsyncTask(final IndexService indexService, final TimeValue interval) { - super(indexService.logger, indexService.threadPool, interval, true); + BaseAsyncTask(final IndexService indexService, final Executor executor, final TimeValue interval) { + super(indexService.logger, indexService.threadPool, executor, interval, true); this.indexService = indexService; rescheduleIfNecessary(); } @@ -1082,12 +1083,11 @@ protected boolean mustReschedule() { static final class AsyncTranslogFSync extends BaseAsyncTask { AsyncTranslogFSync(IndexService indexService) { - super(indexService, indexService.getIndexSettings().getTranslogSyncInterval()); - } - - @Override - protected String getThreadPool() { - return ThreadPool.Names.FLUSH; + super( + indexService, + indexService.threadPool.executor(ThreadPool.Names.FLUSH), + indexService.getIndexSettings().getTranslogSyncInterval() + ); } @Override @@ -1111,7 +1111,11 @@ public String toString() { static final class AsyncRefreshTask extends BaseAsyncTask { AsyncRefreshTask(IndexService indexService) { - super(indexService, indexService.getIndexSettings().getRefreshInterval()); + super( + indexService, + indexService.threadPool.executor(ThreadPool.Names.REFRESH), + indexService.getIndexSettings().getRefreshInterval() + ); } @Override @@ -1119,11 +1123,6 @@ protected void runInternal() { indexService.maybeRefreshEngine(false); } - @Override - protected String getThreadPool() { - return ThreadPool.Names.REFRESH; - } - @Override public String toString() { return "refresh"; @@ -1135,6 +1134,7 @@ final class AsyncTrimTranslogTask extends BaseAsyncTask { AsyncTrimTranslogTask(IndexService indexService) { super( indexService, + threadPool.generic(), indexService.getIndexSettings() .getSettings() .getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)) @@ -1151,11 +1151,6 @@ protected void runInternal() { indexService.maybeTrimTranslog(); } - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } - @Override public String toString() { return "trim_translog"; @@ -1187,7 +1182,11 @@ private static final class AsyncGlobalCheckpointTask extends BaseAsyncTask { AsyncGlobalCheckpointTask(final IndexService indexService) { // index.global_checkpoint_sync_interval is not a real setting, it is only registered in tests - super(indexService, GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); + super( + indexService, + indexService.getThreadPool().generic(), + GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()) + ); } @Override @@ -1195,11 +1194,6 @@ protected void runInternal() { indexService.maybeSyncGlobalCheckpoints(); } - 
@Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } - @Override public String toString() { return "global_checkpoint_sync"; @@ -1209,7 +1203,11 @@ public String toString() { private static final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask { AsyncRetentionLeaseSyncTask(final IndexService indexService) { - super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); + super( + indexService, + indexService.threadPool.executor(ThreadPool.Names.MANAGEMENT), + RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()) + ); } @Override @@ -1217,11 +1215,6 @@ protected void runInternal() { indexService.syncRetentionLeases(); } - @Override - protected String getThreadPool() { - return ThreadPool.Names.MANAGEMENT; - } - @Override public String toString() { return "retention_lease_sync"; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index e2ca66f3835a9..e2d7d0c8366d8 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; @@ -524,7 +525,7 @@ private static Assignment unassignedAssignment(String reason) { class PeriodicRechecker extends AbstractAsyncTask { PeriodicRechecker(TimeValue recheckInterval) { - super(logger, threadPool, recheckInterval, false); + super(logger, threadPool, EsExecutors.DIRECT_EXECUTOR_SERVICE, recheckInterval, false); } @Override @@ -535,6 +536,7 @@ protected boolean mustReschedule() { @Override public void runInternal() { if (clusterService.localNode().isMasterNode()) { + // TODO just run on the elected master? 
final ClusterState state = clusterService.state(); logger.trace("periodic persistent task assignment check running for cluster state {}", state.getVersion()); if (isAnyTaskUnassigned(state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) { diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java index ca959f47fd105..deaa7e5d31373 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java @@ -17,6 +17,7 @@ import org.junit.Before; import java.io.IOException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -25,13 +26,16 @@ public class ListenerTimeoutsTests extends ESTestCase { private final TimeValue timeout = TimeValue.timeValueMillis(10); - private final String generic = ThreadPool.Names.GENERIC; private DeterministicTaskQueue taskQueue; + private ThreadPool threadPool; + private Executor timeoutExecutor; @Before public void setUp() throws Exception { super.setUp(); taskQueue = new DeterministicTaskQueue(); + threadPool = taskQueue.getThreadPool(); + timeoutExecutor = threadPool.generic(); } public void testListenerTimeout() { @@ -39,7 +43,7 @@ public void testListenerTimeout() { AtomicReference exception = new AtomicReference<>(); ActionListener listener = wrap(success, exception); - ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(threadPool, listener, timeout, timeoutExecutor, "test"); assertTrue(taskQueue.hasDeferredTasks()); taskQueue.advanceTime(); taskQueue.runAllRunnableTasks(); @@ -56,7 +60,7 @@ public void testFinishNormallyBeforeTimeout() { AtomicReference exception = new AtomicReference<>(); ActionListener listener = wrap(success, exception); - ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(threadPool, listener, timeout, timeoutExecutor, "test"); wrapped.onResponse(null); wrapped.onFailure(new IOException("boom")); wrapped.onResponse(null); @@ -74,7 +78,7 @@ public void testFinishExceptionallyBeforeTimeout() { AtomicReference exception = new AtomicReference<>(); ActionListener listener = wrap(success, exception); - ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(threadPool, listener, timeout, timeoutExecutor, "test"); wrapped.onFailure(new IOException("boom")); assertTrue(taskQueue.hasDeferredTasks()); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java index 0a7bf591bb2c7..cdb85aaae2fc6 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -45,7 +45,7 @@ public void testAutoRepeat() throws Exception { final CyclicBarrier barrier1 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence final CyclicBarrier barrier2 = new CyclicBarrier(2); 
// 1 for runInternal plus 1 for the test sequence final AtomicInteger count = new AtomicInteger(); - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), true) { + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, threadPool.generic(), TimeValue.timeValueMillis(1), true) { @Override protected boolean mustReschedule() { @@ -71,10 +71,6 @@ protected void runInternal() { } } - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } }; assertFalse(task.isScheduled()); @@ -101,7 +97,7 @@ public void testManualRepeat() throws Exception { boolean shouldRunThrowException = randomBoolean(); final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence final AtomicInteger count = new AtomicInteger(); - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) { + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, threadPool.generic(), TimeValue.timeValueMillis(1), false) { @Override protected boolean mustReschedule() { @@ -122,10 +118,6 @@ protected void runInternal() { } } - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } }; assertFalse(task.isScheduled()); @@ -148,7 +140,13 @@ protected String getThreadPool() { public void testCloseWithNoRun() { - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10), true) { + AbstractAsyncTask task = new AbstractAsyncTask( + logger, + threadPool, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + TimeValue.timeValueMinutes(10), + true + ) { @Override protected boolean mustReschedule() { @@ -171,7 +169,13 @@ public void testChangeInterval() throws Exception { final CountDownLatch latch = new CountDownLatch(2); - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1), true) { + AbstractAsyncTask task = new AbstractAsyncTask( + logger, + threadPool, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + TimeValue.timeValueHours(1), + true + ) { @Override protected boolean mustReschedule() { @@ -202,7 +206,14 @@ public void testIsScheduledRemainFalseAfterClose() throws Exception { List tasks = new ArrayList<>(numTasks); AtomicLong counter = new AtomicLong(); for (int i = 0; i < numTasks; i++) { - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(randomIntBetween(1, 2)), true) { + AbstractAsyncTask task = new AbstractAsyncTask( + logger, + threadPool, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + TimeValue.timeValueMillis(randomIntBetween(1, 2)), + true + ) { + @Override protected boolean mustReschedule() { return counter.get() <= 1000; diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index dba69e7db7ba4..eb034778be63d 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.query.QueryBuilder; @@ -24,7 +25,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; 
import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -75,7 +75,11 @@ public void testBaseAsyncTask() throws Exception { AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); final AtomicInteger count = new AtomicInteger(); - IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1)) { + IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask( + indexService, + indexService.getThreadPool().generic(), + TimeValue.timeValueMillis(1) + ) { @Override protected void runInternal() { final CountDownLatch l1 = latch.get(); @@ -96,11 +100,6 @@ protected void runInternal() { } } } - - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } }; latch.get().await(); @@ -115,11 +114,9 @@ protected String getThreadPool() { latch2.get().countDown(); assertEquals(2, count.get()); - task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) { + task = new IndexService.BaseAsyncTask(indexService, EsExecutors.DIRECT_EXECUTOR_SERVICE, TimeValue.timeValueMillis(1000000)) { @Override - protected void runInternal() { - - } + protected void runInternal() {} }; assertTrue(task.mustReschedule()); @@ -140,11 +137,9 @@ protected void runInternal() { indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(closedIndexService, indexService); - task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(100000)) { + task = new IndexService.BaseAsyncTask(indexService, EsExecutors.DIRECT_EXECUTOR_SERVICE, TimeValue.timeValueMillis(100000)) { @Override - protected void runInternal() { - - } + protected void runInternal() {} }; assertTrue(task.mustReschedule()); assertFalse(task.isClosed()); diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 493d25d685768..d7cbddf490df5 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -336,7 +336,7 @@ public SharedBlobCacheService( for (int i = 0; i < numRegions; i++) { freeRegions.add(sharedBytes.getFileChannel(i)); } - decayTask = new CacheDecayTask(threadPool, SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings)); + decayTask = new CacheDecayTask(threadPool, threadPool.generic(), SHARED_CACHE_DECAY_INTERVAL_SETTING.get(settings)); decayTask.rescheduleIfNecessary(); this.rangeSize = SHARED_CACHE_RANGE_SIZE_SETTING.get(settings); this.recoveryRangeSize = SHARED_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings); @@ -745,8 +745,8 @@ public void close() { class CacheDecayTask extends AbstractAsyncTask { - CacheDecayTask(ThreadPool threadPool, TimeValue interval) { - super(logger, Objects.requireNonNull(threadPool), Objects.requireNonNull(interval), true); + CacheDecayTask(ThreadPool threadPool, Executor executor, TimeValue interval) { + super(logger, Objects.requireNonNull(threadPool), executor, Objects.requireNonNull(interval), true); } @Override @@ -759,11 +759,6 @@ public void runInternal() { 
computeDecay(); } - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } - @Override public String toString() { return "shared_cache_decay_task"; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 9dfea4a6c9358..fa8f8099900ce 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -583,6 +583,7 @@ void openSession( response.getStoreFileMetadata(), response.getMappingVersion(), threadPool, + chunkResponseExecutor, ccrSettings, throttledTime::inc, leaderShardId @@ -595,7 +596,7 @@ void openSession( threadPool, responseListener, ccrSettings.getRecoveryActionTimeout(), - ThreadPool.Names.GENERIC, + threadPool.generic(), // TODO should be the remote-client response executor to match the non-timeout case PutCcrRestoreSessionAction.INTERNAL_NAME ) ); @@ -611,6 +612,7 @@ private static class RestoreSession extends FileRestoreContext { private final CcrSettings ccrSettings; private final LongConsumer throttleListener; private final ThreadPool threadPool; + private final Executor timeoutExecutor; private final ShardId leaderShardId; RestoreSession( @@ -623,6 +625,7 @@ private static class RestoreSession extends FileRestoreContext { Store.MetadataSnapshot sourceMetadata, long mappingVersion, ThreadPool threadPool, + Executor timeoutExecutor, CcrSettings ccrSettings, LongConsumer throttleListener, ShardId leaderShardId @@ -634,6 +637,7 @@ private static class RestoreSession extends FileRestoreContext { this.sourceMetadata = sourceMetadata; this.mappingVersion = mappingVersion; this.threadPool = threadPool; + this.timeoutExecutor = timeoutExecutor; this.ccrSettings = ccrSettings; this.throttleListener = throttleListener; this.leaderShardId = leaderShardId; @@ -685,7 +689,7 @@ protected void executeChunkRequest(FileChunk request, ActionListener liste ListenerTimeouts.wrapWithTimeout(threadPool, listener.map(getCcrRestoreFileChunkResponse -> { writeFileChunk(request.md, getCcrRestoreFileChunkResponse); return null; - }), ccrSettings.getRecoveryActionTimeout(), ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.INTERNAL_NAME) + }), ccrSettings.getRecoveryActionTimeout(), timeoutExecutor, GetCcrRestoreFileChunkAction.INTERNAL_NAME) ); } @@ -740,7 +744,7 @@ public void close(ActionListener listener) { threadPool, listener, ccrSettings.getRecoveryActionTimeout(), - ThreadPool.Names.GENERIC, + timeoutExecutor, ClearCcrRestoreSessionAction.INTERNAL_NAME ); ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node, leaderShardId); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 1054e240eed6b..c3efd67579d66 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import 
org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -211,7 +212,7 @@ public void messageReceived(ExchangeRequest request, TransportChannel channel, T private final class InactiveSinksReaper extends AbstractAsyncTask { InactiveSinksReaper(Logger logger, ThreadPool threadPool, TimeValue interval) { - super(logger, threadPool, interval, true); + super(logger, threadPool, EsExecutors.DIRECT_EXECUTOR_SERVICE, interval, true); rescheduleIfNecessary(); } diff --git a/x-pack/plugin/esql/qa/server/multi-node/build.gradle b/x-pack/plugin/esql/qa/server/multi-node/build.gradle new file mode 100644 index 0000000000000..1b62fdea2671c --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-node/build.gradle @@ -0,0 +1,19 @@ +apply plugin: 'elasticsearch.legacy-yaml-rest-test' + +dependencies { + javaRestTestImplementation project(xpackModule('esql:qa:testFixtures')) +} + +restResources { + restApi { + include '_common', 'bulk', 'indices', 'esql', 'xpack', 'enrich' + } +} + +testClusters.configureEach { + numberOfNodes = 2 + testDistribution = 'DEFAULT' + setting 'xpack.license.self_generated.type', 'trial' + setting 'xpack.monitoring.collection.enabled', 'true' + setting 'xpack.security.enabled', 'false' +} diff --git a/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlSpecIT.java b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlSpecIT.java new file mode 100644 index 0000000000000..eab26b565f93d --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlSpecIT.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.qa.multi_node; + +import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; +import org.elasticsearch.xpack.ql.CsvSpecReader.CsvTestCase; + +public class EsqlSpecIT extends EsqlSpecTestCase { + public EsqlSpecIT(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) { + super(fileName, groupName, testName, lineNumber, testCase); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java index c2acd97e53c95..7ce5fd0a66eb2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java @@ -33,6 +33,8 @@ import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingResourceTracker; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; +import java.util.concurrent.Executor; + /** * Internal (no-REST) transport to retrieve metrics for serverless autoscaling. 
*/ @@ -41,6 +43,7 @@ public class TransportGetMlAutoscalingStats extends TransportMasterNodeAction MlAutoscalingResourceTracker.getMlAutoscalingStats( state, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java index d70d34126fe27..e7aba2211b2df 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -30,8 +31,6 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; import org.elasticsearch.xpack.ml.extractor.ExtractedField; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.NoSuchElementException; @@ -169,18 +168,6 @@ private SearchRequestBuilder buildSearchRequest(long start) { return searchRequestBuilder; } - /** - * Utility class to convert ByteArrayOutputStream to ByteArrayInputStream without copying the underlying buffer. - */ - private static class ConvertableByteArrayOutputStream extends ByteArrayOutputStream { - public ByteArrayInputStream resetThisAndGetByteArrayInputStream() { - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf, 0, count); - buf = new byte[0]; - count = 0; - return inputStream; - } - } - /** * IMPORTANT: This is not an idempotent method. This method changes the input array by setting each element to null. 
*/ @@ -192,7 +179,7 @@ private InputStream processAndConsumeSearchHits(SearchHit hits[]) throws IOExcep return null; } - ConvertableByteArrayOutputStream outputStream = new ConvertableByteArrayOutputStream(); + BytesStreamOutput outputStream = new BytesStreamOutput(); SearchHit lastHit = hits[hits.length - 1]; lastTimestamp = context.extractedFields.timeFieldValue(lastHit); @@ -217,7 +204,7 @@ private InputStream processAndConsumeSearchHits(SearchHit hits[]) throws IOExcep hits[i] = null; } } - return outputStream.resetThisAndGetByteArrayInputStream(); + return outputStream.bytes().streamInput(); } private InputStream continueScroll() throws IOException { diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java index e065b3814f292..29705d9e4b116 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java @@ -324,7 +324,7 @@ public static listener.onResponse(new StoredAsyncResponse<>(r, task.getExpirationTimeMillis())), e -> listener.onResponse(new StoredAsyncResponse<>(e, task.getExpirationTimeMillis())) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java index 3daa72efdd608..db56addb434c7 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/CacheService.java @@ -636,7 +636,7 @@ public void synchronizeCache() { class CacheSynchronizationTask extends AbstractAsyncTask { CacheSynchronizationTask(ThreadPool threadPool, TimeValue interval) { - super(logger, Objects.requireNonNull(threadPool), Objects.requireNonNull(interval), true); + super(logger, Objects.requireNonNull(threadPool), threadPool.generic(), Objects.requireNonNull(interval), true); } @Override @@ -649,11 +649,6 @@ public void runInternal() { synchronizeCache(); } - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } - @Override public String toString() { return "cache_synchronization_task";
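The recurring refactor in this diff replaces the getThreadPool() override on AbstractAsyncTask with an Executor passed at construction time. A minimal sketch of what a periodic task subclass looks like after this change; the class name, logger, and scheduling choices here are illustrative only and not part of the change set:

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

// Illustrative sketch: the executor is now a constructor argument instead of a getThreadPool() override.
class ExampleCleanupTask extends AbstractAsyncTask {

    ExampleCleanupTask(Logger logger, ThreadPool threadPool, TimeValue interval) {
        // Run on the generic pool; EsExecutors.DIRECT_EXECUTOR_SERVICE is an option for very cheap work.
        super(logger, threadPool, threadPool.generic(), interval, true);
        rescheduleIfNecessary();
    }

    @Override
    protected boolean mustReschedule() {
        return true;
    }

    @Override
    protected void runInternal() {
        // periodic work goes here
    }
}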