ingestMetadata = in.readMap();
+ /**
+ * Builds a writeable ingest document by constructing the wrapped ingest document from the passed-in maps.
+ *
+ * This is intended for cases like deserialization, where we know the passed-in maps aren't self-referencing,
+ * and where a defensive copy is unnecessary.
+ */
+ private WriteableIngestDocument(Map sourceAndMetadata, Map ingestMetadata) {
this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata);
}
+ WriteableIngestDocument(StreamInput in) throws IOException {
+ this(in.readMap(), in.readMap());
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeGenericMap(ingestDocument.getSourceAndMetadata());
diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
index 9714fc7574418..d02958567a873 100644
--- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -27,6 +27,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.SearchPhaseResult;
@@ -450,7 +451,7 @@ public void writeTo(StreamOutput out) throws IOException {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(
FREE_CONTEXT_SCROLL_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ScrollFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeReaderContext(request.id());
@@ -460,7 +461,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, false, SearchFreeContextResponse::new);
transportService.registerRequestHandler(
FREE_CONTEXT_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
SearchFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeReaderContext(request.id());
@@ -470,7 +471,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::new);
transportService.registerRequestHandler(
CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
TransportRequest.Empty::new,
(request, channel, task) -> {
searchService.freeAllScrollContexts();
@@ -486,7 +487,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
DFS_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardSearchRequest::new,
(request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel))
);
@@ -495,7 +496,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardSearchRequest::new,
(request, channel, task) -> searchService.executeQueryPhase(
request,
@@ -512,7 +513,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ID_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
QuerySearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
@@ -522,7 +523,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_SCROLL_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
InternalScrollSearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
@@ -532,7 +533,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_FETCH_SCROLL_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
InternalScrollSearchRequest::new,
(request, channel, task) -> {
searchService.executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
@@ -542,7 +543,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
FETCH_ID_SCROLL_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardFetchRequest::new,
(request, channel, task) -> {
searchService.executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
@@ -552,7 +553,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
FETCH_ID_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
true,
true,
ShardFetchSearchRequest::new,
@@ -565,7 +566,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
transportService.registerRequestHandler(
QUERY_CAN_MATCH_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardSearchRequest::new,
(request, channel, task) -> {
searchService.canMatch(request, new ChannelActionListener<>(channel));
@@ -575,7 +576,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_CAN_MATCH_NODE_NAME,
- ThreadPool.Names.SEARCH_COORDINATION,
+ transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION),
CanMatchNodeRequest::new,
(request, channel, task) -> {
searchService.canMatch(request, new ChannelActionListener<>(channel));
diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java
index 9c78e5ad62aea..eb854be99562e 100644
--- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java
@@ -22,6 +22,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
@@ -68,7 +69,7 @@ public TransportOpenPointInTimeAction(
this.searchTransportService = searchTransportService;
transportService.registerRequestHandler(
OPEN_SHARD_READER_CONTEXT_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardOpenReaderRequest::new,
new ShardOpenReaderRequestHandler()
);
diff --git a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java
index 24668d7130d52..a0e36a538913d 100644
--- a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java
@@ -61,7 +61,14 @@ protected HandledTransportAction(
String executor
) {
super(actionName, actionFilters, transportService.getTaskManager());
- transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler());
+ transportService.registerRequestHandler(
+ actionName,
+ transportService.getThreadPool().executor(executor),
+ false,
+ canTripCircuitBreaker,
+ requestReader,
+ new TransportHandler()
+ );
}
class TransportHandler implements TransportRequestHandler {
diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java
index 570514355da56..ddd2cc43005e2 100644
--- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java
@@ -75,7 +75,7 @@ protected TransportBroadcastAction(
transportService.registerRequestHandler(
transportShardAction,
- executor,
+ this.executor,
shardRequestReader,
(request, channel, task) -> ActionListener.completeWith(
new ChannelActionListener<>(channel),
diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
index 15c28bdf33c87..4f78d68daa8f2 100644
--- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
@@ -119,7 +119,7 @@ public TransportBroadcastByNodeAction(
transportService.registerRequestHandler(
transportNodeBroadcastAction,
- executor,
+ this.executor,
false,
canTripCircuitBreaker,
NodeRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/unpromotable/TransportBroadcastUnpromotableAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/unpromotable/TransportBroadcastUnpromotableAction.java
index d5c7fc763fba6..579ed101b56ae 100644
--- a/server/src/main/java/org/elasticsearch/action/support/broadcast/unpromotable/TransportBroadcastUnpromotableAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/unpromotable/TransportBroadcastUnpromotableAction.java
@@ -60,7 +60,12 @@ protected TransportBroadcastUnpromotableAction(
this.transportUnpromotableAction = actionName + "[u]";
this.executor = transportService.getThreadPool().executor(executor);
- transportService.registerRequestHandler(transportUnpromotableAction, executor, requestReader, new UnpromotableTransportHandler());
+ transportService.registerRequestHandler(
+ transportUnpromotableAction,
+ this.executor,
+ requestReader,
+ new UnpromotableTransportHandler()
+ );
}
protected abstract void unpromotableShardOperation(Task task, Request request, ActionListener listener);
diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java
index 1efe70bcb5adc..b45c448ecc52c 100644
--- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java
@@ -82,7 +82,7 @@ protected TransportNodesAction(
this.transportService = Objects.requireNonNull(transportService);
this.finalExecutor = threadPool.executor(executor);
this.transportNodeAction = actionName + "[n]";
- transportService.registerRequestHandler(transportNodeAction, executor, nodeRequest, new NodeTransportHandler());
+ transportService.registerRequestHandler(transportNodeAction, finalExecutor, nodeRequest, new NodeTransportHandler());
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
index 5651af11719cf..d1e2b6dd98faa 100644
--- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@@ -38,6 +38,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@@ -187,11 +188,16 @@ protected TransportReplicationAction(
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
- transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
+ transportService.registerRequestHandler(
+ actionName,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
+ requestReader,
+ this::handleOperationRequest
+ );
transportService.registerRequestHandler(
transportPrimaryAction,
- executor,
+ threadPool.executor(executor),
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
@@ -201,7 +207,7 @@ protected TransportReplicationAction(
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
transportReplicaAction,
- executor,
+ threadPool.executor(executor),
true,
true,
in -> new ConcreteReplicaRequest<>(replicaRequestReader, in),
diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java
index 63ff4cef5a924..c526a2679306d 100644
--- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java
@@ -26,6 +26,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@@ -33,7 +34,6 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@@ -72,7 +72,7 @@ protected TransportInstanceSingleOperationAction(
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.shardActionName = actionName + "[s]";
- transportService.registerRequestHandler(shardActionName, Names.SAME, request, new ShardTransportHandler());
+ transportService.registerRequestHandler(shardActionName, EsExecutors.DIRECT_EXECUTOR_SERVICE, request, new ShardTransportHandler());
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
index 001a99ecd505d..9d980fabc6714 100644
--- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java
@@ -28,6 +28,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.LoggerMessageFormat;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
@@ -79,9 +80,14 @@ protected TransportSingleShardAction(
this.executor = executor;
if (isSubAction() == false) {
- transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, request, new TransportHandler());
+ transportService.registerRequestHandler(actionName, EsExecutors.DIRECT_EXECUTOR_SERVICE, request, new TransportHandler());
}
- transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, request, new ShardTransportHandler());
+ transportService.registerRequestHandler(
+ transportShardAction,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
+ request,
+ new ShardTransportHandler()
+ );
}
/**
diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
index b509008af2b4f..fb29e8a838db6 100644
--- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
+++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java
@@ -79,7 +79,12 @@ protected TransportTasksAction(
this.responsesReader = responsesReader;
this.responseReader = responseReader;
- transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler());
+ transportService.registerRequestHandler(
+ transportNodeAction,
+ transportService.getThreadPool().executor(nodeExecutor),
+ NodeTaskRequest::new,
+ new NodeTransportHandler()
+ );
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
index 6ca8fe26edd54..3f9e7e4d8d9ae 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -629,21 +629,6 @@ public Iterator extends ToXContent> toXContentChunked(ToXContent.Params outerP
(builder, params) -> builder.endObject()
),
- // transportVersions - redundant with the nodes_versions section but has to stay for backwards compatibility
- // just use NODES again, its node-related information
- chunkedSection(
- metrics.contains(Metric.NODES),
- (builder, params) -> builder.startArray("transport_versions"),
- compatibilityVersions.entrySet().iterator(),
- e -> Iterators.single(
- (builder, params) -> builder.startObject()
- .field("node_id", e.getKey())
- .field("transport_version", e.getValue().transportVersion().toString())
- .endObject()
- ),
- (builder, params) -> builder.endArray()
- ),
-
// per-node version information
chunkedSection(
metrics.contains(Metric.NODES),
diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index 97bf6cc01c4e5..f9aa2c6c234be 100644
--- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -36,6 +36,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
@@ -98,13 +99,13 @@ public ShardStateAction(
transportService.registerRequestHandler(
SHARD_STARTED_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
StartedShardEntry::new,
new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, rerouteService))
);
transportService.registerRequestHandler(
SHARD_FAILED_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
FailedShardEntry::new,
new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, rerouteService))
);
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index dc52791a5d5e2..7ccea8e99918b 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -271,7 +271,7 @@ public Coordinator(
);
transportService.registerRequestHandler(
COMMIT_STATE_ACTION_NAME,
- Names.CLUSTER_COORDINATION,
+ this.clusterCoordinationExecutor,
false,
false,
ApplyCommitRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java
index ad2faaccf0e96..99ab8650a52d8 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java
@@ -124,7 +124,7 @@ public FollowersChecker(
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(
FOLLOWER_CHECK_ACTION_NAME,
- Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
false,
FollowerCheckRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
index ce2754fa3854c..247034c88ed62 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java
@@ -28,6 +28,7 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
@@ -114,7 +115,7 @@ public class JoinHelper {
transportService.registerRequestHandler(
JOIN_ACTION_NAME,
- Names.CLUSTER_COORDINATION,
+ transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION),
false,
false,
JoinRequest::new,
@@ -126,7 +127,7 @@ public class JoinHelper {
transportService.registerRequestHandler(
START_JOIN_ACTION_NAME,
- Names.CLUSTER_COORDINATION,
+ transportService.getThreadPool().executor(Names.CLUSTER_COORDINATION),
false,
false,
StartJoinRequest::new,
@@ -139,7 +140,7 @@ public class JoinHelper {
transportService.registerRequestHandler(
JOIN_PING_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
false,
TransportRequest.Empty::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java
index 248137f03fcaa..d9911ad12df84 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java
@@ -121,7 +121,7 @@ public JoinValidationService(
final var dataPaths = Environment.PATH_DATA_SETTING.get(settings);
transportService.registerRequestHandler(
JoinValidationService.JOIN_VALIDATE_ACTION_NAME,
- ThreadPool.Names.CLUSTER_COORDINATION,
+ this.responseExecutor,
ValidateJoinRequest::new,
(request, channel, task) -> {
final var remoteState = request.getOrReadState();
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
index 8a20e8e56d751..9fcae5bcf67f8 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java
@@ -113,7 +113,7 @@ public class LeaderChecker {
transportService.registerRequestHandler(
LEADER_CHECK_ACTION_NAME,
- Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
false,
LeaderCheckRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
index 45079b2bccd60..781a05d535b16 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java
@@ -106,7 +106,7 @@ public PublicationTransportHandler(
transportService.registerRequestHandler(
PUBLISH_STATE_ACTION_NAME,
- ThreadPool.Names.CLUSTER_COORDINATION,
+ this.clusterCoordinationExecutor,
false,
false,
BytesTransportRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java
index 28ab04e5a7ccc..7bc3514206ffe 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/StatefulPreVoteCollector.java
@@ -64,7 +64,7 @@ public StatefulPreVoteCollector(
transportService.registerRequestHandler(
REQUEST_PRE_VOTE_ACTION_NAME,
- Names.CLUSTER_COORDINATION,
+ this.clusterCoordinationExecutor,
false,
false,
PreVoteRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 517bc2293ab6b..ab67478192c11 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -475,7 +475,8 @@ public Iterator> settings() {
SETTING_INDEX_HIDDEN,
false,
Property.Dynamic,
- Property.IndexScope
+ Property.IndexScope,
+ Property.ServerlessPublic
);
/**
diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
index 046f0a1c64bb5..46e8f5210e447 100644
--- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
+++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
@@ -110,7 +110,7 @@ public PeerFinder(
transportService.registerRequestHandler(
REQUEST_PEERS_ACTION_NAME,
- Names.CLUSTER_COORDINATION,
+ this.clusterCoordinationExecutor,
false,
false,
PeersRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java
index d0579342f5bd0..0262e37dd74a9 100644
--- a/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java
+++ b/server/src/main/java/org/elasticsearch/gateway/LocalAllocateDangledIndices.java
@@ -35,7 +35,6 @@
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.tasks.Task;
-import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
@@ -73,7 +72,7 @@ public LocalAllocateDangledIndices(
this.indexMetadataVerifier = indexMetadataVerifier;
transportService.registerRequestHandler(
ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
AllocateDangledRequest::new,
new AllocateDangledRequestHandler()
);
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BinaryFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/BinaryFieldMapper.java
index f5d2dccab8052..98aaf3fbda596 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BinaryFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BinaryFieldMapper.java
@@ -67,7 +67,7 @@ public BinaryFieldMapper build(MapperBuilderContext context) {
name,
new BinaryFieldType(context.buildFullName(name), stored.getValue(), hasDocValues.getValue(), meta.getValue()),
multiFieldsBuilder.build(this, context),
- copyTo.build(),
+ copyTo,
this
);
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/BooleanFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/BooleanFieldMapper.java
index f11a3781fd19c..f2383148c31ed 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/BooleanFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/BooleanFieldMapper.java
@@ -123,14 +123,7 @@ public BooleanFieldMapper build(MapperBuilderContext context) {
scriptValues(),
meta.getValue()
);
- return new BooleanFieldMapper(
- name,
- ft,
- multiFieldsBuilder.build(this, context),
- copyTo.build(),
- context.isSourceSynthetic(),
- this
- );
+ return new BooleanFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo, context.isSourceSynthetic(), this);
}
private FieldValues scriptValues() {
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/CompletionFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/CompletionFieldMapper.java
index ccf4953608f4d..581c87d224f2d 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/CompletionFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/CompletionFieldMapper.java
@@ -85,14 +85,15 @@ public FieldMapper.Builder getMergeBuilder() {
}
public static class Defaults {
- public static final FieldType FIELD_TYPE = new FieldType();
+ public static final FieldType FIELD_TYPE;
static {
- FIELD_TYPE.setTokenized(true);
- FIELD_TYPE.setStored(false);
- FIELD_TYPE.setStoreTermVectors(false);
- FIELD_TYPE.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
- FIELD_TYPE.setOmitNorms(true);
- FIELD_TYPE.freeze();
+ final FieldType ft = new FieldType();
+ ft.setTokenized(true);
+ ft.setStored(false);
+ ft.setStoreTermVectors(false);
+ ft.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
+ ft.setOmitNorms(true);
+ FIELD_TYPE = freezeAndDeduplicateFieldType(ft);
}
public static final boolean DEFAULT_PRESERVE_SEPARATORS = true;
public static final boolean DEFAULT_POSITION_INCREMENTS = true;
@@ -205,7 +206,7 @@ public CompletionFieldMapper build(MapperBuilderContext context) {
CompletionFieldType ft = new CompletionFieldType(context.buildFullName(name), completionAnalyzer, meta.getValue());
ft.setContextMappings(contexts.getValue());
- return new CompletionFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo.build(), this);
+ return new CompletionFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo, this);
}
private void checkCompletionContextsLimit() {
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/CustomDocValuesField.java b/server/src/main/java/org/elasticsearch/index/mapper/CustomDocValuesField.java
index b4ce98dcdaf9a..115b20d7d88b3 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/CustomDocValuesField.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/CustomDocValuesField.java
@@ -22,11 +22,12 @@
// used for binary, geo and range fields
public abstract class CustomDocValuesField implements IndexableField {
- public static final FieldType TYPE = new FieldType();
+ public static final FieldType TYPE;
static {
- TYPE.setDocValuesType(DocValuesType.BINARY);
- TYPE.setOmitNorms(true);
- TYPE.freeze();
+ FieldType ft = new FieldType();
+ ft.setDocValuesType(DocValuesType.BINARY);
+ ft.setOmitNorms(true);
+ TYPE = Mapper.freezeAndDeduplicateFieldType(ft);
}
private final String name;
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/CustomTermFreqField.java b/server/src/main/java/org/elasticsearch/index/mapper/CustomTermFreqField.java
index 6ee379e598578..3b618989a2053 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/CustomTermFreqField.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/CustomTermFreqField.java
@@ -21,11 +21,13 @@
*/
public final class CustomTermFreqField extends Field {
- private static final FieldType FIELD_TYPE = new FieldType();
+ private static final FieldType FIELD_TYPE;
static {
- FIELD_TYPE.setTokenized(false);
- FIELD_TYPE.setOmitNorms(true);
- FIELD_TYPE.setIndexOptions(IndexOptions.DOCS_AND_FREQS);
+ final FieldType ft = new FieldType();
+ ft.setTokenized(false);
+ ft.setOmitNorms(true);
+ ft.setIndexOptions(IndexOptions.DOCS_AND_FREQS);
+ FIELD_TYPE = Mapper.freezeAndDeduplicateFieldType(ft);
}
private final int fieldValue;
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
index 3bf593627faa9..9579d921c4176 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/DateFieldMapper.java
@@ -358,7 +358,7 @@ public DateFieldMapper build(MapperBuilderContext context) {
);
Long nullTimestamp = parseNullValue(ft);
- return new DateFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo.build(), nullTimestamp, resolution, this);
+ return new DateFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo, nullTimestamp, resolution, this);
}
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java
index 8119cf5e299d0..a9d90f80c8a18 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/FieldMapper.java
@@ -528,7 +528,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
*/
public static class CopyTo {
- private static final CopyTo EMPTY = new CopyTo(Collections.emptyList());
+ private static final CopyTo EMPTY = new CopyTo(List.of());
public static CopyTo empty() {
return EMPTY;
@@ -537,7 +537,14 @@ public static CopyTo empty() {
private final List copyToFields;
private CopyTo(List copyToFields) {
- this.copyToFields = copyToFields;
+ this.copyToFields = List.copyOf(copyToFields);
+ }
+
+ public CopyTo withAddedFields(List fields) {
+ if (fields.isEmpty()) {
+ return this;
+ }
+ return new CopyTo(CollectionUtils.concatLists(copyToFields, fields));
}
public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
@@ -551,31 +558,6 @@ public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return builder;
}
- public static class Builder {
- private final List copyToBuilders = new ArrayList<>();
-
- public Builder add(String field) {
- copyToBuilders.add(field);
- return this;
- }
-
- public boolean hasValues() {
- return copyToBuilders.isEmpty() == false;
- }
-
- public CopyTo build() {
- if (copyToBuilders.isEmpty()) {
- return EMPTY;
- }
- return new CopyTo(Collections.unmodifiableList(copyToBuilders));
- }
-
- public void reset(CopyTo copyTo) {
- copyToBuilders.clear();
- copyToBuilders.addAll(copyTo.copyToFields);
- }
- }
-
public List copyToFields() {
return copyToFields;
}
@@ -1216,7 +1198,7 @@ void check() {
public abstract static class Builder extends Mapper.Builder implements ToXContentFragment {
protected final MultiFields.Builder multiFieldsBuilder = new MultiFields.Builder();
- protected final CopyTo.Builder copyTo = new CopyTo.Builder();
+ protected CopyTo copyTo = CopyTo.EMPTY;
/**
* Creates a new Builder with a field name
@@ -1246,7 +1228,7 @@ protected void merge(FieldMapper in, Conflicts conflicts, MapperBuilderContext m
for (FieldMapper newSubField : in.multiFields.mappers) {
multiFieldsBuilder.update(newSubField, childContext);
}
- this.copyTo.reset(in.copyTo);
+ this.copyTo = in.copyTo;
validate();
}
@@ -1276,7 +1258,7 @@ protected void addScriptValidation(
if (s != null && multiFieldsBuilder.hasMultiFields()) {
throw new MapperParsingException("Cannot define multifields on a field with a script");
}
- if (s != null && copyTo.hasValues()) {
+ if (s != null && copyTo.copyToFields().isEmpty() == false) {
throw new MapperParsingException("Cannot define copy_to parameter on a field with a script");
}
});
@@ -1324,7 +1306,7 @@ public final void parse(String name, MappingParserContext parserContext, Map {
- TypeParsers.parseCopyFields(propNode).forEach(copyTo::add);
+ copyTo = copyTo.withAddedFields(TypeParsers.parseCopyFields(propNode));
iterator.remove();
continue;
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/GeoPointFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/GeoPointFieldMapper.java
index 815148ea0598f..4f96723235035 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/GeoPointFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/GeoPointFieldMapper.java
@@ -213,7 +213,7 @@ public FieldMapper build(MapperBuilderContext context) {
indexMode
);
if (this.script.get() == null) {
- return new GeoPointFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo.build(), geoParser, this);
+ return new GeoPointFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo, geoParser, this);
}
return new GeoPointFieldMapper(name, ft, geoParser, this);
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/GeoShapeFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/GeoShapeFieldMapper.java
index 37ccdfe3da36c..7ae410a1a9dcb 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/GeoShapeFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/GeoShapeFieldMapper.java
@@ -110,7 +110,7 @@ public GeoShapeFieldMapper build(MapperBuilderContext context) {
name,
ft,
multiFieldsBuilder.build(this, context),
- copyTo.build(),
+ copyTo,
new GeoShapeIndexer(orientation.get().value(), context.buildFullName(name)),
geoShapeParser,
this
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java
index 259f51027ce73..49339c756bc7f 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/IpFieldMapper.java
@@ -178,7 +178,7 @@ public IpFieldMapper build(MapperBuilderContext context) {
dimension.getValue()
),
multiFieldsBuilder.build(this, context),
- copyTo.build(),
+ copyTo,
context.isSourceSynthetic(),
this
);
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java
index 31ca329d03375..8d9c77f503ab9 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java
@@ -87,14 +87,15 @@ public final class KeywordFieldMapper extends FieldMapper {
public static final String CONTENT_TYPE = "keyword";
public static class Defaults {
- public static final FieldType FIELD_TYPE = new FieldType();
+ public static final FieldType FIELD_TYPE;
static {
- FIELD_TYPE.setTokenized(false);
- FIELD_TYPE.setOmitNorms(true);
- FIELD_TYPE.setIndexOptions(IndexOptions.DOCS);
- FIELD_TYPE.setDocValuesType(DocValuesType.SORTED_SET);
- FIELD_TYPE.freeze();
+ FieldType ft = new FieldType();
+ ft.setTokenized(false);
+ ft.setOmitNorms(true);
+ ft.setIndexOptions(IndexOptions.DOCS);
+ ft.setDocValuesType(DocValuesType.SORTED_SET);
+ FIELD_TYPE = freezeAndDeduplicateFieldType(ft);
}
public static TextSearchInfo TEXT_SEARCH_INFO = new TextSearchInfo(
@@ -323,7 +324,7 @@ public KeywordFieldMapper build(MapperBuilderContext context) {
fieldtype,
buildFieldType(context, fieldtype),
multiFieldsBuilder.build(this, context),
- copyTo.build(),
+ copyTo,
context.isSourceSynthetic(),
this
);
@@ -831,7 +832,7 @@ private KeywordFieldMapper(
this.indexed = builder.indexed.getValue();
this.hasDocValues = builder.hasDocValues.getValue();
this.indexOptions = builder.indexOptions.getValue();
- this.fieldType = fieldType;
+ this.fieldType = freezeAndDeduplicateFieldType(fieldType);
this.similarity = builder.similarity.getValue();
this.normalizerName = builder.normalizer.getValue();
this.splitQueriesOnWhitespace = builder.splitQueriesOnWhitespace.getValue();
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java
index 32796aed2f422..598e8c4d394e8 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java
@@ -8,6 +8,7 @@
package org.elasticsearch.index.mapper;
+import org.apache.lucene.document.FieldType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.StringLiteralDeduplicator;
import org.elasticsearch.index.IndexVersion;
@@ -15,8 +16,11 @@
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
public abstract class Mapper implements ToXContentFragment, Iterable {
+
public abstract static class Builder {
protected final String name;
@@ -105,4 +109,27 @@ public String toString() {
public static String internFieldName(String fieldName) {
return fieldNameStringDeduplicator.deduplicate(fieldName);
}
+
+ private static final Map fieldTypeDeduplicator = new ConcurrentHashMap<>();
+
+ /**
+ * Freezes the given {@link FieldType} instances and tries to deduplicate it as long as the field does not return a non-empty value for
+ * {@link FieldType#getAttributes()}.
+ *
+ * @param fieldType field type to deduplicate
+ * @return deduplicated field type
+ */
+ public static FieldType freezeAndDeduplicateFieldType(FieldType fieldType) {
+ fieldType.freeze();
+ var attributes = fieldType.getAttributes();
+ if ((attributes != null && attributes.isEmpty() == false) || fieldType.getClass() != FieldType.class) {
+ // don't deduplicate subclasses or types with non-empty attribute maps to avoid memory leaks
+ return fieldType;
+ }
+ if (fieldTypeDeduplicator.size() > 1000) {
+ // guard against the case where we run up too many combinations via (vector-)dimensions combinations
+ fieldTypeDeduplicator.clear();
+ }
+ return fieldTypeDeduplicator.computeIfAbsent(fieldType, Function.identity());
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
index 7590fd36d849a..34763eda69a28 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
@@ -268,14 +268,7 @@ protected Parameter>[] getParameters() {
@Override
public NumberFieldMapper build(MapperBuilderContext context) {
MappedFieldType ft = new NumberFieldType(context.buildFullName(name), this);
- return new NumberFieldMapper(
- name,
- ft,
- multiFieldsBuilder.build(this, context),
- copyTo.build(),
- context.isSourceSynthetic(),
- this
- );
+ return new NumberFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo, context.isSourceSynthetic(), this);
}
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/PlaceHolderFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/PlaceHolderFieldMapper.java
index a7de1194ddceb..4ad0873b66d50 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/PlaceHolderFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/PlaceHolderFieldMapper.java
@@ -91,13 +91,7 @@ protected Parameter>[] getParameters() {
@Override
public PlaceHolderFieldMapper build(MapperBuilderContext context) {
PlaceHolderFieldType mappedFieldType = new PlaceHolderFieldType(context.buildFullName(name), type, Map.of());
- return new PlaceHolderFieldMapper(
- name,
- mappedFieldType,
- multiFieldsBuilder.build(this, context),
- copyTo.build(),
- unknownParams
- );
+ return new PlaceHolderFieldMapper(name, mappedFieldType, multiFieldsBuilder.build(this, context), copyTo, unknownParams);
}
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/RangeFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/RangeFieldMapper.java
index a7f0bedc7c060..fcd2a425a6625 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/RangeFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/RangeFieldMapper.java
@@ -163,7 +163,7 @@ protected RangeFieldType setupFieldType(MapperBuilderContext context) {
@Override
public RangeFieldMapper build(MapperBuilderContext context) {
RangeFieldType ft = setupFieldType(context);
- return new RangeFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo.build(), type, this);
+ return new RangeFieldMapper(name, ft, multiFieldsBuilder.build(this, context), copyTo, type, this);
}
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java
index 1dc1d236d80d0..8e8c58b35c68a 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java
@@ -50,10 +50,12 @@ public class SeqNoFieldMapper extends MetadataFieldMapper {
// Like Lucene's LongField but single-valued (NUMERIC doc values instead of SORTED_NUMERIC doc values)
private static class SingleValueLongField extends Field {
- private static final FieldType FIELD_TYPE = new FieldType();
+ private static final FieldType FIELD_TYPE;
static {
- FIELD_TYPE.setDimensions(1, Long.BYTES);
- FIELD_TYPE.setDocValuesType(DocValuesType.NUMERIC);
+ FieldType ft = new FieldType();
+ ft.setDimensions(1, Long.BYTES);
+ ft.setDocValuesType(DocValuesType.NUMERIC);
+ FIELD_TYPE = freezeAndDeduplicateFieldType(ft);
}
private final BytesRef pointValue;
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
index ee709be35bc82..c5d5dbec1ef15 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
@@ -77,13 +77,14 @@ private enum Mode {
public static class Defaults {
public static final String NAME = SourceFieldMapper.NAME;
- public static final FieldType FIELD_TYPE = new FieldType();
+ public static final FieldType FIELD_TYPE;
static {
- FIELD_TYPE.setIndexOptions(IndexOptions.NONE); // not indexed
- FIELD_TYPE.setStored(true);
- FIELD_TYPE.setOmitNorms(true);
- FIELD_TYPE.freeze();
+ FieldType ft = new FieldType();
+ ft.setIndexOptions(IndexOptions.NONE); // not indexed
+ ft.setStored(true);
+ ft.setOmitNorms(true);
+ FIELD_TYPE = freezeAndDeduplicateFieldType(ft);
}
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
index 6d2c5b1cb71ac..4c3e1d38b6d57 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
@@ -99,15 +99,16 @@ public static class Defaults {
public static final int INDEX_PREFIX_MIN_CHARS = 2;
public static final int INDEX_PREFIX_MAX_CHARS = 5;
- public static final FieldType FIELD_TYPE = new FieldType();
+ public static final FieldType FIELD_TYPE;
static {
- FIELD_TYPE.setTokenized(true);
- FIELD_TYPE.setStored(false);
- FIELD_TYPE.setStoreTermVectors(false);
- FIELD_TYPE.setOmitNorms(false);
- FIELD_TYPE.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
- FIELD_TYPE.freeze();
+ FieldType ft = new FieldType();
+ ft.setTokenized(true);
+ ft.setStored(false);
+ ft.setStoreTermVectors(false);
+ ft.setOmitNorms(false);
+ ft.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
+ FIELD_TYPE = freezeAndDeduplicateFieldType(ft);
}
/**
@@ -469,7 +470,7 @@ public TextFieldMapper build(MapperBuilderContext context) {
throw new MapperParsingException("Cannot use reserved field name [" + mapper.name() + "]");
}
}
- return new TextFieldMapper(name, fieldType, tft, prefixFieldInfo, phraseFieldInfo, multiFields, copyTo.build(), this);
+ return new TextFieldMapper(name, fieldType, tft, prefixFieldInfo, phraseFieldInfo, multiFields, copyTo, this);
}
}
@@ -635,7 +636,7 @@ private static final class SubFieldInfo {
private final String field;
SubFieldInfo(String field, FieldType fieldType, Analyzer analyzer) {
- this.fieldType = fieldType;
+ this.fieldType = Mapper.freezeAndDeduplicateFieldType(fieldType);
this.analyzer = analyzer;
this.field = field;
}
@@ -1138,7 +1139,7 @@ protected TextFieldMapper(
if (fieldType.indexOptions() == IndexOptions.NONE && fieldType().fielddata()) {
throw new IllegalArgumentException("Cannot enable fielddata on a [text] field that is not indexed: [" + name() + "]");
}
- this.fieldType = fieldType;
+ this.fieldType = freezeAndDeduplicateFieldType(fieldType);
this.prefixFieldInfo = prefixFieldInfo;
this.phraseFieldInfo = phraseFieldInfo;
this.indexCreatedVersion = builder.indexCreatedVersion;
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TextSearchInfo.java b/server/src/main/java/org/elasticsearch/index/mapper/TextSearchInfo.java
index 744ebc6d6013a..e0cb747f48ceb 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/TextSearchInfo.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/TextSearchInfo.java
@@ -28,12 +28,13 @@ public record TextSearchInfo(
NamedAnalyzer searchQuoteAnalyzer
) {
- private static final FieldType SIMPLE_MATCH_ONLY_FIELD_TYPE = new FieldType();
+ private static final FieldType SIMPLE_MATCH_ONLY_FIELD_TYPE;
static {
- SIMPLE_MATCH_ONLY_FIELD_TYPE.setTokenized(false);
- SIMPLE_MATCH_ONLY_FIELD_TYPE.setOmitNorms(true);
- SIMPLE_MATCH_ONLY_FIELD_TYPE.freeze();
+ FieldType ft = new FieldType();
+ ft.setTokenized(false);
+ ft.setOmitNorms(true);
+ SIMPLE_MATCH_ONLY_FIELD_TYPE = Mapper.freezeAndDeduplicateFieldType(ft);
}
/**
@@ -100,7 +101,7 @@ public TextSearchInfo(
NamedAnalyzer searchAnalyzer,
NamedAnalyzer searchQuoteAnalyzer
) {
- this.luceneFieldType = luceneFieldType;
+ this.luceneFieldType = Mapper.freezeAndDeduplicateFieldType(luceneFieldType);
this.similarity = similarity;
this.searchAnalyzer = Objects.requireNonNull(searchAnalyzer);
this.searchQuoteAnalyzer = Objects.requireNonNull(searchQuoteAnalyzer);
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java
index 35f2d37fa0dd6..51be6290df657 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java
@@ -208,7 +208,6 @@ public FlattenedFieldMapper build(MapperBuilderContext context) {
if (multiFields.iterator().hasNext()) {
throw new IllegalArgumentException(CONTENT_TYPE + " field [" + name + "] does not support [fields]");
}
- CopyTo copyTo = this.copyTo.build();
if (copyTo.copyToFields().isEmpty() == false) {
throw new IllegalArgumentException(CONTENT_TYPE + " field [" + name + "] does not support [copy_to]");
}
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java
index 155dac09f0b86..0e4f871fbb8ca 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java
@@ -26,7 +26,6 @@
import org.apache.lucene.search.KnnFloatVectorQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.VectorUtil;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -57,7 +56,6 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneId;
-import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
@@ -71,10 +69,8 @@
* A {@link FieldMapper} for indexing a dense vector of floats.
*/
public class DenseVectorFieldMapper extends FieldMapper {
- private static final float EPS = 1e-4f;
public static final IndexVersion MAGNITUDE_STORED_INDEX_VERSION = IndexVersion.V_7_5_0;
public static final IndexVersion INDEXED_BY_DEFAULT_INDEX_VERSION = IndexVersion.V_8_11_0;
- public static final IndexVersion DOT_PRODUCT_AUTO_NORMALIZED = IndexVersion.V_8_11_0;
public static final IndexVersion LITTLE_ENDIAN_FLOAT_STORED_INDEX_VERSION = IndexVersion.V_8_9_0;
public static final String CONTENT_TYPE = "dense_vector";
@@ -194,7 +190,7 @@ public DenseVectorFieldMapper build(MapperBuilderContext context) {
indexOptions.getValue(),
indexVersionCreated,
multiFieldsBuilder.build(this, context),
- copyTo.build()
+ copyTo
);
}
}
@@ -325,7 +321,6 @@ public void checkVectorBounds(float[] vector) {
@Override
void checkVectorMagnitude(
- IndexVersion indexVersion,
VectorSimilarity similarity,
Function appender,
float squaredMagnitude
@@ -388,12 +383,7 @@ public Field parseKnnVector(DocumentParserContext context, DenseVectorFieldMappe
squaredMagnitude += value * value;
}
fieldMapper.checkDimensionMatches(index, context);
- checkVectorMagnitude(
- fieldMapper.indexCreatedVersion,
- fieldMapper.similarity,
- errorByteElementsAppender(vector),
- squaredMagnitude
- );
+ checkVectorMagnitude(fieldMapper.similarity, errorByteElementsAppender(vector), squaredMagnitude);
return createKnnVectorField(fieldMapper.fieldType().name(), vector, fieldMapper.similarity.function);
}
@@ -485,31 +475,20 @@ public void checkVectorBounds(float[] vector) {
@Override
void checkVectorMagnitude(
- IndexVersion indexVersion,
VectorSimilarity similarity,
Function appender,
float squaredMagnitude
) {
StringBuilder errorBuilder = null;
- if (indexVersion.before(DOT_PRODUCT_AUTO_NORMALIZED)) {
- if (similarity == VectorSimilarity.DOT_PRODUCT && Math.abs(squaredMagnitude - 1.0f) > EPS) {
- errorBuilder = new StringBuilder(
- "The [" + VectorSimilarity.DOT_PRODUCT + "] similarity can only be used with unit-length vectors."
- );
- }
- if (similarity == VectorSimilarity.COSINE && Math.sqrt(squaredMagnitude) == 0.0f) {
- errorBuilder = new StringBuilder(
- "The [" + similarity + "] similarity does not support vectors with zero magnitude."
- );
- }
- } else {
- if ((similarity == VectorSimilarity.COSINE || similarity == VectorSimilarity.DOT_PRODUCT)
- && Math.sqrt(squaredMagnitude) == 0.0f) {
- errorBuilder = new StringBuilder(
- "The [" + similarity + "] similarity does not support vectors with zero magnitude."
- );
- }
+ if (similarity == VectorSimilarity.DOT_PRODUCT && Math.abs(squaredMagnitude - 1.0f) > 1e-4f) {
+ errorBuilder = new StringBuilder(
+ "The [" + VectorSimilarity.DOT_PRODUCT + "] similarity can only be used with unit-length vectors."
+ );
+ } else if (similarity == VectorSimilarity.COSINE && Math.sqrt(squaredMagnitude) == 0.0f) {
+ errorBuilder = new StringBuilder(
+ "The [" + VectorSimilarity.COSINE + "] similarity does not support vectors with zero magnitude."
+ );
}
if (errorBuilder != null) {
@@ -532,15 +511,7 @@ public Field parseKnnVector(DocumentParserContext context, DenseVectorFieldMappe
}
fieldMapper.checkDimensionMatches(index, context);
checkVectorBounds(vector);
- checkVectorMagnitude(
- fieldMapper.indexCreatedVersion,
- fieldMapper.similarity,
- errorFloatElementsAppender(vector),
- squaredMagnitude
- );
- if (fieldMapper.indexCreatedVersion.onOrAfter(DOT_PRODUCT_AUTO_NORMALIZED)) {
- fieldMapper.similarity.floatPreprocessing(vector, squaredMagnitude);
- }
+ checkVectorMagnitude(fieldMapper.similarity, errorFloatElementsAppender(vector), squaredMagnitude);
return createKnnVectorField(fieldMapper.fieldType().name(), vector, fieldMapper.similarity.function);
}
@@ -598,7 +569,6 @@ abstract double parseKnnVectorToByteBuffer(DocumentParserContext context, DenseV
public abstract void checkVectorBounds(float[] vector);
abstract void checkVectorMagnitude(
- IndexVersion indexVersion,
VectorSimilarity similarity,
Function errorElementsAppender,
float squaredMagnitude
@@ -717,21 +687,6 @@ float score(float similarity, ElementType elementType, int dim) {
case FLOAT -> (1 + similarity) / 2f;
};
}
-
- @Override
- void floatPreprocessing(float[] vector, float squareSum) {
- if (squareSum == 0) {
- throw new IllegalArgumentException("Cannot normalize a zero-length vector");
- }
- // Vector already has a magnitude have `1`
- if (Math.abs(squareSum - 1.0f) < EPS) {
- return;
- }
- float length = (float) Math.sqrt(squareSum);
- for (int i = 0; i < vector.length; i++) {
- vector[i] /= length;
- }
- }
};
public final VectorSimilarityFunction function;
@@ -746,8 +701,6 @@ public final String toString() {
}
abstract float score(float similarity, ElementType elementType, int dim);
-
- void floatPreprocessing(float[] vector, float squareSum) {}
}
private abstract static class IndexOptions implements ToXContent {
@@ -906,13 +859,11 @@ public Query createKnnQuery(byte[] queryVector, int numCands, Query filter, Floa
}
if (similarity == VectorSimilarity.DOT_PRODUCT || similarity == VectorSimilarity.COSINE) {
- int squaredMagnitude = VectorUtil.dotProduct(queryVector, queryVector);
- elementType.checkVectorMagnitude(
- indexVersionCreated,
- similarity,
- elementType.errorByteElementsAppender(queryVector),
- squaredMagnitude
- );
+ float squaredMagnitude = 0.0f;
+ for (byte b : queryVector) {
+ squaredMagnitude += b * b;
+ }
+ elementType.checkVectorMagnitude(similarity, elementType.errorByteElementsAppender(queryVector), squaredMagnitude);
}
Query knnQuery = new KnnByteVectorQuery(name(), queryVector, numCands, filter);
if (similarityThreshold != null) {
@@ -940,22 +891,11 @@ public Query createKnnQuery(float[] queryVector, int numCands, Query filter, Flo
elementType.checkVectorBounds(queryVector);
if (similarity == VectorSimilarity.DOT_PRODUCT || similarity == VectorSimilarity.COSINE) {
- float squaredMagnitude = VectorUtil.dotProduct(queryVector, queryVector);
- elementType.checkVectorMagnitude(
- indexVersionCreated,
- similarity,
- elementType.errorFloatElementsAppender(queryVector),
- squaredMagnitude
- );
- // We don't want to normalize the original query vector.
- // It mutates it in place and might cause down stream weirdness
- // Instead we copy the value and then normalize that copy
- if (similarity == VectorSimilarity.DOT_PRODUCT
- && elementType == ElementType.FLOAT
- && indexVersionCreated.onOrAfter(DOT_PRODUCT_AUTO_NORMALIZED)) {
- queryVector = Arrays.copyOf(queryVector, queryVector.length);
- similarity.floatPreprocessing(queryVector, squaredMagnitude);
+ float squaredMagnitude = 0.0f;
+ for (float e : queryVector) {
+ squaredMagnitude += e * e;
}
+ elementType.checkVectorMagnitude(similarity, elementType.errorFloatElementsAppender(queryVector), squaredMagnitude);
}
Query knnQuery = switch (elementType) {
case BYTE -> {
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java
index 082c2d898e637..758ad8e508ea3 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/SparseVectorFieldMapper.java
@@ -66,7 +66,7 @@ public SparseVectorFieldMapper build(MapperBuilderContext context) {
name,
new SparseVectorFieldType(context.buildFullName(name), meta.getValue()),
multiFieldsBuilder.build(this, context),
- copyTo.build()
+ copyTo
);
}
}
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
index 21b8ca31f01a7..eac119f920f6a 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -79,7 +79,7 @@ public PeerRecoverySourceService(
// node. Upon receiving START_RECOVERY, the source node will initiate the peer recovery.
transportService.registerRequestHandler(
Actions.START_RECOVERY,
- ThreadPool.Names.GENERIC,
+ transportService.getThreadPool().executor(ThreadPool.Names.GENERIC),
StartRecoveryRequest::new,
(request, channel, task) -> recover(request, task, new ChannelActionListener<>(channel))
);
@@ -89,7 +89,7 @@ public PeerRecoverySourceService(
// action will fail and the target node will send a new START_RECOVERY request.
transportService.registerRequestHandler(
Actions.REESTABLISH_RECOVERY,
- ThreadPool.Names.GENERIC,
+ transportService.getThreadPool().executor(ThreadPool.Names.GENERIC),
ReestablishRecoveryRequest::new,
(request, channel, task) -> reestablish(request, new ChannelActionListener<>(channel))
);
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
index 5f06519f84c0c..b3b23ac14d158 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -122,7 +122,7 @@ public PeerRecoveryTargetService(
transportService.registerRequestHandler(
Actions.FILES_INFO,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryFilesInfoRequest::new,
new RecoveryRequestHandler<>() {
@Override
@@ -140,7 +140,7 @@ protected void handleRequest(RecoveryFilesInfoRequest request, RecoveryTarget ta
);
transportService.registerRequestHandler(
Actions.RESTORE_FILE_FROM_SNAPSHOT,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoverySnapshotFileRequest::new,
new RecoveryRequestHandler<>() {
@Override
@@ -151,13 +151,13 @@ protected void handleRequest(RecoverySnapshotFileRequest request, RecoveryTarget
);
transportService.registerRequestHandler(
Actions.FILE_CHUNK,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryFileChunkRequest::new,
new FileChunkTransportRequestHandler()
);
transportService.registerRequestHandler(
Actions.CLEAN_FILES,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryCleanFilesRequest::new,
new RecoveryRequestHandler<>() {
@Override
@@ -179,7 +179,7 @@ protected void handleRequest(RecoveryCleanFilesRequest request, RecoveryTarget t
);
transportService.registerRequestHandler(
Actions.PREPARE_TRANSLOG,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryPrepareForTranslogOperationsRequest::new,
new RecoveryRequestHandler<>() {
@Override
@@ -194,13 +194,13 @@ protected void handleRequest(
);
transportService.registerRequestHandler(
Actions.TRANSLOG_OPS,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler()
);
transportService.registerRequestHandler(
Actions.FINALIZE,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryFinalizeRecoveryRequest::new,
new RecoveryRequestHandler<>() {
@Override
@@ -215,7 +215,7 @@ protected void handleRequest(
);
transportService.registerRequestHandler(
Actions.HANDOFF_PRIMARY_CONTEXT,
- ThreadPool.Names.GENERIC,
+ threadPool.executor(ThreadPool.Names.GENERIC),
RecoveryHandoffPrimaryContextRequest::new,
new HandoffPrimaryContextRequestHandler()
);
diff --git a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index e12a38e383397..f949dd59e8968 100644
--- a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -31,6 +31,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
@@ -102,7 +103,7 @@ public IndicesStore(
this.threadPool = threadPool;
transportService.registerRequestHandler(
ACTION_SHARD_EXISTS,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
ShardActiveRequest::new,
new ShardActiveRequestHandler()
);
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index 25827018e44a7..66931c7791bc8 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -9,6 +9,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.VersionType;
@@ -94,14 +95,26 @@ public IngestDocument(String index, String id, long version, String routing, Ver
/**
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided.
+ *
+ * @throws IllegalArgumentException if the passed-in ingest document references itself
*/
public IngestDocument(IngestDocument other) {
this(
- new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
+ new IngestCtxMap(deepCopyMap(ensureNoSelfReferences(other.ctxMap.getSource())), other.ctxMap.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
}
+ /**
+ * Internal helper utility method to get around the issue that a {@code this(...) } constructor call must be the first statement
+ * in a constructor. This is only for use in the {@link IngestDocument#IngestDocument(IngestDocument)} copy constructor, it's not a
+ * general purpose method.
+ */
+ private static Map ensureNoSelfReferences(Map source) {
+ CollectionUtils.ensureNoSelfReferences(source, null);
+ return source;
+ }
+
/**
* Constructor to create an IngestDocument from its constituent maps. The maps are shallow copied.
*/
diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java
index f09b2c12c99dd..5bd811962a4c4 100644
--- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java
+++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java
@@ -98,7 +98,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer persistedClusterStateServiceFactories = pluginsService
.filterPlugins(ClusterCoordinationPlugin.class)
@@ -1374,7 +1376,7 @@ private PersistedClusterStateService newPersistedClusterStateService(
if (persistedClusterStateServiceFactories.size() == 1) {
return persistedClusterStateServiceFactories.get(0)
- .newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool);
+ .newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool, compatibilityVersions);
}
return new PersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool::relativeTimeInMillis);
diff --git a/server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java
index b5c0536554952..14305e1a8a04f 100644
--- a/server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java
+++ b/server/src/main/java/org/elasticsearch/plugins/ClusterCoordinationPlugin.java
@@ -15,6 +15,7 @@
import org.elasticsearch.cluster.coordination.PreVoteCollector;
import org.elasticsearch.cluster.coordination.Reconfigurator;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.version.CompatibilityVersions;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
@@ -76,12 +77,24 @@ CoordinationState.PersistedState createPersistedState(
}
interface PersistedClusterStateServiceFactory {
+
+ @Deprecated(forRemoval = true)
PersistedClusterStateService newPersistedClusterStateService(
NodeEnvironment nodeEnvironment,
NamedXContentRegistry xContentRegistry,
ClusterSettings clusterSettings,
ThreadPool threadPool
);
+
+ default PersistedClusterStateService newPersistedClusterStateService(
+ NodeEnvironment nodeEnvironment,
+ NamedXContentRegistry xContentRegistry,
+ ClusterSettings clusterSettings,
+ ThreadPool threadPool,
+ CompatibilityVersions compatibilityVersions
+ ) {
+ return newPersistedClusterStateService(nodeEnvironment, xContentRegistry, clusterSettings, threadPool);
+ }
}
interface ReconfiguratorFactory {
diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java
index ad6cbbaad1cdc..d940415c38916 100644
--- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java
+++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java
@@ -56,7 +56,7 @@ public VerifyNodeRepositoryAction(
this.repositoriesService = repositoriesService;
transportService.registerRequestHandler(
ACTION_NAME,
- ThreadPool.Names.SNAPSHOT,
+ transportService.getThreadPool().executor(ThreadPool.Names.SNAPSHOT),
VerifyNodeRepositoryRequest::new,
new VerifyNodeRepositoryRequestHandler()
);
diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java
index babce70239871..111bec2c88509 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -317,9 +317,7 @@ public SearchModule(Settings settings, List plugins) {
registerIntervalsSourceProviders();
requestCacheKeyDifferentiator = registerRequestCacheKeyDifferentiator(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
- registerGenericNamedWriteable(
- new SearchPlugin.GenericNamedWriteableSpec(GeoBoundingBox.class.getSimpleName(), GeoBoundingBox::new)
- );
+ registerGenericNamedWriteable(new SearchPlugin.GenericNamedWriteableSpec("GeoBoundingBox", GeoBoundingBox::new));
}
public List getNamedWriteables() {
diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java
index 39a76f3508daa..6ab95072727c0 100644
--- a/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java
+++ b/server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
@@ -60,13 +61,13 @@ public TaskCancellationService(TransportService transportService) {
this.deduplicator = new ResultDeduplicator<>(transportService.getThreadPool().getThreadContext());
transportService.registerRequestHandler(
BAN_PARENT_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
BanParentTaskRequest::new,
new BanParentRequestHandler()
);
transportService.registerRequestHandler(
CANCEL_CHILD_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
CancelChildRequest::new,
new CancelChildRequestHandler()
);
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
index 9542d4b366ded..c38f4b26c665f 100644
--- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
+++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
@@ -27,6 +27,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.ReportingService;
@@ -539,7 +540,7 @@ Collection getConnections() {
static void registerRemoteClusterHandshakeRequestHandler(TransportService transportService) {
transportService.registerRequestHandler(
REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
false,
false,
TransportService.HandshakeRequest::new,
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java
index ed3432f66087b..aa28f8a76b58e 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java
@@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
@@ -188,7 +189,7 @@ public static void registerProxyActionWithDynamicResponseType(
RequestHandlerRegistry extends TransportRequest> requestHandler = service.getRequestHandler(action);
service.registerRequestHandler(
getProxyAction(action),
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
true,
false,
in -> cancellable
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index 83fc21396e0f6..5369b9a9eec13 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -1118,46 +1118,6 @@ public static boolean isValidActionName(String actionName) {
return false;
}
- /**
- * Temporary passthrough function that continues to take a String rather than Executor type.
- *
- * @param action
- * @param executor
- * @param requestReader
- * @param handler
- * @param
- */
- public void registerRequestHandler(
- String action,
- String executor,
- Writeable.Reader requestReader,
- TransportRequestHandler handler
- ) {
- registerRequestHandler(action, threadPool.executor(executor), requestReader, handler);
- }
-
- /**
- * Temporary passthrough function that continues to take a String rather than Executor type.
- *
- * @param action
- * @param executor
- * @param forceExecution
- * @param canTripCircuitBreaker
- * @param requestReader
- * @param handler
- * @param
- */
- public void registerRequestHandler(
- String action,
- String executor,
- boolean forceExecution,
- boolean canTripCircuitBreaker,
- Writeable.Reader requestReader,
- TransportRequestHandler handler
- ) {
- registerRequestHandler(action, threadPool.executor(executor), forceExecution, canTripCircuitBreaker, requestReader, handler);
- }
-
/**
* Registers a new request handler
*
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java
index f5e812e7d0983..a2bfa68fdf25f 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java
@@ -38,6 +38,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@@ -56,17 +57,20 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TransportDeleteDesiredBalanceActionTests extends ESAllocationTestCase {
public void testReturnsErrorIfAllocatorIsNotDesiredBalanced() throws Exception {
-
var listener = new PlainActionFuture();
+ ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
+
new TransportDeleteDesiredBalanceAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
mock(AllocationService.class),
@@ -131,8 +135,12 @@ public DesiredBalance compute(
var listener = new PlainActionFuture();
+ // TODO: temporary, remove in #97879
+ TransportService transportService = mock(TransportService.class);
+ when(transportService.getThreadPool()).thenReturn(threadPool);
+
var action = new TransportDeleteDesiredBalanceAction(
- mock(TransportService.class),
+ transportService,
clusterService,
threadPool,
mock(ActionFilters.class),
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java
index 804dbf66ac0d0..80f0b435645e6 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetDesiredBalanceActionTests.java
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
@@ -45,6 +46,7 @@
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import org.junit.Before;
import java.util.HashMap;
import java.util.List;
@@ -59,6 +61,7 @@
import static org.elasticsearch.cluster.ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -66,16 +69,27 @@ public class TransportGetDesiredBalanceActionTests extends ESAllocationTestCase
private final DesiredBalanceShardsAllocator desiredBalanceShardsAllocator = mock(DesiredBalanceShardsAllocator.class);
private final ClusterInfoService clusterInfoService = mock(ClusterInfoService.class);
- private final TransportGetDesiredBalanceAction transportGetDesiredBalanceAction = new TransportGetDesiredBalanceAction(
- mock(TransportService.class),
- mock(ClusterService.class),
- mock(ThreadPool.class),
- mock(ActionFilters.class),
- mock(IndexNameExpressionResolver.class),
- desiredBalanceShardsAllocator,
- clusterInfoService,
- TEST_WRITE_LOAD_FORECASTER
- );
+ private TransportService transportService = mock(TransportService.class);
+ private ThreadPool threadPool = mock(ThreadPool.class);
+ private TransportGetDesiredBalanceAction transportGetDesiredBalanceAction;
+
+ @Before
+ public void initialize() {
+ // TODO: temporary, remove in #97879
+ when(transportService.getThreadPool()).thenReturn(threadPool);
+ when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
+
+ transportGetDesiredBalanceAction = new TransportGetDesiredBalanceAction(
+ transportService,
+ mock(ClusterService.class),
+ threadPool,
+ mock(ActionFilters.class),
+ mock(IndexNameExpressionResolver.class),
+ desiredBalanceShardsAllocator,
+ clusterInfoService,
+ TEST_WRITE_LOAD_FORECASTER
+ );
+ }
private static DesiredBalanceResponse execute(TransportGetDesiredBalanceAction action, ClusterState clusterState) throws Exception {
return PlainActionFuture.get(
@@ -96,11 +110,10 @@ private DesiredBalanceResponse executeAction(ClusterState clusterState) throws E
public void testReturnsErrorIfAllocatorIsNotDesiredBalanced() throws Exception {
var clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadataWithConfiguredAllocator(BALANCED_ALLOCATOR)).build();
-
final var action = new TransportGetDesiredBalanceAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
mock(ShardsAllocator.class),
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/ClusterFormationInfoActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/ClusterFormationInfoActionTests.java
index e560959dfacf8..f190c422e165c 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/ClusterFormationInfoActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/ClusterFormationInfoActionTests.java
@@ -17,6 +17,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.monitor.StatusInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
@@ -30,6 +31,7 @@
import java.util.List;
import java.util.Map;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -162,6 +164,10 @@ public void testTransportDoExecute() {
Coordinator coordinator = mock(Coordinator.class);
ClusterFormationFailureHelper.ClusterFormationState clusterFormationState = getClusterFormationState();
when(coordinator.getClusterFormationState()).thenReturn(clusterFormationState);
+
+ // TODO: temporary, remove in #97879
+ when(transportService.getThreadPool()).thenReturn(threadPool);
+ when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
ClusterFormationInfoAction.TransportAction action = new ClusterFormationInfoAction.TransportAction(
transportService,
actionFilters,
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/MasterHistoryActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/MasterHistoryActionTests.java
index 3674a711d8968..f362327a3d8e4 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/MasterHistoryActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/coordination/MasterHistoryActionTests.java
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.threadpool.ThreadPool;
@@ -24,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -83,6 +85,10 @@ public void testTransportDoExecute() {
when(threadPool.relativeTimeInMillis()).thenReturn(System.currentTimeMillis());
MasterHistory masterHistory = new MasterHistory(threadPool, clusterService);
when(masterHistoryService.getLocalMasterHistory()).thenReturn(masterHistory);
+
+ // TODO: temporary, remove in #97879
+ when(transportService.getThreadPool()).thenReturn(threadPool);
+ when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
MasterHistoryAction.TransportAction action = new MasterHistoryAction.TransportAction(
transportService,
actionFilters,
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java
index e4e8293810f56..508cb9b304c59 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -54,10 +55,12 @@ public void validate(List desiredNodes) {}
};
public void testWriteBlocks() {
+ ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
final TransportUpdateDesiredNodesAction action = new TransportUpdateDesiredNodesAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
NO_OP_SETTINGS_VALIDATOR,
@@ -79,10 +82,12 @@ public void testWriteBlocks() {
}
public void testNoBlocks() {
+ ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
final TransportUpdateDesiredNodesAction action = new TransportUpdateDesiredNodesAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
NO_OP_SETTINGS_VALIDATOR,
@@ -103,10 +108,13 @@ public void validate(List desiredNodes) {
}
};
ClusterService clusterService = mock(ClusterService.class);
+
+ ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
final TransportUpdateDesiredNodesAction action = new TransportUpdateDesiredNodesAction(
- mock(TransportService.class),
+ transportService,
clusterService,
- mock(ThreadPool.class),
+ threadPool,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
validator,
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java
index 3b3a996e50cd3..1b3e0ae6fe7bf 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java
@@ -20,6 +20,7 @@
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentParser;
@@ -132,14 +133,16 @@ public Repository create(RepositoryMetadata metadata) {
}
};
+ ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
RepositoriesService repositoriesService = spy(
new RepositoriesService(
Settings.EMPTY,
mock(ClusterService.class),
- mock(TransportService.class),
+ transportService,
Map.of(),
Map.of("fs", fsFactory),
- mock(ThreadPool.class),
+ threadPool,
null
)
);
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java
index d2a482fa58b0e..80ff2168f6344 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java
@@ -126,12 +126,6 @@ public void testToXContentWithDeprecatedClusterState() {
"max_index_version": %s
}
},
- "transport_versions": [
- {
- "node_id": "node0",
- "transport_version": "8000099"
- }
- ],
"nodes_versions": [
{
"node_id": "node0",
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterGetSettingsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterGetSettingsTests.java
index 9e757894bcdda..00b500254883c 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterGetSettingsTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterGetSettingsTests.java
@@ -17,6 +17,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -47,10 +48,12 @@ public void testRequestConstruction() {
public void testTransportFilters() throws Exception {
final SettingsFilter filter = new SettingsFilter(List.of("persistent.foo.filtered", "transient.foo.filtered"));
+ ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
TransportClusterGetSettingsAction action = new TransportClusterGetSettingsAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
filter,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class)
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java
index ebe77f4d3ed4c..6373b94ffb94a 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java
@@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -85,10 +86,12 @@ private static ClusterUpdateSettingsRequest createTestItem() {
public void testOperatorHandler() throws IOException {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
TransportClusterUpdateSettingsAction action = new TransportClusterUpdateSettingsAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
mock(ActionFilters.class),
mock(IndexNameExpressionResolver.class),
clusterSettings
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexActionTests.java
index 671bf13fb0e77..d68641d04dd74 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexActionTests.java
@@ -24,6 +24,7 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
@@ -90,10 +91,13 @@ public void setUp() throws Exception {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, SYSTEM_INDICES);
this.metadataCreateIndexService = mock(MetadataCreateIndexService.class);
+
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
this.action = new TransportCreateIndexAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
metadataCreateIndexService,
mock(ActionFilters.class),
indexNameExpressionResolver,
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
index 12308036689c6..bb1cf85013498 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java
@@ -37,6 +37,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.cache.query.QueryCacheStats;
@@ -78,6 +79,7 @@
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -354,6 +356,10 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr
EmptySystemIndices.INSTANCE,
WriteLoadForecaster.DEFAULT
);
+
+ // TODO: temporary, remove in #97879
+ when(mockTransportService.getThreadPool()).thenReturn(mockThreadPool);
+ when(mockThreadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(
mockTransportService,
mockClusterService,
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java
index 5fe7a528d68c7..ad7689320a94f 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java
@@ -67,7 +67,7 @@ public void testChunking() {
0,
Collections.emptyList()
),
- response -> response.getIndices().size() + 4
+ response -> 11 * response.getIndices().size() + 4
);
}
}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsActionTests.java
index 0e1581606f803..807462b6052c3 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/settings/put/TransportUpdateSettingsActionTests.java
@@ -25,6 +25,7 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
@@ -68,10 +69,13 @@ public void setUp() throws Exception {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext, SYSTEM_INDICES);
MetadataUpdateSettingsService metadataUpdateSettingsService = mock(MetadataUpdateSettingsService.class);
+
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
this.action = new TransportUpdateSettingsAction(
- mock(TransportService.class),
+ transportService,
mock(ClusterService.class),
- mock(ThreadPool.class),
+ threadPool,
metadataUpdateSettingsService,
mock(ActionFilters.class),
indexNameExpressionResolver,
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/FieldUsageStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/FieldUsageStatsResponseTests.java
index 6fb53760de5c2..32bedb6082842 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/FieldUsageStatsResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/FieldUsageStatsResponseTests.java
@@ -46,7 +46,7 @@ public void testToXContentChunkPerIndex() {
AbstractChunkedSerializingTestCase.assertChunkCount(
new FieldUsageStatsResponse(indices, indices, 0, List.of(), perIndex),
- ignored -> indices + 2
+ ignored -> 3 * indices + 2
);
}
}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java
index a1280f7658804..6ee17ec7963b6 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java
@@ -21,7 +21,6 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.json.JsonXContent;
-import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
@@ -116,7 +115,7 @@ public void testGetIndices() {
}
}
- public void testChunkedEncodingPerIndex() throws IOException {
+ public void testChunkedEncodingPerIndex() {
final int shards = randomIntBetween(1, 10);
final List stats = new ArrayList<>(shards);
for (int i = 0; i < shards; i++) {
@@ -143,7 +142,7 @@ public void testChunkedEncodingPerIndex() throws IOException {
AbstractChunkedSerializingTestCase.assertChunkCount(
indicesStatsResponse,
new ToXContent.MapParams(Map.of("level", "indices")),
- ignored -> 4 + shards
+ ignored -> 4 + 2 * shards
);
}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/template/reservedstate/ReservedComposableIndexTemplateActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/template/reservedstate/ReservedComposableIndexTemplateActionTests.java
index 553dd2368e5bc..28476a0d8b839 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/template/reservedstate/ReservedComposableIndexTemplateActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/template/reservedstate/ReservedComposableIndexTemplateActionTests.java
@@ -40,6 +40,7 @@
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -673,10 +674,12 @@ public void testAddRemoveIndexTemplatesWithOverlap() throws Exception {
}
public void testHandlerCorrectness() {
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
var putIndexAction = new TransportPutComposableIndexTemplateAction(
- mock(TransportService.class),
+ transportService,
null,
- mock(ThreadPool.class),
+ threadPool,
null,
mock(ActionFilters.class),
null
@@ -687,9 +690,9 @@ public void testHandlerCorrectness() {
containsInAnyOrder(reservedComposableIndexName("aaa"))
);
var delIndexAction = new TransportDeleteComposableIndexTemplateAction(
- mock(TransportService.class),
+ transportService,
null,
- mock(ThreadPool.class),
+ threadPool,
null,
mock(ActionFilters.class),
null
@@ -701,9 +704,9 @@ public void testHandlerCorrectness() {
);
var putComponentAction = new TransportPutComponentTemplateAction(
- mock(TransportService.class),
+ transportService,
null,
- mock(ThreadPool.class),
+ threadPool,
null,
mock(ActionFilters.class),
null,
@@ -716,9 +719,9 @@ public void testHandlerCorrectness() {
);
var delComponentAction = new TransportDeleteComponentTemplateAction(
- mock(TransportService.class),
+ transportService,
null,
- mock(ThreadPool.class),
+ threadPool,
null,
mock(ActionFilters.class),
null
@@ -921,10 +924,12 @@ public void testTemplatesWithReservedPrefix() throws Exception {
PutComposableIndexTemplateAction.Request pr = new PutComposableIndexTemplateAction.Request(conflictingTemplateName);
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
var putTemplateAction = new TransportPutComposableIndexTemplateAction(
- mock(TransportService.class),
+ transportService,
null,
- mock(ThreadPool.class),
+ threadPool,
null,
mock(ActionFilters.class),
null
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
index ff96f4a00b883..1b0c24664be31 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
@@ -23,7 +23,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@@ -32,6 +31,7 @@
import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -42,7 +42,6 @@
import java.util.function.Function;
import static java.util.Collections.emptySet;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -107,9 +106,6 @@ private void indicesThatCannotBeCreatedTestCase(
when(clusterService.localNode()).thenReturn(localNode);
when(localNode.isIngestNode()).thenReturn(randomBoolean());
- final ThreadPool threadPool = mock(ThreadPool.class);
- when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
-
final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(
new ThreadContext(Settings.EMPTY),
EmptySystemIndices.INSTANCE
@@ -120,9 +116,11 @@ public boolean hasIndexAbstraction(String indexAbstraction, ClusterState state)
}
};
+ final ThreadPool threadPool = mock(ThreadPool.class);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(threadPool);
TransportBulkAction action = new TransportBulkAction(
threadPool,
- mock(TransportService.class),
+ transportService,
clusterService,
null,
null,
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
index fd84a1e5fe816..85a74174b094f 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
@@ -183,6 +183,11 @@ public void setupAction() {
MockitoAnnotations.openMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
+
+ // TODO: temporary, remove in #97879
+ when(transportService.getThreadPool()).thenReturn(threadPool);
+ when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
+
clusterService = mock(ClusterService.class);
localIngest = true;
// setup nodes for local and remote
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java b/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
index f4cbfac402a5a..593cfa92877ba 100644
--- a/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java
@@ -41,9 +41,11 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.sameInstance;
public class WriteableIngestDocumentTests extends AbstractXContentTestCase {
+ @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99403")
public void testEqualsAndHashcode() throws Exception {
Map sourceAndMetadata = RandomDocumentPicks.randomSource(random());
int numFields = randomIntBetween(1, IngestDocument.Metadata.values().length);
@@ -183,6 +185,14 @@ public void testXContentHashSetSerialization() throws Exception {
}
}
+ public void testCopiesTheIngestDocument() {
+ IngestDocument document = createRandomIngestDoc();
+ WriteableIngestDocument wid = new WriteableIngestDocument(document);
+
+ assertThat(wid.getIngestDocument(), equalTo(document));
+ assertThat(wid.getIngestDocument(), not(sameInstance(document)));
+ }
+
static IngestDocument createRandomIngestDoc() {
XContentType xContentType = randomFrom(XContentType.values());
BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType);
diff --git a/server/src/test/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportActionTests.java b/server/src/test/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportActionTests.java
index 93c4ea98adcd3..c2ff59f0ccbe0 100644
--- a/server/src/test/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportActionTests.java
@@ -25,6 +25,7 @@
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@@ -47,7 +48,8 @@ public void testRejectImmutableConflictClusterStateUpdate() {
ClusterService clusterService = mock(ClusterService.class);
doReturn(clusterState).when(clusterService).state();
- Action handler = new Action("internal:testAction", clusterService, mock(TransportService.class), mock(ActionFilters.class));
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor();
+ Action handler = new Action("internal:testAction", clusterService, transportService, mock(ActionFilters.class));
// nothing should happen here, since the request doesn't touch any of the immutable state keys
var future = new PlainActionFuture();
@@ -61,7 +63,7 @@ public void testRejectImmutableConflictClusterStateUpdate() {
FakeReservedStateAwareAction action = new FakeReservedStateAwareAction(
"internal:testClusterSettings",
clusterService,
- mock(TransportService.class),
+ transportService,
mock(ActionFilters.class),
null
);
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/PostWriteRefreshTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/PostWriteRefreshTests.java
index e45fc51cf6bdd..d14429647c7d3 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/PostWriteRefreshTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/PostWriteRefreshTests.java
@@ -21,6 +21,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
@@ -31,7 +32,6 @@
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.transport.MockTransportService;
-import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@@ -67,7 +67,7 @@ public void setUp() throws Exception {
transportService.acceptIncomingRequests();
transportService.registerRequestHandler(
TransportUnpromotableShardRefreshAction.NAME,
- ThreadPool.Names.SAME,
+ EsExecutors.DIRECT_EXECUTOR_SERVICE,
UnpromotableShardRefreshRequest::new,
(request, channel, task) -> {
unpromotableRefreshRequestReceived.set(true);
diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
index 8e95277bdc84f..7726feaa30868 100644
--- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java
@@ -424,7 +424,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
),
TransportWriteActionTests.this.clusterService,
null,
- null,
+ TransportWriteActionTests.threadPool,
null,
new ActionFilters(new HashSet<>()),
TestRequest::new,
diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java
index e3c5865333b94..b96a362b2a867 100644
--- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java
@@ -208,12 +208,6 @@ public void testToXContent() throws IOException {
"max_index_version":%s
}
},
- "transport_versions" : [
- {
- "node_id" : "nodeId1",
- "transport_version" : "%s"
- }
- ],
"nodes_versions" : [
{
"node_id" : "nodeId1",
@@ -373,7 +367,6 @@ public void testToXContent() throws IOException {
IndexVersion.MINIMUM_COMPATIBLE,
IndexVersion.current(),
TransportVersion.current(),
- TransportVersion.current(),
IndexVersion.current(),
IndexVersion.current(),
allocationId,
@@ -470,12 +463,6 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
"max_index_version" : %s
}
},
- "transport_versions" : [
- {
- "node_id" : "nodeId1",
- "transport_version" : "%s"
- }
- ],
"nodes_versions" : [
{
"node_id" : "nodeId1",
@@ -631,7 +618,6 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
IndexVersion.MINIMUM_COMPATIBLE,
IndexVersion.current(),
TransportVersion.current(),
- TransportVersion.current(),
IndexVersion.current(),
IndexVersion.current(),
allocationId,
@@ -728,12 +714,6 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
"max_index_version" : %s
}
},
- "transport_versions" : [
- {
- "node_id" : "nodeId1",
- "transport_version" : "%s"
- }
- ],
"nodes_versions" : [
{
"node_id" : "nodeId1",
@@ -895,7 +875,6 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
IndexVersion.MINIMUM_COMPATIBLE,
IndexVersion.current(),
TransportVersion.current(),
- TransportVersion.current(),
IndexVersion.current(),
IndexVersion.current(),
allocationId,
@@ -953,7 +932,6 @@ public void testToXContentSameTypeName() throws IOException {
"master_node" : null,
"blocks" : { },
"nodes" : { },
- "transport_versions" : [ ],
"nodes_versions" : [ ],
"metadata" : {
"cluster_uuid" : "clusterUUID",
@@ -1185,9 +1163,9 @@ public static int expectedChunkCount(ToXContent.Params params, ClusterState clus
chunkCount += 2 + clusterState.blocks().indices().size();
}
- // nodes, transport_versions, nodes_versions
+ // nodes, nodes_versions
if (metrics.contains(ClusterState.Metric.NODES)) {
- chunkCount += 6 + clusterState.nodes().size() + 2 * clusterState.compatibilityVersions().size();
+ chunkCount += 4 + clusterState.nodes().size() + clusterState.compatibilityVersions().size();
}
// metadata
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/ParametrizedMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/ParametrizedMapperTests.java
index 248d94cb67a63..8ff319134b964 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/ParametrizedMapperTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/ParametrizedMapperTests.java
@@ -174,7 +174,7 @@ protected Parameter>[] getParameters() {
@Override
public FieldMapper build(MapperBuilderContext context) {
- return new TestMapper(name(), context.buildFullName(name), multiFieldsBuilder.build(this, context), copyTo.build(), this);
+ return new TestMapper(name(), context.buildFullName(name), multiFieldsBuilder.build(this, context), copyTo, this);
}
}
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java
index 726ce0aa043c1..c2762d859f266 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapperTests.java
@@ -320,7 +320,7 @@ public void testDotProductWithInvalidNorm() throws Exception {
b -> b.field("type", "dense_vector").field("dims", 3).field("index", true).field("similarity", VectorSimilarity.DOT_PRODUCT)
)
);
- float[] vector = { 0f, 0f, 0f };
+ float[] vector = { -12.1f, 2.7f, -4 };
DocumentParsingException e = expectThrows(
DocumentParsingException.class,
() -> mapper.parse(source(b -> b.array("field", vector)))
@@ -329,7 +329,23 @@ public void testDotProductWithInvalidNorm() throws Exception {
assertThat(
e.getCause().getMessage(),
containsString(
- "The [dot_product] similarity does not support vectors with zero magnitude. Preview of invalid vector: [0.0, 0.0, 0.0]"
+ "The [dot_product] similarity can only be used with unit-length vectors. Preview of invalid vector: [-12.1, 2.7, -4.0]"
+ )
+ );
+
+ DocumentMapper mapperWithLargerDim = createDocumentMapper(
+ fieldMapping(
+ b -> b.field("type", "dense_vector").field("dims", 6).field("index", true).field("similarity", VectorSimilarity.DOT_PRODUCT)
+ )
+ );
+ float[] largerVector = { -12.1f, 2.7f, -4, 1.05f, 10.0f, 29.9f };
+ e = expectThrows(DocumentParsingException.class, () -> mapperWithLargerDim.parse(source(b -> b.array("field", largerVector))));
+ assertNotNull(e.getCause());
+ assertThat(
+ e.getCause().getMessage(),
+ containsString(
+ "The [dot_product] similarity can only be used with unit-length vectors. "
+ + "Preview of invalid vector: [-12.1, 2.7, -4.0, 1.05, 10.0, ...]"
)
);
}
@@ -499,7 +515,7 @@ public void testDefaultParamsBeforeIndexByDefault() throws Exception {
assertNull(denseVectorFieldType.getSimilarity());
}
- public void testParamsBeforeIndexByDefault() throws Exception {
+ public void testtParamsBeforeIndexByDefault() throws Exception {
DocumentMapper documentMapper = createDocumentMapper(INDEXED_BY_DEFAULT_PREVIOUS_INDEX_VERSION, fieldMapping(b -> {
b.field("type", "dense_vector").field("dims", 3).field("index", true).field("similarity", "dot_product");
}));
diff --git a/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldTypeTests.java
index 448d6aff0f4e8..d22056d49beb5 100644
--- a/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldTypeTests.java
+++ b/server/src/test/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldTypeTests.java
@@ -137,9 +137,9 @@ public void testFloatCreateKnnQuery() {
);
e = expectThrows(
IllegalArgumentException.class,
- () -> dotProductField.createKnnQuery(new float[] { 0.0f, 0.0f, 0.0f }, 10, null, null)
+ () -> dotProductField.createKnnQuery(new float[] { 0.3f, 0.1f, 1.0f }, 10, null, null)
);
- assertThat(e.getMessage(), containsString("The [dot_product] similarity does not support vectors with zero magnitude."));
+ assertThat(e.getMessage(), containsString("The [dot_product] similarity can only be used with unit-length vectors."));
DenseVectorFieldType cosineField = new DenseVectorFieldType(
"f",
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
index 66503bf4cea79..a18e7e8ce46f9 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -19,6 +19,7 @@
import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.MockUtils;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.transport.TransportService;
@@ -38,8 +39,9 @@ public void testDuplicateRecoveries() throws IOException {
final ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getSettings()).thenReturn(NodeRoles.dataNode());
when(indicesService.clusterService()).thenReturn(clusterService);
+ TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor();
PeerRecoverySourceService peerRecoverySourceService = new PeerRecoverySourceService(
- mock(TransportService.class),
+ transportService,
indicesService,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
mock(RecoveryPlannerService.class)
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
index 924ca1fc7a1e9..ae46668352048 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestDocumentTests.java
@@ -1069,6 +1069,17 @@ public void testCopyConstructor() {
assertThat(ingestDocument.getFieldValue("_id", String.class), equalTo("bar1"));
assertThat(ingestDocument.getFieldValue("hello", String.class), equalTo("world1"));
}
+
+ {
+ // the copy constructor rejects self-references
+ IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
+ List