Skip to content

Commit

Permalink
Batch Index Settings Update Requests (elastic#82896)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored Jan 25, 2022
1 parent eda391a commit 4bf8aec
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 135 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/82896.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 82896
summary: Batch Index Settings Update Requests
area: Cluster Coordination
type: enhancement
issues:
- 79866
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -50,13 +51,12 @@ public class MetadataUpdateSettingsService {
private static final Logger logger = LogManager.getLogger(MetadataUpdateSettingsService.class);

private final ClusterService clusterService;

private final AllocationService allocationService;

private final IndexScopedSettings indexScopedSettings;
private final IndicesService indicesService;
private final ShardLimitValidator shardLimitValidator;
private final ThreadPool threadPool;
private final ClusterStateTaskExecutor<AckedClusterStateUpdateTask> executor;

public MetadataUpdateSettingsService(
ClusterService clusterService,
Expand All @@ -67,11 +67,28 @@ public MetadataUpdateSettingsService(
ThreadPool threadPool
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.allocationService = allocationService;
this.indexScopedSettings = indexScopedSettings;
this.indicesService = indicesService;
this.shardLimitValidator = shardLimitValidator;
this.threadPool = threadPool;
this.executor = (currentState, tasks) -> {
ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
ClusterState state = currentState;
for (AckedClusterStateUpdateTask task : tasks) {
try {
state = task.execute(state);
builder.success(task);
} catch (Exception e) {
builder.failure(task, e);
}
}
if (state != currentState) {
// reroute in case things change that require it (like number of replicas)
state = allocationService.reroute(state, "settings update");
}
return builder.build(state);
};
}

public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
Expand Down Expand Up @@ -105,149 +122,149 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request
final Settings openSettings = settingsForOpenIndices.build();
final boolean preserveExisting = request.isPreserveExisting();

clusterService.submitStateUpdateTask(
"update-settings " + Arrays.toString(request.indices()),
new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) {
// TODO: move this to custom class instead of AckedClusterStateUpdateTask
AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(
Priority.URGENT,
request,
wrapPreservingContext(listener, threadPool.getThreadContext())
) {
@Override
public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

@Override
public ClusterState execute(ClusterState currentState) {

RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

// allow to change any settings to a close index, and only allow dynamic settings to be changed
// on an open index
Set<Index> openIndices = new HashSet<>();
Set<Index> closeIndices = new HashSet<>();
final String[] actualIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Index index = request.indices()[i];
actualIndices[i] = index.getName();
final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
if (metadata.getState() == IndexMetadata.State.OPEN) {
openIndices.add(index);
} else {
closeIndices.add(index);
}
// allow to change any settings to a closed index, and only allow dynamic settings to be changed
// on an open index
Set<Index> openIndices = new HashSet<>();
Set<Index> closedIndices = new HashSet<>();
final String[] actualIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Index index = request.indices()[i];
actualIndices[i] = index.getName();
final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
if (metadata.getState() == IndexMetadata.State.OPEN) {
openIndices.add(index);
} else {
closedIndices.add(index);
}
}

if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",
skippedSettings,
openIndices
)
);
}
if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",
skippedSettings,
openIndices
)
);
}

if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);
if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);

/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
* which makes these copies stale.
*
* TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
*/
routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
}
/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
* which makes these copies stale.
*
* TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
*/
routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
}
}

updateIndexSettings(
openIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
openSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);
updateIndexSettings(
openIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
openSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);

updateIndexSettings(
closeIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateSettings(
closedSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);
updateIndexSettings(
closedIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateSettings(
closedSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);

if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
|| IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
for (String index : actualIndices) {
final Settings settings = metadataBuilder.get(index).getSettings();
MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
MetadataCreateIndexService.validateStoreTypeSetting(settings);
}
if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
|| IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
for (String index : actualIndices) {
final Settings settings = metadataBuilder.get(index).getSettings();
MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
MetadataCreateIndexService.validateStoreTypeSetting(settings);
}
boolean changed = false;
// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
changed = true;
final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metadataBuilder.put(builder);
}
}
boolean changed = false;
// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
changed = true;
final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metadataBuilder.put(builder);
}
}

final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
boolean changedBlocks = false;
for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
}
changed |= changedBlocks;
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
boolean changedBlocks = false;
for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
}
changed |= changedBlocks;

if (changed == false) {
return currentState;
}
if (changed == false) {
return currentState;
}

ClusterState updatedState = ClusterState.builder(currentState)
.metadata(metadataBuilder)
.routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
.blocks(changedBlocks ? blocks.build() : currentState.blocks())
.build();
ClusterState updatedState = ClusterState.builder(currentState)
.metadata(metadataBuilder)
.routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
.blocks(changedBlocks ? blocks.build() : currentState.blocks())
.build();

// now, reroute in case things change that require it (like number of replicas)
updatedState = allocationService.reroute(updatedState, "settings update");
try {
for (Index index : openIndices) {
final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
}
for (Index index : closeIndices) {
final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
// Verifies that the current index settings can be updated with the updated dynamic settings.
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
// Now check that we can create the index with the updated settings (dynamic and non-dynamic).
// This step is mandatory since we allow to update non-dynamic settings on closed indices.
indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
}
} catch (IOException ex) {
throw ExceptionsHelper.convertToElastic(ex);
try {
for (Index index : openIndices) {
final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
}
return updatedState;
for (Index index : closedIndices) {
final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
// Verifies that the current index settings can be updated with the updated dynamic settings.
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
// Now check that we can create the index with the updated settings (dynamic and non-dynamic).
// This step is mandatory since we allow to update non-dynamic settings on closed indices.
indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
}
} catch (IOException ex) {
throw ExceptionsHelper.convertToElastic(ex);
}
},
ClusterStateTaskExecutor.unbatched()
);

return updatedState;
}
};

clusterService.submitStateUpdateTask("update-settings " + Arrays.toString(request.indices()), clusterTask, this.executor);
}

public static void updateIndexSettings(
Expand All @@ -256,7 +273,6 @@ public static void updateIndexSettings(
BiFunction<Index, Settings.Builder, Boolean> settingUpdater,
Boolean preserveExisting,
IndexScopedSettings indexScopedSettings

) {
for (Index index : indices) {
IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
Expand Down
Loading

0 comments on commit 4bf8aec

Please sign in to comment.