From 1004da29821471eea99afd64da0d151682f46595 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Wed, 18 Dec 2024 10:35:19 -0600 Subject: [PATCH] Add action to create index from a source index (#118890) Add new action that creates an index from a source index, copying settings and mappings from the source index. This was refactored out of ReindexDataStreamIndexAction. --- docs/changelog/118890.yaml | 5 + .../action/CreateIndexFromSourceActionIT.java | 250 ++++++++++++++++++ ...ndexDatastreamIndexTransportActionIT.java} | 8 +- .../xpack/migrate/MigratePlugin.java | 3 + .../action/CreateIndexFromSourceAction.java | 117 ++++++++ .../CreateIndexFromSourceTransportAction.java | 126 +++++++++ .../action/ReindexDataStreamIndexAction.java | 4 - ...ReindexDataStreamIndexTransportAction.java | 104 ++++---- .../xpack/security/operator/Constants.java | 1 + 9 files changed, 552 insertions(+), 66 deletions(-) create mode 100644 docs/changelog/118890.yaml create mode 100644 x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java rename x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/{ReindexDatastreamIndexIT.java => ReindexDatastreamIndexTransportActionIT.java} (98%) create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java diff --git a/docs/changelog/118890.yaml b/docs/changelog/118890.yaml new file mode 100644 index 0000000000000..d3fc17157f130 --- /dev/null +++ b/docs/changelog/118890.yaml @@ -0,0 +1,5 @@ +pr: 118890 +summary: Add action to create index from a source index +area: Data streams +type: enhancement +issues: [] diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java new file mode 100644 index 0000000000000..b460c6abfeee4 --- /dev/null +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java @@ -0,0 +1,250 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.migrate.MigratePlugin; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; + +public class CreateIndexFromSourceActionIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class); + } + + public void testDestIndexCreated() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex)) + ); + + try { + indicesAdmin().getIndex(new GetIndexRequest().indices(destIndex)).actionGet(); + } catch (IndexNotFoundException e) { + fail(); + } + } + + public void testSettingsCopiedFromSource() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // start with a static setting + var numShards = randomIntBetween(1, 10); + var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)).get(); + + // update with a dynamic setting + var numReplicas = randomIntBetween(0, 10); + var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build(); + indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex)) + ); + + // assert both static and dynamic settings set on dest index + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); + assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + } + + public void testMappingsCopiedFromSource() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + String mapping = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + } + } + } + } + """; + indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex)) + ); + + var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(sourceIndex, destIndex)).actionGet(); + Map mappings = mappingsResponse.mappings(); + var destMappings = mappings.get(destIndex).sourceAsMap(); + var sourceMappings = mappings.get(sourceIndex).sourceAsMap(); + + assertEquals(sourceMappings, destMappings); + // sanity check specific value from dest mapping + assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings)); + } + + public void testSettingsOverridden() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var numShardsSource = randomIntBetween(1, 10); + var numReplicasSource = randomIntBetween(0, 10); + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + var sourceSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource) + .build(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get(); + + boolean overrideNumShards = randomBoolean(); + Settings settingsOverride = overrideNumShards + ? Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource + 1).build() + : Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource + 1).build(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute( + CreateIndexFromSourceAction.INSTANCE, + new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of()) + ) + ); + + // assert settings overridden + int expectedShards = overrideNumShards ? numShardsSource + 1 : numShardsSource; + int expectedReplicas = overrideNumShards ? numReplicasSource : numReplicasSource + 1; + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(expectedShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + assertEquals(expectedReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); + } + + public void testSettingsNullOverride() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + var sourceSettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get(); + + Settings settingsOverride = Settings.builder().putNull(IndexMetadata.SETTING_BLOCKS_WRITE).build(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute( + CreateIndexFromSourceAction.INSTANCE, + new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of()) + ) + ); + + // assert settings overridden + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)); + } + + public void testMappingsOverridden() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + String sourceMapping = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + }, + "foo2":{ + "type":"boolean" + } + } + } + } + """; + indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(sourceMapping)).actionGet(); + + String mappingOverrideStr = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"integer" + }, + "foo3": { + "type":"keyword" + } + } + } + } + """; + var mappingOverride = XContentHelper.convertToMap(JsonXContent.jsonXContent, mappingOverrideStr, false); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute( + CreateIndexFromSourceAction.INSTANCE, + new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, Settings.EMPTY, mappingOverride) + ) + ); + + var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(destIndex)).actionGet(); + Map mappings = mappingsResponse.mappings(); + var destMappings = mappings.get(destIndex).sourceAsMap(); + + String expectedMappingStr = """ + { + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"integer" + }, + "foo2": { + "type":"boolean" + }, + "foo3": { + "type":"keyword" + } + } + } + """; + var expectedMapping = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMappingStr, false); + assertEquals(expectedMapping, destMappings); + } +} diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java similarity index 98% rename from x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java rename to x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index e492f035da866..0ca58ecf0f0d5 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -53,7 +53,7 @@ import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; import static org.hamcrest.Matchers.equalTo; -public class ReindexDatastreamIndexIT extends ESIntegTestCase { +public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { private static final String MAPPING = """ { @@ -126,12 +126,14 @@ public void testDestIndexContainsDocs() throws Exception { assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs); } - public void testSetSourceToReadOnly() throws Exception { + public void testSetSourceToBlockWrites() throws Exception { assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY; + // empty source index var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); - indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).get(); // call reindex client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet(); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index f42d05727b9fd..d9dffdefafa2c 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -34,6 +34,8 @@ import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction; +import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceAction; +import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceTransportAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; @@ -87,6 +89,7 @@ public List getRestHandlers( actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class)); actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class)); actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, ReindexDataStreamIndexTransportAction.class)); + actions.add(new ActionHandler<>(CreateIndexFromSourceAction.INSTANCE, CreateIndexFromSourceTransportAction.class)); } return actions; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java new file mode 100644 index 0000000000000..d67eaee3d251f --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class CreateIndexFromSourceAction extends ActionType { + + public static final String NAME = "indices:admin/index/create_from_source"; + + public static final ActionType INSTANCE = new CreateIndexFromSourceAction(); + + private CreateIndexFromSourceAction() { + super(NAME); + } + + public static class Request extends ActionRequest implements IndicesRequest { + + private final String sourceIndex; + private final String destIndex; + private final Settings settingsOverride; + private final Map mappingsOverride; + + public Request(String sourceIndex, String destIndex) { + this(sourceIndex, destIndex, Settings.EMPTY, Map.of()); + } + + public Request(String sourceIndex, String destIndex, Settings settingsOverride, Map mappingsOverride) { + Objects.requireNonNull(mappingsOverride); + this.sourceIndex = sourceIndex; + this.destIndex = destIndex; + this.settingsOverride = settingsOverride; + this.mappingsOverride = mappingsOverride; + } + + @SuppressWarnings("unchecked") + public Request(StreamInput in) throws IOException { + super(in); + this.sourceIndex = in.readString(); + this.destIndex = in.readString(); + this.settingsOverride = Settings.readSettingsFromStream(in); + this.mappingsOverride = (Map) in.readGenericValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceIndex); + out.writeString(destIndex); + settingsOverride.writeTo(out); + out.writeGenericValue(mappingsOverride); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getSourceIndex() { + return sourceIndex; + } + + public String getDestIndex() { + return destIndex; + } + + public Settings getSettingsOverride() { + return settingsOverride; + } + + public Map getMappingsOverride() { + return mappingsOverride; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(sourceIndex, request.sourceIndex) + && Objects.equals(destIndex, request.destIndex) + && Objects.equals(settingsOverride, request.settingsOverride) + && Objects.equals(mappingsOverride, request.mappingsOverride); + } + + @Override + public int hashCode() { + return Objects.hash(sourceIndex, destIndex, settingsOverride, mappingsOverride); + } + + @Override + public String[] indices() { + return new String[] { sourceIndex }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java new file mode 100644 index 0000000000000..968b2220628a9 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class CreateIndexFromSourceTransportAction extends HandledTransportAction< + CreateIndexFromSourceAction.Request, + AcknowledgedResponse> { + private static final Logger logger = LogManager.getLogger(CreateIndexFromSourceTransportAction.class); + + private final ClusterService clusterService; + private final Client client; + private final IndexScopedSettings indexScopedSettings; + + @Inject + public CreateIndexFromSourceTransportAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client, + IndexScopedSettings indexScopedSettings + ) { + super( + CreateIndexFromSourceAction.NAME, + false, + transportService, + actionFilters, + CreateIndexFromSourceAction.Request::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.clusterService = clusterService; + this.client = client; + this.indexScopedSettings = indexScopedSettings; + } + + @Override + protected void doExecute(Task task, CreateIndexFromSourceAction.Request request, ActionListener listener) { + + IndexMetadata sourceIndex = clusterService.state().getMetadata().index(request.getSourceIndex()); + + if (sourceIndex == null) { + listener.onFailure(new IndexNotFoundException(request.getSourceIndex())); + return; + } + + logger.debug("Creating destination index [{}] for source index [{}]", request.getDestIndex(), request.getSourceIndex()); + + Settings settings = Settings.builder() + // add source settings + .put(filterSettings(sourceIndex)) + // add override settings from request + .put(request.getSettingsOverride()) + .build(); + + Map mergeMappings; + try { + mergeMappings = mergeMappings(sourceIndex.mapping(), request.getMappingsOverride()); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + var createIndexRequest = new CreateIndexRequest(request.getDestIndex()).settings(settings); + if (mergeMappings.isEmpty() == false) { + createIndexRequest.mapping(mergeMappings); + } + + client.admin().indices().create(createIndexRequest, listener.map(response -> response)); + } + + private static Map toMap(@Nullable MappingMetadata sourceMapping) { + return Optional.ofNullable(sourceMapping) + .map(MappingMetadata::source) + .map(CompressedXContent::uncompressed) + .map(s -> XContentHelper.convertToMap(s, true, XContentType.JSON).v2()) + .orElse(Map.of()); + } + + private static Map mergeMappings(@Nullable MappingMetadata sourceMapping, Map mappingAddition) + throws IOException { + Map combinedMappingMap = new HashMap<>(toMap(sourceMapping)); + XContentHelper.update(combinedMappingMap, mappingAddition, true); + return combinedMappingMap; + } + + // Filter source index settings to subset of settings that can be included during reindex. + // Similar to the settings filtering done when reindexing for upgrade in Kibana + // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack + // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155 + private Settings filterSettings(IndexMetadata sourceIndex) { + return MetadataCreateIndexService.copySettingsFromSource(false, sourceIndex.getSettings(), indexScopedSettings, Settings.builder()) + .build(); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java index 00c81fdc9fbc6..2e3fd1b76ed32 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java @@ -41,10 +41,6 @@ public Request(StreamInput in) throws IOException { this.sourceIndex = in.readString(); } - public static Request readFrom(StreamInput in) throws IOException { - return new Request(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 8863c45691c92..165fd61ae6599 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -10,10 +10,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; +import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -22,9 +22,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -37,28 +35,25 @@ import java.util.Locale; import java.util.Map; -import java.util.Set; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY; +import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; public class ReindexDataStreamIndexTransportAction extends HandledTransportAction< ReindexDataStreamIndexAction.Request, ReindexDataStreamIndexAction.Response> { private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class); - - private static final Set SETTINGS_TO_ADD_BACK = Set.of(IndexMetadata.SETTING_BLOCKS_WRITE, IndexMetadata.SETTING_READ_ONLY); - private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); private final ClusterService clusterService; private final Client client; - private final IndexScopedSettings indexScopedSettings; @Inject public ReindexDataStreamIndexTransportAction( TransportService transportService, ClusterService clusterService, ActionFilters actionFilters, - Client client, - IndexScopedSettings indexScopedSettings + Client client ) { super( ReindexDataStreamIndexAction.NAME, @@ -70,7 +65,6 @@ public ReindexDataStreamIndexTransportAction( ); this.clusterService = clusterService; this.client = client; - this.indexScopedSettings = indexScopedSettings; } @Override @@ -96,20 +90,19 @@ protected void doExecute( SubscribableListener.newForked(l -> setBlockWrites(sourceIndexName, l)) .andThen(l -> deleteDestIfExists(destIndexName, l)) - .andThen(l -> createIndex(sourceIndex, destIndexName, l)) + .andThen(l -> createIndex(sourceIndex, destIndexName, l)) .andThen(l -> reindex(sourceIndexName, destIndexName, l)) - .andThen(l -> updateSettings(settingsBefore, destIndexName, l)) + .andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l)) + .andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l)) .andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) .addListener(listener); } private void setBlockWrites(String sourceIndexName, ActionListener listener) { logger.debug("Setting write block on source index [{}]", sourceIndexName); - final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build(); - var updateSettingsRequest = new UpdateSettingsRequest(readOnlySettings, sourceIndexName); - client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() { + addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() { @Override - public void onResponse(AcknowledgedResponse response) { + public void onResponse(AddIndexBlockResponse response) { if (response.isAcknowledged()) { listener.onResponse(null); } else { @@ -121,7 +114,7 @@ public void onResponse(AcknowledgedResponse response) { @Override public void onFailure(Exception e) { if (e instanceof ClusterBlockException || e.getCause() instanceof ClusterBlockException) { - // It's fine if read-only is already set + // It's fine if block-writes is already set listener.onResponse(null); } else { listener.onFailure(e); @@ -138,18 +131,23 @@ private void deleteDestIfExists(String destIndexName, ActionListener listener) { + private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener listener) { logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName()); - // Create destination with subset of source index settings that can be added before reindex - var settings = getPreSettings(sourceIndex); - - var sourceMapping = sourceIndex.mapping(); - Map mapping = sourceMapping != null ? sourceMapping.rawSourceAsMap() : Map.of(); - var createIndexRequest = new CreateIndexRequest(destIndexName).settings(settings).mapping(mapping); - - var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", destIndexName); - client.admin().indices().create(createIndexRequest, failIfNotAcknowledged(listener, errorMessage)); + // override read-only settings if they exist + var removeReadOnlyOverride = Settings.builder() + .putNull(IndexMetadata.SETTING_READ_ONLY) + .putNull(IndexMetadata.SETTING_BLOCKS_WRITE) + .build(); + + var request = new CreateIndexFromSourceAction.Request( + sourceIndex.getIndex().getName(), + destIndexName, + removeReadOnlyOverride, + Map.of() + ); + var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex()); + client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage)); } private void reindex(String sourceIndexName, String destIndexName, ActionListener listener) { @@ -162,35 +160,18 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene client.execute(ReindexAction.INSTANCE, reindexRequest, listener); } - private void updateSettings(Settings settingsBefore, String destIndexName, ActionListener listener) { - logger.debug("Adding settings from source index that could not be added before reindex"); - - Settings postSettings = getPostSettings(settingsBefore); - if (postSettings.isEmpty()) { + private void addBlockIfFromSource( + IndexMetadata.APIBlock block, + Settings settingsBefore, + String destIndexName, + ActionListener listener + ) { + if (settingsBefore.getAsBoolean(block.settingName(), false)) { + var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName); + addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage)); + } else { listener.onResponse(null); - return; } - - var updateSettingsRequest = new UpdateSettingsRequest(postSettings, destIndexName); - var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName); - client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); - } - - // Filter source index settings to subset of settings that can be included during reindex. - // Similar to the settings filtering done when reindexing for upgrade in Kibana - // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack - // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155 - private Settings getPreSettings(IndexMetadata sourceIndex) { - // filter settings that will be added back later - var filtered = sourceIndex.getSettings().filter(settingName -> SETTINGS_TO_ADD_BACK.contains(settingName) == false); - - // filter private and non-copyable settings - var builder = MetadataCreateIndexService.copySettingsFromSource(false, filtered, indexScopedSettings, Settings.builder()); - return builder.build(); - } - - private Settings getPostSettings(Settings settingsBefore) { - return settingsBefore.filter(SETTINGS_TO_ADD_BACK::contains); } public static String generateDestIndexName(String sourceIndex) { @@ -201,11 +182,16 @@ private static ActionListener failIfNotAckno ActionListener listener, String errorMessage ) { - return listener.delegateFailureAndWrap((delegate, response) -> { + return listener.delegateFailure((delegate, response) -> { if (response.isAcknowledged()) { delegate.onResponse(null); + } else { + delegate.onFailure(new ElasticsearchException(errorMessage)); } - throw new ElasticsearchException(errorMessage); }); } + + private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener listener) { + client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener); + } } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c07e4b2c541a2..1cb73de4646cc 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -641,6 +641,7 @@ public class Constants { new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/index/reindex" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex_cancel" : null, + new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/index/create_from_source" : null, "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet());