Skip to content

Commit

Permalink
Split request processing for routing to pre- and post-processing. (el…
Browse files Browse the repository at this point in the history
  • Loading branch information
kkrik-es authored Dec 11, 2024
1 parent 2d66d25 commit 3590be7
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,14 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
boolean isRequireDataStream();

/**
* Finalize the request before executing or routing it.
* Finalize the request before routing it.
*/
void process(IndexRouting indexRouting);
default void preRoutingProcess(IndexRouting indexRouting) {}

/**
* Finalize the request after routing it.
*/
default void postRoutingProcess(IndexRouting indexRouting) {}

/**
* Pick the appropriate shard id to receive this request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
continue;
}
IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
docWriteRequest.process(indexRouting);
docWriteRequest.preRoutingProcess(indexRouting);
int shardId = docWriteRequest.route(indexRouting);
docWriteRequest.postRoutingProcess(indexRouting);
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
new ShardId(concreteIndex, shardId),
shard -> new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ public boolean isRequireDataStream() {
return false;
}

@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.deleteShard(id, routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,13 @@ public VersionType versionType() {
}

@Override
public void process(IndexRouting indexRouting) {
indexRouting.process(this);
public void preRoutingProcess(IndexRouting indexRouting) {
indexRouting.preProcess(this);
}

@Override
public void postRoutingProcess(IndexRouting indexRouting) {
indexRouting.postProcess(this);
}

/**
Expand Down Expand Up @@ -885,7 +890,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.indexShard(id, routing, contentType, source, this::routing);
return indexRouting.indexShard(id, routing, contentType, source);
}

public IndexRequest setRequireAlias(boolean requireAlias) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,6 @@ public boolean isRequireDataStream() {
return false;
}

@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.updateShard(id, routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -80,19 +79,21 @@ private IndexRouting(IndexMetadata metadata) {
this.routingFactor = metadata.getRoutingFactor();
}

public abstract void process(IndexRequest indexRequest);
/**
* Finalize the request before routing, with data needed for routing decisions.
*/
public void preProcess(IndexRequest indexRequest) {}

/**
* Finalize the request after routing, incorporating data produced by the routing logic.
*/
public void postProcess(IndexRequest indexRequest) {}

/**
* Called when indexing a document to generate the shard id that should contain
* a document with the provided parameters.
*/
public abstract int indexShard(
String id,
@Nullable String routing,
XContentType sourceType,
BytesReference source,
Consumer<String> routingHashSetter
);
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source);

