Skip to content

Commit

Permalink
Automatically rollover legacy ml indices
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Jan 17, 2025
1 parent fb5d364 commit 502f456
Show file tree
Hide file tree
Showing 10 changed files with 530 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ static TransportVersion def(int id) {
public static final TransportVersion KQL_QUERY_TECH_PREVIEW = def(8_823_00_0);
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES = def(8_825_00_0);
public static final TransportVersion ML_ROLLOVER_LEGACY_INDICES = def(8_826_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class AnnotationIndex {

// Exposed for testing, but always use the aliases in non-test code.
public static final String LATEST_INDEX_NAME = ".ml-annotations-000001";
public static final String INDEX_PATTERN = ".ml-annotations-*";
// Due to historical bugs this index may not have the correct mappings
// in some production clusters. Therefore new annotations should be
// written to the latest index. If we ever switch to another new annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public final class MlIndexAndAlias {

private static final Logger logger = LogManager.getLogger(MlIndexAndAlias.class);

// Visible for testing
static final Comparator<String> INDEX_NAME_COMPARATOR = new Comparator<>() {

private final Predicate<String> HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}").asMatchPredicate();
Expand Down Expand Up @@ -172,7 +171,7 @@ public static void createIndexAndAliasIfNecessary(
} else {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
assert concreteIndexNames.length > 0;
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
String latestConcreteIndexName = latestIndex(concreteIndexNames);
updateWriteAlias(client, alias, null, latestConcreteIndexName, loggingListener);
return;
}
Expand Down Expand Up @@ -279,18 +278,22 @@ private static void createFirstConcreteIndex(
);
}

private static void updateWriteAlias(
public static void updateWriteAlias(
Client client,
String alias,
@Nullable String currentIndex,
String newIndex,
ActionListener<Boolean> listener
) {
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
if (currentIndex != null) {
logger.info("About to move write alias [{}] from index [{}] to index [{}]", alias, currentIndex, newIndex);
} else {
logger.info("About to create write alias [{}] for index [{}]", alias, newIndex);
}
IndicesAliasesRequestBuilder requestBuilder = client.admin()
.indices()
.prepareAliases()
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true));
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndex).alias(alias).isHidden(true).writeIndex(true));
if (currentIndex != null) {
requestBuilder.removeAlias(currentIndex, alias);
}
Expand Down Expand Up @@ -380,4 +383,16 @@ public static void installIndexTemplateIfRequired(
public static boolean hasIndexTemplate(ClusterState state, String templateName) {
return state.getMetadata().templatesV2().containsKey(templateName);
}

/**
 * Selects the most recent index from an array of concrete index names.
 * "Most recent" means the name that ranks highest under
 * {@code INDEX_NAME_COMPARATOR}, i.e. the one with the highest
 * 6 digit version suffix.
 * <p>
 * A single-element array is answered directly without consulting the
 * comparator. The array must not be empty: an empty array makes the
 * underlying {@code Optional.get()} throw {@link java.util.NoSuchElementException}.
 *
 * @param concreteIndices Array of concrete index names, at least one entry
 * @return The latest index by index name version suffix
 */
public static String latestIndex(String[] concreteIndices) {
    // Only one candidate, nothing to compare
    if (concreteIndices.length == 1) {
        return concreteIndices[0];
    }
    return Arrays.stream(concreteIndices).max(MlIndexAndAlias.INDEX_NAME_COMPARATOR).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPo
assertThat(
indicesAliasesRequest.getAliasActions(),
contains(
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true),
AliasActions.add().alias(TEST_INDEX_ALIAS).index(FIRST_CONCRETE_INDEX).isHidden(true).writeIndex(true),
AliasActions.remove().alias(TEST_INDEX_ALIAS).index(LEGACY_INDEX_WITHOUT_SUFFIX)
)
);
Expand All @@ -318,7 +318,7 @@ private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> e
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
assertThat(
indicesAliasesRequest.getAliasActions(),
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true))
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName).isHidden(true).writeIndex(true))
);
}

