From 3590be7cbd95c207905197dec4d4e40f08a69434 Mon Sep 17 00:00:00 2001 From: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> Date: Wed, 11 Dec 2024 13:34:44 +0200 Subject: [PATCH] Split request processing for routing to pre- and post-processing. (#118420) --- .../elasticsearch/action/DocWriteRequest.java | 9 +++- .../action/bulk/BulkOperation.java | 3 +- .../action/delete/DeleteRequest.java | 5 -- .../action/index/IndexRequest.java | 11 +++-- .../action/update/UpdateRequest.java | 5 -- .../cluster/routing/IndexRouting.java | 49 ++++++++----------- .../cluster/routing/IndexRoutingTests.java | 22 ++++----- .../ESIndexLevelReplicationTestCase.java | 2 +- .../test/InternalTestCluster.java | 2 +- 9 files changed, 50 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 0b4c4dbb1fca6..292f962869558 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -160,9 +160,14 @@ public interface DocWriteRequest extends IndicesRequest, Accountable { boolean isRequireDataStream(); /** - * Finalize the request before executing or routing it. + * Finalize the request before routing it. */ - void process(IndexRouting indexRouting); + default void preRoutingProcess(IndexRouting indexRouting) {} + + /** + * Finalize the request after routing it. + */ + default void postRoutingProcess(IndexRouting indexRouting) {} /** * Pick the appropriate shard id to receive this request. diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index ad1fda2534fab..4df228240add5 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -314,8 +314,9 @@ private Map> groupRequestsByShards( continue; } IndexRouting indexRouting = concreteIndices.routing(concreteIndex); - docWriteRequest.process(indexRouting); + docWriteRequest.preRoutingProcess(indexRouting); int shardId = docWriteRequest.route(indexRouting); + docWriteRequest.postRoutingProcess(indexRouting); List shardRequests = requestsByShard.computeIfAbsent( new ShardId(concreteIndex, shardId), shard -> new ArrayList<>() diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 631336e080a6a..1a5495412f605 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -237,11 +237,6 @@ public boolean isRequireDataStream() { return false; } - @Override - public void process(IndexRouting indexRouting) { - // Nothing to do - } - @Override public int route(IndexRouting indexRouting) { return indexRouting.deleteShard(id, routing); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 5254c6fd06db7..d5b8b657bd14e 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -685,8 +685,13 @@ public VersionType versionType() { } @Override - public void process(IndexRouting indexRouting) { - indexRouting.process(this); + public void preRoutingProcess(IndexRouting indexRouting) { + indexRouting.preProcess(this); + } + + @Override + public void postRoutingProcess(IndexRouting indexRouting) { + indexRouting.postProcess(this); } /** @@ -885,7 +890,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) { @Override public int route(IndexRouting indexRouting) { - return indexRouting.indexShard(id, routing, contentType, source, this::routing); + return indexRouting.indexShard(id, routing, contentType, source); } public IndexRequest setRequireAlias(boolean requireAlias) { diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 91e21eb9e80a3..657ad029626af 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -683,11 +683,6 @@ public boolean isRequireDataStream() { return false; } - @Override - public void process(IndexRouting indexRouting) { - // Nothing to do - } - @Override public int route(IndexRouting indexRouting) { return indexRouting.updateShard(id, routing); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java index be0e3429a2ce4..d9343909f779f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Consumer; import java.util.function.IntConsumer; import java.util.function.IntSupplier; import java.util.function.Predicate; @@ -80,19 +79,21 @@ private IndexRouting(IndexMetadata metadata) { this.routingFactor = metadata.getRoutingFactor(); } - public abstract void process(IndexRequest indexRequest); + /** + * Finalize the request before routing, with data needed for routing decisions. + */ + public void preProcess(IndexRequest indexRequest) {} + + /** + * Finalize the request after routing, incorporating data produced by the routing logic. + */ + public void postProcess(IndexRequest indexRequest) {} /** * Called when indexing a document to generate the shard id that should contain * a document with the provided parameters. */ - public abstract int indexShard( - String id, - @Nullable String routing, - XContentType sourceType, - BytesReference source, - Consumer routingHashSetter - ); + public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source); /** * Called when updating a document to generate the shard id that should contain @@ -163,7 +164,7 @@ private abstract static class IdAndRoutingOnly extends IndexRouting { protected abstract int shardId(String id, @Nullable String routing); @Override - public void process(IndexRequest indexRequest) { + public void preProcess(IndexRequest indexRequest) { // generate id if not already provided final String id = indexRequest.id(); if (id == null) { @@ -187,13 +188,7 @@ private static boolean isNewIndexVersion(final IndexVersion creationVersion) { } @Override - public int indexShard( - String id, - @Nullable String routing, - XContentType sourceType, - BytesReference source, - Consumer routingHashSetter - ) { + public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) { if (id == null) { throw new IllegalStateException("id is required and should have been set by process"); } @@ -278,6 +273,7 @@ public static class ExtractFromSource extends IndexRouting { private final Predicate isRoutingPath; private final XContentParserConfiguration parserConfig; private final boolean trackTimeSeriesRoutingHash; + private int hash = Integer.MAX_VALUE; ExtractFromSource(IndexMetadata metadata) { super(metadata); @@ -295,22 +291,17 @@ public boolean matchesField(String fieldName) { } @Override - public void process(IndexRequest indexRequest) {} + public void postProcess(IndexRequest indexRequest) { + if (trackTimeSeriesRoutingHash) { + indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash)); + } + } @Override - public int indexShard( - String id, - @Nullable String routing, - XContentType sourceType, - BytesReference source, - Consumer routingHashSetter - ) { + public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) { assert Transports.assertNotTransportThread("parsing the _source can get slow"); checkNoRouting(routing); - int hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty); - if (trackTimeSeriesRoutingHash) { - routingHashSetter.accept(TimeSeriesRoutingHashFieldMapper.encode(hash)); - } + hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty); return hashToShardId(hash); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java index e39ccdf7af5e2..943fb6fd63b0b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java @@ -48,7 +48,7 @@ public void testSimpleRoutingRejectsEmptyId() { IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build() ); IndexRequest req = new IndexRequest().id(""); - Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.process(req)); + Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.preProcess(req)); assertThat(e.getMessage(), equalTo("if _id is specified it must not be empty")); } @@ -58,7 +58,7 @@ public void testSimpleRoutingAcceptsId() { ); String id = randomAlphaOfLength(10); IndexRequest req = new IndexRequest().id(id); - indexRouting.process(req); + indexRouting.preProcess(req); assertThat(req.id(), equalTo(id)); assertThat(req.getAutoGeneratedTimestamp(), equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP)); } @@ -68,7 +68,7 @@ public void testSimpleRoutingAssignedRandomId() { IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build() ); IndexRequest req = new IndexRequest(); - indexRouting.process(req); + indexRouting.preProcess(req); assertThat(req.id(), not(nullValue())); assertThat(req.getAutoGeneratedTimestamp(), not(equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))); } @@ -458,7 +458,7 @@ public void testRequiredRouting() { */ private int shardIdFromSimple(IndexRouting indexRouting, String id, @Nullable String routing) { return switch (between(0, 3)) { - case 0 -> indexRouting.indexShard(id, routing, null, null, null); + case 0 -> indexRouting.indexShard(id, routing, null, null); case 1 -> indexRouting.updateShard(id, routing); case 2 -> indexRouting.deleteShard(id, routing); case 3 -> indexRouting.getShard(id, routing); @@ -470,7 +470,7 @@ public void testRoutingAllowsId() { IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5)); String id = randomAlphaOfLength(5); IndexRequest req = new IndexRequest().id(id); - indexRouting.process(req); + indexRouting.preProcess(req); assertThat(req.id(), equalTo(id)); } @@ -483,7 +483,7 @@ public void testRoutingAllowsId() { public void testRoutingPathLeavesIdNull() { IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5)); IndexRequest req = new IndexRequest(); - indexRouting.process(req); + indexRouting.preProcess(req); assertThat(req.id(), nullValue()); } @@ -491,7 +491,7 @@ public void testRoutingPathEmptySource() throws IOException { IndexRouting routing = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5)); Exception e = expectThrows( IllegalArgumentException.class, - () -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()), null) + () -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of())) ); assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields")); } @@ -500,7 +500,7 @@ public void testRoutingPathMismatchSource() throws IOException { IndexRouting routing = indexRoutingForPath(between(1, 5), "foo"); Exception e = expectThrows( IllegalArgumentException.class, - () -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")), null) + () -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog"))) ); assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields")); } @@ -521,7 +521,7 @@ public void testRoutingIndexWithRouting() throws IOException { String docRouting = randomAlphaOfLength(5); Exception e = expectThrows( IllegalArgumentException.class, - () -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source, null) + () -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source) ); assertThat( e.getMessage(), @@ -615,7 +615,7 @@ public void testRoutingPathObjectArraysInSource() throws IOException { BytesReference source = source(Map.of("a", List.of("foo", Map.of("foo", "bar")))); Exception e = expectThrows( IllegalArgumentException.class, - () -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source, s -> {}) + () -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source) ); assertThat( e.getMessage(), @@ -683,7 +683,7 @@ private IndexRouting indexRoutingForPath(IndexVersion createdVersion, int shards private void assertIndexShard(IndexRouting routing, Map source, int expectedShard) throws IOException { byte[] suffix = randomSuffix(); BytesReference sourceBytes = source(source); - assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes, s -> {}), equalTo(expectedShard)); + assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes), equalTo(expectedShard)); IndexRouting.ExtractFromSource r = (IndexRouting.ExtractFromSource) routing; String idFromSource = r.createId(XContentType.JSON, sourceBytes, suffix); assertThat(shardIdForReadFromSourceExtracting(routing, idFromSource), equalTo(expectedShard)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c61b188441a55..68976155a519a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -870,7 +870,7 @@ private void executeShardBulkOnPrimary( ) { for (BulkItemRequest itemRequest : request.items()) { if (itemRequest.request() instanceof IndexRequest) { - ((IndexRequest) itemRequest.request()).process(primary.indexSettings().getIndexRouting()); + itemRequest.request().preRoutingProcess(primary.indexSettings().getIndexRouting()); } } final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 6d46605e201f9..1f7a17e43c214 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -2350,7 +2350,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) { IndexRouting indexRouting = IndexRouting.fromIndexMetadata(clusterState.metadata().getIndexSafe(index)); while (true) { String routing = RandomStrings.randomAsciiLettersOfLength(random, 10); - if (shard == indexRouting.indexShard("id", routing, null, null, null)) { + if (shard == indexRouting.indexShard("id", routing, null, null)) { return routing; } }