Skip to content

Commit

Permalink
Merge branch 'main' into fix-lookupjoin-wildcard-fieldcaps
Browse files Browse the repository at this point in the history
  • Loading branch information
craigtaverner authored Dec 11, 2024
2 parents 85d3362 + a054bbc commit e3ae17a
Show file tree
Hide file tree
Showing 31 changed files with 158 additions and 325 deletions.
2 changes: 2 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ tests:
- class: org.elasticsearch.xpack.security.operator.OperatorPrivilegesIT
method: testEveryActionIsEitherOperatorOnlyOrNonOperator
issue: https://github.com/elastic/elasticsearch/issues/118220
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
issue: https://github.com/elastic/elasticsearch/issues/118238

# Examples:
#
Expand Down
8 changes: 1 addition & 7 deletions plugins/discovery-ec2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
apply plugin: 'elasticsearch.legacy-yaml-rest-test'
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-java-rest-test'

esplugin {
description 'The EC2 discovery plugin allows to use AWS API for the unicast discovery mechanism.'
Expand All @@ -29,12 +29,6 @@ dependencies {
api "joda-time:joda-time:2.10.10"
}

restResources {
restApi {
include '_common', 'cluster', 'nodes'
}
}

tasks.named("dependencyLicenses").configure {
mapping from: /aws-java-sdk-.*/, to: 'aws-java-sdk'
mapping from: /jackson-.*/, to: 'jackson'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.discovery.ec2;

import org.elasticsearch.client.Request;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.ClassRule;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;

import static org.hamcrest.Matchers.hasItem;

public class DiscoveryEc2PluginLoadedIT extends ESRestTestCase {

@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local().plugin("discovery-ec2").build();

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

public void testPluginLoaded() throws IOException {
final var nodesInfoResponse = assertOKAndCreateObjectPath(client().performRequest(new Request("GET", "/_nodes/plugins")));
for (final var nodeId : nodesInfoResponse.evaluateMapKeys("nodes")) {
final var pluginCount = asInstanceOf(List.class, nodesInfoResponse.evaluateExact("nodes", nodeId, "plugins")).size();
final var pluginNames = new HashSet<String>();
for (int i = 0; i < pluginCount; i++) {
pluginNames.add(
Objects.requireNonNull(nodesInfoResponse.evaluateExact("nodes", nodeId, "plugins", Integer.toString(i), "name"))
);
}
assertThat(pluginNames, hasItem("discovery-ec2"));
}
}

}

This file was deleted.

This file was deleted.

2 changes: 0 additions & 2 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ static TransportVersion def(int id) {
@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA) // remove the transport versions with which v9 will not need to interact
public static final TransportVersion ZERO = def(0);
public static final TransportVersion V_7_0_0 = def(7_00_00_99);
public static final TransportVersion V_7_2_0 = def(7_02_00_99);
public static final TransportVersion V_7_2_1 = def(7_02_01_99);
public static final TransportVersion V_7_3_0 = def(7_03_00_99);
public static final TransportVersion V_7_4_0 = def(7_04_00_99);
public static final TransportVersion V_7_5_0 = def(7_05_00_99);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,14 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
boolean isRequireDataStream();

/**
* Finalize the request before executing or routing it.
* Finalize the request before routing it.
*/
void process(IndexRouting indexRouting);
default void preRoutingProcess(IndexRouting indexRouting) {}

/**
* Finalize the request after routing it.
*/
default void postRoutingProcess(IndexRouting indexRouting) {}

/**
* Pick the appropriate shard id to receive this request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
continue;
}
IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
docWriteRequest.process(indexRouting);
docWriteRequest.preRoutingProcess(indexRouting);
int shardId = docWriteRequest.route(indexRouting);
docWriteRequest.postRoutingProcess(indexRouting);
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
new ShardId(concreteIndex, shardId),
shard -> new ArrayList<>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ public boolean isRequireDataStream() {
return false;
}

@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.deleteShard(id, routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,13 @@ public VersionType versionType() {
}

@Override
public void process(IndexRouting indexRouting) {
indexRouting.process(this);
public void preRoutingProcess(IndexRouting indexRouting) {
indexRouting.preProcess(this);
}

@Override
public void postRoutingProcess(IndexRouting indexRouting) {
indexRouting.postProcess(this);
}

/**
Expand Down Expand Up @@ -885,7 +890,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.indexShard(id, routing, contentType, source, this::routing);
return indexRouting.indexShard(id, routing, contentType, source);
}

public IndexRequest setRequireAlias(boolean requireAlias) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,6 @@ public boolean isRequireDataStream() {
return false;
}

@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
}

@Override
public int route(IndexRouting indexRouting) {
return indexRouting.updateShard(id, routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1618,11 +1618,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
version = in.readLong();
mappingVersion = in.readVLong();
settingsVersion = in.readVLong();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_2_0)) {
aliasesVersion = in.readVLong();
} else {
aliasesVersion = 1;
}
aliasesVersion = in.readVLong();
state = State.fromId(in.readByte());
if (in.getTransportVersion().onOrAfter(SETTING_DIFF_VERSION)) {
settings = null;
Expand Down Expand Up @@ -1688,9 +1684,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_2_0)) {
out.writeVLong(aliasesVersion);
}
out.writeVLong(aliasesVersion);
out.writeByte(state.id);
assert settings != null
: "settings should always be non-null since this instance is not expected to have been read from another node";
Expand Down Expand Up @@ -1776,9 +1770,7 @@ public static IndexMetadata readFrom(StreamInput in, @Nullable Function<String,
builder.version(in.readLong());
builder.mappingVersion(in.readVLong());
builder.settingsVersion(in.readVLong());
if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_2_0)) {
builder.aliasesVersion(in.readVLong());
}
builder.aliasesVersion(in.readVLong());
builder.setRoutingNumShards(in.readInt());
builder.state(State.fromId(in.readByte()));
builder.settings(readSettingsFromStream(in));
Expand Down Expand Up @@ -1848,9 +1840,7 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
out.writeLong(version);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_2_0)) {
out.writeVLong(aliasesVersion);
}
out.writeVLong(aliasesVersion);
out.writeInt(routingNumShards);
out.writeByte(state.id());
settings.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -80,19 +79,21 @@ private IndexRouting(IndexMetadata metadata) {
this.routingFactor = metadata.getRoutingFactor();
}

public abstract void process(IndexRequest indexRequest);
/**
* Finalize the request before routing, with data needed for routing decisions.
*/
public void preProcess(IndexRequest indexRequest) {}

/**
* Finalize the request after routing, incorporating data produced by the routing logic.
*/
public void postProcess(IndexRequest indexRequest) {}

/**
* Called when indexing a document to generate the shard id that should contain
* a document with the provided parameters.
*/
public abstract int indexShard(
String id,
@Nullable String routing,
XContentType sourceType,
BytesReference source,
Consumer<String> routingHashSetter
);
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source);

/**
* Called when updating a document to generate the shard id that should contain
Expand Down Expand Up @@ -163,7 +164,7 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {
protected abstract int shardId(String id, @Nullable String routing);

@Override
public void process(IndexRequest indexRequest) {
public void preProcess(IndexRequest indexRequest) {
// generate id if not already provided
final String id = indexRequest.id();
if (id == null) {
Expand All @@ -187,13 +188,7 @@ private static boolean isNewIndexVersion(final IndexVersion creationVersion) {
}

@Override
public int indexShard(
String id,
@Nullable String routing,
XContentType sourceType,
BytesReference source,
Consumer<String> routingHashSetter
) {
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
if (id == null) {
throw new IllegalStateException("id is required and should have been set by process");
}
Expand Down Expand Up @@ -278,6 +273,7 @@ public static class ExtractFromSource extends IndexRouting {
private final Predicate<String> isRoutingPath;
private final XContentParserConfiguration parserConfig;
private final boolean trackTimeSeriesRoutingHash;
private int hash = Integer.MAX_VALUE;

ExtractFromSource(IndexMetadata metadata) {
super(metadata);
Expand All @@ -295,22 +291,17 @@ public boolean matchesField(String fieldName) {
}

@Override
public void process(IndexRequest indexRequest) {}
public void postProcess(IndexRequest indexRequest) {
if (trackTimeSeriesRoutingHash) {
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
}
}

@Override
public int indexShard(
String id,
@Nullable String routing,
XContentType sourceType,
BytesReference source,
Consumer<String> routingHashSetter
) {
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
assert Transports.assertNotTransportThread("parsing the _source can get slow");
checkNoRouting(routing);
int hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
if (trackTimeSeriesRoutingHash) {
routingHashSetter.accept(TimeSeriesRoutingHashFieldMapper.encode(hash));
}
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
return hashToShardId(hash);
}

Expand Down
21 changes: 2 additions & 19 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -941,36 +941,19 @@ public boolean allowSearchIdleOptimization() {
*/
public abstract int countChanges(String source, long fromSeqNo, long toSeqNo) throws IOException;

/**
* @deprecated This method is deprecated will and be removed once #114618 is applied to the serverless repository.
* @see #newChangesSnapshot(String, long, long, boolean, boolean, boolean, long)
*/
@Deprecated
public abstract Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats
) throws IOException;

/**
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
*/
public Translog.Snapshot newChangesSnapshot(
public abstract Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean singleConsumer,
boolean accessStats,
long maxChunkSize
) throws IOException {
// TODO: Remove this default implementation once the deprecated newChangesSnapshot is removed
return newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats);
}
) throws IOException;

/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
Expand Down
Loading

0 comments on commit e3ae17a

Please sign in to comment.