Skip to content

Commit

Permalink
[ML] Extend use of MlConfigVersion to one more place
Browse files Browse the repository at this point in the history
In elastic#97699 we moved away from Version to MlConfigVersion for
versioning ML configurations. However, one place was missed.
This PR adjusts that missing place, namely the metadata on
DFA destination indices. As a result of this change being
shipped in a later version than the original change, a hack
is required to map the intervening product versions to the
MlConfigVersion that was in force.
  • Loading branch information
droberts195 committed Sep 25, 2023
1 parent 6b7c4e2 commit c78410a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,18 @@ public static MlConfigVersion getMlConfigVersionForNode(DiscoveryNode node) {
}

// Parse an MlConfigVersion from a string.
// Note that version "8.10.0" is silently converted to "10.0.0".
// Note that version "8.10.x" and "8.11.0" are silently converted to "10.0.0".
// This is to support upgrade scenarios in pre-prod QA environments.
public static MlConfigVersion fromString(String str) {
if (str == null) {
return CURRENT;
}
if (str.equals("8.10.0")) {
// The whole switch from Version to MlConfigVersion was supposed to take
// place during development of 8.10.0, however, one place was missed. As
// a result there may be DFA destination indices in the wild with metadata
// containing 8.10.1, 8.10.2, 8.10.3 or 8.11.0. We can treat these as V_10
// for config version comparison purposes.
if (str.startsWith("8.10.") || str.equals("8.11.0")) {
return V_10;
}
Matcher matcher = Pattern.compile("^(\\d+)\\.0\\.0$").matcher(str);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
Expand Down Expand Up @@ -148,7 +147,7 @@ public static class TaskParams implements PersistentTaskParams, MlTaskParams {

public static final MlConfigVersion VERSION_INTRODUCED = MlConfigVersion.V_7_3_0;
public static final TransportVersion TRANSPORT_VERSION_INTRODUCED = TransportVersions.V_7_3_0;
public static final Version VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = Version.V_7_10_0;
public static final MlConfigVersion VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = MlConfigVersion.V_7_10_0;

public static final ConstructingObjectParser<TaskParams, Void> PARSER = new ConstructingObjectParser<>(
MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,19 +810,6 @@ public static String nodeFilter(DiscoveryNode node, TaskParams params) {
+ TaskParams.VERSION_INTRODUCED
+ "] or higher";
}
if (node.getVersion().before(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED)
&& params.getVersion().onOrAfter(MlConfigVersion.fromVersion(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED))) {
return "Not opening job ["
+ id
+ "] on node ["
+ JobNodeSelector.nodeNameAndVersion(node)
+ "], because the data frame analytics created for version ["
+ params.getVersion()
+ "] requires a node of version "
+ "["
+ TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED
+ "] or higher";
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand All @@ -33,6 +32,7 @@
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlConfigVersion;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
Expand Down Expand Up @@ -95,7 +95,7 @@ public final class DestinationIndex {
* If the results mappings change in a way existing destination indices will fail to index
* the results, this should be bumped accordingly.
*/
public static final Version MIN_COMPATIBLE_VERSION =
public static final MlConfigVersion MIN_COMPATIBLE_VERSION =
StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED;

private DestinationIndex() {}
Expand Down Expand Up @@ -202,7 +202,7 @@ private static CreateIndexRequest createIndexRequest(
checkResultsFieldIsNotPresentInProperties(config, properties);
properties.putAll(createAdditionalMappings(config, fieldCapabilitiesResponse));
Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT));
metadata.putAll(createMetadata(config.getId(), clock, MlConfigVersion.CURRENT));
if (config.getSource().getRuntimeMappings().isEmpty() == false) {
Map<String, Object> runtimeMappings = getOrPutDefault(mappingsAsMap, RUNTIME, HashMap::new);
runtimeMappings.putAll(config.getSource().getRuntimeMappings());
Expand Down Expand Up @@ -317,7 +317,7 @@ private static Map<String, Object> createAdditionalMappings(
}

// Visible for testing
static Map<String, Object> createMetadata(String analyticsId, Clock clock, Version version) {
static Map<String, Object> createMetadata(String analyticsId, Clock clock, MlConfigVersion version) {
Map<String, Object> metadata = new HashMap<>();
metadata.put(CREATION_DATE_MILLIS, clock.millis());
metadata.put(CREATED_BY, DFA_CREATOR);
Expand Down Expand Up @@ -403,11 +403,11 @@ public static Metadata readMetadata(String jobId, MappingMetadata mappingMetadat
}

@SuppressWarnings("unchecked")
private static Version getVersion(String jobId, Map<String, Object> meta) {
private static MlConfigVersion getVersion(String jobId, Map<String, Object> meta) {
try {
Map<String, Object> version = (Map<String, Object>) meta.get(VERSION);
String createdVersionString = (String) version.get(CREATED);
return Version.fromString(createdVersionString);
return MlConfigVersion.fromString(createdVersionString);
} catch (Exception e) {
logger.error(() -> "[" + jobId + "] Could not retrieve destination index version", e);
return null;
Expand Down Expand Up @@ -443,9 +443,9 @@ public String getVersion() {

private static class DestMetadata implements Metadata {

private final Version version;
private final MlConfigVersion version;

private DestMetadata(Version version) {
private DestMetadata(MlConfigVersion version) {
this.version = version;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,48 +103,6 @@ public void testGetAssignment_NoMlNodes() {
);
}

// Cannot assign the node because none of the existing nodes is appropriate:
// - _node_name0 is too old (version 7.2.0)
// - _node_name1 is too old (version 7.9.1)
// - _node_name2 is too old (version 7.9.2)
public void testGetAssignment_MlNodesAreTooOld() {
TaskExecutor executor = createTaskExecutor();
TaskParams params = new TaskParams(JOB_ID, MlConfigVersion.CURRENT, false);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
.nodes(
DiscoveryNodes.builder()
.add(createNode(0, true, Version.V_7_2_0, MlConfigVersion.V_7_2_0))
.add(createNode(1, true, Version.V_7_9_1, MlConfigVersion.V_7_9_1))
.add(createNode(2, true, Version.V_7_9_2, MlConfigVersion.V_7_9_2))
)
.build();

Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
assertThat(assignment.getExecutorNode(), is(nullValue()));
assertThat(
assignment.getExplanation(),
allOf(
containsString(
"Not opening job [data_frame_id] on node [{_node_name0}{version=7.2.0}], "
+ "because the data frame analytics requires a node of version [7.3.0] or higher"
),
containsString(
"Not opening job [data_frame_id] on node [{_node_name1}{version=7.9.1}], "
+ "because the data frame analytics created for version ["
+ MlConfigVersion.CURRENT
+ "] requires a node of version [7.10.0] or higher"
),
containsString(
"Not opening job [data_frame_id] on node [{_node_name2}{version=7.9.2}], "
+ "because the data frame analytics created for version ["
+ MlConfigVersion.CURRENT
+ "] requires a node of version [7.10.0] or higher"
)
)
);
}

// The node can be assigned despite being newer than the job.
// In such a case destination index will be created from scratch so that its mappings are up-to-date.
public void testGetAssignment_MlNodeIsNewerThanTheMlJobButTheAssignmentSuceeds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.ml.MlConfigVersion;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
Expand Down Expand Up @@ -748,15 +749,15 @@ public void testReadMetadata_GivenMetaNotCreatedByAnalytics() {

public void testReadMetadata_GivenCurrentVersion() {
Map<String, Object> mappings = new HashMap<>();
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.CURRENT));
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), MlConfigVersion.CURRENT));
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);

DestinationIndex.Metadata metadata = DestinationIndex.readMetadata("test_id", mappingMetadata);

assertThat(metadata.hasMetadata(), is(true));
assertThat(metadata.isCompatible(), is(true));
assertThat(metadata.getVersion(), equalTo(Version.CURRENT.toString()));
assertThat(metadata.getVersion(), equalTo(MlConfigVersion.CURRENT.toString()));
}

public void testReadMetadata_GivenMinCompatibleVersion() {
Expand All @@ -774,7 +775,7 @@ public void testReadMetadata_GivenMinCompatibleVersion() {

public void testReadMetadata_GivenIncompatibleVersion() {
Map<String, Object> mappings = new HashMap<>();
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.V_7_9_3));
mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), MlConfigVersion.V_7_9_3));
MappingMetadata mappingMetadata = mock(MappingMetadata.class);
when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);

Expand Down

0 comments on commit c78410a

Please sign in to comment.