> pinnedTimestampWithFetchTimestamp_3 = RemoteStorePinnedTimestampService.getPinnedTimestamps();
+ long lastFetchTimestamp_3 = pinnedTimestampWithFetchTimestamp_3.v1();
+ assertTrue(lastFetchTimestamp_3 != -1);
+ assertEquals(Set.of(timestamp1, timestamp3), pinnedTimestampWithFetchTimestamp_3.v2());
+ });
+
+ remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
+ }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java
index 31c73e2fc03ae..86d586cd17146 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java
@@ -250,7 +250,7 @@ public void testStatsResponseFromLocalNode() {
}
}
- @TestLogging(reason = "Getting trace logs from remote store package", value = "org.opensearch.remotestore:TRACE")
+ @TestLogging(reason = "Getting trace logs from remote store package", value = "org.opensearch.index.shard:TRACE")
public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
setup();
// Scenario:
@@ -280,11 +280,13 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
.get(0)
.getSegmentStats();
logger.info(
- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started and {}b upload bytes failed.",
+ "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded.",
zeroStatePrimaryStats.refreshTimeLagMs,
zeroStatePrimaryStats.bytesLag,
zeroStatePrimaryStats.uploadBytesStarted,
- zeroStatePrimaryStats.uploadBytesFailed
+ zeroStatePrimaryStats.uploadBytesFailed,
+ zeroStatePrimaryStats.totalUploadsSucceeded,
+ zeroStatePrimaryStats.uploadBytesSucceeded
);
assertTrue(
zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded
@@ -348,7 +350,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
}
}
- @TestLogging(reason = "Getting trace logs from remote store package", value = "org.opensearch.remotestore:TRACE")
+ @TestLogging(reason = "Getting trace logs from remote store package", value = "org.opensearch.index.shard:TRACE")
public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
setup();
// Scenario:
@@ -382,11 +384,13 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
.get(0)
.getSegmentStats();
logger.info(
- "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started and {}b upload bytes failed.",
+ "Zero state primary stats: {}ms refresh time lag, {}b bytes lag, {}b upload bytes started, {}b upload bytes failed , {} uploads succeeded, {} upload byes succeeded.",
zeroStatePrimaryStats.refreshTimeLagMs,
zeroStatePrimaryStats.bytesLag,
zeroStatePrimaryStats.uploadBytesStarted,
- zeroStatePrimaryStats.uploadBytesFailed
+ zeroStatePrimaryStats.uploadBytesFailed,
+ zeroStatePrimaryStats.totalUploadsSucceeded,
+ zeroStatePrimaryStats.uploadBytesSucceeded
);
assertTrue(
zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded
@@ -617,7 +621,7 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
}
assertZeroTranslogDownloadStats(translogStats);
});
- }, 5, TimeUnit.SECONDS);
+ }, 10, TimeUnit.SECONDS);
}
public void testStatsCorrectnessOnFailover() {
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java
index a51bd6b20fff0..88c9ae436e85f 100644
--- a/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java
@@ -20,6 +20,8 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
@@ -65,11 +67,20 @@ protected Settings featureFlagSettings() {
return featureSettings.build();
}
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+ ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
+ return Settings.builder()
+ .put(super.nodeSettings(nodeOrdinal))
+ .put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
+ .build();
+ }
+
public void testWritableWarmFeatureFlagDisabled() {
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build();
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
- internalTestCluster.startDataOnlyNode(clusterSettings);
+ internalTestCluster.startDataAndSearchNodes(1);
Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
@@ -94,7 +105,7 @@ public void testWritableWarmFeatureFlagDisabled() {
public void testWritableWarmBasic() throws Exception {
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
- internalTestCluster.startDataOnlyNode();
+ internalTestCluster.startDataAndSearchNodes(1);
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java b/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java
index 01ad06757640c..3cf63e2f19a16 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/query/SearchQueryIT.java
@@ -52,6 +52,7 @@
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.unit.Fuzziness;
import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
@@ -66,6 +67,7 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
+import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.index.query.WildcardQueryBuilder;
import org.opensearch.index.query.WrapperQueryBuilder;
import org.opensearch.index.query.functionscore.ScoreFunctionBuilders;
@@ -84,6 +86,7 @@
import java.io.IOException;
import java.io.Reader;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
@@ -98,6 +101,8 @@
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
+import org.roaringbitmap.RoaringBitmap;
+
import static java.util.Collections.singletonMap;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -1157,6 +1162,41 @@ public void testTermsQuery() throws Exception {
assertHitCount(searchResponse, 0L);
}
+ public void testTermsQueryWithBitmapDocValuesQuery() throws Exception {
+ assertAcked(
+ prepareCreate("products").setMapping(
+ jsonBuilder().startObject()
+ .startObject("properties")
+ .startObject("product")
+ .field("type", "integer")
+ .field("index", false)
+ .endObject()
+ .endObject()
+ .endObject()
+ )
+ );
+ indexRandom(
+ true,
+ client().prepareIndex("products").setId("1").setSource("product", 1),
+ client().prepareIndex("products").setId("2").setSource("product", 2),
+ client().prepareIndex("products").setId("3").setSource("product", new int[] { 1, 3 }),
+ client().prepareIndex("products").setId("4").setSource("product", 4)
+ );
+
+ RoaringBitmap r = new RoaringBitmap();
+ r.add(1);
+ r.add(4);
+ byte[] array = new byte[r.serializedSizeInBytes()];
+ r.serialize(ByteBuffer.wrap(array));
+ BytesArray bitmap = new BytesArray(array);
+ // directly building the terms query builder, so pass in the bitmap value as BytesArray
+ SearchResponse searchResponse = client().prepareSearch("products")
+ .setQuery(constantScoreQuery(termsQuery("product", bitmap).valueType(TermsQueryBuilder.ValueType.BITMAP)))
+ .get();
+ assertHitCount(searchResponse, 3L);
+ assertSearchHits(searchResponse, "1", "3", "4");
+ }
+
public void testTermsLookupFilter() throws Exception {
assertAcked(prepareCreate("lookup").setMapping("terms", "type=text", "other", "type=text"));
indexRandomForConcurrentSearch("lookup");
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java b/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java
index e40928f15e8a8..fdb12639c65be 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java
@@ -42,6 +42,7 @@
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseExecutionException;
+import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -90,6 +91,7 @@
import static org.opensearch.script.MockScriptPlugin.NAME;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFirstHit;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
@@ -919,7 +921,7 @@ public void testSortMissingNumbers() throws Exception {
client().prepareIndex("test")
.setId("3")
.setSource(
- jsonBuilder().startObject().field("id", "3").field("i_value", 2).field("d_value", 2.2).field("u_value", 2).endObject()
+ jsonBuilder().startObject().field("id", "3").field("i_value", 2).field("d_value", 2.2).field("u_value", 3).endObject()
)
.get();
@@ -964,6 +966,18 @@ public void testSortMissingNumbers() throws Exception {
assertThat(searchResponse.getHits().getAt(1).getId(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(2).getId(), equalTo("3"));
+ logger.info("--> sort with custom missing value");
+ searchResponse = client().prepareSearch()
+ .setQuery(matchAllQuery())
+ .addSort(SortBuilders.fieldSort("i_value").order(SortOrder.ASC).missing(randomBoolean() ? 1 : "1"))
+ .get();
+ assertNoFailures(searchResponse);
+
+ assertThat(searchResponse.getHits().getTotalHits().value, equalTo(3L));
+ assertThat(searchResponse.getHits().getAt(0).getId(), equalTo("1"));
+ assertThat(searchResponse.getHits().getAt(1).getId(), equalTo("2"));
+ assertThat(searchResponse.getHits().getAt(2).getId(), equalTo("3"));
+
// FLOAT
logger.info("--> sort with no missing (same as missing _last)");
searchResponse = client().prepareSearch()
@@ -1001,6 +1015,18 @@ public void testSortMissingNumbers() throws Exception {
assertThat(searchResponse.getHits().getAt(1).getId(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(2).getId(), equalTo("3"));
+ logger.info("--> sort with custom missing value");
+ searchResponse = client().prepareSearch()
+ .setQuery(matchAllQuery())
+ .addSort(SortBuilders.fieldSort("d_value").order(SortOrder.ASC).missing(randomBoolean() ? 1.1 : "1.1"))
+ .get();
+ assertNoFailures(searchResponse);
+
+ assertThat(searchResponse.getHits().getTotalHits().value, equalTo(3L));
+ assertThat(searchResponse.getHits().getAt(0).getId(), equalTo("1"));
+ assertThat(searchResponse.getHits().getAt(1).getId(), equalTo("2"));
+ assertThat(searchResponse.getHits().getAt(2).getId(), equalTo("3"));
+
// UNSIGNED_LONG
logger.info("--> sort with no missing (same as missing _last)");
searchResponse = client().prepareSearch()
@@ -1037,6 +1063,24 @@ public void testSortMissingNumbers() throws Exception {
assertThat(searchResponse.getHits().getAt(0).getId(), equalTo("2"));
assertThat(searchResponse.getHits().getAt(1).getId(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(2).getId(), equalTo("3"));
+
+ logger.info("--> sort with custom missing value");
+ searchResponse = client().prepareSearch()
+ .setQuery(matchAllQuery())
+ .addSort(SortBuilders.fieldSort("u_value").order(SortOrder.ASC).missing(randomBoolean() ? 2 : "2"))
+ .get();
+ assertNoFailures(searchResponse);
+
+ assertThat(searchResponse.getHits().getTotalHits().value, equalTo(3L));
+ assertThat(searchResponse.getHits().getAt(0).getId(), equalTo("1"));
+ assertThat(searchResponse.getHits().getAt(1).getId(), equalTo("2"));
+ assertThat(searchResponse.getHits().getAt(2).getId(), equalTo("3"));
+
+ logger.info("--> sort with negative missing value");
+ SearchRequestBuilder searchRequestBuilder = client().prepareSearch()
+ .setQuery(matchAllQuery())
+ .addSort(SortBuilders.fieldSort("u_value").order(SortOrder.ASC).missing(randomBoolean() ? -1 : "-1"));
+ assertFailures(searchRequestBuilder, RestStatus.BAD_REQUEST, containsString("Value [-1] is out of range for an unsigned long"));
}
public void testSortMissingNumbersMinMax() throws Exception {
diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java
index e688a4491b1a7..2331d52c3a1bc 100644
--- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java
@@ -19,6 +19,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
+import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
@@ -287,8 +288,14 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
- internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath));
- internalCluster().startDataOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath));
+ Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
+ // Disabling pinned timestamp as this test is specifically for shallow snapshot.
+ settings = Settings.builder()
+ .put(settings)
+ .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), false)
+ .build();
+ internalCluster().startClusterManagerOnlyNode(settings);
+ internalCluster().startDataOnlyNode(settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);
diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java
index 1c199df4d548e..a19bbe49ad340 100644
--- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java
@@ -67,7 +67,6 @@
import java.util.stream.StreamSupport;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
-import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
@@ -1019,11 +1018,12 @@ public void testStartSearchNode() throws Exception {
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)));
// test start node without search role
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE)));
- // test start non-dedicated search node with TIERED_REMOTE_INDEX feature enabled
- internalCluster().startNode(
- Settings.builder()
- .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
- .put(TIERED_REMOTE_INDEX, true)
+ // test start non-dedicated search node, if the user doesn't configure the cache size, it fails
+ assertThrows(
+ SettingsException.class,
+ () -> internalCluster().startNode(
+ Settings.builder().put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
+ )
);
// test start non-dedicated search node
assertThrows(
diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/Lucene90DocValuesConsumerWrapper.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/Lucene90DocValuesConsumerWrapper.java
new file mode 100644
index 0000000000000..67ee45f4c9306
--- /dev/null
+++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/Lucene90DocValuesConsumerWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.apache.lucene.codecs.lucene90;
+
+import org.apache.lucene.codecs.DocValuesConsumer;
+import org.apache.lucene.index.SegmentWriteState;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This class is an abstraction of the {@link DocValuesConsumer} for the Star Tree index structure.
+ * It is responsible to consume various types of document values (numeric, binary, sorted, sorted numeric,
+ * and sorted set) for fields in the Star Tree index.
+ *
+ * @opensearch.experimental
+ */
+public class Lucene90DocValuesConsumerWrapper implements Closeable {
+
+ private final Lucene90DocValuesConsumer lucene90DocValuesConsumer;
+
+ public Lucene90DocValuesConsumerWrapper(
+ SegmentWriteState state,
+ String dataCodec,
+ String dataExtension,
+ String metaCodec,
+ String metaExtension
+ ) throws IOException {
+ lucene90DocValuesConsumer = new Lucene90DocValuesConsumer(state, dataCodec, dataExtension, metaCodec, metaExtension);
+ }
+
+ public Lucene90DocValuesConsumer getLucene90DocValuesConsumer() {
+ return lucene90DocValuesConsumer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ lucene90DocValuesConsumer.close();
+ }
+}
diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/Lucene90DocValuesProducerWrapper.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/Lucene90DocValuesProducerWrapper.java
new file mode 100644
index 0000000000000..a213852c59094
--- /dev/null
+++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/Lucene90DocValuesProducerWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.apache.lucene.codecs.lucene90;
+
+import org.apache.lucene.codecs.DocValuesProducer;
+import org.apache.lucene.index.SegmentReadState;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This class is a custom abstraction of the {@link DocValuesProducer} for the Star Tree index structure.
+ * It is responsible for providing access to various types of document values (numeric, binary, sorted, sorted numeric,
+ * and sorted set) for fields in the Star Tree index.
+ *
+ * @opensearch.experimental
+ */
+public class Lucene90DocValuesProducerWrapper implements Closeable {
+
+ private final Lucene90DocValuesProducer lucene90DocValuesProducer;
+
+ public Lucene90DocValuesProducerWrapper(
+ SegmentReadState state,
+ String dataCodec,
+ String dataExtension,
+ String metaCodec,
+ String metaExtension
+ ) throws IOException {
+ lucene90DocValuesProducer = new Lucene90DocValuesProducer(state, dataCodec, dataExtension, metaCodec, metaExtension);
+ }
+
+ public DocValuesProducer getLucene90DocValuesProducer() {
+ return lucene90DocValuesProducer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ lucene90DocValuesProducer.close();
+ }
+}
diff --git a/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterWrapper.java b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterWrapper.java
new file mode 100644
index 0000000000000..f7759fcced284
--- /dev/null
+++ b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterWrapper.java
@@ -0,0 +1,53 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.apache.lucene.index;
+
+import org.apache.lucene.util.Counter;
+
+/**
+ * A wrapper class for writing sorted numeric doc values.
+ *
+ * This class provides a convenient way to add sorted numeric doc values to a field
+ * and retrieve the corresponding {@link SortedNumericDocValues} instance.
+ *
+ * @opensearch.experimental
+ */
+public class SortedNumericDocValuesWriterWrapper {
+
+ private final SortedNumericDocValuesWriter sortedNumericDocValuesWriter;
+
+ /**
+ * Sole constructor. Constructs a new {@link SortedNumericDocValuesWriterWrapper} instance.
+ *
+ * @param fieldInfo the field information for the field being written
+ * @param counter a counter for tracking memory usage
+ */
+ public SortedNumericDocValuesWriterWrapper(FieldInfo fieldInfo, Counter counter) {
+ sortedNumericDocValuesWriter = new SortedNumericDocValuesWriter(fieldInfo, counter);
+ }
+
+ /**
+ * Adds a value to the sorted numeric doc values for the specified document.
+ *
+ * @param docID the document ID
+ * @param value the value to add
+ */
+ public void addValue(int docID, long value) {
+ sortedNumericDocValuesWriter.addValue(docID, value);
+ }
+
+ /**
+ * Returns the {@link SortedNumericDocValues} instance containing the sorted numeric doc values
+ *
+ * @return the {@link SortedNumericDocValues} instance
+ */
+ public SortedNumericDocValues getDocValues() {
+ return sortedNumericDocValuesWriter.getDocValues();
+ }
+}
diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java
index 574b7029a6501..c86e6580122d5 100644
--- a/server/src/main/java/org/opensearch/action/ActionModule.java
+++ b/server/src/main/java/org/opensearch/action/ActionModule.java
@@ -1200,9 +1200,12 @@ public void unregisterDynamicRoute(NamedRoute route) {
* @param route The {@link RestHandler.Route}.
* @return the corresponding {@link RestSendToExtensionAction} if it is registered, null otherwise.
*/
- @SuppressWarnings("unchecked")
public RestSendToExtensionAction get(RestHandler.Route route) {
- return routeRegistry.get(route);
+ if (route instanceof NamedRoute) {
+ return routeRegistry.get((NamedRoute) route);
+ }
+ // Only NamedRoutes are map keys so any other route is not in the map
+ return null;
}
}
}
diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java
index 97ef94055faf7..d1d5f568fc09d 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java
@@ -27,6 +27,7 @@
@PublicApi(since = "2.11.0")
public final class SearchRequestStats extends SearchRequestOperationsListener {
Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class);
+ StatsHolder tookStatsHolder;
public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled";
public static final Setting SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting(
@@ -40,6 +41,7 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {
public SearchRequestStats(ClusterSettings clusterSettings) {
this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED));
clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
+ tookStatsHolder = new StatsHolder();
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
@@ -57,6 +59,18 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) {
return phaseStatsMap.get(searchPhaseName).timing.sum();
}
+ public long getTookCurrent() {
+ return tookStatsHolder.current.count();
+ }
+
+ public long getTookTotal() {
+ return tookStatsHolder.total.count();
+ }
+
+ public long getTookMetric() {
+ return tookStatsHolder.timing.sum();
+ }
+
@Override
protected void onPhaseStart(SearchPhaseContext context) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc();
@@ -75,6 +89,23 @@ protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}
+ @Override
+ protected void onRequestStart(SearchRequestContext searchRequestContext) {
+ tookStatsHolder.current.inc();
+ }
+
+ @Override
+ protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
+ tookStatsHolder.current.dec();
+ tookStatsHolder.total.inc();
+ tookStatsHolder.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()));
+ }
+
+ @Override
+ protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
+ tookStatsHolder.current.dec();
+ }
+
/**
* Holder of statistics values
*
diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java
index dfecf4f462c4d..ed2943db94420 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java
@@ -37,8 +37,8 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.search.fetch.ShardFetchSearchRequest;
import org.opensearch.search.internal.ShardSearchRequest;
-import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;
+import org.opensearch.wlm.QueryGroupTask;
import java.util.Map;
import java.util.function.Supplier;
@@ -50,7 +50,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
-public class SearchShardTask extends CancellableTask implements SearchBackpressureTask {
+public class SearchShardTask extends QueryGroupTask implements SearchBackpressureTask {
// generating metadata in a lazy way since source can be quite big
private final MemoizedSupplier metadataSupplier;
diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java
index d3c1043c50cce..2a1a961e7607b 100644
--- a/server/src/main/java/org/opensearch/action/search/SearchTask.java
+++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java
@@ -35,8 +35,8 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskId;
-import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.SearchBackpressureTask;
+import org.opensearch.wlm.QueryGroupTask;
import java.util.Map;
import java.util.function.Supplier;
@@ -49,7 +49,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
-public class SearchTask extends CancellableTask implements SearchBackpressureTask {
+public class SearchTask extends QueryGroupTask implements SearchBackpressureTask {
// generating description in a lazy way since source can be quite big
private final Supplier descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;
diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
index 7d3237d43cd5c..88bf7ebea8e52 100644
--- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
@@ -101,6 +101,7 @@
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
+import org.opensearch.wlm.QueryGroupTask;
import java.util.ArrayList;
import java.util.Arrays;
@@ -442,6 +443,12 @@ private void executeRequest(
);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);
+ // At this point either the QUERY_GROUP_ID header will be present in ThreadContext either via ActionFilter
+ // or HTTP header (HTTP header will be deprecated once ActionFilter is implemented)
+ if (task instanceof QueryGroupTask) {
+ ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
+ }
+
PipelinedRequest searchRequest;
ActionListener listener;
try {
diff --git a/server/src/main/java/org/opensearch/client/OriginSettingClient.java b/server/src/main/java/org/opensearch/client/OriginSettingClient.java
index 1b0e08cc489c4..27d87227df7bc 100644
--- a/server/src/main/java/org/opensearch/client/OriginSettingClient.java
+++ b/server/src/main/java/org/opensearch/client/OriginSettingClient.java
@@ -36,6 +36,7 @@
import org.opensearch.action.ActionType;
import org.opensearch.action.support.ContextPreservingActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
@@ -65,7 +66,11 @@ protected void
ActionListener listener
) {
final Supplier supplier = in().threadPool().getThreadContext().newRestorableContext(false);
- try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashWithOrigin(origin)) {
+ try (
+ ThreadContext.StoredContext ignore = ThreadContextAccess.doPrivileged(
+ () -> in().threadPool().getThreadContext().stashWithOrigin(origin)
+ )
+ ) {
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java
index 6c6049f04231b..509cd732357d6 100644
--- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java
+++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java
@@ -416,6 +416,7 @@
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.common.util.concurrent.ThreadContextAccess;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.bytes.BytesReference;
@@ -2148,7 +2149,9 @@ protected void
ActionListener listener
) {
ThreadContext threadContext = threadPool().getThreadContext();
- try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) {
+ try (
+ ThreadContext.StoredContext ctx = ThreadContextAccess.doPrivileged(() -> threadContext.stashAndMergeHeaders(headers))
+ ) {
super.doExecute(action, request, listener);
}
}
diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java
index 7fa63ae8abc62..c7820c2c9a365 100644
--- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java
+++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java
@@ -105,7 +105,8 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
- this.isRemotePublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
+ this.isRemotePublicationEnabled = isRemoteStateEnabled
+ && FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& localNode.isRemoteStatePublicationEnabled();
}
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/ComposableIndexTemplate.java b/server/src/main/java/org/opensearch/cluster/metadata/ComposableIndexTemplate.java
index 594dda83c41e2..63bbe4144c4fb 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/ComposableIndexTemplate.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/ComposableIndexTemplate.java
@@ -184,7 +184,7 @@ public ComposableIndexTemplate(StreamInput in) throws IOException {
this.version = in.readOptionalVLong();
this.metadata = in.readMap();
this.dataStreamTemplate = in.readOptionalWriteable(DataStreamTemplate::new);
- if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+ if (in.getVersion().onOrAfter(Version.V_2_16_0)) {
this.context = in.readOptionalWriteable(Context::new);
} else {
this.context = null;
@@ -248,7 +248,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVLong(this.version);
out.writeMap(this.metadata);
out.writeOptionalWriteable(dataStreamTemplate);
- if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+ if (out.getVersion().onOrAfter(Version.V_2_16_0)) {
out.writeOptionalWriteable(context);
}
}
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
index 9e7fe23f29872..df0d2609ad83d 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
@@ -43,6 +43,7 @@
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.node.DiscoveryNodeFilters;
import org.opensearch.cluster.routing.allocation.IndexMetadataUpdater;
+import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.MapBuilder;
@@ -686,6 +687,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final boolean isSystem;
private final boolean isRemoteSnapshot;
+ private final int indexTotalShardsPerNodeLimit;
+
private IndexMetadata(
final Index index,
final long version,
@@ -711,7 +714,8 @@ private IndexMetadata(
final int routingPartitionSize,
final ActiveShardCount waitForActiveShards,
final Map rolloverInfos,
- final boolean isSystem
+ final boolean isSystem,
+ final int indexTotalShardsPerNodeLimit
) {
this.index = index;
@@ -746,6 +750,7 @@ private IndexMetadata(
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
this.isSystem = isSystem;
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
+ this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}
@@ -899,6 +904,10 @@ public Set inSyncAllocationIds(int shardId) {
return inSyncAllocationIds.get(shardId);
}
+ public int getIndexTotalShardsPerNodeLimit() {
+ return this.indexTotalShardsPerNodeLimit;
+ }
+
@Nullable
public DiscoveryNodeFilters requireFilters() {
return requireFilters;
@@ -1583,6 +1592,8 @@ public IndexMetadata build() {
);
}
+ final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
+
final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
return new IndexMetadata(
@@ -1610,7 +1621,8 @@ public IndexMetadata build() {
routingPartitionSize,
waitForActiveShards,
rolloverInfos,
- isSystem
+ isSystem,
+ indexTotalShardsPerNodeLimit
);
}
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
index 440b9e267cf0a..6163fd624c838 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
@@ -853,6 +853,12 @@ public Map views() {
return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap());
}
+ public Map queryGroups() {
+ return Optional.ofNullable((QueryGroupMetadata) this.custom(QueryGroupMetadata.TYPE))
+ .map(QueryGroupMetadata::queryGroups)
+ .orElse(Collections.emptyMap());
+ }
+
public DecommissionAttributeMetadata decommissionAttributeMetadata() {
return custom(DecommissionAttributeMetadata.TYPE);
}
@@ -1391,6 +1397,13 @@ public Builder put(final QueryGroup queryGroup) {
return queryGroups(existing);
}
+ public Builder remove(final QueryGroup queryGroup) {
+ Objects.requireNonNull(queryGroup, "queryGroup should not be null");
+ Map existing = new HashMap<>(getQueryGroups());
+ existing.remove(queryGroup.get_id());
+ return queryGroups(existing);
+ }
+
private Map