/**
* Called when updating a document to generate the shard id that should contain
Expand Down Expand Up @@ -163,7 +164,7 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {
protected abstract int shardId(String id, @Nullable String routing);

@Override
public void process(IndexRequest indexRequest) {
public void preProcess(IndexRequest indexRequest) {
// generate id if not already provided
final String id = indexRequest.id();
if (id == null) {
Expand All @@ -187,13 +188,7 @@ private static boolean isNewIndexVersion(final IndexVersion creationVersion) {
}

@Override
public int indexShard(
String id,
@Nullable String routing,
XContentType sourceType,
BytesReference source,
Consumer<String> routingHashSetter
) {
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
if (id == null) {
throw new IllegalStateException("id is required and should have been set by process");
}
Expand Down Expand Up @@ -278,6 +273,7 @@ public static class ExtractFromSource extends IndexRouting {
private final Predicate<String> isRoutingPath;
private final XContentParserConfiguration parserConfig;
private final boolean trackTimeSeriesRoutingHash;
private int hash = Integer.MAX_VALUE;

ExtractFromSource(IndexMetadata metadata) {
super(metadata);
Expand All @@ -295,22 +291,17 @@ public boolean matchesField(String fieldName) {
}

@Override
public void process(IndexRequest indexRequest) {}
public void postProcess(IndexRequest indexRequest) {
if (trackTimeSeriesRoutingHash) {
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
}
}

@Override
public int indexShard(
String id,
@Nullable String routing,
XContentType sourceType,
BytesReference source,
Consumer<String> routingHashSetter
) {
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
assert Transports.assertNotTransportThread("parsing the _source can get slow");
checkNoRouting(routing);
int hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
if (trackTimeSeriesRoutingHash) {
routingHashSetter.accept(TimeSeriesRoutingHashFieldMapper.encode(hash));
}
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
return hashToShardId(hash);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testSimpleRoutingRejectsEmptyId() {
IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build()
);
IndexRequest req = new IndexRequest().id("");
Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.process(req));
Exception e = expectThrows(IllegalArgumentException.class, () -> indexRouting.preProcess(req));
assertThat(e.getMessage(), equalTo("if _id is specified it must not be empty"));
}

Expand All @@ -58,7 +58,7 @@ public void testSimpleRoutingAcceptsId() {
);
String id = randomAlphaOfLength(10);
IndexRequest req = new IndexRequest().id(id);
indexRouting.process(req);
indexRouting.preProcess(req);
assertThat(req.id(), equalTo(id));
assertThat(req.getAutoGeneratedTimestamp(), equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP));
}
Expand All @@ -68,7 +68,7 @@ public void testSimpleRoutingAssignedRandomId() {
IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(2).numberOfReplicas(1).build()
);
IndexRequest req = new IndexRequest();
indexRouting.process(req);
indexRouting.preProcess(req);
assertThat(req.id(), not(nullValue()));
assertThat(req.getAutoGeneratedTimestamp(), not(equalTo(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP)));
}
Expand Down Expand Up @@ -458,7 +458,7 @@ public void testRequiredRouting() {
*/
private int shardIdFromSimple(IndexRouting indexRouting, String id, @Nullable String routing) {
return switch (between(0, 3)) {
case 0 -> indexRouting.indexShard(id, routing, null, null, null);
case 0 -> indexRouting.indexShard(id, routing, null, null);
case 1 -> indexRouting.updateShard(id, routing);
case 2 -> indexRouting.deleteShard(id, routing);
case 3 -> indexRouting.getShard(id, routing);
Expand All @@ -470,7 +470,7 @@ public void testRoutingAllowsId() {
IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
String id = randomAlphaOfLength(5);
IndexRequest req = new IndexRequest().id(id);
indexRouting.process(req);
indexRouting.preProcess(req);
assertThat(req.id(), equalTo(id));
}

Expand All @@ -483,15 +483,15 @@ public void testRoutingAllowsId() {
public void testRoutingPathLeavesIdNull() {
IndexRouting indexRouting = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
IndexRequest req = new IndexRequest();
indexRouting.process(req);
indexRouting.preProcess(req);
assertThat(req.id(), nullValue());
}

public void testRoutingPathEmptySource() throws IOException {
IndexRouting routing = indexRoutingForPath(between(1, 5), randomAlphaOfLength(5));
Exception e = expectThrows(
IllegalArgumentException.class,
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()), null)
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of()))
);
assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields"));
}
Expand All @@ -500,7 +500,7 @@ public void testRoutingPathMismatchSource() throws IOException {
IndexRouting routing = indexRoutingForPath(between(1, 5), "foo");
Exception e = expectThrows(
IllegalArgumentException.class,
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")), null)
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(Map.of("bar", "dog")))
);
assertThat(e.getMessage(), equalTo("Error extracting routing: source didn't contain any routing fields"));
}
Expand All @@ -521,7 +521,7 @@ public void testRoutingIndexWithRouting() throws IOException {
String docRouting = randomAlphaOfLength(5);
Exception e = expectThrows(
IllegalArgumentException.class,
() -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source, null)
() -> indexRouting.indexShard(randomAlphaOfLength(5), docRouting, XContentType.JSON, source)
);
assertThat(
e.getMessage(),
Expand Down Expand Up @@ -615,7 +615,7 @@ public void testRoutingPathObjectArraysInSource() throws IOException {
BytesReference source = source(Map.of("a", List.of("foo", Map.of("foo", "bar"))));
Exception e = expectThrows(
IllegalArgumentException.class,
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source, s -> {})
() -> routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source)
);
assertThat(
e.getMessage(),
Expand Down Expand Up @@ -683,7 +683,7 @@ private IndexRouting indexRoutingForPath(IndexVersion createdVersion, int shards
private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int expectedShard) throws IOException {
byte[] suffix = randomSuffix();
BytesReference sourceBytes = source(source);
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes, s -> {}), equalTo(expectedShard));
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, sourceBytes), equalTo(expectedShard));
IndexRouting.ExtractFromSource r = (IndexRouting.ExtractFromSource) routing;
String idFromSource = r.createId(XContentType.JSON, sourceBytes, suffix);
assertThat(shardIdForReadFromSourceExtracting(routing, idFromSource), equalTo(expectedShard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ private void executeShardBulkOnPrimary(
) {
for (BulkItemRequest itemRequest : request.items()) {
if (itemRequest.request() instanceof IndexRequest) {
((IndexRequest) itemRequest.request()).process(primary.indexSettings().getIndexRouting());
itemRequest.request().preRoutingProcess(primary.indexSettings().getIndexRouting());
}
}
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2350,7 +2350,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) {
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(clusterState.metadata().getIndexSafe(index));
while (true) {
String routing = RandomStrings.randomAsciiLettersOfLength(random, 10);
if (shard == indexRouting.indexShard("id", routing, null, null, null)) {
if (shard == indexRouting.indexShard("id", routing, null, null)) {
return routing;
}
}
Expand Down

0 comments on commit 3590be7

Please sign in to comment.