diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index a53ff9133cda3..9e3cb34e9a5df 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.block.ClusterBlock; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -48,6 +49,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private boolean performReroute = true; + private ComposableIndexTemplate matchingTemplate; + public CreateIndexClusterStateUpdateRequest(String cause, String index, String providedName) { this.cause = cause; this.index = index; @@ -186,6 +189,21 @@ public CreateIndexClusterStateUpdateRequest performReroute(boolean performRerout return this; } + /** + * @return The composable index template that matches with the index that will be cretaed by this request. + */ + public ComposableIndexTemplate matchingTemplate() { + return matchingTemplate; + } + + /** + * Sets the composable index template that matches with index that will be created by this request. + */ + public CreateIndexClusterStateUpdateRequest setMatchingTemplate(ComposableIndexTemplate matchingTemplate) { + this.matchingTemplate = matchingTemplate; + return this; + } + @Override public String toString() { return "CreateIndexClusterStateUpdateRequest{" @@ -217,6 +235,8 @@ public String toString() { + waitForActiveShards + ", systemDataStreamDescriptor=" + systemDataStreamDescriptor + + ", matchingTemplate=" + + matchingTemplate + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 4f08d6a50eda4..a0b0f7341ce10 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -242,15 +243,17 @@ private RolloverResult rolloverDataStream( ); } + final ComposableIndexTemplate templateV2; final SystemDataStreamDescriptor systemDataStreamDescriptor; if (dataStream.isSystem() == false) { systemDataStreamDescriptor = null; - lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + templateV2 = lookupTemplateForDataStream(dataStreamName, currentState.metadata()); } else { systemDataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(dataStreamName); if (systemDataStreamDescriptor == null) { throw new IllegalArgumentException("no system data stream descriptor found for data stream [" + dataStreamName + "]"); } + templateV2 = systemDataStreamDescriptor.getComposableIndexTemplate(); } final DataStream ds = dataStream.getDataStream(); @@ -270,6 +273,7 @@ private RolloverResult rolloverDataStream( systemDataStreamDescriptor, now ); + createIndexClusterStateRequest.setMatchingTemplate(templateV2); ClusterState newState = createIndexService.applyCreateIndexRequest( currentState, createIndexClusterStateRequest, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java index d72e28058c8f4..4e4abf003e6b4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java @@ -213,6 +213,7 @@ public static Template resolveTemplate( Settings result = provider.getAdditionalIndexSettings( indexName, template.getDataStreamTemplate() != null ? indexName : null, + template.getDataStreamTemplate() != null ? template.getDataStreamTemplate().getIndexMode() : null, simulatedState.getMetadata(), System.currentTimeMillis(), templateSettings diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java index f7f5f84ab93b4..cd6edd507dc95 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/ComposableIndexTemplate.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.xcontent.ConstructingObjectParser; @@ -305,28 +306,35 @@ public static class DataStreamTemplate implements Writeable, ToXContentObject { private static final ParseField HIDDEN = new ParseField("hidden"); private static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing"); + private static final ParseField INDEX_MODE = new ParseField("index_mode"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "data_stream_template", false, - a -> new DataStreamTemplate(a[0] != null && (boolean) a[0], a[1] != null && (boolean) a[1]) + args -> { + IndexMode indexMode = args[2] != null ? IndexMode.fromString((String) args[2]) : null; + return new DataStreamTemplate(args[0] != null && (boolean) args[0], args[1] != null && (boolean) args[1], indexMode); + } ); static { PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_MODE); } private final boolean hidden; private final boolean allowCustomRouting; + private final IndexMode indexMode; public DataStreamTemplate() { - this(false, false); + this(false, false, null); } - public DataStreamTemplate(boolean hidden, boolean allowCustomRouting) { + public DataStreamTemplate(boolean hidden, boolean allowCustomRouting, IndexMode indexMode) { this.hidden = hidden; this.allowCustomRouting = allowCustomRouting; + this.indexMode = indexMode; } DataStreamTemplate(StreamInput in) throws IOException { @@ -336,6 +344,11 @@ public DataStreamTemplate(boolean hidden, boolean allowCustomRouting) { } else { allowCustomRouting = false; } + if (in.getVersion().onOrAfter(Version.V_8_1_0)) { + indexMode = in.readOptionalEnum(IndexMode.class); + } else { + indexMode = null; + } } public String getTimestampField() { @@ -368,12 +381,20 @@ public boolean isAllowCustomRouting() { return allowCustomRouting; } + @Nullable + public IndexMode getIndexMode() { + return indexMode; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(hidden); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { out.writeBoolean(allowCustomRouting); } + if (out.getVersion().onOrAfter(Version.V_8_1_0)) { + out.writeOptionalEnum(indexMode); + } } @Override @@ -381,6 +402,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field("hidden", hidden); builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting); + if (indexMode != null) { + builder.field(INDEX_MODE.getPreferredName(), indexMode); + } builder.endObject(); return builder; } @@ -390,12 +414,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; DataStreamTemplate that = (DataStreamTemplate) o; - return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting; + return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting && indexMode == that.indexMode; } @Override public int hashCode() { - return Objects.hash(hidden, allowCustomRouting); + return Objects.hash(hidden, allowCustomRouting, indexMode); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index dc4cbe79f9fb4..4e830cd95a0ae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; @@ -81,6 +82,7 @@ public final class DataStream extends AbstractDiffable implements To private final boolean replicated; private final boolean system; private final boolean allowCustomRouting; + private final IndexMode indexMode; public DataStream( String name, @@ -91,7 +93,8 @@ public DataStream( boolean hidden, boolean replicated, boolean system, - boolean allowCustomRouting + boolean allowCustomRouting, + IndexMode indexMode ) { this( name, @@ -103,7 +106,8 @@ public DataStream( replicated, system, System::currentTimeMillis, - allowCustomRouting + allowCustomRouting, + indexMode ); } @@ -118,7 +122,8 @@ public DataStream( boolean replicated, boolean system, LongSupplier timeProvider, - boolean allowCustomRouting + boolean allowCustomRouting, + IndexMode indexMode ) { this.name = name; this.timeStampField = timeStampField; @@ -131,6 +136,7 @@ public DataStream( this.timeProvider = timeProvider; this.system = system; this.allowCustomRouting = allowCustomRouting; + this.indexMode = indexMode; assert indices.size() > 0; } @@ -177,13 +183,50 @@ public Index selectTimeSeriesWriteIndex(Instant timestamp, Metadata metadata) { return null; } - public boolean isTimeSeries(Function indices) { - return isTimeSeries(indices.apply(getWriteIndex())); - } - - public boolean isTimeSeries(IndexMetadata indexMetadata) { - IndexMode indexMode = IndexSettings.MODE.get(indexMetadata.getSettings()); - return indexMode == IndexMode.TIME_SERIES; + /** + * Validates this data stream. If this is a time series data stream then this method validates that temporal range + * of backing indices (defined by index.time_series.start_time and index.time_series.end_time) do not overlap with each other. + * + * @param imSupplier Function that supplies {@link IndexMetadata} instances based on the provided index name + */ + public void validate(Function imSupplier) { + if (indexMode == IndexMode.TIME_SERIES) { + // Get a sorted overview of each backing index with there start and end time range: + var startAndEndTimes = indices.stream().map(index -> imSupplier.apply(index.getName())).map(im -> { + Instant start = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings()); + Instant end = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings()); + assert end.isAfter(start); // This is also validated by TIME_SERIES_END_TIME setting. + return new Tuple<>(im.getIndex().getName(), new Tuple<>(start, end)); + }) + .sorted(Comparator.comparing(entry -> entry.v2().v1())) // Sort by start time + .collect(Collectors.toList()); + + Tuple> previous = null; + var formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; + for (var current : startAndEndTimes) { + if (previous == null) { + previous = current; + } else { + // The end_time of previous backing index should be equal or less than start_time of current backing index. + // If previous.end_time > current.start_time then we should fail here: + if (previous.v2().v2().compareTo(current.v2().v1()) > 0) { + String range1 = formatter.format(previous.v2().v1()) + " TO " + formatter.format(previous.v2().v2()); + String range2 = formatter.format(current.v2().v1()) + " TO " + formatter.format(current.v2().v2()); + throw new IllegalArgumentException( + "backing index [" + + previous.v1() + + "] with range [" + + range1 + + "] is overlapping with backing index [" + + current.v1() + + "] with range [" + + range2 + + "]" + ); + } + } + } + } } @Nullable @@ -213,6 +256,11 @@ public boolean isAllowCustomRouting() { return allowCustomRouting; } + @Nullable + public IndexMode getIndexMode() { + return indexMode; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -227,7 +275,18 @@ public DataStream rollover(Index writeIndex, long generation) { List backingIndices = new ArrayList<>(indices); backingIndices.add(writeIndex); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, false, system, allowCustomRouting); + return new DataStream( + name, + timeStampField, + backingIndices, + generation, + metadata, + hidden, + false, + system, + allowCustomRouting, + indexMode + ); } /** @@ -294,7 +353,8 @@ public DataStream removeBackingIndex(Index index) { hidden, replicated, system, - allowCustomRouting + allowCustomRouting, + indexMode ); } @@ -336,7 +396,8 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki hidden, replicated, system, - allowCustomRouting + allowCustomRouting, + indexMode ); } @@ -395,7 +456,8 @@ public DataStream addBackingIndex(Metadata clusterMetadata, Index index) { hidden, replicated, system, - allowCustomRouting + allowCustomRouting, + indexMode ); } @@ -410,7 +472,8 @@ public DataStream promoteDataStream() { false, system, timeProvider, - allowCustomRouting + allowCustomRouting, + indexMode ); } @@ -443,7 +506,8 @@ public DataStream snapshot(Collection indicesInSnapshot) { hidden, replicated, system, - allowCustomRouting + allowCustomRouting, + indexMode ); } @@ -488,7 +552,8 @@ public DataStream(StreamInput in) throws IOException { in.readBoolean(), in.readBoolean(), in.readBoolean(), - in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false + in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : false, + in.getVersion().onOrAfter(Version.V_8_1_0) ? in.readOptionalEnum(IndexMode.class) : null ); } @@ -509,6 +574,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_8_0_0)) { out.writeBoolean(allowCustomRouting); } + if (out.getVersion().onOrAfter(Version.V_8_1_0)) { + out.writeOptionalEnum(indexMode); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -520,6 +588,7 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField REPLICATED_FIELD = new ParseField("replicated"); public static final ParseField SYSTEM_FIELD = new ParseField("system"); public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing"); + public static final ParseField INDEX_MODE = new ParseField("index_mode"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -533,7 +602,8 @@ public void writeTo(StreamOutput out) throws IOException { args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6], args[7] != null && (boolean) args[7], - args[8] != null && (boolean) args[8] + args[8] != null && (boolean) args[8], + args[9] != null ? IndexMode.fromString((String) args[9]) : null ) ); @@ -547,6 +617,7 @@ public void writeTo(StreamOutput out) throws IOException { PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ALLOW_CUSTOM_ROUTING); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_MODE); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -567,6 +638,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(REPLICATED_FIELD.getPreferredName(), replicated); builder.field(SYSTEM_FIELD.getPreferredName(), system); builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting); + if (indexMode != null) { + builder.field(INDEX_MODE.getPreferredName(), indexMode); + } builder.endObject(); return builder; } @@ -583,12 +657,13 @@ public boolean equals(Object o) { && Objects.equals(metadata, that.metadata) && hidden == that.hidden && replicated == that.replicated - && allowCustomRouting == that.allowCustomRouting; + && allowCustomRouting == that.allowCustomRouting + && indexMode == that.indexMode; } @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated, allowCustomRouting); + return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated, allowCustomRouting, indexMode); } public static final class TimestampField implements Writeable, ToXContentObject { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index cb9489b7745c1..7e60d3b8bdcbb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.xcontent.XContent; @@ -444,7 +445,7 @@ public Index getWriteIndex(IndexRequest request, Metadata metadata) { return getWriteIndex(); } - if (dataStream.isTimeSeries(metadata::index) == false) { + if (dataStream.getIndexMode() != IndexMode.TIME_SERIES) { return getWriteIndex(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 25ddbbccd9c3a..ad1d2af07e9ac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -1352,14 +1352,25 @@ public DataStream dataStream(String dataStreamName) { public Builder dataStreams(Map dataStreams, Map dataStreamAliases) { previousIndicesLookup = null; + // Only perform data stream validation only when data streams are modified in Metadata: + for (DataStream dataStream : dataStreams.values()) { + dataStream.validate(indices::get); + } + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(dataStreams, dataStreamAliases)); return this; } public Builder put(DataStream dataStream) { previousIndicesLookup = null; - Objects.requireNonNull(dataStream, "it is invalid to add a null data stream"); + + // Every time the backing indices of a data stream is modified a new instance will be created and + // that instance needs to be added here. So this is a good place to do data stream validation for + // the data stream and all of its backing indices. Doing this validation in the build() method would + // trigger this validation on each new Metadata creation, even if there are no changes to data streams. + dataStream.validate(indices::get); + Map existingDataStreams = Optional.ofNullable( (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE) ).map(dsmd -> new HashMap<>(dsmd.dataStreams())).orElse(new HashMap<>()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index c858c680f8185..5d8b092d9a17e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.MetadataFieldMapper; @@ -214,7 +215,8 @@ static ClusterState createDataStream( ).dataStreamName(dataStreamName) .systemDataStreamDescriptor(systemDataStreamDescriptor) .nameResolvedInstant(request.startTime) - .performReroute(request.performReroute()); + .performReroute(request.performReroute()) + .setMatchingTemplate(template); if (isSystem) { createIndexRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS); @@ -245,6 +247,7 @@ static ClusterState createDataStream( List dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()); dsBackingIndices.add(writeIndex.getIndex()); boolean hidden = isSystem || template.getDataStreamTemplate().isHidden(); + final IndexMode indexMode = template.getDataStreamTemplate().getIndexMode(); DataStream newDataStream = new DataStream( dataStreamName, timestampField, @@ -254,7 +257,8 @@ static ClusterState createDataStream( hidden, false, isSystem, - template.getDataStreamTemplate().isAllowCustomRouting() + template.getDataStreamTemplate().isAllowCustomRouting(), + indexMode ); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index a6c9cb3db42da..2cf0cde203ad7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -83,6 +83,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -845,6 +846,12 @@ static Settings aggregateIndexSettings( final Settings.Builder additionalIndexSettings = Settings.builder(); final Settings templateAndRequestSettings = Settings.builder().put(combinedTemplateSettings).put(request.settings()).build(); + final IndexMode matchingIndexMode = Optional.of(request) + .map(CreateIndexClusterStateUpdateRequest::matchingTemplate) + .map(ComposableIndexTemplate::getDataStreamTemplate) + .map(ComposableIndexTemplate.DataStreamTemplate::getIndexMode) + .orElse(null); + // Loop through all the explicit index setting providers, adding them to the // additionalIndexSettings map for (IndexSettingProvider provider : indexSettingProviders) { @@ -852,6 +859,7 @@ static Settings aggregateIndexSettings( provider.getAdditionalIndexSettings( request.index(), request.dataStreamName(), + matchingIndexMode, currentState.getMetadata(), request.getNameResolvedAt(), templateAndRequestSettings diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 745e415c1ed62..14dfb59ab36bf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -633,6 +633,7 @@ private void validateIndexTemplateV2(String name, ComposableIndexTemplate indexT provider.getAdditionalIndexSettings( "validate-index-name", indexTemplate.getDataStreamTemplate() != null ? "validate-data-stream-name" : null, + indexTemplate.getDataStreamTemplate() != null ? indexTemplate.getDataStreamTemplate().getIndexMode() : null, currentState.getMetadata(), System.currentTimeMillis(), finalTemplate.map(Template::settings).orElse(Settings.EMPTY) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DataTier.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DataTier.java index 5558773927a67..c68f285bb4b13 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DataTier.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DataTier.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; @@ -234,6 +235,7 @@ public static class DefaultHotAllocationSettingProvider implements IndexSettingP public Settings getAdditionalIndexSettings( String indexName, String dataStreamName, + IndexMode templateIndexMode, Metadata metadata, long resolvedAt, Settings allSettings diff --git a/server/src/main/java/org/elasticsearch/index/IndexMode.java b/server/src/main/java/org/elasticsearch/index/IndexMode.java index 8e236fd1a36e2..3284505c0481e 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexMode.java +++ b/server/src/main/java/org/elasticsearch/index/IndexMode.java @@ -23,9 +23,11 @@ import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.toSet; @@ -38,7 +40,7 @@ * to be set or not set and by enabling extra fields in the mapping. */ public enum IndexMode { - STANDARD { + STANDARD("standard") { @Override void validateWithOtherSettings(Map, Object> settings) { settingRequiresTimeSeries(settings, IndexMetadata.INDEX_ROUTING_PATH); @@ -76,7 +78,7 @@ public MetadataFieldMapper buildTimeSeriesIdFieldMapper() { return null; } }, - TIME_SERIES { + TIME_SERIES("time_series") { @Override void validateWithOtherSettings(Map, Object> settings) { if (settings.get(IndexMetadata.INDEX_ROUTING_PARTITION_SIZE_SETTING) != Integer.valueOf(1)) { @@ -180,6 +182,16 @@ public MetadataFieldMapper buildTimeSeriesIdFieldMapper() { ).collect(toSet()) ); + private final String name; + + IndexMode(String name) { + this.name = name; + } + + public String getName() { + return name; + } + abstract void validateWithOtherSettings(Map, Object> settings); /** @@ -209,4 +221,23 @@ public MetadataFieldMapper buildTimeSeriesIdFieldMapper() { * field mappers for the index. */ public abstract MetadataFieldMapper buildTimeSeriesIdFieldMapper(); + + public static IndexMode fromString(String value) { + return switch (value) { + case "standard" -> IndexMode.STANDARD; + case "time_series" -> IndexMode.TIME_SERIES; + default -> throw new IllegalArgumentException( + "[" + + value + + "] is an invalid index mode, valid modes are: [" + + Arrays.stream(IndexMode.values()).map(IndexMode::toString).collect(Collectors.joining()) + + "]" + ); + }; + } + + @Override + public String toString() { + return getName(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettingProvider.java b/server/src/main/java/org/elasticsearch/index/IndexSettingProvider.java index ced531a615a3d..7837feb3eed94 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettingProvider.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettingProvider.java @@ -22,9 +22,17 @@ public interface IndexSettingProvider { * * @param indexName The name of the new index being created * @param dataStreamName The name of the data stream if the index being created is part of a data stream otherwise null + * @param templateIndexMode The index mode from the data stream template of the matching template. * @param metadata The current metadata instance that doesn't yet contain the index to be created * @param resolvedAt The time the request to create this new index was accepted. * @param allSettings All the setting resolved from the template that matches and any setting defined on the create index request */ - Settings getAdditionalIndexSettings(String indexName, String dataStreamName, Metadata metadata, long resolvedAt, Settings allSettings); + Settings getAdditionalIndexSettings( + String indexName, + String dataStreamName, + IndexMode templateIndexMode, + Metadata metadata, + long resolvedAt, + Settings allSettings + ); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index f574fb5b00feb..8c0fc498a1e67 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -699,7 +699,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad dataStream.isHidden(), dataStream.isReplicated(), dataStream.isSystem(), - dataStream.isAllowCustomRouting() + dataStream.isAllowCustomRouting(), + dataStream.getIndexMode() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTemplateTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTemplateTests.java index 323951dece0df..38b68df728e81 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTemplateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTemplateTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; @@ -32,7 +33,8 @@ protected DataStreamTemplate createTestInstance() { } public static DataStreamTemplate randomInstance() { - return new ComposableIndexTemplate.DataStreamTemplate(randomBoolean(), randomBoolean()); + IndexMode indexMode = randomBoolean() ? randomFrom(IndexMode.values()) : null; + return new ComposableIndexTemplate.DataStreamTemplate(randomBoolean(), randomBoolean(), indexMode); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index a6256fb88a544..472d69b19c619 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -14,6 +14,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; @@ -33,8 +35,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; public class DataStreamTests extends AbstractSerializingTestCase { @@ -437,7 +441,8 @@ public void testSnapshot() { preSnapshotDataStream.isHidden(), preSnapshotDataStream.isReplicated() && randomBoolean(), preSnapshotDataStream.isSystem(), - preSnapshotDataStream.isAllowCustomRouting() + preSnapshotDataStream.isAllowCustomRouting(), + preSnapshotDataStream.getIndexMode() ); var reconciledDataStream = postSnapshotDataStream.snapshot( @@ -479,7 +484,8 @@ public void testSnapshotWithAllBackingIndicesRemoved() { preSnapshotDataStream.isHidden(), preSnapshotDataStream.isReplicated(), preSnapshotDataStream.isSystem(), - preSnapshotDataStream.isAllowCustomRouting() + preSnapshotDataStream.isAllowCustomRouting(), + preSnapshotDataStream.getIndexMode() ); assertNull( @@ -519,4 +525,86 @@ public void testSelectTimeSeriesWriteIndex() { assertThat(result.getName(), equalTo(DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli()))); } + public void testValidate() { + { + // Valid cases: + Instant currentTime = Instant.now().truncatedTo(ChronoUnit.MILLIS); + + // These ranges are on the edge of each other temporal boundaries. + Instant start1 = currentTime.minus(6, ChronoUnit.HOURS); + Instant end1 = currentTime.minus(2, ChronoUnit.HOURS); + Instant start2 = currentTime.minus(2, ChronoUnit.HOURS); + Instant end2 = currentTime.plus(2, ChronoUnit.HOURS); + + String dataStreamName = "logs_my-app_prod"; + var clusterState = DataStreamTestHelper.getClusterStateWithDataStream( + dataStreamName, + List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2)) + ); + DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName); + assertThat(dataStream, notNullValue()); + assertThat(dataStream.getIndices(), hasSize(2)); + assertThat( + IndexSettings.TIME_SERIES_START_TIME.get(clusterState.getMetadata().index(dataStream.getIndices().get(0)).getSettings()), + equalTo(start1) + ); + assertThat( + IndexSettings.TIME_SERIES_END_TIME.get(clusterState.getMetadata().index(dataStream.getIndices().get(0)).getSettings()), + equalTo(end1) + ); + assertThat( + IndexSettings.TIME_SERIES_START_TIME.get(clusterState.getMetadata().index(dataStream.getIndices().get(1)).getSettings()), + equalTo(start2) + ); + assertThat( + IndexSettings.TIME_SERIES_END_TIME.get(clusterState.getMetadata().index(dataStream.getIndices().get(1)).getSettings()), + equalTo(end2) + ); + + // Create a temporal gap between, this is valid and shouldn't fail: + DataStreamTestHelper.getClusterStateWithDataStream( + dataStreamName, + List.of(Tuple.tuple(start1, end1.minus(1, ChronoUnit.MINUTES)), Tuple.tuple(start2.plus(1, ChronoUnit.MINUTES), end2)) + ); + } + { + // Invalid case: + Instant currentTime = Instant.now(); + + Instant start1 = currentTime.minus(6, ChronoUnit.HOURS); + Instant end1 = currentTime.minus(2, ChronoUnit.HOURS); + // Start2 is inside start1 and end1 range: + Instant start2 = currentTime.minus(3, ChronoUnit.HOURS); + Instant end2 = currentTime.plus(2, ChronoUnit.HOURS); + + String dataStreamName = "logs_my-app_prod"; + var e = expectThrows( + IllegalArgumentException.class, + () -> DataStreamTestHelper.getClusterStateWithDataStream( + dataStreamName, + List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2)) + ) + ); + var formatter = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; + assertThat( + e.getMessage(), + equalTo( + "backing index [" + + DataStream.getDefaultBackingIndexName(dataStreamName, 1, start1.toEpochMilli()) + + "] with range [" + + formatter.format(start1) + + " TO " + + formatter.format(end1) + + "] is overlapping with backing index [" + + DataStream.getDefaultBackingIndexName(dataStreamName, 2, start2.toEpochMilli()) + + "] with range [" + + formatter.format(start2) + + " TO " + + formatter.format(end2) + + "]" + ) + ); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index e6aefa60ded61..5fbd5f8b23543 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -2785,7 +2785,8 @@ public void testHiddenDataStreams() { true, false, false, - false + false, + null ) ) ) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 37e2cf71d3362..3bb851316ead8 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -21,6 +21,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettingProvider; import org.elasticsearch.index.IndexSettingProviders; @@ -89,7 +90,7 @@ public static DataStream newInstance( long generation, Map metadata ) { - return new DataStream(name, timeStampField, indices, generation, metadata, false, false, false, false); + return new DataStream(name, timeStampField, indices, generation, metadata, false, false, false, false, null); } public static String getLegacyDefaultBackingIndexName( @@ -207,6 +208,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time if (randomBoolean()) { metadata = Map.of("key", "value"); } + return new DataStream( dataStreamName, createTimestampField("@timestamp"), @@ -215,9 +217,10 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time metadata, randomBoolean(), randomBoolean(), - false, + false, // Some tests don't work well with system data streams, since these data streams require special handling timeProvider, - false + randomBoolean(), + randomBoolean() ? IndexMode.STANDARD : null // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass ); } @@ -315,10 +318,17 @@ public static ClusterState getClusterStateWithDataStream(String dataStream, List backingIndices.add(im); generation++; } - DataStream ds = newInstance( + DataStream ds = new DataStream( dataStream, createTimestampField("@timestamp"), - backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()) + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), + backingIndices.size(), + null, + false, + false, + false, + false, + IndexMode.TIME_SERIES ); builder.put(ds); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index fc0c47dc42e92..1661585b5062f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -311,7 +311,8 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, DataStream l remoteDataStream.isHidden(), true, remoteDataStream.isSystem(), - remoteDataStream.isAllowCustomRouting() + remoteDataStream.isAllowCustomRouting(), + remoteDataStream.getIndexMode() ); } else { if (localDataStream.isReplicated() == false) { @@ -342,7 +343,8 @@ static DataStream updateLocalDataStream(Index backingIndexToFollow, DataStream l localDataStream.isHidden(), localDataStream.isReplicated(), localDataStream.isSystem(), - localDataStream.isAllowCustomRouting() + localDataStream.isAllowCustomRouting(), + localDataStream.getIndexMode() ); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 2a9ef8a491e53..09d089901dbda 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -2281,7 +2281,8 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt false, false, system, - false + false, + null ); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) .metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java index 6043080baee7a..590ffdeabc858 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowActionTests.java @@ -87,13 +87,25 @@ static DataStream generateDataSteam(String name, int numBackingIndices, boolean false, replicate, false, - false + false, + null ); } static DataStream generateDataSteam(String name, int generation, boolean replicate, String... backingIndexNames) { List backingIndices = Arrays.stream(backingIndexNames).map(value -> new Index(value, "uuid")).collect(Collectors.toList()); - return new DataStream(name, new TimestampField("@timestamp"), backingIndices, generation, Map.of(), false, replicate, false, false); + return new DataStream( + name, + new TimestampField("@timestamp"), + backingIndices, + generation, + Map.of(), + false, + replicate, + false, + false, + null + ); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtilsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtilsTests.java index 0887b34d6b110..c74199f434f58 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtilsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyUtilsTests.java @@ -137,7 +137,7 @@ public void testCalculateUsage() { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, false) + new ComposableIndexTemplate.DataStreamTemplate(false, false, null) ) ) ) @@ -206,7 +206,7 @@ public void testCalculateUsage() { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, false) + new ComposableIndexTemplate.DataStreamTemplate(false, false, null) ) ) ) diff --git a/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java b/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java index 78dc6f0170e9e..912411c56e72a 100644 --- a/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java +++ b/x-pack/plugin/data-streams/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/datastreams/TsdbDataStreamRestIT.java @@ -36,7 +36,6 @@ public class TsdbDataStreamRestIT extends ESRestTestCase { "index": { "number_of_replicas": 0, "number_of_shards": 2, - "mode": "time_series", "routing_path": ["metricset", "time_series_dimension"] } }, @@ -80,7 +79,9 @@ public class TsdbDataStreamRestIT extends ESRestTestCase { } } }, - "data_stream": {} + "data_stream": { + "index_mode": "time_series" + } }"""; private static final String DOC = """ diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java index 1f8e3188fcc2f..2d976798565b6 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamIT.java @@ -1137,7 +1137,7 @@ public void testIndexDocsWithCustomRoutingAllowed() throws Exception { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, true) + new ComposableIndexTemplate.DataStreamTemplate(false, true, null) ); client().execute( PutComposableIndexTemplateAction.INSTANCE, @@ -1787,7 +1787,7 @@ public void testPartitionedTemplate() throws IOException { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, true) + new ComposableIndexTemplate.DataStreamTemplate(false, true, null) ); ComposableIndexTemplate finalTemplate = template; client().execute( @@ -1813,7 +1813,7 @@ public void testPartitionedTemplate() throws IOException { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, true) + new ComposableIndexTemplate.DataStreamTemplate(false, true, null) ); client().execute( PutComposableIndexTemplateAction.INSTANCE, @@ -1839,7 +1839,7 @@ public void testPartitionedTemplate() throws IOException { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, false) + new ComposableIndexTemplate.DataStreamTemplate(false, false, null) ); ComposableIndexTemplate finalTemplate1 = template; Exception e = expectThrows( @@ -1881,7 +1881,7 @@ public void testSearchWithRouting() throws IOException, ExecutionException, Inte null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(false, true) + new ComposableIndexTemplate.DataStreamTemplate(false, true, null) ); client().execute( PutComposableIndexTemplateAction.INSTANCE, diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProvider.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProvider.java index ae66b25f00802..670e504db3166 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProvider.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProvider.java @@ -20,7 +20,6 @@ import java.time.Instant; import java.util.Locale; -import java.util.Optional; public class DataStreamIndexSettingsProvider implements IndexSettingProvider { @@ -30,41 +29,46 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider { public Settings getAdditionalIndexSettings( String indexName, String dataStreamName, + IndexMode templateIndexMode, Metadata metadata, long resolvedAt, Settings allSettings ) { if (dataStreamName != null) { - IndexMode indexMode = Optional.ofNullable(allSettings.get(IndexSettings.MODE.getKey())) - .map(value -> IndexMode.valueOf(value.toUpperCase(Locale.ROOT))) - .orElse(IndexMode.STANDARD); - if (indexMode == IndexMode.TIME_SERIES) { - TimeValue lookAheadTime = allSettings.getAsTime( - IndexSettings.LOOK_AHEAD_TIME.getKey(), - IndexSettings.LOOK_AHEAD_TIME.getDefault(allSettings) - ); + DataStream dataStream = metadata.dataStreams().get(dataStreamName); + IndexMode indexMode; + if (dataStream != null) { + indexMode = dataStream.getIndexMode(); + } else { + indexMode = templateIndexMode; + } + if (indexMode != null) { Settings.Builder builder = Settings.builder(); - DataStream dataStream = metadata.dataStreams().get(dataStreamName); - Instant start; - if (dataStream == null) { - start = Instant.ofEpochMilli(resolvedAt).minusMillis(lookAheadTime.getMillis()); - } else { - IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex()); - if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) { - throw new IllegalStateException( - String.format( - Locale.ROOT, - "backing index [%s] in tsdb mode doesn't have the [%s] index setting", - currentLatestBackingIndex.getIndex().getName(), - IndexSettings.TIME_SERIES_START_TIME.getKey() - ) - ); + builder.put(IndexSettings.MODE.getKey(), indexMode); + + if (indexMode == IndexMode.TIME_SERIES) { + TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings); + Instant start; + if (dataStream == null) { + start = Instant.ofEpochMilli(resolvedAt).minusMillis(lookAheadTime.getMillis()); + } else { + IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex()); + if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "backing index [%s] in tsdb mode doesn't have the [%s] index setting", + currentLatestBackingIndex.getIndex().getName(), + IndexSettings.TIME_SERIES_END_TIME.getKey() + ) + ); + } + start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings()); } - start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings()); + builder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), FORMATTER.format(start)); + Instant end = Instant.ofEpochMilli(resolvedAt).plusMillis(lookAheadTime.getMillis()); + builder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), FORMATTER.format(end)); } - builder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), FORMATTER.format(start)); - Instant end = Instant.ofEpochMilli(resolvedAt).plusMillis(lookAheadTime.getMillis()); - builder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), FORMATTER.format(end)); return builder.build(); } } diff --git a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProviderTests.java b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProviderTests.java index f6243049d4613..7cb683e00b85c 100644 --- a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProviderTests.java +++ b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamIndexSettingsProviderTests.java @@ -13,12 +13,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Locale; import static org.elasticsearch.common.settings.Settings.builder; import static org.elasticsearch.xpack.datastreams.DataStreamIndexSettingsProvider.FORMATTER; @@ -32,15 +34,18 @@ public void testGetAdditionalIndexSettings() { Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); TimeValue lookAheadTime = TimeValue.timeValueHours(2); // default - Settings settings = builder().put("index.mode", "time_series").build(); + Settings settings = Settings.EMPTY; var provider = new DataStreamIndexSettingsProvider(); Settings result = provider.getAdditionalIndexSettings( DataStream.getDefaultBackingIndexName(dataStreamName, 1), dataStreamName, + IndexMode.TIME_SERIES, metadata, now.toEpochMilli(), settings ); + assertThat(result.size(), equalTo(3)); + assertThat(result.get(IndexSettings.MODE.getKey()), equalTo(IndexMode.TIME_SERIES.name().toLowerCase(Locale.ROOT))); assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis()))); } @@ -56,59 +61,41 @@ public void testGetAdditionalIndexSettingsLookAheadTime() { Settings result = provider.getAdditionalIndexSettings( DataStream.getDefaultBackingIndexName(dataStreamName, 1), dataStreamName, + IndexMode.TIME_SERIES, metadata, now.toEpochMilli(), settings ); - assertThat(result.size(), equalTo(2)); + assertThat(result.size(), equalTo(3)); + assertThat(result.get(IndexSettings.MODE.getKey()), equalTo(IndexMode.TIME_SERIES.name().toLowerCase(Locale.ROOT))); assertThat(IndexSettings.TIME_SERIES_START_TIME.get(result), equalTo(now.minusMillis(lookAheadTime.getMillis()))); assertThat(IndexSettings.TIME_SERIES_END_TIME.get(result), equalTo(now.plusMillis(lookAheadTime.getMillis()))); } - public void testGetAdditionalIndexSettingsNoTimeSeries() { - Metadata metadata = Metadata.EMPTY_METADATA; - String dataStreamName = "logs-app1"; - - long now = Instant.now().toEpochMilli(); - Settings settings = randomBoolean() ? Settings.EMPTY : builder().put("index.mode", "standard").build(); - var provider = new DataStreamIndexSettingsProvider(); - Settings result = provider.getAdditionalIndexSettings( - DataStream.getDefaultBackingIndexName(dataStreamName, 1), - dataStreamName, - metadata, - now, - settings - ); - assertThat(result, equalTo(Settings.EMPTY)); - } - public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() { String dataStreamName = "logs-app1"; TimeValue lookAheadTime = TimeValue.timeValueHours(2); Instant sixHoursAgo = Instant.now().minus(6, ChronoUnit.HOURS).truncatedTo(ChronoUnit.MILLIS); Instant currentEnd = sixHoursAgo.plusMillis(lookAheadTime.getMillis()); - Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStreams( - List.of(Tuple.tuple(dataStreamName, 1)), - List.of(), - sixHoursAgo.toEpochMilli(), - builder().put(IndexSettings.TIME_SERIES_START_TIME.getKey(), FORMATTER.format(sixHoursAgo)) - .put(IndexSettings.TIME_SERIES_END_TIME.getKey(), FORMATTER.format(currentEnd)) - .build(), - 1 + Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStream( + dataStreamName, + List.of(new Tuple<>(sixHoursAgo, currentEnd)) ).getMetadata(); Instant now = sixHoursAgo.plus(6, ChronoUnit.HOURS); - Settings settings = builder().put("index.mode", "time_series").build(); + Settings settings = Settings.EMPTY; var provider = new DataStreamIndexSettingsProvider(); var result = provider.getAdditionalIndexSettings( DataStream.getDefaultBackingIndexName(dataStreamName, 1), dataStreamName, + IndexMode.TIME_SERIES, metadata, now.toEpochMilli(), settings ); - assertThat(result.size(), equalTo(2)); + assertThat(result.size(), equalTo(3)); + assertThat(result.get(IndexSettings.MODE.getKey()), equalTo(IndexMode.TIME_SERIES.name().toLowerCase(Locale.ROOT))); assertThat(result.get(IndexSettings.TIME_SERIES_START_TIME.getKey()), equalTo(FORMATTER.format(currentEnd))); assertThat( result.get(IndexSettings.TIME_SERIES_END_TIME.getKey()), @@ -119,22 +106,41 @@ public void testGetAdditionalIndexSettingsDataStreamAlreadyCreated() { public void testGetAdditionalIndexSettingsDataStreamAlreadyCreatedTimeSettingsMissing() { String dataStreamName = "logs-app1"; Instant twoHoursAgo = Instant.now().minus(4, ChronoUnit.HOURS).truncatedTo(ChronoUnit.MILLIS); - Metadata metadata = DataStreamTestHelper.getClusterStateWithDataStreams( - List.of(Tuple.tuple(dataStreamName, 1)), - List.of(), - twoHoursAgo.toEpochMilli(), - builder().build(), - 1 - ).getMetadata(); + Metadata.Builder mb = Metadata.builder( + DataStreamTestHelper.getClusterStateWithDataStreams( + List.of(Tuple.tuple(dataStreamName, 1)), + List.of(), + twoHoursAgo.toEpochMilli(), + builder().build(), + 1 + ).getMetadata() + ); + DataStream ds = mb.dataStream(dataStreamName); + mb.put( + new DataStream( + ds.getName(), + ds.getTimeStampField(), + ds.getIndices(), + ds.getGeneration(), + ds.getMetadata(), + ds.isHidden(), + ds.isReplicated(), + ds.isSystem(), + ds.isAllowCustomRouting(), + IndexMode.TIME_SERIES + ) + ); + Metadata metadata = mb.build(); Instant now = twoHoursAgo.plus(2, ChronoUnit.HOURS); - Settings settings = builder().put("index.mode", "time_series").build(); + Settings settings = Settings.EMPTY; var provider = new DataStreamIndexSettingsProvider(); Exception e = expectThrows( IllegalStateException.class, () -> provider.getAdditionalIndexSettings( DataStream.getDefaultBackingIndexName(dataStreamName, 1), dataStreamName, + IndexMode.TIME_SERIES, metadata, now.toEpochMilli(), settings @@ -143,11 +149,46 @@ public void testGetAdditionalIndexSettingsDataStreamAlreadyCreatedTimeSettingsMi assertThat( e.getMessage(), equalTo( - "backing index [%s] in tsdb mode doesn't have the [index.time_series.start_time] index setting".formatted( + "backing index [%s] in tsdb mode doesn't have the [index.time_series.end_time] index setting".formatted( DataStream.getDefaultBackingIndexName(dataStreamName, 1, twoHoursAgo.toEpochMilli()) ) ) ); } + public void testGetAdditionalIndexSettingsIndexModeNotSpecified() { + Metadata metadata = Metadata.EMPTY_METADATA; + String dataStreamName = "logs-app1"; + + Settings settings = Settings.EMPTY; + var provider = new DataStreamIndexSettingsProvider(); + Settings result = provider.getAdditionalIndexSettings( + DataStream.getDefaultBackingIndexName(dataStreamName, 1), + dataStreamName, + null, + metadata, + 1L, + settings + ); + assertThat(result.size(), equalTo(0)); + } + + public void testGetAdditionalIndexSettingsIndexModeStandardSpecified() { + Metadata metadata = Metadata.EMPTY_METADATA; + String dataStreamName = "logs-app1"; + + Settings settings = Settings.EMPTY; + var provider = new DataStreamIndexSettingsProvider(); + Settings result = provider.getAdditionalIndexSettings( + DataStream.getDefaultBackingIndexName(dataStreamName, 1), + dataStreamName, + IndexMode.STANDARD, + metadata, + 1L, + settings + ); + assertThat(result.size(), equalTo(1)); + assertThat(result.get(IndexSettings.MODE.getKey()), equalTo(IndexMode.STANDARD.name().toLowerCase(Locale.ROOT))); + } + } diff --git a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamsStatsTests.java b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamsStatsTests.java index b101d2f692708..d0f01d0d7906e 100644 --- a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamsStatsTests.java +++ b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/DataStreamsStatsTests.java @@ -240,7 +240,7 @@ private String createDataStream(boolean hidden) throws Exception { null, null, null, - new ComposableIndexTemplate.DataStreamTemplate(hidden, false), + new ComposableIndexTemplate.DataStreamTemplate(hidden, false, null), null ); assertTrue( diff --git a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/MetadataDataStreamRolloverServiceTests.java b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/MetadataDataStreamRolloverServiceTests.java index 9b259c79af71c..641b77a88a42c 100644 --- a/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/MetadataDataStreamRolloverServiceTests.java +++ b/x-pack/plugin/data-streams/src/test/java/org/elasticsearch/xpack/datastreams/MetadataDataStreamRolloverServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -49,14 +50,21 @@ public class MetadataDataStreamRolloverServiceTests extends ESTestCase { public void testRolloverClusterStateForDataStream() throws Exception { Instant now = Instant.now(); String dataStreamName = "logs-my-app"; - final DataStream dataStream = DataStreamTestHelper.newInstance( + final DataStream dataStream = new DataStream( dataStreamName, new DataStream.TimestampField("@timestamp"), - List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")) + List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")), + 1, + null, + false, + false, + false, + false, + IndexMode.TIME_SERIES ); ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*")) .template(new Template(Settings.builder().put("index.mode", "time_series").build(), null, null)) - .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES)) .build(); Metadata.Builder builder = Metadata.builder(); builder.put("template", template); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java index 1ba9c4a4efa84..7e809c6d5e4a4 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportRollupAction.java @@ -364,7 +364,8 @@ public ClusterState execute(ClusterState currentState) { originalDataStream.isHidden(), originalDataStream.isReplicated(), originalDataStream.isSystem(), - originalDataStream.isAllowCustomRouting() + originalDataStream.isAllowCustomRouting(), + originalDataStream.getIndexMode() ); metadataBuilder.put(dataStream); }