diff --git a/docs/changelog/83302.yaml b/docs/changelog/83302.yaml new file mode 100644 index 000000000000..0bfa65caac1b --- /dev/null +++ b/docs/changelog/83302.yaml @@ -0,0 +1,6 @@ +pr: 83302 +summary: Fix autoscaling of follower data streams +area: Autoscaling +type: bug +issues: + - 82857 diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 309fbe647928..3e1ef209c3ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -273,6 +273,13 @@ public IndexMode getIndexMode() { public DataStream rollover(Index writeIndex, long generation) { ensureNotReplicated(); + return unsafeRollover(writeIndex, generation); + } + + /** + * Like {@link #rollover(Index, long)}, but does no validation, use with care only. + */ + public DataStream unsafeRollover(Index writeIndex, long generation) { List backingIndices = new ArrayList<>(indices); backingIndices.add(writeIndex); return new DataStream( @@ -299,6 +306,13 @@ public DataStream rollover(Index writeIndex, long generation) { */ public Tuple nextWriteIndexAndGeneration(Metadata clusterMetadata) { ensureNotReplicated(); + return unsafeNextWriteIndexAndGeneration(clusterMetadata); + } + + /** + * Like {@link #nextWriteIndexAndGeneration(Metadata)}, but does no validation, use with care only. + */ + public Tuple unsafeNextWriteIndexAndGeneration(Metadata clusterMetadata) { String newWriteIndexName; long generation = this.generation; long currentTimeMillis = timeProvider.getAsLong(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 3bb851316ead..4427fc231183 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -90,7 +90,18 @@ public static DataStream newInstance( long generation, Map metadata ) { - return new DataStream(name, timeStampField, indices, generation, metadata, false, false, false, false, null); + return newInstance(name, timeStampField, indices, generation, metadata, false); + } + + public static DataStream newInstance( + String name, + DataStream.TimestampField timeStampField, + List indices, + long generation, + Map metadata, + boolean replicated + ) { + return new DataStream(name, timeStampField, indices, generation, metadata, false, replicated, false, false, null); } public static String getLegacyDefaultBackingIndexName( @@ -265,6 +276,17 @@ public static ClusterState getClusterStateWithDataStreams( long currentTime, Settings settings, int replicas + ) { + return getClusterStateWithDataStreams(dataStreams, indexNames, currentTime, settings, replicas, false); + } + + public static ClusterState getClusterStateWithDataStreams( + List> dataStreams, + List indexNames, + long currentTime, + Settings settings, + int replicas, + boolean replicated ) { Metadata.Builder builder = Metadata.builder(); @@ -283,7 +305,8 @@ public static ClusterState getClusterStateWithDataStreams( createTimestampField("@timestamp"), backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), dsTuple.v2(), - null + null, + replicated ); builder.put(ds); } diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index cf4faaee98d3..7c387b729d98 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -568,8 +568,8 @@ private SingleForecast forecast(Metadata metadata, IndexAbstraction.DataStream s DataStream dataStream = stream.getDataStream(); for (int i = 0; i < numberNewIndices; ++i) { final String uuid = UUIDs.randomBase64UUID(); - final Tuple dummyRolledDatastream = dataStream.nextWriteIndexAndGeneration(state.metadata()); - dataStream = dataStream.rollover(new Index(dummyRolledDatastream.v1(), uuid), dummyRolledDatastream.v2()); + final Tuple rolledDataStreamInfo = dataStream.unsafeNextWriteIndexAndGeneration(state.metadata()); + dataStream = dataStream.unsafeRollover(new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2()); // this unintentionally copies the in-sync allocation ids too. This has the fortunate effect of these indices // not being regarded new by the disk threshold decider, thereby respecting the low watermark threshold even for primaries. diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java index 40f27c22dde3..f8bf59622634 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java @@ -65,7 +65,10 @@ public void testScale() { ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams( List.of(Tuple.tuple("test", between(1, 10))), List.of(), - 0 + System.currentTimeMillis(), + Settings.EMPTY, + 0, + randomBoolean() ); ClusterState.Builder stateBuilder = ClusterState.builder(originalState); IntStream.range(0, between(1, 10)).forEach(i -> ReactiveStorageDeciderServiceTests.addNode(stateBuilder)); @@ -161,7 +164,10 @@ public void testForecastNoDates() { ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams( List.of(Tuple.tuple("test", between(1, 10))), List.of(), - between(0, 4) + System.currentTimeMillis(), + Settings.EMPTY, + between(0, 4), + randomBoolean() ); ClusterState.Builder stateBuilder = ClusterState.builder(originalState); stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build()); @@ -220,7 +226,10 @@ public void testForecast() { ClusterState originalState = DataStreamTestHelper.getClusterStateWithDataStreams( List.of(Tuple.tuple("test", indices)), List.of(), - shardCopies - 1 + System.currentTimeMillis(), + Settings.EMPTY, + shardCopies - 1, + randomBoolean() ); ClusterState.Builder stateBuilder = ClusterState.builder(originalState); stateBuilder.routingTable(addRouting(originalState.metadata(), RoutingTable.builder()).build());