Expand Down Expand Up @@ -364,6 +364,11 @@ public void testIndexNameComparator() {
assertThat(Stream.of(".a-000002", ".b-000001").max(comparator).get(), equalTo(".a-000002"));
}

// Exercises both paths of MlIndexAndAlias.latestIndex: the comparator-based
// selection (with sorted and unsorted input) and the single-element shortcut.
public void testLatestIndex() {
    var names = new String[] { "index-000001", "index-000002", "index-000003" };
    assertThat(MlIndexAndAlias.latestIndex(names), equalTo("index-000003"));

    // The result must not depend on the order of the input array
    var unorderedNames = new String[] { "index-000002", "index-000003", "index-000001" };
    assertThat(MlIndexAndAlias.latestIndex(unorderedNames), equalTo("index-000003"));

    // A single index is returned as-is
    var singleName = new String[] { "index-000001" };
    assertThat(MlIndexAndAlias.latestIndex(singleName), equalTo("index-000001"));
}

private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
import org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
Expand Down Expand Up @@ -1222,7 +1223,25 @@ public Collection<?> createComponents(PluginServices services) {

MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(
threadPool,
List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver))
List.of(
new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver),
new MlIndexRollover(
List.of(
new MlIndexRollover.IndexPatternAndAlias(
AnomalyDetectorsIndex.jobStateIndexPattern(),
AnomalyDetectorsIndex.jobStateIndexWriteAlias()
),
new MlIndexRollover.IndexPatternAndAlias(MlStatsIndex.indexPattern(), MlStatsIndex.writeAlias()),
new MlIndexRollover.IndexPatternAndAlias(AnnotationIndex.INDEX_PATTERN, AnnotationIndex.WRITE_ALIAS_NAME)
// TODO notifications = https://github.com/elastic/elasticsearch/pull/120064
// TODO anomaly results
// TODO .ml-inference-XXXXXX - requires alias
// TODO .ml-inference-native-XXXXXX - requires alias (index added in 8.0)
),
indexNameExpressionResolver,
client
)
)
);
clusterService.addListener(mlAutoUpdateService);
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
Expand Down Expand Up @@ -2025,6 +2044,9 @@ public void indicesMigrationComplete(
new AssociatedIndexDescriptor(MlStatsIndex.indexPattern(), "ML stats index"),
new AssociatedIndexDescriptor(".ml-notifications*", "ML notifications indices"),
new AssociatedIndexDescriptor(".ml-annotations*", "ML annotations indices")
// TODO should the inference indices be included here?
// new AssociatedIndexDescriptor(".ml-inference-*", "ML Data Frame Analytics")
// new AssociatedIndexDescriptor(".ml-inference-native*", "ML indices for trained models")
);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface UpdateAction {

String getName();

void runUpdate();
void runUpdate(ClusterState latestState);
}

private final List<UpdateAction> updateActions;
Expand All @@ -47,27 +47,34 @@ public MlAutoUpdateService(ThreadPool threadPool, List<UpdateAction> updateActio

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
if (event.localNodeMaster() == false) {
return;
}
if (event.localNodeMaster() == false) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return;
}

TransportVersion minTransportVersion = event.state().getMinTransportVersion();
if (completedUpdates.size() == updateActions.size()) {
return; // all work complete
}

final var latestState = event.state();
TransportVersion minTransportVersion = latestState.getMinTransportVersion();
final List<UpdateAction> toRun = updateActions.stream()
.filter(action -> action.isMinTransportVersionSupported(minTransportVersion))
.filter(action -> completedUpdates.contains(action.getName()) == false)
.filter(action -> action.isAbleToRun(event.state()))
.filter(action -> action.isAbleToRun(latestState))
.filter(action -> currentlyUpdating.add(action.getName()))
.toList();
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> toRun.forEach(this::runUpdate));
// TODO run updates serially
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)
.execute(() -> toRun.forEach((action) -> this.runUpdate(action, latestState)));
}

