diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 68d02d5f7d544..bb12121cd3d8f 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -11,7 +11,7 @@
# 3. Use the command palette to run the CODEOWNERS: Show owners of current file command, which will display all code owners for the current file.
# Default ownership for all repo files
-* @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kartg @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah
+* @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah
/modules/transport-netty4/ @peternied
@@ -24,4 +24,4 @@
/.github/ @peternied
-/MAINTAINERS.md @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kartg @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah
+/MAINTAINERS.md @abbashus @adnapibar @anasalkouz @andrross @Bukhtawar @CEHENKLE @dblock @dbwiddis @dreamer-89 @gbbafna @kotwanikunal @mch2 @msfroh @nknize @owaiskazi19 @peternied @reta @Rishikesh1159 @ryanbogan @sachinpkale @saratvemulapalli @setiah @shwetathareja @sohami @tlfeng @VachaShah
diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml
index 0159e771f7f80..c9df17bad9576 100644
--- a/.github/ISSUE_TEMPLATE/feature_request.yml
+++ b/.github/ISSUE_TEMPLATE/feature_request.yml
@@ -47,6 +47,10 @@ body:
- Storage:Remote
- Storage:Snapshots
- Storage
+ - ShardManagement:Placement
+ - ShardManagement:Performance
+ - ShardManagement:Resiliency
+ - ShardManagement:Insights
validations:
required: true
- type: textarea
diff --git a/.github/workflows/gradle-check.yml b/.github/workflows/gradle-check.yml
index 8ac44cc37d27c..1f5c187c28e7d 100644
--- a/.github/workflows/gradle-check.yml
+++ b/.github/workflows/gradle-check.yml
@@ -72,7 +72,7 @@ jobs:
- name: Upload Coverage Report
if: success()
- uses: codecov/codecov-action@v3
+ uses: codecov/codecov-action@v4
with:
files: ./codeCoverage.xml
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e581915ffd64e..2ffca93bec349 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,14 +11,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679), [#10664](https://github.com/opensearch-project/OpenSearch/pull/10664))
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))
-- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- GHA to verify checklist items completion in PR descriptions ([#10800](https://github.com/opensearch-project/OpenSearch/pull/10800))
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
-- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
+- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
@@ -116,13 +115,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `netty` from 4.1.106.Final to 4.1.107.Final ([#12372](https://github.com/opensearch-project/OpenSearch/pull/12372))
- Bump `opentelemetry` from 1.34.1 to 1.35.0 ([#12388](https://github.com/opensearch-project/OpenSearch/pull/12388))
- Bump Apache Lucene from 9.9.2 to 9.10.0 ([#12392](https://github.com/opensearch-project/OpenSearch/pull/12392))
-- Bump `org.apache.logging.log4j:log4j-core` from 2.22.1 to 2.23.0 ([#12464](https://github.com/opensearch-project/OpenSearch/pull/12464))
+- Bump `org.apache.logging.log4j:log4j-core` from 2.22.1 to 2.23.1 ([#12464](https://github.com/opensearch-project/OpenSearch/pull/12464), [#12587](https://github.com/opensearch-project/OpenSearch/pull/12587))
- Bump `antlr4` from 4.11.1 to 4.13.1 ([#12445](https://github.com/opensearch-project/OpenSearch/pull/12445))
- Bump `com.netflix.nebula.ospackage-base` from 11.8.0 to 11.8.1 ([#12461](https://github.com/opensearch-project/OpenSearch/pull/12461))
- Bump `peter-evans/create-or-update-comment` from 3 to 4 ([#12462](https://github.com/opensearch-project/OpenSearch/pull/12462))
- Bump `lycheeverse/lychee-action` from 1.9.1 to 1.9.3 ([#12521](https://github.com/opensearch-project/OpenSearch/pull/12521))
- Bump `com.azure:azure-core` from 1.39.0 to 1.47.0 ([#12520](https://github.com/opensearch-project/OpenSearch/pull/12520))
- Bump `ch.qos.logback:logback-core` from 1.2.13 to 1.5.3 ([#12519](https://github.com/opensearch-project/OpenSearch/pull/12519))
+- Bump `codecov/codecov-action` from 3 to 4 ([#12585](https://github.com/opensearch-project/OpenSearch/pull/12585))
+- Bump `org.apache.zookeeper:zookeeper` from 3.9.1 to 3.9.2 ([#12580](https://github.com/opensearch-project/OpenSearch/pull/12580))
+- Bump `org.codehaus.woodstox:stax2-api` from 4.2.1 to 4.2.2 ([#12579](https://github.com/opensearch-project/OpenSearch/pull/12579))
### Changed
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
@@ -139,6 +141,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Prevent read beyond slice boundary in ByteArrayIndexInput ([#10481](https://github.com/opensearch-project/OpenSearch/issues/10481))
- Fix the "highlight.max_analyzer_offset" request parameter with "plain" highlighter ([#10919](https://github.com/opensearch-project/OpenSearch/pull/10919))
- Warn about deprecated and ignored index.mapper.dynamic index setting ([#11193](https://github.com/opensearch-project/OpenSearch/pull/11193))
+- Fix `terms` query on `float` field when `doc_values` are turned off by reverting back to `FloatPoint` from `FloatField` ([#12499](https://github.com/opensearch-project/OpenSearch/pull/12499))
- Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531))
### Security
diff --git a/MAINTAINERS.md b/MAINTAINERS.md
index 42a8a439445ca..5535c2fa26eae 100644
--- a/MAINTAINERS.md
+++ b/MAINTAINERS.md
@@ -16,7 +16,6 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Daniel "dB." Doubrovkine | [dblock](https://github.com/dblock) | Amazon |
| Gaurav Bafna | [gbbafna](https://github.com/gbbafna) | Amazon |
| Himanshu Setia | [setiah](https://github.com/setiah) | Amazon |
-| Kartik Ganesh | [kartg](https://github.com/kartg) | Amazon |
| Kunal Kotwani | [kotwanikunal](https://github.com/kotwanikunal) | Amazon |
| Marc Handalian | [mch2](https://github.com/mch2) | Amazon |
| Michael Froh | [msfroh](https://github.com/msfroh) | Amazon |
@@ -40,3 +39,4 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
|-------------------------|---------------------------------------------|-------------|
| Megha Sai Kavikondala | [meghasaik](https://github.com/meghasaik) | Amazon |
| Xue Zhou | [xuezhou25](https://github.com/xuezhou25) | Amazon |
+| Kartik Ganesh | [kartg](https://github.com/kartg) | Amazon |
diff --git a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle
index 4b8f52ec07615..48dfb206375ca 100644
--- a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle
+++ b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle
@@ -17,7 +17,7 @@ repositories {
}
dependencies {
- implementation "org.apache.logging.log4j:log4j-core:2.23.0"
+ implementation "org.apache.logging.log4j:log4j-core:2.23.1"
}
["0.0.1", "0.0.2"].forEach { v ->
diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle
index 1754f8f162efc..31db767b2c68e 100644
--- a/plugins/repository-azure/build.gradle
+++ b/plugins/repository-azure/build.gradle
@@ -64,7 +64,7 @@ dependencies {
api "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
api "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:${versions.jackson}"
api "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:${versions.jackson}"
- api 'org.codehaus.woodstox:stax2-api:4.2.1'
+ api 'org.codehaus.woodstox:stax2-api:4.2.2'
implementation "com.fasterxml.woodstox:woodstox-core:${versions.woodstox}"
runtimeOnly "com.google.guava:guava:${versions.guava}"
api "org.apache.commons:commons-lang3:${versions.commonslang}"
diff --git a/plugins/repository-azure/licenses/stax2-api-4.2.1.jar.sha1 b/plugins/repository-azure/licenses/stax2-api-4.2.1.jar.sha1
deleted file mode 100644
index 2c12704cdc560..0000000000000
--- a/plugins/repository-azure/licenses/stax2-api-4.2.1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-a3f7325c52240418c2ba257b103c3c550e140c83
\ No newline at end of file
diff --git a/plugins/repository-azure/licenses/stax2-api-4.2.2.jar.sha1 b/plugins/repository-azure/licenses/stax2-api-4.2.2.jar.sha1
new file mode 100644
index 0000000000000..b15a7ead0d016
--- /dev/null
+++ b/plugins/repository-azure/licenses/stax2-api-4.2.2.jar.sha1
@@ -0,0 +1 @@
+b0d746cadea928e5264f2ea294ea9a1bf815bbde
\ No newline at end of file
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java b/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java
index 1c1587a3be600..98e749aa48cac 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/simple/SimpleSearchIT.java
@@ -47,12 +47,14 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
+import org.opensearch.index.query.ConstantScoreQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -676,6 +678,23 @@ public void testTermQueryBigInt() throws Exception {
assertEquals(1, searchResponse.getHits().getTotalHits().value);
}
+ public void testIndexOnlyFloatField() throws IOException {
+ prepareCreate("idx").setMapping("field", "type=float,doc_values=false").get();
+ ensureGreen("idx");
+
+ IndexRequestBuilder indexRequestBuilder = client().prepareIndex("idx");
+
+ for (float i = 9000.0F; i < 20000.0F; i++) {
+ indexRequestBuilder.setId(String.valueOf(i)).setSource("{\"field\":" + i + "}", MediaTypeRegistry.JSON).get();
+ }
+ String queryJson = "{ \"filter\" : { \"terms\" : { \"field\" : [ 10000.0 ] } } }";
+ XContentParser parser = createParser(JsonXContent.jsonXContent, queryJson);
+ parser.nextToken();
+ ConstantScoreQueryBuilder query = ConstantScoreQueryBuilder.fromXContent(parser);
+ SearchResponse searchResponse = client().prepareSearch("idx").setQuery(query).get();
+ assertEquals(1, searchResponse.getHits().getTotalHits().value);
+ }
+
public void testTooLongRegexInRegexpQuery() throws Exception {
createIndex("idx");
indexRandom(true, client().prepareIndex("idx").setSource("{}", MediaTypeRegistry.JSON));
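For context on the fix this test covers (the CHANGELOG entry about reverting from `FloatField` back to `FloatPoint`): below is a minimal sketch of the kind of query the mapper is expected to build for a `terms` query on a `float` field that is indexed as points with `doc_values` disabled. The class and method names are placeholders, not the actual `NumberFieldMapper` code; only `FloatPoint.newSetQuery` is the real Lucene API involved.

```java
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.search.Query;

import java.util.List;

// Illustrative sketch: with doc_values=false the field only has an indexed
// point structure, so the terms query has to be built from FloatPoint rather
// than from FloatField, whose query variants also rely on doc values.
final class FloatTermsQuerySketch {
    static Query floatTermsQuery(String field, List<Object> values) {
        float[] points = new float[values.size()];
        for (int i = 0; i < points.length; i++) {
            // parse each incoming term the same way a single-value query would
            points[i] = Float.parseFloat(values.get(i).toString());
        }
        // matches documents whose indexed float point equals any of the given values
        return FloatPoint.newSetQuery(field, points);
    }
}
```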
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 896a234c115b6..8695ff878c8bc 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -667,6 +667,7 @@ public void apply(Settings value, Settings current, Settings previous) {
// Settings related to resource trackers
ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,
+ ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING,
// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
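For reference, a sketch of how a tracker window-duration setting of this kind is typically declared so that the `ClusterSettings` registration above can resolve it. The setting key and the 30-second default below are assumptions for illustration, not necessarily what `ResourceTrackerSettings` actually uses.

```java
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

// Hypothetical sketch of an IO-usage window-duration setting; the key name and
// default value are assumed, not copied from ResourceTrackerSettings.
final class IoUsageWindowSettingSketch {
    static final Setting<TimeValue> GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting(
        "node.resource.tracker.global_io_usage.window_duration", // assumed key
        TimeValue.timeValueSeconds(30),                           // assumed default window
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );
}
```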
diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
index 50774f7e0cb1c..3d129d4794a10 100644
--- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
@@ -32,9 +32,6 @@
package org.opensearch.gateway;
import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.opensearch.ExceptionsHelper;
-import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
@@ -43,21 +40,22 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
+import org.opensearch.common.logging.Loggers;
import org.opensearch.core.action.ActionListener;
-import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.store.ShardAttributes;
-import org.opensearch.transport.ReceiveTimeoutTransportException;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import reactor.util.annotation.NonNull;
+
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
@@ -65,11 +63,9 @@
* Allows to asynchronously fetch shard related data from other nodes for allocation, without blocking
* the cluster update thread.
*
- * The async fetch logic maintains a map of which nodes are being fetched from in an async manner,
- * and once the results are back, it makes sure to schedule a reroute to make sure those results will
- * be taken into account.
+ * The async fetch logic maintains a cache {@link AsyncShardFetchCache} which is filled in async manner when nodes respond back.
+ * It also schedules a reroute to make sure those results will be taken into account.
*
- * It comes in two modes, to single fetch a shard or fetch a batch of shards.
* @opensearch.internal
*/
public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
@@ -86,14 +82,12 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
protected final String type;
protected final Map<ShardId, ShardAttributes> shardAttributesMap;
private final Lister<BaseNodesResponse<T>, T> action;
- private final Map<String, NodeEntry<T>> cache = new HashMap<>();
+ private final AsyncShardFetchCache<T> cache;
private final AtomicLong round = new AtomicLong();
private boolean closed;
private final String reroutingKey;
private final Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
- private final boolean enableBatchMode;
-
@SuppressWarnings("unchecked")
protected AsyncShardFetch(
Logger logger,
@@ -108,17 +102,17 @@ protected AsyncShardFetch(
shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath));
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
- enableBatchMode = false;
+ cache = new ShardCache<>(logger, reroutingKey, type);
}
/**
* Added to fetch a batch of shards from nodes
*
- * @param logger Logger
- * @param type type of action
+ * @param logger Logger
+ * @param type type of action
* @param shardAttributesMap Map of {@link ShardId} to {@link ShardAttributes} to perform fetching on them a
- * @param action Transport Action
- * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification
+ * @param action Transport Action
+ * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification
*/
@SuppressWarnings("unchecked")
protected AsyncShardFetch(
@@ -133,7 +127,7 @@ protected AsyncShardFetch(
this.shardAttributesMap = shardAttributesMap;
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "BatchID=[" + batchId + "]";
- enableBatchMode = true;
+ cache = new ShardCache<>(logger, reroutingKey, type);
}
@Override
@@ -141,19 +135,6 @@ public synchronized void close() {
this.closed = true;
}
- /**
- * Returns the number of async fetches that are currently ongoing.
- */
- public synchronized int getNumberOfInFlightFetches() {
- int count = 0;
- for (NodeEntry<T> nodeEntry : cache.values()) {
- if (nodeEntry.isFetching()) {
- count++;
- }
- }
- return count;
- }
-
/**
* Fetches the data for the relevant shard. If there any ongoing async fetches going on, or new ones have
* been initiated by this call, the result will have no data.
@@ -166,7 +147,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
- if (enableBatchMode == false) {
+ if (shardAttributesMap.size() == 1) {
// we will do assertions here on ignoreNodes
if (ignoreNodes.size() > 1) {
throw new IllegalStateException(
@@ -187,48 +168,24 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId, Set<String>> ignoreNodes) {
- fillShardCacheWithDataNodes(cache, nodes);
- List<NodeEntry<T>> nodesToFetch = findNodesToFetch(cache);
- if (nodesToFetch.isEmpty() == false) {
+ cache.fillShardCacheWithDataNodes(nodes);
+ List<String> nodeIds = cache.findNodesToFetch();
+ if (nodeIds.isEmpty() == false) {
// mark all node as fetching and go ahead and async fetch them
// use a unique round id to detect stale responses in processAsyncFetch
final long fetchingRound = round.incrementAndGet();
- for (NodeEntry<T> nodeEntry : nodesToFetch) {
- nodeEntry.markAsFetching(fetchingRound);
- }
- DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream()
- .map(NodeEntry::getNodeId)
- .map(nodes::get)
- .toArray(DiscoveryNode[]::new);
+ cache.markAsFetching(nodeIds, fetchingRound);
+ DiscoveryNode[] discoNodesToFetch = nodeIds.stream().map(nodes::get).toArray(DiscoveryNode[]::new);
asyncFetch(discoNodesToFetch, fetchingRound);
}
// if we are still fetching, return null to indicate it
- if (hasAnyNodeFetching(cache)) {
+ if (cache.hasAnyNodeFetching()) {
return new FetchResult<>(null, emptyMap());
} else {
// nothing to fetch, yay, build the return value
- Map<DiscoveryNode, T> fetchData = new HashMap<>();
Set<String> failedNodes = new HashSet<>();
- for (Iterator<Map.Entry<String, NodeEntry<T>>> it = cache.entrySet().iterator(); it.hasNext();) {
- Map.Entry<String, NodeEntry<T>> entry = it.next();
- String nodeId = entry.getKey();
- NodeEntry<T> nodeEntry = entry.getValue();
-
- DiscoveryNode node = nodes.get(nodeId);
- if (node != null) {
- if (nodeEntry.isFailed()) {
- // if its failed, remove it from the list of nodes, so if this run doesn't work
- // we try again next round to fetch it again
- it.remove();
- failedNodes.add(nodeEntry.getNodeId());
- } else {
- if (nodeEntry.getValue() != null) {
- fetchData.put(node, nodeEntry.getValue());
- }
- }
- }
- }
+ Map<DiscoveryNode, T> fetchData = cache.getCacheData(nodes, failedNodes);
Map<ShardId, Set<String>> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes));
// clear the nodes to ignore, we had a successful run in fetching everything we can
@@ -268,77 +225,18 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
if (responses != null) {
- for (T response : responses) {
- NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
- if (nodeEntry != null) {
- if (nodeEntry.getFetchingRound() != fetchingRound) {
- assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
- logger.trace(
- "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})",
- reroutingKey,
- nodeEntry.getNodeId(),
- type,
- nodeEntry.getFetchingRound(),
- fetchingRound
- );
- } else if (nodeEntry.isFailed()) {
- logger.trace(
- "{} node {} has failed for [{}] (failure [{}])",
- reroutingKey,
- nodeEntry.getNodeId(),
- type,
- nodeEntry.getFailure()
- );
- } else {
- // if the entry is there, for the right fetching round and not marked as failed already, process it
- logger.trace("{} marking {} as done for [{}], result is [{}]", reroutingKey, nodeEntry.getNodeId(), type, response);
- nodeEntry.doneFetching(response);
- }
- }
- }
+ cache.processResponses(responses, fetchingRound);
}
if (failures != null) {
- for (FailedNodeException failure : failures) {
- logger.trace("{} processing failure {} for [{}]", reroutingKey, failure, type);
- NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
- if (nodeEntry != null) {
- if (nodeEntry.getFetchingRound() != fetchingRound) {
- assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
- logger.trace(
- "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})",
- reroutingKey,
- nodeEntry.getNodeId(),
- type,
- nodeEntry.getFetchingRound(),
- fetchingRound
- );
- } else if (nodeEntry.isFailed() == false) {
- // if the entry is there, for the right fetching round and not marked as failed already, process it
- Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
- // if the request got rejected or timed out, we need to try it again next time...
- if (unwrappedCause instanceof OpenSearchRejectedExecutionException
- || unwrappedCause instanceof ReceiveTimeoutTransportException
- || unwrappedCause instanceof OpenSearchTimeoutException) {
- nodeEntry.restartFetching();
- } else {
- logger.warn(
- () -> new ParameterizedMessage(
- "{}: failed to list shard for {} on node [{}]",
- reroutingKey,
- type,
- failure.nodeId()
- ),
- failure
- );
- nodeEntry.doneFetching(failure.getCause());
- }
- }
- }
- }
+ cache.processFailures(failures, fetchingRound);
}
reroute(reroutingKey, "post_response");
}
+ public synchronized int getNumberOfInFlightFetches() {
+ return cache.getInflightFetches();
+ }
+
/**
* Implement this in order to scheduled another round that causes a call to fetch data.
*/
@@ -351,47 +249,6 @@ synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
}
- /**
- * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
- * it nodes that are no longer part of the state.
- */
- private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, DiscoveryNodes nodes) {
- // verify that all current data nodes are there
- for (final DiscoveryNode node : nodes.getDataNodes().values()) {
- if (shardCache.containsKey(node.getId()) == false) {
- shardCache.put(node.getId(), new NodeEntry(node.getId()));
- }
- }
- // remove nodes that are not longer part of the data nodes set
- shardCache.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
- }
-
- /**
- * Finds all the nodes that need to be fetched. Those are nodes that have no
- * data, and are not in fetch mode.
- */
- private List<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache) {
- List<NodeEntry<T>> nodesToFetch = new ArrayList<>();
- for (NodeEntry<T> nodeEntry : shardCache.values()) {
- if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
- nodesToFetch.add(nodeEntry);
- }
- }
- return nodesToFetch;
- }
-
- /**
- * Are there any nodes that are fetching data?
- */
- private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
- for (NodeEntry<T> nodeEntry : shardCache.values()) {
- if (nodeEntry.isFetching()) {
- return true;
- }
- }
- return false;
- }
-
/**
* Async fetches data for the provided shard with the set of nodes that need to be fetched from.
*/
@@ -415,6 +272,72 @@ public void onFailure(Exception e) {
});
}
+ /**
+ * Cache implementation of transport actions returning single shard related data in the response.
+ * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or
+ * {@link TransportNodesListShardStoreMetadata}.
+ *
+ * @param <K> Response type of transport action.
+ */
+ static class ShardCache<K extends BaseNodeResponse> extends AsyncShardFetchCache<K> {
+
+ private final Map<String, NodeEntry<K>> cache;
+
+ public ShardCache(Logger logger, String logKey, String type) {
+ super(Loggers.getLogger(logger, "_" + logKey), type);
+ cache = new HashMap<>();
+ }
+
+ @Override
+ public void initData(DiscoveryNode node) {
+ cache.put(node.getId(), new NodeEntry<>(node.getId()));
+ }
+
+ @Override
+ public void putData(DiscoveryNode node, K response) {
+ cache.get(node.getId()).doneFetching(response);
+ }
+
+ @Override
+ public K getData(DiscoveryNode node) {
+ return cache.get(node.getId()).getValue();
+ }
+
+ @NonNull
+ @Override
+ public Map<String, ? extends BaseNodeEntry> getCache() {
+ return cache;
+ }
+
+ @Override
+ public void deleteShard(ShardId shardId) {
+ cache.clear(); // single shard cache can clear the full map
+ }
+
+ /**
+ * A node entry, holding the state of the fetched data for a specific shard
+ * for a giving node.
+ */
+ static class NodeEntry<U> extends AsyncShardFetchCache.BaseNodeEntry {
+ @Nullable
+ private U value;
+
+ void doneFetching(U value) {
+ super.doneFetching();
+ this.value = value;
+ }
+
+ NodeEntry(String nodeId) {
+ super(nodeId);
+ }
+
+ U getValue() {
+ return value;
+ }
+
+ }
+ }
+
/**
* The result of a fetch operation. Make sure to first check {@link #hasData()} before
* fetching the actual data.
@@ -460,83 +383,4 @@ public void processAllocation(RoutingAllocation allocation) {
}
}
-
- /**
- * A node entry, holding the state of the fetched data for a specific shard
- * for a giving node.
- */
- static class NodeEntry<T> {
- private final String nodeId;
- private boolean fetching;
- @Nullable
- private T value;
- private boolean valueSet;
- private Throwable failure;
- private long fetchingRound;
-
- NodeEntry(String nodeId) {
- this.nodeId = nodeId;
- }
-
- String getNodeId() {
- return this.nodeId;
- }
-
- boolean isFetching() {
- return fetching;
- }
-
- void markAsFetching(long fetchingRound) {
- assert fetching == false : "double marking a node as fetching";
- this.fetching = true;
- this.fetchingRound = fetchingRound;
- }
-
- void doneFetching(T value) {
- assert fetching : "setting value but not in fetching mode";
- assert failure == null : "setting value when failure already set";
- this.valueSet = true;
- this.value = value;
- this.fetching = false;
- }
-
- void doneFetching(Throwable failure) {
- assert fetching : "setting value but not in fetching mode";
- assert valueSet == false : "setting failure when already set value";
- assert failure != null : "setting failure can't be null";
- this.failure = failure;
- this.fetching = false;
- }
-
- void restartFetching() {
- assert fetching : "restarting fetching, but not in fetching mode";
- assert valueSet == false : "value can't be set when restarting fetching";
- assert failure == null : "failure can't be set when restarting fetching";
- this.fetching = false;
- }
-
- boolean isFailed() {
- return failure != null;
- }
-
- boolean hasData() {
- return valueSet || failure != null;
- }
-
- Throwable getFailure() {
- assert hasData() : "getting failure when data has not been fetched";
- return failure;
- }
-
- @Nullable
- T getValue() {
- assert failure == null : "trying to fetch value, but its marked as failed, check isFailed";
- assert valueSet : "value is not set, hasn't been fetched yet";
- return value;
- }
-
- long getFetchingRound() {
- return fetchingRound;
- }
- }
}
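The `FetchResult` javadoc kept above ("make sure to first check `hasData()`") implies the calling pattern sketched below. The helper class and its name are illustrative and not part of this change; only `fetchData`, `hasData`, `processAllocation` and `getData` are real methods of `AsyncShardFetch`.

```java
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.AsyncShardFetch;

import java.util.Map;
import java.util.Set;

// Illustrative caller: an allocator has to tolerate an in-flight fetch round.
final class AsyncShardFetchUsageSketch {
    static <T extends BaseNodeResponse> boolean tryAllocate(
        AsyncShardFetch<T> fetcher,
        RoutingAllocation allocation,
        Map<ShardId, Set<String>> ignoreNodes
    ) {
        AsyncShardFetch.FetchResult<T> result = fetcher.fetchData(allocation.nodes(), ignoreNodes);
        if (result.hasData() == false) {
            return false; // still fetching; a reroute is scheduled once responses arrive
        }
        result.processAllocation(allocation);               // record per-shard ignored nodes
        Map<DiscoveryNode, T> perNodeData = result.getData(); // cached responses, keyed by node
        return perNodeData.isEmpty() == false;
    }
}
```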
diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java
new file mode 100644
index 0000000000000..3140ceef4f3ee
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java
@@ -0,0 +1,316 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.opensearch.ExceptionsHelper;
+import org.opensearch.OpenSearchTimeoutException;
+import org.opensearch.action.FailedNodeException;
+import org.opensearch.action.support.nodes.BaseNodeResponse;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.transport.ReceiveTimeoutTransportException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import reactor.util.annotation.NonNull;
+
+/**
+ * AsyncShardFetchCache will operate on the node level cache which is map of String and BaseNodeEntry. initData,
+ * putData and getData needs to be called for all the nodes. This class is responsible for managing the flow for all
+ * the nodes.
+ * It'll also give useful insights like how many ongoing fetches are happening, how many nodes are left for fetch or
+ * mark some node in fetching mode. All of these functionalities require checking the cache information and respond
+ * accordingly.
+ *
+ * initData : how to initialize an entry of shard cache for a node.
+ * putData : how to store the response of transport action in the cache.
+ * getData : how to get the stored data for any shard allocators like {@link PrimaryShardAllocator} or
+ * {@link ReplicaShardAllocator}
+ * deleteShard : how to clean up the stored data from cache for a shard.
+ *
+ * @param <K> Response type of transport action which has the data to be stored in the cache.
+ *
+ * @opensearch.internal
+ */
+public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {
+ private final Logger logger;
+ private final String type;
+
+ protected AsyncShardFetchCache(Logger logger, String type) {
+ this.logger = logger;
+ this.type = type;
+ }
+
+ abstract void initData(DiscoveryNode node);
+
+ abstract void putData(DiscoveryNode node, K response);
+
+ abstract K getData(DiscoveryNode node);
+
+ @NonNull
+ abstract Map<String, ? extends BaseNodeEntry> getCache();
+
+ /**
+ * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will
+ * automatically be cleaned up once shards are assigned.
+ *
+ * @param shardId for which we need to free up the cached data.
+ */
+ abstract void deleteShard(ShardId shardId);
+
+ /**
+ * Returns the number of fetches that are currently ongoing.
+ */
+ int getInflightFetches() {
+ int count = 0;
+ for (BaseNodeEntry nodeEntry : getCache().values()) {
+ if (nodeEntry.isFetching()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
+ * it nodes that are no longer part of the state.
+ */
+ void fillShardCacheWithDataNodes(DiscoveryNodes nodes) {
+ // verify that all current data nodes are there
+ for (final DiscoveryNode node : nodes.getDataNodes().values()) {
+ if (getCache().containsKey(node.getId()) == false) {
+ initData(node);
+ }
+ }
+ // remove nodes that are not longer part of the data nodes set
+ getCache().keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId));
+ }
+
+ /**
+ * Finds all the nodes that need to be fetched. Those are nodes that have no
+ * data, and are not in fetch mode.
+ */
+ List<String> findNodesToFetch() {
+ List<String> nodesToFetch = new ArrayList<>();
+ for (BaseNodeEntry nodeEntry : getCache().values()) {
+ if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
+ nodesToFetch.add(nodeEntry.getNodeId());
+ }
+ }
+ return nodesToFetch;
+ }
+
+ /**
+ * Are there any nodes that are fetching data?
+ */
+ boolean hasAnyNodeFetching() {
+ for (BaseNodeEntry nodeEntry : getCache().values()) {
+ if (nodeEntry.isFetching()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get the data from cache, ignore the failed entries. Use getData functional interface to get the data, as
+ * different implementations may have different ways to populate the data from cache.
+ *
+ * @param nodes Discovery nodes for which we need to return the cache data.
+ * @param failedNodes return failedNodes with the nodes where fetch has failed.
+ * @return Map of cache data for every DiscoveryNode.
+ */
+ Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes) {
+ Map<DiscoveryNode, K> fetchData = new HashMap<>();
+ for (Iterator<? extends Map.Entry<String, ? extends BaseNodeEntry>> it = getCache().entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, BaseNodeEntry> entry = (Map.Entry<String, BaseNodeEntry>) it.next();
+ String nodeId = entry.getKey();
+ BaseNodeEntry nodeEntry = entry.getValue();
+
+ DiscoveryNode node = nodes.get(nodeId);
+ if (node != null) {
+ if (nodeEntry.isFailed()) {
+ // if its failed, remove it from the list of nodes, so if this run doesn't work
+ // we try again next round to fetch it again
+ it.remove();
+ failedNodes.add(nodeEntry.getNodeId());
+ } else {
+ K nodeResponse = getData(node);
+ if (nodeResponse != null) {
+ fetchData.put(node, nodeResponse);
+ }
+ }
+ }
+ }
+ return fetchData;
+ }
+
+ void processResponses(List<K> responses, long fetchingRound) {
+ for (K response : responses) {
+ BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
+ if (nodeEntry != null) {
+ if (validateNodeResponse(nodeEntry, fetchingRound)) {
+ // if the entry is there, for the right fetching round and not marked as failed already, process it
+ logger.trace("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response);
+ putData(response.getNode(), response);
+ }
+ }
+ }
+ }
+
+ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) {
+ if (nodeEntry.getFetchingRound() != fetchingRound) {
+ assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
+ logger.trace(
+ "received response for [{}] from node {} for an older fetching round (expected: {} but was: {})",
+ nodeEntry.getNodeId(),
+ type,
+ nodeEntry.getFetchingRound(),
+ fetchingRound
+ );
+ return false;
+ } else if (nodeEntry.isFailed()) {
+ logger.trace("node {} has failed for [{}] (failure [{}])", nodeEntry.getNodeId(), type, nodeEntry.getFailure());
+ return false;
+ }
+ return true;
+ }
+
+ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
+ if (nodeEntry.getFetchingRound() != fetchingRound) {
+ assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
+ logger.trace(
+ "received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})",
+ nodeEntry.getNodeId(),
+ type,
+ nodeEntry.getFetchingRound(),
+ fetchingRound
+ );
+ } else if (nodeEntry.isFailed() == false) {
+ // if the entry is there, for the right fetching round and not marked as failed already, process it
+ Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause());
+ // if the request got rejected or timed out, we need to try it again next time...
+ if (retryableException(unwrappedCause)) {
+ nodeEntry.restartFetching();
+ } else {
+ logger.warn(() -> new ParameterizedMessage("failed to list shard for {} on node [{}]", type, failure.nodeId()), failure);
+ nodeEntry.doneFetching(failure.getCause());
+ }
+ }
+ }
+
+ boolean retryableException(Throwable unwrappedCause) {
+ return unwrappedCause instanceof OpenSearchRejectedExecutionException
+ || unwrappedCause instanceof ReceiveTimeoutTransportException
+ || unwrappedCause instanceof OpenSearchTimeoutException;
+ }
+
+ void processFailures(List<FailedNodeException> failures, long fetchingRound) {
+ for (FailedNodeException failure : failures) {
+ logger.trace("processing failure {} for [{}]", failure, type);
+ BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
+ if (nodeEntry != null) {
+ handleNodeFailure(nodeEntry, failure, fetchingRound);
+ }
+ }
+ }
+
+ /**
+ * Common function for removing whole node entry.
+ *
+ * @param nodeId nodeId to be cleaned.
+ */
+ void remove(String nodeId) {
+ this.getCache().remove(nodeId);
+ }
+
+ void markAsFetching(List<String> nodeIds, long fetchingRound) {
+ for (String nodeId : nodeIds) {
+ getCache().get(nodeId).markAsFetching(fetchingRound);
+ }
+ }
+
+ /**
+ * A node entry, holding only node level fetching related information.
+ * Actual metadata of shard is stored in child classes.
+ */
+ static class BaseNodeEntry {
+ private final String nodeId;
+ private boolean fetching;
+ private boolean valueSet;
+ private Throwable failure;
+ private long fetchingRound;
+
+ BaseNodeEntry(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ String getNodeId() {
+ return this.nodeId;
+ }
+
+ boolean isFetching() {
+ return fetching;
+ }
+
+ void markAsFetching(long fetchingRound) {
+ assert fetching == false : "double marking a node as fetching";
+ this.fetching = true;
+ this.fetchingRound = fetchingRound;
+ }
+
+ void doneFetching() {
+ assert fetching : "setting value but not in fetching mode";
+ assert failure == null : "setting value when failure already set";
+ this.valueSet = true;
+ this.fetching = false;
+ }
+
+ void doneFetching(Throwable failure) {
+ assert fetching : "setting value but not in fetching mode";
+ assert valueSet == false : "setting failure when already set value";
+ assert failure != null : "setting failure can't be null";
+ this.failure = failure;
+ this.fetching = false;
+ }
+
+ void restartFetching() {
+ assert fetching : "restarting fetching, but not in fetching mode";
+ assert valueSet == false : "value can't be set when restarting fetching";
+ assert failure == null : "failure can't be set when restarting fetching";
+ this.fetching = false;
+ }
+
+ boolean isFailed() {
+ return failure != null;
+ }
+
+ boolean hasData() {
+ return valueSet || failure != null;
+ }
+
+ Throwable getFailure() {
+ assert hasData() : "getting failure when data has not been fetched";
+ return failure;
+ }
+
+ long getFetchingRound() {
+ return fetchingRound;
+ }
+ }
+}
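To summarize the contract this new class factors out of `AsyncShardFetch`, here is a condensed sketch of the per-round call sequence the fetcher drives on the cache. It is illustrative only (a same-package helper with placeholder arguments) and mirrors the refactored `fetchData`/`processAsyncFetch` flow shown earlier.

```java
package org.opensearch.gateway;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative driver, placed in the same package because the cache methods are package-private.
final class AsyncShardFetchCacheFlowSketch {
    static <K extends BaseNodeResponse> Map<DiscoveryNode, K> fetchRound(
        AsyncShardFetchCache<K> cache,
        DiscoveryNodes nodes,
        long round,
        List<K> responses,
        List<FailedNodeException> failures
    ) {
        cache.fillShardCacheWithDataNodes(nodes);        // add new data nodes, drop departed ones
        List<String> nodeIds = cache.findNodesToFetch(); // nodes with no data that are not already fetching
        cache.markAsFetching(nodeIds, round);            // stamp them with this fetching round
        // ... transport requests go out to nodeIds; once the responses come back:
        cache.processResponses(responses, round);        // stores data via putData(), ignores stale rounds
        cache.processFailures(failures, round);          // retries retryable failures, records the rest
        Set<String> failedNodes = new HashSet<>();
        return cache.getCacheData(nodes, failedNodes);   // successful per-node data; failed nodes are evicted
    }
}
```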
diff --git a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java
index 524d2b0e0dd38..eb3a99b0e0388 100644
--- a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java
+++ b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java
@@ -37,7 +37,6 @@
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FloatField;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
@@ -372,7 +371,7 @@ public Query termsQuery(String field, List