From c78410ad7e333a1f7fc8b2efaecc716152c9518b Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 25 Sep 2023 17:03:29 +0100 Subject: [PATCH] [ML] Extend use of MlConfigVersion to one more place In #97699 we moved away from Version to MlConfigVersion for versioning ML configurations. However, one place was missed. This PR adjusts that missing place, namely the metadata on DFA destination indices. As a result of this change being shipped in a later version than the original change, a hack is required to map the intervening product versions to the MlConfigVersion that was in force. --- .../xpack/core/ml/MlConfigVersion.java | 9 +++- .../action/StartDataFrameAnalyticsAction.java | 3 +- ...ransportStartDataFrameAnalyticsAction.java | 13 ------ .../xpack/ml/dataframe/DestinationIndex.java | 16 +++---- ...ortStartDataFrameAnalyticsActionTests.java | 42 ------------------- .../ml/dataframe/DestinationIndexTests.java | 7 ++-- 6 files changed, 20 insertions(+), 70 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlConfigVersion.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlConfigVersion.java index 0e3f60d56843d..37446a5551fd2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlConfigVersion.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlConfigVersion.java @@ -321,13 +321,18 @@ public static MlConfigVersion getMlConfigVersionForNode(DiscoveryNode node) { } // Parse an MlConfigVersion from a string. - // Note that version "8.10.0" is silently converted to "10.0.0". + // Note that version "8.10.x" and "8.11.0" are silently converted to "10.0.0". // This is to support upgrade scenarios in pre-prod QA environments. public static MlConfigVersion fromString(String str) { if (str == null) { return CURRENT; } - if (str.equals("8.10.0")) { + // The whole switch from Version to MlConfigVersion was supposed to take + // place during development of 8.10.0, however, one place was missed. As + // a result there may be DFA destination indices in the wild with metadata + // containing 8.10.1, 8.10.2, 8.10.3 or 8.11.0. We can treat these as V_10 + // for config version comparison purposes. + if (str.startsWith("8.10.") || str.equals("8.11.0")) { return V_10; } Matcher matcher = Pattern.compile("^(\\d+)\\.0\\.0$").matcher(str); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java index dd56eec10200b..85a7202817e83 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java @@ -8,7 +8,6 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.MasterNodeRequest; @@ -148,7 +147,7 @@ public static class TaskParams implements PersistentTaskParams, MlTaskParams { public static final MlConfigVersion VERSION_INTRODUCED = MlConfigVersion.V_7_3_0; public static final TransportVersion TRANSPORT_VERSION_INTRODUCED = TransportVersions.V_7_3_0; - public static final Version VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = Version.V_7_10_0; + public static final MlConfigVersion VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = MlConfigVersion.V_7_10_0; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index e24ffbff06f98..d8f1bbf9389c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -810,19 +810,6 @@ public static String nodeFilter(DiscoveryNode node, TaskParams params) { + TaskParams.VERSION_INTRODUCED + "] or higher"; } - if (node.getVersion().before(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED) - && params.getVersion().onOrAfter(MlConfigVersion.fromVersion(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED))) { - return "Not opening job [" - + id - + "] on node [" - + JobNodeSelector.nodeNameAndVersion(node) - + "], because the data frame analytics created for version [" - + params.getVersion() - + "] requires a node of version " - + "[" - + TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED - + "] or higher"; - } return null; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java index 6073029fcf0b9..4798e699f46c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java @@ -8,7 +8,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -33,6 +32,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.MlConfigVersion; import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; @@ -95,7 +95,7 @@ public final class DestinationIndex { * If the results mappings change in a way existing destination indices will fail to index * the results, this should be bumped accordingly. */ - public static final Version MIN_COMPATIBLE_VERSION = + public static final MlConfigVersion MIN_COMPATIBLE_VERSION = StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED; private DestinationIndex() {} @@ -202,7 +202,7 @@ private static CreateIndexRequest createIndexRequest( checkResultsFieldIsNotPresentInProperties(config, properties); properties.putAll(createAdditionalMappings(config, fieldCapabilitiesResponse)); Map metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new); - metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT)); + metadata.putAll(createMetadata(config.getId(), clock, MlConfigVersion.CURRENT)); if (config.getSource().getRuntimeMappings().isEmpty() == false) { Map runtimeMappings = getOrPutDefault(mappingsAsMap, RUNTIME, HashMap::new); runtimeMappings.putAll(config.getSource().getRuntimeMappings()); @@ -317,7 +317,7 @@ private static Map createAdditionalMappings( } // Visible for testing - static Map createMetadata(String analyticsId, Clock clock, Version version) { + static Map createMetadata(String analyticsId, Clock clock, MlConfigVersion version) { Map metadata = new HashMap<>(); metadata.put(CREATION_DATE_MILLIS, clock.millis()); metadata.put(CREATED_BY, DFA_CREATOR); @@ -403,11 +403,11 @@ public static Metadata readMetadata(String jobId, MappingMetadata mappingMetadat } @SuppressWarnings("unchecked") - private static Version getVersion(String jobId, Map meta) { + private static MlConfigVersion getVersion(String jobId, Map meta) { try { Map version = (Map) meta.get(VERSION); String createdVersionString = (String) version.get(CREATED); - return Version.fromString(createdVersionString); + return MlConfigVersion.fromString(createdVersionString); } catch (Exception e) { logger.error(() -> "[" + jobId + "] Could not retrieve destination index version", e); return null; @@ -443,9 +443,9 @@ public String getVersion() { private static class DestMetadata implements Metadata { - private final Version version; + private final MlConfigVersion version; - private DestMetadata(Version version) { + private DestMetadata(MlConfigVersion version) { this.version = version; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java index 1a677a1df69bb..b3a8751051061 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java @@ -103,48 +103,6 @@ public void testGetAssignment_NoMlNodes() { ); } - // Cannot assign the node because none of the existing nodes is appropriate: - // - _node_name0 is too old (version 7.2.0) - // - _node_name1 is too old (version 7.9.1) - // - _node_name2 is too old (version 7.9.2) - public void testGetAssignment_MlNodesAreTooOld() { - TaskExecutor executor = createTaskExecutor(); - TaskParams params = new TaskParams(JOB_ID, MlConfigVersion.CURRENT, false); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build())) - .nodes( - DiscoveryNodes.builder() - .add(createNode(0, true, Version.V_7_2_0, MlConfigVersion.V_7_2_0)) - .add(createNode(1, true, Version.V_7_9_1, MlConfigVersion.V_7_9_1)) - .add(createNode(2, true, Version.V_7_9_2, MlConfigVersion.V_7_9_2)) - ) - .build(); - - Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState); - assertThat(assignment.getExecutorNode(), is(nullValue())); - assertThat( - assignment.getExplanation(), - allOf( - containsString( - "Not opening job [data_frame_id] on node [{_node_name0}{version=7.2.0}], " - + "because the data frame analytics requires a node of version [7.3.0] or higher" - ), - containsString( - "Not opening job [data_frame_id] on node [{_node_name1}{version=7.9.1}], " - + "because the data frame analytics created for version [" - + MlConfigVersion.CURRENT - + "] requires a node of version [7.10.0] or higher" - ), - containsString( - "Not opening job [data_frame_id] on node [{_node_name2}{version=7.9.2}], " - + "because the data frame analytics created for version [" - + MlConfigVersion.CURRENT - + "] requires a node of version [7.10.0] or higher" - ) - ) - ); - } - // The node can be assigned despite being newer than the job. // In such a case destination index will be created from scratch so that its mappings are up-to-date. public void testGetAssignment_MlNodeIsNewerThanTheMlJobButTheAssignmentSuceeds() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java index 822b27400c419..5c928ff6e8a3a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.ml.MlConfigVersion; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource; @@ -748,7 +749,7 @@ public void testReadMetadata_GivenMetaNotCreatedByAnalytics() { public void testReadMetadata_GivenCurrentVersion() { Map mappings = new HashMap<>(); - mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.CURRENT)); + mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), MlConfigVersion.CURRENT)); MappingMetadata mappingMetadata = mock(MappingMetadata.class); when(mappingMetadata.getSourceAsMap()).thenReturn(mappings); @@ -756,7 +757,7 @@ public void testReadMetadata_GivenCurrentVersion() { assertThat(metadata.hasMetadata(), is(true)); assertThat(metadata.isCompatible(), is(true)); - assertThat(metadata.getVersion(), equalTo(Version.CURRENT.toString())); + assertThat(metadata.getVersion(), equalTo(MlConfigVersion.CURRENT.toString())); } public void testReadMetadata_GivenMinCompatibleVersion() { @@ -774,7 +775,7 @@ public void testReadMetadata_GivenMinCompatibleVersion() { public void testReadMetadata_GivenIncompatibleVersion() { Map mappings = new HashMap<>(); - mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.V_7_9_3)); + mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), MlConfigVersion.V_7_9_3)); MappingMetadata mappingMetadata = mock(MappingMetadata.class); when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);