Merge branch 'main' into carlosdelest/fix-sparse-vector-test
carlosdelest committed Sep 8, 2023
2 parents 1d99576 + 34711e7 commit 0e5b45c
Showing 29 changed files with 314 additions and 122 deletions.
5 changes: 5 additions & 0 deletions TESTING.asciidoc
@@ -453,6 +453,11 @@ You can run a group of YAML tests by using wildcards:
--tests "org.elasticsearch.test.rest.ClientYamlTestSuiteIT.test {yaml=index/*/*}"
---------------------------------------------------------------------------

Note that if the test selected via the `--tests` filter is not valid, i.e. the YAML test
runner is unable to parse and load it, you might get an error message indicating that the test
was not found. In that case, running the whole suite without the `--tests` filter can surface
more specific error messages about why the test runner cannot parse or load a particular test.
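
For example, to re-run a module's whole YAML suite without the filter (a sketch; the
`:rest-api-spec:yamlRestTest` task path is illustrative and depends on the project you are
testing):

---------------------------------------------------------------------------
./gradlew ':rest-api-spec:yamlRestTest'
---------------------------------------------------------------------------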

The YAML REST tests support all the options provided by the randomized runner, plus the following:

* `tests.rest.blacklist`: comma separated globs that identify tests that are
6 changes: 6 additions & 0 deletions docs/changelog/98518.yaml
@@ -0,0 +1,6 @@
pr: 98518
summary: Add `index.look_back_time` setting for tsdb data streams
area: TSDB
type: enhancement
issues:
- 98463
5 changes: 5 additions & 0 deletions docs/changelog/98915.yaml
@@ -0,0 +1,5 @@
pr: 98915
summary: Avoid risk of OOM in datafeeds when memory is constrained
area: Machine Learning
type: bug
issues: [89769]
1 change: 1 addition & 0 deletions docs/reference/data-streams/set-up-tsds.asciidoc
@@ -177,6 +177,7 @@ Optionally, the index settings component template for a TSDS can include:

* Your lifecycle policy in the `index.lifecycle.name` index setting.
* The <<tsds-look-ahead-time,`index.look_ahead_time`>> index setting.
* The <<tsds-look-back-time,`index.look_back_time`>> index setting.
* Other index settings, such as <<index-codec,`index.codec`>>, for your TSDS's
backing indices.

9 changes: 9 additions & 0 deletions docs/reference/data-streams/tsds-index-settings.asciidoc
@@ -33,6 +33,15 @@ days). Only indices with an `index.mode` of `time_series` support this setting.
For more information, refer to <<tsds-look-ahead-time>>. Additionally, this setting cannot be
less than the `time_series.poll_interval` cluster setting.

[[index-look-back-time]]
`index.look_back_time`::
(<<_static_index_settings,Static>>, <<time-units,time units>>)
Interval used to calculate the `index.time_series.start_time` for a TSDS's first
backing index when the data stream is created. Defaults to `2h` (2 hours).
Accepts `1m` (one minute) to `7d` (seven days). Only indices with an `index.mode`
of `time_series` support this setting. For more information, refer to
<<tsds-look-back-time>>.

[[index-routing-path]] `index.routing_path`::
(<<_static_index_settings,Static>>, string or array of strings) Plain `keyword`
fields used to route documents in a TSDS to index shards. Supports wildcards
16 changes: 16 additions & 0 deletions docs/reference/data-streams/tsds.asciidoc
@@ -253,6 +253,22 @@ value borders the `index.time_series.start_time` for the new write index. This
ensures the `@timestamp` ranges for neighboring backing indices always border
but never overlap.

[discrete]
[[tsds-look-back-time]]
==== Look-back time

Use the <<index-look-back-time,`index.look_back_time`>> index setting to
configure how far into the past documents can be added to an index. When you
create a data stream for a TSDS, {es} calculates the index's
`index.time_series.start_time` value as:

`now - index.look_back_time`

This setting is only applied when a data stream is created and controls
the `index.time_series.start_time` index setting of the first backing index.
Configuring this index setting can be useful if you need to accept documents with
`@timestamp` field values that are older than 2 hours (the `index.look_back_time` default).
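
As a sketch only (the template, data stream, and field names here are illustrative and not part
of this change), an index template for a TSDS could widen the look-back window like this:

[source,console]
----
PUT _index_template/my-weather-sensors-template
{
  "index_patterns": ["metrics-weather_sensors-*"],
  "data_stream": {},
  "priority": 500,
  "template": {
    "settings": {
      "index.mode": "time_series",
      "index.look_back_time": "24h",
      "index.routing_path": ["sensor_id"]
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "sensor_id": { "type": "keyword", "time_series_dimension": true },
        "temperature": { "type": "double", "time_series_metric": "gauge" }
      }
    }
  }
}
----

With this template, a data stream created at, for example, `2023-09-08T16:00:00Z` gets a first
backing index whose `index.time_series.start_time` is roughly `2023-09-07T16:00:00Z` (the exact
value is rounded to the canonical timestamp bound).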

[discrete]
[[tsds-accepted-time-range]]
==== Accepted time range for adding data
2 changes: 2 additions & 0 deletions docs/reference/snapshot-restore/apis/put-repo-api.asciidoc
@@ -17,6 +17,8 @@ PUT /_snapshot/my_repository
}
----

IMPORTANT: If you're migrating {ref}/searchable-snapshots.html[searchable snapshots], the repository's name must be identical in the source and destination clusters.
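
For example, if the source cluster registered the repository as `my_repository` (as in the
request above), register it on the destination cluster as `my_repository` too, not under a new
name.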

[[put-snapshot-repo-api-request]]
==== {api-request-title}

@@ -10,7 +10,9 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.Before;

@@ -618,6 +620,69 @@ public void testUpdateComponentTemplateDoesNotFailIndexTemplateValidation() thro
client().performRequest(request);
}

public void testLookBackTime() throws IOException {
// Create template that uses index.look_back_time index setting:
String template = """
{
"index_patterns": ["test*"],
"template": {
"settings":{
"index": {
"look_back_time": "24h",
"number_of_replicas": 0,
"mode": "time_series"
}
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"field": {
"type": "keyword",
"time_series_dimension": true
}
}
}
},
"data_stream": {}
}""";
var putIndexTemplateRequest = new Request("PUT", "/_index_template/2");
putIndexTemplateRequest.setJsonEntity(template);
assertOK(client().performRequest(putIndexTemplateRequest));

// Create data stream:
var createDataStreamRequest = new Request("PUT", "/_data_stream/test123");
assertOK(client().performRequest(createDataStreamRequest));

// Check data stream has been created:
var getDataStreamsRequest = new Request("GET", "/_data_stream");
var response = client().performRequest(getDataStreamsRequest);
assertOK(response);
var dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("test123"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("2"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo("test123", 1));

// Check the backing index (start/end times are rendered like 2023-08-15T04:35:50.000Z):
var indices = getIndex(firstBackingIndex);
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("test123"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeFirstBackingIndex, notNullValue());
Instant now = Instant.now();
Instant startTime = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(startTimeFirstBackingIndex)).toInstant();
assertTrue(now.minus(24, ChronoUnit.HOURS).isAfter(startTime));
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeFirstBackingIndex, notNullValue());
}

private static Map<?, ?> getIndex(String indexName) throws IOException {
var getIndexRequest = new Request("GET", "/" + indexName + "?human");
var response = client().performRequest(getIndexRequest);
@@ -84,10 +84,11 @@ public Settings getAdditionalIndexSettings(
if (indexMode == IndexMode.TIME_SERIES) {
Settings.Builder builder = Settings.builder();
TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings);
TimeValue lookBackTime = DataStreamsPlugin.LOOK_BACK_TIME.get(allSettings);
final Instant start;
final Instant end;
if (dataStream == null || migrating) {
start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookAheadTime.getMillis()));
start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookBackTime.getMillis()));
end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis()));
} else {
IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
@@ -105,7 +105,14 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
Setting.Property.Dynamic
);
public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle";

public static final Setting<TimeValue> LOOK_BACK_TIME = Setting.timeSetting(
"index.look_back_time",
TimeValue.timeValueHours(2),
TimeValue.timeValueMinutes(1),
TimeValue.timeValueDays(7),
Setting.Property.IndexScope,
Setting.Property.Dynamic
);
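
// For orientation only (not part of this change): a condensed sketch of how
// DataStreamIndexSettingsProvider, shown earlier in this diff, uses this setting together with
// LOOK_AHEAD_TIME to derive the first backing index's time range. `allSettings` and `resolvedAt`
// stand in for the provider's method arguments.
//
//     TimeValue lookBackTime = DataStreamsPlugin.LOOK_BACK_TIME.get(allSettings);
//     TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings);
//     // start_time reaches back by look_back_time, end_time reaches forward by look_ahead_time
//     Instant start = DataStream.getCanonicalTimestampBound(resolvedAt.minusMillis(lookBackTime.getMillis()));
//     Instant end = DataStream.getCanonicalTimestampBound(resolvedAt.plusMillis(lookAheadTime.getMillis()));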
// The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this:
private final SetOnce<UpdateTimeSeriesRangeService> updateTimeSeriesRangeService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();
@@ -141,6 +148,7 @@ public List<Setting<?>> getSettings() {
List<Setting<?>> pluginSettings = new ArrayList<>();
pluginSettings.add(TIME_SERIES_POLL_INTERVAL);
pluginSettings.add(LOOK_AHEAD_TIME);
pluginSettings.add(LOOK_BACK_TIME);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
@@ -38,6 +38,7 @@

public class DataStreamIndexSettingsProviderTests extends ESTestCase {

private static final TimeValue DEFAULT_LOOK_BACK_TIME = TimeValue.timeValueHours(2); // default
private static final TimeValue DEFAULT_LOOK_AHEAD_TIME = TimeValue.timeValueHours(2); // default

DataStreamIndexSettingsProvider provider;
@@ -83,7 +84,7 @@ public void testGetAdditionalIndexSettings() throws Exception {
List.of(new CompressedXContent(mapping))
);
assertThat(result.size(), equalTo(3));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
assertThat(IndexMetadata.INDEX_ROUTING_PATH.get(result), contains("field3"));
}
@@ -235,10 +236,31 @@ public void testGetAdditionalIndexSettingsLookAheadTime() throws Exception {
List.of(new CompressedXContent("{}"))
);
assertThat(result.size(), equalTo(2));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis())));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis())));
}

public void testGetAdditionalIndexSettingsLookBackTime() throws Exception {
Metadata metadata = Metadata.EMPTY_METADATA;
String dataStreamName = "logs-app1";

Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
TimeValue lookBackTime = TimeValue.timeValueHours(12);
Settings settings = builder().put("index.mode", "time_series").put("index.look_back_time", lookBackTime.getStringRep()).build();
Settings result = provider.getAdditionalIndexSettings(
DataStream.getDefaultBackingIndexName(dataStreamName, 1),
dataStreamName,
true,
metadata,
now,
settings,
List.of(new CompressedXContent("{}"))
);
assertThat(result.size(), equalTo(2));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookBackTime.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
}

public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() throws Exception {
String dataStreamName = "logs-app1";
TimeValue lookAheadTime = TimeValue.timeValueHours(2);
@@ -358,7 +380,7 @@ public void testGetAdditionalIndexSettingsMigrateToTsdb() {
List.of()
);
assertThat(result.size(), equalTo(2));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(DEFAULT_LOOK_BACK_TIME.getMillis())));
assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(DEFAULT_LOOK_AHEAD_TIME.getMillis())));
}

@@ -16,7 +16,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.After;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

@@ -52,7 +51,7 @@ public void testTimeSeriesPollIntervalSettingToHigh() {
assertThat(e.getMessage(), equalTo("failed to parse value [11m] for setting [time_series.poll_interval], must be <= [10m]"));
}

public void testLookAheadTimeSetting() throws IOException {
public void testLookAheadTimeSetting() {
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "10m").build();
updateIndexSettings(settings);
}
@@ -69,6 +68,18 @@ public void testLookAheadTimeSettingToHigh() {
assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_ahead_time], must be <= [7d]"));
}

public void testLookBackTimeSettingToLow() {
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "1s").build();
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [index.look_back_time], must be >= [1m]"));
}

public void testLookBackTimeSettingToHigh() {
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_BACK_TIME.getKey(), "8d").build();
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_back_time], must be <= [7d]"));
}

public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() {
{
var settings = Settings.builder()
@@ -99,7 +110,7 @@ public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() {
}
}

public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() throws IOException {
public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() {
var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m").build();
updateClusterSettings(clusterSettings);
var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "100m").build();
@@ -110,7 +121,7 @@ private void updateClusterSettings(Settings settings) {
clusterAdmin().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(settings)).actionGet();
}

private void updateIndexSettings(Settings settings) throws IOException {
private void updateIndexSettings(Settings settings) {
try {
createIndex("test");
} catch (ResourceAlreadyExistsException e) {
@@ -122,8 +122,10 @@
---
"Sparse vector in 8.x":
- skip:
version: " - 7.99.99, 8.11.0 - "
reason: "sparse_vector field type not supported in 8.x until 8.11.0"
# version: " - 7.99.99, 8.11.0 - "
# reason: "sparse_vector field type not supported in 8.x until 8.11.0"
version: "all"
reason: "https://github.com/elastic/elasticsearch/issues/99318"
- do:
catch: /The \[sparse_vector\] field type .* supported/
indices.create:
@@ -14,6 +14,7 @@
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

@@ -35,7 +36,7 @@ public static <Response> ActionListener<Response> wrapWithTimeout(
ThreadPool threadPool,
ActionListener<Response> listener,
TimeValue timeout,
String executor,
Executor executor,
String listenerName
) {
return wrapWithTimeout(threadPool, timeout, executor, listener, (ignore) -> {
@@ -55,6 +56,22 @@ public static <Response> ActionListener<Response> wrapWithTimeout(
* @param onTimeout consumer called on timeout; the resulting wrapped listener is passed to it as a parameter
* @return the wrapped listener that will timeout
*/
public static <Response> ActionListener<Response> wrapWithTimeout(
ThreadPool threadPool,
TimeValue timeout,
Executor executor,
ActionListener<Response> listener,
Consumer<ActionListener<Response>> onTimeout
) {
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, onTimeout);
wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
return wrappedListener;
}
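
// An illustrative caller-side sketch (not part of this change; the listener, the 30s timeout,
// and the use of the generic thread pool are assumptions) of the new Executor-based overload:
//
//     ActionListener<Response> timedListener = wrapWithTimeout(
//         threadPool,
//         TimeValue.timeValueSeconds(30),
//         threadPool.generic(),
//         listener,
//         wrapped -> wrapped.onFailure(new ElasticsearchTimeoutException("timed out after 30s"))
//     );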

/**
* @deprecated Use {@link #wrapWithTimeout(ThreadPool, TimeValue, Executor, ActionListener, Consumer)} instead.
*/
@Deprecated(forRemoval = true)
public static <Response> ActionListener<Response> wrapWithTimeout(
ThreadPool threadPool,
TimeValue timeout,