private void runUpdate(UpdateAction action) {
private void runUpdate(UpdateAction action, ClusterState latestState) {
try {
logger.debug(() -> "[" + action.getName() + "] starting executing update action");
action.runUpdate();
action.runUpdate(latestState);
this.completedUpdates.add(action.getName());
logger.debug(() -> "[" + action.getName() + "] succeeded executing update action");
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;

/**
 * If any of the indices listed in {@code indicesToRollover} are legacy indices
 * then call rollover to produce a new index with the current version. If the
 * index does not have an alias the alias is created first.
 * If none of the {@code indicesToRollover} exist or they are all non-legacy
 * indices then nothing will be updated.
 * <p>
 * An index is treated as legacy when its creation version is before
 * {@link IndexVersions#MINIMUM_COMPATIBLE}, see {@link #isCompatibleIndexVersion(IndexVersion)}.
 */
public class MlIndexRollover implements MlAutoUpdateService.UpdateAction {

    private static final Logger logger = LogManager.getLogger(MlIndexRollover.class);

    /**
     * Pairs an index pattern with the write alias that is rolled over.
     *
     * @param indexPattern pattern resolved to the concrete indices to inspect
     * @param alias        alias passed to the rollover request, created first if missing
     */
    public record IndexPatternAndAlias(String indexPattern, String alias) {}

    private final IndexNameExpressionResolver expressionResolver;
    private final OriginSettingClient client;
    private final List<IndexPatternAndAlias> indicesToRollover;

    /**
     * @param indicesToRollover  the index pattern and alias pairs to check and roll over
     * @param expressionResolver used to resolve each index pattern to concrete index names
     * @param client             wrapped in an {@link OriginSettingClient} with the ML origin
     */
    public MlIndexRollover(List<IndexPatternAndAlias> indicesToRollover, IndexNameExpressionResolver expressionResolver, Client client) {
        this.expressionResolver = expressionResolver;
        this.client = new OriginSettingClient(client, ML_ORIGIN);
        this.indicesToRollover = indicesToRollover;
    }

    @Override
    public boolean isMinTransportVersionSupported(TransportVersion minTransportVersion) {
        // Automatic rollover does not require any new features
        // but wait for all nodes to be upgraded anyway
        return minTransportVersion.onOrAfter(TransportVersions.ML_ROLLOVER_LEGACY_INDICES);
    }

    /**
     * The action can run once, for every pattern that matches at least one index,
     * all primary shards of the latest matching index are active. Patterns that
     * match no index do not block the action.
     *
     * @param latestState the cluster state to inspect
     * @return {@code false} if the latest index of any pattern has no routing table
     *         or has inactive primary shards, otherwise {@code true}
     */
    @Override
    public boolean isAbleToRun(ClusterState latestState) {
        for (var indexPatternAndAlias : indicesToRollover) {
            String[] indices = expressionResolver.concreteIndexNames(
                latestState,
                IndicesOptions.lenientExpandOpenHidden(),
                indexPatternAndAlias.indexPattern()
            );
            if (indices.length == 0) {
                // The index does not exist but the MlAutoUpdateService will
                // need to run this action and mark it as done.
                // Ignore the missing index and continue the loop
                continue;
            }

            String latestIndex = MlIndexAndAlias.latestIndex(indices);
            IndexRoutingTable routingTable = latestState.getRoutingTable().index(latestIndex);
            if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
                return false;
            }
        }

        return true;
    }

    @Override
    public String getName() {
        return "ml_legacy_index_rollover";
    }

    /**
     * Rolls over the legacy indices one pattern at a time, blocking the calling
     * thread until each pattern has been processed. Processing stops at the first
     * failure.
     *
     * @param latestState the cluster state used to resolve indices and aliases
     * @throws ElasticsearchException if a rollover failed; the underlying failure
     *                                is attached as a suppressed exception
     */
    @Override
    public void runUpdate(ClusterState latestState) {
        List<Exception> failures = new ArrayList<>();

        for (var indexPatternAndAlias : indicesToRollover) {
            PlainActionFuture<Boolean> rolloverIndices = new PlainActionFuture<>();
            rolloverLegacyIndices(latestState, indexPatternAndAlias.indexPattern(), indexPatternAndAlias.alias(), rolloverIndices);
            try {
                // Block until the alias creation and rollover for this pattern have finished
                rolloverIndices.actionGet();
            } catch (Exception ex) {
                logger.warn(() -> "failed rolling over legacy index [" + indexPatternAndAlias.indexPattern() + "]", ex);
                if (ex instanceof ElasticsearchException elasticsearchException) {
                    // Preserve the status of the original failure
                    failures.add(
                        new ElasticsearchStatusException("Failed rollover", elasticsearchException.status(), elasticsearchException)
                    );
                } else {
                    failures.add(new ElasticsearchStatusException("Failed rollover", RestStatus.REQUEST_TIMEOUT, ex));
                }

                // Do not attempt the remaining patterns after a failure
                break;
            }
        }

        if (failures.isEmpty()) {
            logger.info("ML legacy indices rolled over");
            return;
        }

        ElasticsearchException exception = new ElasticsearchException("Failed to rollover legacy ML indices");
        failures.forEach(exception::addSuppressed);
        throw exception;
    }

    /**
     * Ensures the latest index matching {@code indexPattern} has the write alias and,
     * if that index is a legacy index, rolls the alias over to a new index.
     * The listener is notified with {@code false} when nothing had to be done and
     * with {@code true} when the alias was created and/or the index was rolled over.
     */
    private void rolloverLegacyIndices(ClusterState clusterState, String indexPattern, String alias, ActionListener<Boolean> listener) {
        // NOTE(review): isAbleToRun resolves the pattern with lenientExpandOpenHidden() while this
        // uses LENIENT_EXPAND_OPEN_CLOSED, so the two may pick different latest indices - confirm this is intended
        var concreteIndices = expressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED, indexPattern);

        if (concreteIndices.length == 0) {
            // no matching indices
            listener.onResponse(Boolean.FALSE);
            return;
        }

        String latestIndex = MlIndexAndAlias.latestIndex(concreteIndices);
        boolean latestIndexIsCompatible = isCompatibleIndexVersion(clusterState.metadata().index(latestIndex).getCreationVersion());
        boolean hasAlias = clusterState.getMetadata().hasAlias(alias);

        if (latestIndexIsCompatible && hasAlias) {
            // v8 index with alias, no action required
            listener.onResponse(Boolean.FALSE);
            return;
        }

        // Step 1: create the write alias if it is missing, rollover requires an alias.
        // Step 2: rollover the alias if the latest index is a legacy index.
        SubscribableListener.<Boolean>newForked(l -> {
            if (hasAlias == false) {
                MlIndexAndAlias.updateWriteAlias(client, alias, null, latestIndex, l);
            } else {
                l.onResponse(Boolean.TRUE);
            }
        }).<Boolean>andThen((l, success) -> {
            if (latestIndexIsCompatible == false) {
                logger.info("rolling over legacy index [{}] with alias [{}]", latestIndex, alias);
                rollover(alias, l);
            } else {
                l.onResponse(Boolean.TRUE);
            }
        }).addListener(listener);
    }

    /**
     * Sends an unconditional rollover request for {@code alias} and reports
     * {@code true} to the listener on success; failures are passed through.
     */
    private void rollover(String alias, ActionListener<Boolean> listener) {
        client.admin().indices().rolloverIndex(new RolloverRequest(alias, null), listener.delegateFailure((l, response) -> {
            l.onResponse(Boolean.TRUE);
        }));
    }

    /**
     * True if the version is read *and* write compatible not just read only compatible
     */
    static boolean isCompatibleIndexVersion(IndexVersion version) {
        return version.onOrAfter(IndexVersions.MINIMUM_COMPATIBLE);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public String getName() {
}

@Override
public void runUpdate() {
public void runUpdate(ClusterState latestState) {
PlainActionFuture<List<DatafeedConfig.Builder>> getdatafeeds = new PlainActionFuture<>();
provider.expandDatafeedConfigs("_all", true, null, getdatafeeds);
List<DatafeedConfig.Builder> datafeedConfigBuilders = getdatafeeds.actionGet();
Expand Down
Loading

0 comments on commit 502f456

Please sign in to comment.