Skip to content

Commit

Permalink
Add action to create index from a source index (#118890)
Browse files Browse the repository at this point in the history
Add new action that creates an index from a source
index, copying settings and mappings from the source index.
This was refactored out of ReindexDataStreamIndexAction.
  • Loading branch information
parkertimmins authored Dec 18, 2024
1 parent 1147c9d commit 1004da2
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 66 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118890.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118890
summary: Add action to create index from a source index
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.migrate.MigratePlugin;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;

public class CreateIndexFromSourceActionIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class);
}

public void testDestIndexCreated() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex))
);

try {
indicesAdmin().getIndex(new GetIndexRequest().indices(destIndex)).actionGet();
} catch (IndexNotFoundException e) {
fail();
}
}

public void testSettingsCopiedFromSource() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

// start with a static setting
var numShards = randomIntBetween(1, 10);
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)).get();

// update with a dynamic setting
var numReplicas = randomIntBetween(0, 10);
var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build();
indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex))
);

// assert both static and dynamic settings set on dest index
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)));
assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)));
}

public void testMappingsCopiedFromSource() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
String mapping = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"text"
}
}
}
}
""";
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex))
);

var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(sourceIndex, destIndex)).actionGet();
Map<String, MappingMetadata> mappings = mappingsResponse.mappings();
var destMappings = mappings.get(destIndex).sourceAsMap();
var sourceMappings = mappings.get(sourceIndex).sourceAsMap();

assertEquals(sourceMappings, destMappings);
// sanity check specific value from dest mapping
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
}

public void testSettingsOverridden() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var numShardsSource = randomIntBetween(1, 10);
var numReplicasSource = randomIntBetween(0, 10);
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var sourceSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource)
.build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get();

boolean overrideNumShards = randomBoolean();
Settings settingsOverride = overrideNumShards
? Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource + 1).build()
: Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource + 1).build();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(
CreateIndexFromSourceAction.INSTANCE,
new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of())
)
);

// assert settings overridden
int expectedShards = overrideNumShards ? numShardsSource + 1 : numShardsSource;
int expectedReplicas = overrideNumShards ? numReplicasSource : numReplicasSource + 1;
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(expectedShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)));
assertEquals(expectedReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)));
}

public void testSettingsNullOverride() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var sourceSettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get();

Settings settingsOverride = Settings.builder().putNull(IndexMetadata.SETTING_BLOCKS_WRITE).build();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(
CreateIndexFromSourceAction.INSTANCE,
new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of())
)
);

// assert settings overridden
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE));
}

public void testMappingsOverridden() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
String sourceMapping = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"text"
},
"foo2":{
"type":"boolean"
}
}
}
}
""";
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(sourceMapping)).actionGet();

String mappingOverrideStr = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"integer"
},
"foo3": {
"type":"keyword"
}
}
}
}
""";
var mappingOverride = XContentHelper.convertToMap(JsonXContent.jsonXContent, mappingOverrideStr, false);

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(
CreateIndexFromSourceAction.INSTANCE,
new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, Settings.EMPTY, mappingOverride)
)
);

var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(destIndex)).actionGet();
Map<String, MappingMetadata> mappings = mappingsResponse.mappings();
var destMappings = mappings.get(destIndex).sourceAsMap();

String expectedMappingStr = """
{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"integer"
},
"foo2": {
"type":"boolean"
},
"foo3": {
"type":"keyword"
}
}
}
""";
var expectedMapping = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMappingStr, false);
assertEquals(expectedMapping, destMappings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;
import static org.hamcrest.Matchers.equalTo;

public class ReindexDatastreamIndexIT extends ESIntegTestCase {
public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {

private static final String MAPPING = """
{
Expand Down Expand Up @@ -126,12 +126,14 @@ public void testDestIndexContainsDocs() throws Exception {
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
}

public void testSetSourceToReadOnly() throws Exception {
public void testSetSourceToBlockWrites() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;

// empty source index
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).get();

// call reindex
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction;
import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction;
import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceAction;
import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceTransportAction;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
Expand Down Expand Up @@ -87,6 +89,7 @@ public List<RestHandler> getRestHandlers(
actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class));
actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class));
actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, ReindexDataStreamIndexTransportAction.class));
actions.add(new ActionHandler<>(CreateIndexFromSourceAction.INSTANCE, CreateIndexFromSourceTransportAction.class));
}
return actions;
}
Expand Down
Loading

0 comments on commit 1004da2

Please sign in to comment.