From 4e5b94ae033c10921a4a3ebb7274d89420d11d6b Mon Sep 17 00:00:00 2001 From: Salvatore Campagna <93581129+salvatore-campagna@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:38:42 +0100 Subject: [PATCH] feature: extend shard chanbes api to support data streams and aliases (#118937) Extends the `/{index}/ccr/shard_changes` API to accept data stream and alias names in addition to index names. This allows users to retrieve shard changes for data streams and indices accessed through aliases. When a data stream is provided, the API targets the **first backing index** for retrieving shard changes. Similarly, for an alias, the API targets the **first index** associated with the alias. --- .../xpack/ccr/rest/ShardChangesRestIT.java | 140 ++++++++++++++++-- .../ccr/rest/RestShardChangesAction.java | 112 ++++++++++---- 2 files changed, 214 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java b/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java index e5dfea7b772f2..4c61904475093 100644 --- a/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java +++ b/x-pack/plugin/ccr/src/javaRestTest/java/org/elasticsearch/xpack/ccr/rest/ShardChangesRestIT.java @@ -26,6 +26,9 @@ import org.junit.ClassRule; import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Locale; import java.util.Map; @@ -33,11 +36,14 @@ public class ShardChangesRestIT extends ESRestTestCase { private static final String CCR_SHARD_CHANGES_ENDPOINT = "/%s/ccr/shard_changes"; private static final String BULK_INDEX_ENDPOINT = "/%s/_bulk"; + private static final String DATA_STREAM_ENDPOINT = "/_data_stream/%s"; + private static final String INDEX_TEMPLATE_ENDPOINT = "/_index_template/%s"; private static final String[] SHARD_RESPONSE_FIELDS = new String[] { "took_in_millis", "operations", "shard_id", + "index_abstraction", "index", "settings_version", "max_seq_no_of_updates_or_deletes", @@ -46,6 +52,11 @@ public class ShardChangesRestIT extends ESRestTestCase { "aliases_version", "max_seq_no", "global_checkpoint" }; + + private static final String BULK_INDEX_TEMPLATE = """ + { "index": { "op_type": "create" } } + { "@timestamp": "%s", "name": "%s" } + """;; private static final String[] NAMES = { "skywalker", "leia", "obi-wan", "yoda", "chewbacca", "r2-d2", "c-3po", "darth-vader" }; @ClassRule public static ElasticsearchCluster cluster = ElasticsearchCluster.local() @@ -99,13 +110,86 @@ public void testShardChangesDefaultParams() throws IOException { createIndex(indexName, settings, mappings); assertTrue(indexExists(indexName)); - assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(10, 20)))); + assertOK(bulkIndex(indexName, randomIntBetween(10, 20))); final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); final Response response = client().performRequest(shardChangesRequest); assertOK(response); assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false) + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), + indexName + ); + } + + public void testDataStreamShardChangesDefaultParams() throws IOException { + final String templateName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT); + assertOK(createIndexTemplate(templateName, """ + { + "index_patterns": [ "test-*-*" ], + "data_stream": {}, + "priority": 100, + "template": { + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "name": { + "type": "keyword" + } + } + } + } + }""")); + + final String dataStreamName = "test-" + + randomAlphanumericOfLength(5).toLowerCase(Locale.ROOT) + + "-" + + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + assertOK(createDataStream(dataStreamName)); + + assertOK(bulkIndex(dataStreamName, randomIntBetween(10, 20))); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(dataStreamName)); + final Response response = client().performRequest(shardChangesRequest); + assertOK(response); + assertShardChangesResponse( + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), + dataStreamName + ); + } + + public void testIndexAliasShardChangesDefaultParams() throws IOException { + final String indexName = randomAlphanumericOfLength(10).toLowerCase(Locale.ROOT); + final String aliasName = randomAlphanumericOfLength(8).toLowerCase(Locale.ROOT); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build(); + final String mappings = """ + { + "properties": { + "name": { + "type": "keyword" + } + } + } + """; + createIndex(indexName, settings, mappings); + assertTrue(indexExists(indexName)); + + final Request putAliasRequest = new Request("PUT", "/" + indexName + "/_alias/" + aliasName); + assertOK(client().performRequest(putAliasRequest)); + + assertOK(bulkIndex(aliasName, randomIntBetween(10, 20))); + + final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(aliasName)); + final Response response = client().performRequest(shardChangesRequest); + assertOK(response); + assertShardChangesResponse( + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), + aliasName ); } @@ -121,7 +205,7 @@ public void testShardChangesWithAllParameters() throws IOException { ); assertTrue(indexExists(indexName)); - assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(100, 200)))); + assertOK(bulkIndex(indexName, randomIntBetween(100, 200))); final Request shardChangesRequest = new Request("GET", shardChangesEndpoint(indexName)); shardChangesRequest.addParameter("from_seq_no", "0"); @@ -132,7 +216,8 @@ public void testShardChangesWithAllParameters() throws IOException { final Response response = client().performRequest(shardChangesRequest); assertOK(response); assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false) + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false), + indexName ); } @@ -148,7 +233,7 @@ public void testShardChangesMultipleRequests() throws IOException { ); assertTrue(indexExists(indexName)); - assertOK(client().performRequest(bulkRequest(indexName, randomIntBetween(100, 200)))); + assertOK(bulkIndex(indexName, randomIntBetween(100, 200))); final Request firstRequest = new Request("GET", shardChangesEndpoint(indexName)); firstRequest.addParameter("from_seq_no", "0"); @@ -159,7 +244,8 @@ public void testShardChangesMultipleRequests() throws IOException { final Response firstResponse = client().performRequest(firstRequest); assertOK(firstResponse); assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(firstResponse.getEntity()), false) + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(firstResponse.getEntity()), false), + indexName ); final Request secondRequest = new Request("GET", shardChangesEndpoint(indexName)); @@ -171,7 +257,8 @@ public void testShardChangesMultipleRequests() throws IOException { final Response secondResponse = client().performRequest(secondRequest); assertOK(secondResponse); assertShardChangesResponse( - XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(secondResponse.getEntity()), false) + XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(secondResponse.getEntity()), false), + indexName ); } @@ -231,17 +318,36 @@ public void testShardChangesMissingIndex() throws IOException { assertResponseException(ex, RestStatus.BAD_REQUEST, "Failed to process shard changes for index [" + indexName + "]"); } - private static Request bulkRequest(final String indexName, int numberOfDocuments) { + private static Response bulkIndex(final String indexName, int numberOfDocuments) throws IOException { final StringBuilder sb = new StringBuilder(); + long timestamp = System.currentTimeMillis(); for (int i = 0; i < numberOfDocuments; i++) { - sb.append(String.format(Locale.ROOT, "{ \"index\": { \"_id\": \"%d\" } }\n{ \"name\": \"%s\" }\n", i + 1, randomFrom(NAMES))); + sb.append( + String.format( + Locale.ROOT, + BULK_INDEX_TEMPLATE, + Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME), + randomFrom(NAMES) + ) + ); + timestamp += 1000; // 1 second } final Request request = new Request("POST", bulkEndpoint(indexName)); request.setJsonEntity(sb.toString()); request.addParameter("refresh", "true"); - return request; + return client().performRequest(request); + } + + private Response createDataStream(final String dataStreamName) throws IOException { + return client().performRequest(new Request("PUT", dataStreamEndpoint(dataStreamName))); + } + + private static Response createIndexTemplate(final String templateName, final String mappings) throws IOException { + final Request request = new Request("PUT", indexTemplateEndpoint(templateName)); + request.setJsonEntity(mappings); + return client().performRequest(request); } private static String shardChangesEndpoint(final String indexName) { @@ -252,16 +358,28 @@ private static String bulkEndpoint(final String indexName) { return String.format(Locale.ROOT, BULK_INDEX_ENDPOINT, indexName); } + private static String dataStreamEndpoint(final String dataStreamName) { + return String.format(Locale.ROOT, DATA_STREAM_ENDPOINT, dataStreamName); + } + + private static String indexTemplateEndpoint(final String templateName) { + return String.format(Locale.ROOT, INDEX_TEMPLATE_ENDPOINT, templateName); + } + private void assertResponseException(final ResponseException ex, final RestStatus restStatus, final String error) { assertEquals(restStatus.getStatus(), ex.getResponse().getStatusLine().getStatusCode()); assertThat(ex.getMessage(), Matchers.containsString(error)); } - private void assertShardChangesResponse(final Map shardChangesResponseBody) { + private void assertShardChangesResponse(final Map shardChangesResponseBody, final String indexAbstractionName) { for (final String fieldName : SHARD_RESPONSE_FIELDS) { final Object fieldValue = shardChangesResponseBody.get(fieldName); assertNotNull("Field " + fieldName + " is missing or has a null value.", fieldValue); + if ("index_abstraction".equals(fieldName)) { + assertEquals(indexAbstractionName, fieldValue); + } + if ("operations".equals(fieldName)) { if (fieldValue instanceof List operationsList) { assertFalse("Field 'operations' is empty.", operationsList.isEmpty()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java index 84171ebce162f..4a1d26d05a980 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestShardChangesAction.java @@ -10,6 +10,8 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -32,6 +34,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -42,10 +45,14 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; /** - * A REST handler that retrieves shard changes in a specific index whose name is provided as a parameter. - * It handles GET requests to the "/{index}/ccr/shard_changes" endpoint retrieving shard-level changes, - * such as translog operations, mapping version, settings version, aliases version, the global checkpoint, - * maximum sequence number and maximum sequence number of updates or deletes. + * A REST handler that retrieves shard changes in a specific index, data stream or alias whose name is + * provided as a parameter. It handles GET requests to the "/{index}/ccr/shard_changes" endpoint retrieving + * shard-level changes, such as Translog operations, mapping version, settings version, aliases version, + * the global checkpoint, maximum sequence number and maximum sequence number of updates or deletes. + *

+ * In the case of a data stream, the first backing index is considered the target for retrieving shard changes. + * In the case of an alias, the first index that the alias points to is considered the target for retrieving + * shard changes. *

* Note: This handler is only available for snapshot builds. */ @@ -84,32 +91,36 @@ public List routes() { */ @Override protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { - final var indexName = restRequest.param(INDEX_PARAM_NAME); + final var indexAbstractionName = restRequest.param(INDEX_PARAM_NAME); final var fromSeqNo = restRequest.paramAsLong(FROM_SEQ_NO_PARAM_NAME, DEFAULT_FROM_SEQ_NO); final var maxBatchSize = restRequest.paramAsSize(MAX_BATCH_SIZE_PARAM_NAME, DEFAULT_MAX_BATCH_SIZE); final var pollTimeout = restRequest.paramAsTime(POLL_TIMEOUT_PARAM_NAME, DEFAULT_POLL_TIMEOUT); final var maxOperationsCount = restRequest.paramAsInt(MAX_OPERATIONS_COUNT_PARAM_NAME, DEFAULT_MAX_OPERATIONS_COUNT); - final CompletableFuture indexUUIDCompletableFuture = asyncGetIndexUUID( + // NOTE: we first retrieve the concrete index name in case we are dealing with an alias or data stream. + // Then we use the concrete index name to retrieve the index UUID and shard stats. + final CompletableFuture indexNameCompletableFuture = asyncGetIndexName( client, - indexName, + indexAbstractionName, client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME) ); - final CompletableFuture shardStatsCompletableFuture = asyncShardStats( - client, - indexName, - client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME) + final CompletableFuture indexUUIDCompletableFuture = indexNameCompletableFuture.thenCompose( + concreteIndexName -> asyncGetIndexUUID(client, concreteIndexName, client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME)) + ); + final CompletableFuture shardStatsCompletableFuture = indexNameCompletableFuture.thenCompose( + concreteIndexName -> asyncShardStats(client, concreteIndexName, client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME)) ); return channel -> CompletableFuture.allOf(indexUUIDCompletableFuture, shardStatsCompletableFuture).thenRun(() -> { try { + final String concreteIndexName = indexNameCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); final String indexUUID = indexUUIDCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); final ShardStats shardStats = shardStatsCompletableFuture.get(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS); final ShardId shardId = shardStats.getShardRouting().shardId(); final String expectedHistoryUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); final ShardChangesAction.Request shardChangesRequest = shardChangesRequest( - indexName, + concreteIndexName, indexUUID, shardId, expectedHistoryUUID, @@ -121,7 +132,12 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina client.execute(ShardChangesAction.INSTANCE, shardChangesRequest, new RestActionListener<>(channel) { @Override protected void processResponse(final ShardChangesAction.Response response) { - channel.sendResponse(new RestResponse(RestStatus.OK, shardChangesResponseToXContent(response, indexName, shardId))); + channel.sendResponse( + new RestResponse( + RestStatus.OK, + shardChangesResponseToXContent(response, indexAbstractionName, concreteIndexName, shardId) + ) + ); } }); @@ -132,7 +148,12 @@ protected void processResponse(final ShardChangesAction.Response response) { throw new IllegalStateException("Timeout while waiting for shard stats or index UUID", te); } }).exceptionally(ex -> { - channel.sendResponse(new RestResponse(RestStatus.BAD_REQUEST, "Failed to process shard changes for index [" + indexName + "]")); + channel.sendResponse( + new RestResponse( + RestStatus.BAD_REQUEST, + "Failed to process shard changes for index [" + indexAbstractionName + "] " + ex.getMessage() + ) + ); return null; }); } @@ -175,17 +196,20 @@ private static ShardChangesAction.Request shardChangesRequest( * Converts the response to XContent JSOn format. * * @param response The ShardChangesAction response. - * @param indexName The name of the index. + * @param indexAbstractionName The name of the index abstraction. + * @param concreteIndexName The name of the index. * @param shardId The ShardId. */ private static XContentBuilder shardChangesResponseToXContent( final ShardChangesAction.Response response, - final String indexName, + final String indexAbstractionName, + final String concreteIndexName, final ShardId shardId ) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); - builder.field("index", indexName); + builder.field("index_abstraction", indexAbstractionName); + builder.field("index", concreteIndexName); builder.field("shard_id", shardId); builder.field("mapping_version", response.getMappingVersion()); builder.field("settings_version", response.getSettingsVersion()); @@ -249,26 +273,60 @@ private static CompletableFuture supplyAsyncTask( }, executorService); } + /** + * Asynchronously retrieves the index name for a given index, alias or data stream. + * If the name represents a data stream, the name of the first backing index is returned. + * If the name represents an alias, the name of the first index that the alias points to is returned. + * + * @param client The NodeClient for executing the asynchronous request. + * @param indexAbstractionName The name of the index, alias or data stream. + * @return A CompletableFuture that completes with the retrieved index name. + */ + private static CompletableFuture asyncGetIndexName( + final NodeClient client, + final String indexAbstractionName, + final ExecutorService executorService + ) { + return supplyAsyncTask(() -> { + final ClusterState clusterState = client.admin() + .cluster() + .prepareState(new TimeValue(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) + .get(GET_INDEX_UUID_TIMEOUT) + .getState(); + final IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(indexAbstractionName); + if (indexAbstraction == null) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Invalid index or data stream name [%s]", indexAbstractionName) + ); + } + if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM + || indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) { + return indexAbstraction.getIndices().getFirst().getName(); + } + return indexAbstractionName; + }, executorService, "Error while retrieving index name for index or data stream [" + indexAbstractionName + "]"); + } + /** * Asynchronously retrieves the shard stats for a given index using an executor service. * * @param client The NodeClient for executing the asynchronous request. - * @param indexName The name of the index for which to retrieve shard statistics. + * @param concreteIndexName The name of the index for which to retrieve shard statistics. * @param executorService The executorService service for executing the asynchronous task. * @return A CompletableFuture that completes with the retrieved ShardStats. * @throws ElasticsearchException If an error occurs while retrieving shard statistics. */ private static CompletableFuture asyncShardStats( final NodeClient client, - final String indexName, + final String concreteIndexName, final ExecutorService executorService ) { return supplyAsyncTask( - () -> Arrays.stream(client.admin().indices().prepareStats(indexName).clear().get(SHARD_STATS_TIMEOUT).getShards()) + () -> Arrays.stream(client.admin().indices().prepareStats(concreteIndexName).clear().get(SHARD_STATS_TIMEOUT).getShards()) .max(Comparator.comparingLong(shardStats -> shardStats.getCommitStats().getGeneration())) - .orElseThrow(() -> new ElasticsearchException("Unable to retrieve shard stats for index: " + indexName)), + .orElseThrow(() -> new ElasticsearchException("Unable to retrieve shard stats for index: " + concreteIndexName)), executorService, - "Error while retrieving shard stats for index [" + indexName + "]" + "Error while retrieving shard stats for index [" + concreteIndexName + "]" ); } @@ -276,25 +334,25 @@ private static CompletableFuture asyncShardStats( * Asynchronously retrieves the index UUID for a given index using an executor service. * * @param client The NodeClient for executing the asynchronous request. - * @param indexName The name of the index for which to retrieve the index UUID. + * @param concreteIndexName The name of the index for which to retrieve the index UUID. * @param executorService The executorService service for executing the asynchronous task. * @return A CompletableFuture that completes with the retrieved index UUID. * @throws ElasticsearchException If an error occurs while retrieving the index UUID. */ private static CompletableFuture asyncGetIndexUUID( final NodeClient client, - final String indexName, + final String concreteIndexName, final ExecutorService executorService ) { return supplyAsyncTask( () -> client.admin() .indices() .prepareGetIndex() - .setIndices(indexName) + .setIndices(concreteIndexName) .get(GET_INDEX_UUID_TIMEOUT) - .getSetting(indexName, IndexMetadata.SETTING_INDEX_UUID), + .getSetting(concreteIndexName, IndexMetadata.SETTING_INDEX_UUID), executorService, - "Error while retrieving index UUID for index [" + indexName + "]" + "Error while retrieving index UUID for index [" + concreteIndexName + "]" ); } }